X-Git-Url: http://these/git/?a=blobdiff_plain;f=crates%2Fapi_common%2Fsrc%2Fsend_activity.rs;h=6c91258ec29b1ccd14e83743a0c52245f495482f;hb=ccc122100e86b826505c7aff44a95b034959df58;hp=a2bc9a6de7009fbaa130dc4c1091236e7a03d889;hpb=5d23ef960e3361d1cc38c1aff1d907d8da58b90f;p=lemmy.git diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs index a2bc9a6d..6c91258e 100644 --- a/crates/api_common/src/send_activity.rs +++ b/crates/api_common/src/send_activity.rs @@ -4,10 +4,13 @@ use futures::future::BoxFuture; use lemmy_db_schema::source::post::Post; use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION}; use once_cell::sync::{Lazy, OnceCell}; -use tokio::sync::{ - mpsc, - mpsc::{UnboundedReceiver, UnboundedSender}, - Mutex, +use tokio::{ + sync::{ + mpsc, + mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender}, + Mutex, + }, + task::JoinHandle, }; type MatchOutgoingActivitiesBoxed = @@ -21,17 +24,22 @@ pub enum SendActivityData { CreatePost(Post), } +// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with +// ctrl+c still works. static ACTIVITY_CHANNEL: Lazy = Lazy::new(|| { let (sender, receiver) = mpsc::unbounded_channel(); + let weak_sender = sender.downgrade(); ActivityChannel { - sender, + weak_sender, receiver: Mutex::new(receiver), + keepalive_sender: Mutex::new(Some(sender)), } }); pub struct ActivityChannel { - sender: UnboundedSender, + weak_sender: WeakUnboundedSender, receiver: Mutex>, + keepalive_sender: Mutex>>, } impl ActivityChannel { @@ -49,10 +57,18 @@ impl ActivityChannel { .get() .expect("retrieve function pointer")(data, context) .await?; - } else { - let lock = &ACTIVITY_CHANNEL.sender; - lock.send(data)?; } + // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender, + // not sure which way is more efficient + else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() { + sender.send(data)?; + } + Ok(()) + } + + pub async fn close(outgoing_activities_task: JoinHandle>) -> LemmyResult<()> { + ACTIVITY_CHANNEL.keepalive_sender.lock().await.take(); + outgoing_activities_task.await??; Ok(()) } }