]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
43466a0f59db8444886c65e36e8570923cfeec82
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   ActorType,
4   FromApub,
5   GroupExt,
6   PageExt,
7   PersonExt,
8   APUB_JSON_CONTENT_TYPE,
9 };
10 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
11 use anyhow::{anyhow, Context};
12 use chrono::NaiveDateTime;
13 use diesel::result::Error::NotFound;
14 use lemmy_db::{
15   comment::{Comment, CommentForm},
16   comment_view::CommentView,
17   community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
18   community_view::CommunityView,
19   naive_now,
20   post::{Post, PostForm},
21   post_view::PostView,
22   user::{UserForm, User_},
23   user_view::UserView,
24   Crud,
25   Joinable,
26   SearchType,
27 };
28 use lemmy_structs::{blocking, site::SearchResponse};
29 use lemmy_utils::{
30   location_info,
31   request::{retry, RecvError},
32   settings::Settings,
33   LemmyError,
34 };
35 use lemmy_websocket::LemmyContext;
36 use log::debug;
37 use reqwest::Client;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
40 use url::Url;
41
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
44
45 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
46 /// fetch through the search).
47 ///
48 /// Tests are passing with a value of 5, so 10 should be safe for production.
49 static MAX_REQUEST_NUMBER: i32 = 10;
50
51 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
52 /// timeouts etc.
53 async fn fetch_remote_object<Response>(
54   client: &Client,
55   url: &Url,
56   recursion_counter: &mut i32,
57 ) -> Result<Response, LemmyError>
58 where
59   Response: for<'de> Deserialize<'de>,
60 {
61   *recursion_counter += 1;
62   if *recursion_counter > MAX_REQUEST_NUMBER {
63     return Err(anyhow!("Maximum recursion depth reached").into());
64   }
65   check_is_apub_id_valid(&url)?;
66
67   let timeout = Duration::from_secs(60);
68
69   let json = retry(|| {
70     client
71       .get(url.as_str())
72       .header("Accept", APUB_JSON_CONTENT_TYPE)
73       .timeout(timeout)
74       .send()
75   })
76   .await?
77   .json()
78   .await
79   .map_err(|e| {
80     debug!("Receive error, {}", e);
81     RecvError(e.to_string())
82   })?;
83
84   Ok(json)
85 }
86
87 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
88 #[serde(untagged)]
89 #[derive(serde::Deserialize, Debug)]
90 pub enum SearchAcceptedObjects {
91   Person(Box<PersonExt>),
92   Group(Box<GroupExt>),
93   Page(Box<PageExt>),
94   Comment(Box<Note>),
95 }
96
97 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
98 ///
99 /// Some working examples for use with the `docker/federation/` setup:
100 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
101 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
102 /// http://lemmy_gamma:8561/post/3
103 /// http://lemmy_delta:8571/comment/2
104 pub async fn search_by_apub_id(
105   query: &str,
106   context: &LemmyContext,
107 ) -> Result<SearchResponse, LemmyError> {
108   // Parse the shorthand query url
109   let query_url = if query.contains('@') {
110     debug!("Search for {}", query);
111     let split = query.split('@').collect::<Vec<&str>>();
112
113     // User type will look like ['', username, instance]
114     // Community will look like [!community, instance]
115     let (name, instance) = if split.len() == 3 {
116       (format!("/u/{}", split[1]), split[2])
117     } else if split.len() == 2 {
118       if split[0].contains('!') {
119         let split2 = split[0].split('!').collect::<Vec<&str>>();
120         (format!("/c/{}", split2[1]), split[1])
121       } else {
122         return Err(anyhow!("Invalid search query: {}", query).into());
123       }
124     } else {
125       return Err(anyhow!("Invalid search query: {}", query).into());
126     };
127
128     let url = format!(
129       "{}://{}{}",
130       Settings::get().get_protocol_string(),
131       instance,
132       name
133     );
134     Url::parse(&url)?
135   } else {
136     Url::parse(&query)?
137   };
138
139   let mut response = SearchResponse {
140     type_: SearchType::All.to_string(),
141     comments: vec![],
142     posts: vec![],
143     communities: vec![],
144     users: vec![],
145   };
146
147   let domain = query_url.domain().context("url has no domain")?;
148   let recursion_counter = &mut 0;
149   let response = match fetch_remote_object::<SearchAcceptedObjects>(
150     context.client(),
151     &query_url,
152     recursion_counter,
153   )
154   .await?
155   {
156     SearchAcceptedObjects::Person(p) => {
157       let user_uri = p.inner.id(domain)?.context("person has no id")?;
158
159       let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
160
161       response.users = vec![
162         blocking(context.pool(), move |conn| {
163           UserView::get_user_secure(conn, user.id)
164         })
165         .await??,
166       ];
167
168       response
169     }
170     SearchAcceptedObjects::Group(g) => {
171       let community_uri = g.inner.id(domain)?.context("group has no id")?;
172
173       let community =
174         get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
175
176       response.communities = vec![
177         blocking(context.pool(), move |conn| {
178           CommunityView::read(conn, community.id, None)
179         })
180         .await??,
181       ];
182
183       response
184     }
185     SearchAcceptedObjects::Page(p) => {
186       let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
187
188       let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
189       response.posts =
190         vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
191
192       response
193     }
194     SearchAcceptedObjects::Comment(c) => {
195       let comment_form =
196         CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
197
198       let c = blocking(context.pool(), move |conn| {
199         Comment::upsert(conn, &comment_form)
200       })
201       .await??;
202       response.comments = vec![
203         blocking(context.pool(), move |conn| {
204           CommentView::read(conn, c.id, None)
205         })
206         .await??,
207       ];
208
209       response
210     }
211   };
212
213   Ok(response)
214 }
215
216 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
217 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
218 ///
219 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
220 /// Otherwise it is fetched from the remote instance, stored and returned.
221 pub(crate) async fn get_or_fetch_and_upsert_actor(
222   apub_id: &Url,
223   context: &LemmyContext,
224   recursion_counter: &mut i32,
225 ) -> Result<Box<dyn ActorType>, LemmyError> {
226   let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
227   let actor: Box<dyn ActorType> = match community {
228     Ok(c) => Box::new(c),
229     Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
230   };
231   Ok(actor)
232 }
233
234 /// Get a user from its apub ID.
235 ///
236 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
237 /// Otherwise it is fetched from the remote instance, stored and returned.
238 pub(crate) async fn get_or_fetch_and_upsert_user(
239   apub_id: &Url,
240   context: &LemmyContext,
241   recursion_counter: &mut i32,
242 ) -> Result<User_, LemmyError> {
243   let apub_id_owned = apub_id.to_owned();
244   let user = blocking(context.pool(), move |conn| {
245     User_::read_from_actor_id(conn, apub_id_owned.as_ref())
246   })
247   .await?;
248
249   match user {
250     // If its older than a day, re-fetch it
251     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
252       debug!("Fetching and updating from remote user: {}", apub_id);
253       let person =
254         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
255
256       let mut uf = UserForm::from_apub(
257         &person,
258         context,
259         Some(apub_id.to_owned()),
260         recursion_counter,
261       )
262       .await?;
263       uf.last_refreshed_at = Some(naive_now());
264       let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
265
266       Ok(user)
267     }
268     Ok(u) => Ok(u),
269     Err(NotFound {}) => {
270       debug!("Fetching and creating remote user: {}", apub_id);
271       let person =
272         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
273
274       let uf = UserForm::from_apub(
275         &person,
276         context,
277         Some(apub_id.to_owned()),
278         recursion_counter,
279       )
280       .await?;
281       let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
282
283       Ok(user)
284     }
285     Err(e) => Err(e.into()),
286   }
287 }
288
289 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
290 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
291 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
292 ///
293 /// TODO it won't pick up new avatars, summaries etc until a day after.
294 /// Actors need an "update" activity pushed to other servers to fix this.
295 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
296   let update_interval = if cfg!(debug_assertions) {
297     // avoid infinite loop when fetching community outbox
298     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
299   } else {
300     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
301   };
302   last_refreshed.lt(&(naive_now() - update_interval))
303 }
304
305 /// Get a community from its apub ID.
306 ///
307 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
308 /// Otherwise it is fetched from the remote instance, stored and returned.
309 pub(crate) async fn get_or_fetch_and_upsert_community(
310   apub_id: &Url,
311   context: &LemmyContext,
312   recursion_counter: &mut i32,
313 ) -> Result<Community, LemmyError> {
314   let apub_id_owned = apub_id.to_owned();
315   let community = blocking(context.pool(), move |conn| {
316     Community::read_from_actor_id(conn, apub_id_owned.as_str())
317   })
318   .await?;
319
320   match community {
321     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
322       debug!("Fetching and updating from remote community: {}", apub_id);
323       fetch_remote_community(apub_id, context, Some(c.id), recursion_counter).await
324     }
325     Ok(c) => Ok(c),
326     Err(NotFound {}) => {
327       debug!("Fetching and creating remote community: {}", apub_id);
328       fetch_remote_community(apub_id, context, None, recursion_counter).await
329     }
330     Err(e) => Err(e.into()),
331   }
332 }
333
334 /// Request a community by apub ID from a remote instance, including moderators. If `community_id`,
335 /// is set, this is an update for a community which is already known locally. If not, we don't know
336 /// the community yet and also pull the outbox, to get some initial posts.
337 async fn fetch_remote_community(
338   apub_id: &Url,
339   context: &LemmyContext,
340   community_id: Option<i32>,
341   recursion_counter: &mut i32,
342 ) -> Result<Community, LemmyError> {
343   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await?;
344
345   let cf =
346     CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
347   let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
348
349   // Also add the community moderators too
350   let attributed_to = group.inner.attributed_to().context(location_info!())?;
351   let creator_and_moderator_uris: Vec<&Url> = attributed_to
352     .as_many()
353     .context(location_info!())?
354     .iter()
355     .map(|a| a.as_xsd_any_uri().context(""))
356     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
357
358   let mut creator_and_moderators = Vec::new();
359
360   for uri in creator_and_moderator_uris {
361     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
362
363     creator_and_moderators.push(c_or_m);
364   }
365
366   // TODO: need to make this work to update mods of existing communities
367   if community_id.is_none() {
368     let community_id = community.id;
369     blocking(context.pool(), move |conn| {
370       for mod_ in creator_and_moderators {
371         let community_moderator_form = CommunityModeratorForm {
372           community_id,
373           user_id: mod_.id,
374         };
375
376         CommunityModerator::join(conn, &community_moderator_form)?;
377       }
378       Ok(()) as Result<(), LemmyError>
379     })
380     .await??;
381   }
382
383   // fetch outbox (maybe make this conditional)
384   let outbox = fetch_remote_object::<OrderedCollection>(
385     context.client(),
386     &community.get_outbox_url()?,
387     recursion_counter,
388   )
389   .await?;
390   let outbox_items = outbox.items().context(location_info!())?.clone();
391   let mut outbox_items = outbox_items.many().context(location_info!())?;
392   if outbox_items.len() > 20 {
393     outbox_items = outbox_items[0..20].to_vec();
394   }
395   for o in outbox_items {
396     let page = PageExt::from_any_base(o)?.context(location_info!())?;
397
398     // The post creator may be from a blocked instance,
399     // if it errors, then continue
400     let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
401       Ok(post) => post,
402       Err(_) => continue,
403     };
404     let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
405     // Check whether the post already exists in the local db
406     let existing = blocking(context.pool(), move |conn| {
407       Post::read_from_apub_id(conn, &post_ap_id)
408     })
409     .await?;
410     match existing {
411       Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
412       Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
413     };
414     // TODO: we need to send a websocket update here
415   }
416
417   Ok(community)
418 }
419
420 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
421 /// pulled from its apub ID, inserted and returned.
422 ///
423 /// The parent community is also pulled if necessary. Comments are not pulled.
424 pub(crate) async fn get_or_fetch_and_insert_post(
425   post_ap_id: &Url,
426   context: &LemmyContext,
427   recursion_counter: &mut i32,
428 ) -> Result<Post, LemmyError> {
429   let post_ap_id_owned = post_ap_id.to_owned();
430   let post = blocking(context.pool(), move |conn| {
431     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
432   })
433   .await?;
434
435   match post {
436     Ok(p) => Ok(p),
437     Err(NotFound {}) => {
438       debug!("Fetching and creating remote post: {}", post_ap_id);
439       let post =
440         fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
441       let post_form = PostForm::from_apub(
442         &post,
443         context,
444         Some(post_ap_id.to_owned()),
445         recursion_counter,
446       )
447       .await?;
448
449       let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
450
451       Ok(post)
452     }
453     Err(e) => Err(e.into()),
454   }
455 }
456
457 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
458 /// pulled from its apub ID, inserted and returned.
459 ///
460 /// The parent community, post and comment are also pulled if necessary.
461 pub(crate) async fn get_or_fetch_and_insert_comment(
462   comment_ap_id: &Url,
463   context: &LemmyContext,
464   recursion_counter: &mut i32,
465 ) -> Result<Comment, LemmyError> {
466   let comment_ap_id_owned = comment_ap_id.to_owned();
467   let comment = blocking(context.pool(), move |conn| {
468     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
469   })
470   .await?;
471
472   match comment {
473     Ok(p) => Ok(p),
474     Err(NotFound {}) => {
475       debug!(
476         "Fetching and creating remote comment and its parents: {}",
477         comment_ap_id
478       );
479       let comment =
480         fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
481       let comment_form = CommentForm::from_apub(
482         &comment,
483         context,
484         Some(comment_ap_id.to_owned()),
485         recursion_counter,
486       )
487       .await?;
488
489       let comment = blocking(context.pool(), move |conn| {
490         Comment::upsert(conn, &comment_form)
491       })
492       .await??;
493
494       Ok(comment)
495     }
496     Err(e) => Err(e.into()),
497   }
498 }