微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

有没有办法在 kafka 生产者中使用 while 循环

如何解决有没有办法在 kafka 生产者中使用 while 循环

我试图每 30 秒生成一些数据,使用 kafkaproducer 将其订阅到 kafka 主题,并与 kafkaconsumer 一起使用。最后,使用flask socket将数据发送到网页。

这是我的生产者和消费者代码

from flask import Flask,render_template
from flask_cors import CORS,cross_origin
from flask_socketio import SocketIO,emit
from kafka import KafkaProducer,KafkaConsumer,TopicPartition
import uuid
from random import randint
from time import sleep

app = Flask(__name__)
socketio = SocketIO(app,cors_allowed_origins="*")
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'

BOOTSTRAP_SERVERS = 'localhost:32770'
TOPIC_NAME = 'stackBox'


@app.route('/')
@cross_origin()
def home():
    return render_template('index.html')

""" Kafka endpoints """


@socketio.on('connect',namespace='/kafka')
def test_connect():
    emit('logs',{'data': 'Connection established'})


@socketio.on('kafkaconsumer',namespace="/kafka")
def kafkaconsumer(message):
    consumer = KafkaConsumer(group_id='consumer-1',bootstrap_servers=BOOTSTRAP_SERVERS,enable_auto_commit=True,)
    tp = TopicPartition(TOPIC_NAME,0)
    # register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)
    consumer.seek_to_beginning(tp)
    emit('kafkaconsumer1',{'data': ''})
    for message in consumer:
        emit('kafkaconsumer',{'data': message.value.decode('utf-8')})
        if message.offset == lastOffset - 1:
            break
    consumer.close()


@socketio.on('kafkaproducer',namespace="/kafka")
def kafkaproducer(message):
    print(TOPIC_NAME)
    print(BOOTSTRAP_SERVERS)
    while True :
        message = str(randint(10,100))
        producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
        producer.send(TOPIC_NAME,value=bytes(str(message),encoding='utf-8'),key=bytes(str(uuid.uuid4()),encoding='utf-8'))
        emit('logs',{'data': 'Added ' + message + ' to topic'})
        emit('kafkaproducer',{'data': message})
        producer.close()
        kafkaconsumer(message)
        sleep(30)



if __name__ == '__main__':
    socketio.run(app,host='0.0.0.0',port=5000)

<html>
<head>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.5/socket.io.js" crossorigin="anonymous"></script>
    <script type="text/javascript" charset="utf-8">
        $(document).ready(function() {
            var socket = io.connect('http://localhost:5000/kafka');

            socket.on('kafkaconsumer1',function(msg) {
                $('#consumer').html("")
            });
            socket.on('kafkaconsumer',function(msg) {
                $('#consumer').append('<br>' + $('<div/>').text(msg.data).html());
            });

            socket.on('kafkaproducer',function(msg) {
                $('#producer').append('<br>' + $('<div/>').text(msg.data).html());
            });

            socket.on('logs',function(msg) {
                $('#log').append('<br>' + $('<div/>').text(msg.data).html());
            });

            $('form#emit').submit(function(event) {
                socket.emit('kafkaproducer',$('#emit_data').val());
                return false;
            });
        });
    </script>
</head>
<body>
<form id="emit" method="POST" action='#'>
    <input type="text" name="emit_data" id="emit_data" placeholder="Message">
    <input type="submit" value="Echo">
</form>
<h2>Logs</h2>
<div id="log"></div>
<h2>Producer</h2>
<div id="producer"></div>
<h2>Consumer</h2>
<div id="consumer"></div>
</body>
</html>

消息有效地订阅主题,但我只会在关闭烧瓶服务并重新启动它后才显示数据。他们都是同时来的。

我做错了什么?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。