

How to solve confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161 : ValueError

I am new to Python and am trying to use 'confluent_kafka' to produce Avro messages, using 'confluent_kafka.schema_registry.avro.AvroSerializer' (reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py).
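
Roughly, my setup follows that example. Here is a minimal sketch of it (the broker and Schema Registry URLs, the topic, and the file names are placeholders, and the AvroSerializer argument order has varied across confluent-kafka versions, so check the version you have installed):

import json

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Load the Avro schema shown below (placeholder file name)
with open("event_schema.avsc") as f:
    schema_str = f.read()

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "value.serializer": avro_serializer,
})

# Read the input JSON into a dict and produce it as the message value
with open("input.json") as f:
    value = json.load(f)

producer.produce(topic="my-topic", value=value)
producer.flush()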

It works for a simple Avro schema with a dict input (JSON converted to a dict), but for the following example schema I get an error.

Schema:

{
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [
        {
            "name": "before",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Value",
                    "fields": [
                        { "name": "EventId", "type": "long" },
                        { "name": "CameraId", "type": ["null", "long"], "default": null }
                    ],
                    "connect.name": "CoreOLTPEvents.dbo.Event.Value"
                }
            ],
            "default": null
        },
        { "name": "after", "type": ["null", "Value"], "default": null },
        {
            "name": "source",
            "type": {
                "type": "record",
                "name": "Source",
                "namespace": "io.debezium.connector.sqlserver",
                "fields": [
                    { "name": "version", "type": "string" },
                    { "name": "connector", "type": "string" }
                ],
                "connect.name": "io.debezium.connector.sqlserver.source"
            }
        },
        { "name": "op", "type": "string" }
    ],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}

Input JSON:

{
    "after": null,
    "before": {
        "CoreOLTPEvents.dbo.Event.Value": {
            "EventId": 1111111111,
            "CameraId": 222222222
        }
    },
    "source": {
        "version": "InitialLoad",
        "connector": "sqlserver"
    },
    "op": "C"
}

Error:

ValueError: {'CoreOLTPEvents.dbo.Event.Value': {'EventId': 1111111111,'CameraId': 222222222}} (type <class 'dict'>) do not match ['null',{'connect.name': 'CoreOLTPEvents.dbo.Event.Value','type': 'record','name': 'CoreOLTPEvents.dbo.Event.Value','fields': [{'name': 'EventId','type': 'long'},{'default': None,'name': 'CameraId','type': ['null','long']}]}] on field before

The 'before' field's type is a union (['null', record]); if I change it to just the record (removing the union), it works fine. But I need to adjust my input so that it works with the schema as given.

(Note: I am reading the JSON input with 'json.load(json_file)', so it produces a dict.)

Any help would be greatly appreciated.

Update: the actual schema is much larger; an abridged version follows (type definitions that are not reproduced here are shown as ...):

{
    "type": "record",
    "name": "Envelope",
    "namespace": "CoreOLTPEvents.dbo.Event",
    "fields": [
        {
            "name": "before",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "Value",
                    "fields": [
                        { "name": "EventId", "type": "long" },
                        { "name": "CameraId", "type": ["null", "long"], "default": null },
                        { "name": "SiteId", "type": ... },
                        { "name": "VehicleId", "type": ... },
                        { "name": "EventReviewStatusID", "type": "int" },
                        { "name": "EventTypeId", "type": ["null", "int"], "default": null },
                        { "name": "EventDateTime", "type": ["null", {
                            "type": "string",
                            "connect.name": "net.smartdrive.converters.SmartdriveEventDateFieldConverter"
                        }], "default": null },
                        { "name": "FTPUploadDateTime", "type": {
                            "type": "long", "connect.version": 1, "connect.name": "io.debezium.time.Timestamp"
                        } },
                        { "name": "CAMFileName", "type": ... },
                        { "name": "KeypadEntryCode", "type": ["null", "string"], "default": null },
                        { "name": "IsActive", "type": { "type": "boolean", "connect.default": true }, "default": true },
                        { "name": "Flagged", "type": "boolean" },
                        { "name": "EventTitle", "type": ... },
                        { "name": "CreatedBy", "type": ... },
                        { "name": "CreatedDate", "type": ... },
                        { "name": "ModifiedBy", "type": ... },
                        { "name": "ModifiedDate", "type": ... },
                        { "name": "ReReviewAnalysis", "type": ... },
                        { "name": "LegacyEventId", "type": ... },
                        { "name": "TripId", "type": ... },
                        { "name": "FiLeversion", "type": ... },
                        { "name": "EventNumber", "type": ... },
                        { "name": "Latitude", "type": ["null", {
                            "type": "bytes", "scale": 10, "precision": 13, "connect.version": 1,
                            "connect.parameters": { "scale": "10", "connect.decimal.precision": "13" },
                            "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal"
                        }], "default": null },
                        { "name": "Longitude", "type": ... },
                        { "name": "GeoAddressId", "type": ... },
                        { "name": "ReviewedEventId", "type": ... },
                        { "name": "VideoStatus", "type": { "type": "int", "connect.default": 0 }, "default": 0 },
                        { "name": "PredictionImportance", "type": { ..., "precision": 15, "connect.decimal.precision": "15", ... } },
                        { "name": "FlaggedBy", "type": ... },
                        { "name": "FlaggedDate", "type": ["null", {
                            "type": "long", "connect.name": "io.debezium.time.Timestamp"
                        }], "default": null },
                        { "name": "TriggerTypeId", "type": ... },
                        { "name": "VideoDeleteDate", "type": ... },
                        { "name": "MetadataDeleteDate", "type": ... },
                        { "name": "RetentionStatus", "type": { ..., "connect.default": 0, "connect.type": "int16" }, "default": 0 },
                        { "name": "PartnerTriggerId", "type": ... },
                        { "name": "CoachingStateId", "type": ... },
                        { "name": "EventKudoHistoryId", "type": ... }
                    ],
                    "connect.name": "CoreOLTPEvents.dbo.Event.Value"
                }
            ],
            "default": null
        },
        { "name": "after", "type": ["null", "Value"], "default": null },
        {
            "name": "source",
            "type": {
                "type": "record",
                "name": "Source",
                "namespace": "io.debezium.connector.sqlserver",
                "fields": [
                    { "name": "version", "type": "string" },
                    { "name": "connector", "type": "string" },
                    { "name": "name", "type": ... },
                    { "name": "ts_ms", "type": ... },
                    { "name": "snapshot", "type": [{
                        "type": "string",
                        "connect.parameters": { "allowed": "true,last,false" },
                        "connect.default": "false",
                        "connect.name": "io.debezium.data.Enum"
                    }, "null"], "default": "false" },
                    { "name": "db", "type": ... },
                    { "name": "schema", "type": ... },
                    { "name": "table", "type": ... },
                    { "name": "change_lsn", "type": ... },
                    { "name": "commit_lsn", "type": ... },
                    { "name": "event_serial_no", "type": ... }
                ]
            }
        },
        { "name": "op", "type": "string" },
        { "name": "ts_ms", "type": ... },
        {
            "name": "transaction",
            "type": ["null", {
                "type": "record",
                "name": "ConnectDefault",
                "namespace": "io.confluent.connect.avro",
                "fields": [
                    { "name": "id", "type": ... },
                    { "name": "total_order", "type": ... },
                    { "name": "data_collection_order", "type": "long" }
                ]
            }],
            "default": null
        }
    ],
    "connect.name": "CoreOLTPEvents.dbo.Event.Envelope"
}

Input for the large schema:

{
    "before": null,
    "after": {
        "EventId": 1234566,
        "CameraId": 2233,
        "SiteId": 111,
        "VehicleId": 45587,
        "EventReviewStatusID": 10,
        "EventTypeId": 123,
        "EventDateTime": "2015-01-02T01:30:29Z",
        "FTPUploadDateTime": 1420193330590,
        "CAMFileName": "XYZ",
        "KeypadEntryCode": "0",
        "IsActive": false,
        "Flagged": false,
        "EventTitle": null,
        "CreatedBy": 1,
        "CreatedDate": 1420191120730,
        "ModifiedBy": 1,
        "ModifiedDate": 1577871185680,
        "ReReviewAnalysis": null,
        "LegacyEventId": null,
        "TripId": 3382,
        "FiLeversion": "2.2",
        "EventNumber": "AAAA-BBBB",
        "Latitude": "UU9elrA=",
        "Longitude": "/ueZUeFw",
        "GeoAddressId": null,
        "ReviewedEventId": 129411077,
        "VideoStatus": 4,
        "PredictionImportance": 0.1402457539,
        "FlaggedBy": null,
        "FlaggedDate": null,
        "TriggerTypeId": 322,
        "VideoDeleteDate": 1422783120000,
        "MetadataDeleteDate": 1577871120000,
        "RetentionStatus": 15,
        "PartnerTriggerId": null,
        "CoachingStateId": 0,
        "EventKudoHistoryId": null
    },
    "source": {
        "version": "Final",
        "connector": "sqlserver",
        "name": "CoreOLTP",
        "ts_ms": 1615813992548,
        "snapshot": "false",
        "db": "CoreOLTP",
        "schema": "dbo",
        "table": "xyz",
        "change_lsn": null,
        "commit_lsn": null,
        "event_serial_no": null
    },
    "op": "C",
    "transaction": null
}

Error:

confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="{'EventId': 129411077,'CameraId': 46237,'SiteId': 2148,'VehicleId': 45587,'EventReviewStatusID': 10,'EventTypeId': 247,'EventDateTime': '2015-01-02T01:30:29Z','FTPUploadDateTime': 1420191120590,'CAMFileName': 'JD2BC02120150102013029ER.SDE','KeypadEntryCode': '0','IsActive': False,'Flagged': False,'EventTitle': None,'CreatedBy': 1,'CreatedDate': 1420191120730,'ModifiedBy': 1,'ModifiedDate': 1577871185680,'ReReviewAnalysis': None,'LegacyEventId': None,'TripId': 3382,'FiLeversion': '2.2','EventNumber': 'WSHX-8QQ2','Latitude': 'UU9elrA=','Longitude': '/ueZUeFw','GeoAddressId': None,'ReviewedEventId': 129411077,'VideoStatus': 4,'PredictionImportance': 0.1402457539,'FlaggedBy': None,'FlaggedDate': None,'TriggerTypeId': 322,'VideoDeleteDate': 1422783120000,'MetadataDeleteDate': 1577871120000,'RetentionStatus': 15,'PartnerTriggerId': None,'CoachingStateId': 0,'EventKudoHistoryId': None} (type <class 'dict'>) do not match ['null','CoreOLTPEvents.dbo.Event.Value'] on field after"}

Solution

You just need to change your input so that the before field does not have the namespace. It needs to look like this:

{
    "after": null,
    "before": {
        "EventId": 1111111111,
        "CameraId": 222222222
    },
    "source": {
        "version": "InitialLoad",
        "connector": "sqlserver"
    },
    "op": "C"
}

The original input you had was attempting to use JSON-encoded Avro, since the before field had the CoreOLTPEvents.dbo.Event.Value namespace. However, I am guessing it must have been hand-crafted, because in that encoding CameraId should have been specified as {"long": 222222222} rather than just 222222222.
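
If you wanted to keep the namespaced input files and strip the wrapper yourself before serializing, a hypothetical helper along these lines would do it (unwrap_union is not a library function, and the wrapper key is taken from the question's input):

import json

# Hypothetical helper: remove the Avro-JSON union wrapper around 'before'
# so the dict matches what AvroSerializer expects.
def unwrap_union(value, wrapper_key="CoreOLTPEvents.dbo.Event.Value"):
    if isinstance(value, dict) and set(value) == {wrapper_key}:
        return value[wrapper_key]
    return value

with open("input.json") as f:
    record = json.load(f)

record["before"] = unwrap_union(record["before"])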

If you actually do have Avro-encoded JSON (as the output of some other process), then you could use something like fastavro's json_reader to read in that file; it will create the correct in-memory representation (which does not include the type information on union fields).
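
A minimal sketch of that, assuming the Avro-encoded JSON lives in a file called avro_encoded.json and the schema in event_schema.avsc (both placeholder names):

from fastavro import json_reader
from fastavro.schema import load_schema

schema = load_schema("event_schema.avsc")

# json_reader understands the Avro JSON encoding, including union wrappers
# like {"CoreOLTPEvents.dbo.Event.Value": {...}}, and yields plain dicts.
with open("avro_encoded.json") as fo:
    for record in json_reader(fo, schema):
        print(record)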

Update:

To figure out what the problem was with the full schema and the full data, I first loaded both objects using json.load and then ran fastavro.validate(record, schema). The output of that is a stack trace ending with the following:
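
In code, that check looks roughly like this (file names are placeholders; validate can also be imported from fastavro.validation):

import json

from fastavro.validation import validate

with open("event_schema.avsc") as f:
    schema = json.load(f)
with open("input.json") as f:
    record = json.load(f)

# Raises a ValidationError listing every mismatch it finds
validate(record, schema)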

fastavro._validate_common.ValidationError: [
  "CoreOLTPEvents.dbo.Event.Envelope.after is <{'EventId': 1234566,'CameraId': 2233,'SiteId': 111,'VehicleId': 45587,'EventReviewStatusID': 10,'EventTypeId': 123,'EventDateTime': '2015-01-02T01:30:29Z','FTPUploadDateTime': 1420193330590,'CAMFileName': 'XYZ','KeypadEntryCode': '0','IsActive': False,'Flagged': False,'EventTitle': None,'CreatedBy': 1,'CreatedDate': 1420191120730,'ModifiedBy': 1,'ModifiedDate': 1577871185680,'ReReviewAnalysis': None,'LegacyEventId': None,'TripId': 3382,'FileVersion': '2.2','EventNumber': 'AAAA-BBBB','Latitude': 'UU9elrA=','Longitude': '/ueZUeFw','GeoAddressId': None,'ReviewedEventId': 129411077,'VideoStatus': 4,'PredictionImportance': 0.1402457539,'FlaggedBy': None,'FlaggedDate': None,'TriggerTypeId': 322,'VideoDeleteDate': 1422783120000,'MetadataDeleteDate': 1577871120000,'RetentionStatus': 15,'PartnerTriggerId': None,'CoachingStateId': 0,'EventKudoHistoryId': None}> of type <class 'dict'> expected null","CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected null","CoreOLTPEvents.dbo.Event.Value.Latitude is <UU9elrA=> of type <class 'str'> expected {'scale': 10,'precision': 13,'connect.version': 1,'connect.parameters': {'scale': '10','connect.decimal.precision': '13'},'connect.name': 'org.apache.kafka.connect.data.Decimal','logicalType': 'decimal','type': 'bytes'}"
]

So this is trying to tell us that there are three potential problems. The first is that the value in after does not match null, but we can ignore that one because we do not want after to match null.

The latter two problems are the real issue. They say that the value of Latitude is the string UU9elrA=, but that matches neither null nor bytes. The string looks base64 encoded, so perhaps you have some code that is supposed to decode it to bytes. If so, the actual problem may lie elsewhere, but either way you should be able to use fastavro.validate to work out what the problem is.
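
If that guess is right, a small pre-processing step like this sketch would turn the base64 strings into raw bytes before serializing (whether decoding is actually what your pipeline needs is an assumption; the field names are taken from your input):

import base64
import json

with open("input.json") as f:
    record = json.load(f)

# Hypothetical pre-processing step: decode the base64 strings into the raw
# bytes the Connect Decimal (bytes) type expects.
for field in ("Latitude", "Longitude"):
    if isinstance(record["after"].get(field), str):
        record["after"][field] = base64.b64decode(record["after"][field])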
