微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Mongodb 查找和监视

如何解决Mongodb 查找和监视

我正在尝试通过使用查找和监视组合来实现一个使自身保持最新状态的结构。例如。对于用户集合,我们找到所有用户并将它们存储在哈希图中。之后,我们打开观察流并根据传入事件更新我们的哈希图。但是有一个问题——如果在读取用户和打开流之间发生变化,我们可能最终会不同步。

我的想法是使用 startAtOperationTime 选项,但我们无法可靠地知道我们需要的时间戳。我也没有看到通过交易来做到这一点的方法

问题是 - 我们如何在读取操作后完全打开更改流并且不丢失任何数据。

解决方法

好问题

恕我直言,如果不知道这些 deltas 应该适用于什么初始状态,一个带有 delta 变化的手表是非常无用的,所以当然你也需要做一个查找。 AFAIK 没有用于 mongo 的原子“查找和监视”命令。

即使从问题中删除增量并且仅使用 fullDocument=updateLookup 选项,仍然存在同步问题。考虑这种方法:

  1. collection.watch()
  2. changeStream.pause()
  3. await collection.find().toArray()
  4. changeStream.resume()

我们需要先设置手表,以便在我们发现时捕捉到发生的任何事情。乍一看,这看起来不错。如果在进行查找时发生了某些事情,并且流中有一些更改事件,则这些将在您恢复时得到处理。由于您在查找之前启动了监视,更糟糕的情况是更改流包含在查找之前发生的一些更改,并可能导致保存旧状态。这通常无关紧要,因为更改流也将包含最新的更新并最终保存正确的状态。

但是在找到之前,watch 真的在 mongo 集群上运行了吗?

find 查询是否甚至转到集群中正确同步的节点?

由于统一拓扑和连接池的概念,我认为我们不可能知道这些问题的答案。我们不知道上面几行命令最终会使用哪个连接。如果 mongo 离线,客户端将缓冲像 watch 和 find 这样的命令,然后在连接对池可用时释放它们。但是由于有多个连接在运行(可能在错误的时间阻塞),不能保证第 1 行中的 watch 命令会在第 3 行中的 find 命令之前到达 mongo。所以我很确定这个答案将在 99% 的情况下工作,但并非 100% 可靠。

您可以尝试通过检查 resumeTokenChanged 事件来确认 changeStream 已连接,然后进行查找,但如果您的集群出现某些节点 oplog 需要一段时间同步的问题,您仍然可以“查找()”旧数据并观看为时已晚。这意味着错过了更新。

更新

我认为我们希望的最佳解决方案是结合上述所有内容,并根据您所需的集群同步宽限期添加延迟。

  1. collection.watch()
  2. 通过检查 resumeTokenChanged 事件等待 changeStream 更新。这也意味着我们是相互联系的。
  3. 如果我们没有及时收到 resumeTokenChanged,请设置超时以考虑 changeStream 陈旧。这意味着我们的数据也可能过时。
  4. 等待集群节点同步的宽限期
  5. changeStream.pause()
  6. 如果我们仍未收到来自 changeStream 的任何 change 事件,则 await collection.find().toArray()
  7. changeStream.resume()
  8. 如果 changeStream 发出 error 事件,则重新开始整个过程​​。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。