如何解决IObservable ObserveOn 正在锁定线程,这是可以预防的吗?
我正在设计一个服务器,它将客户端请求转移到一个专用于处理数据的线程。我这样做是为了防止正在处理的数据出现任何竞争条件或并发问题。因为服务器被设计为反应式的,所以每当服务器收到请求时,我都会使用 Observables 将请求通知给程序的其余部分。现在因为服务器套接字正在侦听和发射来自多个线程的信号,我想确保无论服务器在哪个线程发射,都将始终观察专用数据处理线程。我选择使用 ObserveOn
方法,这立即适得其反。我立即注意到,在一次可观察到的射击时,其他人都没有射击。
不仅如此,发送到专用线程的其他操作也没有触发。
本质上,可观察对象似乎在为自己“声明”线程。该线程完全被 observable 阻塞,除了该 observable 的排放之外,不能用于任何其他事情。我不希望这种情况发生,因为该线程专用于所有数据处理操作,这阻止了我将该线程用于任何其他可观察对象或未来的数据处理任务。那么,我有什么选择可以防止 observable 将线程锁定到自身,或者强制将 observable 观察到我的专用线程而不阻塞其他 observable。
此示例代码演示了该问题。在这里,我们使用单线程任务调度程序,并注意到它运行得很好,直到第一个主题(已设置为 ObserveOn
调度程序)发出它的字符串。发生这种情况后,不会再触发任何主题或动作。第一个主题有效地为自己锁定了线程。
public static class Program
{
static void Main(string[] args)
{
//Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
var _t = new Tester();
string _string = "Hello World";
//These three will print their string to the console
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
//Only subject 1 will emit and print it's string,the other two fail
_t.PrintThroughSubject1(_string);//Succeeds
_t.PrintThroughSubject2(_string);//Fails
_t.PrintThroughSubject3(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
//We essentially can't do anything with the thread after subject 1 observed on it
Console.ReadLine();
}
public class Tester
{
TaskFactory tf;
TaskPoolScheduler pool;
int _actionCount = 0;
Subject<string> s1 = new Subject<string>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
public Tester()
{
//We're create a task pool that uses a single threaded concurrent task scheduler
var _scheduler = new ConcurrentExclusiveSchedulerPair();
tf = new TaskFactory(_scheduler.ExclusiveScheduler);
pool = new TaskPoolScheduler(tf);
//And then we set the subjects to each be observed on the single threaded scheduler
s1.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s2.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s3.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
$"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
}
public void PrintThroughSubject1(string _string)
{
s1.OnNext(_string);
}
public void PrintThroughSubject2(string _string)
{
s2.OnNext(_string);
}
public void PrintThroughSubject3(string _string)
{
s3.OnNext(_string);
}
public void PrintDirectlyWithAction(string _string)
{
//This is here to demonstrate that the single threaded task scheduler accepts actions just fine
//and can handle them in sequence
tf.StartNew(() =>
{
Console.WriteLine(
$"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
});
}
}
}
TL;DR:我需要能够强制在特定线程上观察多个可观察的发射,但 RxNet 似乎只允许在一个线程上观察单个主题,而没有其他任何东西可以。我如何规避这一点以在同一线程上观察多个 observable?
解决方法
我可能把它复杂化了。 EventLoopScheduler
可能正是您所需要的。
试试这个:
public static class Program
{
static void Main(string[] args)
{
//Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
var _t = new Tester();
string _string = "Hello World";
//These three will print their string to the console
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
_t.PrintDirectlyWithAction(_string);//Succeeds
//Only subject 1 will emit and print it's string,the other two fail
_t.PrintThroughSubject1(_string);//Succeeds
_t.PrintThroughSubject2(_string);//Fails
_t.PrintThroughSubject3(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
_t.PrintDirectlyWithAction(_string);//Fails
//We essentially can't do anything with the thread after subject 1 observed on it
Console.ReadLine();
}
public class Tester
{
private EventLoopScheduler els = new EventLoopScheduler();
int _actionCount = 0;
Subject<string> s1 = new Subject<string>();
Subject<string> s2 = new Subject<string>();
Subject<string> s3 = new Subject<string>();
public Tester()
{
//We're create a task pool that uses a single threaded concurrent task scheduler
//And then we set the subjects to each be observed on the single threaded scheduler
s1.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s2.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
s3.ObserveOn(els).Subscribe(_s => Console.WriteLine(
$"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
}
public void PrintThroughSubject1(string _string)
{
s1.OnNext(_string);
}
public void PrintThroughSubject2(string _string)
{
s2.OnNext(_string);
}
public void PrintThroughSubject3(string _string)
{
s3.OnNext(_string);
}
public void PrintDirectlyWithAction(string _string)
{
//This is here to demonstrate that the single threaded task scheduler accepts actions just fine
//and can handle them in sequence
els.Schedule(() =>
{
Console.WriteLine(
$"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
});
}
}
}
我得到了这个结果:
Direct action (0) says "Hello World" - on thread 17
Direct action (1) says "Hello World" - on thread 17
Direct action (2) says "Hello World" - on thread 17
Subject (1) says "Hello World" - on thread 17
Subject (2) says "Hello World" - on thread 17
Subject (3) says "Hello World" - on thread 17
Direct action (3) says "Hello World" - on thread 17
Direct action (4) says "Hello World" - on thread 17
Direct action (5) says "Hello World" - on thread 17
完成后不要忘记.Dispose()
你的EventLoopScheduler
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。