]> Untitled Git - lemmy.git/blob - crates/apub/src/fetcher/community.rs
Move most code into crates/ subfolder
[lemmy.git] / crates / apub / src / fetcher / community.rs
1 use crate::{
2   check_is_apub_id_valid,
3   fetcher::{
4     fetch::fetch_remote_object,
5     get_or_fetch_and_upsert_user,
6     is_deleted,
7     should_refetch_actor,
8   },
9   objects::FromApub,
10   ActorType,
11   GroupExt,
12   PageExt,
13 };
14 use activitystreams::{
15   base::{BaseExt, ExtendsExt},
16   collection::{CollectionExt, OrderedCollection},
17   object::ObjectExt,
18 };
19 use anyhow::Context;
20 use diesel::result::Error::NotFound;
21 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
22 use lemmy_db_schema::source::{
23   community::{Community, CommunityModerator, CommunityModeratorForm},
24   post::Post,
25 };
26 use lemmy_structs::blocking;
27 use lemmy_utils::{location_info, LemmyError};
28 use lemmy_websocket::LemmyContext;
29 use log::debug;
30 use url::Url;
31
32 /// Get a community from its apub ID.
33 ///
34 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
35 /// Otherwise it is fetched from the remote instance, stored and returned.
36 pub(crate) async fn get_or_fetch_and_upsert_community(
37   apub_id: &Url,
38   context: &LemmyContext,
39   recursion_counter: &mut i32,
40 ) -> Result<Community, LemmyError> {
41   let apub_id_owned = apub_id.to_owned();
42   let community = blocking(context.pool(), move |conn| {
43     Community::read_from_apub_id(conn, apub_id_owned.as_str())
44   })
45   .await?;
46
47   match community {
48     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
49       debug!("Fetching and updating from remote community: {}", apub_id);
50       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
51     }
52     Ok(c) => Ok(c),
53     Err(NotFound {}) => {
54       debug!("Fetching and creating remote community: {}", apub_id);
55       fetch_remote_community(apub_id, context, None, recursion_counter).await
56     }
57     Err(e) => Err(e.into()),
58   }
59 }
60
61 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
62 /// is set, this is an update for a community which is already known locally. If not, we don't know
63 /// the community yet and also pull the outbox, to get some initial posts.
64 async fn fetch_remote_community(
65   apub_id: &Url,
66   context: &LemmyContext,
67   old_community: Option<Community>,
68   recursion_counter: &mut i32,
69 ) -> Result<Community, LemmyError> {
70   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
71
72   if let Some(c) = old_community.to_owned() {
73     if is_deleted(&group) {
74       blocking(context.pool(), move |conn| {
75         Community::update_deleted(conn, c.id, true)
76       })
77       .await??;
78     } else if group.is_err() {
79       // If fetching failed, return the existing data.
80       return Ok(c);
81     }
82   }
83
84   let group = group?;
85   let community =
86     Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
87
88   // Also add the community moderators too
89   let attributed_to = group.inner.attributed_to().context(location_info!())?;
90   let creator_and_moderator_uris: Vec<&Url> = attributed_to
91     .as_many()
92     .context(location_info!())?
93     .iter()
94     .map(|a| a.as_xsd_any_uri().context(""))
95     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
96
97   let mut creator_and_moderators = Vec::new();
98
99   for uri in creator_and_moderator_uris {
100     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
101
102     creator_and_moderators.push(c_or_m);
103   }
104
105   // TODO: need to make this work to update mods of existing communities
106   if old_community.is_none() {
107     let community_id = community.id;
108     blocking(context.pool(), move |conn| {
109       for mod_ in creator_and_moderators {
110         let community_moderator_form = CommunityModeratorForm {
111           community_id,
112           user_id: mod_.id,
113         };
114
115         CommunityModerator::join(conn, &community_moderator_form)?;
116       }
117       Ok(()) as Result<(), LemmyError>
118     })
119     .await??;
120   }
121
122   // fetch outbox (maybe make this conditional)
123   let outbox = fetch_remote_object::<OrderedCollection>(
124     context.client(),
125     &community.get_outbox_url()?,
126     recursion_counter,
127   )
128   .await?;
129   let outbox_items = outbox.items().context(location_info!())?.clone();
130   let mut outbox_items = outbox_items.many().context(location_info!())?;
131   if outbox_items.len() > 20 {
132     outbox_items = outbox_items[0..20].to_vec();
133   }
134   for o in outbox_items {
135     let page = PageExt::from_any_base(o)?.context(location_info!())?;
136     let page_id = page.id_unchecked().context(location_info!())?;
137
138     // The post creator may be from a blocked instance, if it errors, then skip it
139     if check_is_apub_id_valid(page_id).is_err() {
140       continue;
141     }
142     Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
143     // TODO: we need to send a websocket update here
144   }
145
146   Ok(community)
147 }