结合 subscribe(on:options:) 操作符

如何解决结合 subscribe(on:options:) 操作符

我有一个关于 subscribe(on:options:) 运算符的问题。如果有人能帮我弄清楚,我将不胜感激。

我们从文档中得到了什么:

指定要在其上执行订阅、取消和请求操作的调度程序。 与影响下游消息的receive(on:options:)相反,subscribe(on:options:)改变了上游消息的执行上下文。

此外,我从不同的文章中得到的是,除非我们明确指定 Scheduler 来接收我们的下游消息(使用 receive(on:options:)),否则消息将在所使用的 Scheduler 上发送用于接收订阅

此信息与我在执行过程中实际获得的信息不一致。

我有一个代码

Just("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: dispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

我希望下一个输出

Map: false
Sink: false

但我得到的是:

Map: true
Sink: false

当我使用 Sequence 发布者时也会发生同样的事情。

如果我交换 map 运算符和 subscribe 运算符的位置,我会得到我想要的:

Just("Some text")
    .subscribe(on: dispatchQueue.global())
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出

Map: false
Sink: false

有趣的事实是,当我使用与自定义发布商的第一个列表相同的运算符顺序时,我会收到我想要的行为:

struct TestJust<Output>: Publisher {
    typealias Failure = Never
    
    private let value: Output
    
    init(_ output: Output) {
        self.value = output
    }
    
    func receive<S>(subscriber: S) where S : Subscriber,Failure == S.Failure,Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

TestJust("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: dispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

输出

Map: false
Sink: false

所以我认为要么是我对所有这些机制的完全误解,要么是一些发布者故意选择线程来发布值 (Just,Sequence -> Main,{{1 }} -> URLSession.DataTaskPublisher),这对我来说没有意义,因为在这种情况下我们为什么需要这个 Some of Background

你能帮我理解我错过了什么吗?提前致谢。

解决方法

首先要理解的是消息既上行管道,下行管道。沿着管道(“上游”)上行的消息是:

  • 订阅的实际表现(接收订阅)

  • 订阅者向上游发布者请求新值

  • 取消消息(这些消息从最终订阅者向上渗透)

在管道(“下游”)流动的消息是:

  • 价值观

  • 完成,由失败(错误)或正常完成(报告发布者发出其最后一个值)组成

好的,正如文档明确指出的那样,subscribe(on:) 是关于前者:向上游流动的消息。但是您实际上并没有在测试中跟踪任何那些消息,因此您的结果都没有反映有关它们的任何信息!在订阅点上方插入适当的 handleEvents 运算符,以查看管道向上流动的内容(例如,实现其 receiveRequest: 参数):

Just("Some text")
    .handleEvents(receiveRequest: {
        _ in print("Handle1: \(Thread.isMainThread)")
    })
    .map // etc.

同时,您应该不要假设消息将在哪个线程下游(即值和完成)。你说:

此外,我从不同的文章中得到的是,除非我们明确指定调度程序来接收我们的下游消息(使用 receive(on:options:)),否则消息将在用于接收订阅的调度程序上发送。

但这似乎是一个虚假的假设。您的代码没有任何内容以明确的方式确定下游发送线程。正如您所说的那样,您可以通过receive(on:)来控制这件事,但如果您不这样做,我会说您必须对此事不做任何假设。一些发布者肯定会在后台线程上产生一个值,例如数据任务发布者,这是完全合理的(同样的事情发生在数据任务完成处理程序上)。其他人没有。

可以假设的是,除 receive(on:) 之外的运算符通常不会改变值传递线程。但是,运营商是否以及如何使用订阅线程来确定接收线程,这是您不应该假设的。要控制接收线程,请控制它!调用 receive(on:) 或假设什么都不做。

举个例子,如果你把开场改为

Just("Some text")
    .receive(on: DispatchQueue.main)

然后您的 map 和您的 sink 将报告它们正在主线程上接收值。为什么?因为你控制了接收线程。无论您在任何 subscribe(on:) 命令中说什么,这都有效。它们是完全不同的事情。

也许如果你调用subscribe(on:)但你不调用receive(on:),关于下游发送线程的一些事情是由subscribe(on:)线程决定的,但我肯定不会依赖关于它有任何硬性规定;文档中没有任何内容!相反,不要这样做。如果您实施 subscribe(on:),也请实施 receive(on:),以便负责所发生的事情。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?