втором работает tokio с асинк циклом. В конце работы - первый тред ставит флаг exit и вызывает wake, чтобы футура во втором треде проснулась:
self.exit.store(true, Ordering::Relaxed);
self.waker.lock().unwrap().wake();
Во втором треде при получении wake() футура успешно завершается, после чего проверяется флаг exit и происходит выход:
pending_future.await; // гарантировано ожидаем wake тут
if self.exit.load(Ordering::Relaxed) {
return;
}
Вопрос вот в чем. exit - атомарный булиан, никаких операций кроме единичной записи перед wake() и чтения во втором треде после wake() нет. Возможна ли следующая последовательность исполнения на x86-64 или arm64 с relaxed мемори ордерингом?:
thread1: wake() - thread2: awaken - thread2:reads exit false - thread1: writes exit true
Как бы с одной стороны модификейшн ордер для exit: false, true и ничто не мешает второму треду получить из кэша false. Но с другой стороны в первом треде все-таки запись происходит до wake, а wake еще и под мьютексом - не вызовет ли что-то там все-таки какую-то синхронизацию кэшей?
Похоже да, wake() у себя внутри вызывает compare_exchange c AcqRel, который обещает что все записи в память от треда 1 до этого будут видны треду 2. Точно ли я ничего не путаю?
А через что ты будишь токио тред?
Просто waker из контекста. impl Future for PendingFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.get_mut(); // ... тут проверки не готова ли футура me.waker.lock().replace(cx.waker().clone()); Poll::Pending } }
Собственно проверил loomом: #[cfg(test)] mod atomic_tests { use loom::sync::atomic::AtomicBool; use loom::sync::atomic::Ordering; use loom::thread; use loom::thread::yield_now; #[test] fn test_sync() { loom::model(|| { let exit: &'static _ = Box::leak(Box::new(AtomicBool::new(false))); let wake: &'static _ = Box::leak(Box::new(AtomicBool::new(false))); let b= thread::spawn(move || { while !wake.load(Ordering::Acquire) { yield_now(); }; assert!(exit.load(Ordering::Relaxed)); }); exit.store(true, Ordering::Relaxed); // wake() в tokio под капотом: // https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/task/state.rs#L215 // https://github.com/tokio-rs/tokio/blob/master/tokio/src/runtime/task/state.rs#L467 assert!(wake.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire).is_ok()); b.join().unwrap(); }) } }
Должно. Если глянуть в реализацию мьютексов, там там данные которые он охраняет даже не в атомике лежат и работает)
Обсуждают сегодня