微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Akka实战:分散、聚合模式

分散与聚合:简单说就是一个任务需要拆分成多个小任务,每个小任务执行完后再把结果聚合在一起返回。

代码 http://git.oschina.net/yangbajing/akka-action

实例背景

本实例来自一个真实的线上产品,现将其需求简化如下:

  1. 传入一个关键词:key,根据key从网上抓取相关新闻
  2. 可选传入一个超时参数:duration,设置任务到期时必须反回数据(返回实际已抓取数据)
  3. 若超时到返回实际已爬取数据,则任务应继续运行直到所以数据抓取完成,并存库

设计

根据需求,一个简化的分散、聚合模式可以使用两个actor来实现。

  • NewsTask:接收请求,并设置超时时间
  • SearchPageTask:执行实际的新闻抓取操作(本实例将使用TimeUnit模拟抓取耗时)

实现

NewsTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/NewsTask.scala

override def metricPreStart(): Unit = {
    context.system.scheduler.scheduleOnce(doneDuration,self,TaskDelay)
  }

  override def metricReceive: Receive = {
    case StartFetchNews =>
      _receipt = sender()
      (0 until NewsTask.TASK_SIZE).foreach { i =>
        context.actorOf(SearchPageTask.props(self),"scatter-" + i) ! SearchPage(key)
      }

    case GetNewsItem(newsItem) =>
      _newses ::= newsItem
      if (_newses.size == NewsTask.TASK_SIZE) {
        logger.debug(s"分散任务,${NewsTask.TASK_SIZE}个已全部完成")

        if (_receipt != null) {
          _receipt ! NewsResult(key,_newses)
          _receipt = null
        }
        self ! PoisonPill
      }

    case TaskDelay =>
      if (_receipt != null) {
        _receipt ! NewsResult(key,_newses)
        _receipt = null
      }
  }

metricPreStart方法中设置定时方法调用时间为从代码运行开始到doneDuration时间为止。定时被触发时将向当前Actor发送一个TaskDelay消息。

metricReceive方法中,分别对StartFetchNewsGetNewsItemTaskDelay三个消息进行操作。

在收到StartFetchNews消息时,actor首先保存发送者actor的引用(结果将返回到此actor)。再根据TASK_SIZE生成相应子任务

GetNewsItem消息的处理中,每收到一个消息就将其添加_newses列表中。并判断当_newses个数等于TASK_SIZE时(所有子任务已完成)将结果发送给_receipt

self ! PoisonPill,这句代码停止actor自身。它将把“毒药”发送到NewsTask Actor的接收邮箱队列中。

TaskDelay消息被触发时,将直接返回已完成的新闻_newses。返回数据后并不终止当前还未运行完任务。

SearchPageTask

https://github.com/yangbajing/akka-action/blob/master/src/main/scala/me/yangbajing/akkaaction/scattergather/SearchPageTask.scala

override def metricReceive: Receive = {
    case SearchPage(key) =>
      // XXX 模拟抓取新闻时间
      TimeUtils.sleep(Random.nextInt(20).seconds)

      val item = NewsItem(
        "http://newssite/news/" + self.path.name,"测试新闻" + self.path.name,self.path.name,TimeUtils.Now().toString,"内容简介","新闻正文")

      taskRef ! GetNewsItem(item)
      context.stop(self)
  }

SearchPageTask代码逻辑就比较易懂了,这里使用sleep来模拟实际抓取新闻时的耗时。生成结果后返回数据给`taskRef`,并终止自己。

执行测试

./sbt
akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest

总结

这是一个简单的Akka实例,实现了任务分发与结果聚合。提供了一种在指定时间内返回部份有效数据,同时任务继续执行的方式。这种分散、聚合的模式在实际生产中很常用,比如对多种数据源的整合,或某些需要长时间运行同时对返回数据完整性无强制要求的情况等。

MetricActor演示了怎么自定义Actor,并为其提供一些侦测点的方式。以后有时间会写篇详文介绍。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


迭代器模式(Iterator)迭代器模式(Iterator)[Cursor]意图:提供一种方法顺序访问一个聚合对象中的每个元素,而又不想暴露该对象的内部表示。应用:STL标准库迭代器实现、Java集合类型迭代器等模式结构:心得:迭代器模式的目的是在不获知集合对象内部细节的同时能对集合元素进行遍历操作
高性能IO模型浅析服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种:(1)同步阻塞IO(BlockingIO):即传统的IO模型。(2)同步非阻塞IO(Non-blockingIO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。注意这里所说的N
策略模式(Strategy)策略模式(Strategy)[Policy]意图:定义一系列算法,把他们封装起来,并且使他们可以相互替换,使算法可以独立于使用它的客户而变化。应用:排序的比较方法、封装针对类的不同的算法、消除条件判断、寄存器分配算法等。模式结构:心得:对对象(Context)的处理操作可
访问者模式(Visitor)访问者模式(Visitor)意图:表示一个作用于某对象结构中的各元素的操作,它使你在不改变各元素的类的前提下定义作用于这些元素的新操作。应用:作用于编译器语法树的语义分析算法。模式结构:心得:访问者模式是要解决对对象添加新的操作和功能时候,如何尽可能不修改对象的类的一种方
命令模式(Command)命令模式(Command)[Action/Transaction]意图:将一个请求封装为一个对象,从而可用不同的请求对客户参数化。对请求排队或记录请求日志,以及支持可撤消的操作。应用:用户操作日志、撤销恢复操作。模式结构:心得:命令对象的抽象接口(Command)提供的两个
生成器模式(Builder)生成器模式(Builder)意图:将一个对象的构建和它的表示分离,使得同样的构建过程可以创建不同的表示。 应用:编译器词法分析器指导生成抽象语法树、构造迷宫等。模式结构:心得:和工厂模式不同的是,Builder模式需要详细的指导产品的生产。指导者(Director)使用C
设计模式学习心得《设计模式:可复用面向对象软件的基础》一书以更贴近读者思维的角度描述了GOF的23个设计模式。按照书中介绍的每个设计模式的内容,结合网上搜集的资料,我将对设计模式的学习心得总结出来。网络上关于设计模式的资料和文章汗牛充栋,有些文章对设计模式介绍生动形象。但是我相信“一千个读者,一千个
工厂方法模式(Factory Method)工厂方法模式(Factory Method)[Virtual Constructor]意图:定义一个用于创建对象的接口,让子类决定实例化哪一个类,使一个类的实力化延迟到子类。应用:多文档应用管理不同类型的文档。模式结构:心得:面对同一继承体系(Produc
单例模式(Singleton)单例模式(Singleton)意图:保证一个类只有一个实例,并提供一个访问它的全局访问点。应用:Session或者控件的唯一示例等。模式结构:心得:单例模式应该是设计模式中最简单的结构了,它的目的很简单,就是保证自身的实例只有一份。实现这种目的的方式有很多,在Java中
装饰者模式(Decorator)装饰者模式(Decorator)[Wrapper]意图:动态的给一个对象添加一些额外的职责,就增加功能来说,比生成子类更为灵活。应用:给GUI组件添加功能等。模式结构:心得:装饰器(Decorator)和被装饰的对象(ConcreteComponent)拥有统一的接口