参考更新和使用 Cats 效应的光纤触发器

如何解决参考更新和使用 Cats 效应的光纤触发器

问题: 我正在尝试解决一个问题,我需要每 x 分钟安排一次,我需要更新缓存并且可以并发获取。>

尝试过的解决方案:

  1. 将 TrieMap 和 ScheduledThreadPool Executor 与 Cats Effects 结合使用:

我实际上是从使用 TrieMap 开始的,因为它提供了线程安全性并使用了调度线程池来调度更新

import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode,IO,IOApp}

import java.util.concurrent.{Executors,ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt,FiniteDuration}
import scala.util.Random

object ExploreTrieMap extends IOApp {
  def callForEvery[A](f: => Unit,d: FiniteDuration)
                     (implicit sc: ScheduledExecutorService): IO[Unit] = {
    IO.cancelable {
      cb =>
        val r = new Runnable {
          override def run(): Unit = cb(Right(f))
        }
        val scFut = sc.scheduleAtFixedRate(r,d.length,d.unit)
        IO(scFut.cancel(false)).void
    }
  }

  val map = TrieMap.empty[String,String]
  override def run(args: List[String]): IO[ExitCode] = {
    implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
    for {
      _ <- callForEvery(println(map.get("token")),1 second)
      _ <- callForEvery(println(map.put("token",Random.nextString(10))),3 second)
    } yield ExitCode.Success
  }
}

  1. 使用 Ref 和 Cats Effect 纤维:

然后创造了一个纯粹的猫效应解决方案。

下面的代码会导致 StackOverflow 错误吗?

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift,ExitCode,Fiber,IOApp}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt,FiniteDuration}
import scala.util.Random

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO,String]("")
      s <- scheduleAndPopulate(ref,1 minute)
      r <- keepPollingUsingFiber(ref)
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO,String],duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO,Unit]] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      rS <- scheduleAndPopulate(r,duration)(cs)
      _ <- rS.join
    } yield ()).start(cs)
  }


  def keepPollingUsingFiber(r: Ref[IO,String])(implicit cs: ContextShift[IO]): IO[Fiber[IO,Unit]] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingFiber(r)(cs)
      _ <- w.join
    } yield ()).start(cs)
  }
}

我正在尝试更新一个 Ref,并将该 Ref 用作正在由另一个光纤更新的并发缓存。我正在使用递归触发光纤创建。我知道纤维可用于堆栈安全操作。在这种情况下,我将加入创建的旧光纤。所以想了解以下代码是否安全。

更新(下面提供的答案的解决方案)

第三种解决方案:基于答案之一的输入。与其为每个递归调用分叉,不如将其分叉给调用方。

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift,FiniteDuration}
import scala.util.Random

object ExploreCatFiberWithIO extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO,String]("")
      s <- scheduleAndPopulateWithIO(ref,1 second).start
      r <- keepPollingUsingIO(ref).start
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulateWithIO(r: Ref[IO,duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
    for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulateWithIO(r,duration)(cs)
    } yield ()
  }

  def keepPollingUsingIO(r: Ref[IO,String])(implicit cs: ContextShift[IO]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingIO(r)(cs)
    } yield ())
  }
}

很想知道上述方法的优缺点。

解决方法

对于第二种方法,您可以通过不在 FiberscheduleAndPopulate 中分叉 keepPollingUsingFiber 来使其更简单。相反,保留递归调用,并将它们分叉到调用者中。 IO 是堆栈安全的,因此递归调用不会炸毁堆栈。

您可以使用 start 对每个进行分叉,但 parTupled 可能更简单。它是 parMapN 的一种变体,用于分叉每个效果并收集它们的结果。

(此外,在您的代码中,您不需要显式传递隐式值,例如 cs,编译器会为您推断它们。)

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO,String]("")
      _ <- (scheduleAndPopulate(ref,1 minute),keepPollingUsingFiber(ref)).parTupled
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO,String],duration: FiniteDuration): IO[Unit] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulate(r,duration)
    } yield ()
  }

  def keepPollingUsingFiber(r: Ref[IO,String]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      _ <- keepPollingUsingFiber(r)
    } yield ()
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res