]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
Fix nginx config for local federation setup (#104)
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   ActorType,
4   FromApub,
5   GroupExt,
6   PageExt,
7   PersonExt,
8   APUB_JSON_CONTENT_TYPE,
9 };
10 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
11 use anyhow::{anyhow, Context};
12 use chrono::NaiveDateTime;
13 use diesel::result::Error::NotFound;
14 use lemmy_db::{
15   comment::{Comment, CommentForm},
16   comment_view::CommentView,
17   community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
18   community_view::CommunityView,
19   naive_now,
20   post::{Post, PostForm},
21   post_view::PostView,
22   user::{UserForm, User_},
23   user_view::UserView,
24   Crud,
25   Joinable,
26   SearchType,
27 };
28 use lemmy_structs::{blocking, site::SearchResponse};
29 use lemmy_utils::{
30   apub::get_apub_protocol_string,
31   location_info,
32   request::{retry, RecvError},
33   LemmyError,
34 };
35 use lemmy_websocket::LemmyContext;
36 use log::debug;
37 use reqwest::Client;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
40 use url::Url;
41
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
44
45 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
46 /// timeouts etc.
47 pub async fn fetch_remote_object<Response>(
48   client: &Client,
49   url: &Url,
50 ) -> Result<Response, LemmyError>
51 where
52   Response: for<'de> Deserialize<'de>,
53 {
54   check_is_apub_id_valid(&url)?;
55
56   let timeout = Duration::from_secs(60);
57
58   // speed up tests
59   // before: 305s
60   // after: 240s
61   let json = retry(|| {
62     client
63       .get(url.as_str())
64       .header("Accept", APUB_JSON_CONTENT_TYPE)
65       .timeout(timeout)
66       .send()
67   })
68   .await?
69   .json()
70   .await
71   .map_err(|e| {
72     debug!("Receive error, {}", e);
73     RecvError(e.to_string())
74   })?;
75
76   Ok(json)
77 }
78
79 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
80 #[serde(untagged)]
81 #[derive(serde::Deserialize, Debug)]
82 pub enum SearchAcceptedObjects {
83   Person(Box<PersonExt>),
84   Group(Box<GroupExt>),
85   Page(Box<PageExt>),
86   Comment(Box<Note>),
87 }
88
89 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
90 ///
91 /// Some working examples for use with the docker/federation/ setup:
92 /// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
93 /// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
94 /// http://lemmy_alpha:8540/post/3
95 /// http://lemmy_alpha:8540/comment/2
96 pub async fn search_by_apub_id(
97   query: &str,
98   context: &LemmyContext,
99 ) -> Result<SearchResponse, LemmyError> {
100   // Parse the shorthand query url
101   let query_url = if query.contains('@') {
102     debug!("{}", query);
103     let split = query.split('@').collect::<Vec<&str>>();
104
105     // User type will look like ['', username, instance]
106     // Community will look like [!community, instance]
107     let (name, instance) = if split.len() == 3 {
108       (format!("/u/{}", split[1]), split[2])
109     } else if split.len() == 2 {
110       if split[0].contains('!') {
111         let split2 = split[0].split('!').collect::<Vec<&str>>();
112         (format!("/c/{}", split2[1]), split[1])
113       } else {
114         return Err(anyhow!("Invalid search query: {}", query).into());
115       }
116     } else {
117       return Err(anyhow!("Invalid search query: {}", query).into());
118     };
119
120     let url = format!("{}://{}{}", get_apub_protocol_string(), instance, name);
121     Url::parse(&url)?
122   } else {
123     Url::parse(&query)?
124   };
125
126   let mut response = SearchResponse {
127     type_: SearchType::All.to_string(),
128     comments: vec![],
129     posts: vec![],
130     communities: vec![],
131     users: vec![],
132   };
133
134   let domain = query_url.domain().context("url has no domain")?;
135   let response =
136     match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? {
137       SearchAcceptedObjects::Person(p) => {
138         let user_uri = p.inner.id(domain)?.context("person has no id")?;
139
140         let user = get_or_fetch_and_upsert_user(&user_uri, context).await?;
141
142         response.users = vec![
143           blocking(context.pool(), move |conn| {
144             UserView::get_user_secure(conn, user.id)
145           })
146           .await??,
147         ];
148
149         response
150       }
151       SearchAcceptedObjects::Group(g) => {
152         let community_uri = g.inner.id(domain)?.context("group has no id")?;
153
154         let community = get_or_fetch_and_upsert_community(community_uri, context).await?;
155
156         response.communities = vec![
157           blocking(context.pool(), move |conn| {
158             CommunityView::read(conn, community.id, None)
159           })
160           .await??,
161         ];
162
163         response
164       }
165       SearchAcceptedObjects::Page(p) => {
166         let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?;
167
168         let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
169         response.posts =
170           vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
171
172         response
173       }
174       SearchAcceptedObjects::Comment(c) => {
175         let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?;
176
177         let c = blocking(context.pool(), move |conn| {
178           Comment::upsert(conn, &comment_form)
179         })
180         .await??;
181         response.comments = vec![
182           blocking(context.pool(), move |conn| {
183             CommentView::read(conn, c.id, None)
184           })
185           .await??,
186         ];
187
188         response
189       }
190     };
191
192   Ok(response)
193 }
194
195 pub async fn get_or_fetch_and_upsert_actor(
196   apub_id: &Url,
197   context: &LemmyContext,
198 ) -> Result<Box<dyn ActorType>, LemmyError> {
199   let user = get_or_fetch_and_upsert_user(apub_id, context).await;
200   let actor: Box<dyn ActorType> = match user {
201     Ok(u) => Box::new(u),
202     Err(_) => Box::new(get_or_fetch_and_upsert_community(apub_id, context).await?),
203   };
204   Ok(actor)
205 }
206
207 /// Check if a remote user exists, create if not found, if its too old update it.Fetch a user, insert/update it in the database and return the user.
208 pub async fn get_or_fetch_and_upsert_user(
209   apub_id: &Url,
210   context: &LemmyContext,
211 ) -> Result<User_, LemmyError> {
212   let apub_id_owned = apub_id.to_owned();
213   let user = blocking(context.pool(), move |conn| {
214     User_::read_from_actor_id(conn, apub_id_owned.as_ref())
215   })
216   .await?;
217
218   match user {
219     // If its older than a day, re-fetch it
220     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
221       debug!("Fetching and updating from remote user: {}", apub_id);
222       let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
223
224       let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
225       uf.last_refreshed_at = Some(naive_now());
226       let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
227
228       Ok(user)
229     }
230     Ok(u) => Ok(u),
231     Err(NotFound {}) => {
232       debug!("Fetching and creating remote user: {}", apub_id);
233       let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
234
235       let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
236       let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
237
238       Ok(user)
239     }
240     Err(e) => Err(e.into()),
241   }
242 }
243
244 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
245 /// ACTOR_REFETCH_INTERVAL_SECONDS after the last refetch, in debug builds always.
246 ///
247 /// TODO it won't pick up new avatars, summaries etc until a day after.
248 /// Actors need an "update" activity pushed to other servers to fix this.
249 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
250   let update_interval = if cfg!(debug_assertions) {
251     // avoid infinite loop when fetching community outbox
252     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
253   } else {
254     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
255   };
256   last_refreshed.lt(&(naive_now() - update_interval))
257 }
258
259 /// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
260 pub async fn get_or_fetch_and_upsert_community(
261   apub_id: &Url,
262   context: &LemmyContext,
263 ) -> Result<Community, LemmyError> {
264   let apub_id_owned = apub_id.to_owned();
265   let community = blocking(context.pool(), move |conn| {
266     Community::read_from_actor_id(conn, apub_id_owned.as_str())
267   })
268   .await?;
269
270   match community {
271     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
272       debug!("Fetching and updating from remote community: {}", apub_id);
273       fetch_remote_community(apub_id, context, Some(c.id)).await
274     }
275     Ok(c) => Ok(c),
276     Err(NotFound {}) => {
277       debug!("Fetching and creating remote community: {}", apub_id);
278       fetch_remote_community(apub_id, context, None).await
279     }
280     Err(e) => Err(e.into()),
281   }
282 }
283
284 async fn fetch_remote_community(
285   apub_id: &Url,
286   context: &LemmyContext,
287   community_id: Option<i32>,
288 ) -> Result<Community, LemmyError> {
289   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
290
291   let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
292   let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
293
294   // Also add the community moderators too
295   let attributed_to = group.inner.attributed_to().context(location_info!())?;
296   let creator_and_moderator_uris: Vec<&Url> = attributed_to
297     .as_many()
298     .context(location_info!())?
299     .iter()
300     .map(|a| a.as_xsd_any_uri().context(""))
301     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
302
303   let mut creator_and_moderators = Vec::new();
304
305   for uri in creator_and_moderator_uris {
306     let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?;
307
308     creator_and_moderators.push(c_or_m);
309   }
310
311   // TODO: need to make this work to update mods of existing communities
312   if community_id.is_none() {
313     let community_id = community.id;
314     blocking(context.pool(), move |conn| {
315       for mod_ in creator_and_moderators {
316         let community_moderator_form = CommunityModeratorForm {
317           community_id,
318           user_id: mod_.id,
319         };
320
321         CommunityModerator::join(conn, &community_moderator_form)?;
322       }
323       Ok(()) as Result<(), LemmyError>
324     })
325     .await??;
326   }
327
328   // fetch outbox (maybe make this conditional)
329   let outbox =
330     fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?)
331       .await?;
332   let outbox_items = outbox.items().context(location_info!())?.clone();
333   let mut outbox_items = outbox_items.many().context(location_info!())?;
334   if outbox_items.len() > 20 {
335     outbox_items = outbox_items[0..20].to_vec();
336   }
337   for o in outbox_items {
338     let page = PageExt::from_any_base(o)?.context(location_info!())?;
339     let post = PostForm::from_apub(&page, context, None).await?;
340     let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
341     // Check whether the post already exists in the local db
342     let existing = blocking(context.pool(), move |conn| {
343       Post::read_from_apub_id(conn, &post_ap_id)
344     })
345     .await?;
346     match existing {
347       Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
348       Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
349     };
350     // TODO: we need to send a websocket update here
351   }
352
353   Ok(community)
354 }
355
356 pub async fn get_or_fetch_and_insert_post(
357   post_ap_id: &Url,
358   context: &LemmyContext,
359 ) -> Result<Post, LemmyError> {
360   let post_ap_id_owned = post_ap_id.to_owned();
361   let post = blocking(context.pool(), move |conn| {
362     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
363   })
364   .await?;
365
366   match post {
367     Ok(p) => Ok(p),
368     Err(NotFound {}) => {
369       debug!("Fetching and creating remote post: {}", post_ap_id);
370       let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
371       let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
372
373       let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
374
375       Ok(post)
376     }
377     Err(e) => Err(e.into()),
378   }
379 }
380
381 pub async fn get_or_fetch_and_insert_comment(
382   comment_ap_id: &Url,
383   context: &LemmyContext,
384 ) -> Result<Comment, LemmyError> {
385   let comment_ap_id_owned = comment_ap_id.to_owned();
386   let comment = blocking(context.pool(), move |conn| {
387     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
388   })
389   .await?;
390
391   match comment {
392     Ok(p) => Ok(p),
393     Err(NotFound {}) => {
394       debug!(
395         "Fetching and creating remote comment and its parents: {}",
396         comment_ap_id
397       );
398       let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?;
399       let comment_form =
400         CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
401
402       let comment = blocking(context.pool(), move |conn| {
403         Comment::upsert(conn, &comment_form)
404       })
405       .await??;
406
407       Ok(comment)
408     }
409     Err(e) => Err(e.into()),
410   }
411 }