]> Untitled Git - lemmy.git/commitdiff
Make activity queue worker count configurable, log stats (#2113)
authorNutomic <me@nutomic.com>
Thu, 3 Mar 2022 18:54:33 +0000 (18:54 +0000)
committerGitHub <noreply@github.com>
Thu, 3 Mar 2022 18:54:33 +0000 (18:54 +0000)
12 files changed:
config/defaults.hjson
crates/apub/src/collections/community_moderators.rs
crates/apub/src/objects/comment.rs
crates/apub/src/objects/community.rs
crates/apub/src/objects/instance.rs
crates/apub/src/objects/mod.rs
crates/apub/src/objects/person.rs
crates/apub/src/objects/post.rs
crates/apub/src/objects/private_message.rs
crates/apub_lib/src/activity_queue.rs
crates/utils/src/settings/structs.rs
src/main.rs

index c30c33ac88625ee9250e0d24778a94c953142150..ac732b20d1f73257ca4916195d209633f71c1636 100644 (file)
@@ -59,6 +59,8 @@
     # use allowlist only for remote communities, and posts/comments in local communities
     # (meaning remote communities will show content from arbitrary instances).
     strict_allowlist: true
+    # Number of workers for sending outgoing activities.
+    worker_count: 16
   }
   captcha: {
     # Whether captcha is required for signup
index d24f06a13e1c32857b516b58c1bc00d82ec99814..a9a691b8c8ed50c4ec346a83f0d96da2f7ac3bed 100644 (file)
@@ -146,7 +146,6 @@ mod tests {
     },
     protocol::tests::file_to_json_object,
   };
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::{
     source::{
       community::Community,
@@ -160,9 +159,7 @@ mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_community_moderators() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let (new_mod, site) = parse_lemmy_person(&context).await;
     let community = parse_lemmy_community(&context).await;
     let community_id = community.id;
index e61cb869911ad0962304ec26aad5528232c9832d..b719ba45b04ae2e6e2c9a139abbbe7234a3ebf10 100644 (file)
@@ -218,7 +218,6 @@ pub(crate) mod tests {
     protocol::tests::file_to_json_object,
   };
   use assert_json_diff::assert_json_include;
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::source::site::Site;
   use serial_test::serial;
 
@@ -248,9 +247,7 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   pub(crate) async fn test_parse_lemmy_comment() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
     let data = prepare_comment_test(&url, &context).await;
 
@@ -279,9 +276,7 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_pleroma_comment() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
     let data = prepare_comment_test(&url, &context).await;
 
index 612616cf94c2c9f4d84152261340a92d4325fadd..82b6d797ccd35e660bbc6e3b423def8f3a5d9382 100644 (file)
@@ -217,7 +217,6 @@ pub(crate) mod tests {
     objects::{instance::tests::parse_lemmy_instance, tests::init_context},
     protocol::tests::file_to_json_object,
   };
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::{source::site::Site, traits::Crud};
   use serial_test::serial;
 
@@ -244,9 +243,7 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_community() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let site = parse_lemmy_instance(&context).await;
     let community = parse_lemmy_community(&context).await;
 
index 40b585b5a10f0056f5bccdd22f20df3f1513f04e..a07ecdc17f2eaac6f12d6101abfb56a95cd27fa1 100644 (file)
@@ -186,7 +186,6 @@ pub(in crate::objects) async fn fetch_instance_actor_for_object(
 pub(crate) mod tests {
   use super::*;
   use crate::{objects::tests::init_context, protocol::tests::file_to_json_object};
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::traits::Crud;
   use serial_test::serial;
 
@@ -207,9 +206,7 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_instance() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let site = parse_lemmy_instance(&context).await;
 
     assert_eq!(site.name, "Enterprise");
index cfe1ecd36c7666442c637bc7d5b9976ab5b65e3d..cf66beccd4b801985f8ac8a03867d228d3609462 100644 (file)
@@ -22,11 +22,11 @@ pub(crate) fn get_summary_from_string_or_source(
 #[cfg(test)]
 pub(crate) mod tests {
   use actix::Actor;
-  use background_jobs::QueueHandle;
   use diesel::{
     r2d2::{ConnectionManager, Pool},
     PgConnection,
   };
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::{
     establish_unpooled_connection,
     get_database_url_from_env,
@@ -46,7 +46,11 @@ pub(crate) mod tests {
 
   // TODO: would be nice if we didnt have to use a full context for tests.
   //       or at least write a helper function so this code is shared with main.rs
-  pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext {
+  pub(crate) fn init_context() -> LemmyContext {
+    let client = reqwest::Client::new().into();
+    // activity queue isnt used in tests, so worker count makes no difference
+    let queue_manager = create_activity_queue(client, 4);
+    let activity_queue = queue_manager.queue_handle().clone();
     // call this to run migrations
     establish_unpooled_connection();
     let settings = Settings::init().unwrap();
index b75b107a11cb47c77a0f59189ec725c8a54662a8..8eb7fe8b7c09316afb8a09547128f33955b2f643 100644 (file)
@@ -207,7 +207,6 @@ pub(crate) mod tests {
     },
     protocol::{objects::instance::Instance, tests::file_to_json_object},
   };
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::{source::site::Site, traits::Crud};
   use serial_test::serial;
 
@@ -229,9 +228,7 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_person() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let (person, site) = parse_lemmy_person(&context).await;
 
     assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
@@ -245,9 +242,7 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_pleroma_person() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
 
     // create and parse a fake pleroma instance actor, to avoid network request during test
     let mut json: Instance = file_to_json_object("assets/lemmy/objects/instance.json").unwrap();
index 3fb4b9cfdd3199fb034ec8b492d8f9676eb15b43..a002e72c720570af1dd52ae4d7c60a9b68e48847 100644 (file)
@@ -215,16 +215,13 @@ mod tests {
     },
     protocol::tests::file_to_json_object,
   };
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::source::site::Site;
   use serial_test::serial;
 
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_post() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let (person, site) = parse_lemmy_person(&context).await;
     let community = parse_lemmy_community(&context).await;
 
index b1d5e5b198a7323eeaee7a9b1cc10111ea18ac68..4896e1991dca5606cc1e21b1706eae3fe0d9233d 100644 (file)
@@ -167,7 +167,6 @@ mod tests {
     protocol::tests::file_to_json_object,
   };
   use assert_json_diff::assert_json_include;
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::source::site::Site;
   use serial_test::serial;
 
@@ -203,9 +202,7 @@ mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_pm() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
     let data = prepare_comment_test(&url, &context).await;
     let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json").unwrap();
@@ -232,9 +229,7 @@ mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_pleroma_pm() {
-    let client = reqwest::Client::new().into();
-    let manager = create_activity_queue(client);
-    let context = init_context(manager.queue_handle().clone());
+    let context = init_context();
     let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
     let data = prepare_comment_test(&url, &context).await;
     let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();
index 9be430167a71982b1ec58446a722d371261ac447..ca8645cb1d97dbb027f60f64a277406d9a336d5e 100644 (file)
@@ -42,6 +42,14 @@ pub async fn send_activity(
       }
     } else {
       activity_queue.queue::<SendActivityTask>(message).await?;
+      let stats = activity_queue.get_stats().await?;
+      info!(
+        "Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}",
+        stats.pending,
+        stats.running,
+        stats.dead.this_hour(),
+        stats.complete.this_hour()
+      );
     }
   }
 
@@ -110,12 +118,13 @@ async fn do_send(task: SendActivityTask, client: &ClientWithMiddleware) -> Resul
   r
 }
 
-pub fn create_activity_queue(client: ClientWithMiddleware) -> Manager {
+pub fn create_activity_queue(client: ClientWithMiddleware, worker_count: u64) -> Manager {
   // Configure and start our workers
   WorkerConfig::new_managed(Storage::new(), move |_| MyState {
     client: client.clone(),
   })
   .register::<SendActivityTask>()
+  .set_worker_count("default", worker_count)
   .start()
 }
 
index 16ead86373d50856cd5b6e3fd6b0c097b3d8b54d..30f12054418c7af2528ece4f85aba1e5c3972451 100644 (file)
@@ -130,6 +130,9 @@ pub struct FederationConfig {
   /// (meaning remote communities will show content from arbitrary instances).
   #[default(true)]
   pub strict_allowlist: bool,
+  /// Number of workers for sending outgoing activities.
+  #[default(16)]
+  pub worker_count: u64,
 }
 
 #[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
index 3b6986afd1c5637a65fc51b0ba1a38560ea927d4..a8d74a125c63b77c6517461615c8f7e5388ff4aa 100644 (file)
@@ -101,7 +101,7 @@ async fn main() -> Result<(), LemmyError> {
 
   let client = ClientBuilder::new(client).with(TracingMiddleware).build();
 
-  let queue_manager = create_activity_queue(client.clone());
+  let queue_manager = create_activity_queue(client.clone(), settings.federation.worker_count);
 
   let activity_queue = queue_manager.queue_handle().clone();