]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
Some API cleanup, adding site_id to site aggregates.
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   objects::FromApub,
4   ActorType,
5   GroupExt,
6   NoteExt,
7   PageExt,
8   PersonExt,
9   APUB_JSON_CONTENT_TYPE,
10 };
11 use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
12 use anyhow::{anyhow, Context};
13 use chrono::NaiveDateTime;
14 use diesel::result::Error::NotFound;
15 use lemmy_db::{
16   naive_now,
17   source::{
18     comment::Comment,
19     community::{Community, CommunityModerator, CommunityModeratorForm},
20     post::Post,
21     user::User_,
22   },
23   views::{
24     comment_view::CommentView,
25     community::community_view::CommunityView,
26     post_view::PostView,
27     user_view::UserViewSafe,
28   },
29   ApubObject,
30   Crud,
31   Joinable,
32   SearchType,
33 };
34 use lemmy_structs::{blocking, site::SearchResponse};
35 use lemmy_utils::{
36   location_info,
37   request::{retry, RecvError},
38   settings::Settings,
39   LemmyError,
40 };
41 use lemmy_websocket::LemmyContext;
42 use log::debug;
43 use reqwest::Client;
44 use serde::Deserialize;
45 use std::{fmt::Debug, time::Duration};
46 use url::Url;
47
48 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
49 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
50
51 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
52 /// fetch through the search).
53 ///
54 /// Tests are passing with a value of 5, so 10 should be safe for production.
55 static MAX_REQUEST_NUMBER: i32 = 10;
56
57 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
58 /// timeouts etc.
59 async fn fetch_remote_object<Response>(
60   client: &Client,
61   url: &Url,
62   recursion_counter: &mut i32,
63 ) -> Result<Response, LemmyError>
64 where
65   Response: for<'de> Deserialize<'de>,
66 {
67   *recursion_counter += 1;
68   if *recursion_counter > MAX_REQUEST_NUMBER {
69     return Err(anyhow!("Maximum recursion depth reached").into());
70   }
71   check_is_apub_id_valid(&url)?;
72
73   let timeout = Duration::from_secs(60);
74
75   let json = retry(|| {
76     client
77       .get(url.as_str())
78       .header("Accept", APUB_JSON_CONTENT_TYPE)
79       .timeout(timeout)
80       .send()
81   })
82   .await?
83   .json()
84   .await
85   .map_err(|e| {
86     debug!("Receive error, {}", e);
87     RecvError(e.to_string())
88   })?;
89
90   Ok(json)
91 }
92
93 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
94 #[serde(untagged)]
95 #[derive(serde::Deserialize, Debug)]
96 enum SearchAcceptedObjects {
97   Person(Box<PersonExt>),
98   Group(Box<GroupExt>),
99   Page(Box<PageExt>),
100   Comment(Box<NoteExt>),
101 }
102
103 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
104 ///
105 /// Some working examples for use with the `docker/federation/` setup:
106 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
107 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
108 /// http://lemmy_gamma:8561/post/3
109 /// http://lemmy_delta:8571/comment/2
110 pub async fn search_by_apub_id(
111   query: &str,
112   context: &LemmyContext,
113 ) -> Result<SearchResponse, LemmyError> {
114   // Parse the shorthand query url
115   let query_url = if query.contains('@') {
116     debug!("Search for {}", query);
117     let split = query.split('@').collect::<Vec<&str>>();
118
119     // User type will look like ['', username, instance]
120     // Community will look like [!community, instance]
121     let (name, instance) = if split.len() == 3 {
122       (format!("/u/{}", split[1]), split[2])
123     } else if split.len() == 2 {
124       if split[0].contains('!') {
125         let split2 = split[0].split('!').collect::<Vec<&str>>();
126         (format!("/c/{}", split2[1]), split[1])
127       } else {
128         return Err(anyhow!("Invalid search query: {}", query).into());
129       }
130     } else {
131       return Err(anyhow!("Invalid search query: {}", query).into());
132     };
133
134     let url = format!(
135       "{}://{}{}",
136       Settings::get().get_protocol_string(),
137       instance,
138       name
139     );
140     Url::parse(&url)?
141   } else {
142     Url::parse(&query)?
143   };
144
145   let mut response = SearchResponse {
146     type_: SearchType::All.to_string(),
147     comments: vec![],
148     posts: vec![],
149     communities: vec![],
150     users: vec![],
151   };
152
153   let domain = query_url.domain().context("url has no domain")?;
154   let recursion_counter = &mut 0;
155   let response = match fetch_remote_object::<SearchAcceptedObjects>(
156     context.client(),
157     &query_url,
158     recursion_counter,
159   )
160   .await?
161   {
162     SearchAcceptedObjects::Person(p) => {
163       let user_uri = p.inner.id(domain)?.context("person has no id")?;
164
165       let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
166
167       response.users = vec![
168         blocking(context.pool(), move |conn| {
169           UserViewSafe::read(conn, user.id)
170         })
171         .await??,
172       ];
173
174       response
175     }
176     SearchAcceptedObjects::Group(g) => {
177       let community_uri = g.inner.id(domain)?.context("group has no id")?;
178
179       let community =
180         get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
181
182       response.communities = vec![
183         blocking(context.pool(), move |conn| {
184           CommunityView::read(conn, community.id, None)
185         })
186         .await??,
187       ];
188
189       response
190     }
191     SearchAcceptedObjects::Page(p) => {
192       let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
193
194       response.posts =
195         vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
196
197       response
198     }
199     SearchAcceptedObjects::Comment(c) => {
200       let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
201
202       response.comments = vec![
203         blocking(context.pool(), move |conn| {
204           CommentView::read(conn, c.id, None)
205         })
206         .await??,
207       ];
208
209       response
210     }
211   };
212
213   Ok(response)
214 }
215
216 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
217 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
218 ///
219 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
220 /// Otherwise it is fetched from the remote instance, stored and returned.
221 pub(crate) async fn get_or_fetch_and_upsert_actor(
222   apub_id: &Url,
223   context: &LemmyContext,
224   recursion_counter: &mut i32,
225 ) -> Result<Box<dyn ActorType>, LemmyError> {
226   let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
227   let actor: Box<dyn ActorType> = match community {
228     Ok(c) => Box::new(c),
229     Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
230   };
231   Ok(actor)
232 }
233
234 /// Get a user from its apub ID.
235 ///
236 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
237 /// Otherwise it is fetched from the remote instance, stored and returned.
238 pub(crate) async fn get_or_fetch_and_upsert_user(
239   apub_id: &Url,
240   context: &LemmyContext,
241   recursion_counter: &mut i32,
242 ) -> Result<User_, LemmyError> {
243   let apub_id_owned = apub_id.to_owned();
244   let user = blocking(context.pool(), move |conn| {
245     User_::read_from_apub_id(conn, apub_id_owned.as_ref())
246   })
247   .await?;
248
249   match user {
250     // If its older than a day, re-fetch it
251     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
252       debug!("Fetching and updating from remote user: {}", apub_id);
253       let person =
254         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
255       // If fetching failed, return the existing data.
256       if person.is_err() {
257         return Ok(u);
258       }
259
260       let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
261
262       let user_id = user.id;
263       blocking(context.pool(), move |conn| {
264         User_::mark_as_updated(conn, user_id)
265       })
266       .await??;
267
268       Ok(user)
269     }
270     Ok(u) => Ok(u),
271     Err(NotFound {}) => {
272       debug!("Fetching and creating remote user: {}", apub_id);
273       let person =
274         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
275
276       let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
277
278       Ok(user)
279     }
280     Err(e) => Err(e.into()),
281   }
282 }
283
284 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
285 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
286 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
287 ///
288 /// TODO it won't pick up new avatars, summaries etc until a day after.
289 /// Actors need an "update" activity pushed to other servers to fix this.
290 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
291   let update_interval = if cfg!(debug_assertions) {
292     // avoid infinite loop when fetching community outbox
293     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
294   } else {
295     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
296   };
297   last_refreshed.lt(&(naive_now() - update_interval))
298 }
299
300 /// Get a community from its apub ID.
301 ///
302 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
303 /// Otherwise it is fetched from the remote instance, stored and returned.
304 pub(crate) async fn get_or_fetch_and_upsert_community(
305   apub_id: &Url,
306   context: &LemmyContext,
307   recursion_counter: &mut i32,
308 ) -> Result<Community, LemmyError> {
309   let apub_id_owned = apub_id.to_owned();
310   let community = blocking(context.pool(), move |conn| {
311     Community::read_from_apub_id(conn, apub_id_owned.as_str())
312   })
313   .await?;
314
315   match community {
316     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
317       debug!("Fetching and updating from remote community: {}", apub_id);
318       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
319     }
320     Ok(c) => Ok(c),
321     Err(NotFound {}) => {
322       debug!("Fetching and creating remote community: {}", apub_id);
323       fetch_remote_community(apub_id, context, None, recursion_counter).await
324     }
325     Err(e) => Err(e.into()),
326   }
327 }
328
329 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
330 /// is set, this is an update for a community which is already known locally. If not, we don't know
331 /// the community yet and also pull the outbox, to get some initial posts.
332 async fn fetch_remote_community(
333   apub_id: &Url,
334   context: &LemmyContext,
335   old_community: Option<Community>,
336   recursion_counter: &mut i32,
337 ) -> Result<Community, LemmyError> {
338   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
339   // If fetching failed, return the existing data.
340   if let Some(ref c) = old_community {
341     if group.is_err() {
342       return Ok(c.to_owned());
343     }
344   }
345
346   let group = group?;
347   let community =
348     Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
349
350   // Also add the community moderators too
351   let attributed_to = group.inner.attributed_to().context(location_info!())?;
352   let creator_and_moderator_uris: Vec<&Url> = attributed_to
353     .as_many()
354     .context(location_info!())?
355     .iter()
356     .map(|a| a.as_xsd_any_uri().context(""))
357     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
358
359   let mut creator_and_moderators = Vec::new();
360
361   for uri in creator_and_moderator_uris {
362     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
363
364     creator_and_moderators.push(c_or_m);
365   }
366
367   // TODO: need to make this work to update mods of existing communities
368   if old_community.is_none() {
369     let community_id = community.id;
370     blocking(context.pool(), move |conn| {
371       for mod_ in creator_and_moderators {
372         let community_moderator_form = CommunityModeratorForm {
373           community_id,
374           user_id: mod_.id,
375         };
376
377         CommunityModerator::join(conn, &community_moderator_form)?;
378       }
379       Ok(()) as Result<(), LemmyError>
380     })
381     .await??;
382   }
383
384   // fetch outbox (maybe make this conditional)
385   let outbox = fetch_remote_object::<OrderedCollection>(
386     context.client(),
387     &community.get_outbox_url()?,
388     recursion_counter,
389   )
390   .await?;
391   let outbox_items = outbox.items().context(location_info!())?.clone();
392   let mut outbox_items = outbox_items.many().context(location_info!())?;
393   if outbox_items.len() > 20 {
394     outbox_items = outbox_items[0..20].to_vec();
395   }
396   for o in outbox_items {
397     let page = PageExt::from_any_base(o)?.context(location_info!())?;
398     let page_id = page.id_unchecked().context(location_info!())?;
399
400     // The post creator may be from a blocked instance, if it errors, then skip it
401     if check_is_apub_id_valid(page_id).is_err() {
402       continue;
403     }
404     Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
405     // TODO: we need to send a websocket update here
406   }
407
408   Ok(community)
409 }
410
411 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
412 /// pulled from its apub ID, inserted and returned.
413 ///
414 /// The parent community is also pulled if necessary. Comments are not pulled.
415 pub(crate) async fn get_or_fetch_and_insert_post(
416   post_ap_id: &Url,
417   context: &LemmyContext,
418   recursion_counter: &mut i32,
419 ) -> Result<Post, LemmyError> {
420   let post_ap_id_owned = post_ap_id.to_owned();
421   let post = blocking(context.pool(), move |conn| {
422     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
423   })
424   .await?;
425
426   match post {
427     Ok(p) => Ok(p),
428     Err(NotFound {}) => {
429       debug!("Fetching and creating remote post: {}", post_ap_id);
430       let page =
431         fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
432       let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
433
434       Ok(post)
435     }
436     Err(e) => Err(e.into()),
437   }
438 }
439
440 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
441 /// pulled from its apub ID, inserted and returned.
442 ///
443 /// The parent community, post and comment are also pulled if necessary.
444 pub(crate) async fn get_or_fetch_and_insert_comment(
445   comment_ap_id: &Url,
446   context: &LemmyContext,
447   recursion_counter: &mut i32,
448 ) -> Result<Comment, LemmyError> {
449   let comment_ap_id_owned = comment_ap_id.to_owned();
450   let comment = blocking(context.pool(), move |conn| {
451     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
452   })
453   .await?;
454
455   match comment {
456     Ok(p) => Ok(p),
457     Err(NotFound {}) => {
458       debug!(
459         "Fetching and creating remote comment and its parents: {}",
460         comment_ap_id
461       );
462       let comment =
463         fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
464       let comment = Comment::from_apub(
465         &comment,
466         context,
467         comment_ap_id.to_owned(),
468         recursion_counter,
469       )
470       .await?;
471
472       let post_id = comment.post_id;
473       let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
474       if post.locked {
475         return Err(anyhow!("Post is locked").into());
476       }
477
478       Ok(comment)
479     }
480     Err(e) => Err(e.into()),
481   }
482 }