微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 async rust 中处理重复插入到数据库中

如何解决在 async rust 中处理重复插入到数据库中

这里是 Rust 和异步编程的初学者。

我有一个函数可以下载并在数据库中存储一堆推文:

pub async fn process_user_timeline(config: &Settings,pool: &PgPool,user_object: &Value) {
    // get timeline
    if let Ok((user_timeline,_)) =
        get_user_timeline(config,user_object["id"].as_str().unwrap()).await
    {
        // store tweets
        if let Some(tweets) = user_timeline["data"].as_array() {
            for tweet in tweets.iter() {
                store_tweet(pool,&tweet,&user_timeline,"normal")
                    .await
                    .unwrap_or_else(|e| {
                        println!(
                            ">>>X>>> Failed to store tweet {}: {:?}",tweet["id"].as_str().unwrap(),e
                        )
                    });
            }
        }
    }
}

它被另一个函数在异步循环中调用

pub async fn loop_until_hit_rate_limit<'a,T,Fut>(
    object_arr: &'a [T],settings: &'a Settings,pool: &'a PgPool,f: impl Fn(&'a Settings,&'a PgPool,&'a T) -> Fut + copy,rate_limit: usize,) where
    Fut: Future,{
    let total = object_arr.len();
    let capped_total = min(total,rate_limit);

    let mut futs = vec![];
    for (i,object) in object_arr[..capped_total].iter().enumerate() {
        futs.push(async move {
            println!(">>> PROCESSING {}/{}",i + 1,total);
            f(settings,pool,object).await;
        });
    }
    futures::future::join_all(futs).await;
}

有时两个异步任务会尝试同时插入相同的推文,从而产生此错误

Failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error,code: "23505",message: "duplicate key value violates unique constraint \"tweets_tweet_id_key\"",detail: Some("Key (tweet_id)=(1398307091442409475) already exists."),hint: None,position: None,where: None,schema: Some("public"),table: Some("tweets"),column: None,data_type: None,constraint: Some("tweets_tweet_id_key"),file: Some("nbtinsert.c"),line: Some(656),routine: Some("_bt_check_unique") })

请注意,代码在插入推文之前已经检查是否存在推文,因此这只发生在以下情况:从任务 1 读取 > 从任务 2 读取 > 从任务 1 写入(成功)> 从任务 2 写入(错误)。

为了解决这个问题,我迄今为止最好的尝试是放置一个 unwrap_or_else() 子句,它可以让其中一个任务失败而不会在整个执行过程中出现恐慌。我知道至少有一个缺点 - 有时两项任务都会完成,推文永远不会被写出来。它发生在

我不知道我的方法还有其他缺点吗?

处理这个问题的正确方法是什么?我讨厌丢失数据,更糟糕的是不确定性地丢失数据。

PS 我使用 actix websqlx 作为我的网络服务器/数据库库。

解决方法

通常对于可能由多个线程/进程编写的任何内容,任何逻辑,如

if (!exists) {
  writeValue()
}

要么需要某种锁的保护,要么需要更改代码以进行原子写入,但写入可能会失败,因为其他内容已经写入。

对于 Rust 中的内存数据,您可以使用 Mutex 来确保您可以读取数据,然后在其他任何读取数据之前将其写回,或者使用 Atomic 以这样的方式修改数据方式,如果已经写了它,你可以检测到。

在数据库中,对于可能与同时发生的其他查询发生冲突的任何查询,您需要在查询中使用 ON CONFLICT 子句,以便数据库本身知道在尝试时要做什么写入数据,它已经存在。

对于你的情况,因为我猜推文是不可变的,你可能想要做 ON CONFLICT tweet_id DO NOTHING(或任何你的 ID 列),在这种情况下,INSERT 将跳过插入,如果已经有一条带有您插入的 ID 的推文,它不会抛出错误。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?