微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

基于python yield机制的异步操作同步化编程模型

本文总结下如何在编写python代码时对异步操作进行同步化模拟,从而提高代码的可读性和可扩展性。

     游戏引擎一般都采用分布式框架,通过一定的策略来均衡服务器集群的资源负载,从而保证服务器运算的高并发性和cpu高利用率,最终提高游戏的性能负载。由于引擎的逻辑层调用是非抢占式的,服务器之间都是通过异步调用来进行通讯,导致游戏逻辑无法同步执行,所以在代码层不得不人为地添加很多回调函数,使一个原本完整的功能碎片化地分布在各个回调函数中。

异步逻辑

     以游戏中的副本评分逻辑为例,在副本结束时副本管理进程需要收集副本中每个玩家的战斗信息,再结合管理进程内部的统计信息最终给出一个副本评分,发放相应奖励。因为每个玩家实体都随机分布在不同进程中,所以管理进程需要通过异步调用获取玩家身上的战斗信息。

实现代码如下所示:

# -*- coding: gbk -*-
import random
 
# 玩家实体类
class Player(object):
  def __init__(self,entityId):
    super(Player,self).__init__()
    # 玩家标识
    self.entityId = entityId
 
  def onFubenEnd(self,mailBox):
    score = random.randint(1,10)
    print "onFubenEnd player %d score %d"%(self.entityId,score)
 
    # 向副本管理进程发送自己的id和战斗信息
    mailBox.onEvalFubenscore(self.entityId,score)
 
# 副本管理类
class FubenStub(object):
  def __init__(self,players):
    super(FubenStub,self).__init__()
    self.players = players
 
  def evalFubenscore(self):
    self.playerRelayCnt = 0
    self.totalscore = 0
 
    # 通知每个注册的玩家,副本已经结束,索取战斗信息
    for player in self.players:
      player.onFubenEnd(self)
 
  def onEvalFubenscore(self,entityId,score):
    # 收到其中一个玩家的战斗信息
    print "onEvalFubenscore player %d score %d"%(entityId,score)
    self.playerRelayCnt += 1
    self.totalscore += score
 
    # 当收集完所有玩家的信息后,打印评分
    if len(self.players) == self.playerRelayCnt:
      print 'The fuben totalscore is %d'%self.totalscore
 
if __name__ == '__main__':
  # 模拟创建玩家实体
  players = [Player(i) for i in xrange(3)]
 
  # 副本开始时,每个玩家将自己的MailBox注册到副本管理进程
  fs = FubenStub(players)
 
  # 副本进行中
  # ....
 
  # 副本结束,开始评分
  fs.evalFubenscore()

代码简化了副本评分逻辑的实现,其中Player类表示游戏的玩家实体,在游戏运行时无缝地在不同服务器中切换,FubenStub表示副本的管理进程,在副本刚开始的时候该副本内所有玩家会将自己的MailBox注册到管理进程中,其中MailBox表示各个实体的远程调用句柄。在副本结束时,FubenStub首先向各个玩家发送副本结束消息,同时请求玩家的战斗信息,玩家在得到消息后,将自己的战斗信息发送给FubenStub;然后当FubenStub收集完所有玩家的信息后,最终打印副本评分。

同步逻辑

    如果Player和FubenStub在同一进程中的话,那所有的操作都可以同步完成,在FubenStub向玩家发送副本结束消息的同时可以马上得到该玩家的战斗信息,实现代码如下所示:

# -*- coding: gbk -*-
 
import random
 
class Player(object):
  def __init__(self,self).__init__()
    self.entityId = entityId
 
  def onFubenEnd(self,score)
    return self.entityId,score
 
class FubenStub(object):
  def __init__(self,self).__init__()
    self.players = players
 
  def evalFubenscore(self):
    totalscore = 0
    for player in self.players:
      entityId,score = player.onFubenEnd(self)
      print "onEvalFubenscore player %d score %d"%(entityId,score)
      totalscore += score
 
    print 'The fuben totalscore is %d'%totalscore
 
if __name__ == '__main__':
  players = [Player(i) for i in xrange(3)]
 
  fs = FubenStub(players)
  fs.evalFubenscore()

 从以上两份代码可以看到由于异步操作,FubenStub中的评分逻辑人为地分成两个功能点:1)向玩家发送副本结束消息;2)接受玩家的战斗信息;并且两个功能点分布在两个不同的函数中。如果游戏逻辑一旦复杂,势必会造成功能点分散,出现过多onXXX异步回调函数,最终导致代码的开发成本和维护成本提高,可读性和可扩展性下降。

     如果有一种方法,可以让函数在异步调用时暂时挂起,并且在回调函数得到返回值后恢复执行,那么就可以用同步化的编程模式开发异步逻辑。 

yield 关键字

     yield 是 Python中的一个关键字,凡是函数体中出现了 yield 关键字,Python将改变整个函数的上下文,调用函数不再返回值,而是一个生成器对象。只有调用这个生成器的迭代函数next才能开始执行生成器对象,当生成器对象执行到包含 yield 表达式时,函数将暂时挂起,等待下一次next调用来恢复执行,具体机制如下:

         1)调用生成器对象的next方法,启动函数执行;

         2)当生成器对象执行到包含 yield 表达式时,函数挂起;

         3)下一次 next 函数调用又会驱动该生成器对象继续执行此后的语句,直到遇见下一个 yield 再次挂起;

         4)如果某次 next 调用驱动了生成器继续执行,而此后函数正常结束,生成器会抛出 stopiteration 异常;

如下代码所示:

def f():
  print "Before first yield"
  yield 1
  print "Before second yield"
  yield 2
  print "After second yield"
 
g = f()
print "Before first next"
g.next()
print "Before second next"
g.next()
print "Before third yield"
g.next()

执行结果为:

Before first next

Before first yield

Before second next

Before second yield

Before third yield

After second yield

stopiteration

     哈,有了让函数暂时挂起的机制,最后就剩下如何传递异步调用的返回值问题了。其实生成器的next函数已经实现了将参数从生成器对象内部向外传递的机制,并且python还提供了一个send函数将参数从外向生成器对象内部传递的机制,具体机制如下:

         1) 调用next 函数驱动生成器时,next会同时等待生成器中下一个 yield 挂起,并将该yield后面的参数返回给next;

         2)往生成器中传递参数,需要将next函数替换成send,此时send的功能与next相同(驱动生成器执行,等待返回值),同时send将后面的参数传递给生成器内部之前挂起的yield;

如下代码所示:

def f():
  msg = yield 'first yield msg'
  print "generator inner receive:",msg
  msg = yield 'second yield msg'
  print "generator inner receive:",msg
 
g = f()
msg = g.next()
print "generator outer receive:",msg
msg = g.send('first send msg')
print "generator outer receive:",msg
g.send('second send msg')

执行结果为:

generator outer receive: first yield msg

generator inner receive: first send msg

generator outer receive: second yield msg

generator inner receive: second send msg

stopiteration

同步化实现

     好了,万事俱备只欠东风,下面就是简单对yield机制进行工程上封装以方便之后开发。下面的代码提供了一个叫IFakeSyncCall的interface,所有包含异步操作的逻辑类都可以继承这个接口:

class IFakeSyncCall(object):
  def __init__(self):
    super(IFakeSyncCall,self).__init__()
    self.generators = {}
 
  @staticmethod
  def FAKE_SYNCALL():
    def fwrap(method):
      def fakeSyncCall(instance,*args,**kwargs):
        instance.generators[method.__name__] = method(instance,**kwargs)
        func,args = instance.generators[method.__name__].next()
        func(*args)
      return fakeSyncCall
    return fwrap
 
  def onFakeSyncCall(self,identify,result):
    try:
      func,args = self.generators[identify].send(result)
      func(*args)
    except stopiteration:
      self.generators.pop(identify)

 其中interface中属性generators用来保存类中已经开始执行的生成器对象;函数FAKE_SYNCALL是一个decorator,装饰类中包含有yield的函数,改变函数调用上下文,在fakeSyncCall内部封装了对生成器对象的next调用函数onFakeSyncCall封装了所有onXXX函数的逻辑,其他实体通过调用这个函数传递异步回调的返回值。

下面就是经过同步化改进后的异步副本评分逻辑代码

# -*- coding: gbk -*-
import random
 
class Player(object):
  def __init__(self,score)
    mailBox.onFakeSyncCall('evalFubenscore',(self.entityId,score))
 
class FubenStub(IFakeSyncCall):
  def __init__(self,self).__init__()
    self.players = players
 
  @IFakeSyncCall.FAKE_SYNCALL()
  def evalFubenscore(self):
    totalscore = 0
    for player in self.players:
      entityId,score = yield (player.onFubenEnd,(self,))
      print "onEvalFubenscore player %d score %d"%(entityId,score)
      totalscore += score
 
    print 'the totalscore is %d'%totalscore
 
if __name__ == '__main__':
  players = [Player(i) for i in xrange(3)]
 
  fs = FubenStub(players)
  fs.evalFubenscore()

比较evalFubenscore函数,基本已经和原本的同步逻辑代码相差无几。

      利用yield机制实现同步化编程模型的另外一个优点是可以保证所有异步调用的逻辑串行化,从而保证数据的一致性和有效性,特别是在各种异步初始化流程中可以摒弃传统的timer sleep机制,从源头上扼杀一些隐藏很深的由于数据不一致性所导致的bug。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用爬虫利器 Playwright,轻松爬取抖查查数据 我们先分析登录的接口,其中 url 有一些非业务参数:ts、he、sign、secret。 然后根据这些参数作为关键词,定位到相关的 js 代码。 最后,逐步进行代码的跟踪,发现大部分的代码被混淆加密了。 花费了大半天,来还原这些混淆加密的代码
轻松爬取灰豚数据的抖音商品数据 调用两次登录接口实现模拟登录 我们分析登录接口,发现调用了两次不同的接口;而且,需要先调用 https://login.huitun.com/weChat/userLogin,然后再调用 https://dyapi.huitun.com/userLogin 接口。 登
成功绕过阿里无痕验证码,一键爬取飞瓜数据 飞瓜数据的登录接口,接入了阿里云的无痕验证码;通过接口方式模拟登录,难度比较高。所以,我们使用自动化的方式来实现模拟登录,并且获取到 cookie 数据。 [阿里无痕验证码] https://help.aliyun.com/document_detail/1
一文教你从零开始入门蝉妈妈数据爬取,成功逆向破解数据加密算法 通过接口进行模拟登录 我们先通过正常登录的方式,分析对应的登录接口。通过 F12 打开谷歌浏览器的调试面板,可以看到登录需要传递的一些参数;其中看到密码是被加密了。 不过我们通过经验可以大概猜测一下,应该是通过 md5 算法加密了。 接下
抽丝剥茧成功破解红人点集的签名加密算法 抽丝剥茧破解登录签名算法,成功实现模拟登录 headers = {} phone_num = "xxxx" password = "xxxx" md5_hash = hashlib.md5() md5_hash.upda
轻松绕过 Graphql 接口爬取有米有数的商品数据 有米有数数据的 API 接口,使用的是一种 API 查询语言 graphql。所有的 API 只有一个入口,具体的操作隐藏在请求数据体里面传输。 模拟登录,获取 sessionId 调用登录接口,进行模拟登录。 cookies = {} head
我最近重新拾起了计算机视觉,借助Python的opencv还有face_recognition库写了个简单的图像识别demo,额外定制了一些内容,原本想打包成exe然后发给朋友,不过在这当中遇到了许多小问题,都解决了,记录一下踩过的坑。 1、Pyinstaller打包过程当中出现warning,跟d
说到Pooling,相信学习过CNN的朋友们都不会感到陌生。Pooling在中文当中的意思是“池化”,在神经网络当中非常常见,通常用的比较多的一种是Max Pooling,具体操作如下图: 结合图像理解,相信你也会大概明白其中的本意。不过Pooling并不是只可以选取2x2的窗口大小,即便是3x3,
记得大一学Python的时候,有一个题目是判断一个数是否是复数。当时觉得比较复杂不好写,就琢磨了一个偷懒的好办法,用异常处理的手段便可以大大程度帮助你简短代码(偷懒)。以下是判断整数和复数的两段小代码: 相信看到这里,你也有所顿悟,能拓展出更多有意思的方法~
文章目录 3 直方图Histogramplot1. 基本直方图的绘制 Basic histogram2. 数据分布与密度信息显示 Control rug and density on seaborn histogram3. 带箱形图的直方图 Histogram with a boxplot on t