我在
Scala中使用
Akka Streams使用
AWS Java SDK从
AWS SQS队列中进行轮询.我创建了一个
ActorPublisher,它以两秒的间隔使消息出列:
class SQSSubscriber(name: String) extends ActorPublisher[Message] { implicit val materializer = ActorMaterializer() val schedule = context.system.scheduler.schedule(0 seconds,2 seconds,self,"dequeue") val client = new AmazonSQSClient() client.setRegion(RegionUtils.getRegion("us-east-1")) val url = client.getQueueUrl(name).getQueueUrl val MaxBufferSize = 100 var buf = Vector.empty[Message] override def receive: Receive = { case "dequeue" => val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList messages.foreach(self ! _) case message: Message if buf.size == MaxBufferSize => log.error("The buffer is full") case message: Message => if (buf.isEmpty && totalDemand > 0) onNext(message) else { buf :+= message deliverBuf() } case Request(_) => deliverBuf() case Cancel => context.stop(self) } @tailrec final def deliverBuf(): Unit = if (totalDemand > 0) { if (totalDemand <= Int.MaxValue) { val (use,keep) = buf.splitAt(totalDemand.toInt) buf = keep use foreach onNext } else { val (use,keep) = buf.splitAt(Int.MaxValue) buf = keep use foreach onNext deliverBuf() } } }
在我的应用程序中,我也试图以2秒的间隔运行流程:
val system = ActorSystem("system") val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name")) val flow = Flow[Message] .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem } .to(Sink.ignore) system.scheduler.schedule(0 seconds,2 seconds) { flow.runWith(sqsSource)(ActorMaterializer()(system)) }
但是,当我运行我的应用程序时,我收到java.util.concurrent.TimeoutException:Futures在[20000毫秒]之后超时,以及由ActorMaterializer引起的后续死信通知.
是否有推荐的方法来持续实现Akka Stream?
解决方法
我认为你不需要每2秒创建一个新的ActorPublisher.这似乎是多余的,浪费了内存.另外,我认为不需要ActorPublisher.从我所知的代码来看,你的实现将有越来越多的Streams查询相同的数据.来自客户端的每条消息将由N个不同的akka Streams处理,更糟糕的是,N将随着时间的推移而增长.
无限循环查询的迭代器
您可以使用scala的Iterator从ActorPublisher获得相同的行为.可以创建一个不断查询客户端的Iterator:
//setup the client val client = { val sqsClient = new AmazonSQSClient() sqsClient setRegion (RegionUtils getRegion "us-east-1") sqsClient } val url = client.getQueueUrl(name).getQueueUrl //single query def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable { client receiveMessage (new ReceiveMessageRequest(url).getMessages) } def messageListIteartor : Iterator[Iterable[Message]] = Iterator continually messageListStream //messages one-at-a-time "on demand",no timer pushing you around def messageIterator() : Iterator[Message] = messageListIterator flatMap identity
此实现仅在所有先前消息已被使用时查询客户端,因此确实是reactive.无需跟踪具有固定大小的缓冲区.您的解决方案需要一个缓冲区,因为消息(通过计时器)的创建与消息消息(通过println)分离.在我的实施,创作和通过背压消耗量为tightly coupled.
Akka Stream Source
然后,您可以使用此Iterator生成器函数来提供akka流源:
def messageSource : Source[Message,_] = Source fromIterator messageIterator
流动形成
最后,您可以使用此Source来执行println(作为旁注:您的流量值实际上是一个接收器,因为Flow Sink = Sink).使用问题中的流量值:
messageSource runWith flow
一个akka Stream处理所有消息.
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。