我使用免费monad为ETL过程实现了一种简单的语言.当使用List作为数据获取和存储的输入和输出时,一切正常.但是我使用异步库并与Future [List]一起使用
常见的进口和定义
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import cats.free.Free import cats.free.Free._ sealed trait Ops[A] type OpsF[A] = Free[Ops,A]
使用List
case class Fetch(offset: Int,amount: Int) extends Ops[List[Record]] case class Store(recs: List[Record]) extends Ops[List[Response]] def fetch(offset: Int,amount: Int): OpsF[List[Record]] = liftF[Ops,List[Record]](Fetch(offset,amount)) def store(recs: List[Record]): OpsF[List[Response]] = liftF[Ops,List[Response]](Store(recs)) def simpleEtl(offset: Int,amount: Int): Free[Ops,List[Response]] = fetch(offset,amount).flatMap(r => store(r))
不与Future合作[List]
case class Fetch(offset: Int,amount: Int) extends Ops[Future[List[Record]]] case class Store(recs: List[Record]) extends Ops[Future[List[Response]]] def fetch(offset: Int,amount: Int): OpsF[Future[List[Record]]] = liftF[Ops,Future[List[Record]]](Fetch(offset,amount)) def store(recs: List[Record]): OpsF[Future[List[Response]]] = liftF[Ops,Future[List[Response]]](Store(recs)) // explicit types in case I am misunderstanding more than I think def simpleEtl(offset: Int,Future[List[Response]]] = fetch(offset,amount).flatMap { rf: Future[List[Record]] => val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] => store(r) } getResponses }
正如预期的那样,从flatMap / map返回的类型是错误的 – 我没有获得OpsF [Future]而是未来[OpsF]
Error:(34,60) type mismatch; found : scala.concurrent.Future[OpsF[scala.concurrent.Future[List[Response]]]] (which expands to) scala.concurrent.Future[cats.free.Free[Ops,scala.concurrent.Future[List[String]]]] required: OpsF[scala.concurrent.Future[List[Response]]] (which expands to) cats.free.Free[Ops,scala.concurrent.Future[List[String]]] val getResponses: OpsF[Future[List[Response]]] = rf map { r: List[Record] =>
我目前的解决方法是让商店接受Future [List [Record]]并让解释器映射到Future,但感觉很笨拙.
该问题并非特定于列表 – 例如选项也很有用.
我做错了吗?这是否有某种monad变压器?
解决方法
抽象数据类型Ops定义了一个代数来获取和存储多个记录.它描述了两个操作,但这也是代数应该做的唯一事情.如何实际执行操作,对Fetch和Store来说根本不重要,你期望的唯一有用的东西分别是List [Record]和List [Response].
通过制作Fetch和Store a Future [List [Record]]]的预期结果类型,可以限制如何解释此代数的可能性.也许在您的测试中,您不希望异步连接到Web服务或数据库,只想使用Map [Int,Result]或Vector [Result]进行测试,但现在您需要返回一个Future,测试比它们更复杂.
但是说你不需要ETL [Future [List [Record]]]并没有解决你的问题:你正在使用异步库而你可能想要返回一些Future.
从第一次实施开始:
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import cats.implicits._ import cats.free.Free type Record = String type Response = String sealed trait EtlOp[T] case class Fetch(offset: Int,amount: Int) extends EtlOp[List[Record]] case class Store(recs: List[Record]) extends EtlOp[List[Response]] type ETL[A] = Free[EtlOp,A] def fetch(offset: Int,amount: Int): ETL[List[Record]] = Free.liftF(Fetch(offset,amount)) def store(recs: List[Record]): ETL[List[Response]] = Free.liftF(Store(recs)) def fetchStore(offset: Int,amount: Int): ETL[List[Response]] = fetch(offset,amount).flatMap(store)
但现在我们还没有期货?这是我们翻译的工作:
import cats.~> val interpretFutureDumb: EtlOp ~> Future = new (EtlOp ~> Future) { def apply[A](op: EtlOp[A]): Future[A] = op match { case Store(records) => Future.successful(records.map(rec => s"Resp($rec)")) // store in DB,send to webservice,... case Fetch(offset,amount) => Future.successful(List.fill(amount)(offset.toString)) // get from DB,from webservice,... } }
有了这个解释器(当然你会用更有用的东西替换Future.successful(…))我们可以得到我们的Future [List [Response]]:
val responses: Future[List[Response]] = fetchStore(1,5).foldMap(interpretFutureDumb) val records: Future[List[Record]] = fetch(2,4).foldMap(interpretFutureDumb) responses.foreach(println) // List(Resp(1),Resp(1),Resp(1)) records.foreach(println) // List(2,2,2)
但是我们仍然可以创建一个不会返回Future的不同解释器:
import scala.collection.mutable.ListBuffer import cats.Id val interpretSync: EtlOp ~> Id = new (EtlOp ~> Id) { val records: ListBuffer[Record] = ListBuffer() def apply[A](op: EtlOp[A]): Id[A] = op match { case Store(recs) => records ++= recs records.toList case Fetch(offset,amount) => records.drop(offset).take(amount).toList } } val etlResponse: ETL[List[Response]] = for { _ <- store(List("a","b","c","d")) records <- fetch(1,2) resp <- store(records) } yield resp val responses2: List[Response] = etlResponse.foldMap(interpretSync) // List(a,b,c,d,c)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。