微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 slick 避免 postgres 更新中的竞争条件

如何解决使用 slick 避免 postgres 更新中的竞争条件

case class Item(id: String,count: Int).  

class ItemRepo(db: Database) {
  val query = TableQuery[ItemTable]


def updateAmount(id: String,incCount :Int) = {
   val currentRow = db.run(query.filter(_.id === id).result).head
   val updatedRow =  Item(currentRow.id,currentRow.count + incCount)
   db.run((query returning query).insertOrUpdate(updatedRow))
}

上面的代码一个竞争条件——如果两个线程并行运行,它们可能都读取相同的计数,只有最后一个更新线程会增加它们的 incCount。

我怎样才能避免这种情况?我尝试在执行 .forUpdate 的行中使用 query.filter 但它不会阻止其他线程。我错过了什么吗?

解决方法

当您从数据库中获取数据时,您应该使用 SELECT ... FOR UPDATE,以便您在行上拥有一个排他锁,以防止其他会话在您的事务完成之前更新数据。

在 Slick 中,您可以使用 forUpdate 构造 available since version 3.2.0 来做到这一点。

,

您可以使用一些技巧来改善这种情况。

首先,您要向数据库发送两个独立的查询(两个 db.run 调用)。您可以通过将它们组合成一个动作并将其发送到数据库来改进它。例如:

// Danger: I've not tried to compile this. Please excuse typos.

val lookupAction = query.filter(_.id === id).result


val updateAction = lookupAction.flatMap { matchingRows =>
   val newItem = matchingRows.headOption match {
      case Some(Item(_,count)) => Item(id,count + incCount)
      case None => Item(id,1) // or whatever your default is 
   }
   (query returning query).insertOrUpdate(newItem)
}

// and you db.run(updateAction.transactionally)

这会让您有所收获,具体取决于您的数据库的事务保证。我提到它是因为在 Slick 中组合动作是一个重要的概念。这样,您的 forUpdate(Laurenz Albe 指出)可能会按预期运行。

但是,您可能更喜欢向数据库发送更新。您需要使用 Slick 的普通 SQL 功能来执行此操作:

val action = sqlu"UPDATE items SET count = count + $incCount WHERE id = $id"
// And then you db.run(action)

...并允许您的数据库处理并发性(取决于数据库隔离级别)。

如果你真的想在客户端做这一切,在 JVM 上的 Scala 代码中,有并发概念,比如锁、actor 和 refs。 Slick 本身没有什么可以为您进行 JVM 锁定。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。