
Empty messages in a Kafka topic from kafka-connect with the JSON spooldir plugin

I have the following setup: ZooKeeper, Kafka, Schema Registry, and kafka-connect, all running in separate pods in Kubernetes. I want to drop some JSON files into a folder watched by kafka-connect with the spooldir connector, so that they are sent to a specific Kafka topic. I want to use them as a kind of cold start for our application. Here is the Kubernetes configuration for kafka-connect:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-connect
  labels:
    app.kubernetes.io/name: kafka-connect
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-connect
  serviceName: kafka-connect-headless
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: OnDelete
  replicas: 1
  template:
    metadata:
      labels:
        helm.sh/chart: kafka-0.21.0
        app.kubernetes.io/name: kafka-connect
    spec:
      terminationGracePeriodSeconds: 10
      volumes:
        - hostPath:
            path: /dependency/
            type: Directory
          name: dependency-data
      initContainers:
        - name: wait-for-schema-registry
          image: alpine:3.12.0
          command: ['sh','-c',"until nslookup schema-registry-svc.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for schema registry; sleep 2; done"]
      containers:
        - name: kafka-connect
          image: "confluentinc/cp-kafka-connect:5.5.0"
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - mountPath: /dependency/
              name: dependency-data
          ports:
            - containerPort: 8083
              name: kafka-connect
              protocol: TCP
          command:
            - sh
            - -exc
            - |
              mkdir -p /dependency/unprocessed && \
              mkdir -p /dependency/json/processed && mkdir -p /dependency/json/error && \
              confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43 && \
              confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.0 && \
              exec /etc/confluent/docker/run
          env:
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
            - name: KAFKA_HEAP_OPTS
              value: "-xms2048M -Xmx2048M"
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka:9092"
            - name: CONNECT_GROUP_ID
              value: "kafka-connect"
            - name: CONNECT_PLUGIN_PATH
              value: "/usr/share/java,/usr/share/confluent-hub-components/"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "io.confluent.connect.avro.AvroConverter"
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: "http://schema-registry-svc:8081"
            - name: CONNECT_INTERNAL_KEY_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_VALUE_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "kafka-connect-config"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "kafka-connect-offset"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "kafka-connect-status"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_LOG4J_ROOT_LOGLEVEL
              value: "DEBUG"
            - name: CONNECT_CONNECT_PROTOCOL
              value: "compatible"
            - name: CONNECT_TOPIC_CREATION_ENABLE
              value: "true"

The spooldir connector configuration I submit:

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "kafka-connect-spooldir",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
    "tasks.max": "1",
    "input.path": "/dependency/unprocessed/",
    "input.file.pattern": "^.*\\.json$",
    "error.path": "/dependency/json/error",
    "finished.path": "/dependency/json/processed",
    "auto.register.schemas": "false",
    "schema.generation.enabled": "true",
    "topic": "checklist",
    "value.converter.enhanced.avro.schema.support": "true",
    "topic.creation.enable": "true"
  }
}'
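
After POSTing the connector I also check that the connector and its task are actually in the RUNNING state, since a FAILED task could also explain files being moved while nothing usable reaches the topic. This uses the standard Connect REST status endpoint:

# Shows the overall connector state and per-task state
# (RUNNING / FAILED, plus a stack trace if a task failed).
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/status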

There are no exceptions in the kafka-connect logs either, and it moves the files from the unprocessed folder to the processed one. But when I try to read from the topic with kafka-console-consumer, it says it consumed 2 messages, yet nothing is displayed at all (just 2 empty lines, and then it hangs). I then tried registering my checklist schema in the Schema Registry and processed the files again; this time kafka-console-consumer reported 4 messages consumed, but again nothing was shown. I also tried kafka-avro-console-consumer, and this time it consumed 0 messages.

I even noticed that kafka-connect registers a rather "bogus" schema for my object:

{"subject":"checklist-value","version":5,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"com.github.jcustenborder.kafka.connect.model\",\"fields\":[{\"name\":\"Checklist\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"com.github.jcustenborder.kafka.connect.model.Value\"}"}

I also checked the Kafka and Schema Registry logs but found nothing suspicious there.

Perhaps it is not essential, but I also tried "schema.generation.enabled": "false", and then the logs contained java.lang.NullPointerException: 'value.schema' must be set if 'schema.generation.enabled' = false. I then tried using kafka-connect-avro-converter to turn my existing .avsc into a Kafka Connect schema, but it turned out there is no proper way to print the generated schema, so I suppose its purpose is not quite what I needed.
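
For completeness, this is the kafka-avro-console-consumer invocation I would expect to need here, using the service names from the configuration above; a plain kafka-console-consumer would print the raw Avro bytes, which can render as seemingly empty lines:

# Consume from the beginning and let the Avro formatter resolve
# schemas against the Schema Registry.
kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic checklist \
  --from-beginning \
  --property schema.registry.url=http://schema-registry-svc:8081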

What am I missing in the kafka-connect configuration to make it serialize through the Schema Registry and stop sending empty messages to the topic?
