微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何将 Python 使用者连接到 AWS MSK

如何解决如何将 Python 使用者连接到 AWS MSK

我正在尝试将我的 Python 使用者连接到 AWS MSK 集群。我该怎么做?

运行 AWS MSK 集群 我正在尝试使用 python 和 kafka python 消费来自 MSK 集群的消息。

我收到的错误

Traceback (most recent call last):
  File "consumer.py",line 23,in <module>
    for message in consumer:
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py",line 1193,in __next__
    return self.next_v2()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py",line 1201,in next_v2
    return next(self._iterator)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py",line 1116,in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms,update_offsets=False)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py",line 655,in poll
    records = self._poll_once(remaining,max_records,update_offsets=update_offsets)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/consumer/group.py",line 675,in _poll_once
    self._coordinator.poll()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/consumer.py",line 270,in poll
    self.ensure_coordinator_ready()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/coordinator/base.py",line 258,in ensure_coordinator_ready
    self._client.poll(future=future)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py",line 582,in poll
    self._maybe_connect(node_id)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/client_async.py",line 392,in _maybe_connect
    conn.connect()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py",line 429,in connect
    if self._try_handshake():
  File "/home/ubuntu/.local/lib/python3.6/site-packages/kafka/conn.py",line 508,in _try_handshake
    self._sock.do_handshake()
  File "/usr/lib/python3.6/ssl.py",line 1077,in do_handshake
    self._sslobj.do_handshake()
  File "/usr/lib/python3.6/ssl.py",line 689,in do_handshake
    self._sslobj.do_handshake()
OSError: [Errno 0] Error

解决方法

使用 kafka-python:

from kafka import KafkaConsumer

if __name__ == '__main__':
    topic_name = 'example-topic'

    consumer = KafkaConsumer(topic_name,auto_offset_reset='earliest',bootstrap_servers=['kafka2:9092'],api_version=(0,10),consumer_timeout_ms=1000)
    for msg in consumer:
        print(msg.value)

    if consumer is not None:
        consumer.close()

from time import sleep

from kafka import KafkaProducer

# publish messages on topic
def publish_message(producer_instance,topic_name,key,value):
    try:
        key_bytes = bytes(key,encoding='utf-8')
        value_bytes = bytes(value,encoding='utf-8')
        producer_instance.send(topic_name,key=key_bytes,value=value_bytes)
        producer_instance.flush()
        print('Message ' + key + ' published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))

# establish kafka connection
def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['kafka1:9092'])
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer

if __name__ == '__main__':
    kafka_producer = connect_kafka_producer()
    x = 0
    while True:
        publish_message(kafka_producer,'raw_recipes',str(x),'This is message ' + str(x))
        x += 1
    
    if kafka_producer is not None:
            kafka_producer.close()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。