微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ETL调度系统

下面是编程之家 jb51.cc 通过网络收集整理的代码片段。

编程之家小编现在分享给大家,也给大家做个参考。

CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS
  -- 调用主程序,调用存储过程
  PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER);

  -- 创建新的时间周期,运行数据周期作业及作业流状态处理
  PROCEDURE SP_FLOW_RUN_DEAL;

  -- 创建新的时间周期
  PROCEDURE SP_FLOW_CREATE_NEW_PERIOD;

  -- 运行数据周期内的作业流及作业
  PROCEDURE SP_FLOW_RUN_NEW_PERIOD;

  -- 同步作业流运行状态
  PROCEDURE SP_FLOW_RUN_STATUS;

  -- 失败作业流及作业重置为未处理
  PROCEDURE SP_FLOW_RUN_ERROR_RESET;

  -- 作业状态更新
  PROCEDURE SP_JOB_RUN_STATUS
  (
    I_JOB_ID NUMBER,I_ORG_ID VARCHAR2,I_JOB_RUN_STATUS VARCHAR2,I_JOB_RUN_INFO VARCHAR2
  );

  -- 作业流日志处理
  PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER);

  -- 作业日志处理
  PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER);

  PROCEDURE SP_INSERT_MONITOR_SMS
  (
    O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/
  );

  PROCEDURE SP_SEND_MONITOR_SMS;

  -- 获取作业流前置依赖
  FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2;

  -- 获取作业前置依赖
  FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2;

  -- 获取作业流下属作业运行状态
  FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;

  -- 获取下个数据日期
  FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE;

  -- 获取上级作业流运行状态
  FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;

  -- 获取上级作业流数据开始时间
  FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE;

  -- 获取上级作业流数据结束时间
  FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE;

  -- 获取周期代码
  FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2;
END PKG_ETL_CTL;
/
CREATE OR REPLACE PACKAGE BODY PKG_ETL_CTL IS
  /*******************************************************************
    程序名   :SP_EXEC_PROC
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 调用主程序,调用存储过程
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER) IS
    VAR_PRO_NAME        VARCHAR2(100);
    VAR_DATA_START_TIME VARCHAR2(20);
    VAR_DATA_END_TIME   VARCHAR2(20);
    VAR_sql             VARCHAR2(4000);
    VAR_ParaMS          VARCHAR2(1000);
    VAR_ORG_ID          VARCHAR2(10);
    VAR_JOB_RUN_DESC    VARCHAR2(100);
    VAR_JOB_ERR_DESC    VARCHAR2(100);
  
  BEGIN
    -- 获取作业正在运行描述
    SELECT T.ETL_Para_VAL INTO VAR_JOB_RUN_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_RUN_DESC';
  
    -- 获取作业运行失败描述
    SELECT T.ETL_Para_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_ERR_DESC';
  
    -- 获取作业所调用的存储过程,数据开始时间,数据结束时间
    SELECT T.ETL_JOB_PROC,TO_CHAR(A.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS'),TO_CHAR(A.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS')
      INTO VAR_PRO_NAME,VAR_DATA_START_TIME,VAR_DATA_END_TIME
      FROM ETL_CTL_JOB_INFO T,ETL_JOB_RUN_STS A
     WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
       AND T.ETL_JOB_ID = I_JOB_ID;
  
    -- 获取作业全部参数拼接到一起
    FOR LOOP_ParaM IN (SELECT T.ETL_Para_NAME,DECODE(T.ETL_Para_TYPE,2,'TO_DATE(' || T.ETL_Para_VAL || ',''YYYYMMDDHH24MISS'')' -- 日期类型参数转化成日期格式,T.ETL_Para_VAL) ETL_Para_VAL,T.ETL_Para_TYPE
                         FROM ETL_JOB_Para T
                        WHERE T.ETL_JOB_ID = I_JOB_ID) LOOP
      -- 获取机构号
      IF LOOP_ParaM.ETL_Para_NAME = 'I_ORG_ID'
      THEN
        VAR_ORG_ID := LOOP_ParaM.ETL_Para_VAL;
      END IF;
      -- 参数拼接
      VAR_ParaMS := VAR_ParaMS || LOOP_ParaM.ETL_Para_NAME || ' => ' || LOOP_ParaM.ETL_Para_VAL || ',';
    END LOOP;
  
    -- 参数加上输出参数(存储过程运行结果和运行信息)
    VAR_ParaMS := UPPER(VAR_ParaMS) || 'O_RESULT_FLAG => LO_RESULT_FLAG,O_RESULT_MSG => LO_RESULT_MSG';
    -- 参数替换为变量
    VAR_ParaMS := REPLACE(VAR_ParaMS,'#$I_DATA_START_TIME#',VAR_DATA_START_TIME);
  
    VAR_ParaMS := REPLACE(VAR_ParaMS,'#$I_DATA_END_TIME#',VAR_DATA_END_TIME);
  
    -- 拼接存储过程进行调用
    VAR_sql := 'DECLARE  LO_RESULT_FLAG VARCHAR2(10);LO_RESULT_MSG VARCHAR2(300);BEGIN ' || VAR_PRO_NAME || '(' ||
               VAR_ParaMS || ');PKG_ETL_CTL.SP_JOB_RUN_STATUS(' || I_JOB_ID || ',''' || VAR_ORG_ID ||
               ''',LO_RESULT_FLAG,LO_RESULT_MSG);END;';
  
    -- 修改作业状态为正在运行
    SP_JOB_RUN_STATUS(I_JOB_ID,VAR_ORG_ID,'1',VAR_JOB_RUN_DESC);
    -- 运行存储过程
    EXECUTE IMMEDIATE VAR_sql;
  
  EXCEPTION
    WHEN OTHERS THEN
      -- 运行出错,返回错误信息
      SP_JOB_RUN_STATUS(I_JOB_ID,'2',sqlERRM);
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_DEAL
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 创建新的时间周期,运行数据周期作业及作业流状态处理
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_DEAL IS
  BEGIN
  
    -- 创建新的时间周期
    SP_FLOW_CREATE_NEW_PERIOD;
    -- 运行时间周期内的作业
    SP_FLOW_RUN_NEW_PERIOD;
    -- 更新作业流状态
    SP_FLOW_RUN_STATUS;
    -- 失败任务重置
    SP_FLOW_RUN_ERROR_RESET;
  END;

  /*******************************************************************
    程序名   :SP_FLOW_CREATE_NEW_PERIOD
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 生成新的数据周期运行新周期的数据
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_CREATE_NEW_PERIOD IS
    DTE_DATA_NEXT_TIME  DATE;
    DTE_DATA_START_TIME DATE;
    DTE_DATA_END_TIME   DATE;
  BEGIN
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,A.ETL_NEXT_EXPIRY_TIME
                        FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND T.ETL_FLOW_LEVEL = 1
                         AND T.ETL_FLOW_STATUS = 1) LOOP
      DTE_DATA_NEXT_TIME := FN_GET_NEXT_DATA_TIME(LOOP_FLOW.ETL_FLOW_ID);
      IF DTE_DATA_NEXT_TIME <= LOOP_FLOW.ETL_NEXT_EXPIRY_TIME
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_NEXT_DATA_TIME = DTE_DATA_NEXT_TIME
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_SUCC_TIME,T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME,T.ETL_NEXT_DATA_TIME,T.ETL_FLOW_RUN_STATUS,A.ETL_CYC_CODE,A.ETL_FLOW_LEVEL,A.ETL_FLOW_STATUS
                        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND A.ETL_FLOW_STATUS = 1
                       ORDER BY A.ETL_FLOW_LEVEL) LOOP
      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
         AND LOOP_FLOW.ETL_FLOW_STATUS = 1
         AND LOOP_FLOW.ETL_FLOW_RUN_STATUS = 9
         AND LOOP_FLOW.ETL_DATA_SUCC_TIME < LOOP_FLOW.ETL_NEXT_DATA_TIME
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_DATA_START_TIME = CASE
                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '01' THEN
                                          T.ETL_DATA_SUCC_TIME + 1
                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '02' THEN
                                          T.ETL_DATA_SUCC_TIME + 1 / 24 / 60 / 60
                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '03' THEN
                                          T.ETL_DATA_SUCC_TIME + 1
                                       END,T.ETL_DATA_END_TIME = T.ETL_NEXT_DATA_TIME,T.ETL_START_TIME = NULL,T.ETL_END_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 0,T.ETL_RESET_TIME = 0
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
            AND LOOP_FLOW.ETL_FLOW_STATUS = 1
            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 0
      THEN
        DTE_DATA_START_TIME := FN_GET_SUPER_DATA_START_TIME(LOOP_FLOW.ETL_FLOW_ID);
        DTE_DATA_END_TIME   := FN_GET_SUPER_DATA_END_TIME(LOOP_FLOW.ETL_FLOW_ID);
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_DATA_START_TIME = DTE_DATA_START_TIME,T.ETL_DATA_END_TIME = DTE_DATA_END_TIME,T.ETL_FLOW_RUN_STATUS = 0
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
    FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,T.ETL_DATA_START_TIME FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME FLOW_DATA_END_TIME,A.ETL_DATA_START_TIME JOB_DATA_START_TIME,A.ETL_DATA_END_TIME JOB_DATA_END_TIME,B.ETL_JOB_STATUS
                       FROM ETL_FLOW_RUN_STS T,ETL_JOB_RUN_STS A,ETL_CTL_JOB_INFO B
                      WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID
                        AND A.ETL_JOB_ID = B.ETL_JOB_ID
                        AND B.ETL_JOB_STATUS = 1) LOOP
      IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 0
         AND LOOP_JOB.ETL_JOB_STATUS = 1
      THEN
        UPDATE ETL_JOB_RUN_STS T
           SET T.ETL_DATA_START_TIME = LOOP_JOB.FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME = LOOP_JOB.FLOW_DATA_END_TIME,T.ETL_JOB_RUN_STATUS = 0,T.ETL_SESSION_ID = NULL,T.ETL_LOG_DESC = NULL
         WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;
      END IF;
    END LOOP;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_NEW_PERIOD
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 运行新周期的数据
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_NEW_PERIOD IS
  
  BEGIN
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND A.ETL_FLOW_STATUS = 1
                       ORDER BY A.ETL_FLOW_LEVEL) LOOP
      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
         AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3)
         AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 1
            AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3)
            AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 作业流状态处理
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_STATUS IS
    -- VAR_JOB_NO_SESSION    VARCHAR2(100); -- 数据库进程不存在
    VAR_JOB_OVERTIME_DESC VARCHAR2(100); -- 作业运行超时描述
    INT_JOB_OVERTIME      NUMBER; -- 作业超时告警时间
    INT_LAST_RUNTIME      NUMBER; -- 作业上次运行时间
    -- INT_JOB_DEAD_TIME     NUMBER; -- 调度作业数据库进程不存在运行判定时间
  BEGIN
    -- 获取数据库进程不存在描述
    /*SELECT T.ETL_Para_VAL
     INTO VAR_JOB_NO_SESSION
     FROM ETL_CTL_Para T
    WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_NO_SESSION_DESC';*/
  
    -- 获取作业运行超时告警时间
    SELECT T.ETL_Para_VAL INTO INT_JOB_OVERTIME FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_OVERTIME';
  
    -- 调度作业数据库进程不存在运行判定时间
    /*SELECT T.ETL_Para_VAL
     INTO INT_JOB_DEAD_TIME
     FROM ETL_CTL_Para T
    WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_DEAD_TIME';*/
  
    -- 获取作业运行超时描述
    SELECT T.ETL_Para_VAL
      INTO VAR_JOB_OVERTIME_DESC
      FROM ETL_CTL_Para T
     WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_OVERTIME_DESC';
  
    FOR LOOP_JOB IN (SELECT T.ETL_JOB_ID,A.ETL_OVERTIME_REM_WAY
                            --,T.ETL_SESSION_ID,(SYSDATE - T.ETL_START_TIME) RUNTIME
                       FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
                        AND T.ETL_JOB_RUN_STATUS = 1) LOOP
      -- 作业为正在运行,但数据库进程已经不存在(10分钟)的作业置为运行失败
      /*IF FN_GET_SESSION_STATUS(LOOP_JOB.ETL_SESSION_ID) = 0
         AND LOOP_JOB.RUNTIME * 24 * 60 >= INT_JOB_DEAD_TIME
      THEN
        SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,'',VAR_JOB_NO_SESSION);
        -- 超时提醒方式为超过上次运行时间
      ELS*/
      IF LOOP_JOB.ETL_OVERTIME_REM_WAY = 1
         AND INT_JOB_OVERTIME > 0
      THEN
        BEGIN
          SELECT RUNTIME
            INTO INT_LAST_RUNTIME
            FROM (SELECT T.ETL_LOGID,T.ETL_JOB_ID,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM
                    FROM ETL_JOB_RUN_LOG T
                   WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
                     AND T.ETL_JOB_RUN_STATUS = 9)
           WHERE ROW_NUM = 1;
          IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME
          THEN
            -- 修改作业状态为运行超时
            SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,4,VAR_JOB_OVERTIME_DESC);
          END IF;
        EXCEPTION
          WHEN OTHERS THEN
            NULL;
        END;
        -- 超时提醒方式为超过前三次的平均值
      ELSIF LOOP_JOB.ETL_OVERTIME_REM_WAY = 2
            AND INT_JOB_OVERTIME > 0
      THEN
        BEGIN
          SELECT AVG(RUNTIME)
            INTO INT_LAST_RUNTIME
            FROM (SELECT T.ETL_LOGID,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM
                    FROM ETL_JOB_RUN_LOG T
                   WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
                     AND T.ETL_JOB_RUN_STATUS = 9)
           WHERE ROW_NUM <= 3;
          IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME
          THEN
            -- 修改作业状态为运行超时
            SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,VAR_JOB_OVERTIME_DESC);
          END IF;
        EXCEPTION
          WHEN OTHERS THEN
            NULL;
        END;
      END IF;
    END LOOP;
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_END_TIME
                        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND T.ETL_FLOW_RUN_STATUS = 1
                       ORDER BY A.ETL_FLOW_LEVEL DESC) LOOP
      -- 下属作业运行成功将作业流状态置为成功
      IF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 9
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_DATA_SUCC_TIME = LOOP_FLOW.ETL_DATA_END_TIME,T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 9
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
        SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);
        -- 下属作业运行失败将作业流置为失败
      ELSIF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 2
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 2
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
        SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);
      END IF;
    END LOOP;
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_ERROR_RESET
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 失败任务重置,等待重新运行
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_ERROR_RESET IS
    VAR_MAX_RESET_TIME NUMBER;
  BEGIN
    -- 获取最大任务重置次数
    SELECT T.ETL_Para_VAL
      INTO VAR_MAX_RESET_TIME
      FROM ETL_CTL_Para T
     WHERE UPPER(T.ETL_Para_NAME) = 'ETL_MAX_RESET_TIME';
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_RESET_TIME
                        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND A.ETL_FLOW_STATUS = 1
                         AND T.ETL_FLOW_RUN_STATUS = 2
                       ORDER BY A.ETL_FLOW_LEVEL) LOOP
      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
         AND (VAR_MAX_RESET_TIME = 0 OR LOOP_FLOW.ETL_RESET_TIME < VAR_MAX_RESET_TIME)
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET /*T.ETL_START_TIME = NULL,*/ T.ETL_END_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 3,T.ETL_RESET_TIME = T.ETL_RESET_TIME + 1
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 3
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET /*T.ETL_START_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 3
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
    FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,A.ETL_JOB_RUN_STATUS
                       FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_INFO B
                      WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID
                        AND A.ETL_JOB_ID = B.ETL_JOB_ID
                        AND B.ETL_JOB_STATUS = 1) LOOP
      -- 将运行失败的作业置为重新运行,等待重新运行
      IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 3
         AND LOOP_JOB.ETL_JOB_RUN_STATUS = 2
      THEN
        UPDATE ETL_JOB_RUN_STS T
           SET /*T.ETL_START_TIME = NULL,T.ETL_JOB_RUN_STATUS = 3,T.ETL_LOG_DESC = NULL
         WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;
      END IF;
    END LOOP;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_JOB_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 作业运行状态处理
    修改人   : zhuyh
    修改时间 :2013/9/30
    修改原因 : 进程ID由记录数据库进程改为记录操作系统进程
  *******************************************************************/
  PROCEDURE SP_JOB_RUN_STATUS
  (
    I_JOB_ID NUMBER,I_JOB_RUN_INFO VARCHAR2
  ) IS
    -- VAR_SESSION_ID    VARCHAR2(10);
    DTE_DATA_END_TIME DATE;
    VAR_JOB_SUCC_DESC VARCHAR2(100);
    VAR_JOB_ERR_DESC  VARCHAR2(100);
  
  BEGIN
    -- 获取作业运行成功描述
    SELECT T.ETL_Para_VAL
      INTO VAR_JOB_SUCC_DESC
      FROM ETL_CTL_Para T
     WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_SUCC_DESC';
  
    -- 获取作业运行失败描述
    SELECT T.ETL_Para_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_Para T WHERE UPPER(T.ETL_Para_NAME) = 'ETL_JOB_ERR_DESC';
  
    SELECT ETL_DATA_END_TIME INTO DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID;
  
    -- 正在运行
    IF I_JOB_RUN_STATUS = 1
    THEN
      -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程
      -- 获取数据库进程
      -- SELECT SYS_CONTEXT('USERENV','SID') INTO VAR_SESSION_ID FROM DUAL;
      UPDATE ETL_JOB_RUN_STS T
         SET /*T.ETL_START_TIME = SYSDATE,*/ T.ETL_DATA_ORG_ID = I_ORG_ID,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS
             -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程
             --,T.ETL_SESSION_ID = VAR_SESSION_ID,T.ETL_LOG_DESC = I_JOB_RUN_INFO
       WHERE T.ETL_JOB_ID = I_JOB_ID;
      COMMIT;
      -- 运行失败
    ELSIF I_JOB_RUN_STATUS = 2
    THEN
      UPDATE ETL_JOB_RUN_STS T
         SET T.ETL_END_TIME = SYSDATE,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS,T.ETL_LOG_DESC = VAR_JOB_ERR_DESC || I_JOB_RUN_INFO
       WHERE T.ETL_JOB_ID = I_JOB_ID;
      COMMIT;
      -- 写失败日志
      SP_ETL_JOB_LOG_INFO(I_JOB_ID);
      -- 运行成功
    ELSIF I_JOB_RUN_STATUS = 9
    THEN
      UPDATE ETL_JOB_RUN_STS T
         SET T.ETL_DATA_SUCC_TIME = DTE_DATA_END_TIME,T.ETL_LOG_DESC = VAR_JOB_SUCC_DESC || I_JOB_RUN_INFO
       WHERE T.ETL_JOB_ID = I_JOB_ID;
      COMMIT;
      -- 写失败日志
      SP_ETL_JOB_LOG_INFO(I_JOB_ID);
    END IF;
  END;

  /*******************************************************************
    程序名   :SP_ETL_FLOW_LOG_INFO
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 记录作业流运行日志
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER) IS
    DTE_DATA_START_TIME DATE;
    DTE_DATA_END_TIME   DATE;
    INT_COUNT           NUMBER;
    VAR_ETL_LOGID       VARCHAR2(30); -- 日志序号在每个数据周期内排序
  BEGIN
    SELECT T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME
      INTO DTE_DATA_START_TIME,DTE_DATA_END_TIME
      FROM ETL_FLOW_RUN_STS T
     WHERE T.ETL_FLOW_ID = I_FLOW_ID;
  
    -- 检查该数据周期有没有运行过
    SELECT COUNT(*)
      INTO INT_COUNT
      FROM ETL_FLOW_RUN_LOG T
     WHERE T.ETL_FLOW_ID = I_FLOW_ID
       AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
       AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
  
    -- 未运行过的使用数据开始时间从新编号
    IF INT_COUNT = 0
    THEN
      VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') ||
                       TO_CHAR(DTE_DATA_END_TIME,'YYYYMMDDHH24MISS') || '01';
    ELSE
      -- 运行过的用最大编号加1
      SELECT MAX(ETL_LOGID)
        INTO VAR_ETL_LOGID
        FROM ETL_FLOW_RUN_LOG T
       WHERE T.ETL_FLOW_ID = I_FLOW_ID
         AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
         AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
    
      VAR_ETL_LOGID := VAR_ETL_LOGID + 1;
    END IF;
  
    -- 记日志表
    INSERT INTO ETL_FLOW_RUN_LOG
      (ETL_LOGID,ETL_FLOW_ID,ETL_DATA_START_TIME,ETL_DATA_END_TIME,ETL_START_TIME,ETL_END_TIME,ETL_FLOW_RUN_STATUS,ETL_LOG_DESC)
      SELECT VAR_ETL_LOGID,T.ETL_FLOW_ID,T.ETL_START_TIME,T.ETL_END_TIME,T.ETL_LOG_DESC
        FROM ETL_FLOW_RUN_STS T
       WHERE T.ETL_FLOW_ID = I_FLOW_ID;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_ETL_JOB_LOG_INFO
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 记录作业运行日志
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER) IS
    DTE_DATA_START_TIME DATE;
    DTE_DATA_END_TIME   DATE;
    INT_COUNT           NUMBER;
    VAR_ETL_LOGID       VARCHAR2(30); -- 日志序号在每个数据周期内排序
  BEGIN
    SELECT T.ETL_DATA_START_TIME,DTE_DATA_END_TIME
      FROM ETL_JOB_RUN_STS T
     WHERE T.ETL_JOB_ID = I_JOB_ID;
  
    -- 检测该数据周期任务有没有运行过
    SELECT COUNT(*)
      INTO INT_COUNT
      FROM ETL_JOB_RUN_LOG T
     WHERE T.ETL_JOB_ID = I_JOB_ID
       AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
       AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
  
    -- 未运行过的作业使用数据结束时间重新编号
    IF INT_COUNT = 0
    THEN
      VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') || '01';
    ELSE
      -- 运行过的作业用最大编号加1
      SELECT MAX(ETL_LOGID)
        INTO VAR_ETL_LOGID
        FROM ETL_JOB_RUN_LOG T
       WHERE T.ETL_JOB_ID = I_JOB_ID
         AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
         AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
    
      VAR_ETL_LOGID := VAR_ETL_LOGID + 1;
    END IF;
  
    -- 记日志表
    INSERT INTO ETL_JOB_RUN_LOG
      (ETL_LOGID,ETL_JOB_ID,ETL_DATA_ORG_ID,ETL_JOB_RUN_STATUS,T.ETL_DATA_ORG_ID,T.ETL_JOB_RUN_STATUS,T.ETL_LOG_DESC
        FROM ETL_JOB_RUN_STS T
       WHERE T.ETL_JOB_ID = I_JOB_ID;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_INSERT_JOB_FAIL_SMS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 生成失败作业短信
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_INSERT_MONITOR_SMS
  (
    O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/
  ) IS
    V_DTE_RUN_BEGIN_DT DATE; /*程序每一步骤运行开始*/
    V_DTE_RUN_END_DT   DATE; /*程序每一步骤运行结束时间*/
    V_INT_STEP         NUMBER := 0; /*程序执行步骤*/
    /*步骤描述信息*/
    V_VAR_STEP_DESC VARCHAR2(1000);
    /*步骤所执行的DML类型*/
    V_VAR_STEP_DML_TYPE VARCHAR2(10);
    /*受影响行数*/
    V_INT_ROW_CNT INTEGER := 0;
    /*过程名称*/
    V_VAR_PROC_NAME VARCHAR2(70) := 'PKG_SEND_SMS.SP_INSERT_SMS';
  
  BEGIN
  
    V_INT_STEP          := V_INT_STEP + 1; /*第一步骤*/
    V_VAR_STEP_DESC     := V_INT_STEP || '.0:作业运行失败发送短信给运营人员 ';
    V_VAR_STEP_DML_TYPE := 'INSERT'; /*操作类型*/
  
    /*DML开始运行时间*/
    V_DTE_RUN_BEGIN_DT := SYSDATE;
  
    /*执行相应的sql语句*/
    FOR LOOP_JOB IN (SELECT C.MOBLIE_PHONE,T.ETL_LOGID,A.ETL_JOB_ID,A.ETL_JOB_NAME,A.ETL_JOB_DESC,T.ETL_LOG_DESC
                       FROM ETL_JOB_RUN_LOG T,ETL_CTL_JOB_INFO A,ETL_SEND_SMS_LIST C
                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
                        AND T.ETL_JOB_RUN_STATUS = 2
                        AND T.ETL_SEND_FLAG = 0
                        AND C.SEND_MONITOR_SMS = 1) LOOP
      INSERT INTO ETL_SEND_SMS_LOG
        (ETL_LOGID,MOBLIE_PHONE,SMS_CONTENT,SMS_LEVEL,SEND_TIME,ETL_DATE)
      VALUES
        (LOOP_JOB.ETL_LOGID,LOOP_JOB.ETL_JOB_ID,LOOP_JOB.MOBLIE_PHONE,'作业ID号[' || LOOP_JOB.ETL_JOB_ID || '],作业名称[' || LOOP_JOB.ETL_JOB_NAME || '],作业描述[' || LOOP_JOB.ETL_JOB_DESC ||
         '],数据周期[' || TO_CHAR(LOOP_JOB.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS') || '-' ||
         TO_CHAR(LOOP_JOB.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS') || '],机构号[' || LOOP_JOB.ETL_DATA_ORG_ID || '],运行描述[' ||
         LOOP_JOB.ETL_LOG_DESC || ']',1,Trunc(SYSDATE),SYSDATE);
      /*获取受影响行数*/
      V_INT_ROW_CNT := V_INT_ROW_CNT + 1;
      UPDATE ETL_JOB_RUN_LOG T
         SET T.ETL_SEND_FLAG = 1
       WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
         AND T.ETL_LOGID = LOOP_JOB.ETL_LOGID;
      COMMIT; ---提交DML操作
    END LOOP;
    /*DML运行结束时间*/
    V_DTE_RUN_END_DT := SYSDATE;
  
    /*记录成功的日志信息*/
    IF V_INT_ROW_CNT > 0
    THEN
      PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,SYSDATE /*数据开始日期*/,SYSDATE /*数据结束日期*/,1 /*机构代码*/,V_VAR_PROC_NAME /*存储过程名称*/,V_VAR_STEP_DESC /*操作步骤描述*/,V_VAR_STEP_DML_TYPE /*操作类型*/,V_INT_ROW_CNT /*受影响行数*/,1 /*执行结果*/,V_DTE_RUN_BEGIN_DT /*运行开始时间*/,V_DTE_RUN_END_DT /*运行结束时间*/,'' /*运行结果详细信息*/);
    END IF;
    /*整个过程执行成功*/
    O_RESULT_FLAG := 9;
    /*整个过程运行结果描述信息*/
    O_RESULT_MSG := '';
  
    /*异常处理部分*/
  EXCEPTION
    WHEN OTHERS THEN
      ---回滚DML操作
      ROLLBACK;
      O_RESULT_FLAG := 2; ----失败
      O_RESULT_MSG  := sqlERRM;
      ---记录异常日志信息
      PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,V_VAR_STEP_DML_TYPE /*操作步骤类型*/,V_INT_ROW_CNT /*返回的受影响行数*/,0 /*运行结果 0 失败; 1 成功*/,sqlERRM /*运行结果详细信息*/);
  END;

  /*******************************************************************
    程序名   :SP_SEND_JOB_FAIL_SMS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 发送失败作业短信
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_SEND_MONITOR_SMS IS
    V_VAR_RESULT_FLAG   CHAR(1);
    V_VAR_RESULT_MSG    CHAR(300);
    V_VAR_RETURN_STATUS INT;
  
    CURSOR C_DATA IS
      SELECT T.ETL_LOGID,T.MOBLIE_PHONE,T.SMS_CONTENT,T.SMS_LEVEL,T.SEND_TIME
        FROM ETL_SEND_SMS_LOG T
       WHERE T.SEND_STATUS = 0;
    V_USER_CODE VARCHAR2(100) DEFAULT 'MIS';
    V_PASSWORD  VARCHAR2(100) DEFAULT 'MIS#2013';
  BEGIN
  
    SP_INSERT_MONITOR_SMS(V_VAR_RESULT_FLAG,V_VAR_RESULT_MSG);
  
    IF V_VAR_RESULT_FLAG = 9
    THEN
    
      FOR CC_DATA IN C_DATA LOOP
      
        PKG_SMS_INTERFACE.SEND_SMS(V_USER_CODE,V_PASSWORD,CC_DATA.MOBLIE_PHONE,CC_DATA.SMS_CONTENT,CC_DATA.SMS_LEVEL,CC_DATA.SEND_TIME,V_VAR_RETURN_STATUS);
        --发送成功,更新标志
        IF V_VAR_RETURN_STATUS > 0
        THEN
          UPDATE ETL_SEND_SMS_LOG T
             SET T.SEND_STATUS = '1',T.RECEIVE_STATUS = V_VAR_RETURN_STATUS
           WHERE T.ETL_LOGID = CC_DATA.ETL_LOGID
             AND T.ETL_JOB_ID = CC_DATA.ETL_JOB_ID
             AND T.MOBLIE_PHONE = CC_DATA.MOBLIE_PHONE;
        END IF;
      END LOOP;
    END IF;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :FN_GET_FLOW_DEPEND
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取作业流前置依赖
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
    VAR_DEP_STS VARCHAR2(100) := 1;
  
  BEGIN
    FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,NVL(B.ETL_DATA_SUCC_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_FLOW_ID) ETL_CYC_CODE,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_DEPD_FLOW_ID) DEP_CYC_CODE
                       FROM ETL_CTL_FLOW_DEPD T,ETL_FLOW_RUN_STS A,ETL_FLOW_RUN_STS B
                      WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                        AND T.ETL_DEPD_FLOW_ID = B.ETL_FLOW_ID
                        AND T.ETL_FLOW_ID = I_FLOW_ID) LOOP
      -- 数据结束时间大于所依赖的数据成功时间
      IF (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.ETL_CYC_CODE = LOOP_DEP.DEP_CYC_CODE)
         OR (LOOP_DEP.ETL_DATA_END_TIME >= Trunc(LOOP_DEP.ETL_DEP_SUCC_TIME) AND LOOP_DEP.ETL_CYC_CODE <> '02' AND
         LOOP_DEP.DEP_CYC_CODE = '02')
         OR (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.DEP_CYC_CODE <> '02')
      THEN
        VAR_DEP_STS := 0;
        RETURN VAR_DEP_STS;
      END IF;
    END LOOP;
    RETURN VAR_DEP_STS;
  EXCEPTION
    WHEN OTHERS THEN
      RETURN NULL;
  END;

  /*******************************************************************
    程序名   :FN_GET_JOB_DEPEND
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取作业前置依赖
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2 IS
    VAR_DEP_STS VARCHAR2(100) := 1;
  
  BEGIN
    FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME
                       FROM ETL_CTL_JOB_DEPD T,ETL_JOB_RUN_STS B
                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
                        AND T.ETL_DEPD_JOB_ID = B.ETL_JOB_ID
                        AND T.ETL_JOB_ID = I_JOB_ID) LOOP
      -- 数据结束时间大于所依赖的数据成功时间
      IF LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME
      THEN
        VAR_DEP_STS := 0;
        RETURN VAR_DEP_STS;
      END IF;
    END LOOP;
    RETURN VAR_DEP_STS;
  END;

  /*******************************************************************
    程序名   :FN_GET_FLOW_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取作业流运行状态
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
    VAR_FLOW_RUN_STATUS VARCHAR2(100) := 1;
    VAR_CHILD_FLAG      CHAR(1);
    INT_STS_NOT_9       NUMBER;
    INT_STS_0_3         NUMBER;
    INT_STS_1_4         NUMBER;
    INT_STS_2           NUMBER;
  BEGIN
    SELECT T.ETL_CHILD_FLAG INTO VAR_CHILD_FLAG FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;
  
    -- 不成功的数量
    IF VAR_CHILD_FLAG = 0
    THEN
      SELECT COUNT(*)
        INTO INT_STS_NOT_9
        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS <> 9;
    
      -- 正在运行的数量
      SELECT COUNT(*)
        INTO INT_STS_1_4
        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS IN (1,4);
    
      -- 满足条件但未运行的数量
      SELECT COUNT(*)
        INTO INT_STS_0_3
        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS IN (0,3)
         AND FN_GET_FLOW_DEPEND(T.ETL_FLOW_ID) = 1;
    
      -- 运行失败的数量
      SELECT COUNT(*)
        INTO INT_STS_2
        FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS = 2;
    ELSIF VAR_CHILD_FLAG = 1
    THEN
      SELECT COUNT(*)
        INTO INT_STS_NOT_9
        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS <> 9;
    
      -- 正在运行的数量
      SELECT COUNT(*)
        INTO INT_STS_1_4
        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS IN (1,4);
    
      -- 满足条件但未运行的数量
      SELECT COUNT(*)
        INTO INT_STS_0_3
        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS IN (0,3)
         AND FN_GET_FLOW_DEPEND(T.ETL_JOB_ID) = 1;
    
      -- 运行失败的数量
      SELECT COUNT(*)
        INTO INT_STS_2
        FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS = 2;
    END IF;
  
    -- 不成功的数量为0,则全部成功
    IF INT_STS_NOT_9 = 0
    THEN
      VAR_FLOW_RUN_STATUS := 9;
      -- 不成功的不为0,正在运行的为0,运行失败的数量大于0
    ELSIF INT_STS_1_4 = 0
          AND INT_STS_0_3 = 0
          AND INT_STS_2 > 0
    THEN
      VAR_FLOW_RUN_STATUS := 2;
    END IF;
  
    RETURN VAR_FLOW_RUN_STATUS;
  END;

  /*******************************************************************
    程序名   :FN_GET_NEXT_DATA_TIME
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取下个数据时间
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
    VAR_CTL_CYC_CODE   VARCHAR2(2);
    VAR_sql            VARCHAR2(1000);
    VAR_FREQ_TIME      VARCHAR2(100);
    VAR_SRC_DB         VARCHAR2(100);
    DTE_SRC_SYS_TIME   DATE;
    DTE_NEXT_DATA_TIME DATE;
    DTE_DATA_SUCC_TIME DATE;
  
  BEGIN
    -- 获取运行周期、源数据库,下次运行时间、数据成功时间
    SELECT T.ETL_CYC_CODE,T.ETL_SRC_DB,NVL(A.ETL_NEXT_DATA_TIME,A.ETL_DATA_END_TIME),NVL(A.ETL_DATA_SUCC_TIME,A.ETL_DATA_END_TIME)
      INTO VAR_CTL_CYC_CODE,VAR_SRC_DB,DTE_NEXT_DATA_TIME,DTE_DATA_SUCC_TIME
      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
     WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
  
    -- 非本数据库
    VAR_SRC_DB := VAR_SRC_DB || 'dual';
  
    -- 每天运行的作业流
    IF VAR_CTL_CYC_CODE = '01'
    THEN
      VAR_FREQ_TIME := 1;
      VAR_sql       := 'SELECT Trunc(SYSDATE) FROM ' || VAR_SRC_DB;
      EXECUTE IMMEDIATE VAR_sql
        INTO DTE_SRC_SYS_TIME;
      DTE_NEXT_DATA_TIME := DTE_DATA_SUCC_TIME + VAR_FREQ_TIME;
      IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME
      THEN
        RETURN DTE_NEXT_DATA_TIME;
      ELSE
        RETURN DTE_DATA_SUCC_TIME;
      END IF;
      -- 每十分钟运行的作业流
    ELSIF VAR_CTL_CYC_CODE = '02'
    THEN
      VAR_FREQ_TIME := 1 / 24 / 6;
      VAR_sql       := 'SELECT SYSDATE - 1 / 24 / 6 FROM ' || VAR_SRC_DB;
      EXECUTE IMMEDIATE VAR_sql
        INTO DTE_SRC_SYS_TIME;
    
      WHILE DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME LOOP
        DTE_NEXT_DATA_TIME := DTE_NEXT_DATA_TIME + VAR_FREQ_TIME;
      END LOOP;
      RETURN DTE_NEXT_DATA_TIME;
    
    ELSIF VAR_CTL_CYC_CODE = '03'
    THEN
      VAR_FREQ_TIME := 1;
      VAR_sql       := 'SELECT Trunc(SYSDATE) FROM ' || VAR_SRC_DB;
      EXECUTE IMMEDIATE VAR_sql
        INTO DTE_SRC_SYS_TIME;
      DTE_NEXT_DATA_TIME := ADD_MONTHS(DTE_DATA_SUCC_TIME,VAR_FREQ_TIME);
      IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME
      THEN
        RETURN DTE_NEXT_DATA_TIME;
      ELSE
        RETURN DTE_DATA_SUCC_TIME;
      END IF;
    END IF;
  END;

  /*******************************************************************
    程序名   :FN_GET_SUPER_FLOW_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取上级作业流运行状态
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
    VAR_SUPER_FLOW_RUN_STATUS VARCHAR2(10);
  BEGIN
    SELECT ETL_FLOW_RUN_STATUS
      INTO VAR_SUPER_FLOW_RUN_STATUS
      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_SUPER_FLOW_RUN_STATUS;
  END;

  /*******************************************************************
    程序名   :FN_GET_SUPER_DATA_START_TIME
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取上级作业流数据开始时间
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
    VAR_SUPER_FLOW_DATA_START_TIME DATE;
  BEGIN
    SELECT A.ETL_DATA_START_TIME
      INTO VAR_SUPER_FLOW_DATA_START_TIME
      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_SUPER_FLOW_DATA_START_TIME;
  END;

  /*******************************************************************
    程序名   :FN_GET_SUPER_DATA_END_TIME
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取上级作业流数据结束时间
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
    VAR_SUPER_FLOW_DATA_END_TIME DATE;
  BEGIN
    SELECT A.ETL_DATA_END_TIME
      INTO VAR_SUPER_FLOW_DATA_END_TIME
      FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_SUPER_FLOW_DATA_END_TIME;
  END;

  /*******************************************************************
    程序名   :FN_GET_CYC_CODE
    创建人   : zhuyh
    创建时间 : 2013/8/26
    功能描述 : 获取周期代码
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2 IS
    VAR_CYC_CODE VARCHAR2(100);
  BEGIN
    SELECT T.ETL_CYC_CODE INTO VAR_CYC_CODE FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_CYC_CODE;
  END;

END PKG_ETL_CTL;
/

以上是编程之家(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。

如果觉得编程之家网站内容还不错,欢迎将编程之家网站推荐给程序员好友。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐