]> Untitled Git - lemmy.git/commitdiff
implement ActivitySender actor (#89)
authornutomic <nutomic@noreply.yerbamate.dev>
Mon, 31 Aug 2020 13:48:02 +0000 (13:48 +0000)
committerdessalines <dessalines@noreply.yerbamate.dev>
Mon, 31 Aug 2020 13:48:02 +0000 (13:48 +0000)
Merge pull request 'Adding unique ap_ids. Fixes #1100' (#90) from unique_ap_ids into activity-sender

Reviewed-on: https://yerbamate.dev/LemmyNet/lemmy/pulls/90

Adding back in on_conflict.

Trying to add back in the on_conflict_do_nothing.

Trying to reduce delay time.

Removing createFakes.

Removing some unit tests.

Adding comment jest timeout.

Fixing tests again.

Fixing tests again.

Merge branch 'activity-sender' into unique_ap_ids_2

Replace actix client with reqwest to speed up federation tests

Trying to fix tests again.

Fixing unit tests.

Fixing some broken unit tests, not done yet.

Adding uniques.

Adding unique ap_ids. Fixes #1100

use proper sql functionality for upsert

added logging

in fetcher, replace post/comment::create with upsert

no need to do an actual update in post/comment::upsert

Merge branch 'main' into activity-sender

implement upsert for user/community

reuse http client

got it working

attempt to use background-jobs crate

rewrite with proper error handling and less boilerplate

remove do_send, dont return errors from activity_sender

WIP: implement ActivitySender actor

Co-authored-by: dessalines <dessalines@noreply.yerbamate.dev>
Co-authored-by: Dessalines <tyhou13@gmx.com>
Co-authored-by: Felix Ableitner <me@nutomic.com>
Reviewed-on: https://yerbamate.dev/LemmyNet/lemmy/pulls/89

46 files changed:
server/Cargo.lock
server/Cargo.toml
server/lemmy_db/src/activity.rs
server/lemmy_db/src/comment.rs
server/lemmy_db/src/comment_view.rs
server/lemmy_db/src/community.rs
server/lemmy_db/src/moderator.rs
server/lemmy_db/src/password_reset_request.rs
server/lemmy_db/src/post.rs
server/lemmy_db/src/post_view.rs
server/lemmy_db/src/private_message.rs
server/lemmy_db/src/schema.rs
server/lemmy_db/src/user.rs
server/lemmy_db/src/user_mention.rs
server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql [new file with mode: 0644]
server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql [new file with mode: 0644]
server/src/api/comment.rs
server/src/api/community.rs
server/src/api/post.rs
server/src/api/user.rs
server/src/apub/activities.rs
server/src/apub/activity_queue.rs [new file with mode: 0644]
server/src/apub/comment.rs
server/src/apub/community.rs
server/src/apub/extensions/signatures.rs
server/src/apub/fetcher.rs
server/src/apub/inbox/activities/delete.rs
server/src/apub/inbox/activities/remove.rs
server/src/apub/inbox/activities/undo.rs
server/src/apub/inbox/shared_inbox.rs
server/src/apub/inbox/user_inbox.rs
server/src/apub/mod.rs
server/src/apub/post.rs
server/src/apub/private_message.rs
server/src/apub/user.rs
server/src/code_migrations.rs
server/src/lib.rs
server/src/main.rs
server/src/request.rs
server/src/websocket/server.rs
ui/src/api_tests/comment.spec.ts
ui/src/api_tests/community.spec.ts
ui/src/api_tests/follow.spec.ts
ui/src/api_tests/post.spec.ts
ui/src/api_tests/private_message.spec.ts
ui/src/api_tests/shared.ts

index 9781adaf12e4f5ebd435fae43f552290cef5b2f9..815ebc0a81bd107c961b8eb96a4a5f33909f4573 100644 (file)
@@ -490,6 +490,56 @@ dependencies = [
  "serde_urlencoded",
 ]
 
+[[package]]
+name = "background-jobs"
+version = "0.8.0-alpha.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fb38c4a5de33324650e9023829b0f4129eb5418b29f5dfe69a52100ff5bc50d7"
+dependencies = [
+ "background-jobs-actix",
+ "background-jobs-core",
+]
+
+[[package]]
+name = "background-jobs-actix"
+version = "0.8.0-alpha.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d012b9293806c777f806b537e04b5eec34ecd6eaf876c52792017695ce53262f"
+dependencies = [
+ "actix-rt",
+ "anyhow",
+ "async-trait",
+ "background-jobs-core",
+ "chrono",
+ "log",
+ "num_cpus",
+ "rand 0.7.3",
+ "serde 1.0.114",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "uuid 0.8.1",
+]
+
+[[package]]
+name = "background-jobs-core"
+version = "0.8.0-alpha.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd5efe91c019d7780d5a2fc2f92a15e1f95b84a761428e1d1972b7428634ebc7"
+dependencies = [
+ "actix-rt",
+ "anyhow",
+ "async-trait",
+ "chrono",
+ "futures",
+ "log",
+ "serde 1.0.114",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "uuid 0.8.1",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.50"
@@ -1501,6 +1551,16 @@ dependencies = [
  "itoa",
 ]
 
+[[package]]
+name = "http-body"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
+dependencies = [
+ "bytes",
+ "http",
+]
+
 [[package]]
 name = "http-signature-normalization"
 version = "0.5.2"
@@ -1544,6 +1604,43 @@ dependencies = [
  "quick-error",
 ]
 
+[[package]]
+name = "hyper"
+version = "0.13.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "h2",
+ "http",
+ "http-body",
+ "httparse",
+ "itoa",
+ "pin-project",
+ "socket2",
+ "time 0.1.43",
+ "tokio",
+ "tower-service",
+ "tracing",
+ "want",
+]
+
+[[package]]
+name = "hyper-tls"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed"
+dependencies = [
+ "bytes",
+ "hyper",
+ "native-tls",
+ "tokio",
+ "tokio-tls",
+]
+
 [[package]]
 name = "ident_case"
 version = "1.0.1"
@@ -1618,9 +1715,15 @@ dependencies = [
  "socket2",
  "widestring",
  "winapi 0.3.9",
- "winreg",
+ "winreg 0.6.2",
 ]
 
+[[package]]
+name = "ipnet"
+version = "2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
+
 [[package]]
 name = "itertools"
 version = "0.9.0"
@@ -1723,6 +1826,7 @@ dependencies = [
  "anyhow",
  "async-trait",
  "awc",
+ "background-jobs",
  "base64 0.12.3",
  "bcrypt",
  "captcha",
@@ -1743,6 +1847,7 @@ dependencies = [
  "openssl",
  "percent-encoding",
  "rand 0.7.3",
+ "reqwest",
  "rss",
  "serde 1.0.114",
  "serde_json",
@@ -2669,6 +2774,42 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "reqwest"
+version = "0.10.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e"
+dependencies = [
+ "base64 0.12.3",
+ "bytes",
+ "encoding_rs",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "hyper",
+ "hyper-tls",
+ "ipnet",
+ "js-sys",
+ "lazy_static",
+ "log",
+ "mime",
+ "mime_guess",
+ "native-tls",
+ "percent-encoding",
+ "pin-project-lite",
+ "serde 1.0.114",
+ "serde_json",
+ "serde_urlencoded",
+ "tokio",
+ "tokio-tls",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+ "winreg 0.7.0",
+]
+
 [[package]]
 name = "resolv-conf"
 version = "0.6.3"
@@ -3261,6 +3402,16 @@ dependencies = [
  "webpki",
 ]
 
+[[package]]
+name = "tokio-tls"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343"
+dependencies = [
+ "native-tls",
+ "tokio",
+]
+
 [[package]]
 name = "tokio-util"
 version = "0.2.0"
@@ -3290,6 +3441,12 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tower-service"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860"
+
 [[package]]
 name = "tracing"
 version = "0.1.18"
@@ -3350,6 +3507,12 @@ dependencies = [
  "trust-dns-proto",
 ]
 
+[[package]]
+name = "try-lock"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
+
 [[package]]
 name = "twoway"
 version = "0.2.1"
@@ -3528,6 +3691,16 @@ version = "0.9.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
 
+[[package]]
+name = "want"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
+dependencies = [
+ "log",
+ "try-lock",
+]
+
 [[package]]
 name = "wasi"
 version = "0.9.0+wasi-snapshot-preview1"
@@ -3541,6 +3714,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f0563a9a4b071746dd5aedbc3a28c6fe9be4586fb3fbadb67c400d4f53c6b16c"
 dependencies = [
  "cfg-if",
+ "serde 1.0.114",
+ "serde_json",
  "wasm-bindgen-macro",
 ]
 
@@ -3559,6 +3734,18 @@ dependencies = [
  "wasm-bindgen-shared",
 ]
 
+[[package]]
+name = "wasm-bindgen-futures"
+version = "0.4.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95f8d235a77f880bcef268d379810ea6c0af2eacfa90b1ad5af731776e0c4699"
+dependencies = [
+ "cfg-if",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
+]
+
 [[package]]
 name = "wasm-bindgen-macro"
 version = "0.2.67"
@@ -3675,6 +3862,15 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "winreg"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
+dependencies = [
+ "winapi 0.3.9",
+]
+
 [[package]]
 name = "winutil"
 version = "0.1.1"
index dba0fee6fe0e0c6d0bef5e00f970c0858e5e2eb6..c5bf9c888cbd9e407b96758d54a7435b569d4d4e 100644 (file)
@@ -53,3 +53,5 @@ async-trait = "0.1.36"
 captcha = "0.0.7"
 anyhow = "1.0.32"
 thiserror = "1.0.20"
+background-jobs = " 0.8.0-alpha.2"
+reqwest = { version = "0.10", features = ["json"] }
index 177e6b7cd6221074bff6b228804d73424f8e6b75..c28eda45c06d58249e3de994b8475885c74e0632 100644 (file)
@@ -113,7 +113,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_862362".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
index b2e22aa625e54b8a55157c88774909dd224337f7..c462db1bb4f3d931788992b1cab5fc1c6fbe4dd8 100644 (file)
@@ -39,13 +39,13 @@ pub struct CommentForm {
   pub published: Option<chrono::NaiveDateTime>,
   pub updated: Option<chrono::NaiveDateTime>,
   pub deleted: Option<bool>,
-  pub ap_id: String,
+  pub ap_id: Option<String>,
   pub local: bool,
 }
 
 impl CommentForm {
   pub fn get_ap_id(&self) -> Result<Url, ParseError> {
-    Url::parse(&self.ap_id)
+    Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string()))
   }
 }
 
@@ -163,12 +163,13 @@ impl Comment {
   }
 
   pub fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
-    let existing = Self::read_from_apub_id(conn, &comment_form.ap_id);
-    match existing {
-      Err(NotFound {}) => Ok(Self::create(conn, &comment_form)?),
-      Ok(p) => Ok(Self::update(conn, p.id, &comment_form)?),
-      Err(e) => Err(e),
-    }
+    use crate::schema::comment::dsl::*;
+    insert_into(comment)
+      .values(comment_form)
+      .on_conflict(ap_id)
+      .do_update()
+      .set(comment_form)
+      .get_result::<Self>(conn)
   }
 }
 
@@ -272,7 +273,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_283687".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -292,7 +293,7 @@ mod tests {
       deleted: None,
       updated: None,
       nsfw: false,
-      actor_id: "changeme_928738972".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
@@ -320,7 +321,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -337,7 +338,7 @@ mod tests {
       parent_id: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
@@ -354,7 +355,7 @@ mod tests {
       parent_id: None,
       published: inserted_comment.published,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: inserted_comment.ap_id.to_owned(),
       local: true,
     };
 
@@ -368,7 +369,7 @@ mod tests {
       read: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
index 5b14013770542fd1dbeb4d1f07ee4f1975b013e1..1dcdf1934ad5d4b45e9d1857ab07373d2b68d2ea 100644 (file)
@@ -517,7 +517,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_92873982".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -537,7 +537,7 @@ mod tests {
       deleted: None,
       updated: None,
       nsfw: false,
-      actor_id: "changeme_7625376".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
@@ -565,7 +565,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -582,7 +582,7 @@ mod tests {
       read: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
@@ -627,7 +627,7 @@ mod tests {
       my_vote: None,
       subscribed: None,
       saved: None,
-      ap_id: "http://fake.com".to_string(),
+      ap_id: inserted_comment.ap_id.to_owned(),
       local: true,
       community_actor_id: inserted_community.actor_id.to_owned(),
       community_local: true,
@@ -665,7 +665,7 @@ mod tests {
       my_vote: Some(1),
       subscribed: Some(false),
       saved: Some(false),
-      ap_id: "http://fake.com".to_string(),
+      ap_id: inserted_comment.ap_id.to_owned(),
       local: true,
       community_actor_id: inserted_community.actor_id.to_owned(),
       community_local: true,
index df5f129412604ccd6ad94c3d42abd5696747a14e..d033412c781cd14520a22b1075bc140470d6796c 100644 (file)
@@ -45,7 +45,7 @@ pub struct CommunityForm {
   pub updated: Option<chrono::NaiveDateTime>,
   pub deleted: Option<bool>,
   pub nsfw: bool,
-  pub actor_id: String,
+  pub actor_id: Option<String>,
   pub local: bool,
   pub private_key: Option<String>,
   pub public_key: Option<String>,
@@ -160,6 +160,16 @@ impl Community {
       .unwrap_or_default()
       .contains(&user_id)
   }
+
+  pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result<Community, Error> {
+    use crate::schema::community::dsl::*;
+    insert_into(community)
+      .values(community_form)
+      .on_conflict(actor_id)
+      .do_update()
+      .set(community_form)
+      .get_result::<Self>(conn)
+  }
 }
 
 #[derive(Identifiable, Queryable, Associations, PartialEq, Debug)]
@@ -320,7 +330,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_8266238".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -340,7 +350,7 @@ mod tests {
       removed: None,
       deleted: None,
       updated: None,
-      actor_id: "changeme_7625376".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
index 1a02d977ca222d6ed53b8d230f327b921a04cbb4..7d453d353ff6736cc8aebfc1ce50dbce9690b100 100644 (file)
@@ -426,7 +426,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_829398".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -454,7 +454,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_82982738".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -474,7 +474,7 @@ mod tests {
       deleted: None,
       updated: None,
       nsfw: false,
-      actor_id: "changeme_283687".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
@@ -502,7 +502,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -519,7 +519,7 @@ mod tests {
       parent_id: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
index 06615187ea7720ba723d0a743d791d1076826c44..f248f0b49fcae9cb0fbbd072979a709186e68731 100644 (file)
@@ -103,7 +103,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_8292378".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
index a6df50bff6618ea485ecc06d7a089b17e257946a..177659eb996088266c7df3614ed4dceaf59391c8 100644 (file)
@@ -53,13 +53,13 @@ pub struct PostForm {
   pub embed_description: Option<String>,
   pub embed_html: Option<String>,
   pub thumbnail_url: Option<String>,
-  pub ap_id: String,
+  pub ap_id: Option<String>,
   pub local: bool,
 }
 
 impl PostForm {
   pub fn get_ap_id(&self) -> Result<Url, ParseError> {
-    Url::parse(&self.ap_id)
+    Url::parse(&self.ap_id.as_ref().unwrap_or(&"not_a_url".to_string()))
   }
 }
 
@@ -180,12 +180,13 @@ impl Post {
   }
 
   pub fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result<Post, Error> {
-    let existing = Self::read_from_apub_id(conn, &post_form.ap_id);
-    match existing {
-      Err(NotFound {}) => Ok(Self::create(conn, &post_form)?),
-      Ok(p) => Ok(Self::update(conn, p.id, &post_form)?),
-      Err(e) => Err(e),
-    }
+    use crate::schema::post::dsl::*;
+    insert_into(post)
+      .values(post_form)
+      .on_conflict(ap_id)
+      .do_update()
+      .set(post_form)
+      .get_result::<Self>(conn)
   }
 }
 
@@ -358,7 +359,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_8292683678".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -378,7 +379,7 @@ mod tests {
       deleted: None,
       updated: None,
       nsfw: false,
-      actor_id: "changeme_8223262378".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
@@ -406,7 +407,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -431,7 +432,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: inserted_post.ap_id.to_owned(),
       local: true,
     };
 
index 35bfc7ab3958b15666147919211b0e1f79e3c323..d792538360faac6a17948187cbbf799b778c8d30 100644 (file)
@@ -423,7 +423,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_8282738268".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -443,7 +443,7 @@ mod tests {
       deleted: None,
       updated: None,
       nsfw: false,
-      actor_id: "changeme_2763".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
@@ -471,7 +471,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -555,7 +555,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".to_string(),
+      ap_id: inserted_post.ap_id.to_owned(),
       local: true,
       creator_actor_id: inserted_user.actor_id.to_owned(),
       creator_local: true,
@@ -604,7 +604,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".to_string(),
+      ap_id: inserted_post.ap_id.to_owned(),
       local: true,
       creator_actor_id: inserted_user.actor_id.to_owned(),
       creator_local: true,
index 007a962084d54ea7268754a515195a8afe9b4e73..4361fa900d21b663a619faa204618e039229f627 100644 (file)
@@ -27,7 +27,7 @@ pub struct PrivateMessageForm {
   pub read: Option<bool>,
   pub published: Option<chrono::NaiveDateTime>,
   pub updated: Option<chrono::NaiveDateTime>,
-  pub ap_id: String,
+  pub ap_id: Option<String>,
   pub local: bool,
 }
 
@@ -119,6 +119,17 @@ impl PrivateMessage {
     .set(read.eq(true))
     .get_results::<Self>(conn)
   }
+
+  // TODO use this
+  pub fn upsert(conn: &PgConnection, private_message_form: &PrivateMessageForm) -> Result<Self, Error> {
+    use crate::schema::private_message::dsl::*;
+    insert_into(private_message)
+      .values(private_message_form)
+      .on_conflict(ap_id)
+      .do_update()
+      .set(private_message_form)
+      .get_result::<Self>(conn)
+  }
 }
 
 #[cfg(test)]
@@ -153,7 +164,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_6723878".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -181,7 +192,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_287263876".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -199,7 +210,7 @@ mod tests {
       read: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
@@ -214,7 +225,7 @@ mod tests {
       read: false,
       updated: None,
       published: inserted_private_message.published,
-      ap_id: "http://fake.com".into(),
+      ap_id: inserted_private_message.ap_id.to_owned(),
       local: true,
     };
 
index c446edd9f27e72f2add810f181ef57b093a0c36d..a189dbced4fa00d9bac8ff5cf8223db5efe14004 100644 (file)
@@ -523,36 +523,36 @@ joinable!(user_mention -> comment (comment_id));
 joinable!(user_mention -> user_ (recipient_id));
 
 allow_tables_to_appear_in_same_query!(
-  activity,
-  category,
-  comment,
-  comment_aggregates_fast,
-  comment_like,
-  comment_saved,
-  community,
-  community_aggregates_fast,
-  community_follower,
-  community_moderator,
-  community_user_ban,
-  mod_add,
-  mod_add_community,
-  mod_ban,
-  mod_ban_from_community,
-  mod_lock_post,
-  mod_remove_comment,
-  mod_remove_community,
-  mod_remove_post,
-  mod_sticky_post,
-  password_reset_request,
-  post,
-  post_aggregates_fast,
-  post_like,
-  post_read,
-  post_saved,
-  private_message,
-  site,
-  user_,
-  user_ban,
-  user_fast,
-  user_mention,
+    activity,
+    category,
+    comment,
+    comment_aggregates_fast,
+    comment_like,
+    comment_saved,
+    community,
+    community_aggregates_fast,
+    community_follower,
+    community_moderator,
+    community_user_ban,
+    mod_add,
+    mod_add_community,
+    mod_ban,
+    mod_ban_from_community,
+    mod_lock_post,
+    mod_remove_comment,
+    mod_remove_community,
+    mod_remove_post,
+    mod_sticky_post,
+    password_reset_request,
+    post,
+    post_aggregates_fast,
+    post_like,
+    post_read,
+    post_saved,
+    private_message,
+    site,
+    user_,
+    user_ban,
+    user_fast,
+    user_mention,
 );
index 8416d38a1e1024e9aee628ff2929a236b810e0e8..0a39bdb27595d5c030ddb4df9f5283a16828b369 100644 (file)
@@ -57,7 +57,7 @@ pub struct UserForm {
   pub show_avatars: bool,
   pub send_notifications_to_email: bool,
   pub matrix_user_id: Option<String>,
-  pub actor_id: String,
+  pub actor_id: Option<String>,
   pub bio: Option<String>,
   pub local: bool,
   pub private_key: Option<String>,
@@ -152,6 +152,15 @@ impl User_ {
   pub fn get_profile_url(&self, hostname: &str) -> String {
     format!("https://{}/u/{}", hostname, self.name)
   }
+
+  pub fn upsert(conn: &PgConnection, user_form: &UserForm) -> Result<User_, Error> {
+    insert_into(user_)
+      .values(user_form)
+      .on_conflict(actor_id)
+      .do_update()
+      .set(user_form)
+      .get_result::<Self>(conn)
+  }
 }
 
 #[cfg(test)]
@@ -180,7 +189,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_9826382637".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
index 4c0fdc5ff85d0fb808d2371a8e1a15bde5749949..a5985223e30518f1bc7a7f4cc32ccc7199f84057 100644 (file)
@@ -106,7 +106,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_628763".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -134,7 +134,7 @@ mod tests {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: "changeme_927389278".into(),
+      actor_id: None,
       bio: None,
       local: true,
       private_key: None,
@@ -154,7 +154,7 @@ mod tests {
       deleted: None,
       updated: None,
       nsfw: false,
-      actor_id: "changeme_876238".into(),
+      actor_id: None,
       local: true,
       private_key: None,
       public_key: None,
@@ -182,7 +182,7 @@ mod tests {
       embed_description: None,
       embed_html: None,
       thumbnail_url: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -199,7 +199,7 @@ mod tests {
       parent_id: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
diff --git a/server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql b/server/migrations/2020-08-25-132005_add_unique_ap_ids/down.sql
new file mode 100644 (file)
index 0000000..2b43b59
--- /dev/null
@@ -0,0 +1,27 @@
+-- Drop the uniques
+alter table private_message drop constraint idx_private_message_ap_id;
+alter table post drop constraint idx_post_ap_id;
+alter table comment drop constraint idx_comment_ap_id;
+alter table user_ drop constraint idx_user_actor_id;
+alter table community drop constraint idx_community_actor_id;
+
+alter table private_message alter column ap_id set not null;
+alter table private_message alter column ap_id set default 'http://fake.com';
+
+alter table post alter column ap_id set not null;
+alter table post alter column ap_id set default 'http://fake.com';
+
+alter table comment alter column ap_id set not null;
+alter table comment alter column ap_id set default 'http://fake.com';
+
+update private_message
+set ap_id = 'http://fake.com'
+where ap_id like 'changeme_%';
+
+update post
+set ap_id = 'http://fake.com'
+where ap_id like 'changeme_%';
+
+update comment
+set ap_id = 'http://fake.com'
+where ap_id like 'changeme_%';
diff --git a/server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql b/server/migrations/2020-08-25-132005_add_unique_ap_ids/up.sql
new file mode 100644 (file)
index 0000000..75e81ee
--- /dev/null
@@ -0,0 +1,56 @@
+-- Add unique ap_id for private_message, comment, and post
+
+-- Need to delete the possible dupes for ones that don't start with the fake one
+delete from private_message a using (
+  select min(id) as id, ap_id
+    from private_message 
+    group by ap_id having count(*) > 1
+) b
+where a.ap_id = b.ap_id 
+and a.id <> b.id;
+
+delete from post a using (
+  select min(id) as id, ap_id
+    from post 
+    group by ap_id having count(*) > 1
+) b
+where a.ap_id = b.ap_id 
+and a.id <> b.id;
+
+delete from comment a using (
+  select min(id) as id, ap_id
+    from comment 
+    group by ap_id having count(*) > 1
+) b
+where a.ap_id = b.ap_id 
+and a.id <> b.id;
+
+-- Replacing the current default on the columns, to the unique one
+update private_message 
+set ap_id = generate_unique_changeme()
+where ap_id = 'http://fake.com';
+
+update post 
+set ap_id = generate_unique_changeme()
+where ap_id = 'http://fake.com';
+
+update comment 
+set ap_id = generate_unique_changeme()
+where ap_id = 'http://fake.com';
+
+-- Add the unique indexes
+alter table private_message alter column ap_id set not null;
+alter table private_message alter column ap_id set default generate_unique_changeme();
+
+alter table post alter column ap_id set not null;
+alter table post alter column ap_id set default generate_unique_changeme();
+
+alter table comment alter column ap_id set not null;
+alter table comment alter column ap_id set default generate_unique_changeme();
+
+-- Add the uniques, for user_ and community too
+alter table private_message add constraint idx_private_message_ap_id unique (ap_id);
+alter table post add constraint idx_post_ap_id unique (ap_id);
+alter table comment add constraint idx_comment_ap_id unique (ap_id);
+alter table user_ add constraint idx_user_actor_id unique (actor_id);
+alter table community add constraint idx_community_actor_id unique (actor_id);
index 3384993f887dd4115ab6a2c19ab4ce6fc185e6d8..f2effab58b48e8aa65225f9ae8dac69783440a7a 100644 (file)
@@ -146,7 +146,7 @@ impl Perform for CreateComment {
       read: None,
       published: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
     };
 
index 7b63c672691e7d07653aec321345e6a99446b7f8..c94ca59b7ba284fbab78c2bd0500cb8f6e0f83a4 100644 (file)
@@ -274,7 +274,7 @@ impl Perform for CreateCommunity {
       deleted: None,
       nsfw: data.nsfw,
       updated: None,
-      actor_id,
+      actor_id: Some(actor_id),
       local: true,
       private_key: Some(keypair.private_key),
       public_key: Some(keypair.public_key),
@@ -368,7 +368,7 @@ impl Perform for EditCommunity {
       deleted: Some(read_community.deleted),
       nsfw: data.nsfw,
       updated: Some(naive_now()),
-      actor_id: read_community.actor_id,
+      actor_id: Some(read_community.actor_id),
       local: read_community.local,
       private_key: read_community.private_key,
       public_key: read_community.public_key,
index 5cb7e3222192df531285d30f669aa399a46c35c5..9f0fb3be051d3a5a6c202326392712f9e31880c9 100644 (file)
@@ -187,7 +187,7 @@ impl Perform for CreatePost {
       embed_description: iframely_description,
       embed_html: iframely_html,
       thumbnail_url: pictrs_thumbnail,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
@@ -518,7 +518,7 @@ impl Perform for EditPost {
       embed_description: iframely_description,
       embed_html: iframely_html,
       thumbnail_url: pictrs_thumbnail,
-      ap_id: orig_post.ap_id,
+      ap_id: Some(orig_post.ap_id),
       local: orig_post.local,
       published: None,
     };
index e97a6d33beb6f8017ee99d1f428980db941d0f4b..e6cf2a820ad61fbcf29d1a244ce8edd58bb212ef 100644 (file)
@@ -410,7 +410,7 @@ impl Perform for Register {
       lang: "browser".into(),
       show_avatars: true,
       send_notifications_to_email: false,
-      actor_id: make_apub_endpoint(EndpointType::User, &data.username).to_string(),
+      actor_id: Some(make_apub_endpoint(EndpointType::User, &data.username).to_string()),
       bio: None,
       local: true,
       private_key: Some(user_keypair.private_key),
@@ -441,37 +441,38 @@ impl Perform for Register {
     let main_community_keypair = generate_actor_keypair()?;
 
     // Create the main community if it doesn't exist
-    let main_community = match blocking(context.pool(), move |conn| Community::read(conn, 2))
-      .await?
-    {
-      Ok(c) => c,
-      Err(_e) => {
-        let default_community_name = "main";
-        let community_form = CommunityForm {
-          name: default_community_name.to_string(),
-          title: "The Default Community".to_string(),
-          description: Some("The Default Community".to_string()),
-          category_id: 1,
-          nsfw: false,
-          creator_id: inserted_user.id,
-          removed: None,
-          deleted: None,
-          updated: None,
-          actor_id: make_apub_endpoint(EndpointType::Community, default_community_name).to_string(),
-          local: true,
-          private_key: Some(main_community_keypair.private_key),
-          public_key: Some(main_community_keypair.public_key),
-          last_refreshed_at: None,
-          published: None,
-          icon: None,
-          banner: None,
-        };
-        blocking(context.pool(), move |conn| {
-          Community::create(conn, &community_form)
-        })
-        .await??
-      }
-    };
+    let main_community =
+      match blocking(context.pool(), move |conn| Community::read(conn, 2)).await? {
+        Ok(c) => c,
+        Err(_e) => {
+          let default_community_name = "main";
+          let community_form = CommunityForm {
+            name: default_community_name.to_string(),
+            title: "The Default Community".to_string(),
+            description: Some("The Default Community".to_string()),
+            category_id: 1,
+            nsfw: false,
+            creator_id: inserted_user.id,
+            removed: None,
+            deleted: None,
+            updated: None,
+            actor_id: Some(
+              make_apub_endpoint(EndpointType::Community, default_community_name).to_string(),
+            ),
+            local: true,
+            private_key: Some(main_community_keypair.private_key),
+            public_key: Some(main_community_keypair.public_key),
+            last_refreshed_at: None,
+            published: None,
+            icon: None,
+            banner: None,
+          };
+          blocking(context.pool(), move |conn| {
+            Community::create(conn, &community_form)
+          })
+          .await??
+        }
+      };
 
     // Sign them up for main community no matter what
     let community_follower_form = CommunityFollowerForm {
@@ -643,7 +644,7 @@ impl Perform for SaveUserSettings {
       lang: data.lang.to_owned(),
       show_avatars: data.show_avatars,
       send_notifications_to_email: data.send_notifications_to_email,
-      actor_id: read_user.actor_id,
+      actor_id: Some(read_user.actor_id),
       bio,
       local: read_user.local,
       private_key: read_user.private_key,
@@ -1218,7 +1219,7 @@ impl Perform for CreatePrivateMessage {
       deleted: None,
       read: None,
       updated: None,
-      ap_id: "http://fake.com".into(),
+      ap_id: None,
       local: true,
       published: None,
     };
index 4700bb0892ebc3598907fd1ffe357301b29d9b7a..b4d6c4d2baa0c908fb4ed942620e1028af6ca33a 100644 (file)
@@ -1,72 +1,38 @@
 use crate::{
-  apub::{
-    check_is_apub_id_valid,
-    community::do_announce,
-    extensions::signatures::sign,
-    insert_activity,
-    ActorType,
-  },
-  request::retry_custom,
+  apub::{activity_queue::send_activity, community::do_announce, insert_activity},
   LemmyContext,
   LemmyError,
 };
-use activitystreams::base::AnyBase;
-use actix_web::client::Client;
+use activitystreams::{
+  base::{Extends, ExtendsExt},
+  object::AsObject,
+};
 use lemmy_db::{community::Community, user::User_};
 use lemmy_utils::{get_apub_protocol_string, settings::Settings};
-use log::debug;
+use serde::{export::fmt::Debug, Serialize};
 use url::{ParseError, Url};
 use uuid::Uuid;
 
-pub async fn send_activity_to_community(
+pub async fn send_activity_to_community<T, Kind>(
   creator: &User_,
   community: &Community,
   to: Vec<Url>,
-  activity: AnyBase,
+  activity: T,
   context: &LemmyContext,
-) -> Result<(), LemmyError> {
+) -> Result<(), LemmyError>
+where
+  T: AsObject<Kind> + Extends<Kind> + Serialize + Debug + Send + Clone + 'static,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  // TODO: looks like call this sometimes with activity, and sometimes with any_base
   insert_activity(creator.id, activity.clone(), true, context.pool()).await?;
 
   // if this is a local community, we need to do an announce from the community instead
   if community.local {
-    do_announce(activity, &community, creator, context).await?;
+    do_announce(activity.into_any_base()?, &community, creator, context).await?;
   } else {
-    send_activity(context.client(), &activity, creator, to).await?;
-  }
-
-  Ok(())
-}
-
-/// Send an activity to a list of recipients, using the correct headers etc.
-pub async fn send_activity(
-  client: &Client,
-  activity: &AnyBase,
-  actor: &dyn ActorType,
-  to: Vec<Url>,
-) -> Result<(), LemmyError> {
-  if !Settings::get().federation.enabled {
-    return Ok(());
-  }
-
-  let activity = serde_json::to_string(&activity)?;
-  debug!("Sending activitypub activity {} to {:?}", activity, to);
-
-  for to_url in to {
-    check_is_apub_id_valid(&to_url)?;
-
-    let res = retry_custom(|| async {
-      let request = client
-        .post(to_url.as_str())
-        .header("Content-Type", "application/json");
-
-      match sign(request, actor, activity.clone()).await {
-        Ok(signed) => Ok(signed.send().await),
-        Err(e) => Err(e),
-      }
-    })
-    .await?;
-
-    debug!("Result for activity send: {:?}", res);
+    send_activity(context.activity_queue(), activity, creator, to)?;
   }
 
   Ok(())
diff --git a/server/src/apub/activity_queue.rs b/server/src/apub/activity_queue.rs
new file mode 100644 (file)
index 0000000..bc5faaa
--- /dev/null
@@ -0,0 +1,133 @@
+use crate::{
+  apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType},
+  LemmyError,
+};
+use activitystreams::{
+  base::{Extends, ExtendsExt},
+  object::AsObject,
+};
+use anyhow::{anyhow, Context, Error};
+use awc::Client;
+use background_jobs::{
+  create_server,
+  memory_storage::Storage,
+  ActixJob,
+  Backoff,
+  MaxRetries,
+  QueueHandle,
+  WorkerConfig,
+};
+use lemmy_utils::{location_info, settings::Settings};
+use log::warn;
+use serde::{Deserialize, Serialize};
+use std::{future::Future, pin::Pin};
+use url::Url;
+
+pub fn send_activity<T, Kind>(
+  activity_sender: &QueueHandle,
+  activity: T,
+  actor: &dyn ActorType,
+  to: Vec<Url>,
+) -> Result<(), LemmyError>
+where
+  T: AsObject<Kind>,
+  T: Extends<Kind>,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  if !Settings::get().federation.enabled {
+    return Ok(());
+  }
+
+  let activity = activity.into_any_base()?;
+  let serialised_activity = serde_json::to_string(&activity)?;
+
+  for to_url in &to {
+    check_is_apub_id_valid(&to_url)?;
+  }
+
+  // TODO: it would make sense to create a separate task for each destination server
+  let message = SendActivityTask {
+    activity: serialised_activity,
+    to,
+    actor_id: actor.actor_id()?,
+    private_key: actor.private_key().context(location_info!())?,
+  };
+  activity_sender.queue::<SendActivityTask>(message)?;
+
+  Ok(())
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct SendActivityTask {
+  activity: String,
+  to: Vec<Url>,
+  actor_id: Url,
+  private_key: String,
+}
+
+impl ActixJob for SendActivityTask {
+  type State = MyState;
+  type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
+  const NAME: &'static str = "SendActivityTask";
+
+  const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
+  const BACKOFF: Backoff = Backoff::Exponential(2);
+
+  fn run(self, state: Self::State) -> Self::Future {
+    Box::pin(async move {
+      for to_url in &self.to {
+        let request = state
+          .client
+          .post(to_url.as_str())
+          .header("Content-Type", "application/json");
+
+        // TODO: i believe we have to do the signing in here because it is only valid for a few seconds
+        let signed = sign(
+          request,
+          self.activity.clone(),
+          &self.actor_id,
+          self.private_key.to_owned(),
+        )
+        .await;
+        let signed = match signed {
+          Ok(s) => s,
+          Err(e) => {
+            warn!("{}", e);
+            // dont return an error because retrying would probably not fix the signing
+            return Ok(());
+          }
+        };
+        if let Err(e) = signed.send().await {
+          warn!("{}", e);
+          return Err(anyhow!(
+            "Failed to send activity {} to {}",
+            &self.activity,
+            to_url
+          ));
+        }
+      }
+
+      Ok(())
+    })
+  }
+}
+
+pub fn create_activity_queue() -> QueueHandle {
+  // Start the application server. This guards access to to the jobs store
+  let queue_handle = create_server(Storage::new());
+
+  // Configure and start our workers
+  WorkerConfig::new(|| MyState {
+    client: Client::default(),
+  })
+  .register::<SendActivityTask>()
+  .start(queue_handle.clone());
+
+  queue_handle
+}
+
+#[derive(Clone)]
+struct MyState {
+  pub client: Client,
+}
index a9a97c0833a609aa24c6720fe7eac6dda570da94..988904e9738c1eb8e4cbc785a5a35dc77c4b9656 100644 (file)
@@ -192,7 +192,7 @@ impl FromApub for CommentForm {
       published: note.published().map(|u| u.to_owned().naive_local()),
       updated: note.updated().map(|u| u.to_owned().naive_local()),
       deleted: None,
-      ap_id: check_actor_domain(note, expected_domain)?,
+      ap_id: Some(check_actor_domain(note, expected_domain)?),
       local: false,
     })
   }
@@ -224,14 +224,7 @@ impl ApubObjectType for Comment {
       // Set the mention tags
       .set_many_tags(maa.get_tags()?);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      maa.inboxes,
-      create.into_any_base()?,
-      context,
-    )
-    .await?;
+    send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?;
     Ok(())
   }
 
@@ -259,14 +252,7 @@ impl ApubObjectType for Comment {
       // Set the mention tags
       .set_many_tags(maa.get_tags()?);
 
-    send_activity_to_community(
-      &creator,
-      &community,
-      maa.inboxes,
-      update.into_any_base()?,
-      context,
-    )
-    .await?;
+    send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?;
     Ok(())
   }
 
@@ -293,7 +279,7 @@ impl ApubObjectType for Comment {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      delete.into_any_base()?,
+      delete,
       context,
     )
     .await?;
@@ -336,7 +322,7 @@ impl ApubObjectType for Comment {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      undo.into_any_base()?,
+      undo,
       context,
     )
     .await?;
@@ -366,7 +352,7 @@ impl ApubObjectType for Comment {
       &mod_,
       &community,
       vec![community.get_shared_inbox_url()?],
-      remove.into_any_base()?,
+      remove,
       context,
     )
     .await?;
@@ -405,7 +391,7 @@ impl ApubObjectType for Comment {
       &mod_,
       &community,
       vec![community.get_shared_inbox_url()?],
-      undo.into_any_base()?,
+      undo,
       context,
     )
     .await?;
@@ -438,7 +424,7 @@ impl ApubLikeableType for Comment {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      like.into_any_base()?,
+      like,
       context,
     )
     .await?;
@@ -468,7 +454,7 @@ impl ApubLikeableType for Comment {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      dislike.into_any_base()?,
+      dislike,
       context,
     )
     .await?;
@@ -510,7 +496,7 @@ impl ApubLikeableType for Comment {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      undo.into_any_base()?,
+      undo,
       context,
     )
     .await?;
index 016f342dc629eb1d36d4fb361d39ac4b77b0659a..67baa7860cee8efac128e6b9b378f3c5d35b899f 100644 (file)
@@ -1,7 +1,8 @@
 use crate::{
   api::{check_slurs, check_slurs_opt},
   apub::{
-    activities::{generate_activity_id, send_activity},
+    activities::generate_activity_id,
+    activity_queue::send_activity,
     check_actor_domain,
     create_apub_response,
     create_apub_tombstone_response,
@@ -155,7 +156,7 @@ impl ActorType for Community {
 
     insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
 
-    send_activity(context.client(), &accept.into_any_base()?, self, vec![to]).await?;
+    send_activity(context.activity_queue(), accept, self, vec![to])?;
     Ok(())
   }
 
@@ -176,7 +177,7 @@ impl ActorType for Community {
     // Note: For an accept, since it was automatic, no one pushed a button,
     // the community was the actor.
     // But for delete, the creator is the actor, and does the signing
-    send_activity(context.client(), &delete.into_any_base()?, creator, inboxes).await?;
+    send_activity(context.activity_queue(), delete, creator, inboxes)?;
     Ok(())
   }
 
@@ -208,7 +209,7 @@ impl ActorType for Community {
     // Note: For an accept, since it was automatic, no one pushed a button,
     // the community was the actor.
     // But for delete, the creator is the actor, and does the signing
-    send_activity(context.client(), &undo.into_any_base()?, creator, inboxes).await?;
+    send_activity(context.activity_queue(), undo, creator, inboxes)?;
     Ok(())
   }
 
@@ -229,7 +230,7 @@ impl ActorType for Community {
     // Note: For an accept, since it was automatic, no one pushed a button,
     // the community was the actor.
     // But for delete, the creator is the actor, and does the signing
-    send_activity(context.client(), &remove.into_any_base()?, mod_, inboxes).await?;
+    send_activity(context.activity_queue(), remove, mod_, inboxes)?;
     Ok(())
   }
 
@@ -258,7 +259,7 @@ impl ActorType for Community {
     // Note: For an accept, since it was automatic, no one pushed a button,
     // the community was the actor.
     // But for remove , the creator is the actor, and does the signing
-    send_activity(context.client(), &undo.into_any_base()?, mod_, inboxes).await?;
+    send_activity(context.activity_queue(), undo, mod_, inboxes)?;
     Ok(())
   }
 
@@ -402,7 +403,7 @@ impl FromApub for CommunityForm {
       updated: group.inner.updated().map(|u| u.to_owned().naive_local()),
       deleted: None,
       nsfw: group.ext_one.sensitive,
-      actor_id: check_actor_domain(group, expected_domain)?,
+      actor_id: Some(check_actor_domain(group, expected_domain)?),
       local: false,
       private_key: None,
       public_key: Some(group.ext_two.to_owned().public_key.public_key_pem),
@@ -511,7 +512,7 @@ pub async fn do_announce(
   let community_shared_inbox = community.get_shared_inbox_url()?;
   to.retain(|x| x != &community_shared_inbox);
 
-  send_activity(context.client(), &announce.into_any_base()?, community, to).await?;
+  send_activity(context.activity_queue(), announce, community, to)?;
 
   Ok(())
 }
index 96063d5e0bdd2bbe34823ce020bdab2b8006e4e3..cb9697775382d76cf62d7d2646a19ecfef6af8aa 100644 (file)
@@ -16,6 +16,7 @@ use openssl::{
 };
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
+use url::Url;
 
 lazy_static! {
   static ref HTTP_SIG_CONFIG: Config = Config::new();
@@ -24,11 +25,11 @@ lazy_static! {
 /// Signs request headers with the given keypair.
 pub async fn sign(
   request: ClientRequest,
-  actor: &dyn ActorType,
   activity: String,
+  actor_id: &Url,
+  private_key: String,
 ) -> Result<DigestClient<String>, LemmyError> {
-  let signing_key_id = format!("{}#main-key", actor.actor_id()?);
-  let private_key = actor.private_key().context(location_info!())?;
+  let signing_key_id = format!("{}#main-key", actor_id);
 
   let digest_client = request
     .signature_with_digest(
index 0c42aa14e4b8bf57f222d2cbbec4b57d31edf03c..0fcb115037b3de65bd9e7dcd19a74b3495934ec7 100644 (file)
@@ -15,7 +15,6 @@ use crate::{
   LemmyError,
 };
 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
-use actix_web::client::Client;
 use anyhow::{anyhow, Context};
 use chrono::NaiveDateTime;
 use diesel::result::Error::NotFound;
@@ -35,6 +34,7 @@ use lemmy_db::{
 };
 use lemmy_utils::{get_apub_protocol_string, location_info};
 use log::debug;
+use reqwest::Client;
 use serde::Deserialize;
 use std::{fmt::Debug, time::Duration};
 use url::Url;
@@ -55,6 +55,9 @@ where
 
   let timeout = Duration::from_secs(60);
 
+  // speed up tests
+  // before: 305s
+  // after: 240s
   let json = retry(|| {
     client
       .get(url.as_str())
@@ -230,7 +233,7 @@ pub async fn get_or_fetch_and_upsert_user(
       let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
 
       let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
-      let user = blocking(context.pool(), move |conn| User_::create(conn, &uf)).await??;
+      let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
 
       Ok(user)
     }
@@ -286,14 +289,7 @@ async fn fetch_remote_community(
   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
 
   let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
-  let community = blocking(context.pool(), move |conn| {
-    if let Some(cid) = community_id {
-      Community::update(conn, cid, &cf)
-    } else {
-      Community::create(conn, &cf)
-    }
-  })
-  .await??;
+  let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
 
   // Also add the community moderators too
   let attributed_to = group.inner.attributed_to().context(location_info!())?;
@@ -341,7 +337,7 @@ async fn fetch_remote_community(
   for o in outbox_items {
     let page = PageExt::from_any_base(o)?.context(location_info!())?;
     let post = PostForm::from_apub(&page, context, None).await?;
-    let post_ap_id = post.ap_id.clone();
+    let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
     // Check whether the post already exists in the local db
     let existing = blocking(context.pool(), move |conn| {
       Post::read_from_apub_id(conn, &post_ap_id)
@@ -349,7 +345,7 @@ async fn fetch_remote_community(
     .await?;
     match existing {
       Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
-      Err(_) => blocking(context.pool(), move |conn| Post::create(conn, &post)).await??,
+      Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
     };
     // TODO: we need to send a websocket update here
   }
@@ -374,7 +370,7 @@ pub async fn get_or_fetch_and_insert_post(
       let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
       let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
 
-      let post = blocking(context.pool(), move |conn| Post::create(conn, &post_form)).await??;
+      let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
 
       Ok(post)
     }
@@ -404,7 +400,7 @@ pub async fn get_or_fetch_and_insert_comment(
         CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
 
       let comment = blocking(context.pool(), move |conn| {
-        Comment::create(conn, &comment_form)
+        Comment::upsert(conn, &comment_form)
       })
       .await??;
 
index 2a6689dbf7efd221affff48111d67f17933de14d..9c4f0beeffe688f19cf6ede99784d92472c00267 100644 (file)
@@ -78,7 +78,7 @@ async fn receive_delete_post(
     embed_description: post.embed_description,
     embed_html: post.embed_html,
     thumbnail_url: post.thumbnail_url,
-    ap_id: post.ap_id,
+    ap_id: Some(post.ap_id),
     local: post.local,
     published: None,
   };
@@ -131,7 +131,7 @@ async fn receive_delete_comment(
     read: None,
     published: None,
     updated: Some(naive_now()),
-    ap_id: comment.ap_id,
+    ap_id: Some(comment.ap_id),
     local: comment.local,
   };
   let comment_id = comment.id;
@@ -175,7 +175,8 @@ async fn receive_delete_community(
 
   let community_actor_id = CommunityForm::from_apub(&group, context, Some(user.actor_id()?))
     .await?
-    .actor_id;
+    .actor_id
+    .context(location_info!())?;
 
   let community = blocking(context.pool(), move |conn| {
     Community::read_from_actor_id(conn, &community_actor_id)
@@ -193,7 +194,7 @@ async fn receive_delete_community(
     updated: Some(naive_now()),
     deleted: Some(true),
     nsfw: community.nsfw,
-    actor_id: community.actor_id,
+    actor_id: Some(community.actor_id),
     local: community.local,
     private_key: community.private_key,
     public_key: community.public_key,
index 83a748436aab3822715bca55e26db3b8dd5ac223..83eb6f3394172e6b19fa2844e7a4f254df537165 100644 (file)
@@ -85,7 +85,7 @@ async fn receive_remove_post(
     embed_description: post.embed_description,
     embed_html: post.embed_html,
     thumbnail_url: post.thumbnail_url,
-    ap_id: post.ap_id,
+    ap_id: Some(post.ap_id),
     local: post.local,
     published: None,
   };
@@ -138,7 +138,7 @@ async fn receive_remove_comment(
     read: None,
     published: None,
     updated: Some(naive_now()),
-    ap_id: comment.ap_id,
+    ap_id: Some(comment.ap_id),
     local: comment.local,
   };
   let comment_id = comment.id;
@@ -182,7 +182,8 @@ async fn receive_remove_community(
 
   let community_actor_id = CommunityForm::from_apub(&group, context, Some(mod_.actor_id()?))
     .await?
-    .actor_id;
+    .actor_id
+    .context(location_info!())?;
 
   let community = blocking(context.pool(), move |conn| {
     Community::read_from_actor_id(conn, &community_actor_id)
@@ -200,7 +201,7 @@ async fn receive_remove_community(
     updated: Some(naive_now()),
     deleted: None,
     nsfw: community.nsfw,
-    actor_id: community.actor_id,
+    actor_id: Some(community.actor_id),
     local: community.local,
     private_key: community.private_key,
     public_key: community.public_key,
index f356c91be80ccd84de988936039ab822f22cb0bc..9a589554d8686c5fe3919930c8b785c1382d4cf8 100644 (file)
@@ -175,7 +175,7 @@ async fn receive_undo_delete_comment(
     read: None,
     published: None,
     updated: Some(naive_now()),
-    ap_id: comment.ap_id,
+    ap_id: Some(comment.ap_id),
     local: comment.local,
   };
   let comment_id = comment.id;
@@ -234,7 +234,7 @@ async fn receive_undo_remove_comment(
     read: None,
     published: None,
     updated: Some(naive_now()),
-    ap_id: comment.ap_id,
+    ap_id: Some(comment.ap_id),
     local: comment.local,
   };
   let comment_id = comment.id;
@@ -299,7 +299,7 @@ async fn receive_undo_delete_post(
     embed_description: post.embed_description,
     embed_html: post.embed_html,
     thumbnail_url: post.thumbnail_url,
-    ap_id: post.ap_id,
+    ap_id: Some(post.ap_id),
     local: post.local,
     published: None,
   };
@@ -359,7 +359,7 @@ async fn receive_undo_remove_post(
     embed_description: post.embed_description,
     embed_html: post.embed_html,
     thumbnail_url: post.thumbnail_url,
-    ap_id: post.ap_id,
+    ap_id: Some(post.ap_id),
     local: post.local,
     published: None,
   };
@@ -399,7 +399,8 @@ async fn receive_undo_delete_community(
 
   let community_actor_id = CommunityForm::from_apub(&group, context, Some(user.actor_id()?))
     .await?
-    .actor_id;
+    .actor_id
+    .context(location_info!())?;
 
   let community = blocking(context.pool(), move |conn| {
     Community::read_from_actor_id(conn, &community_actor_id)
@@ -417,7 +418,7 @@ async fn receive_undo_delete_community(
     updated: Some(naive_now()),
     deleted: Some(false),
     nsfw: community.nsfw,
-    actor_id: community.actor_id,
+    actor_id: Some(community.actor_id),
     local: community.local,
     private_key: community.private_key,
     public_key: community.public_key,
@@ -464,7 +465,8 @@ async fn receive_undo_remove_community(
 
   let community_actor_id = CommunityForm::from_apub(&group, context, Some(mod_.actor_id()?))
     .await?
-    .actor_id;
+    .actor_id
+    .context(location_info!())?;
 
   let community = blocking(context.pool(), move |conn| {
     Community::read_from_actor_id(conn, &community_actor_id)
@@ -482,7 +484,7 @@ async fn receive_undo_remove_community(
     updated: Some(naive_now()),
     deleted: None,
     nsfw: community.nsfw,
-    actor_id: community.actor_id,
+    actor_id: Some(community.actor_id),
     local: community.local,
     private_key: community.private_key,
     public_key: community.public_key,
index c9f9324dc98543f3f19ba517f8d828605d6481e9..0f8cc8ed883138d101c6cd8e0336ad61c2bb86f6 100644 (file)
@@ -66,6 +66,8 @@ pub async fn shared_inbox(
   let json = serde_json::to_string(&activity)?;
   debug!("Shared inbox received activity: {}", json);
 
+  // TODO: if we already received an activity with identical ID, then ignore this (same in other inboxes)
+
   let sender = &activity
     .actor()?
     .to_owned()
index 103fd92ab126941aa444550371f1cd29977d3a71..27d58ebcd3b469a0efa2e117de30de78a177b94b 100644 (file)
@@ -175,7 +175,11 @@ async fn receive_update_private_message(
   let domain = Some(update.id_unchecked().context(location_info!())?.to_owned());
   let private_message_form = PrivateMessageForm::from_apub(&note, context, domain).await?;
 
-  let private_message_ap_id = private_message_form.ap_id.clone();
+  let private_message_ap_id = private_message_form
+    .ap_id
+    .as_ref()
+    .context(location_info!())?
+    .clone();
   let private_message = blocking(&context.pool(), move |conn| {
     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
   })
@@ -224,7 +228,7 @@ async fn receive_delete_private_message(
   let domain = Some(delete.id_unchecked().context(location_info!())?.to_owned());
   let private_message_form = PrivateMessageForm::from_apub(&note, context, domain).await?;
 
-  let private_message_ap_id = private_message_form.ap_id;
+  let private_message_ap_id = private_message_form.ap_id.context(location_info!())?;
   let private_message = blocking(&context.pool(), move |conn| {
     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
   })
@@ -236,7 +240,7 @@ async fn receive_delete_private_message(
     creator_id: private_message.creator_id,
     deleted: Some(true),
     read: None,
-    ap_id: private_message.ap_id,
+    ap_id: Some(private_message.ap_id),
     local: private_message.local,
     published: None,
     updated: Some(naive_now()),
@@ -287,7 +291,11 @@ async fn receive_undo_delete_private_message(
   let domain = Some(undo.id_unchecked().context(location_info!())?.to_owned());
   let private_message = PrivateMessageForm::from_apub(&note, context, domain).await?;
 
-  let private_message_ap_id = private_message.ap_id.clone();
+  let private_message_ap_id = private_message
+    .ap_id
+    .as_ref()
+    .context(location_info!())?
+    .clone();
   let private_message_id = blocking(&context.pool(), move |conn| {
     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id).map(|pm| pm.id)
   })
index dddbd7e04d64eaa3f0930ec6c0450ff3158f7a15..c545a5fd04a3318d9629d1b53d645bbb9452acde 100644 (file)
@@ -1,4 +1,5 @@
 pub mod activities;
+pub mod activity_queue;
 pub mod comment;
 pub mod community;
 pub mod extensions;
@@ -30,7 +31,7 @@ use activitystreams::{
   prelude::*,
 };
 use activitystreams_ext::{Ext1, Ext2};
-use actix_web::{body::Body, client::Client, HttpResponse};
+use actix_web::{body::Body, HttpResponse};
 use anyhow::{anyhow, Context};
 use chrono::NaiveDateTime;
 use lemmy_db::{activity::do_insert_activity, user::User_};
@@ -42,6 +43,7 @@ use lemmy_utils::{
   MentionData,
 };
 use log::debug;
+use reqwest::Client;
 use serde::Serialize;
 use url::{ParseError, Url};
 
@@ -326,7 +328,7 @@ pub async fn fetch_webfinger_url(
   );
   debug!("Fetching webfinger url: {}", &fetch_url);
 
-  let mut response = retry(|| client.get(&fetch_url).send()).await?;
+  let response = retry(|| client.get(&fetch_url).send()).await?;
 
   let res: WebFingerResponse = response
     .json()
index 4f2a831552a25e06f0ea3b6c8ba3a7f2dd2df263..606c4752197472ae11176c3a46fb653e07472e92 100644 (file)
@@ -289,7 +289,7 @@ impl FromApub for PostForm {
       embed_description: embed.description,
       embed_html: embed.html,
       thumbnail_url,
-      ap_id: check_actor_domain(page, expected_domain)?,
+      ap_id: Some(check_actor_domain(page, expected_domain)?),
       local: false,
     })
   }
@@ -318,7 +318,7 @@ impl ApubObjectType for Post {
       creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      create.into_any_base()?,
+      create,
       context,
     )
     .await?;
@@ -346,7 +346,7 @@ impl ApubObjectType for Post {
       creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      update.into_any_base()?,
+      update,
       context,
     )
     .await?;
@@ -373,7 +373,7 @@ impl ApubObjectType for Post {
       creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      delete.into_any_base()?,
+      delete,
       context,
     )
     .await?;
@@ -412,7 +412,7 @@ impl ApubObjectType for Post {
       creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      undo.into_any_base()?,
+      undo,
       context,
     )
     .await?;
@@ -439,7 +439,7 @@ impl ApubObjectType for Post {
       mod_,
       &community,
       vec![community.get_shared_inbox_url()?],
-      remove.into_any_base()?,
+      remove,
       context,
     )
     .await?;
@@ -474,7 +474,7 @@ impl ApubObjectType for Post {
       mod_,
       &community,
       vec![community.get_shared_inbox_url()?],
-      undo.into_any_base()?,
+      undo,
       context,
     )
     .await?;
@@ -504,7 +504,7 @@ impl ApubLikeableType for Post {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      like.into_any_base()?,
+      like,
       context,
     )
     .await?;
@@ -531,7 +531,7 @@ impl ApubLikeableType for Post {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      dislike.into_any_base()?,
+      dislike,
       context,
     )
     .await?;
@@ -570,7 +570,7 @@ impl ApubLikeableType for Post {
       &creator,
       &community,
       vec![community.get_shared_inbox_url()?],
-      undo.into_any_base()?,
+      undo,
       context,
     )
     .await?;
index 8e5836885c3457939d5a70850a3b7ce607ad687a..1b3472e328fbd941a52e91d8e113ecf962a21b0d 100644 (file)
@@ -1,6 +1,7 @@
 use crate::{
   apub::{
-    activities::{generate_activity_id, send_activity},
+    activities::generate_activity_id,
+    activity_queue::send_activity,
     check_actor_domain,
     check_is_apub_id_valid,
     create_tombstone,
@@ -110,7 +111,7 @@ impl FromApub for PrivateMessageForm {
       updated: note.updated().map(|u| u.to_owned().naive_local()),
       deleted: None,
       read: None,
-      ap_id: check_actor_domain(note, expected_domain)?,
+      ap_id: Some(check_actor_domain(note, expected_domain)?),
       local: false,
     })
   }
@@ -134,13 +135,7 @@ impl ApubObjectType for PrivateMessage {
 
     insert_activity(creator.id, create.clone(), true, context.pool()).await?;
 
-    send_activity(
-      context.client(),
-      &create.into_any_base()?,
-      creator,
-      vec![to],
-    )
-    .await?;
+    send_activity(context.activity_queue(), create, creator, vec![to])?;
     Ok(())
   }
 
@@ -160,13 +155,7 @@ impl ApubObjectType for PrivateMessage {
 
     insert_activity(creator.id, update.clone(), true, context.pool()).await?;
 
-    send_activity(
-      context.client(),
-      &update.into_any_base()?,
-      creator,
-      vec![to],
-    )
-    .await?;
+    send_activity(context.activity_queue(), update, creator, vec![to])?;
     Ok(())
   }
 
@@ -185,13 +174,7 @@ impl ApubObjectType for PrivateMessage {
 
     insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
 
-    send_activity(
-      context.client(),
-      &delete.into_any_base()?,
-      creator,
-      vec![to],
-    )
-    .await?;
+    send_activity(context.activity_queue(), delete, creator, vec![to])?;
     Ok(())
   }
 
@@ -221,7 +204,7 @@ impl ApubObjectType for PrivateMessage {
 
     insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
 
-    send_activity(context.client(), &undo.into_any_base()?, creator, vec![to]).await?;
+    send_activity(context.activity_queue(), undo, creator, vec![to])?;
     Ok(())
   }
 
index f6225dea2f5a30df30ead134c07ad8b286b14049..a61813c1858be96d0c950eb6da04d22ff1365f8d 100644 (file)
@@ -1,7 +1,8 @@
 use crate::{
   api::{check_slurs, check_slurs_opt},
   apub::{
-    activities::{generate_activity_id, send_activity},
+    activities::generate_activity_id,
+    activity_queue::send_activity,
     check_actor_domain,
     create_apub_response,
     fetcher::get_or_fetch_and_upsert_actor,
@@ -127,7 +128,7 @@ impl ActorType for User_ {
 
     insert_activity(self.id, follow.clone(), true, context.pool()).await?;
 
-    send_activity(context.client(), &follow.into_any_base()?, self, vec![to]).await?;
+    send_activity(context.activity_queue(), follow, self, vec![to])?;
     Ok(())
   }
 
@@ -152,7 +153,7 @@ impl ActorType for User_ {
 
     insert_activity(self.id, undo.clone(), true, context.pool()).await?;
 
-    send_activity(context.client(), &undo.into_any_base()?, self, vec![to]).await?;
+    send_activity(context.activity_queue(), undo, self, vec![to])?;
     Ok(())
   }
 
@@ -268,7 +269,7 @@ impl FromApub for UserForm {
       show_avatars: false,
       send_notifications_to_email: false,
       matrix_user_id: None,
-      actor_id: check_actor_domain(person, expected_domain)?,
+      actor_id: Some(check_actor_domain(person, expected_domain)?),
       bio,
       local: false,
       private_key: None,
index 7f45237c4b56b6fbdece7b82c76f93f15218705d..d5139441d9975f7a3deeda5abe3f16400af01fc2 100644 (file)
@@ -67,7 +67,7 @@ fn user_updates_2020_04_02(conn: &PgConnection) -> Result<(), LemmyError> {
       lang: cuser.lang.to_owned(),
       show_avatars: cuser.show_avatars,
       send_notifications_to_email: cuser.send_notifications_to_email,
-      actor_id: make_apub_endpoint(EndpointType::User, &cuser.name).to_string(),
+      actor_id: Some(make_apub_endpoint(EndpointType::User, &cuser.name).to_string()),
       bio: cuser.bio.to_owned(),
       local: cuser.local,
       private_key: Some(keypair.private_key),
@@ -111,7 +111,7 @@ fn community_updates_2020_04_02(conn: &PgConnection) -> Result<(), LemmyError> {
       deleted: None,
       nsfw: ccommunity.nsfw,
       updated: None,
-      actor_id: make_apub_endpoint(EndpointType::Community, &ccommunity.name).to_string(),
+      actor_id: Some(make_apub_endpoint(EndpointType::Community, &ccommunity.name).to_string()),
       local: ccommunity.local,
       private_key: Some(keypair.private_key),
       public_key: Some(keypair.public_key),
@@ -138,7 +138,7 @@ fn post_updates_2020_04_03(conn: &PgConnection) -> Result<(), LemmyError> {
 
   // Update the ap_id
   let incorrect_posts = post
-    .filter(ap_id.eq("http://fake.com"))
+    .filter(ap_id.eq("changeme_%"))
     .filter(local.eq(true))
     .load::<Post>(conn)?;
 
@@ -163,7 +163,7 @@ fn comment_updates_2020_04_03(conn: &PgConnection) -> Result<(), LemmyError> {
 
   // Update the ap_id
   let incorrect_comments = comment
-    .filter(ap_id.eq("http://fake.com"))
+    .filter(ap_id.eq("changeme_%"))
     .filter(local.eq(true))
     .load::<Comment>(conn)?;
 
@@ -188,7 +188,7 @@ fn private_message_updates_2020_05_05(conn: &PgConnection) -> Result<(), LemmyEr
 
   // Update the ap_id
   let incorrect_pms = private_message
-    .filter(ap_id.eq("http://fake.com"))
+    .filter(ap_id.eq("changeme_%"))
     .filter(local.eq(true))
     .load::<PrivateMessage>(conn)?;
 
index 07ee15d4061bd37a476fe242e1df44c7066b2639..32b43ef8454aedc4bd8a527901f2aedde07a8f1d 100644 (file)
@@ -14,6 +14,7 @@ pub extern crate dotenv;
 pub extern crate jsonwebtoken;
 extern crate log;
 pub extern crate openssl;
+pub extern crate reqwest;
 pub extern crate rss;
 pub extern crate serde;
 pub extern crate serde_json;
@@ -33,12 +34,15 @@ use crate::{
   request::{retry, RecvError},
   websocket::server::ChatServer,
 };
+
 use actix::Addr;
-use actix_web::{client::Client, dev::ConnectionInfo};
+use actix_web::dev::ConnectionInfo;
 use anyhow::anyhow;
+use background_jobs::QueueHandle;
 use lemmy_utils::{get_apub_protocol_string, settings::Settings};
 use log::error;
 use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
+use reqwest::Client;
 use serde::Deserialize;
 use std::process::Command;
 
@@ -75,14 +79,21 @@ pub struct LemmyContext {
   pub pool: DbPool,
   pub chat_server: Addr<ChatServer>,
   pub client: Client,
+  pub activity_queue: QueueHandle,
 }
 
 impl LemmyContext {
-  pub fn create(pool: DbPool, chat_server: Addr<ChatServer>, client: Client) -> LemmyContext {
+  pub fn create(
+    pool: DbPool,
+    chat_server: Addr<ChatServer>,
+    client: Client,
+    activity_queue: QueueHandle,
+  ) -> LemmyContext {
     LemmyContext {
       pool,
       chat_server,
       client,
+      activity_queue,
     }
   }
   pub fn pool(&self) -> &DbPool {
@@ -94,6 +105,9 @@ impl LemmyContext {
   pub fn client(&self) -> &Client {
     &self.client
   }
+  pub fn activity_queue(&self) -> &QueueHandle {
+    &self.activity_queue
+  }
 }
 
 impl Clone for LemmyContext {
@@ -102,6 +116,7 @@ impl Clone for LemmyContext {
       pool: self.pool.clone(),
       chat_server: self.chat_server.clone(),
       client: self.client.clone(),
+      activity_queue: self.activity_queue.clone(),
     }
   }
 }
@@ -117,7 +132,7 @@ pub struct IframelyResponse {
 pub async fn fetch_iframely(client: &Client, url: &str) -> Result<IframelyResponse, LemmyError> {
   let fetch_url = format!("http://iframely/oembed?url={}", url);
 
-  let mut response = retry(|| client.get(&fetch_url).send()).await?;
+  let response = retry(|| client.get(&fetch_url).send()).await?;
 
   let res: IframelyResponse = response
     .json()
@@ -146,7 +161,7 @@ pub async fn fetch_pictrs(client: &Client, image_url: &str) -> Result<PictrsResp
     utf8_percent_encode(image_url, NON_ALPHANUMERIC) // TODO this might not be needed
   );
 
-  let mut response = retry(|| client.get(&fetch_url).send()).await?;
+  let response = retry(|| client.get(&fetch_url).send()).await?;
 
   let response: PictrsResponse = response
     .json()
@@ -319,7 +334,7 @@ mod tests {
   #[test]
   fn test_image() {
     actix_rt::System::new("tset_image").block_on(async move {
-      let client = actix_web::client::Client::default();
+      let client = reqwest::Client::default();
       assert!(is_image_content_type(&client, "https://1734811051.rsc.cdn77.org/data/images/full/365645/as-virus-kills-navajos-in-their-homes-tribal-women-provide-lifeline.jpg?w=600?w=650").await.is_ok());
       assert!(is_image_content_type(&client,
                                     "https://twitter.com/BenjaminNorton/status/1259922424272957440?s=20"
index 4a012ab47e718419609579233603504dcf01b0b1..72fce5c00f5834fb15fbab64b16902c920211b38 100644 (file)
@@ -6,7 +6,6 @@ pub extern crate lazy_static;
 use actix::prelude::*;
 use actix_web::{
   body::Body,
-  client::Client,
   dev::{Service, ServiceRequest, ServiceResponse},
   http::{
     header::{CACHE_CONTROL, CONTENT_TYPE},
@@ -20,6 +19,7 @@ use diesel::{
 };
 use lemmy_db::get_database_url_from_env;
 use lemmy_server::{
+  apub::activity_queue::create_activity_queue,
   blocking,
   code_migrations::run_advanced_migrations,
   rate_limit::{rate_limiter::RateLimiter, RateLimit},
@@ -29,6 +29,7 @@ use lemmy_server::{
   LemmyError,
 };
 use lemmy_utils::{settings::Settings, CACHE_CONTROL_REGEX};
+use reqwest::Client;
 use std::sync::Arc;
 use tokio::sync::Mutex;
 
@@ -74,12 +75,23 @@ async fn main() -> Result<(), LemmyError> {
     settings.bind, settings.port
   );
 
-  let chat_server =
-    ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start();
+  let activity_queue = create_activity_queue();
+  let chat_server = ChatServer::startup(
+    pool.clone(),
+    rate_limiter.clone(),
+    Client::default(),
+    activity_queue.clone(),
+  )
+  .start();
 
   // Create Http server with websocket support
   HttpServer::new(move || {
-    let context = LemmyContext::create(pool.clone(), chat_server.to_owned(), Client::default());
+    let context = LemmyContext::create(
+      pool.clone(),
+      chat_server.to_owned(),
+      Client::default(),
+      activity_queue.to_owned(),
+    );
     let settings = Settings::get();
     let rate_limiter = rate_limiter.clone();
     App::new()
index 70a2b6933ead96078d2e4e968d02410193789436..490609e7dc0be31954e5fd961e79e12579119f4e 100644 (file)
@@ -14,15 +14,15 @@ pub struct RecvError(pub String);
 pub async fn retry<F, Fut, T>(f: F) -> Result<T, LemmyError>
 where
   F: Fn() -> Fut,
-  Fut: Future<Output = Result<T, actix_web::client::SendRequestError>>,
+  Fut: Future<Output = Result<T, reqwest::Error>>,
 {
   retry_custom(|| async { Ok((f)().await) }).await
 }
 
-pub async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
+async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
 where
   F: Fn() -> Fut,
-  Fut: Future<Output = Result<Result<T, actix_web::client::SendRequestError>, LemmyError>>,
+  Fut: Future<Output = Result<Result<T, reqwest::Error>, LemmyError>>,
 {
   let mut response = Err(anyhow!("connect timeout").into());
 
@@ -30,7 +30,7 @@ where
     match (f)().await? {
       Ok(t) => return Ok(t),
       Err(e) => {
-        if is_connect_timeout(&e) {
+        if e.is_timeout() {
           response = Err(SendError(e.to_string()).into());
           continue;
         }
@@ -41,13 +41,3 @@ where
 
   response
 }
-
-fn is_connect_timeout(e: &actix_web::client::SendRequestError) -> bool {
-  if let actix_web::client::SendRequestError::Connect(e) = e {
-    if let actix_web::client::ConnectError::Timeout = e {
-      return true;
-    }
-  }
-
-  false
-}
index 4d0a1c4d3afd8439cc2b10f2b9b858a399d4124c..2a4c558cd383408adbd56278fc85c1d051e37b02 100644 (file)
@@ -15,10 +15,12 @@ use crate::{
   PostId,
   UserId,
 };
-use actix_web::{client::Client, web};
+use actix_web::web;
 use anyhow::Context as acontext;
+use background_jobs::QueueHandle;
 use lemmy_db::naive_now;
 use lemmy_utils::location_info;
+use reqwest::Client;
 
 /// Chat server sends this messages to session
 #[derive(Message)]
@@ -181,6 +183,8 @@ pub struct ChatServer {
 
   /// An HTTP Client
   client: Client,
+
+  activity_queue: QueueHandle,
 }
 
 impl ChatServer {
@@ -188,6 +192,7 @@ impl ChatServer {
     pool: Pool<ConnectionManager<PgConnection>>,
     rate_limiter: RateLimit,
     client: Client,
+    activity_queue: QueueHandle,
   ) -> ChatServer {
     ChatServer {
       sessions: HashMap::new(),
@@ -199,6 +204,7 @@ impl ChatServer {
       rate_limiter,
       captchas: Vec::new(),
       client,
+      activity_queue,
     }
   }
 
@@ -460,6 +466,7 @@ impl ChatServer {
     };
 
     let client = self.client.clone();
+    let activity_queue = self.activity_queue.clone();
     async move {
       let msg = msg;
       let json: Value = serde_json::from_str(&msg.msg)?;
@@ -474,6 +481,7 @@ impl ChatServer {
         pool,
         chat_server: addr,
         client,
+        activity_queue,
       };
       let args = Args {
         context,
index 747ec910907bfcdc81b7f8cf0b03ca29d7b3f692..83526cf962a7bddba1ec6855104627c4854dedf8 100644 (file)
@@ -1,3 +1,4 @@
+jest.setTimeout(120000);
 import {
   alpha,
   beta,
@@ -19,6 +20,7 @@ import {
   createCommunity,
   registerUser,
   API,
+  delay,
 } from './shared';
 
 import { PostResponse } from 'lemmy-js-client';
@@ -30,6 +32,7 @@ beforeAll(async () => {
   await followBeta(alpha);
   await followBeta(gamma);
   let search = await searchForBetaCommunity(alpha);
+  await delay(10000);
   postRes = await createPost(
     alpha,
     search.communities.filter(c => c.local == false)[0].id
@@ -47,6 +50,7 @@ test('Create a comment', async () => {
   expect(commentRes.comment.community_local).toBe(false);
   expect(commentRes.comment.creator_local).toBe(true);
   expect(commentRes.comment.score).toBe(1);
+  await delay();
 
   // Make sure that comment is liked on beta
   let searchBeta = await searchComment(beta, commentRes.comment);
@@ -64,12 +68,14 @@ test('Create a comment in a non-existent post', async () => {
 
 test('Update a comment', async () => {
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
   let updateCommentRes = await updateComment(alpha, commentRes.comment.id);
   expect(updateCommentRes.comment.content).toBe(
     'A jest test federated comment update'
   );
   expect(updateCommentRes.comment.community_local).toBe(false);
   expect(updateCommentRes.comment.creator_local).toBe(true);
+  await delay();
 
   // Make sure that post is updated on beta
   let searchBeta = await searchComment(beta, commentRes.comment);
@@ -79,23 +85,21 @@ test('Update a comment', async () => {
 
 test('Delete a comment', async () => {
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
+
   let deleteCommentRes = await deleteComment(
     alpha,
     true,
     commentRes.comment.id
   );
   expect(deleteCommentRes.comment.deleted).toBe(true);
+  await delay();
 
-  // Make sure that comment is deleted on beta
-  // The search doesnt work below, because it returns a tombstone / http::gone
-  // let searchBeta = await searchComment(beta, commentRes.comment);
-  // console.log(searchBeta);
-  // let betaComment = searchBeta.comments[0];
-  // Create a fake post, just to get the previous new post id
-  let createdBetaPostJustToGetId = await createPost(beta, 2);
-  let betaPost = await getPost(beta, createdBetaPostJustToGetId.post.id - 1);
-  let betaComment = betaPost.comments[0];
-  expect(betaComment.deleted).toBe(true);
+  // Make sure that comment is undefined on beta
+  let searchBeta = await searchComment(beta, commentRes.comment);
+  let betaComment = searchBeta.comments[0];
+  expect(betaComment).toBeUndefined();
+  await delay();
 
   let undeleteCommentRes = await deleteComment(
     alpha,
@@ -103,6 +107,7 @@ test('Delete a comment', async () => {
     commentRes.comment.id
   );
   expect(undeleteCommentRes.comment.deleted).toBe(false);
+  await delay();
 
   // Make sure that comment is undeleted on beta
   let searchBeta2 = await searchComment(beta, commentRes.comment);
@@ -112,6 +117,7 @@ test('Delete a comment', async () => {
 
 test('Remove a comment from admin and community on the same instance', async () => {
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
 
   // Get the id for beta
   let betaCommentId = (await searchComment(beta, commentRes.comment))
@@ -120,6 +126,7 @@ test('Remove a comment from admin and community on the same instance', async ()
   // The beta admin removes it (the community lives on beta)
   let removeCommentRes = await removeComment(beta, true, betaCommentId);
   expect(removeCommentRes.comment.removed).toBe(true);
+  await delay();
 
   // Make sure that comment is removed on alpha (it gets pushed since an admin from beta removed it)
   let refetchedPost = await getPost(alpha, postRes.post.id);
@@ -127,6 +134,7 @@ test('Remove a comment from admin and community on the same instance', async ()
 
   let unremoveCommentRes = await removeComment(beta, false, betaCommentId);
   expect(unremoveCommentRes.comment.removed).toBe(false);
+  await delay();
 
   // Make sure that comment is unremoved on beta
   let refetchedPost2 = await getPost(alpha, postRes.post.id);
@@ -142,15 +150,19 @@ test('Remove a comment from admin and community on different instance', async ()
 
   // New alpha user creates a community, post, and comment.
   let newCommunity = await createCommunity(newAlphaApi);
+  await delay();
   let newPost = await createPost(newAlphaApi, newCommunity.community.id);
+  await delay();
   let commentRes = await createComment(newAlphaApi, newPost.post.id);
   expect(commentRes.comment.content).toBeDefined();
+  await delay();
 
   // Beta searches that to cache it, then removes it
   let searchBeta = await searchComment(beta, commentRes.comment);
   let betaComment = searchBeta.comments[0];
   let removeCommentRes = await removeComment(beta, true, betaComment.id);
   expect(removeCommentRes.comment.removed).toBe(true);
+  await delay();
 
   // Make sure its not removed on alpha
   let refetchedPost = await getPost(newAlphaApi, newPost.post.id);
@@ -159,8 +171,10 @@ test('Remove a comment from admin and community on different instance', async ()
 
 test('Unlike a comment', async () => {
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
   let unlike = await likeComment(alpha, 0, commentRes.comment);
   expect(unlike.comment.score).toBe(0);
+  await delay();
 
   // Make sure that post is unliked on beta
   let searchBeta = await searchComment(beta, commentRes.comment);
@@ -173,6 +187,7 @@ test('Unlike a comment', async () => {
 
 test('Federated comment like', async () => {
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
 
   // Find the comment on beta
   let searchBeta = await searchComment(beta, commentRes.comment);
@@ -180,6 +195,7 @@ test('Federated comment like', async () => {
 
   let like = await likeComment(beta, 1, betaComment);
   expect(like.comment.score).toBe(2);
+  await delay();
 
   // Get the post from alpha, check the likes
   let post = await getPost(alpha, postRes.post.id);
@@ -189,6 +205,7 @@ test('Federated comment like', async () => {
 test('Reply to a comment', async () => {
   // Create a comment on alpha, find it on beta
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
   let searchBeta = await searchComment(beta, commentRes.comment);
   let betaComment = searchBeta.comments[0];
 
@@ -201,6 +218,7 @@ test('Reply to a comment', async () => {
   expect(replyRes.comment.creator_local).toBe(true);
   expect(replyRes.comment.parent_id).toBe(betaComment.id);
   expect(replyRes.comment.score).toBe(1);
+  await delay();
 
   // Make sure that comment is seen on alpha
   // TODO not sure why, but a searchComment back to alpha, for the ap_id of betas
@@ -219,6 +237,7 @@ test('Mention beta', async () => {
   // Create a mention on alpha
   let mentionContent = 'A test mention of @lemmy_beta@lemmy-beta:8550';
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
   let mentionRes = await createComment(
     alpha,
     postRes.post.id,
@@ -229,6 +248,7 @@ test('Mention beta', async () => {
   expect(mentionRes.comment.community_local).toBe(false);
   expect(mentionRes.comment.creator_local).toBe(true);
   expect(mentionRes.comment.score).toBe(1);
+  await delay();
 
   let mentionsRes = await getMentions(beta);
   expect(mentionsRes.mentions[0].content).toBeDefined();
@@ -239,6 +259,7 @@ test('Mention beta', async () => {
 
 test('Comment Search', async () => {
   let commentRes = await createComment(alpha, postRes.post.id);
+  await delay();
   let searchBeta = await searchComment(beta, commentRes.comment);
   expect(searchBeta.comments[0].ap_id).toBe(commentRes.comment.ap_id);
 });
@@ -247,6 +268,7 @@ test('A and G subscribe to B (center) A posts, G mentions B, it gets announced t
   // Create a local post
   let alphaPost = await createPost(alpha, 2);
   expect(alphaPost.post.community_local).toBe(true);
+  await delay();
 
   // Make sure gamma sees it
   let search = await searchPost(gamma, alphaPost.post);
@@ -264,6 +286,7 @@ test('A and G subscribe to B (center) A posts, G mentions B, it gets announced t
   expect(commentRes.comment.community_local).toBe(false);
   expect(commentRes.comment.creator_local).toBe(true);
   expect(commentRes.comment.score).toBe(1);
+  await delay();
 
   // Make sure alpha sees it
   let alphaPost2 = await getPost(alpha, alphaPost.post.id);
@@ -291,6 +314,7 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
   // B creates a post, and two comments, should be invisible to A
   let postRes = await createPost(beta, 2);
   expect(postRes.post.name).toBeDefined();
+  await delay();
 
   let parentCommentContent = 'An invisible top level comment from beta';
   let parentCommentRes = await createComment(
@@ -300,6 +324,7 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
     parentCommentContent
   );
   expect(parentCommentRes.comment.content).toBe(parentCommentContent);
+  await delay();
 
   // B creates a comment, then a child one of that.
   let childCommentContent = 'An invisible child comment from beta';
@@ -310,11 +335,13 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
     childCommentContent
   );
   expect(childCommentRes.comment.content).toBe(childCommentContent);
+  await delay();
 
   // Follow beta again
   let follow = await followBeta(alpha);
   expect(follow.community.local).toBe(false);
   expect(follow.community.name).toBe('main');
+  await delay();
 
   // An update to the child comment on beta, should push the post, parent, and child to alpha now
   let updatedCommentContent = 'An update child comment from beta';
@@ -324,10 +351,14 @@ test('Fetch in_reply_tos: A is unsubbed from B, B makes a post, and some embedde
     updatedCommentContent
   );
   expect(updateRes.comment.content).toBe(updatedCommentContent);
+  await delay();
 
   // Get the post from alpha
-  let createFakeAlphaPostToGetId = await createPost(alpha, 2);
-  let alphaPost = await getPost(alpha, createFakeAlphaPostToGetId.post.id - 1);
+  let search = await searchPost(alpha, postRes.post);
+  let alphaPostB = search.posts[0];
+  await delay();
+
+  let alphaPost = await getPost(alpha, alphaPostB.id);
   expect(alphaPost.post.name).toBeDefined();
   expect(alphaPost.comments[1].content).toBe(parentCommentContent);
   expect(alphaPost.comments[0].content).toBe(updatedCommentContent);
index 6945e3323921d72f4eb1c92e9417dfcd14fa510f..bd498009a98a43b589ec684252d9e8bfc2a75efe 100644 (file)
@@ -6,6 +6,7 @@ import {
   createCommunity,
   deleteCommunity,
   removeCommunity,
+  delay,
 } from './shared';
 
 beforeAll(async () => {
@@ -24,12 +25,14 @@ test('Create community', async () => {
 
 test('Delete community', async () => {
   let communityRes = await createCommunity(beta);
+  await delay();
   let deleteCommunityRes = await deleteCommunity(
     beta,
     true,
     communityRes.community.id
   );
   expect(deleteCommunityRes.community.deleted).toBe(true);
+  await delay();
 
   // Make sure it got deleted on A
   let search = await searchForBetaCommunity(alpha);
@@ -44,6 +47,7 @@ test('Delete community', async () => {
     communityRes.community.id
   );
   expect(undeleteCommunityRes.community.deleted).toBe(false);
+  await delay();
 
   // Make sure it got undeleted on A
   let search2 = await searchForBetaCommunity(alpha);
@@ -54,6 +58,7 @@ test('Delete community', async () => {
 
 test('Remove community', async () => {
   let communityRes = await createCommunity(beta);
+  await delay();
   let removeCommunityRes = await removeCommunity(
     beta,
     true,
@@ -66,6 +71,7 @@ test('Remove community', async () => {
   let communityA = search.communities[0];
   // TODO this fails currently, because no updates are pushed
   // expect(communityA.removed).toBe(true);
+  await delay();
 
   // unremove
   let unremoveCommunityRes = await removeCommunity(
@@ -74,6 +80,7 @@ test('Remove community', async () => {
     communityRes.community.id
   );
   expect(unremoveCommunityRes.community.removed).toBe(false);
+  await delay();
 
   // Make sure it got unremoved on A
   let search2 = await searchForBetaCommunity(alpha);
index 2f1f8cd89c2ae512490b2c99f47dd8492dce9df3..41af6effb46922399f67a7a665b952e76e64cc9e 100644 (file)
@@ -5,6 +5,7 @@ import {
   followCommunity,
   checkFollowedCommunities,
   unfollowRemotes,
+  delay,
 } from './shared';
 
 beforeAll(async () => {
@@ -22,6 +23,7 @@ test('Follow federated community', async () => {
   // Make sure the follow response went through
   expect(follow.community.local).toBe(false);
   expect(follow.community.name).toBe('main');
+  await delay();
 
   // Check it from local
   let followCheck = await checkFollowedCommunities(alpha);
@@ -33,6 +35,7 @@ test('Follow federated community', async () => {
   // Test an unfollow
   let unfollow = await followCommunity(alpha, false, remoteCommunityId);
   expect(unfollow.community.local).toBe(false);
+  await delay();
 
   // Make sure you are unsubbed locally
   let unfollowCheck = await checkFollowedCommunities(alpha);
index ab9c63fb16c82895396c1d367f45f922312afc2d..c2cbad6d5799dbe656df56912381fa2b18eca7a2 100644 (file)
@@ -1,3 +1,4 @@
+jest.setTimeout(120000);
 import {
   alpha,
   beta,
@@ -18,6 +19,7 @@ import {
   removePost,
   getPost,
   unfollowRemotes,
+  delay,
 } from './shared';
 
 beforeAll(async () => {
@@ -26,6 +28,7 @@ beforeAll(async () => {
   await followBeta(gamma);
   await followBeta(delta);
   await followBeta(epsilon);
+  await delay(10000);
 });
 
 afterAll(async () => {
@@ -37,11 +40,13 @@ afterAll(async () => {
 
 test('Create a post', async () => {
   let search = await searchForBetaCommunity(alpha);
+  await delay();
   let postRes = await createPost(alpha, search.communities[0].id);
   expect(postRes.post).toBeDefined();
   expect(postRes.post.community_local).toBe(false);
   expect(postRes.post.creator_local).toBe(true);
   expect(postRes.post.score).toBe(1);
+  await delay();
 
   // Make sure that post is liked on beta
   let searchBeta = await searchPost(beta, postRes.post);
@@ -69,12 +74,15 @@ test('Create a post in a non-existent community', async () => {
 test('Unlike a post', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
   let unlike = await likePost(alpha, 0, postRes.post);
   expect(unlike.post.score).toBe(0);
+  await delay();
 
   // Try to unlike it again, make sure it stays at 0
   let unlike2 = await likePost(alpha, 0, postRes.post);
   expect(unlike2.post.score).toBe(0);
+  await delay();
 
   // Make sure that post is unliked on beta
   let searchBeta = await searchPost(beta, postRes.post);
@@ -89,12 +97,14 @@ test('Unlike a post', async () => {
 test('Update a post', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   let updatedName = 'A jest test federated post, updated';
   let updatedPost = await updatePost(alpha, postRes.post);
   expect(updatedPost.post.name).toBe(updatedName);
   expect(updatedPost.post.community_local).toBe(false);
   expect(updatedPost.post.creator_local).toBe(true);
+  await delay();
 
   // Make sure that post is updated on beta
   let searchBeta = await searchPost(beta, postRes.post);
@@ -102,6 +112,7 @@ test('Update a post', async () => {
   expect(betaPost.community_local).toBe(true);
   expect(betaPost.creator_local).toBe(false);
   expect(betaPost.name).toBe(updatedName);
+  await delay();
 
   // Make sure lemmy beta cannot update the post
   let updatedPostBeta = await updatePost(beta, betaPost);
@@ -111,9 +122,11 @@ test('Update a post', async () => {
 test('Sticky a post', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   let stickiedPostRes = await stickyPost(alpha, true, postRes.post);
   expect(stickiedPostRes.post.stickied).toBe(true);
+  await delay();
 
   // Make sure that post is stickied on beta
   let searchBeta = await searchPost(beta, postRes.post);
@@ -125,6 +138,7 @@ test('Sticky a post', async () => {
   // Unsticky a post
   let unstickiedPost = await stickyPost(alpha, false, postRes.post);
   expect(unstickiedPost.post.stickied).toBe(false);
+  await delay();
 
   // Make sure that post is unstickied on beta
   let searchBeta2 = await searchPost(beta, postRes.post);
@@ -137,6 +151,7 @@ test('Sticky a post', async () => {
   let searchGamma = await searchPost(gamma, postRes.post);
   let gammaPost = searchGamma.posts[0];
   let gammaTrySticky = await stickyPost(gamma, true, gammaPost);
+  await delay();
   let searchBeta3 = await searchPost(beta, postRes.post);
   let betaPost3 = searchBeta3.posts[0];
   expect(gammaTrySticky.post.stickied).toBe(true);
@@ -145,10 +160,13 @@ test('Sticky a post', async () => {
 
 test('Lock a post', async () => {
   let search = await searchForBetaCommunity(alpha);
+  await delay();
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   let lockedPostRes = await lockPost(alpha, true, postRes.post);
   expect(lockedPostRes.post.locked).toBe(true);
+  await delay();
 
   // Make sure that post is locked on beta
   let searchBeta = await searchPost(beta, postRes.post);
@@ -160,14 +178,17 @@ test('Lock a post', async () => {
   // Try to make a new comment there, on alpha
   let comment = await createComment(alpha, postRes.post.id);
   expect(comment['error']).toBe('locked');
+  await delay();
 
   // Try to create a new comment, on beta
   let commentBeta = await createComment(beta, betaPost.id);
   expect(commentBeta['error']).toBe('locked');
+  await delay();
 
   // Unlock a post
   let unlockedPost = await lockPost(alpha, false, postRes.post);
   expect(unlockedPost.post.locked).toBe(false);
+  await delay();
 
   // Make sure that post is unlocked on beta
   let searchBeta2 = await searchPost(beta, postRes.post);
@@ -180,68 +201,84 @@ test('Lock a post', async () => {
 test('Delete a post', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   let deletedPost = await deletePost(alpha, true, postRes.post);
   expect(deletedPost.post.deleted).toBe(true);
+  await delay();
 
   // Make sure lemmy beta sees post is deleted
-  let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1;
-  let betaPost = await getPost(beta, createFakeBetaPostToGetId);
-  expect(betaPost.post.deleted).toBe(true);
+  let searchBeta = await searchPost(beta, postRes.post);
+  let betaPost = searchBeta.posts[0];
+  // This will be undefined because of the tombstone
+  expect(betaPost).toBeUndefined();
+  await delay();
 
   // Undelete
   let undeletedPost = await deletePost(alpha, false, postRes.post);
   expect(undeletedPost.post.deleted).toBe(false);
+  await delay();
 
   // Make sure lemmy beta sees post is undeleted
-  let betaPost2 = await getPost(beta, createFakeBetaPostToGetId);
-  expect(betaPost2.post.deleted).toBe(false);
+  let searchBeta2 = await searchPost(beta, postRes.post);
+  let betaPost2 = searchBeta2.posts[0];
+  expect(betaPost2.deleted).toBe(false);
 
   // Make sure lemmy beta cannot delete the post
-  let deletedPostBeta = await deletePost(beta, true, betaPost2.post);
+  let deletedPostBeta = await deletePost(beta, true, betaPost2);
   expect(deletedPostBeta).toStrictEqual({ error: 'no_post_edit_allowed' });
 });
 
 test('Remove a post from admin and community on different instance', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   let removedPost = await removePost(alpha, true, postRes.post);
   expect(removedPost.post.removed).toBe(true);
+  await delay();
 
   // Make sure lemmy beta sees post is NOT removed
-  let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1;
-  let betaPost = await getPost(beta, createFakeBetaPostToGetId);
-  expect(betaPost.post.removed).toBe(false);
+  let searchBeta = await searchPost(beta, postRes.post);
+  let betaPost = searchBeta.posts[0];
+  expect(betaPost.removed).toBe(false);
+  await delay();
 
   // Undelete
   let undeletedPost = await removePost(alpha, false, postRes.post);
   expect(undeletedPost.post.removed).toBe(false);
+  await delay();
 
   // Make sure lemmy beta sees post is undeleted
-  let betaPost2 = await getPost(beta, createFakeBetaPostToGetId);
-  expect(betaPost2.post.removed).toBe(false);
+  let searchBeta2 = await searchPost(beta, postRes.post);
+  let betaPost2 = searchBeta2.posts[0];
+  expect(betaPost2.removed).toBe(false);
 });
 
 test('Remove a post from admin and community on same instance', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   // Get the id for beta
-  let createFakeBetaPostToGetId = (await createPost(beta, 2)).post.id - 1;
-  let betaPost = await getPost(beta, createFakeBetaPostToGetId);
+  let searchBeta = await searchPost(beta, postRes.post);
+  let betaPost = searchBeta.posts[0];
+  await delay();
 
   // The beta admin removes it (the community lives on beta)
-  let removePostRes = await removePost(beta, true, betaPost.post);
+  let removePostRes = await removePost(beta, true, betaPost);
   expect(removePostRes.post.removed).toBe(true);
+  await delay();
 
   // Make sure lemmy alpha sees post is removed
   let alphaPost = await getPost(alpha, postRes.post.id);
   expect(alphaPost.post.removed).toBe(true);
+  await delay();
 
   // Undelete
-  let undeletedPost = await removePost(beta, false, betaPost.post);
+  let undeletedPost = await removePost(beta, false, betaPost);
   expect(undeletedPost.post.removed).toBe(false);
+  await delay();
 
   // Make sure lemmy alpha sees post is undeleted
   let alphaPost2 = await getPost(alpha, postRes.post.id);
@@ -250,7 +287,9 @@ test('Remove a post from admin and community on same instance', async () => {
 
 test('Search for a post', async () => {
   let search = await searchForBetaCommunity(alpha);
+  await delay();
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
   let searchBeta = await searchPost(beta, postRes.post);
 
   expect(searchBeta.posts[0].name).toBeDefined();
@@ -259,6 +298,7 @@ test('Search for a post', async () => {
 test('A and G subscribe to B (center) A posts, it gets announced to G', async () => {
   let search = await searchForBetaCommunity(alpha);
   let postRes = await createPost(alpha, search.communities[0].id);
+  await delay();
 
   let search2 = await searchPost(gamma, postRes.post);
   expect(search2.posts[0].name).toBeDefined();
index 4bf3f07a2baad94fa76121620fb4b873172f1154..78d467da98fc987216e54ccc26d30056760190a2 100644 (file)
@@ -8,13 +8,16 @@ import {
   listPrivateMessages,
   deletePrivateMessage,
   unfollowRemotes,
+  delay,
 } from './shared';
 
 let recipient_id: number;
 
 beforeAll(async () => {
   await setupLogins();
-  recipient_id = (await followBeta(alpha)).community.creator_id;
+  let follow = await followBeta(alpha);
+  await delay(10000);
+  recipient_id = follow.community.creator_id;
 });
 
 afterAll(async () => {
@@ -27,6 +30,7 @@ test('Create a private message', async () => {
   expect(pmRes.message.local).toBe(true);
   expect(pmRes.message.creator_local).toBe(true);
   expect(pmRes.message.recipient_local).toBe(false);
+  await delay();
 
   let betaPms = await listPrivateMessages(beta);
   expect(betaPms.messages[0].content).toBeDefined();
@@ -41,6 +45,7 @@ test('Update a private message', async () => {
   let pmRes = await createPrivateMessage(alpha, recipient_id);
   let pmUpdated = await updatePrivateMessage(alpha, pmRes.message.id);
   expect(pmUpdated.message.content).toBe(updatedContent);
+  await delay();
 
   let betaPms = await listPrivateMessages(beta);
   expect(betaPms.messages[0].content).toBe(updatedContent);
@@ -48,15 +53,18 @@ test('Update a private message', async () => {
 
 test('Delete a private message', async () => {
   let pmRes = await createPrivateMessage(alpha, recipient_id);
+  await delay();
   let betaPms1 = await listPrivateMessages(beta);
   let deletedPmRes = await deletePrivateMessage(alpha, true, pmRes.message.id);
   expect(deletedPmRes.message.deleted).toBe(true);
+  await delay();
 
   // The GetPrivateMessages filters out deleted,
   // even though they are in the actual database.
   // no reason to show them
   let betaPms2 = await listPrivateMessages(beta);
   expect(betaPms2.messages.length).toBe(betaPms1.messages.length - 1);
+  await delay();
 
   // Undelete
   let undeletedPmRes = await deletePrivateMessage(
@@ -65,6 +73,7 @@ test('Delete a private message', async () => {
     pmRes.message.id
   );
   expect(undeletedPmRes.message.deleted).toBe(false);
+  await delay();
 
   let betaPms3 = await listPrivateMessages(beta);
   expect(betaPms3.messages.length).toBe(betaPms1.messages.length);
index 710671c0e089fe469ce2c3dd1d23b7863786a83a..eb4c6da03a5d72631544b742cd0b72c4f8f04a5f 100644 (file)
@@ -198,7 +198,7 @@ export async function searchPost(
 ): Promise<SearchResponse> {
   let form: SearchForm = {
     q: post.ap_id,
-    type_: SearchType.All,
+    type_: SearchType.Posts,
     sort: SortType.TopAll,
   };
   return api.client.search(form);
@@ -220,7 +220,7 @@ export async function searchComment(
 ): Promise<SearchResponse> {
   let form: SearchForm = {
     q: comment.ap_id,
-    type_: SearchType.All,
+    type_: SearchType.Comments,
     sort: SortType.TopAll,
   };
   return api.client.search(form);
@@ -233,7 +233,7 @@ export async function searchForBetaCommunity(
   // Use short-hand search url
   let form: SearchForm = {
     q: '!main@lemmy-beta:8550',
-    type_: SearchType.All,
+    type_: SearchType.Communities,
     sort: SortType.TopAll,
   };
   return api.client.search(form);
@@ -247,7 +247,7 @@ export async function searchForUser(
   // Use short-hand search url
   let form: SearchForm = {
     q: apShortname,
-    type_: SearchType.All,
+    type_: SearchType.Users,
     sort: SortType.TopAll,
   };
   return api.client.search(form);
@@ -524,6 +524,11 @@ export async function followBeta(api: API): Promise<CommunityResponse> {
   }
 }
 
+export const delay = (millis: number = 1500) =>
+  new Promise((resolve, _reject) => {
+    setTimeout(_ => resolve(), millis);
+  });
+
 export function wrapper(form: any): string {
   return JSON.stringify(form);
 }