]> Untitled Git - lemmy.git/commitdiff
background-jobs 0.11 (#1943)
authorRiley <asonix@asonix.dog>
Tue, 23 Nov 2021 12:20:01 +0000 (06:20 -0600)
committerGitHub <noreply@github.com>
Tue, 23 Nov 2021 12:20:01 +0000 (12:20 +0000)
15 files changed:
Cargo.lock
crates/api/Cargo.toml
crates/api_crud/Cargo.toml
crates/apub/Cargo.toml
crates/apub/src/collections/community_moderators.rs
crates/apub/src/objects/comment.rs
crates/apub/src/objects/community.rs
crates/apub/src/objects/mod.rs
crates/apub/src/objects/person.rs
crates/apub/src/objects/post.rs
crates/apub/src/objects/private_message.rs
crates/apub_lib/Cargo.toml
crates/apub_lib/src/activity_queue.rs
crates/websocket/Cargo.toml
src/main.rs

index e71f11f3f7d9e739cf5487de3e547d191569892f..a5b5c86c51267abbca5df5c1f6e324766d40172d 100644 (file)
@@ -421,9 +421,9 @@ dependencies = [
 
 [[package]]
 name = "background-jobs"
-version = "0.9.1"
+version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb7df0fd6abf9d55139d4c9e569c0a8cd271ec265862c41bd215b46b36c52397"
+checksum = "77f4508c6c5b5cfc6c18d43d0ba6ecda339710206854da9e1c9ac9dfb7e3eb6f"
 dependencies = [
  "background-jobs-actix",
  "background-jobs-core",
@@ -431,9 +431,9 @@ dependencies = [
 
 [[package]]
 name = "background-jobs-actix"
-version = "0.9.6"
+version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38aebb545b0fac45046421993890eb49cc04896a93b85bbfb1b9017decc413f9"
+checksum = "5dabf6a2204fe034db7910a38f8e2d183fe24eb92abd4c0aaca59f8cacf4e48b"
 dependencies = [
  "actix-rt",
  "anyhow",
@@ -441,31 +441,32 @@ dependencies = [
  "async-trait",
  "background-jobs-core",
  "chrono",
- "log",
  "num_cpus",
  "serde",
  "serde_json",
  "thiserror",
  "tokio",
+ "tracing",
+ "tracing-futures",
  "uuid",
 ]
 
 [[package]]
 name = "background-jobs-core"
-version = "0.9.5"
+version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee8604ff89c62ca8eefc1ea2c3f359a53b7930e640fb22bf7890eab13b4640d2"
+checksum = "174d36b170699ecc13b7513bda9eff6f12cc889eae5d16b792daa3f7b21be452"
 dependencies = [
  "actix-rt",
  "anyhow",
  "async-mutex",
  "async-trait",
  "chrono",
- "log",
  "serde",
  "serde_json",
  "thiserror",
- "tokio",
+ "tracing",
+ "tracing-futures",
  "uuid",
 ]
 
index b2db6e69063c50ab00fc523d9a41bb1ac65cd3d7..f09c1234d896516c0b8ff48e2aeeb37eae48f4b7 100644 (file)
@@ -48,5 +48,5 @@ async-trait = "0.1.51"
 captcha = "0.0.8"
 anyhow = "1.0.44"
 thiserror = "1.0.29"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
 reqwest = { version = "0.11.4", features = ["json"] }
index f188710738789d73e50c7e07dc469e3b60b3ff7f..2b1cba764969cc53588f79e86717ddf8581705fa 100644 (file)
@@ -43,6 +43,6 @@ sha2 = "0.9.8"
 async-trait = "0.1.51"
 anyhow = "1.0.44"
 thiserror = "1.0.29"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
 reqwest = { version = "0.11.4", features = ["json"] }
 webmention = "0.4.0"
index b2140270fca1426a9c6b7337d67c619741971e3c..e7d49adbece02dfcf70228602252471fc29a4a6f 100644 (file)
@@ -47,7 +47,7 @@ sha2 = "0.9.8"
 async-trait = "0.1.51"
 anyhow = "1.0.44"
 thiserror = "1.0.29"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
 reqwest = { version = "0.11.4", features = ["json"] }
 html2md = "0.2.13"
 once_cell = "1.8.0"
index 1b0911ab7f5517538ce3312e30d29deeecd8b0d4..9b8d26d9f1ff7286a4de19a0eddb61ea899313e0 100644 (file)
@@ -136,6 +136,7 @@ mod tests {
     person::tests::parse_lemmy_person,
     tests::{file_to_json_object, init_context},
   };
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::{
     source::{
       community::Community,
@@ -148,7 +149,8 @@ mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_community_moderators() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let community = parse_lemmy_community(&context).await;
     let community_id = community.id;
 
index 007d3def4409cf7b6f4d1f2e9b02efd361447902..41ea12b9a674a75d661da5d94bdca312da571a46 100644 (file)
@@ -214,6 +214,7 @@ pub(crate) mod tests {
     tests::{file_to_json_object, init_context},
   };
   use assert_json_diff::assert_json_include;
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use serial_test::serial;
 
   async fn prepare_comment_test(
@@ -241,7 +242,8 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   pub(crate) async fn test_parse_lemmy_comment() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
     let data = prepare_comment_test(&url, &context).await;
 
@@ -270,7 +272,8 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_pleroma_comment() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
     let data = prepare_comment_test(&url, &context).await;
 
index 72d9c0c270cb35991e76879216dbeab297dff0ae..7bb57ce6c5dfaa23a03e9b6cb9dca7d231eef676 100644 (file)
@@ -214,6 +214,7 @@ impl ApubCommunity {
 pub(crate) mod tests {
   use super::*;
   use crate::objects::tests::{file_to_json_object, init_context};
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::traits::Crud;
   use serial_test::serial;
 
@@ -240,7 +241,8 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_community() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let community = parse_lemmy_community(&context).await;
 
     assert_eq!(community.title, "Ten Forward");
index b577dabefd3834328c0a827d095fe039e2cd7870..5e0a0809648ab33c7c017d0ba94bae72123246eb 100644 (file)
@@ -21,11 +21,11 @@ pub(crate) fn get_summary_from_string_or_source(
 #[cfg(test)]
 pub(crate) mod tests {
   use actix::Actor;
+  use background_jobs::QueueHandle;
   use diesel::{
     r2d2::{ConnectionManager, Pool},
     PgConnection,
   };
-  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::{
     establish_unpooled_connection,
     get_database_url_from_env,
@@ -45,7 +45,7 @@ pub(crate) mod tests {
 
   // TODO: would be nice if we didnt have to use a full context for tests.
   //       or at least write a helper function so this code is shared with main.rs
-  pub(crate) fn init_context() -> LemmyContext {
+  pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext {
     // call this to run migrations
     establish_unpooled_connection();
     let settings = Settings::init().unwrap();
@@ -57,7 +57,6 @@ pub(crate) mod tests {
       .user_agent(build_user_agent(&settings))
       .build()
       .unwrap();
-    let activity_queue = create_activity_queue();
     let secret = Secret {
       id: 0,
       jwt_secret: "".to_string(),
index 8c0587ddb54b50c1a632350a7a0e03049b5dbf9c..9c93e76d748dd439a6cb04b12a79d70792613fee 100644 (file)
@@ -198,6 +198,7 @@ impl ActorType for ApubPerson {
 pub(crate) mod tests {
   use super::*;
   use crate::objects::tests::{file_to_json_object, init_context};
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use lemmy_db_schema::traits::Crud;
   use serial_test::serial;
 
@@ -218,7 +219,8 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_person() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let person = parse_lemmy_person(&context).await;
 
     assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
@@ -231,7 +233,8 @@ pub(crate) mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_pleroma_person() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let json = file_to_json_object("assets/pleroma/objects/person.json");
     let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap();
     let mut request_counter = 0;
index 1d4eddb9ce6bb71acfbe05012739eb925e18d2ce..cb86b97df0fe74725f07fb2ef2d5fdd013573046 100644 (file)
@@ -205,12 +205,14 @@ mod tests {
     post::ApubPost,
     tests::{file_to_json_object, init_context},
   };
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use serial_test::serial;
 
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_post() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let community = parse_lemmy_community(&context).await;
     let person = parse_lemmy_person(&context).await;
 
index 30c8e4dc5e86ec76fb55873dfb07d1187a9c2f7a..d624e133e2c47c660f123a01b4862a3fa6ba7a84 100644 (file)
@@ -162,6 +162,7 @@ mod tests {
     tests::{file_to_json_object, init_context},
   };
   use assert_json_diff::assert_json_include;
+  use lemmy_apub_lib::activity_queue::create_activity_queue;
   use serial_test::serial;
 
   async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) {
@@ -191,7 +192,8 @@ mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_lemmy_pm() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
     let data = prepare_comment_test(&url, &context).await;
     let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json");
@@ -218,7 +220,8 @@ mod tests {
   #[actix_rt::test]
   #[serial]
   async fn test_parse_pleroma_pm() {
-    let context = init_context();
+    let manager = create_activity_queue();
+    let context = init_context(manager.queue_handle().clone());
     let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
     let data = prepare_comment_test(&url, &context).await;
     let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();
index 31f41dfc2e9dfec3d4f4a254e9fade46c4a1dcb9..0ab4b2b22e894f9109c83366533a7b39c879cccc 100644 (file)
@@ -26,5 +26,5 @@ sha2 = "0.9.8"
 actix-web = { version = "4.0.0-beta.9", default-features = false }
 http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] }
 http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] }
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
 diesel = "1.4.8"
index 582997e29e6110fc15c48b2187ae3ac15d955e7f..a6562deb7cc3c9e61215c967c3d17aa1acc52364 100644 (file)
@@ -1,10 +1,10 @@
 use crate::{signatures::sign_and_send, traits::ActorType};
 use anyhow::{anyhow, Context, Error};
 use background_jobs::{
-  create_server,
   memory_storage::Storage,
   ActixJob,
   Backoff,
+  Manager,
   MaxRetries,
   QueueHandle,
   WorkerConfig,
@@ -35,7 +35,7 @@ pub async fn send_activity(
     if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
       do_send(message, client).await?;
     } else {
-      activity_queue.queue::<SendActivityTask>(message)?;
+      activity_queue.queue::<SendActivityTask>(message).await?;
     }
   }
 
@@ -101,19 +101,13 @@ async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
   Ok(())
 }
 
-pub fn create_activity_queue() -> QueueHandle {
-  // Start the application server. This guards access to to the jobs store
-  let queue_handle = create_server(Storage::new());
-  let arbiter = actix_web::rt::Arbiter::new();
-
+pub fn create_activity_queue() -> Manager {
   // Configure and start our workers
-  WorkerConfig::new(|| MyState {
+  WorkerConfig::new_managed(Storage::new(), |_| MyState {
     client: Client::default(),
   })
   .register::<SendActivityTask>()
-  .start_in_arbiter(&arbiter, queue_handle.clone());
-
-  queue_handle
+  .start()
 }
 
 #[derive(Clone)]
index 2f91826a743329391bb81e53b0447d15bad008ad..787faab1b641d212b0e8c5e17cfeaabba5c8bbde 100644 (file)
@@ -26,7 +26,7 @@ serde_json = { version = "1.0.68", features = ["preserve_order"] }
 actix = "0.12.0"
 anyhow = "1.0.44"
 diesel = "1.4.8"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
 tokio = "1.12.0"
 strum = "0.21.0"
 strum_macros = "0.21.1"
index c5e777b1d0f98d53878f41aee6b15d0868fcb4aa..8a0b37f3bbd472194f0a69f3533c26a5188ee3f5 100644 (file)
@@ -94,7 +94,9 @@ async fn main() -> Result<(), LemmyError> {
     .user_agent(build_user_agent(&settings))
     .build()?;
 
-  let activity_queue = create_activity_queue();
+  let queue_manager = create_activity_queue();
+
+  let activity_queue = queue_manager.queue_handle().clone();
 
   let chat_server = ChatServer::startup(
     pool.clone(),
@@ -135,5 +137,7 @@ async fn main() -> Result<(), LemmyError> {
   .run()
   .await?;
 
+  drop(queue_manager);
+
   Ok(())
 }