如何解决如何从actix SyncContext向其他角色发送消息?
我想实施一个长期运行的后台任务,可以将进度报告给其他Actor
。我已经做到了。
但我也希望能够再次取消长期运行的后台任务。
到目前为止,我是这样的:
use actix::prelude::*;
struct Worker {}
impl Actor for Worker {
type Context = SyncContext<Self>;
}
struct Manager {
worker: Addr<Worker>,}
impl Actor for Manager {
type Context = Context<Self>;
}
impl Supervised for Manager {}
impl SystemService for Manager {
fn service_started(&mut self,_ctx: &mut Context<Self>) {}
}
struct Work {}
#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work);
#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);
impl Handler<PerformWork> for Worker {
type Result = ();
fn handle(&mut self,msg: PerformWork,ctx: &mut Self::Context) -> Self::Result {
for i in 0..10000000 {
// Report progress
Manager::from_registry().do_send(ReportProgress(i));
// Do some very slow I/O.
thread::sleep(time::Duration::from_millis(1));
}
}
}
impl Handler<ReportProgress> for Manager {
type Result = ();
fn handle(&mut self,msg: ReportProgress,ctx: &mut Self::Context) -> Self::Result {
// Do something with the progress here
}
}
Manager
还处理将Message
PerformWork
发送到Message
的{{1}}。
我想到给Worker
ReportProgress
一个布尔值返回类型,该类型允许Message
决定是否应该退出循环。但是,我无法管理将具有返回值/结果的Worker
发送到Message
。
使用Manager
代替send()
会返回do_send()
,我无法在Future
内解析。
非常感谢任何想法。
更多背景信息:
- 真正慢的I / O是串行通信。
- actix的版本为0.10
解决方法
我找到了一种解决方案,但我不认为这是一个好方法。
我添加了一个Arc<AtomicBool>>
并传递给Worker
。 Manager
保留对布尔值的引用并可以对其进行修改。如果Worker
修改了布尔值,则Manager
会跳出循环。
use actix::prelude::*;
use std::sync::atomic::{AtomicBool,Ordering};
struct Worker {}
impl Actor for Worker {
type Context = SyncContext<Self>;
}
struct Manager {
worker: Addr<Worker>,}
impl Actor for Manager {
type Context = Context<Self>;
}
impl Supervised for Manager {}
impl SystemService for Manager {
fn service_started(&mut self,_ctx: &mut Context<Self>) {}
}
struct Work {}
#[derive(Message)]
#[rtype(result = "()")]
struct PerformWork(Work,Arc<AtomicBool>>);
#[derive(Message)]
#[rtype(result = "()")]
pub struct ReportProgress(i32);
impl Handler<PerformWork> for Worker {
type Result = ();
fn handle(&mut self,msg: PerformWork,ctx: &mut Self::Context) -> Self::Result {
for i in 0..10000000 {
// Report progress
Manager::from_registry().do_send(ReportProgress(i));
if msg.1.load(Ordering::Relaxed) {
break;
}
// Do some very slow I/O.
thread::sleep(time::Duration::from_millis(1));
}
}
}
impl Handler<ReportProgress> for Manager {
type Result = ();
fn handle(&mut self,msg: ReportProgress,ctx: &mut Self::Context) -> Self::Result {
// Do something with the progress here
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。