]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
Merge remote-tracking branch 'origin/main' into move_views_to_diesel
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   ActorType,
4   FromApub,
5   GroupExt,
6   NoteExt,
7   PageExt,
8   PersonExt,
9   APUB_JSON_CONTENT_TYPE,
10 };
11 use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
12 use anyhow::{anyhow, Context};
13 use chrono::NaiveDateTime;
14 use diesel::result::Error::NotFound;
15 use lemmy_db::{
16   comment::{Comment, CommentForm},
17   comment_view::CommentView,
18   community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
19   naive_now,
20   post::{Post, PostForm},
21   post_view::PostView,
22   user::{UserForm, User_},
23   views::{community_view::CommunityView, user_view::UserViewSafe},
24   Crud,
25   Joinable,
26   SearchType,
27 };
28 use lemmy_structs::{blocking, site::SearchResponse};
29 use lemmy_utils::{
30   location_info,
31   request::{retry, RecvError},
32   settings::Settings,
33   LemmyError,
34 };
35 use lemmy_websocket::LemmyContext;
36 use log::debug;
37 use reqwest::Client;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
40 use url::Url;
41
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
44
45 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
46 /// fetch through the search).
47 ///
48 /// Tests are passing with a value of 5, so 10 should be safe for production.
49 static MAX_REQUEST_NUMBER: i32 = 10;
50
51 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
52 /// timeouts etc.
53 async fn fetch_remote_object<Response>(
54   client: &Client,
55   url: &Url,
56   recursion_counter: &mut i32,
57 ) -> Result<Response, LemmyError>
58 where
59   Response: for<'de> Deserialize<'de>,
60 {
61   *recursion_counter += 1;
62   if *recursion_counter > MAX_REQUEST_NUMBER {
63     return Err(anyhow!("Maximum recursion depth reached").into());
64   }
65   check_is_apub_id_valid(&url)?;
66
67   let timeout = Duration::from_secs(60);
68
69   let json = retry(|| {
70     client
71       .get(url.as_str())
72       .header("Accept", APUB_JSON_CONTENT_TYPE)
73       .timeout(timeout)
74       .send()
75   })
76   .await?
77   .json()
78   .await
79   .map_err(|e| {
80     debug!("Receive error, {}", e);
81     RecvError(e.to_string())
82   })?;
83
84   Ok(json)
85 }
86
87 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
88 #[serde(untagged)]
89 #[derive(serde::Deserialize, Debug)]
90 enum SearchAcceptedObjects {
91   Person(Box<PersonExt>),
92   Group(Box<GroupExt>),
93   Page(Box<PageExt>),
94   Comment(Box<NoteExt>),
95 }
96
97 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
98 ///
99 /// Some working examples for use with the `docker/federation/` setup:
100 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
101 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
102 /// http://lemmy_gamma:8561/post/3
103 /// http://lemmy_delta:8571/comment/2
104 pub async fn search_by_apub_id(
105   query: &str,
106   context: &LemmyContext,
107 ) -> Result<SearchResponse, LemmyError> {
108   // Parse the shorthand query url
109   let query_url = if query.contains('@') {
110     debug!("Search for {}", query);
111     let split = query.split('@').collect::<Vec<&str>>();
112
113     // User type will look like ['', username, instance]
114     // Community will look like [!community, instance]
115     let (name, instance) = if split.len() == 3 {
116       (format!("/u/{}", split[1]), split[2])
117     } else if split.len() == 2 {
118       if split[0].contains('!') {
119         let split2 = split[0].split('!').collect::<Vec<&str>>();
120         (format!("/c/{}", split2[1]), split[1])
121       } else {
122         return Err(anyhow!("Invalid search query: {}", query).into());
123       }
124     } else {
125       return Err(anyhow!("Invalid search query: {}", query).into());
126     };
127
128     let url = format!(
129       "{}://{}{}",
130       Settings::get().get_protocol_string(),
131       instance,
132       name
133     );
134     Url::parse(&url)?
135   } else {
136     Url::parse(&query)?
137   };
138
139   let mut response = SearchResponse {
140     type_: SearchType::All.to_string(),
141     comments: vec![],
142     posts: vec![],
143     communities: vec![],
144     users: vec![],
145   };
146
147   let domain = query_url.domain().context("url has no domain")?;
148   let recursion_counter = &mut 0;
149   let response = match fetch_remote_object::<SearchAcceptedObjects>(
150     context.client(),
151     &query_url,
152     recursion_counter,
153   )
154   .await?
155   {
156     SearchAcceptedObjects::Person(p) => {
157       let user_uri = p.inner.id(domain)?.context("person has no id")?;
158
159       let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
160
161       response.users = vec![
162         blocking(context.pool(), move |conn| {
163           UserViewSafe::read(conn, user.id)
164         })
165         .await??,
166       ];
167
168       response
169     }
170     SearchAcceptedObjects::Group(g) => {
171       let community_uri = g.inner.id(domain)?.context("group has no id")?;
172
173       let community =
174         get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
175
176       response.communities = vec![
177         blocking(context.pool(), move |conn| {
178           CommunityView::read(conn, community.id, None)
179         })
180         .await??,
181       ];
182
183       response
184     }
185     SearchAcceptedObjects::Page(p) => {
186       let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
187
188       let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
189       response.posts =
190         vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
191
192       response
193     }
194     SearchAcceptedObjects::Comment(c) => {
195       let comment_form =
196         CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
197
198       let c = blocking(context.pool(), move |conn| {
199         Comment::upsert(conn, &comment_form)
200       })
201       .await??;
202       response.comments = vec![
203         blocking(context.pool(), move |conn| {
204           CommentView::read(conn, c.id, None)
205         })
206         .await??,
207       ];
208
209       response
210     }
211   };
212
213   Ok(response)
214 }
215
216 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
217 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
218 ///
219 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
220 /// Otherwise it is fetched from the remote instance, stored and returned.
221 pub(crate) async fn get_or_fetch_and_upsert_actor(
222   apub_id: &Url,
223   context: &LemmyContext,
224   recursion_counter: &mut i32,
225 ) -> Result<Box<dyn ActorType>, LemmyError> {
226   let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
227   let actor: Box<dyn ActorType> = match community {
228     Ok(c) => Box::new(c),
229     Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
230   };
231   Ok(actor)
232 }
233
234 /// Get a user from its apub ID.
235 ///
236 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
237 /// Otherwise it is fetched from the remote instance, stored and returned.
238 pub(crate) async fn get_or_fetch_and_upsert_user(
239   apub_id: &Url,
240   context: &LemmyContext,
241   recursion_counter: &mut i32,
242 ) -> Result<User_, LemmyError> {
243   let apub_id_owned = apub_id.to_owned();
244   let user = blocking(context.pool(), move |conn| {
245     User_::read_from_actor_id(conn, apub_id_owned.as_ref())
246   })
247   .await?;
248
249   match user {
250     // If its older than a day, re-fetch it
251     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
252       debug!("Fetching and updating from remote user: {}", apub_id);
253       let person =
254         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
255       // If fetching failed, return the existing data.
256       if person.is_err() {
257         return Ok(u);
258       }
259
260       let mut uf = UserForm::from_apub(
261         &person?,
262         context,
263         Some(apub_id.to_owned()),
264         recursion_counter,
265       )
266       .await?;
267       uf.last_refreshed_at = Some(naive_now());
268       let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
269
270       Ok(user)
271     }
272     Ok(u) => Ok(u),
273     Err(NotFound {}) => {
274       debug!("Fetching and creating remote user: {}", apub_id);
275       let person =
276         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
277
278       let uf = UserForm::from_apub(
279         &person,
280         context,
281         Some(apub_id.to_owned()),
282         recursion_counter,
283       )
284       .await?;
285       let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
286
287       Ok(user)
288     }
289     Err(e) => Err(e.into()),
290   }
291 }
292
293 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
294 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
295 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
296 ///
297 /// TODO it won't pick up new avatars, summaries etc until a day after.
298 /// Actors need an "update" activity pushed to other servers to fix this.
299 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
300   let update_interval = if cfg!(debug_assertions) {
301     // avoid infinite loop when fetching community outbox
302     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
303   } else {
304     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
305   };
306   last_refreshed.lt(&(naive_now() - update_interval))
307 }
308
309 /// Get a community from its apub ID.
310 ///
311 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
312 /// Otherwise it is fetched from the remote instance, stored and returned.
313 pub(crate) async fn get_or_fetch_and_upsert_community(
314   apub_id: &Url,
315   context: &LemmyContext,
316   recursion_counter: &mut i32,
317 ) -> Result<Community, LemmyError> {
318   let apub_id_owned = apub_id.to_owned();
319   let community = blocking(context.pool(), move |conn| {
320     Community::read_from_actor_id(conn, apub_id_owned.as_str())
321   })
322   .await?;
323
324   match community {
325     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
326       debug!("Fetching and updating from remote community: {}", apub_id);
327       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
328     }
329     Ok(c) => Ok(c),
330     Err(NotFound {}) => {
331       debug!("Fetching and creating remote community: {}", apub_id);
332       fetch_remote_community(apub_id, context, None, recursion_counter).await
333     }
334     Err(e) => Err(e.into()),
335   }
336 }
337
338 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
339 /// is set, this is an update for a community which is already known locally. If not, we don't know
340 /// the community yet and also pull the outbox, to get some initial posts.
341 async fn fetch_remote_community(
342   apub_id: &Url,
343   context: &LemmyContext,
344   old_community: Option<Community>,
345   recursion_counter: &mut i32,
346 ) -> Result<Community, LemmyError> {
347   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
348   // If fetching failed, return the existing data.
349   if let Some(ref c) = old_community {
350     if group.is_err() {
351       return Ok(c.to_owned());
352     }
353   }
354
355   let group = group?;
356   let cf =
357     CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
358   let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
359
360   // Also add the community moderators too
361   let attributed_to = group.inner.attributed_to().context(location_info!())?;
362   let creator_and_moderator_uris: Vec<&Url> = attributed_to
363     .as_many()
364     .context(location_info!())?
365     .iter()
366     .map(|a| a.as_xsd_any_uri().context(""))
367     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
368
369   let mut creator_and_moderators = Vec::new();
370
371   for uri in creator_and_moderator_uris {
372     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
373
374     creator_and_moderators.push(c_or_m);
375   }
376
377   // TODO: need to make this work to update mods of existing communities
378   if old_community.is_none() {
379     let community_id = community.id;
380     blocking(context.pool(), move |conn| {
381       for mod_ in creator_and_moderators {
382         let community_moderator_form = CommunityModeratorForm {
383           community_id,
384           user_id: mod_.id,
385         };
386
387         CommunityModerator::join(conn, &community_moderator_form)?;
388       }
389       Ok(()) as Result<(), LemmyError>
390     })
391     .await??;
392   }
393
394   // fetch outbox (maybe make this conditional)
395   let outbox = fetch_remote_object::<OrderedCollection>(
396     context.client(),
397     &community.get_outbox_url()?,
398     recursion_counter,
399   )
400   .await?;
401   let outbox_items = outbox.items().context(location_info!())?.clone();
402   let mut outbox_items = outbox_items.many().context(location_info!())?;
403   if outbox_items.len() > 20 {
404     outbox_items = outbox_items[0..20].to_vec();
405   }
406   for o in outbox_items {
407     let page = PageExt::from_any_base(o)?.context(location_info!())?;
408
409     // The post creator may be from a blocked instance,
410     // if it errors, then continue
411     let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
412       Ok(post) => post,
413       Err(_) => continue,
414     };
415     let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
416     // Check whether the post already exists in the local db
417     let existing = blocking(context.pool(), move |conn| {
418       Post::read_from_apub_id(conn, &post_ap_id)
419     })
420     .await?;
421     match existing {
422       Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
423       Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
424     };
425     // TODO: we need to send a websocket update here
426   }
427
428   Ok(community)
429 }
430
431 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
432 /// pulled from its apub ID, inserted and returned.
433 ///
434 /// The parent community is also pulled if necessary. Comments are not pulled.
435 pub(crate) async fn get_or_fetch_and_insert_post(
436   post_ap_id: &Url,
437   context: &LemmyContext,
438   recursion_counter: &mut i32,
439 ) -> Result<Post, LemmyError> {
440   let post_ap_id_owned = post_ap_id.to_owned();
441   let post = blocking(context.pool(), move |conn| {
442     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
443   })
444   .await?;
445
446   match post {
447     Ok(p) => Ok(p),
448     Err(NotFound {}) => {
449       debug!("Fetching and creating remote post: {}", post_ap_id);
450       let post =
451         fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
452       let post_form = PostForm::from_apub(
453         &post,
454         context,
455         Some(post_ap_id.to_owned()),
456         recursion_counter,
457       )
458       .await?;
459
460       let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
461
462       Ok(post)
463     }
464     Err(e) => Err(e.into()),
465   }
466 }
467
468 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
469 /// pulled from its apub ID, inserted and returned.
470 ///
471 /// The parent community, post and comment are also pulled if necessary.
472 pub(crate) async fn get_or_fetch_and_insert_comment(
473   comment_ap_id: &Url,
474   context: &LemmyContext,
475   recursion_counter: &mut i32,
476 ) -> Result<Comment, LemmyError> {
477   let comment_ap_id_owned = comment_ap_id.to_owned();
478   let comment = blocking(context.pool(), move |conn| {
479     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
480   })
481   .await?;
482
483   match comment {
484     Ok(p) => Ok(p),
485     Err(NotFound {}) => {
486       debug!(
487         "Fetching and creating remote comment and its parents: {}",
488         comment_ap_id
489       );
490       let comment =
491         fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
492       let comment_form = CommentForm::from_apub(
493         &comment,
494         context,
495         Some(comment_ap_id.to_owned()),
496         recursion_counter,
497       )
498       .await?;
499
500       let post_id = comment_form.post_id;
501       let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
502       if post.locked {
503         return Err(anyhow!("Post is locked").into());
504       }
505
506       let comment = blocking(context.pool(), move |conn| {
507         Comment::upsert(conn, &comment_form)
508       })
509       .await??;
510
511       Ok(comment)
512     }
513     Err(e) => Err(e.into()),
514   }
515 }