use crossbeam_utils::atomic::AtomicCell; use dashmap::DashMap; use smol::channel::{Receiver, Sender}; pub struct EventBus { label: String, incr_val: AtomicCell, tx_map: DashMap>, } impl EventBus { pub fn with_label(label: String) -> Self { Self { label, incr_val: Default::default(), tx_map: Default::default(), } } pub async fn publish(&self, val: E) { let mut dropped_senders: Vec = vec![]; let mut keys: Vec = self.tx_map.iter().map(|x| x.key().to_owned()).collect(); let last_key = keys.pop(); for key in keys { if let Some(entry) = self.tx_map.get(&key) { if let Err(_) = entry.send(val.clone()).await { dropped_senders.push(key); } } } // 最后一个元素可以直接发送,减少一次clone if let Some(key) = last_key { if let Some(entry) = self.tx_map.get(&key) { if let Err(_) = entry.send(val).await { dropped_senders.push(key); } } } for key in dropped_senders.iter() { self.tx_map.remove(key); log::info!("[EventBus][{}] remove receiver {}", self.label, key); } } pub fn register_receiver(&self) -> Receiver { let (tx, rx) = smol::channel::unbounded(); let key = self.incr_val.fetch_add(1); self.tx_map.insert(key, tx); log::info!("[EventBus][{}] add receiver {}", self.label, key); rx } }