文盘Rust -- Mutex解决并发写文件乱序问题

rust,mutex,解决,并发,文件,乱序,问题 · 浏览次数 : 39

小编点评

**解决并发写文件问题的方法:** 1. **使用同步锁:**在写操作之前获取文件互斥锁,确保只有一个线程可以写文件。 2. **使用 `Arc` 或 `Mutex`:**创建一个共享变量,并在写操作之前将其锁定,确保多个线程可以访问该变量。 3. **使用 `tokio::sync::Mutex`:**创建一个单独的互斥锁,并在每个写操作之前获取该锁。 4. **使用异步操作:**将写操作封装为异步操作,并使用 `tokio::spawn` 创建多个线程执行该操作。 5. **使用 `Semaphore`:**创建一个信号量,并在写操作之前将其加到信号量中。 6. **使用 `RwLock`:**使用 `RwLock` 类来访问文件,它可以确保多个线程可以读取或写入文件,并在写操作之前将其加到信号量中。 **示例代码:** ```rust use std::fs::{self, File, OpenOptions}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; fn main() { println!(\"parallel write file!\"); let max_tasks = 200; let file_ref = OpenOptions::new() .create(true) .write(true) .append(true) .open("/tmp/parallel") .unwrap(); let f = Arc::new(Mutex::new(file_ref)); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { loop { while f.lock().await.len() >= max_tasks { f.lock().await.join_next().await; } let mut file = Arc::clone(&f.lock().await); for i in 0..1000 { let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); let content = now.as_secs().to_string(); file.write_all(content.as_bytes()).unwrap(); file.write_all("\n".as_bytes()).unwrap(); } } }); } ``` **注意:** * 在使用任何锁或互斥锁之前,请确保文件已打开。 * 使用 `Arc` 或 `Mutex` 时,请注意线程安全问题。 * 使用 `tokio::sync::Mutex` 或 `RwLock` 时,请确保所有线程都使用相同的锁。

正文

在实际开发过程中,我们可能会遇到并发写文件的场景,如果处理不当很可能出现文件内容乱序问题。下面我们通过一个示例程序描述这一过程并给出解决该问题的方法。

use std::{
    fs::{self, File, OpenOptions},
    io::{Write},
    sync::Arc,
    time::{SystemTime, UNIX_EPOCH},
};
use tokio::task::JoinSet;

fn main() {
    println!("parallel write file!");
    let max_tasks = 200;
    let _ = fs::remove_file("/tmp/parallel");
    let file_ref = OpenOptions::new()
        .create(true)
        .write(true)
        .append(true)
        .open("/tmp/parallel")
        .unwrap();

    let mut set: JoinSet<()> = JoinSet::new();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        loop {
            while set.len() >= max_tasks {
                set.join_next().await;
            }
            未做写互斥函数
            let mut file_ref = OpenOptions::new()
                .create(true)
                .write(true)
                .append(true)
                .open("/tmp/parallel")
                .unwrap();
            set.spawn(async move { write_line(&mut file_ref) });
        }
    });
}

fn write_line(file: &mut File) {
    for i in 0..1000 {
        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let mut content = now.as_secs().to_string();
        content.push_str("_");
        content.push_str(&i.to_string());

        file.write_all(content.as_bytes()).unwrap();
        file.write_all("\n".as_bytes()).unwrap();
        file.write_all("\n".as_bytes()).unwrap();
    }
}


代码不复杂,tokio 实现一个并发runtime,写文件函数是直接写时间戳,为了方便展示乱序所以写入两次换行。

输出的文本大概长这样

1691287258_979





1691287258_7931691287258_301

1691287258_7431691287258_603

1691287258_8941691287258_47






1691287258_895
1691287258_553

1691287258_950
1691287258_980


1691287258_48
1691287258_302

1691287258_896
1691287258_744




1691287258_6041691287258_554


很明显,写入并未达到预期,间隔并不平均,函数内部的执行步骤是乱序的。

我们把上面的程序改造一下

use std::{
    fs::{self, File, OpenOptions},
    io::Write,
    sync::Arc,
    time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::Mutex;
use tokio::task::JoinSet;

fn main() {
    println!("parallel write file!");
    let max_tasks = 200;
    let _ = fs::remove_file("/tmp/parallel");
    let file_ref = OpenOptions::new()
        .create(true)
        .write(true)
        .append(true)
        .open("/tmp/parallel")
        .unwrap();

    let f = Arc::new(Mutex::new(file_ref));

    let mut set: JoinSet<()> = JoinSet::new();
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        loop {
            while set.len() >= max_tasks {
                set.join_next().await;
            }

            let mut file = Arc::clone(&f);
            set.spawn(async move { write_line_mutex(&mut file).await });
        }
    });
}

async fn write_line_mutex(mutex_file: &Arc<Mutex<File>>) {
    for i in 0..1000 {
        let mut f = mutex_file.lock().await;
        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let mut content = now.as_secs().to_string();
        content.push_str("_");
        content.push_str(&i.to_string());

        f.write_all(content.as_bytes()).unwrap();
        f.write_all("\n".as_bytes()).unwrap();
        f.write_all("\n".as_bytes()).unwrap();
    }
}


这次我们用到了tokio::sync::Mutex,write_line_mutex函数在每次执行写任务以前先获取文件互斥锁。

看看这次的文件内容

1691288040_374

1691288040_374

1691288040_374

1691288040_375

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_374

1691288040_375

1691288040_375

1691288040_374

1691288040_375

1691288040_375

1691288040_375

1691288040_375

1691288040_375

1691288040_375

1691288040_375

1691288040_375

1691288040_375

1691288040_375


写入的格式正确,保证每次函数写函数完整执行。

关于文件写互斥这点事儿,今儿就聊到这。

完整源码

作者:京东科技 贾世闻

来源:京东云开发者社区

与文盘Rust -- Mutex解决并发写文件乱序问题相似的内容:

文盘Rust -- Mutex解决并发写文件乱序问题

在实际开发过程中,我们可能会遇到并发写文件的场景,如果处理不当很可能出现文件内容乱序问题。下面我们通过一个示例程序描述这一过程并给出解决该问题的方法。

文盘Rust -- 把程序作为守护进程启动

当我们写完一个服务端程序,需要上线部署的时候,或多或少都会和操作系统的守护进程打交道,毕竟谁也不希望shell关闭既停服。今天我们就来聊聊这个事儿。 最早大家部署应用的通常操作是 “nohup xxxx &”,别说像weblogic 或者其他java 容器有启动脚本,里面其实也差不多;很喜欢 ngi

文盘Rust -- r2d2 实现redis连接池

作者:贾世闻 我们在开发应用后端系统的时候经常要和各种数据库、缓存等资源打交道。这一期,我们聊聊如何访问redis 并将资源池化。 在一个应用后端程序访问redis主要要做的工作有两个,单例和池化。 在后端应用集成redis,我们主要用到以下几个crate:​ ​once_cell​​​、​ ​re

文盘Rust -- rust 连接云上数仓 starwift

最近想看看 rust 如何集成 clickhouse,又犯了好吃懒做的心理(不想自己建环境),刚好京东云发布了兼容ck 的云原生数仓 Starwfit,于是搞了个实例折腾一番。 Starwfit 是京东云自主研发的新一代云原生数据仓库,通过存算分离降低了存储成本,同时兼具性能和扩展弹性。其写入和查询速度可达到传统数据仓库的数倍,为用户提供实时数据分析能力。广泛应用于流量分析、精准营销、用户画像、广

文盘Rust -- 领域交互模式如何实现

书接上文,上回说到如何通过interactcli-rs四步实现一个命令行程序。但是 shell 交互模式在有些场景下用户体验并不是很好。比如我们要连接某个服务,比如 mysql 或者 redis 这样的服务。如果每次交互都需要输入地址、端口、用户名等信息,交互起来太麻烦。通常的做法是一次性输入和连接相关的信息或者由统一配置文件进行管理,然后进入领域交互模式,所有的命令和反馈都和该领域相关。inte

文盘Rust -- 本地库引发的依赖冲突

clickhouse 的原生 rust 客户端目前比较好的有两个clickhouse-rs 和 clickhouse.rs 。两个库在单独使用时没有任何问题,但是,在同一工程同时引用时会报错。本篇内容主要讲解如何用rust语言解决本地库引发的依赖冲突问题

文盘Rust -- 安全连接 TiDB/Mysql

最近在折腾rust与数据库集成,选了Tidb Cloud Serverless Tier 作为数据源。Tidb 无疑是近五年来最优秀的国产开源分布式数据库,Tidb Cloud Serverless Tier作为pingcap旗下的云产品方便又经济,这次使用还有一些小惊喜。

文盘Rust -- FFI 浅尝

rust FFI 是rust与其他语言互调的桥梁,通过FFI rust 可以有效继承 C 语言的历史资产。本期通过几个例子来聊聊rust与C 语言交互的具体步骤

文盘Rust -- tokio绑定cpu实践

tokio 是 rust 生态中流行的异步运行时框架。在实际生产中我们如果希望 tokio 应用程序与特定的 cpu core 绑定该怎么处理呢?这次我们来聊聊这个话题。

文盘Rust -- 生命周期问题引发的 static hashmap 锁

2021年上半年,撸了个rust cli开发的框架,基本上把交互模式,子命令提示这些cli该有的常用功能做进去了。项目地址:[https://github.com/jiashiwen/interactcli-rs。](https://github.com/jiashiwen/interactcli-