项目中需要构造带有中文字符非json的测试数据,格式如下:
{'userid': 0, 'ts': '2022-08-03 16:33:38.487973', 'user_name': '中国人'}
发过去之后发现消费出来的都是unicode的编码,且指定了utf-8也没用,一开始以为是kafka producer的value_serializer序列化器用的不对,后面发现其实是代码里json.dumps没用好的原因
# -*- coding: utf-8 -*- import time from kafka import KafkaConsumer, KafkaProducer import json from kafka.errors import KafkaError import datetime producer = KafkaProducer(sasl_mechanism='PLAIN', security_protocol='SASL_PLAINTEXT', sasl_plain_username='xxxxx', sasl_plain_password='xxxxxxxx', bootstrap_servers=['xxxxxxxxxxx'], #这里的dumps可以指定ensure_ascii=False value_serializer=lambda m: json.dumps(m,ensure_ascii=False).encode(), api_version="2.0.0") try: # produce asynchronously for i in range(100): Now_time = str(datetime.datetime.Now()) send_json={ "userid": i, "ts":Now_time, "user_name":"中国人" } print(send_json) future = producer.send('xxxxxxxxxxx', send_json) try: record_Metadata = future.get(timeout=2) except KafkaError: # Decide what to do if produce request Failed... print("send error!") pass time.sleep(1) print(record_Metadata.partition) print(record_Metadata.offset) finally: producer.close()
这样就可以把原来的{"userid": 1, "ts": "2022-08-03 16:12:26.595478", "user_name": "\u4e2d\u56fd\u4eba"}改成{"userid": 1, "ts": "2022-08-03 16:33:39.576068", "user_name": "中国人"}
另外1个新手容易犯的错误
1、pyhton中通过str将json强行转换成str类型时,key和value的引号是单引号的,这样发送到kafka,对下游不是很友好,比如下游用java或者flinksql消费的时候可能会出问题,建议用标准序列化json.dumps来转
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。