]> Untitled Git - lemmy.git/blob - crates/apub/src/fetcher/community.rs
cb9ec865153cf51c99538987295810e9ae28165f
[lemmy.git] / crates / apub / src / fetcher / community.rs
1 use crate::{
2   fetcher::{
3     fetch::fetch_remote_object,
4     get_or_fetch_and_upsert_user,
5     is_deleted,
6     should_refetch_actor,
7   },
8   inbox::user_inbox::receive_announce,
9   objects::FromApub,
10   GroupExt,
11 };
12 use activitystreams::{
13   actor::ApActorExt,
14   collection::{CollectionExt, OrderedCollection},
15   object::ObjectExt,
16 };
17 use anyhow::Context;
18 use diesel::result::Error::NotFound;
19 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
20 use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm};
21 use lemmy_structs::blocking;
22 use lemmy_utils::{location_info, LemmyError};
23 use lemmy_websocket::LemmyContext;
24 use log::debug;
25 use url::Url;
26
27 /// Get a community from its apub ID.
28 ///
29 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
30 /// Otherwise it is fetched from the remote instance, stored and returned.
31 pub(crate) async fn get_or_fetch_and_upsert_community(
32   apub_id: &Url,
33   context: &LemmyContext,
34   recursion_counter: &mut i32,
35 ) -> Result<Community, LemmyError> {
36   let apub_id_owned = apub_id.to_owned();
37   let community = blocking(context.pool(), move |conn| {
38     Community::read_from_apub_id(conn, &apub_id_owned.into())
39   })
40   .await?;
41
42   match community {
43     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
44       debug!("Fetching and updating from remote community: {}", apub_id);
45       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
46     }
47     Ok(c) => Ok(c),
48     Err(NotFound {}) => {
49       debug!("Fetching and creating remote community: {}", apub_id);
50       fetch_remote_community(apub_id, context, None, recursion_counter).await
51     }
52     Err(e) => Err(e.into()),
53   }
54 }
55
56 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
57 /// is set, this is an update for a community which is already known locally. If not, we don't know
58 /// the community yet and also pull the outbox, to get some initial posts.
59 async fn fetch_remote_community(
60   apub_id: &Url,
61   context: &LemmyContext,
62   old_community: Option<Community>,
63   recursion_counter: &mut i32,
64 ) -> Result<Community, LemmyError> {
65   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
66
67   if let Some(c) = old_community.to_owned() {
68     if is_deleted(&group) {
69       blocking(context.pool(), move |conn| {
70         Community::update_deleted(conn, c.id, true)
71       })
72       .await??;
73     } else if group.is_err() {
74       // If fetching failed, return the existing data.
75       return Ok(c);
76     }
77   }
78
79   let group = group?;
80   let community =
81     Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
82
83   // Also add the community moderators too
84   let attributed_to = group.inner.attributed_to().context(location_info!())?;
85   let creator_and_moderator_uris: Vec<&Url> = attributed_to
86     .as_many()
87     .context(location_info!())?
88     .iter()
89     .map(|a| a.as_xsd_any_uri().context(""))
90     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
91
92   let mut creator_and_moderators = Vec::new();
93
94   for uri in creator_and_moderator_uris {
95     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
96
97     creator_and_moderators.push(c_or_m);
98   }
99
100   // TODO: need to make this work to update mods of existing communities
101   if old_community.is_none() {
102     let community_id = community.id;
103     blocking(context.pool(), move |conn| {
104       for mod_ in creator_and_moderators {
105         let community_moderator_form = CommunityModeratorForm {
106           community_id,
107           user_id: mod_.id,
108         };
109
110         CommunityModerator::join(conn, &community_moderator_form)?;
111       }
112       Ok(()) as Result<(), LemmyError>
113     })
114     .await??;
115   }
116
117   // only fetch outbox for new communities, otherwise this can create an infinite loop
118   if old_community.is_none() {
119     let outbox = group.inner.outbox()?.context(location_info!())?;
120     fetch_community_outbox(context, outbox, &community, recursion_counter).await?
121   }
122
123   Ok(community)
124 }
125
126 async fn fetch_community_outbox(
127   context: &LemmyContext,
128   outbox: &Url,
129   community: &Community,
130   recursion_counter: &mut i32,
131 ) -> Result<(), LemmyError> {
132   let outbox =
133     fetch_remote_object::<OrderedCollection>(context.client(), outbox, recursion_counter).await?;
134   let outbox_activities = outbox.items().context(location_info!())?.clone();
135   let mut outbox_activities = outbox_activities.many().context(location_info!())?;
136   if outbox_activities.len() > 20 {
137     outbox_activities = outbox_activities[0..20].to_vec();
138   }
139
140   for activity in outbox_activities {
141     receive_announce(context, activity, community, recursion_counter).await?;
142   }
143
144   Ok(())
145 }