如何解决有没有办法在 kafka 生产者中使用 while 循环
我试图每 30 秒生成一些数据,使用 kafkaproducer 将其订阅到 kafka 主题,并与 kafkaconsumer 一起使用。最后,使用flask socket将数据发送到网页。
这是我的生产者和消费者代码。
from flask import Flask,render_template
from flask_cors import CORS,cross_origin
from flask_socketio import SocketIO,emit
from kafka import KafkaProducer,KafkaConsumer,TopicPartition
import uuid
from random import randint
from time import sleep
app = Flask(__name__)
socketio = SocketIO(app,cors_allowed_origins="*")
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'
BOOTSTRAP_SERVERS = 'localhost:32770'
TOPIC_NAME = 'stackBox'
@app.route('/')
@cross_origin()
def home():
return render_template('index.html')
""" Kafka endpoints """
@socketio.on('connect',namespace='/kafka')
def test_connect():
emit('logs',{'data': 'Connection established'})
@socketio.on('kafkaconsumer',namespace="/kafka")
def kafkaconsumer(message):
consumer = KafkaConsumer(group_id='consumer-1',bootstrap_servers=BOOTSTRAP_SERVERS,enable_auto_commit=True,)
tp = TopicPartition(TOPIC_NAME,0)
# register to the topic
consumer.assign([tp])
# obtain the last offset value
consumer.seek_to_end(tp)
lastOffset = consumer.position(tp)
consumer.seek_to_beginning(tp)
emit('kafkaconsumer1',{'data': ''})
for message in consumer:
emit('kafkaconsumer',{'data': message.value.decode('utf-8')})
if message.offset == lastOffset - 1:
break
consumer.close()
@socketio.on('kafkaproducer',namespace="/kafka")
def kafkaproducer(message):
print(TOPIC_NAME)
print(BOOTSTRAP_SERVERS)
while True :
message = str(randint(10,100))
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
producer.send(TOPIC_NAME,value=bytes(str(message),encoding='utf-8'),key=bytes(str(uuid.uuid4()),encoding='utf-8'))
emit('logs',{'data': 'Added ' + message + ' to topic'})
emit('kafkaproducer',{'data': message})
producer.close()
kafkaconsumer(message)
sleep(30)
if __name__ == '__main__':
socketio.run(app,host='0.0.0.0',port=5000)
<html>
<head>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.5/socket.io.js" crossorigin="anonymous"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function() {
var socket = io.connect('http://localhost:5000/kafka');
socket.on('kafkaconsumer1',function(msg) {
$('#consumer').html("")
});
socket.on('kafkaconsumer',function(msg) {
$('#consumer').append('<br>' + $('<div/>').text(msg.data).html());
});
socket.on('kafkaproducer',function(msg) {
$('#producer').append('<br>' + $('<div/>').text(msg.data).html());
});
socket.on('logs',function(msg) {
$('#log').append('<br>' + $('<div/>').text(msg.data).html());
});
$('form#emit').submit(function(event) {
socket.emit('kafkaproducer',$('#emit_data').val());
return false;
});
});
</script>
</head>
<body>
<form id="emit" method="POST" action='#'>
<input type="text" name="emit_data" id="emit_data" placeholder="Message">
<input type="submit" value="Echo">
</form>
<h2>Logs</h2>
<div id="log"></div>
<h2>Producer</h2>
<div id="producer"></div>
<h2>Consumer</h2>
<div id="consumer"></div>
</body>
</html>
消息有效地订阅了主题,但我只会在关闭烧瓶服务并重新启动它后才显示数据。他们都是同时来的。
我做错了什么?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。