Есть таска со входящим стримом данных, которая броадкастит данные на клиентов. Как только появляется клиент, я добавляю его в броадкаст. Проблема в том, что на каждый send() мне приходится лочить мьютекс, чего я делать совершенно не хочу по-причине того, что из-за этого мьютекса свич контекста токио - это просто rng. Вопрос таков, можно ли вообще избавиться от этого мьютекса (пример ниже, надеюсь не слишком простыня, чтобы её в плейграунд нести) ?
let (broadcast, _) = tokio::sync::broadcast::channel::<Bytes>(100);
let tx_broadcast = std::sync::Arc::new(std::sync::Mutex::new(broadcast));
let rx_broadcast = tx_broadcast.clone();
tokio::spawn(async move {
while let Some((_, data)) = connection.try_next().await.unwrap() {
tx_broadcast.lock().unwrap().send(data);
}
});
while let Some(Ok(mut socket)) = multiplexer.next().await {
let mut receiver = rx_broadcast.lock().unwrap().subscribe();
tokio::spawn(async move {
let mut stream = stream::unfold(receiver, |mut rx| async move {
loop {
match rx.recv().await {
Ok(data) => {
break Some((Ok((Instant::now(), data)), rx));
}
Err(_) => continue,
}
}
})
.boxed();
socket.send_all(&mut stream).await;
});
}
"Sender handles are clone-able, allowing concurrent send and receive actions". Arc<Mutex<>> не нужен
В шары долблюсь, сори. Спасибо большое, почему то в упор не увидел этого. Ласт вопрос по этому куску кода. Можно ли переписать этот луп красивее ? loop { match rx.recv().await { Ok(data) => { break Some((Ok((Instant::now(), data)), rx)); } Err(_) => continue, } }
if let Ok(data) = ...
Я имею в виду без лупа в принципе. Я так понимаю, через while я этого не сделаю ?
while let Ok(x) = rx.recv().await { ... }
проявить фантазию конечно придется
Завернуть в tokio_stream::wrappers::BroadcastStream и .filter_map(Result::ok)
А почему именно broadcast?
может как то вроде такого while let Err(_) = rx.recv().await {} let value = Some((Ok((Instant::now(), rx.recv().await.unwrap())), rx));
Receiver такой.
Не знаю валидный ли код, не тестировал
Спасибо, попробую.
Да, .boxed() там не особо нужен (если он не для уменьшения размера бинарника, конечно). unfold тоже не будет нужен
Почему?
Обсуждают сегодня