微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python Redis pipeline操作和Redis乐观锁保持数据一致性

<div id="article_content" class="article_content clearfix csdn-tracking-statistics" data-pid="blog" data-mod="popu_307" data-dsm="post">
<div class="htmledit_views">
<p style="margin-left: 10px;">Redis是建立在TCP协议基础上的CS架构,客户端client对redis server采取请求响应的方式交互。


<p style="margin-left: 10px;">redis 乐观锁:也可理解为版本号比较机制,主要是说在读取数据逇时候同时读取其版本号,然后在写入的时候,进行版本号比较,如果一致,则表明此数据在监听期间未被改变,可以写入,如果不一致说明此数据被修改过,不能写入,否则会导致数据不一致的问题。


<p style="margin-left: 10px;">一般来说客户端从提交请求到得到服务器相应,需要传送两个tcp报文。


<p style="margin-left: 10px;">设想这样的一个场景,你要批量的执行一系列redis命令,例如执行100次get key,这时你要向redis请求100次+获取响应100次。如果能一次性将100个请求提交给redis server,执行完成之后批量的获取相应,只需要向redis请求1次,然后批量执行完命令,一次性结果,性能是不是会好很多呢?


<p style="margin-left: 10px;">答案是肯定的,节约的时间是客户端client和服务器redis server之间往返网络延迟的时间。这个时间可以用ping命令查看。


<p style="margin-left: 10px;">网络延迟高:批量执行,性能提升明显


<p style="margin-left: 10px;">网络延迟低(本机):批量执行,性能提升不明显


<p style="margin-left: 10px;">某些客户端(java和python)提供了一种叫做pipeline的编程模式用来解决批量提交请求的方式。


<p style="margin-left: 10px;">这里我们用python客户端来举例说明一下。


<p style="margin-left: 10px;">


<h1 style="margin-left: 10px;">1、pipeline
<p style="margin-left: 10px;">网络延迟


<p style="margin-left: 10px;">client与server机器之间网络延迟如下,大约是30ms。


<p style="margin-left: 10px;">


<p style="margin-left: 10px;">


<p style="margin-left: 10px;">测试用例


<p style="margin-left: 10px;">分别执行其中的try_pipeline和without_pipeline统计处理时间。


<div class="cnblogs_code">


<span style="color: #0000ff;">import<span style="color: #000000;"> redis
<span style="color: #0000ff;">import<span style="color: #000000;"> time
<span style="color: #0000ff;">from concurrent.futures <span style="color: #0000ff;">import<span style="color: #000000;"> ProcesspoolExecutor

r = redis.Redis(host=<span style="color: #800000;">'<span style="color: #800000;">10.93.84.53<span style="color: #800000;">',port=6379,password=<span style="color: #800000;">'<span style="color: #800000;">bigdata123<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> try_pipeline():
start =<span style="color: #000000;"> time.time()
with r.pipeline(transaction=<span style="color: #000000;">False) as p:
p.sadd(<span style="color: #800000;">'<span style="color: #800000;">seta<span style="color: #800000;">',1).sadd(<span style="color: #800000;">'<span style="color: #800000;">seta<span style="color: #800000;">',2).srem(<span style="color: #800000;">'<span style="color: #800000;">seta<span style="color: #800000;">',2).lpush(<span style="color: #800000;">'<span style="color: #800000;">lista<span style="color: #800000;">',1).lrange(<span style="color: #800000;">'<span style="color: #800000;">lista<span style="color: #800000;">',-1<span style="color: #000000;">)
p.execute()
<span style="color: #0000ff;">print time.time() -<span style="color: #000000;"> start

<span style="color: #0000ff;">def<span style="color: #000000;"> without_pipeline():
start =<span style="color: #000000;"> time.time()
r.sadd(<span style="color: #800000;">'<span style="color: #800000;">seta<span style="color: #800000;">',1<span style="color: #000000;">)
r.sadd(<span style="color: #800000;">'<span style="color: #800000;">seta<span style="color: #800000;">',2<span style="color: #000000;">)
r.srem(<span style="color: #800000;">'<span style="color: #800000;">seta<span style="color: #800000;">',2<span style="color: #000000;">)
r.lpush(<span style="color: #800000;">'<span style="color: #800000;">lista<span style="color: #800000;">',1<span style="color: #000000;">)
r.lrange(<span style="color: #800000;">'<span style="color: #800000;">lista<span style="color: #800000;">',-1<span style="color: #000000;">)
<span style="color: #0000ff;">print time.time() -<span style="color: #000000;"> start

<span style="color: #0000ff;">def<span style="color: #000000;"> worker():
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
try_pipeline()

with ProcesspoolExecutor(maxworkers=12<span style="color: #000000;">) as pool:
<span style="color: #0000ff;">for
<span style="color: #0000ff;">in range(10<span style="color: #000000;">):
pool.submit(worker)

<p style="margin-left: 10px;">


<p style="margin-left: 10px;">结果分析


<p style="margin-left: 10px;">try_pipeline平均处理时间:0.04659


<p style="margin-left: 10px;">without_pipeline平均处理时间:0.16672


<p style="margin-left: 10px;">我们的批量里有5个操作,在处理时间维度上性能提升了4倍!


<p style="margin-left: 10px;">网络延迟大约是30ms,不使用批量的情况下,网络上的时间损耗就有0.15s(30ms*5)以上。而pipeline批量操作只进行一次网络往返,所以延迟只有0.03s。可以看到节省的时间基本都是网路延迟。


<p style="margin-left: 10px;">


<h1 style="margin-left: 10px;">2、pipeline与transation(事务)
<p style="margin-left: 10px;">pipeline不仅仅用来批量的提交命令,还用来实现事务transation。


<p style="margin-left: 10px;">这里对redis事务的讨论不会太多,只是给出一个demo。详细的描述你可以参见这篇博客。<a href="http://www.cnblogs.com/kangoroo/p/7535405.html" rel="nofollow" target="_blank">redis事务


<p style="margin-left: 10px;">细心的你可能发现了,使用transaction与否不同之处在与创建pipeline实例的时候,transaction是否打开,默认是打开的。


<div class="cnblogs_code">


<span style="color: #0000ff;">import<span style="color: #000000;"> redis
<span style="color: #0000ff;">from redis <span style="color: #0000ff;">import<span style="color: #000000;"> WatchError
<span style="color: #0000ff;">from concurrent.futures <span style="color: #0000ff;">import<span style="color: #000000;"> ProcessPoolExecutor

r = redis.Redis(host=<span style="color: #800000;">'<span style="color: #800000;">127.0.0.1<span style="color: #800000;">',port=6379<span style="color: #000000;">)

<span style="color: #008000;">#<span style="color: #008000;"> 减库存函数,循环直到减库存完成<span style="color: #008000;">

<span style="color: #008000;"> 库存充足,减库存成功,返回True<span style="color: #008000;">

<span style="color: #008000;"> 库存不足,减库存失败,返回False

<span style="color: #0000ff;">def<span style="color: #000000;"> decr_stock():

</span><span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt; python中redis事务是通过pipeline的封装实现的</span>

<span style="color: #000000;"> with r.pipeline() as pipe:
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
<span style="color: #0000ff;">try<span style="color: #000000;">:
<span style="color: #008000;">#<span style="color: #008000;"> watch库存键,multi后如果该key被其他客户端改变,事务操作会抛出WatchError异常
pipe.watch(<span style="color: #800000;">'<span style="color: #800000;">stock:count<span style="color: #800000;">'<span style="color: #000000;">)
count = int(pipe.get(<span style="color: #800000;">'<span style="color: #800000;">stock:count<span style="color: #800000;">'<span style="color: #000000;">))
<span style="color: #0000ff;">if count > 0: <span style="color: #008000;">#<span style="color: #008000;"> 有库存
<span style="color: #008000;">#<span style="color: #008000;"> 事务开始
pipe.multi() <span style="color: #008000;">#<span style="color: #008000;"> 这里起始位置???????????????
pipe.decr(<span style="color: #800000;">'<span style="color: #800000;">stock:count<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #008000;">#<span style="color: #008000;"> 把命令推送过去
<span style="color: #008000;">#<span style="color: #008000;"> execute返回命令执行结果列表,这里只有一个decr返回当前值
<span style="color: #0000ff;">print<span style="color: #000000;"> pipe.execute()[0]
<span style="color: #0000ff;">return<span style="color: #000000;"> True
<span style="color: #0000ff;">else<span style="color: #000000;">:
<span style="color: #0000ff;">return<span style="color: #000000;"> False
<span style="color: #0000ff;">except<span style="color: #000000;"> WatchError,ex:
<span style="color: #008000;">#<span style="color: #008000;"> 打印WatchError异常,观察被watch锁住的情况
<span style="color: #0000ff;">print<span style="color: #000000;"> ex
pipe.unwatch()

<span style="color: #0000ff;">def<span style="color: #000000;"> worker():
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
<span style="color: #008000;">#<span style="color: #008000;"> 没有库存就退出
<span style="color: #0000ff;">if <span style="color: #0000ff;">not<span style="color: #000000;"> decr_stock():
<span style="color: #0000ff;">break

<span style="color: #008000;">#<span style="color: #008000;"> 实验开始<span style="color: #008000;">

<span style="color: #008000;"> 设置库存为100

r.set(<span style="color: #800000;">"<span style="color: #800000;">stock:count<span style="color: #800000;">",100<span style="color: #000000;">)

<span style="color: #008000;">#<span style="color: #008000;"> 多进程模拟多个客户端提交
with ProcessPoolExecutor(maxworkers=2<span style="color: #000000;">) as pool:
<span style="color: #0000ff;">for
<span style="color: #0000ff;">in range(10<span style="color: #000000;">):
pool.submit(worker)

出处:

dio controls="controls" style="display: none;">

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐