]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
Merge branch 'main' into move_views_to_diesel
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   objects::FromApub,
4   ActorType,
5   GroupExt,
6   NoteExt,
7   PageExt,
8   PersonExt,
9   APUB_JSON_CONTENT_TYPE,
10 };
11 use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
12 use anyhow::{anyhow, Context};
13 use chrono::NaiveDateTime;
14 use diesel::result::Error::NotFound;
15 use lemmy_db::{
16   comment::Comment,
17   comment_view::CommentView,
18   community::{Community, CommunityModerator, CommunityModeratorForm},
19   naive_now,
20   post::Post,
21   post_view::PostView,
22   user::User_,
23   views::{community_view::CommunityView, user_view::UserViewSafe},
24   ApubObject,
25   Joinable,
26   SearchType,
27 };
28 use lemmy_structs::{blocking, site::SearchResponse};
29 use lemmy_utils::{
30   location_info,
31   request::{retry, RecvError},
32   settings::Settings,
33   LemmyError,
34 };
35 use lemmy_websocket::LemmyContext;
36 use log::debug;
37 use reqwest::Client;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
40 use url::Url;
41
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
44
45 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
46 /// fetch through the search).
47 ///
48 /// Tests are passing with a value of 5, so 10 should be safe for production.
49 static MAX_REQUEST_NUMBER: i32 = 10;
50
51 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
52 /// timeouts etc.
53 async fn fetch_remote_object<Response>(
54   client: &Client,
55   url: &Url,
56   recursion_counter: &mut i32,
57 ) -> Result<Response, LemmyError>
58 where
59   Response: for<'de> Deserialize<'de>,
60 {
61   *recursion_counter += 1;
62   if *recursion_counter > MAX_REQUEST_NUMBER {
63     return Err(anyhow!("Maximum recursion depth reached").into());
64   }
65   check_is_apub_id_valid(&url)?;
66
67   let timeout = Duration::from_secs(60);
68
69   let json = retry(|| {
70     client
71       .get(url.as_str())
72       .header("Accept", APUB_JSON_CONTENT_TYPE)
73       .timeout(timeout)
74       .send()
75   })
76   .await?
77   .json()
78   .await
79   .map_err(|e| {
80     debug!("Receive error, {}", e);
81     RecvError(e.to_string())
82   })?;
83
84   Ok(json)
85 }
86
87 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
88 #[serde(untagged)]
89 #[derive(serde::Deserialize, Debug)]
90 enum SearchAcceptedObjects {
91   Person(Box<PersonExt>),
92   Group(Box<GroupExt>),
93   Page(Box<PageExt>),
94   Comment(Box<NoteExt>),
95 }
96
97 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
98 ///
99 /// Some working examples for use with the `docker/federation/` setup:
100 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
101 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
102 /// http://lemmy_gamma:8561/post/3
103 /// http://lemmy_delta:8571/comment/2
104 pub async fn search_by_apub_id(
105   query: &str,
106   context: &LemmyContext,
107 ) -> Result<SearchResponse, LemmyError> {
108   // Parse the shorthand query url
109   let query_url = if query.contains('@') {
110     debug!("Search for {}", query);
111     let split = query.split('@').collect::<Vec<&str>>();
112
113     // User type will look like ['', username, instance]
114     // Community will look like [!community, instance]
115     let (name, instance) = if split.len() == 3 {
116       (format!("/u/{}", split[1]), split[2])
117     } else if split.len() == 2 {
118       if split[0].contains('!') {
119         let split2 = split[0].split('!').collect::<Vec<&str>>();
120         (format!("/c/{}", split2[1]), split[1])
121       } else {
122         return Err(anyhow!("Invalid search query: {}", query).into());
123       }
124     } else {
125       return Err(anyhow!("Invalid search query: {}", query).into());
126     };
127
128     let url = format!(
129       "{}://{}{}",
130       Settings::get().get_protocol_string(),
131       instance,
132       name
133     );
134     Url::parse(&url)?
135   } else {
136     Url::parse(&query)?
137   };
138
139   let mut response = SearchResponse {
140     type_: SearchType::All.to_string(),
141     comments: vec![],
142     posts: vec![],
143     communities: vec![],
144     users: vec![],
145   };
146
147   let domain = query_url.domain().context("url has no domain")?;
148   let recursion_counter = &mut 0;
149   let response = match fetch_remote_object::<SearchAcceptedObjects>(
150     context.client(),
151     &query_url,
152     recursion_counter,
153   )
154   .await?
155   {
156     SearchAcceptedObjects::Person(p) => {
157       let user_uri = p.inner.id(domain)?.context("person has no id")?;
158
159       let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
160
161       response.users = vec![
162         blocking(context.pool(), move |conn| {
163           UserViewSafe::read(conn, user.id)
164         })
165         .await??,
166       ];
167
168       response
169     }
170     SearchAcceptedObjects::Group(g) => {
171       let community_uri = g.inner.id(domain)?.context("group has no id")?;
172
173       let community =
174         get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
175
176       response.communities = vec![
177         blocking(context.pool(), move |conn| {
178           CommunityView::read(conn, community.id, None)
179         })
180         .await??,
181       ];
182
183       response
184     }
185     SearchAcceptedObjects::Page(p) => {
186       let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
187
188       response.posts =
189         vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
190
191       response
192     }
193     SearchAcceptedObjects::Comment(c) => {
194       let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
195
196       response.comments = vec![
197         blocking(context.pool(), move |conn| {
198           CommentView::read(conn, c.id, None)
199         })
200         .await??,
201       ];
202
203       response
204     }
205   };
206
207   Ok(response)
208 }
209
210 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
211 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
212 ///
213 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
214 /// Otherwise it is fetched from the remote instance, stored and returned.
215 pub(crate) async fn get_or_fetch_and_upsert_actor(
216   apub_id: &Url,
217   context: &LemmyContext,
218   recursion_counter: &mut i32,
219 ) -> Result<Box<dyn ActorType>, LemmyError> {
220   let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
221   let actor: Box<dyn ActorType> = match community {
222     Ok(c) => Box::new(c),
223     Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
224   };
225   Ok(actor)
226 }
227
228 /// Get a user from its apub ID.
229 ///
230 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
231 /// Otherwise it is fetched from the remote instance, stored and returned.
232 pub(crate) async fn get_or_fetch_and_upsert_user(
233   apub_id: &Url,
234   context: &LemmyContext,
235   recursion_counter: &mut i32,
236 ) -> Result<User_, LemmyError> {
237   let apub_id_owned = apub_id.to_owned();
238   let user = blocking(context.pool(), move |conn| {
239     User_::read_from_apub_id(conn, apub_id_owned.as_ref())
240   })
241   .await?;
242
243   match user {
244     // If its older than a day, re-fetch it
245     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
246       debug!("Fetching and updating from remote user: {}", apub_id);
247       let person =
248         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
249       // If fetching failed, return the existing data.
250       if person.is_err() {
251         return Ok(u);
252       }
253
254       let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
255
256       let user_id = user.id;
257       blocking(context.pool(), move |conn| {
258         User_::mark_as_updated(conn, user_id)
259       })
260       .await??;
261
262       Ok(user)
263     }
264     Ok(u) => Ok(u),
265     Err(NotFound {}) => {
266       debug!("Fetching and creating remote user: {}", apub_id);
267       let person =
268         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
269
270       let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
271
272       Ok(user)
273     }
274     Err(e) => Err(e.into()),
275   }
276 }
277
278 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
279 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
280 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
281 ///
282 /// TODO it won't pick up new avatars, summaries etc until a day after.
283 /// Actors need an "update" activity pushed to other servers to fix this.
284 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
285   let update_interval = if cfg!(debug_assertions) {
286     // avoid infinite loop when fetching community outbox
287     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
288   } else {
289     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
290   };
291   last_refreshed.lt(&(naive_now() - update_interval))
292 }
293
294 /// Get a community from its apub ID.
295 ///
296 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
297 /// Otherwise it is fetched from the remote instance, stored and returned.
298 pub(crate) async fn get_or_fetch_and_upsert_community(
299   apub_id: &Url,
300   context: &LemmyContext,
301   recursion_counter: &mut i32,
302 ) -> Result<Community, LemmyError> {
303   let apub_id_owned = apub_id.to_owned();
304   let community = blocking(context.pool(), move |conn| {
305     Community::read_from_apub_id(conn, apub_id_owned.as_str())
306   })
307   .await?;
308
309   match community {
310     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
311       debug!("Fetching and updating from remote community: {}", apub_id);
312       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
313     }
314     Ok(c) => Ok(c),
315     Err(NotFound {}) => {
316       debug!("Fetching and creating remote community: {}", apub_id);
317       fetch_remote_community(apub_id, context, None, recursion_counter).await
318     }
319     Err(e) => Err(e.into()),
320   }
321 }
322
323 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
324 /// is set, this is an update for a community which is already known locally. If not, we don't know
325 /// the community yet and also pull the outbox, to get some initial posts.
326 async fn fetch_remote_community(
327   apub_id: &Url,
328   context: &LemmyContext,
329   old_community: Option<Community>,
330   recursion_counter: &mut i32,
331 ) -> Result<Community, LemmyError> {
332   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
333   // If fetching failed, return the existing data.
334   if let Some(ref c) = old_community {
335     if group.is_err() {
336       return Ok(c.to_owned());
337     }
338   }
339
340   let group = group?;
341   let community =
342     Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
343
344   // Also add the community moderators too
345   let attributed_to = group.inner.attributed_to().context(location_info!())?;
346   let creator_and_moderator_uris: Vec<&Url> = attributed_to
347     .as_many()
348     .context(location_info!())?
349     .iter()
350     .map(|a| a.as_xsd_any_uri().context(""))
351     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
352
353   let mut creator_and_moderators = Vec::new();
354
355   for uri in creator_and_moderator_uris {
356     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
357
358     creator_and_moderators.push(c_or_m);
359   }
360
361   // TODO: need to make this work to update mods of existing communities
362   if old_community.is_none() {
363     let community_id = community.id;
364     blocking(context.pool(), move |conn| {
365       for mod_ in creator_and_moderators {
366         let community_moderator_form = CommunityModeratorForm {
367           community_id,
368           user_id: mod_.id,
369         };
370
371         CommunityModerator::join(conn, &community_moderator_form)?;
372       }
373       Ok(()) as Result<(), LemmyError>
374     })
375     .await??;
376   }
377
378   // fetch outbox (maybe make this conditional)
379   let outbox = fetch_remote_object::<OrderedCollection>(
380     context.client(),
381     &community.get_outbox_url()?,
382     recursion_counter,
383   )
384   .await?;
385   let outbox_items = outbox.items().context(location_info!())?.clone();
386   let mut outbox_items = outbox_items.many().context(location_info!())?;
387   if outbox_items.len() > 20 {
388     outbox_items = outbox_items[0..20].to_vec();
389   }
390   for o in outbox_items {
391     let page = PageExt::from_any_base(o)?.context(location_info!())?;
392     let page_id = page.id_unchecked().context(location_info!())?;
393
394     // The post creator may be from a blocked instance, if it errors, then skip it
395     if check_is_apub_id_valid(page_id).is_err() {
396       continue;
397     }
398     Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
399     // TODO: we need to send a websocket update here
400   }
401
402   Ok(community)
403 }
404
405 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
406 /// pulled from its apub ID, inserted and returned.
407 ///
408 /// The parent community is also pulled if necessary. Comments are not pulled.
409 pub(crate) async fn get_or_fetch_and_insert_post(
410   post_ap_id: &Url,
411   context: &LemmyContext,
412   recursion_counter: &mut i32,
413 ) -> Result<Post, LemmyError> {
414   let post_ap_id_owned = post_ap_id.to_owned();
415   let post = blocking(context.pool(), move |conn| {
416     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
417   })
418   .await?;
419
420   match post {
421     Ok(p) => Ok(p),
422     Err(NotFound {}) => {
423       debug!("Fetching and creating remote post: {}", post_ap_id);
424       let page =
425         fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
426       let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
427
428       Ok(post)
429     }
430     Err(e) => Err(e.into()),
431   }
432 }
433
434 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
435 /// pulled from its apub ID, inserted and returned.
436 ///
437 /// The parent community, post and comment are also pulled if necessary.
438 pub(crate) async fn get_or_fetch_and_insert_comment(
439   comment_ap_id: &Url,
440   context: &LemmyContext,
441   recursion_counter: &mut i32,
442 ) -> Result<Comment, LemmyError> {
443   let comment_ap_id_owned = comment_ap_id.to_owned();
444   let comment = blocking(context.pool(), move |conn| {
445     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
446   })
447   .await?;
448
449   match comment {
450     Ok(p) => Ok(p),
451     Err(NotFound {}) => {
452       debug!(
453         "Fetching and creating remote comment and its parents: {}",
454         comment_ap_id
455       );
456       let comment =
457         fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
458       let comment = Comment::from_apub(
459         &comment,
460         context,
461         comment_ap_id.to_owned(),
462         recursion_counter,
463       )
464       .await?;
465
466       let post_id = comment.post_id;
467       let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
468       if post.locked {
469         return Err(anyhow!("Post is locked").into());
470       }
471
472       Ok(comment)
473     }
474     Err(e) => Err(e.into()),
475   }
476 }