如题,在尝试把 notify 的 debouncerwatcher 改成 async 模式, 主要是为了把 watcher 顺利放结构体里然后内部调用,实验发现不改 async 的话整个程序会死等事件
提了个问题 Is there some way to make notify debounce watcher async?,照着说法改着改着发现对于 tx channel
两次 move 以后已经没办法保证其生命周期了
这是不是说明我这样的改法不行,或者有没有别的方法可以来让 debouncer-watcher 变成 async 的模式?
具体代码及报错如下, 这里 main loop 是为了测试
use notify::{RecursiveMode, Watcher, ReadDirectoryChangesWatcher, Error}; use std::{path::Path, time::Duration}; use chrono::prelude::*; use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap, DebounceEventResult, DebouncedEvent}; use tokio::sync::mpsc::Receiver; pub struct NotifyHandler { pub notify_watcher: Option<Debouncer<ReadDirectoryChangesWatcher, FileIdMap>>, pub receiver: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<Error>>>> } impl NotifyHandler { pub async fn initialize_notify_scheduler(&mut self) { let (tx, rx) = tokio::sync::mpsc::channel(1); let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { tokio::spawn(async move { if let Err(e) = tx.send(result).await { println!("Error sending event result: {:?}", e); } }); }); match debouncer { Ok(watcher)=> { println!("Initialize notify watcher success"); self.notify_watcher = Some(watcher); self.receiver = Some(rx); }, Err(error) => { println!("{:?}", error); } } } pub async fn watch(&mut self, path: &str) -> notify::Result<()> { let watch_path = Path::new(path); if watch_path.exists() { let is_file = watch_path.is_file(); println!("Valid path {} is file {}", path, is_file); } else { println!("watch path {:?} not exists", watch_path); } if let Some(mut watcher) = self.notify_watcher.take() { watcher .watcher() .watch(watch_path, RecursiveMode::Recursive)?; watcher .cache() .add_root(watch_path, RecursiveMode::Recursive); if let Some(mut rx) = self.receiver.take() { tokio::spawn(async move { while let Some(res) = rx.recv().await { match res { Ok(events) => { println!("events: {:?}", events); }, Err(errors) => { println!("errors: {:?}", errors) } } } }); } } Ok(()) } } #[tokio::main] async fn main() { let mut notifier: NotifyHandler = NotifyHandler { notify_watcher: None, receiver: None }; notifier.initialize_notify_scheduler().await; notifier.watch("D:\\TEMP\\TestNote.txt").await.unwrap(); loop { tokio::time::sleep(Duration::from_secs(3)).await; let time: DateTime<Local> = Local::now(); println!("{}: Hello, world!", time.format("%Y-%m-%d %H:%M:%S").to_string()); } }
expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce` required for `[closure@src\main.rs:16:69: 16:103]` to implement `DebounceEventHandler`rustcClick for full compiler diagnostic main.rs(16, 69): the requirement to implement `FnMut` derives from here main.rs(18, 33): closure is `FnOnce` because it moves the variable `tx` out of its environment main.rs(16, 25): required by a bound introduced by this call lib.rs(634, 25): required by a bound in `new_debouncer`
![]() | 1 araraloren 2023-08-09 09:57:33 +08:00 You should clone the `tx` before pass it to async block. ``` let tx = tx.clone(); tokio::spawn(async move { ... ``` |
![]() | 2 steins2628 OP @araraloren 感谢回答,倒是能通过编译了,但是 watcher 的 event 完全不会触发 ```rust let sapwn_tx = tx.clone(); let test_ts = tx.clone(); self.sender = Som(test_ts); tokio::spawn(async move { if let Err(e) = sapwn_tx.send(result).await { println!("Error sending event result: {:?}", e); } }) ... loop { ... if let Some(tx) = notifier.sender.take() { tx.send(Err(notify::error(notify::ErrorKind::PathNotfoundError))).await; } } ``` 然后我尝试把代码改成这样子,在 main loop 里手动发 error event 是正常接收的,但 watcher 的 event 是完全没有的, 我查了下相关问题比如 [Alternative to cloning tokio channel's sender for futures' closures]( https://stackoverflow.com/questions/5452165/alternative-to-cloning-tokio-channels-sender-for-futures-closures), 理论上 sender clone 不会对原对象有什么影响,这是不是能说明 debouncer_watcher 就是不能这么改? |
3 fakeshadow 2023-08-10 02:51:46 +08:00 你好,建议朋友附上`Cargo.tmol`以及最小化例子,这样能提高获得答案的几率。我把你的例子缩小了一下,并且给了出一个解决方案。 ``` use notify::{RecursiveMode, Watcher}; use notify_debouncer_full::{new_debouncer, DebounceEventResult}; use std::{path::Path, time::Duration}; #[tokio::main] async fn main() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let mut watcher = new_debouncer( Duration::from_secs(3), None, move |result: DebounceEventResult| { let _ = tx.send(result); }, ) .unwrap(); let path = Path::new("D:\\temp\\TestNote.txt"); watcher .watcher() .watch(&path, RecursiveMode::Recursive) .unwrap(); watcher.cache().add_root(&path, RecursiveMode::Recursive); while let Some(events) = rx.recv().await.transpose().unwrap() { println!("{events:?}"); } } ``` 你的问题本质看上去像是同步和异步的协同问题。我们一般可以采用`unbounded_channel`来让同步端无阻塞唤醒异步端,这个和你的代码基本上是等效的,这个方法有一个缺点就是异步端如果消费不及时,可能造成内存溢出。另一种方法是使用`tokio::sync::mpsc::Sender::blocking_send`方法,堵塞你的 debounder 线程来唤醒异步端,这个方法虽不会有内存泄漏的问题但消费端不及时会造成 debouncer 线程的阻塞。 |
4 fakeshadow 2023-08-10 02:59:00 +08:00 还有就是如果只需要单体观测的话可以使用 tokio::sync::watch::channel. |
![]() | 5 araraloren 2023-08-10 10:42:26 +08:00 @steins2628 The code you provide has a lot of problems. You take the `notify_watcher` from self, the `watcher` will drop end of `if` block. The tokio::spawn can not called from handler provide for `new_debouncer`, you should using `Handler::spawn` instead. |
![]() | 6 steins2628 OP @fakeshadow 我分类尝试了十种情况,发现只要是把 wacher 放结构体里,然后初始化和 rx 分开处理,就不会有 events 发出来,感觉和 notify 本体关系可能更大一点 [Test template for rust notify]( https://gist.github.com/Hellager/6bc77d610ff20932ccec379e20599083) @araraloren 感谢回答,handler::spawn 是 std::thread::spawn 吗?我最开始是 std handle block send 的,就像这样,但这样也是不行的,具体过程在 StackOverflow 那个问题里 ```rust fn get_runtime_handle() -> (Handle, Option<Runtime>) { match Handle::try_current() { Ok(h) => (h, None), Err(_) => { let rt = Runtime::new().unwrap(); (rt.handle().clone(), Some(rt)) } } } ... let debouncer = new_debouncer(Duration::from_secs(3), None, move |result: DebounceEventResult| { let (handle, _rt) = get_runtime_handle(); handle.block_on(async { tx.send(result).await.unwrap(); }) }); ... ``` |
![]() | 7 araraloren 2023-08-11 09:09:12 +08:00 Here is okay, not get what you mean. `https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e338107386436eedbdb8291c03107015` |
![]() | 8 steins2628 OP @araraloren Great thanks for your help, now it works! |