微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

python kafka发送中文的编码问题

项目中需要构造带有中文字符非json的测试数据,格式如下:

{'userid': 0, 'ts': '2022-08-03 16:33:38.487973', 'user_name': '中国人'}

 

发过去之后发现消费出来的都是unicode的编码,且指定了utf-8也没用,一开始以为是kafka producer的value_serializer序列化器用的不对,后面发现其实是代码里json.dumps没用好的原因

# -*- coding: utf-8 -*-
import time
from kafka import KafkaConsumer, KafkaProducer
import json

from kafka.errors import KafkaError
import datetime


producer = KafkaProducer(sasl_mechanism='PLAIN',
                        security_protocol='SASL_PLAINTEXT',
                        sasl_plain_username='xxxxx',
                        sasl_plain_password='xxxxxxxx',
                        bootstrap_servers=['xxxxxxxxxxx'],
                        #这里的dumps可以指定ensure_ascii=False
                        value_serializer=lambda m: json.dumps(m,ensure_ascii=False).encode(),
                        api_version="2.0.0")

try:
    # produce asynchronously
    for i in range(100):
        Now_time = str(datetime.datetime.Now())
        send_json={
            "userid": i,
            "ts":Now_time,
            "user_name":"中国人"
        }
        print(send_json)
        future = producer.send('xxxxxxxxxxx', send_json)

        try:
            record_Metadata = future.get(timeout=2)
        except KafkaError:
            # Decide what to do if produce request Failed...
            print("send error!")
            pass
        time.sleep(1)

    print(record_Metadata.partition)
    print(record_Metadata.offset)

finally:
    producer.close()

这样就可以把原来的{"userid": 1, "ts": "2022-08-03 16:12:26.595478", "user_name": "\u4e2d\u56fd\u4eba"}改成{"userid": 1, "ts": "2022-08-03 16:33:39.576068", "user_name": "中国人"}

 

另外1个新手容易犯的错误

1、pyhton中通过str将json强行转换成str类型时,key和value的引号是单引号的,这样发送到kafka,对下游不是很友好,比如下游用java或者flinksql消费的时候可能会出问题,建议用标准序列化json.dumps来转

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐