How to fix empty messages in a Kafka topic produced by kafka-connect with the JSON spooldir plugin
I have the following setup: Zookeeper, Kafka, Schema Registry, and kafka-connect, all running in separate pods in Kubernetes. I want to drop some JSON files into a folder watched by kafka-connect with the spooldir plugin so they get published to a specific Kafka topic; they are meant to serve as cold-start data for our application. Here is the Kubernetes config for kafka-connect:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-connect
  labels:
    app.kubernetes.io/name: kafka-connect
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-connect
  serviceName: kafka-connect-headless
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: OnDelete
  replicas: 1
  template:
    metadata:
      labels:
        helm.sh/chart: kafka-0.21.0
        app.kubernetes.io/name: kafka-connect
    spec:
      terminationGracePeriodSeconds: 10
      volumes:
        - hostPath:
            path: /dependency/
            type: Directory
          name: dependency-data
      initContainers:
        - name: wait-for-schema-registry
          image: alpine:3.12.0
          command: ['sh','-c',"until nslookup schema-registry-svc.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for schema registry; sleep 2; done"]
      containers:
        - name: kafka-connect
          image: "confluentinc/cp-kafka-connect:5.5.0"
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - mountPath: /dependency/
              name: dependency-data
          ports:
            - containerPort: 8083
              name: kafka-connect
              protocol: TCP
          command:
            - sh
            - -exc
            - |
              mkdir -p /dependency/unprocessed && \
              mkdir -p /dependency/json/processed && mkdir -p /dependency/json/error && \
              confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43 && \
              confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.0 && \
              exec /etc/confluent/docker/run && \
              sleep infinity
          env:
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
            - name: KAFKA_HEAP_OPTS
              value: "-Xms2048M -Xmx2048M"
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka:9092"
            - name: CONNECT_GROUP_ID
              value: "kafka-connect"
            - name: CONNECT_PLUGIN_PATH
              value: "/usr/share/java,/usr/share/confluent-hub-components/"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "io.confluent.connect.avro.AvroConverter"
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: "http://schema-registry-svc:8081"
            - name: CONNECT_INTERNAL_KEY_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_VALUE_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "kafka-connect-config"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "kafka-connect-offset"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "kafka-connect-status"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_LOG4J_ROOT_LOGLEVEL
              value: "DEBUG"
            - name: CONNECT_CONNECT_PROTOCOL
              value: "compatible"
            - name: CONNECT_TOPIC_CREATION_ENABLE
              value: "true"
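To confirm that the spooldir and Avro converter plugins installed by the startup command above were actually picked up by the worker, the Connect REST API can be queried once the pod is running. This is a sketch; it assumes port 8083 has been made reachable locally, e.g. via `kubectl port-forward`:

```shell
# Forward the Connect REST port from the StatefulSet to localhost
kubectl port-forward statefulset/kafka-connect 8083:8083 &

# List installed connector plugins; the SpoolDir source classes
# should appear in the output if the confluent-hub install succeeded
curl -s http://localhost:8083/connector-plugins | grep -i -o 'SpoolDir[A-Za-z]*SourceConnector'
```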
The spooldir configuration I posted:
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "kafka-connect-spooldir",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
    "tasks.max": "1",
    "input.path": "/dependency/unprocessed/",
    "input.file.pattern": "^.*\\.json$",
    "error.path": "/dependency/json/error",
    "finished.path": "/dependency/json/processed",
    "auto.register.schemas": "false",
    "schema.generation.enabled": "true",
    "topic": "checklist",
    "value.converter.enhanced.avro.schema.support": "true",
    "topic.creation.enable": "true"
  }
}'
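After POSTing the config above, a connector can report RUNNING even when its converter setup is wrong, so it may be worth checking the task status and the effective configuration explicitly. These are standard Connect REST endpoints; the connector name matches the one in the request above:

```shell
# Status of the connector and its single task (look for state and trace fields)
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/status

# The configuration Connect is actually using for this connector;
# converter settings not overridden here fall back to the worker env vars
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/config
```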
There are no exceptions in the kafka-connect logs, and it moves the files from the unprocessed folder to the processed one. But when I try to read from the topic with kafka-console-consumer, it reports that it read 2 messages, yet nothing at all is displayed (just 2 empty lines, then it hangs). I then tried registering the checklist schema in the Schema Registry and reprocessing the files, with the same result, except this time kafka-console-consumer reported 4 messages consumed and still displayed nothing. With kafka-avro-console-consumer, 0 messages were consumed. I even noticed that kafka-connect registered a rather "pseudo" schema for my object:
{"subject":"checklist-value","version":5,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"com.github.jcustenborder.kafka.connect.model\",\"fields\":[{\"name\":\"Checklist\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"com.github.jcustenborder.kafka.connect.model.Value\"}"}
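For reference, the subject above can be inspected directly through the Schema Registry REST API. This sketch assumes the schema-registry-svc service from the StatefulSet is reachable on port 8081 (e.g. from inside the cluster or via another port-forward):

```shell
# List all versions registered under the value subject of the topic
curl -s http://schema-registry-svc:8081/subjects/checklist-value/versions

# Fetch the latest registered value schema (this is where the
# "pseudo" single-field record above was returned from)
curl -s http://schema-registry-svc:8081/subjects/checklist-value/versions/latest
```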
I also checked the Kafka and Schema Registry logs but found nothing suspicious there.
Perhaps not relevant, but I also tried "schema.generation.enabled": "false", and then the logs showed java.lang.NullPointerException: 'value.schema' must be set if 'schema.generation.enabled' = false. I then tried using kafka-connect-avro-converter to turn my existing .avsc into a Connect schema, but it turns out there is no proper way to print the generated schema, so I suppose it is not intended for what I am trying to do.
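For completeness, when schema.generation.enabled is false, the spooldir connector expects 'value.schema' (and 'key.schema') to contain the JSON form of a Connect schema rather than an Avro .avsc. A hedged sketch of such a value, with placeholder field names (not taken from my real .avsc):

```shell
# Hypothetical Connect-schema JSON for a two-field checklist record;
# "id" and "title" are illustrative placeholders
VALUE_SCHEMA='{
  "name": "com.example.Checklist",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "id": {"type": "STRING", "isOptional": false},
    "title": {"type": "STRING", "isOptional": true}
  }
}'
```

This string would then be passed as the "value.schema" property in the connector config.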
What am I missing in the kafka-connect configuration to make it serialize through the Schema Registry and stop publishing empty messages to the topic?