如何解决无法修复错误“RuntimeError:您需要使用 gevent-websocket 服务器”和“OSError:写错误”
我正在为 Flask
编写一个网站。我使用一堆 uWsgi
+ Nginx
+ Flask-Socketio
。我使用 gevent
作为异步模块。运行过程中出现错误:
运行时错误:您需要使用 gevent-websocket 服务器。 uwsgi_response_writev_headers_and_body_do():在 > 期间断管 [core/writer.c line 306] 2 月 23 日 12:57:55 toaa uwsgi[558436]:OSError:写错误
我尝试了不同的配置,并从 async_mode='gevent'
初始化中删除了 socketio
。
from webapp import app,socketio
if __name__ == '__main__':
socketio.run(app,use_reloader=False,debug=True,log_output=True)
project.ini:
[uwsgi]
module = wsgi:app
master = true
gevent = 1024
gevent-monkey-patch = true
buffer-size=32768 # optionally
socket = /home/sammy/projectnew/projectnew.sock
socket-timeout = 240
chmod-socket = 664
vacuum = true
die-on-term = true
webapp/__init__.py
用于应用程序:
from gevent import monkey
monkey.patch_all()
import grpc.experimental.gevent
grpc.experimental.gevent.init_gevent()
from flask import Flask,session,request
from config import DevelopConfig,MqttConfig,MailConfig,ProductionConfig
from flask_sqlalchemy import sqlAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
# from flask_mqtt import Mqtt
from flask_login import LoginManager
from flask_babel import Babel
from flask_babel_js import BabelJS
from flask_babel import lazy_gettext as _l
from apscheduler.schedulers.gevent import GeventScheduler
# from celery import Celery
app = Flask(__name__)
app.config.from_object(ProductionConfig)
app.config.from_object(MqttConfig)
app.config.from_object(MailConfig)
db = sqlAlchemy(app)
migrate = Migrate(app,db)
mail = Mail(app)
manager = Manager(app,db)
login_manager = LoginManager(app)
login_manager.login_view = 'auth'
login_manager.login_message = _l("Необходимо авторизоваться для доступа к закрытой странице")
login_manager.login_message_category = "error"
# celery = Celery(app.name,broker=Config.CELERY_broKER_URL)
# celery.conf.update(app.config)
scheduler = GeventScheduler()
# socketio = SocketIO(app) - Production Version
socketio = SocketIO(app,async_mode='gevent')
babel = Babel(app)
babeljs = BabelJS(app=app,view_path='/translations/')
import webapp.views
@babel.localeselector
def get_locale():
# if the user has set up the language manually it will be stored in the session,# so we use the locale from the user settings
try:
language = session['language']
except KeyError:
language = None
if language is not None:
print(language)
return language
return request.accept_languages.best_match(app.config['LANGUAGES'].keys())
from webapp import models
if __name__ == "__main__":
manager.run()
使用套接字本身的类(mqtt.py
):
from webapp import socketio,app
from flask import request
from flask_mqtt import Mqtt
from flask_babel import lazy_gettext as _l
from webapp.tasks import SchedulerTask
from webapp import translate_state_gate as tr_msg
import json
import copy
import logging
mqtt = Mqtt(app)
logger = logging.getLogger('flask.flask_mqtt')
logger.disabled = True
class MqttTOAA(object):
type_topic = ["/Control","/Data"]
m_request_state = {"comm": "3"}
m_start = {"Gate": "Start"}
m_stop = {"Gate": "Stop"}
qos_request = 1
qos_sub = 2
struct_state_devices = None
POOL_TIME = 2
end_publish = None
devices = None
schedulers_list = list()
sch_task = None
sid_mqtt = None
code_list = list()
def __init__(self,devices,lang):
mqtt._connect()
self.devices = devices
self.sch_task = SchedulerTask()
if lang not in app.config['LANGUAGES'].keys():
lang = 'ru'
self.dict_gate = {"dict_state_button": {'con_Clos': tr_msg.MessageGate.t_message[lang]["f_open"],'con_Open': tr_msg.MessageGate.t_message[lang]["f_close"],"fl_OpenClos": (tr_msg.MessageGate.t_message[lang]["f_continue"],tr_msg.MessageGate.t_message[lang]["f_stop"],tr_msg.MessageGate.t_message[lang]["f_abort"])},"dict_state_text": {tr_msg.MessageGate.t_message[lang]["f_open"]:\
tr_msg.MessageGate.t_message[lang]["ps_close"],tr_msg.MessageGate.t_message[lang]["f_close"]:\
tr_msg.MessageGate.t_message[lang]["ps_open"],tr_msg.MessageGate.t_message[lang]["f_continue"]:\
tr_msg.MessageGate.t_message[lang]["ps_stop"],tr_msg.MessageGate.t_message[lang]["f_abort"]:\
tr_msg.MessageGate.t_message[lang]["pr_close"],tr_msg.MessageGate.t_message[lang]["f_stop"]:\
(tr_msg.MessageGate.t_message[lang]["pr_open"],tr_msg.MessageGate.t_message[lang]["pr_close"],tr_msg.MessageGate.t_message[lang]["pr_move"])},"dict_type_element": {"button": u'',"text": u'',"device_code": u'',},"state_gate": {},"position": {"state": u'',"stop": False},"reverse": False,}
self.close_msg = tr_msg.MessageGate.t_message[lang]["pr_close"]
self.open_msg = tr_msg.MessageGate.t_message[lang]["pr_open"]
self.create_devices_dict()
self.handle_mqtt_connect()
self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
self.socketio_error = socketio.on_error()(self._handle_error)
self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
self.handle_on_connect = socketio.on('connect')(self._handle_on_connect)
self.handle_unsubscribe_all = socketio.on('unsubscribe_all')(self._handle_unsubscribe_all)
def _handle_on_connect(self):
self.sid_mqtt = request.sid
def handle_mqtt_connect(self):
task = None
for dev in self.devices:
if dev.device_code not in self.code_list:
mqtt.subscribe("BK" + dev.device_code + self.type_topic[1],self.qos_sub)
self.code_list.append(dev.device_code)
task = self.sch_task.add_scheduler_publish(dev.device_code,mqtt,"BK" + dev.device_code +
self.type_topic[0],self.m_request_state,self.qos_request,self.POOL_TIME)
if task is not None:
self.schedulers_list.append(task)
if len(self.schedulers_list) > 0:
self.sch_task.start_schedulers()
self.code_list.clear()
@staticmethod
def _handle_error():
print(request.event["message"]) # "my error event"
print(request.event["args"]) # (data,)
@staticmethod
def _handle_unsubscribe_all():
mqtt.unsubscribe_all()
def _handle_change_state(self,code):
print(code)
# print(self.struct_state_devices[code])
message = None
if code is not None:
try:
type_g = self.struct_state_devices[code]["state_gate"]
if type_g["fl_OpenClos"] == 1:
message = self.m_stop
else:
if self.struct_state_devices[code]["reverse"] is True:
if self.struct_state_devices[code]["position"]["state"] == self.close_msg:
message = self.m_stop
self.struct_state_devices[code]["position"]["state"] = self.open_msg
else:
message = self.m_start
else:
message = self.m_start
print("Msg:" + str(message))
except Exception as ex:
print(ex)
if message is not None:
mqtt.publish("BK" + code + self.type_topic[0],json.dumps(message),self.qos_request)
else:
print("Error change state " + code)
def _handle_mqtt_message(self,client,userdata,message):
# print("Get message")
# print(self.struct_state_devices)
data = dict(
topic=message.topic,payload=message.payload.decode(),qos=message.qos,)
try:
data = json.loads(data['payload'])
self.gate_msg(data)
except Exception as ex:
print("Exception: " + str(ex))
@staticmethod
def _handle_logging(self,level,buf):
print(level,buf)
pass
def create_devices_dict(self):
if self.struct_state_devices is None:
self.struct_state_devices = dict()
for dev in self.devices:
self.struct_state_devices[dev.device_code] = self.dict_gate.copy()
if dev.typedev.reverse:
self.struct_state_devices[dev.device_code]['reverse'] = True
def gate_msg(self,data):
k = ""
code = data["esp_id"][2:]
dict_dev = copy.deepcopy(self.struct_state_devices[code])
dict_dev["state_gate"] = data.copy()
try:
if dict_dev["state_gate"]["con_Clos"] == 0: # ворота закрыты
# print("1")
k = "con_Clos"
dict_dev["position"]["state"] = k
dict_dev["position"]["stop"] = False
elif dict_dev["state_gate"]["con_Open"] == 0: # ворота открыты
# print("2")
k = "con_Open"
dict_dev["position"]["state"] = k
dict_dev["position"]["stop"] = False
elif dict_dev["state_gate"]["fl_OpenClos"] == 0:
# print("3")
k = "fl_OpenClos"
# обратный ход ворот при закрытии
if dict_dev["position"]["state"] == self.close_msg and dict_dev["reverse"] is True:
# print("4")
k1 = 1
k2 = 0
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
dict_dev["position"]["stop"] = False
else:
# print("5")
k1 = 0
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
dict_dev["position"]["stop"] = True
elif dict_dev["state_gate"]["fl_OpenClos"] == 1:
# print("6")
k = "fl_OpenClos"
if len(dict_dev["position"]["state"]) == 0:
# print("7")
k1 = 1
k2 = 2
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
elif dict_dev["position"]["state"] == "con_Clos" or \
dict_dev["position"]["state"] == self.open_msg:
if dict_dev["position"]["stop"]:
# print("8")
k1 = 1
k2 = 1
dict_dev["position"]["stop"] = False
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
else:
# print("9")
k1 = 1
k2 = 0
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
elif dict_dev["position"]["state"] == "con_Open" or \
dict_dev["position"]["state"] == self.close_msg:
if dict_dev["reverse"]:
# print("10")
k1 = 2
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]]
else:
if dict_dev["position"]["stop"]:
# print("11")
k1 = 1
k2 = 0
dict_dev["position"]["stop"] = False
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
else:
# print("12")
k1 = 1
k2 = 1
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k][k1]][k2]
if dict_dev["position"]["state"] != dict_dev["dict_type_element"]["text"]:
# print("13")
dict_dev["position"]["state"] = dict_dev["dict_type_element"]["text"]
if k == "fl_OpenClos":
dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k][k1]
else:
dict_dev["dict_type_element"]["button"] = dict_dev["dict_state_button"][k]
dict_dev["dict_type_element"]["text"] = \
dict_dev["dict_state_text"][dict_dev["dict_state_button"][k]]
except Exception as ex:
print("Exception (gate_msg): " + str(ex))
dict_dev["dict_type_element"]["device_code"] = data["esp_id"][2:]
dict_dev["dict_type_element"]["temp"] = data["temp_1"]
dict_dev["dict_type_element"]["button"] = copy.deepcopy(str(dict_dev["dict_type_element"]["button"]))
dict_dev["dict_type_element"]["text"] = copy.deepcopy(str(dict_dev["dict_type_element"]["text"]))
self.struct_state_devices[code] = copy.deepcopy(dict_dev)
socketio.emit('mqtt_message',data=dict_dev["dict_type_element"],room=self.sid_mqtt)
# print(dict_dev["state_gate"]["esp_id"] + str(dict_dev["dict_type_element"]))
解决方法
当你使用 uWSGI 时,async_mode 应该是 gevent_uwsgi
:
socketio = SocketIO(app,async_mode='gevent_uwsgi')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。