]> Untitled Git - lemmy.git/commitdiff
Fix process shutdown (#3673)
authorNutomic <me@nutomic.com>
Thu, 20 Jul 2023 15:36:48 +0000 (17:36 +0200)
committerGitHub <noreply@github.com>
Thu, 20 Jul 2023 15:36:48 +0000 (11:36 -0400)
I noticed that stopping the Lemmy process with ctrl+c wasnt working
because the activity channel isnt properly closed. This is now fixed.

Later we should also move the channel from static into LemmyContext,
Im not doing that now to avoid conflicts with #3670.

crates/api_common/src/send_activity.rs
src/lib.rs

index a2bc9a6de7009fbaa130dc4c1091236e7a03d889..6c91258ec29b1ccd14e83743a0c52245f495482f 100644 (file)
@@ -4,10 +4,13 @@ use futures::future::BoxFuture;
 use lemmy_db_schema::source::post::Post;
 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
 use once_cell::sync::{Lazy, OnceCell};
-use tokio::sync::{
-  mpsc,
-  mpsc::{UnboundedReceiver, UnboundedSender},
-  Mutex,
+use tokio::{
+  sync::{
+    mpsc,
+    mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
+    Mutex,
+  },
+  task::JoinHandle,
 };
 
 type MatchOutgoingActivitiesBoxed =
@@ -21,17 +24,22 @@ pub enum SendActivityData {
   CreatePost(Post),
 }
 
+// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
+//       ctrl+c still works.
 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
   let (sender, receiver) = mpsc::unbounded_channel();
+  let weak_sender = sender.downgrade();
   ActivityChannel {
-    sender,
+    weak_sender,
     receiver: Mutex::new(receiver),
+    keepalive_sender: Mutex::new(Some(sender)),
   }
 });
 
 pub struct ActivityChannel {
-  sender: UnboundedSender<SendActivityData>,
+  weak_sender: WeakUnboundedSender<SendActivityData>,
   receiver: Mutex<UnboundedReceiver<SendActivityData>>,
+  keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
 }
 
 impl ActivityChannel {
@@ -49,10 +57,18 @@ impl ActivityChannel {
         .get()
         .expect("retrieve function pointer")(data, context)
       .await?;
-    } else {
-      let lock = &ACTIVITY_CHANNEL.sender;
-      lock.send(data)?;
     }
+    // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
+    // not sure which way is more efficient
+    else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
+      sender.send(data)?;
+    }
+    Ok(())
+  }
+
+  pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
+    ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
+    outgoing_activities_task.await??;
     Ok(())
   }
 }
index e07ae2685d478736c756b320de429bd2f1914e5c..4950aff820d4ba974e326848b513586d075df928 100644 (file)
@@ -21,7 +21,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   lemmy_db_views::structs::SiteView,
   request::build_user_agent,
-  send_activity::MATCH_OUTGOING_ACTIVITIES,
+  send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
   utils::{
     check_private_instance_and_federation_enabled,
     local_site_rate_limit_to_rate_limit_config,
@@ -227,7 +227,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
   .await?;
 
   // Wait for outgoing apub sends to complete
-  outgoing_activities_task.await??;
+  ActivityChannel::close(outgoing_activities_task).await?;
 
   Ok(())
 }