"syn",
]
+[[package]]
+name = "diesel_json"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2812f0f63b6d3508fb7bfdb872c2dc2321ba938f5e0f4cb9751ec899e8b297c9"
+dependencies = [
+ "diesel",
+ "serde 1.0.118",
+ "serde_json",
+]
+
[[package]]
name = "diesel_migrations"
version = "1.4.0"
"bcrypt",
"chrono",
"diesel",
+ "diesel_json",
"diesel_migrations",
"lazy_static",
"lemmy_db_schema",
use crate::{
- check_is_apub_id_valid,
fetcher::{
fetch::fetch_remote_object,
get_or_fetch_and_upsert_user,
is_deleted,
should_refetch_actor,
},
+ inbox::user_inbox::receive_announce,
objects::FromApub,
ActorType,
GroupExt,
- PageExt,
};
use activitystreams::{
- base::{BaseExt, ExtendsExt},
collection::{CollectionExt, OrderedCollection},
object::ObjectExt,
};
use anyhow::Context;
use diesel::result::Error::NotFound;
use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
-use lemmy_db_schema::source::{
- community::{Community, CommunityModerator, CommunityModeratorForm},
- post::Post,
-};
+use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
.await??;
}
- // fetch outbox (maybe make this conditional)
+ // only fetch outbox for new communities, otherwise this can create an infinite loop
+ if old_community.is_none() {
+ fetch_community_outbox(context, &community, recursion_counter).await?
+ }
+
+ Ok(community)
+}
+
+async fn fetch_community_outbox(
+ context: &LemmyContext,
+ community: &Community,
+ recursion_counter: &mut i32,
+) -> Result<(), LemmyError> {
let outbox = fetch_remote_object::<OrderedCollection>(
context.client(),
&community.get_outbox_url()?,
recursion_counter,
)
.await?;
- let outbox_items = outbox.items().context(location_info!())?.clone();
- let mut outbox_items = outbox_items.many().context(location_info!())?;
- if outbox_items.len() > 20 {
- outbox_items = outbox_items[0..20].to_vec();
+ let outbox_activities = outbox.items().context(location_info!())?.clone();
+ let mut outbox_activities = outbox_activities.many().context(location_info!())?;
+ if outbox_activities.len() > 20 {
+ outbox_activities = outbox_activities[0..20].to_vec();
}
- for o in outbox_items {
- let page = PageExt::from_any_base(o)?.context(location_info!())?;
- let page_id = page.id_unchecked().context(location_info!())?;
- // The post creator may be from a blocked instance, if it errors, then skip it
- if check_is_apub_id_valid(page_id).is_err() {
- continue;
- }
- Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
- // TODO: we need to send a websocket update here
+ for activity in outbox_activities {
+ receive_announce(context, activity, community, recursion_counter).await?;
}
- Ok(community)
+ Ok(())
}
ActorType,
};
use activitystreams::{
- base::{AnyBase, BaseExt, ExtendsExt},
+ base::{AnyBase, BaseExt},
collection::{CollectionExt, OrderedCollection, UnorderedCollection},
};
use actix_web::{body::Body, web, HttpResponse};
-use lemmy_db_queries::source::{community::Community_, post::Post_};
-use lemmy_db_schema::source::{community::Community, post::Post};
+use lemmy_db_queries::source::{activity::Activity_, community::Community_};
+use lemmy_db_schema::source::{activity::Activity, community::Community};
use lemmy_db_views_actor::community_follower_view::CommunityFollowerView;
use lemmy_structs::blocking;
use lemmy_utils::LemmyError;
})
.await??;
- let community_id = community.id;
- let posts = blocking(context.pool(), move |conn| {
- Post::list_for_community(conn, community_id)
+ let community_actor_id = community.actor_id.to_owned();
+ let activities = blocking(context.pool(), move |conn| {
+ Activity::read_community_outbox(conn, &community_actor_id)
})
.await??;
- let mut pages: Vec<AnyBase> = vec![];
- for p in posts {
- pages.push(p.to_apub(context.pool()).await?.into_any_base()?);
- }
-
- let len = pages.len();
+ let activities = activities
+ .iter()
+ .map(AnyBase::from_arbitrary_json)
+ .collect::<Result<Vec<AnyBase>, serde_json::Error>>()?;
+ let len = activities.len();
let mut collection = OrderedCollection::new();
collection
- .set_many_items(pages)
+ .set_many_items(activities)
.set_many_contexts(lemmy_context()?)
.set_id(community.get_outbox_url()?)
.set_total_items(len as u64);
}
/// Takes an announce and passes the inner activity to the appropriate handler.
-async fn receive_announce(
+pub async fn receive_announce(
context: &LemmyContext,
activity: AnyBase,
actor: &dyn ActorType,
lazy_static = "1.4.0"
regex = "1.4.2"
bcrypt = "0.9.0"
+diesel_json = "0.1.1"
use crate::Crud;
-use diesel::{dsl::*, result::Error, *};
-use lemmy_db_schema::source::activity::*;
+use diesel::{dsl::*, result::Error, sql_types::Text, *};
+use lemmy_db_schema::{source::activity::*, Url};
use log::debug;
use serde::Serialize;
+use serde_json::Value;
use std::{
fmt::Debug,
io::{Error as IoError, ErrorKind},
) -> Result<Activity, IoError>
where
T: Serialize + Debug;
+
fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
+
+ /// Returns up to 20 activities of type `Announce/Create/Page` from the community
+ fn read_community_outbox(
+ conn: &PgConnection,
+ community_actor_id: &Url,
+ ) -> Result<Vec<Value>, Error>;
}
impl Activity_ for Activity {
use lemmy_db_schema::schema::activity::dsl::*;
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
+
+ fn read_community_outbox(
+ conn: &PgConnection,
+ community_actor_id: &Url,
+ ) -> Result<Vec<Value>, Error> {
+ use lemmy_db_schema::schema::activity::dsl::*;
+ let res: Vec<Value> = activity
+ .select(data)
+ .filter(
+ sql("activity.data ->> 'type' = 'Announce'")
+ .sql(" AND activity.data -> 'object' ->> 'type' = 'Create'")
+ .sql(" AND activity.data -> 'object' -> 'object' ->> 'type' = 'Page'")
+ .sql(" AND activity.data ->> 'actor' = ")
+ .bind::<Text, _>(community_actor_id),
+ )
+ .limit(20)
+ .get_results(conn)?;
+ Ok(res)
+ }
}
#[cfg(test)]