关于Pyzmq介绍

【相关学习推荐:python教程】

Pyzmq介绍

ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。

是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。

ZMQ 让编写高性能网络应用程序极为简单和有趣。

ZeroMQ并不是一个对socket的封装,不能用它去实现已有的网络协议。

它有自己的模式,不同于更底层的点对点通讯模式。

它有比tcp协议更高一级的协议。(当然ZeroMQ不一定基于TCP协议,它也可以用于进程间和进程内通讯)

zeromq 并不是类似rabbitmq消息列队,它实际上只一个消息列队组件,一个库。


Pyzmq官网:zeromq.org/languages/python/
Githubgithub.com/zeromq/pyzmq
Docszeromq.github.io/pyzmq/
Guidezguide.zeromq.org/py:all

Download

pip install pyzmq

Pyzmq的几种模式

1. 请求应答模式(Request-Reply)(rep 和 req)

消息双向的,有来有往,req端请求的消息,rep端必须答复给req端

2. 订阅发布模式 (pub 和 sub)

消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。

3. push pull模式

消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.

后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。

1. Request-Reply模式(请求响应模型):

客户端在请求后,服务端必须回响应

由客户端发起请求,并等待服务端响应请求。从客户端端来看,一定是一对对发收配对的;

反之,在服务端一定是收发对。服务端和客户端都可以是1:N的模型。通常把1认为是server,N认为是Client。

ZMQ可以很好的支持路由功能(实现路由功能的组件叫做Device),把1:N扩展为N:M(只需要加入若干路由节点)。

从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含回应地址,而应用则不关心它

img

服务端:

sever.py

 import zmq import sys
 context = zmq.Context()
 socket = context.socket(zmq.REP)
 socket.bind(tcp://*:5555)
 while True:
  try:
  print(wait for client ...)
  message = socket.recv()
  print(message from client:, message.decode('utf-8'))
  socket.send(message)
  except Exception as e:
  print('异常:',e)
  sys.exit()

客户端:

#client.py

 import zmq import sys
 context = zmq.Context()
 print(Connecting to server...)
 socket = context.socket(zmq.REQ)
 socket.connect(tcp://localhost:5555)
 while True:
 
  input1 = input(请输入内容:).strip()
  if input1 == 'b':
  sys.exit()
  socket.send(input1.encode('utf-8'))
 
  message = socket.recv()
  print(Received reply: , message.decode('utf-8'))

2.Publish-Subscribe模式(发布订阅模型):

广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

服务端

server.py

 import zmq import time import sys
 context = zmq.Context()
 socket = context.socket(zmq.PUB)
 socket.bind(tcp://*:5555)
 
 while True:
  msg = input(请输入要发布的信息:).strip()
  if msg == 'b':
  sys.exit()
  socket.send(msg.encode('utf-8'))
  time.sleep(1)

客户端1

client1.py

 import zmq
 
 
 context = zmq.Context()
 socket = context.socket(zmq.SUB)
 socket.connect(tcp://localhost:5555)
 socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8'))  # 接收所有消息 while True:
  response = socket.recv().decode('utf-8');
  print(response: %s % response)

客户端2

client2.py

 import zmq
 context = zmq.Context()
 socket = context.socket(zmq.SUB)
 socket.connect(tcp://localhost:5555)
 socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8'))  # 消息过滤  只接受123开头的信息 while True:
  response = socket.recv().decode('utf-8');
  print(response: %s % response)

3.Parallel Pipeline模式(管道模型):

 由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。

当连接被断开,数据不会丢失,重连后数据继续发送到对端。

img

server.py

 import zmq import time
 
 context = zmq.Context()
 socket = context.socket(zmq.PUSH)
 socket.bind(tcp://*:5557)
 
 while True:
  msg = input(请输入要发布的信息:).strip()
  socket.send(msg.encode('utf-8'))
  print(已发送)
  time.sleep(1)

worker.py

 import zmq
 context = zmq.Context()
 receive = context.socket(zmq.PULL)
 receive.connect('tcp://127.0.0.1:5557')
 sender = context.socket(zmq.PUSH)
 sender.connect('tcp://127.0.0.1:5558')
 
 while True:
  data = receive.recv()
  print(正在转发...)
  sender.send(data)

client.py

 import zmq
 context = zmq.Context()
 socket = context.socket(zmq.PULL)
 socket.bind(tcp://*:5558)
 
 while True:
  response = socket.recv().decode('utf-8')
  print(response: %s % response)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


本文适合有 Python 基础的小伙伴进阶学习 作者:pwwang 一、前言 本文基于开源项目: https://github.com/pwwang/python-import-system 补充扩展讲解,希望能够让读者一文搞懂 Python 的 import 机制。 1.1 什么是 import
前言 目前有个python应用需要在容器镜像内拉取git私有仓库的代码,一开始的想法是用GitPython,折腾一番ssh私钥和known_hosts问题后,发现还是在镜像中封装个git最省事,然后用subprocess调用系统命令,镜像体积也没有想象中增加特别多。 准备ssh私钥和known_ho
前言 当网络不稳定或应用页面加载有问题,可以设置等待,避免网络问题导致找不到元素等异常。 隐式等待 隐式等待设置的是最长等待时间,如果在规定时间内网页加载完成,则执行下一步,否则一直等到时间结束。 隐式等待在driver的整个生命周期都有效,初始化的时候设置一次即可。 # 隐式等待10秒 drive
前言 map()、reduce()、filter()是python的三个高阶函数。所谓高阶函数,指的是将函数作为参数并返回函数作为结果的函数。下面代码的sing_ready只是一个简单高阶函数示例: def ready(name): return f"ready,{name}!"
入门使用 # 示例代码 warframe = ["saryn", "wisp", "volt"] counts = [len(n) for n in warframe] for i,j in zip(warframe,counts): pr
前言 功能描述:批量重命名指定目录下的文件,文件名加前缀,默认格式为“目录名_原文件名”。 示例代码 import argparse import os import sys import logging def gen_args(): """ 说明 解析命令行参数 &
前言 常见的应用配置方式有环境变量和配置文件,对于微服务应用,还会从配置中心加载配置,比如nacos、etcd等,有的应用还会把部分配置写在数据库中。此处主要记录从环境变量、.env文件、.ini文件、.yaml文件、.toml文件、.json文件读取配置。 ini文件 ini文件格式一般如下: [
前言 在设计API返回内容时,通常需要与前端约定好API返回响应体内容的格式。这样方便前端进行数据反序列化时相应的解析处理,也方便其它服务调用。不同公司有不同的响应内容规范要求,这里以常见的JSON响应体为例: { "code": 200, "data": {
前言 我们一般使用如下方式点击元素: elem = driver.find_element(...) elem.click() # 或者使用带等待条件的方式 elem = WebDriverWait(driver, 10).until(EC.xxx(...)) elem.click() 正常情况下,
前言 从环境变量和配置文件中获取配置参数,相关库: python-dotenv:第三方库,需要使用pip安装 configparser:标准库 示例代码 test.ini [mysql] host = "192.168.0.10" port = 3306 user = &quot
前言 Relative Locators,相对定位器,是Selenium 4引入的一个新的定位器,相对定位器根据源点元素去定位相对位置的其它元素。 相对定位方法其实是基于JavaScript的 getBoundingClientRect() 而实现,简单的页面还行,复杂页面中可能会定位到需要相同类型
简介 The pytest framework makes it easy to write small, readable tests, and can scale to support complex functional testing for applications and librari
简介 Faker库可用于随机生成测试用的虚假数据。 可生成的数据参考底部的参考链接。 安装: python -m pip install faker 快速入门 from faker import Faker # 实例化一个对象,本地化使用中国 fk - Faker(locale="zh_C
前言 原本应用的日志是全部输出到os的stdout,也就是控制台输出。因其它团队要求也要保留日志文件,便于他们用其他工具统一采集,另一方面还要保留控制台输出,便于出问题的时候自己直接看pod日志。具体需求如下: 日志支持同时控制台输出和文件输出 控制台的输出级别可以高点,比如WARNING,个人这边
按列从多个文件中构建 假设有两个csv文件,列不相同,需要整合为一个dataframe,使用glob模块: from glob import glob import pandas as pd # glob会返回任意排序的文件名,所以需要sort排序 some_files = sorted(glob(
简介 diagrams是python的一个第三方库,用于实现使用代码绘制架构图。 安装 依赖于 Graphviz,安装diagrams之前需要先安装 Graphviz(下载压缩包后,将bin目录添加到系统环境变量Path里即可)。 python3 -m pip install diagrams 快速
前言 最近有个个人需求是要把多个图片文件合并为一个PDF文件,这样方便用PDF阅读器连续看,避免界面点一下,只会图片放大。(比如看漫画) 主要思路是先把单张图片转换成单个PDF文件,然后把PDF文件进行合并。原先是用WPS的转换工具做的,但WPS每次只能批量转换30张,如果有大量图片文件,用WPS就
前言 版本: python:3.9 selenium:4.1.5 获取元素文本 text = driver.find_element(by=By.XPATH, value="").text 获取元素属性值 attr1 = driver.find_element(by=By.XPA
Python中有个内置的函数叫做 enumerate,可以在迭代时返回元素的索引。 # 示例代码01 warframe = ["saryn", "wisp", "volt"] for i,name in enumerate(warframe
前言 版本: python:3.9 selenium:4.1.5 浏览器:firefox 创建浏览器对象 from selenium import webdriver driver = webdriver.Firefox(executable_path=r"C:\software\sele