微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何通过Akka使用非阻塞代码连续调用REST服务

如何解决如何通过Akka使用非阻塞代码连续调用REST服务

我正在从REST端点访问数据:

"https://api-public.sandBox.pro.coinbase.com/products/BTC-EUR/ticker"

要每秒访问一次数据,我使用无限循环while(true) {每秒调用一次发送给actor的消息,该消息开始调用REST请求的过程:

用于访问数据的actor是:

object ProductTickerRestActor {

  case class StringData(data: String)

}

class ProductTickerRestActor extends Actor {
  
  override def receive: PartialFunction[Any,Unit] = {

    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.source.fromURL("https://api-public.sandBox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
        println("rData : "+rData)

      }
      catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }

    case msg => println(s"I cannot understand ${msg.toString}")
  }
}

我使用以下方法启动应用程序:

object ExchangeModelDataApplication {

  def main(args: Array[String]): Unit = {

    val actorSystem = ActorSystemConfig.getActorSystem

    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor],"ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    while(true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }

}

节流阀:

object Throttlers {


  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000,OverflowStrategy.dropNew)
    .throttle(1,1.second)
    .to(Sink.actorRef(priceDataActor,NotUsed))
    .run()
}

如何异步运行以下代码,而不是使用无限循环进行阻止? :

throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000) 

在这种情况下,调节器也可能是多余的,因为无论如何我都在循环中调节请求。

解决方法

我只会将Akka Streams与Akka HTTP一起使用。使用Akka 2.6.x,按照这些原则,足以满足每秒1个请求

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._

import scala.concurrent.duration._

object HTTPRepeatedly {
  implicit val system = ActorSystem()
  import system.dispatcher

  val sourceFromHttp: Source[String,NotUsed] =
    Source.repeated("test") // Not sure what "test" is actually used for here...
      .throttle(1,1.second)
      .map { str =>
        HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
      }.mapAsync(1) { req =>
        Http().singleRequest(req)
      }.mapAsync(1)(_.entity.toStrict(1.minute))
      .map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}

那么您可以(例如,为简单起见,将其放在main中的HTTPRepeatedly中,以便隐式对象在范围内,等等)

val done: Future[Done] =
  sourceFromHttp
    .take(10) // stop after 10 requests
    .runWith(Sink.foreach { rData => println(s"rData: $rData") })

scala.concurrent.Await.result(done,11.minute)

system.terminate()
,

每秒发送一个请求不是一个好主意。如果由于某种原因该请求被延迟,您将堆积很多请求。相反,请在前一个请求完成后一秒钟发送下一个请求。

由于此代码使用了同步的GET请求,因此您可以在mkString返回后一秒钟发送下一个请求。

但是使用同步请求并不是在Akka中使用RESTful API的好方法。它会阻塞参与者receive方法,直到请求完成为止,最终可能会阻塞整个ActorSystem

相反,请使用Akka Http和singleRequest来执行异步请求。

Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))

这将返回Future。在请求完成后一秒钟发出新请求(例如,在onComplete上使用Future)。

fromUrl

相比,这不仅更安全,更异步,而且还提供了更多的REST API调用控制权

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。