3 fetch::fetch_remote_object,
4 get_or_fetch_and_upsert_user,
8 inbox::user_inbox::receive_announce,
13 use activitystreams::{
14 collection::{CollectionExt, OrderedCollection},
18 use diesel::result::Error::NotFound;
19 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
20 use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
21 use lemmy_structs::blocking;
22 use lemmy_utils::{location_info, LemmyError};
23 use lemmy_websocket::LemmyContext;
27 /// Get a community from its apub ID.
29 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
30 /// Otherwise it is fetched from the remote instance, stored and returned.
31 pub(crate) async fn get_or_fetch_and_upsert_community(
33 context: &LemmyContext,
34 recursion_counter: &mut i32,
35 ) -> Result<Community, LemmyError> {
36 let apub_id_owned = apub_id.to_owned();
37 let community = blocking(context.pool(), move |conn| {
38 Community::read_from_apub_id(conn, &apub_id_owned.into())
43 Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
44 debug!("Fetching and updating from remote community: {}", apub_id);
45 fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
49 debug!("Fetching and creating remote community: {}", apub_id);
50 fetch_remote_community(apub_id, context, None, recursion_counter).await
52 Err(e) => Err(e.into()),
56 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`
57 /// is set, this is an update for a community which is already known locally. If not, we don't know
58 /// the community yet and also pull the outbox, to get some initial posts.
59 async fn fetch_remote_community(
61 context: &LemmyContext,
62 old_community: Option<Community>,
63 recursion_counter: &mut i32,
64 ) -> Result<Community, LemmyError> {
65 let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
67 if let Some(c) = old_community.to_owned() {
68 if is_deleted(&group) {
69 blocking(context.pool(), move |conn| {
70 Community::update_deleted(conn, c.id, true)
73 } else if group.is_err() {
74 // If fetching failed, return the existing data.
81 Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
83 // Also add the community moderators
84 let attributed_to = group.inner.attributed_to().context(location_info!())?;
85 let creator_and_moderator_uris: Vec<&Url> = attributed_to
87 .context(location_info!())?
89 .map(|a| a.as_xsd_any_uri().context(""))
90 .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
92 let mut creator_and_moderators = Vec::new();
94 for uri in creator_and_moderator_uris {
95 let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
97 creator_and_moderators.push(c_or_m);
100 // TODO: need to make this work to update mods of existing communities
101 if old_community.is_none() {
102 let community_id = community.id;
103 blocking(context.pool(), move |conn| {
104 for mod_ in creator_and_moderators {
105 let community_moderator_form = CommunityModeratorForm {
110 CommunityModerator::join(conn, &community_moderator_form)?;
112 Ok(()) as Result<(), LemmyError>
117 // only fetch outbox for new communities, otherwise this can create an infinite loop
118 if old_community.is_none() {
119 fetch_community_outbox(context, &community, recursion_counter).await?
125 async fn fetch_community_outbox(
126 context: &LemmyContext,
127 community: &Community,
128 recursion_counter: &mut i32,
129 ) -> Result<(), LemmyError> {
130 let outbox = fetch_remote_object::<OrderedCollection>(
132 &community.get_outbox_url()?,
136 let outbox_activities = outbox.items().context(location_info!())?.clone();
137 let mut outbox_activities = outbox_activities.many().context(location_info!())?;
138 if outbox_activities.len() > 20 {
139 outbox_activities = outbox_activities[0..20].to_vec();
142 for activity in outbox_activities {
143 receive_announce(context, activity, community, recursion_counter).await?;