微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka生产者如何管理与源的偏移量?

如何解决Kafka生产者如何管理与源的偏移量?

Kafka 通过管理消费者的偏移量。 __consumer_offsets 主题很好。但是,Kafka Producer 如何管理与源(如 RDBMS 或文件系统)的偏移量?

例如,如果源是 RDBMS 表,并且 Kafka Producer API 使用“SELECT field1,..,fieldN FROM table”这样的查询,那么 Kafka Producer 如何使用新插入的记录并发送到 Kafka topic。需要我们从代码管理还是Kafka自动管理?

  1. 如果通过代码进行管理,那么代码要做哪些更改?
  2. 如果 Kafka 自动管理它,它是如何工作的?

示例代码以供理解:

public void copyMysqLRecordsToKafkaTopic() throws Exception {
                 
        String msg = null;
        String sales_id =  null;
        String store_id =  null;
        String item_id =  null;
        String scan_type =  null;
        String geo_region_cd =  null;
        String currency_code = null;
        String scan_id =  null;
        String sold_unit_quantity =  null;
        String sales_timestamp =  null;
        String scan_dept_nbr = null;
        String row_insertion_dttm =  null;
        
        String myDriver = "com.MysqL.jdbc.Driver";
        String myUrl = "jdbc:MysqL://dbhost.lcl/salesdb";
        Class.forName(myDriver);
        Connection conn = DriverManager.getConnection(myUrl,"kafkauser","password");
        String query = "SELECT * FROM retailcart_sales_transaction_events";
        Statement st = conn.createStatement();
        ResultSet rs = st.executeQuery(query);
        while (rs.next())
        {
            sales_id = rs.getString("sales_id");
            store_id = rs.getString("store_id");
            item_id = rs.getString("item_id");
            scan_type = rs.getString("scan_type");
            geo_region_cd = rs.getString("geo_region_cd");
            currency_code = rs.getString("currency_code");
            scan_id = rs.getString("scan_id");
            sold_unit_quantity = rs.getString("sold_unit_quantity");
            sales_timestamp = rs.getString("sales_timestamp");
            scan_dept_nbr = rs.getString("scan_dept_nbr");
            row_insertion_dttm = rs.getString("row_insertion_dttm");
            
            msg = sales_id+","+store_id+","+item_id+","+scan_type+","+geo_region_cd+","+currency_code+","+scan_id+","+sold_unit_quantity+","+sales_timestamp+","+
            scan_dept_nbr+","+row_insertion_dttm ;
            ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic,msg);
            producer.send(producerRecord);
        }
        conn.close();
    }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。