From: Nutomic <me@nutomic.com>
Date: Thu, 20 Jul 2023 15:36:48 +0000 (+0200)
Subject: Fix process shutdown (#3673)
X-Git-Url: http://these/git/%22https:/lemmy.ml/u/Liwott/%7B%60%24%7BarchiveTodayUrl%7D/?a=commitdiff_plain;h=ccc122100e86b826505c7aff44a95b034959df58;p=lemmy.git

Fix process shutdown (#3673)

I noticed that stopping the Lemmy process with ctrl+c wasnt working
because the activity channel isnt properly closed. This is now fixed.

Later we should also move the channel from static into LemmyContext,
Im not doing that now to avoid conflicts with #3670.
---

diff --git a/crates/api_common/src/send_activity.rs b/crates/api_common/src/send_activity.rs
index a2bc9a6d..6c91258e 100644
--- a/crates/api_common/src/send_activity.rs
+++ b/crates/api_common/src/send_activity.rs
@@ -4,10 +4,13 @@ use futures::future::BoxFuture;
 use lemmy_db_schema::source::post::Post;
 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
 use once_cell::sync::{Lazy, OnceCell};
-use tokio::sync::{
-  mpsc,
-  mpsc::{UnboundedReceiver, UnboundedSender},
-  Mutex,
+use tokio::{
+  sync::{
+    mpsc,
+    mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
+    Mutex,
+  },
+  task::JoinHandle,
 };
 
 type MatchOutgoingActivitiesBoxed =
@@ -21,17 +24,22 @@ pub enum SendActivityData {
   CreatePost(Post),
 }
 
+// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
+//       ctrl+c still works.
 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
   let (sender, receiver) = mpsc::unbounded_channel();
+  let weak_sender = sender.downgrade();
   ActivityChannel {
-    sender,
+    weak_sender,
     receiver: Mutex::new(receiver),
+    keepalive_sender: Mutex::new(Some(sender)),
   }
 });
 
 pub struct ActivityChannel {
-  sender: UnboundedSender<SendActivityData>,
+  weak_sender: WeakUnboundedSender<SendActivityData>,
   receiver: Mutex<UnboundedReceiver<SendActivityData>>,
+  keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
 }
 
 impl ActivityChannel {
@@ -49,10 +57,18 @@ impl ActivityChannel {
         .get()
         .expect("retrieve function pointer")(data, context)
       .await?;
-    } else {
-      let lock = &ACTIVITY_CHANNEL.sender;
-      lock.send(data)?;
     }
+    // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
+    // not sure which way is more efficient
+    else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
+      sender.send(data)?;
+    }
+    Ok(())
+  }
+
+  pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
+    ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
+    outgoing_activities_task.await??;
     Ok(())
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index e07ae268..4950aff8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,7 +21,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   lemmy_db_views::structs::SiteView,
   request::build_user_agent,
-  send_activity::MATCH_OUTGOING_ACTIVITIES,
+  send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
   utils::{
     check_private_instance_and_federation_enabled,
     local_site_rate_limit_to_rate_limit_config,
@@ -227,7 +227,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
   .await?;
 
   // Wait for outgoing apub sends to complete
-  outgoing_activities_task.await??;
+  ActivityChannel::close(outgoing_activities_task).await?;
 
   Ok(())
 }