]> Untitled Git - lemmy.git/commitdiff
Serve activities in community outbox (fixes #1216)
authorFelix Ableitner <me@nutomic.com>
Wed, 27 Jan 2021 14:02:28 +0000 (15:02 +0100)
committerFelix Ableitner <me@nutomic.com>
Fri, 29 Jan 2021 13:48:42 +0000 (14:48 +0100)
Cargo.lock
crates/apub/src/fetcher/community.rs
crates/apub/src/http/community.rs
crates/apub/src/inbox/user_inbox.rs
crates/db_queries/Cargo.toml
crates/db_queries/src/source/activity.rs

index e43b9219cf7fcba85aeff9e4dee115ed53d91f39..f15cf8e88a806c39ada7eef3bffe0ab886f85456 100644 (file)
@@ -1023,6 +1023,17 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "diesel_json"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2812f0f63b6d3508fb7bfdb872c2dc2321ba938f5e0f4cb9751ec899e8b297c9"
+dependencies = [
+ "diesel",
+ "serde 1.0.118",
+ "serde_json",
+]
+
 [[package]]
 name = "diesel_migrations"
 version = "1.4.0"
@@ -1794,6 +1805,7 @@ dependencies = [
  "bcrypt",
  "chrono",
  "diesel",
+ "diesel_json",
  "diesel_migrations",
  "lazy_static",
  "lemmy_db_schema",
index 0249dee1ebecbe931c45de6a958b2c62c50b40ad..7547e0dbd669159532aa580a3feb0368fa86ee17 100644 (file)
@@ -1,28 +1,23 @@
 use crate::{
-  check_is_apub_id_valid,
   fetcher::{
     fetch::fetch_remote_object,
     get_or_fetch_and_upsert_user,
     is_deleted,
     should_refetch_actor,
   },
+  inbox::user_inbox::receive_announce,
   objects::FromApub,
   ActorType,
   GroupExt,
-  PageExt,
 };
 use activitystreams::{
-  base::{BaseExt, ExtendsExt},
   collection::{CollectionExt, OrderedCollection},
   object::ObjectExt,
 };
 use anyhow::Context;
 use diesel::result::Error::NotFound;
 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
-use lemmy_db_schema::source::{
-  community::{Community, CommunityModerator, CommunityModeratorForm},
-  post::Post,
-};
+use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
 use lemmy_structs::blocking;
 use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::LemmyContext;
@@ -119,29 +114,34 @@ async fn fetch_remote_community(
     .await??;
   }
 
-  // fetch outbox (maybe make this conditional)
+  // only fetch outbox for new communities, otherwise this can create an infinite loop
+  if old_community.is_none() {
+    fetch_community_outbox(context, &community, recursion_counter).await?
+  }
+
+  Ok(community)
+}
+
+async fn fetch_community_outbox(
+  context: &LemmyContext,
+  community: &Community,
+  recursion_counter: &mut i32,
+) -> Result<(), LemmyError> {
   let outbox = fetch_remote_object::<OrderedCollection>(
     context.client(),
     &community.get_outbox_url()?,
     recursion_counter,
   )
   .await?;
-  let outbox_items = outbox.items().context(location_info!())?.clone();
-  let mut outbox_items = outbox_items.many().context(location_info!())?;
-  if outbox_items.len() > 20 {
-    outbox_items = outbox_items[0..20].to_vec();
+  let outbox_activities = outbox.items().context(location_info!())?.clone();
+  let mut outbox_activities = outbox_activities.many().context(location_info!())?;
+  if outbox_activities.len() > 20 {
+    outbox_activities = outbox_activities[0..20].to_vec();
   }
-  for o in outbox_items {
-    let page = PageExt::from_any_base(o)?.context(location_info!())?;
-    let page_id = page.id_unchecked().context(location_info!())?;
 
-    // The post creator may be from a blocked instance, if it errors, then skip it
-    if check_is_apub_id_valid(page_id).is_err() {
-      continue;
-    }
-    Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
-    // TODO: we need to send a websocket update here
+  for activity in outbox_activities {
+    receive_announce(context, activity, community, recursion_counter).await?;
   }
 
-  Ok(community)
+  Ok(())
 }
index a0ec7518b6bb04f3faaa8fd69073048afb75e47f..8d6549ad67fbfdabcda6491e4038f5570ecf6ea8 100644 (file)
@@ -5,12 +5,12 @@ use crate::{
   ActorType,
 };
 use activitystreams::{
-  base::{AnyBase, BaseExt, ExtendsExt},
+  base::{AnyBase, BaseExt},
   collection::{CollectionExt, OrderedCollection, UnorderedCollection},
 };
 use actix_web::{body::Body, web, HttpResponse};
-use lemmy_db_queries::source::{community::Community_, post::Post_};
-use lemmy_db_schema::source::{community::Community, post::Post};
+use lemmy_db_queries::source::{activity::Activity_, community::Community_};
+use lemmy_db_schema::source::{activity::Activity, community::Community};
 use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
 use lemmy_structs::blocking;
 use lemmy_utils::LemmyError;
@@ -76,21 +76,20 @@ pub async fn get_apub_community_outbox(
   })
   .await??;
 
-  let community_id = community.id;
-  let posts = blocking(context.pool(), move |conn| {
-    Post::list_for_community(conn, community_id)
+  let community_actor_id = community.actor_id.to_owned();
+  let activities = blocking(context.pool(), move |conn| {
+    Activity::read_community_outbox(conn, &community_actor_id)
   })
   .await??;
 
-  let mut pages: Vec<AnyBase> = vec![];
-  for p in posts {
-    pages.push(p.to_apub(context.pool()).await?.into_any_base()?);
-  }
-
-  let len = pages.len();
+  let activities = activities
+    .iter()
+    .map(AnyBase::from_arbitrary_json)
+    .collect::<Result<Vec<AnyBase>, serde_json::Error>>()?;
+  let len = activities.len();
   let mut collection = OrderedCollection::new();
   collection
-    .set_many_items(pages)
+    .set_many_items(activities)
     .set_many_contexts(lemmy_context()?)
     .set_id(community.get_outbox_url()?)
     .set_total_items(len as u64);
index 6496a60a019c6f44628949698ef7e568fda392eb..7b90fafad18ded6dc847f128a283249d79c4c0e8 100644 (file)
@@ -236,7 +236,7 @@ async fn receive_accept(
 }
 
 /// Takes an announce and passes the inner activity to the appropriate handler.
-async fn receive_announce(
+pub async fn receive_announce(
   context: &LemmyContext,
   activity: AnyBase,
   actor: &dyn ActorType,
index 5385854cb7e5f85824bc5d24f310fbce0b1d8708..42e159fba6677f7abc7d5d514e179645d79b4843 100644 (file)
@@ -23,3 +23,4 @@ url = { version = "2.2.0", features = ["serde"] }
 lazy_static = "1.4.0"
 regex = "1.4.2"
 bcrypt = "0.9.0"
+diesel_json = "0.1.1"
index 662db3aedc38645febfe9eded1ead44ce45ca0d1..964e50424e221cd46b9f174d6c7eaf403fc426cd 100644 (file)
@@ -1,8 +1,9 @@
 use crate::Crud;
-use diesel::{dsl::*, result::Error, *};
-use lemmy_db_schema::source::activity::*;
+use diesel::{dsl::*, result::Error, sql_types::Text, *};
+use lemmy_db_schema::{source::activity::*, Url};
 use log::debug;
 use serde::Serialize;
+use serde_json::Value;
 use std::{
   fmt::Debug,
   io::{Error as IoError, ErrorKind},
@@ -47,7 +48,14 @@ pub trait Activity_ {
   ) -> Result<Activity, IoError>
   where
     T: Serialize + Debug;
+
   fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
+
+  /// Returns up to 20 activities of type `Announce/Create/Page` from the community
+  fn read_community_outbox(
+    conn: &PgConnection,
+    community_actor_id: &Url,
+  ) -> Result<Vec<Value>, Error>;
 }
 
 impl Activity_ for Activity {
@@ -83,6 +91,25 @@ impl Activity_ for Activity {
     use lemmy_db_schema::schema::activity::dsl::*;
     activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
   }
+
+  fn read_community_outbox(
+    conn: &PgConnection,
+    community_actor_id: &Url,
+  ) -> Result<Vec<Value>, Error> {
+    use lemmy_db_schema::schema::activity::dsl::*;
+    let res: Vec<Value> = activity
+      .select(data)
+      .filter(
+        sql("activity.data ->> 'type' = 'Announce'")
+          .sql(" AND activity.data -> 'object' ->> 'type' = 'Create'")
+          .sql(" AND activity.data -> 'object' -> 'object' ->> 'type' = 'Page'")
+          .sql(" AND activity.data ->> 'actor' = ")
+          .bind::<Text, _>(community_actor_id),
+      )
+      .limit(20)
+      .get_results(conn)?;
+    Ok(res)
+  }
 }
 
 #[cfg(test)]