]> Untitled Git - lemmy.git/blobdiff - crates/api_common/src/send_activity.rs
Fix process shutdown (#3673)
[lemmy.git] / crates / api_common / src / send_activity.rs
index a2bc9a6de7009fbaa130dc4c1091236e7a03d889..6c91258ec29b1ccd14e83743a0c52245f495482f 100644 (file)
@@ -4,10 +4,13 @@ use futures::future::BoxFuture;
 use lemmy_db_schema::source::post::Post;
 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
 use once_cell::sync::{Lazy, OnceCell};
-use tokio::sync::{
-  mpsc,
-  mpsc::{UnboundedReceiver, UnboundedSender},
-  Mutex,
+use tokio::{
+  sync::{
+    mpsc,
+    mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
+    Mutex,
+  },
+  task::JoinHandle,
 };
 
 type MatchOutgoingActivitiesBoxed =
@@ -21,17 +24,22 @@ pub enum SendActivityData {
   CreatePost(Post),
 }
 
+// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
+//       ctrl+c still works.
 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
   let (sender, receiver) = mpsc::unbounded_channel();
+  let weak_sender = sender.downgrade();
   ActivityChannel {
-    sender,
+    weak_sender,
     receiver: Mutex::new(receiver),
+    keepalive_sender: Mutex::new(Some(sender)),
   }
 });
 
 pub struct ActivityChannel {
-  sender: UnboundedSender<SendActivityData>,
+  weak_sender: WeakUnboundedSender<SendActivityData>,
   receiver: Mutex<UnboundedReceiver<SendActivityData>>,
+  keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
 }
 
 impl ActivityChannel {
@@ -49,10 +57,18 @@ impl ActivityChannel {
         .get()
         .expect("retrieve function pointer")(data, context)
       .await?;
-    } else {
-      let lock = &ACTIVITY_CHANNEL.sender;
-      lock.send(data)?;
     }
+    // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
+    // not sure which way is more efficient
+    else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
+      sender.send(data)?;
+    }
+    Ok(())
+  }
+
+  pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
+    ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
+    outgoing_activities_task.await??;
     Ok(())
   }
 }