微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何刷新从数据库获取的数据以加入流数据?

如何解决如何刷新从数据库获取的数据以加入流数据?

使用的包:

  • pyspark 2.4.5
  • psycopg2
  • 熊猫

在 postgres 数据库中,我有一些参考数据可用于连接从 Kafka 消息派生的流数据帧。计划是仅在使用 psycopg2 通过 INSERT 查询对其进行更改时才更新参考数据。

我创建了一个处理程序模块,其中包含一个 setter 和 getter,为其获取数据库的初始状态

    def ref_data_stream_reader(self):
        get_ref_data_stream_df: DataFrame = Warehouse.read_stream(self.spark,os.getenv('SPARK_INPUT_TOPIC_REF_DATA'))

        ref_df: DataFrame = get_ref_data_stream_df.select(
            from_json('value',ref_data_schema)
                .alias('ref_data'),)\
            .select('ref_data.*')

        ref_df \
            .writeStream \
            .outputMode('update') \
            .option('truncate','false') \
            .foreachBatch(lambda ref_df,batch_id: self._update_intermediate_layer(ref_df)) \
            .start()

    def _update_intermediate_layer(self,ref_df: DataFrame):
        ref_data_pandas_df_raw: pd.DataFrame = ref_df.toPandas()

        if len(ref_data_pandas_df_raw.index) > 0:
            #Replace NaN values
            ref_data_df: pd.DataFrame = ref_data_pandas_df_raw.replace({pd.np.nan: None})

            # Convert all columns to lower case
            ref_data_df.columns = map(str.lower,ref_data_df.columns)

            try:
                # Get Connection from Pool
                connection_pool = self.conn_pool.get_reference_connection_pool()
                connection = connection_pool.getconn()

                # Set AutoCommit to False,required for transactional statements
                connection.autocommit = False
                # Create cursor
                cursor = connection.cursor()

                query,values = PostgresHelper.get_insert_query(ref_data_df,'schema.ref_table')

                cursor.executemany(query,values)
                connection.commit()

                self.get_data_from_reference()

            except Exception as err:
                logger.error(err)
                if connection is not None:
                    connection.rollback()

            finally:
                cursor.close()
                connection_pool.putconn(connection)

   def get_data_from_reference(self):
        try:
            # Get Connection from Pool
            connection_pool = self.conn_pool.get_reference_connection_pool()
            connection = connection_pool.getconn()

            # Set AutoCommit to False,required for transactional statements
            connection.autocommit = False
            # Create cursor
            cursor = connection.cursor()

            cursor.execute('SELECT * FROM schema.ref_table')
            ref_df = cursor.fetchall()

            self.set_ref_data_state(self.spark.createDataFrame(ref_df,ref_schema))

    def set_ref_data_state(self,input_ref_data_state: DataFrame):
        self.ref_data_state = input_ref_data_state

流数据是使用 readStream 操作从 Kafka 主题获取的(这包含在 Warehouse.read_stream() 方法中)

get_streaming_df: DataFrame = Warehouse.read_stream(self.spark,os.getenv('SPARK_INPUT_TOPIC_STREAM'))

streaming_df: DataFrame = get_streaming_df.select(
    from_json('value',streaming_schema)
    .alias('data'),col('timestamp').alias('data_timestamp'))\
    .select('data.*','data_timestamp')\

return streaming_df

在实例化ref数据对象并启动Kafka readStream后,将此对象传递给执行连接的方法

transaction_process_manager.stream_computation(ref_data_handler,streaming_df)
    def stream_computation(self,ref_data_handler,streaming_df):
        ref_data_df = ref_data_handler.get_ref_data_state()
        # Perform joins between ref_data_df and streaming_df

我发现 ref_data_handler 的初始状态用于所有连接,即使参考层中的数据通过来自 SPARK_INPUT_TOPIC_REF_DATA 主题的消息更改,初始状态仍然使用.

我可以使用什么实现来确保 Postgres 数据库表的当前状态用于所有连接,而无需为每个单独的连接与流数据帧往返于数据库?如果每 10 个事务中发生 1 个对参考层的更新,我应该只需要执行一次 INSERT 和 SELECT,而不是对所有 10 个事务执行 SELECT。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。