How does a Kafka producer manage offsets with its source?
Kafka manages consumer offsets via the __consumer_offsets topic, and that works well. But how does a Kafka producer manage offsets against a source such as an RDBMS or a file system?
For example, if the source is an RDBMS table and the producer uses a query like "SELECT field1, ..., fieldN FROM table", how does the producer pick up only newly inserted records and send them to a Kafka topic? Do we have to manage this in our own code, or does Kafka handle it automatically?
Sample code for context:
public void copyMysqlRecordsToKafkaTopic() throws Exception {
    String myDriver = "com.mysql.jdbc.Driver";
    String myUrl = "jdbc:mysql://dbhost.lcl/salesdb";
    Class.forName(myDriver);
    Connection conn = DriverManager.getConnection(myUrl, "kafkauser", "password");
    String query = "SELECT * FROM retailcart_sales_transaction_events";
    Statement st = conn.createStatement();
    ResultSet rs = st.executeQuery(query);
    while (rs.next()) {
        // Flatten each row into a CSV line and use it as the record value.
        String msg = rs.getString("sales_id") + "," + rs.getString("store_id") + ","
                + rs.getString("item_id") + "," + rs.getString("scan_type") + ","
                + rs.getString("geo_region_cd") + "," + rs.getString("currency_code") + ","
                + rs.getString("scan_id") + "," + rs.getString("sold_unit_quantity") + ","
                + rs.getString("sales_timestamp") + "," + rs.getString("scan_dept_nbr") + ","
                + rs.getString("row_insertion_dttm");
        // producer and topic are assumed to be initialized elsewhere in the class.
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, msg);
        producer.send(producerRecord);
    }
    conn.close();
}
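Note that Kafka itself does not track source offsets at all: the producer application (or a framework such as Kafka Connect's JDBC source connector, with its "incrementing"/"timestamp" modes) must persist its own position and query only rows it has not yet sent. A minimal sketch of that idea, using `sales_id` from the table above as a hypothetically monotonically increasing column and a local file as the offset store (both are assumptions, not part of the original code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: persist the highest sales_id already sent to Kafka, and build an
// incremental query so each poll only reads rows newer than that offset.
public class SourceOffsetTracker {
    private final Path offsetFile;

    public SourceOffsetTracker(Path offsetFile) {
        this.offsetFile = offsetFile;
    }

    // Read the last committed offset, or 0 if nothing was committed yet.
    public long loadOffset() throws IOException {
        if (!Files.exists(offsetFile)) return 0L;
        return Long.parseLong(Files.readString(offsetFile).trim());
    }

    // Persist the new offset only AFTER the corresponding records were sent.
    public void saveOffset(long offset) throws IOException {
        Files.writeString(offsetFile, Long.toString(offset));
    }

    // Select only rows newer than the stored offset, in offset order.
    public String incrementalQuery(long lastOffset) {
        return "SELECT * FROM retailcart_sales_transaction_events"
             + " WHERE sales_id > " + lastOffset
             + " ORDER BY sales_id";
    }
}
```

The loop in `copyMysqlRecordsToKafkaTopic` would then call `loadOffset()` before querying, run `incrementalQuery(...)` instead of the full-table SELECT, and call `saveOffset(...)` with the highest `sales_id` seen after the sends succeed. Saving the offset only after a successful send gives at-least-once delivery; duplicates are possible on a crash between send and save.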