下面是编程之家 jb51.cc 通过网络收集整理的代码片段。
编程之家小编现在分享给大家,也给大家做个参考。
CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS -- 调用主程序,调用存储过程 PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER); -- 创建新的时间周期,运行数据周期作业及作业流状态处理 PROCEDURE SP_FLOW_RUN_DEAL; -- 创建新的时间周期 PROCEDURE SP_FLOW_CREATE_NEW_PERIOD; -- 运行数据周期内的作业流及作业 PROCEDURE SP_FLOW_RUN_NEW_PERIOD; -- 同步作业流运行状态 PROCEDURE SP_FLOW_RUN_STATUS; -- 失败作业流及作业重置为未处理 PROCEDURE SP_FLOW_RUN_ERROR_RESET; -- 作业状态更新 PROCEDURE SP_JOB_RUN_STATUS ( I_JOB_ID NUMBER,I_ORG_ID VARCHAR2,I_JOB_RUN_STATUS VARCHAR2,I_JOB_RUN_INFO VARCHAR2 ); -- 作业流日志处理 PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER); -- 作业日志处理 PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER); PROCEDURE SP_INSERT_MONITOR_SMS ( O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/ ); PROCEDURE SP_SEND_MONITOR_SMS; -- 获取作业流前置依赖 FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2; -- 获取作业前置依赖 FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2; -- 获取作业流下属作业运行状态 FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2; -- 获取下个数据日期 FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE; -- 获取上级作业流运行状态 FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2; -- 获取上级作业流数据开始时间 FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE; -- 获取上级作业流数据结束时间 FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE; -- 获取周期代码 FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2; END PKG_ETL_CTL; / CREATE OR REPLACE PACKAGE BODY PKG_ETL_CTL IS /******************************************************************* 程序名 :SP_EXEC_PROC 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 调用主程序,调用存储过程 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER) IS VAR_PRO_NAME VARCHAR2(100); VAR_DATA_START_TIME VARCHAR2(20); VAR_DATA_END_TIME VARCHAR2(20); VAR_sql VARCHAR2(4000); VAR_ParaMS VARCHAR2(1000); VAR_ORG_ID VARCHAR2(10); VAR_JOB_RUN_DESC VARCHAR2(100); VAR_JOB_ERR_DESC VARCHAR2(100); BEGIN -- 获取作业正在运行描述 SELECT T.ETL_Para_VAL INTO VAR_JOB_RUN_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_RUN_DESC'; -- 获取作业运行失败描述 SELECT T.ETL_Para_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_ERR_DESC'; -- 获取作业所调用的存储过程,数据开始时间,数据结束时间 SELECT T.ETL_JOB_PROC,TO_CHAR(A.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS'),TO_CHAR(A.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS') INTO VAR_PRO_NAME,VAR_DATA_START_TIME,VAR_DATA_END_TIME FROM ETL_CTL_JOB_INFO T,ETL_JOB_RUN_STS A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_JOB_ID = I_JOB_ID; -- 获取作业全部参数拼接到一起 FOR LOOP_ParaM IN (SELECT T.ETL_Para_NAME,DECODE(T.ETL_Para_TYPE,2,'TO_DATE(' || T.ETL_Para_VAL || ',''YYYYMMDDHH24MISS'')' -- 日期类型参数转化成日期格式,T.ETL_Para_VAL) ETL_Para_VAL,T.ETL_Para_TYPE FROM ETL_JOB_Para T WHERE T.ETL_JOB_ID = I_JOB_ID) LOOP -- 获取机构号 IF LOOP_ParaM.ETL_Para_NAME = 'I_ORG_ID' THEN VAR_ORG_ID := LOOP_ParaM.ETL_Para_VAL; END IF; -- 参数拼接 VAR_ParaMS := VAR_ParaMS || LOOP_ParaM.ETL_Para_NAME || ' => ' || LOOP_ParaM.ETL_Para_VAL || ','; END LOOP; -- 参数加上输出参数(存储过程运行结果和运行信息) VAR_ParaMS := UPPER(VAR_ParaMS) || 'O_RESULT_FLAG => LO_RESULT_FLAG,O_RESULT_MSG => LO_RESULT_MSG'; -- 参数替换为变量 VAR_ParaMS := REPLACE(VAR_ParaMS,'#$I_DATA_START_TIME#',VAR_DATA_START_TIME); VAR_ParaMS := REPLACE(VAR_ParaMS,'#$I_DATA_END_TIME#',VAR_DATA_END_TIME); -- 拼接存储过程进行调用 VAR_sql := 'DECLARE LO_RESULT_FLAG VARCHAR2(10);LO_RESULT_MSG VARCHAR2(300);BEGIN ' || VAR_PRO_NAME || '(' || VAR_ParaMS || ');PKG_ETL_CTL.SP_JOB_RUN_STATUS(' || I_JOB_ID || ',''' || VAR_ORG_ID || ''',LO_RESULT_FLAG,LO_RESULT_MSG);END;'; -- 修改作业状态为正在运行 SP_JOB_RUN_STATUS(I_JOB_ID,VAR_ORG_ID,'1',VAR_JOB_RUN_DESC); -- 运行存储过程 EXECUTE IMMEDIATE VAR_sql; EXCEPTION WHEN OTHERS THEN -- 运行出错,返回错误信息 SP_JOB_RUN_STATUS(I_JOB_ID,'2',sqlERRM); END; /******************************************************************* 程序名 :SP_FLOW_RUN_DEAL 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 创建新的时间周期,运行数据周期作业及作业流状态处理 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_DEAL IS BEGIN -- 创建新的时间周期 SP_FLOW_CREATE_NEW_PERIOD; -- 运行时间周期内的作业 SP_FLOW_RUN_NEW_PERIOD; -- 更新作业流状态 SP_FLOW_RUN_STATUS; -- 失败任务重置 SP_FLOW_RUN_ERROR_RESET; END; /******************************************************************* 程序名 :SP_FLOW_CREATE_NEW_PERIOD 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 生成新的数据周期运行新周期的数据 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_CREATE_NEW_PERIOD IS DTE_DATA_NEXT_TIME DATE; DTE_DATA_START_TIME DATE; DTE_DATA_END_TIME DATE; BEGIN FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,A.ETL_NEXT_EXPIRY_TIME FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_LEVEL = 1 AND T.ETL_FLOW_STATUS = 1) LOOP DTE_DATA_NEXT_TIME := FN_GET_NEXT_DATA_TIME(LOOP_FLOW.ETL_FLOW_ID); IF DTE_DATA_NEXT_TIME <= LOOP_FLOW.ETL_NEXT_EXPIRY_TIME THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_NEXT_DATA_TIME = DTE_DATA_NEXT_TIME WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_SUCC_TIME,T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME,T.ETL_NEXT_DATA_TIME,T.ETL_FLOW_RUN_STATUS,A.ETL_CYC_CODE,A.ETL_FLOW_LEVEL,A.ETL_FLOW_STATUS FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 ORDER BY A.ETL_FLOW_LEVEL) LOOP IF LOOP_FLOW.ETL_FLOW_LEVEL = 1 AND LOOP_FLOW.ETL_FLOW_STATUS = 1 AND LOOP_FLOW.ETL_FLOW_RUN_STATUS = 9 AND LOOP_FLOW.ETL_DATA_SUCC_TIME < LOOP_FLOW.ETL_NEXT_DATA_TIME THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_DATA_START_TIME = CASE WHEN LOOP_FLOW.ETL_CYC_CODE = '01' THEN T.ETL_DATA_SUCC_TIME + 1 WHEN LOOP_FLOW.ETL_CYC_CODE = '02' THEN T.ETL_DATA_SUCC_TIME + 1 / 24 / 60 / 60 WHEN LOOP_FLOW.ETL_CYC_CODE = '03' THEN T.ETL_DATA_SUCC_TIME + 1 END,T.ETL_DATA_END_TIME = T.ETL_NEXT_DATA_TIME,T.ETL_START_TIME = NULL,T.ETL_END_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 0,T.ETL_RESET_TIME = 0 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1 AND LOOP_FLOW.ETL_FLOW_STATUS = 1 AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 0 THEN DTE_DATA_START_TIME := FN_GET_SUPER_DATA_START_TIME(LOOP_FLOW.ETL_FLOW_ID); DTE_DATA_END_TIME := FN_GET_SUPER_DATA_END_TIME(LOOP_FLOW.ETL_FLOW_ID); UPDATE ETL_FLOW_RUN_STS T SET T.ETL_DATA_START_TIME = DTE_DATA_START_TIME,T.ETL_DATA_END_TIME = DTE_DATA_END_TIME,T.ETL_FLOW_RUN_STATUS = 0 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,T.ETL_DATA_START_TIME FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME FLOW_DATA_END_TIME,A.ETL_DATA_START_TIME JOB_DATA_START_TIME,A.ETL_DATA_END_TIME JOB_DATA_END_TIME,B.ETL_JOB_STATUS FROM ETL_FLOW_RUN_STS T,ETL_JOB_RUN_STS A,ETL_CTL_JOB_INFO B WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID AND A.ETL_JOB_ID = B.ETL_JOB_ID AND B.ETL_JOB_STATUS = 1) LOOP IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 0 AND LOOP_JOB.ETL_JOB_STATUS = 1 THEN UPDATE ETL_JOB_RUN_STS T SET T.ETL_DATA_START_TIME = LOOP_JOB.FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME = LOOP_JOB.FLOW_DATA_END_TIME,T.ETL_JOB_RUN_STATUS = 0,T.ETL_SESSION_ID = NULL,T.ETL_LOG_DESC = NULL WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID; END IF; END LOOP; COMMIT; END; /******************************************************************* 程序名 :SP_FLOW_RUN_NEW_PERIOD 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 运行新周期的数据 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_NEW_PERIOD IS BEGIN FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 ORDER BY A.ETL_FLOW_LEVEL) LOOP IF LOOP_FLOW.ETL_FLOW_LEVEL = 1 AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3) AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1 AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 1 AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3) AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; END; /******************************************************************* 程序名 :SP_FLOW_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 作业流状态处理 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_STATUS IS -- VAR_JOB_NO_SESSION VARCHAR2(100); -- 数据库进程不存在 VAR_JOB_OVERTIME_DESC VARCHAR2(100); -- 作业运行超时描述 INT_JOB_OVERTIME NUMBER; -- 作业超时告警时间 INT_LAST_RUNTIME NUMBER; -- 作业上次运行时间 -- INT_JOB_DEAD_TIME NUMBER; -- 调度作业数据库进程不存在运行判定时间 BEGIN -- 获取数据库进程不存在描述 /*SELECT T.ETL_Para_VAL INTO VAR_JOB_NO_SESSION FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_NO_SESSION_DESC';*/ -- 获取作业运行超时告警时间 SELECT T.ETL_Para_VAL INTO INT_JOB_OVERTIME FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_OVERTIME'; -- 调度作业数据库进程不存在运行判定时间 /*SELECT T.ETL_Para_VAL INTO INT_JOB_DEAD_TIME FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_DEAD_TIME';*/ -- 获取作业运行超时描述 SELECT T.ETL_Para_VAL INTO VAR_JOB_OVERTIME_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_OVERTIME_DESC'; FOR LOOP_JOB IN (SELECT T.ETL_JOB_ID,A.ETL_OVERTIME_REM_WAY --,T.ETL_SESSION_ID,(SYSDATE - T.ETL_START_TIME) RUNTIME FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 1) LOOP -- 作业为正在运行,但数据库进程已经不存在(10分钟)的作业置为运行失败 /*IF FN_GET_SESSION_STATUS(LOOP_JOB.ETL_SESSION_ID) = 0 AND LOOP_JOB.RUNTIME * 24 * 60 >= INT_JOB_DEAD_TIME THEN SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,'',VAR_JOB_NO_SESSION); -- 超时提醒方式为超过上次运行时间 ELS*/ IF LOOP_JOB.ETL_OVERTIME_REM_WAY = 1 AND INT_JOB_OVERTIME > 0 THEN BEGIN SELECT RUNTIME INTO INT_LAST_RUNTIME FROM (SELECT T.ETL_LOGID,T.ETL_JOB_ID,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 9) WHERE ROW_NUM = 1; IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME THEN -- 修改作业状态为运行超时 SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,4,VAR_JOB_OVERTIME_DESC); END IF; EXCEPTION WHEN OTHERS THEN NULL; END; -- 超时提醒方式为超过前三次的平均值 ELSIF LOOP_JOB.ETL_OVERTIME_REM_WAY = 2 AND INT_JOB_OVERTIME > 0 THEN BEGIN SELECT AVG(RUNTIME) INTO INT_LAST_RUNTIME FROM (SELECT T.ETL_LOGID,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 9) WHERE ROW_NUM <= 3; IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME THEN -- 修改作业状态为运行超时 SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,VAR_JOB_OVERTIME_DESC); END IF; EXCEPTION WHEN OTHERS THEN NULL; END; END IF; END LOOP; FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_END_TIME FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_RUN_STATUS = 1 ORDER BY A.ETL_FLOW_LEVEL DESC) LOOP -- 下属作业运行成功将作业流状态置为成功 IF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 9 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_DATA_SUCC_TIME = LOOP_FLOW.ETL_DATA_END_TIME,T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 9 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID); -- 下属作业运行失败将作业流置为失败 ELSIF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 2 THEN UPDATE ETL_FLOW_RUN_STS T SET T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 2 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID); END IF; END LOOP; END; /******************************************************************* 程序名 :SP_FLOW_RUN_ERROR_RESET 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 失败任务重置,等待重新运行 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_FLOW_RUN_ERROR_RESET IS VAR_MAX_RESET_TIME NUMBER; BEGIN -- 获取最大任务重置次数 SELECT T.ETL_Para_VAL INTO VAR_MAX_RESET_TIME FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_MAX_RESET_TIME'; FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_RESET_TIME FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND T.ETL_FLOW_RUN_STATUS = 2 ORDER BY A.ETL_FLOW_LEVEL) LOOP IF LOOP_FLOW.ETL_FLOW_LEVEL = 1 AND (VAR_MAX_RESET_TIME = 0 OR LOOP_FLOW.ETL_RESET_TIME < VAR_MAX_RESET_TIME) THEN UPDATE ETL_FLOW_RUN_STS T SET /*T.ETL_START_TIME = NULL,*/ T.ETL_END_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 3,T.ETL_RESET_TIME = T.ETL_RESET_TIME + 1 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1 AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 3 THEN UPDATE ETL_FLOW_RUN_STS T SET /*T.ETL_START_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 3 WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID; COMMIT; END IF; END LOOP; FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,A.ETL_JOB_RUN_STATUS FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_INFO B WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID AND A.ETL_JOB_ID = B.ETL_JOB_ID AND B.ETL_JOB_STATUS = 1) LOOP -- 将运行失败的作业置为重新运行,等待重新运行 IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 3 AND LOOP_JOB.ETL_JOB_RUN_STATUS = 2 THEN UPDATE ETL_JOB_RUN_STS T SET /*T.ETL_START_TIME = NULL,T.ETL_JOB_RUN_STATUS = 3,T.ETL_LOG_DESC = NULL WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID; END IF; END LOOP; COMMIT; END; /******************************************************************* 程序名 :SP_JOB_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 作业运行状态处理 修改人 : zhuyh 修改时间 :2013/9/30 修改原因 : 进程ID由记录数据库进程改为记录操作系统进程 *******************************************************************/ PROCEDURE SP_JOB_RUN_STATUS ( I_JOB_ID NUMBER,I_JOB_RUN_INFO VARCHAR2 ) IS -- VAR_SESSION_ID VARCHAR2(10); DTE_DATA_END_TIME DATE; VAR_JOB_SUCC_DESC VARCHAR2(100); VAR_JOB_ERR_DESC VARCHAR2(100); BEGIN -- 获取作业运行成功描述 SELECT T.ETL_Para_VAL INTO VAR_JOB_SUCC_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_SUCC_DESC'; -- 获取作业运行失败描述 SELECT T.ETL_Para_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_ERR_DESC'; SELECT ETL_DATA_END_TIME INTO DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID; -- 正在运行 IF I_JOB_RUN_STATUS = 1 THEN -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程 -- 获取数据库进程 -- SELECT SYS_CONTEXT('USERENV','SID') INTO VAR_SESSION_ID FROM DUAL; UPDATE ETL_JOB_RUN_STS T SET /*T.ETL_START_TIME = SYSDATE,*/ T.ETL_DATA_ORG_ID = I_ORG_ID,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程 --,T.ETL_SESSION_ID = VAR_SESSION_ID,T.ETL_LOG_DESC = I_JOB_RUN_INFO WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; -- 运行失败 ELSIF I_JOB_RUN_STATUS = 2 THEN UPDATE ETL_JOB_RUN_STS T SET T.ETL_END_TIME = SYSDATE,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS,T.ETL_LOG_DESC = VAR_JOB_ERR_DESC || I_JOB_RUN_INFO WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; -- 写失败日志 SP_ETL_JOB_LOG_INFO(I_JOB_ID); -- 运行成功 ELSIF I_JOB_RUN_STATUS = 9 THEN UPDATE ETL_JOB_RUN_STS T SET T.ETL_DATA_SUCC_TIME = DTE_DATA_END_TIME,T.ETL_LOG_DESC = VAR_JOB_SUCC_DESC || I_JOB_RUN_INFO WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; -- 写失败日志 SP_ETL_JOB_LOG_INFO(I_JOB_ID); END IF; END; /******************************************************************* 程序名 :SP_ETL_FLOW_LOG_INFO 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 记录作业流运行日志 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER) IS DTE_DATA_START_TIME DATE; DTE_DATA_END_TIME DATE; INT_COUNT NUMBER; VAR_ETL_LOGID VARCHAR2(30); -- 日志序号在每个数据周期内排序 BEGIN SELECT T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME INTO DTE_DATA_START_TIME,DTE_DATA_END_TIME FROM ETL_FLOW_RUN_STS T WHERE T.ETL_FLOW_ID = I_FLOW_ID; -- 检查该数据周期有没有运行过 SELECT COUNT(*) INTO INT_COUNT FROM ETL_FLOW_RUN_LOG T WHERE T.ETL_FLOW_ID = I_FLOW_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; -- 未运行过的使用数据开始时间从新编号 IF INT_COUNT = 0 THEN VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') || TO_CHAR(DTE_DATA_END_TIME,'YYYYMMDDHH24MISS') || '01'; ELSE -- 运行过的用最大编号加1 SELECT MAX(ETL_LOGID) INTO VAR_ETL_LOGID FROM ETL_FLOW_RUN_LOG T WHERE T.ETL_FLOW_ID = I_FLOW_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; VAR_ETL_LOGID := VAR_ETL_LOGID + 1; END IF; -- 记日志表 INSERT INTO ETL_FLOW_RUN_LOG (ETL_LOGID,ETL_FLOW_ID,ETL_DATA_START_TIME,ETL_DATA_END_TIME,ETL_START_TIME,ETL_END_TIME,ETL_FLOW_RUN_STATUS,ETL_LOG_DESC) SELECT VAR_ETL_LOGID,T.ETL_FLOW_ID,T.ETL_START_TIME,T.ETL_END_TIME,T.ETL_LOG_DESC FROM ETL_FLOW_RUN_STS T WHERE T.ETL_FLOW_ID = I_FLOW_ID; COMMIT; END; /******************************************************************* 程序名 :SP_ETL_JOB_LOG_INFO 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 记录作业运行日志 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER) IS DTE_DATA_START_TIME DATE; DTE_DATA_END_TIME DATE; INT_COUNT NUMBER; VAR_ETL_LOGID VARCHAR2(30); -- 日志序号在每个数据周期内排序 BEGIN SELECT T.ETL_DATA_START_TIME,DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID; -- 检测该数据周期任务有没有运行过 SELECT COUNT(*) INTO INT_COUNT FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = I_JOB_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; -- 未运行过的作业使用数据结束时间重新编号 IF INT_COUNT = 0 THEN VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') || '01'; ELSE -- 运行过的作业用最大编号加1 SELECT MAX(ETL_LOGID) INTO VAR_ETL_LOGID FROM ETL_JOB_RUN_LOG T WHERE T.ETL_JOB_ID = I_JOB_ID AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME; VAR_ETL_LOGID := VAR_ETL_LOGID + 1; END IF; -- 记日志表 INSERT INTO ETL_JOB_RUN_LOG (ETL_LOGID,ETL_JOB_ID,ETL_DATA_ORG_ID,ETL_JOB_RUN_STATUS,T.ETL_DATA_ORG_ID,T.ETL_JOB_RUN_STATUS,T.ETL_LOG_DESC FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID; COMMIT; END; /******************************************************************* 程序名 :SP_INSERT_JOB_FAIL_SMS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 生成失败作业短信 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_INSERT_MONITOR_SMS ( O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/ ) IS V_DTE_RUN_BEGIN_DT DATE; /*程序每一步骤运行开始*/ V_DTE_RUN_END_DT DATE; /*程序每一步骤运行结束时间*/ V_INT_STEP NUMBER := 0; /*程序执行步骤*/ /*步骤描述信息*/ V_VAR_STEP_DESC VARCHAR2(1000); /*步骤所执行的DML类型*/ V_VAR_STEP_DML_TYPE VARCHAR2(10); /*受影响行数*/ V_INT_ROW_CNT INTEGER := 0; /*过程名称*/ V_VAR_PROC_NAME VARCHAR2(70) := 'PKG_SEND_SMS.SP_INSERT_SMS'; BEGIN V_INT_STEP := V_INT_STEP + 1; /*第一步骤*/ V_VAR_STEP_DESC := V_INT_STEP || '.0:作业运行失败发送短信给运营人员 '; V_VAR_STEP_DML_TYPE := 'INSERT'; /*操作类型*/ /*DML开始运行时间*/ V_DTE_RUN_BEGIN_DT := SYSDATE; /*执行相应的sql语句*/ FOR LOOP_JOB IN (SELECT C.MOBLIE_PHONE,T.ETL_LOGID,A.ETL_JOB_ID,A.ETL_JOB_NAME,A.ETL_JOB_DESC,T.ETL_LOG_DESC FROM ETL_JOB_RUN_LOG T,ETL_CTL_JOB_INFO A,ETL_SEND_SMS_LIST C WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_JOB_RUN_STATUS = 2 AND T.ETL_SEND_FLAG = 0 AND C.SEND_MONITOR_SMS = 1) LOOP INSERT INTO ETL_SEND_SMS_LOG (ETL_LOGID,MOBLIE_PHONE,SMS_CONTENT,SMS_LEVEL,SEND_TIME,ETL_DATE) VALUES (LOOP_JOB.ETL_LOGID,LOOP_JOB.ETL_JOB_ID,LOOP_JOB.MOBLIE_PHONE,'作业ID号[' || LOOP_JOB.ETL_JOB_ID || '],作业名称[' || LOOP_JOB.ETL_JOB_NAME || '],作业描述[' || LOOP_JOB.ETL_JOB_DESC || '],数据周期[' || TO_CHAR(LOOP_JOB.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS') || '-' || TO_CHAR(LOOP_JOB.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS') || '],机构号[' || LOOP_JOB.ETL_DATA_ORG_ID || '],运行描述[' || LOOP_JOB.ETL_LOG_DESC || ']',1,Trunc(SYSDATE),SYSDATE); /*获取受影响行数*/ V_INT_ROW_CNT := V_INT_ROW_CNT + 1; UPDATE ETL_JOB_RUN_LOG T SET T.ETL_SEND_FLAG = 1 WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID AND T.ETL_LOGID = LOOP_JOB.ETL_LOGID; COMMIT; ---提交DML操作 END LOOP; /*DML运行结束时间*/ V_DTE_RUN_END_DT := SYSDATE; /*记录成功的日志信息*/ IF V_INT_ROW_CNT > 0 THEN PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,SYSDATE /*数据开始日期*/,SYSDATE /*数据结束日期*/,1 /*机构代码*/,V_VAR_PROC_NAME /*存储过程名称*/,V_VAR_STEP_DESC /*操作步骤描述*/,V_VAR_STEP_DML_TYPE /*操作类型*/,V_INT_ROW_CNT /*受影响行数*/,1 /*执行结果*/,V_DTE_RUN_BEGIN_DT /*运行开始时间*/,V_DTE_RUN_END_DT /*运行结束时间*/,'' /*运行结果详细信息*/); END IF; /*整个过程执行成功*/ O_RESULT_FLAG := 9; /*整个过程运行结果描述信息*/ O_RESULT_MSG := ''; /*异常处理部分*/ EXCEPTION WHEN OTHERS THEN ---回滚DML操作 ROLLBACK; O_RESULT_FLAG := 2; ----失败 O_RESULT_MSG := sqlERRM; ---记录异常日志信息 PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,V_VAR_STEP_DML_TYPE /*操作步骤类型*/,V_INT_ROW_CNT /*返回的受影响行数*/,0 /*运行结果 0 失败; 1 成功*/,sqlERRM /*运行结果详细信息*/); END; /******************************************************************* 程序名 :SP_SEND_JOB_FAIL_SMS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 发送失败作业短信 修改人 : 修改时间 : 修改原因 : *******************************************************************/ PROCEDURE SP_SEND_MONITOR_SMS IS V_VAR_RESULT_FLAG CHAR(1); V_VAR_RESULT_MSG CHAR(300); V_VAR_RETURN_STATUS INT; CURSOR C_DATA IS SELECT T.ETL_LOGID,T.MOBLIE_PHONE,T.SMS_CONTENT,T.SMS_LEVEL,T.SEND_TIME FROM ETL_SEND_SMS_LOG T WHERE T.SEND_STATUS = 0; V_USER_CODE VARCHAR2(100) DEFAULT 'MIS'; V_PASSWORD VARCHAR2(100) DEFAULT 'MIS#2013'; BEGIN SP_INSERT_MONITOR_SMS(V_VAR_RESULT_FLAG,V_VAR_RESULT_MSG); IF V_VAR_RESULT_FLAG = 9 THEN FOR CC_DATA IN C_DATA LOOP PKG_SMS_INTERFACE.SEND_SMS(V_USER_CODE,V_PASSWORD,CC_DATA.MOBLIE_PHONE,CC_DATA.SMS_CONTENT,CC_DATA.SMS_LEVEL,CC_DATA.SEND_TIME,V_VAR_RETURN_STATUS); --发送成功,更新标志 IF V_VAR_RETURN_STATUS > 0 THEN UPDATE ETL_SEND_SMS_LOG T SET T.SEND_STATUS = '1',T.RECEIVE_STATUS = V_VAR_RETURN_STATUS WHERE T.ETL_LOGID = CC_DATA.ETL_LOGID AND T.ETL_JOB_ID = CC_DATA.ETL_JOB_ID AND T.MOBLIE_PHONE = CC_DATA.MOBLIE_PHONE; END IF; END LOOP; END IF; COMMIT; END; /******************************************************************* 程序名 :FN_GET_FLOW_DEPEND 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取作业流前置依赖 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS VAR_DEP_STS VARCHAR2(100) := 1; BEGIN FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,NVL(B.ETL_DATA_SUCC_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_FLOW_ID) ETL_CYC_CODE,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_DEPD_FLOW_ID) DEP_CYC_CODE FROM ETL_CTL_FLOW_DEPD T,ETL_FLOW_RUN_STS A,ETL_FLOW_RUN_STS B WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_DEPD_FLOW_ID = B.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID) LOOP -- 数据结束时间大于所依赖的数据成功时间 IF (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.ETL_CYC_CODE = LOOP_DEP.DEP_CYC_CODE) OR (LOOP_DEP.ETL_DATA_END_TIME >= Trunc(LOOP_DEP.ETL_DEP_SUCC_TIME) AND LOOP_DEP.ETL_CYC_CODE <> '02' AND LOOP_DEP.DEP_CYC_CODE = '02') OR (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.DEP_CYC_CODE <> '02') THEN VAR_DEP_STS := 0; RETURN VAR_DEP_STS; END IF; END LOOP; RETURN VAR_DEP_STS; EXCEPTION WHEN OTHERS THEN RETURN NULL; END; /******************************************************************* 程序名 :FN_GET_JOB_DEPEND 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取作业前置依赖 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2 IS VAR_DEP_STS VARCHAR2(100) := 1; BEGIN FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME FROM ETL_CTL_JOB_DEPD T,ETL_JOB_RUN_STS B WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND T.ETL_DEPD_JOB_ID = B.ETL_JOB_ID AND T.ETL_JOB_ID = I_JOB_ID) LOOP -- 数据结束时间大于所依赖的数据成功时间 IF LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME THEN VAR_DEP_STS := 0; RETURN VAR_DEP_STS; END IF; END LOOP; RETURN VAR_DEP_STS; END; /******************************************************************* 程序名 :FN_GET_FLOW_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取作业流运行状态 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS VAR_FLOW_RUN_STATUS VARCHAR2(100) := 1; VAR_CHILD_FLAG CHAR(1); INT_STS_NOT_9 NUMBER; INT_STS_0_3 NUMBER; INT_STS_1_4 NUMBER; INT_STS_2 NUMBER; BEGIN SELECT T.ETL_CHILD_FLAG INTO VAR_CHILD_FLAG FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID; -- 不成功的数量 IF VAR_CHILD_FLAG = 0 THEN SELECT COUNT(*) INTO INT_STS_NOT_9 FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS <> 9; -- 正在运行的数量 SELECT COUNT(*) INTO INT_STS_1_4 FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS IN (1,4); -- 满足条件但未运行的数量 SELECT COUNT(*) INTO INT_STS_0_3 FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS IN (0,3) AND FN_GET_FLOW_DEPEND(T.ETL_FLOW_ID) = 1; -- 运行失败的数量 SELECT COUNT(*) INTO INT_STS_2 FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND A.ETL_FLOW_STATUS = 1 AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID AND ETL_FLOW_RUN_STATUS = 2; ELSIF VAR_CHILD_FLAG = 1 THEN SELECT COUNT(*) INTO INT_STS_NOT_9 FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS <> 9; -- 正在运行的数量 SELECT COUNT(*) INTO INT_STS_1_4 FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS IN (1,4); -- 满足条件但未运行的数量 SELECT COUNT(*) INTO INT_STS_0_3 FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS IN (0,3) AND FN_GET_FLOW_DEPEND(T.ETL_JOB_ID) = 1; -- 运行失败的数量 SELECT COUNT(*) INTO INT_STS_2 FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A WHERE T.ETL_JOB_ID = A.ETL_JOB_ID AND A.ETL_JOB_STATUS = 1 AND A.ETL_FLOW_ID = I_FLOW_ID AND ETL_JOB_RUN_STATUS = 2; END IF; -- 不成功的数量为0,则全部成功 IF INT_STS_NOT_9 = 0 THEN VAR_FLOW_RUN_STATUS := 9; -- 不成功的不为0,正在运行的为0,运行失败的数量大于0 ELSIF INT_STS_1_4 = 0 AND INT_STS_0_3 = 0 AND INT_STS_2 > 0 THEN VAR_FLOW_RUN_STATUS := 2; END IF; RETURN VAR_FLOW_RUN_STATUS; END; /******************************************************************* 程序名 :FN_GET_NEXT_DATA_TIME 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取下个数据时间 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE IS VAR_CTL_CYC_CODE VARCHAR2(2); VAR_sql VARCHAR2(1000); VAR_FREQ_TIME VARCHAR2(100); VAR_SRC_DB VARCHAR2(100); DTE_SRC_SYS_TIME DATE; DTE_NEXT_DATA_TIME DATE; DTE_DATA_SUCC_TIME DATE; BEGIN -- 获取运行周期、源数据库,下次运行时间、数据成功时间 SELECT T.ETL_CYC_CODE,T.ETL_SRC_DB,NVL(A.ETL_NEXT_DATA_TIME,A.ETL_DATA_END_TIME),NVL(A.ETL_DATA_SUCC_TIME,A.ETL_DATA_END_TIME) INTO VAR_CTL_CYC_CODE,VAR_SRC_DB,DTE_NEXT_DATA_TIME,DTE_DATA_SUCC_TIME FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; -- 非本数据库 VAR_SRC_DB := VAR_SRC_DB || 'dual'; -- 每天运行的作业流 IF VAR_CTL_CYC_CODE = '01' THEN VAR_FREQ_TIME := 1; VAR_sql := 'SELECT Trunc(SYSDATE) FROM ' || VAR_SRC_DB; EXECUTE IMMEDIATE VAR_sql INTO DTE_SRC_SYS_TIME; DTE_NEXT_DATA_TIME := DTE_DATA_SUCC_TIME + VAR_FREQ_TIME; IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME THEN RETURN DTE_NEXT_DATA_TIME; ELSE RETURN DTE_DATA_SUCC_TIME; END IF; -- 每十分钟运行的作业流 ELSIF VAR_CTL_CYC_CODE = '02' THEN VAR_FREQ_TIME := 1 / 24 / 6; VAR_sql := 'SELECT SYSDATE - 1 / 24 / 6 FROM ' || VAR_SRC_DB; EXECUTE IMMEDIATE VAR_sql INTO DTE_SRC_SYS_TIME; WHILE DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME LOOP DTE_NEXT_DATA_TIME := DTE_NEXT_DATA_TIME + VAR_FREQ_TIME; END LOOP; RETURN DTE_NEXT_DATA_TIME; ELSIF VAR_CTL_CYC_CODE = '03' THEN VAR_FREQ_TIME := 1; VAR_sql := 'SELECT Trunc(SYSDATE) FROM ' || VAR_SRC_DB; EXECUTE IMMEDIATE VAR_sql INTO DTE_SRC_SYS_TIME; DTE_NEXT_DATA_TIME := ADD_MONTHS(DTE_DATA_SUCC_TIME,VAR_FREQ_TIME); IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME THEN RETURN DTE_NEXT_DATA_TIME; ELSE RETURN DTE_DATA_SUCC_TIME; END IF; END IF; END; /******************************************************************* 程序名 :FN_GET_SUPER_FLOW_RUN_STATUS 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取上级作业流运行状态 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS VAR_SUPER_FLOW_RUN_STATUS VARCHAR2(10); BEGIN SELECT ETL_FLOW_RUN_STATUS INTO VAR_SUPER_FLOW_RUN_STATUS FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_SUPER_FLOW_RUN_STATUS; END; /******************************************************************* 程序名 :FN_GET_SUPER_DATA_START_TIME 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取上级作业流数据开始时间 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE IS VAR_SUPER_FLOW_DATA_START_TIME DATE; BEGIN SELECT A.ETL_DATA_START_TIME INTO VAR_SUPER_FLOW_DATA_START_TIME FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_SUPER_FLOW_DATA_START_TIME; END; /******************************************************************* 程序名 :FN_GET_SUPER_DATA_END_TIME 创建人 : zhuyh 创建时间 : 2013/8/20 功能描述 : 获取上级作业流数据结束时间 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE IS VAR_SUPER_FLOW_DATA_END_TIME DATE; BEGIN SELECT A.ETL_DATA_END_TIME INTO VAR_SUPER_FLOW_DATA_END_TIME FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID AND T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_SUPER_FLOW_DATA_END_TIME; END; /******************************************************************* 程序名 :FN_GET_CYC_CODE 创建人 : zhuyh 创建时间 : 2013/8/26 功能描述 : 获取周期代码 修改人 : 修改时间 : 修改原因 : *******************************************************************/ FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2 IS VAR_CYC_CODE VARCHAR2(100); BEGIN SELECT T.ETL_CYC_CODE INTO VAR_CYC_CODE FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID; RETURN VAR_CYC_CODE; END; END PKG_ETL_CTL; /
以上是编程之家(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。
如果觉得编程之家网站内容还不错,欢迎将编程之家网站推荐给程序员好友。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。