微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在通过 CamelKafkaAzureBlobSink 连接器的每条记录处附加一个新行字符

如何解决在通过 CamelKafkaAzureBlobSink 连接器的每条记录处附加一个新行字符

我们目前正在开发一个数据管道堆栈,我们使用了 CamelAzurestorageblobSinkConnector(0.9.x),它基本上是从 Kafka(cp-kafka-5.0.0) 读取特定主题并将每条记录附加到特定的 Azure AppendBlob。

同步工作正在完美进行,但我们在堆栈中发现了一个小故障

JSON 记录已附加到 blob 文件中,没有任何换行,如下所示 -

{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"}{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373"," bar"}{"foo":"bar"}

这会影响 blob 文件的进一步处理。

我们的 CamelAzurestorageblobSinkConnector.properties 如下所示 -

name=CamelAzure-storage-blobSinkConnector
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
camel.sink.marshal=json-jackson
# comma separated topics to get messages from
topics=test-topic

camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:Now:yyyyMMdd}/${date:Now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob

非常感谢这里的任何帮助!

解决方法

首先,感谢@OneCricketeer 的建议。

我能够成功地在通过接收器连接器的每个事件/记录处附加行分隔符。

已经在camel组件级别做了如下配置:

  1. 将键值转换器从 JSONConverter 修改为 StringConverter。这是修改后的 CamelAzurestorageblobSinkConnector.properties
name=CamelAzure-storage-blobSinkConnector 
connector.class=org.apache.camel.kafkaconnector.azurestorageblob.CamelAzurestorageblobSinkConnector
tasks.max=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
topics=test-topic
camel.sink.path.accountName=<storage-account>
camel.sink.path.containerName=<blob-container>
camel.sink.endpoint.blobName=data/test-topic/${date:now:yyyyMMdd}/${date:now:HH}-id.json
camel.sink.endpoint.accessKey=<account-key>
camel.sink.endpoint.operation=commitAppendBlob
camel.sink.endpoint.createAppendBlob=true
camel.sink.endpoint.blobType=appendblob
  1. 像这样覆盖 apply() 方法 -
 @Override
 private R apply (R record) {
    
    String value = (String) operatingValue(record);
    
    String updated_value = value+System.lineSeparator();
    System.out.println("UPDATED SMT RECORD: "+updated_value);

    return newRecord(record,null,updated_value);
  }

现在我们在 blob 中有如下记录 -

{"uuid":"6e7190e2-987d-44f5-9b20-ba854d8d4274","foo":"bar"} 
{"uuid":"6f0d3912-b7c1-4cc4-a41b-0d54cd623373","foo":"bar"}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。