两次 move 以后怎样保证 tokio channel 有足够长的生命周期? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
steins2628
V2EX    Rust

两次 move 以后怎样保证 tokio channel 有足够长的生命周期?

  •  
  •   steins2628 2023-08-08 22:57:41 +08:00 2014 次点击
    这是一个创建于 796 天前的主题,其中的信息可能已经有所发展或是发生改变。

    如题,在尝试把 notify 的 debouncerwatcher 改成 async 模式, 主要是为了把 watcher 顺利放结构体里然后内部调用,实验发现不改 async 的话整个程序会死等事件

    提了个问题 Is there some way to make notify debounce watcher async?,照着说法改着改着发现对于 tx channel 两次 move 以后已经没办法保证其生命周期了

    这是不是说明我这样的改法不行,或者有没有别的方法可以来让 debouncer-watcher 变成 async 的模式?

    具体代码及报错如下, 这里 main loop 是为了测试

    use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error}; use std::{path::Path, time::Duration}; use chrono::prelude::*; use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent}; use tokio::sync::mpsc::Receiver; pub struct NotifyHandler { pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>, pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>> } impl NotifyHandler { pub async fn initialize_notify_scheduler(&mut self) { let (tx, rx) = tokio::sync::mpsc::channel(1); let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { tokio::spawn(async move { if let Err(e) = tx.send(result).await { println!("Error sending event result: {:?}", e); } }); }); match debouncer { Ok(watcher)=> { println!("Initialize notify watcher success"); self.notify_watcher = Some(watcher); self.receiver = Some(rx); }, Err(error) => { println!("{:?}", error); } } } pub async fn watch(&mut self, path: &str) -> notify::Result<()> { let watch_path = Path::new(path); if watch_path.exists() { let is_file = watch_path.is_file(); println!("Valid path {} is file {}", path, is_file); } else { println!("watch path {:?} not exists", watch_path); } if let Some(mut watcher) = self.notify_watcher.take() { watcher .watcher() .watch(watch_path, RecursiveMode::Recursive)?; watcher .cache() .add_root(watch_path, RecursiveMode::Recursive); if let Some(mut rx) = self.receiver.take() { tokio::spawn(async move { while let Some(res) = rx.recv().await { match res { Ok(events) => { println!("events: {:?}", events); }, Err(errors) => { println!("errors: {:?}", errors) } } } }); } } Ok(()) } } #[tokio::main] async fn main() { let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None, receiver: None }; notifier.initialize_notify_scheduler().await; notifier.watch("D:\\TEMP\\TestNote.txt").await.unwrap(); loop { tokio::time::sleep(Duration::from_secs(3)).await; let time: DateTime<Local> = Local::now(); println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string()); } } 
    expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce` required for `[closure@src\main.rs:16:69: 16:103]` to implement `DebounceEventHandler`rustcClick for full compiler diagnostic main.rs(16, 69): the requirement to implement `FnMut` derives from here main.rs(18, 33): closure is `FnOnce` because it moves the variable `tx` out of its environment main.rs(16, 25): required by a bound introduced by this call lib.rs(634, 25): required by a bound in `new_debouncer` 
    8 条回复    2023-08-12 22:09:38 +08:00
    araraloren
        1
    araraloren  
       2023-08-09 09:57:33 +08:00
    You should clone the `tx` before pass it to async block.

    ```
    let tx = tx.clone();
    tokio::spawn(async move {
    ...
    ```
    steins2628
        2
    steins2628  
    OP
       2023-08-09 22:41:32 +08:00
    @araraloren 感谢回答,倒是能通过编译了,但是 watcher 的 event 完全不会触发

    ```rust
    let sapwn_tx = tx.clone();
    let test_ts = tx.clone();
    self.sender = Som(test_ts);

    tokio::spawn(async move {
    if let Err(e) = sapwn_tx.send(result).await {
    println!("Error sending event result: {:?}", e);
    }
    })

    ...

    loop {
    ...
    if let Some(tx) = notifier.sender.take() {
    tx.send(Err(notify::error(notify::ErrorKind::PathNotfoundError))).await;
    }
    }
    ```

    然后我尝试把代码改成这样子,在 main loop 里手动发 error event 是正常接收的,但 watcher 的 event 是完全没有的,
    我查了下相关问题比如 [Alternative to cloning tokio channel's sender for futures' closures]( https://stackoverflow.com/questions/5452165/alternative-to-cloning-tokio-channels-sender-for-futures-closures), 理论上 sender clone 不会对原对象有什么影响,这是不是能说明 debouncer_watcher 就是不能这么改?
    fakeshadow
        3
    fakeshadow  
       2023-08-10 02:51:46 +08:00
    你好,建议朋友附上`Cargo.tmol`以及最小化例子,这样能提高获得答案的几率。我把你的例子缩小了一下,并且给了出一个解决方案。
    ```
    use notify::{RecursiveMode, Watcher};
    use notify_debouncer_full::{new_debouncer, DebounceEventResult};
    use std::{path::Path, time::Duration};

    #[tokio::main]
    async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    let mut watcher = new_debouncer(
    Duration::from_secs(3),
    None,
    move |result: DebounceEventResult| {
    let _ = tx.send(result);
    },
    )
    .unwrap();

    let path = Path::new("D:\\temp\\TestNote.txt");

    watcher
    .watcher()
    .watch(&path, RecursiveMode::Recursive)
    .unwrap();

    watcher.cache().add_root(&path, RecursiveMode::Recursive);

    while let Some(events) = rx.recv().await.transpose().unwrap() {
    println!("{events:?}");
    }
    }

    ```
    你的问题本质看上去像是同步和异步的协同问题。我们一般可以采用`unbounded_channel`来让同步端无阻塞唤醒异步端,这个和你的代码基本上是等效的,这个方法有一个缺点就是异步端如果消费不及时,可能造成内存溢出。另一种方法是使用`tokio::sync::mpsc::Sender::blocking_send`方法,堵塞你的 debounder 线程来唤醒异步端,这个方法虽不会有内存泄漏的问题但消费端不及时会造成 debouncer 线程的阻塞。
    fakeshadow
        4
    fakeshadow  
       2023-08-10 02:59:00 +08:00
    还有就是如果只需要单体观测的话可以使用 tokio::sync::watch::channel.
    araraloren
        5
    araraloren  
       2023-08-10 10:42:26 +08:00
    @steins2628 The code you provide has a lot of problems.
    You take the `notify_watcher` from self, the `watcher` will drop end of `if` block.
    The tokio::spawn can not called from handler provide for `new_debouncer`, you should using `Handler::spawn` instead.
    steins2628
        6
    steins2628  
    OP
       2023-08-10 22:18:05 +08:00
    @fakeshadow 我分类尝试了十种情况,发现只要是把 wacher 放结构体里,然后初始化和 rx 分开处理,就不会有 events 发出来,感觉和 notify 本体关系可能更大一点
    [Test template for rust notify]( https://gist.github.com/Hellager/6bc77d610ff20932ccec379e20599083)


    @araraloren 感谢回答,handler::spawn 是 std::thread::spawn 吗?我最开始是 std handle block send 的,就像这样,但这样也是不行的,具体过程在 StackOverflow 那个问题里

    ```rust

    fn get_runtime_handle() -> (Handle, Option<Runtime>) {
    match Handle::try_current() {
    Ok(h) => (h, None),
    Err(_) => {
    let rt = Runtime::new().unwrap();
    (rt.handle().clone(), Some(rt))
    }
    }
    }

    ...
    let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| {
    let (handle, _rt) = get_runtime_handle();
    handle.block_on(async {
    tx.send(result).await.unwrap();
    })
    });
    ...
    ```
    araraloren
        7
    araraloren  
       2023-08-11 09:09:12 +08:00
    steins2628
        8
    steins2628  
    OP
       2023-08-12 22:09:38 +08:00
    @araraloren Great thanks for your help, now it works!
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5436 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 01:21 PVG 09:21 LAX 18:21 JFK 21:21
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86