]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
61cdbd47ab4b63e5c8d696e1452a20b1fe38013f
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   objects::FromApub,
4   ActorType,
5   GroupExt,
6   NoteExt,
7   PageExt,
8   PersonExt,
9   APUB_JSON_CONTENT_TYPE,
10 };
11 use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
12 use anyhow::{anyhow, Context};
13 use chrono::NaiveDateTime;
14 use diesel::result::Error::NotFound;
15 use lemmy_db::{
16   naive_now,
17   source::{
18     comment::Comment,
19     community::{Community, CommunityModerator, CommunityModeratorForm},
20     post::Post,
21     user::User_,
22   },
23   views::{
24     comment_view::CommentView,
25     community::community_view::CommunityView,
26     post_view::PostView,
27     user_view::UserViewSafe,
28   },
29   ApubObject,
30   Joinable,
31   SearchType,
32 };
33 use lemmy_structs::{blocking, site::SearchResponse};
34 use lemmy_utils::{
35   location_info,
36   request::{retry, RecvError},
37   settings::Settings,
38   LemmyError,
39 };
40 use lemmy_websocket::LemmyContext;
41 use log::debug;
42 use reqwest::Client;
43 use serde::Deserialize;
44 use std::{fmt::Debug, time::Duration};
45 use url::Url;
46
47 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
48 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
49
50 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
51 /// fetch through the search).
52 ///
53 /// Tests are passing with a value of 5, so 10 should be safe for production.
54 static MAX_REQUEST_NUMBER: i32 = 10;
55
56 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
57 /// timeouts etc.
58 async fn fetch_remote_object<Response>(
59   client: &Client,
60   url: &Url,
61   recursion_counter: &mut i32,
62 ) -> Result<Response, LemmyError>
63 where
64   Response: for<'de> Deserialize<'de>,
65 {
66   *recursion_counter += 1;
67   if *recursion_counter > MAX_REQUEST_NUMBER {
68     return Err(anyhow!("Maximum recursion depth reached").into());
69   }
70   check_is_apub_id_valid(&url)?;
71
72   let timeout = Duration::from_secs(60);
73
74   let json = retry(|| {
75     client
76       .get(url.as_str())
77       .header("Accept", APUB_JSON_CONTENT_TYPE)
78       .timeout(timeout)
79       .send()
80   })
81   .await?
82   .json()
83   .await
84   .map_err(|e| {
85     debug!("Receive error, {}", e);
86     RecvError(e.to_string())
87   })?;
88
89   Ok(json)
90 }
91
92 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
93 #[serde(untagged)]
94 #[derive(serde::Deserialize, Debug)]
95 enum SearchAcceptedObjects {
96   Person(Box<PersonExt>),
97   Group(Box<GroupExt>),
98   Page(Box<PageExt>),
99   Comment(Box<NoteExt>),
100 }
101
102 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
103 ///
104 /// Some working examples for use with the `docker/federation/` setup:
105 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
106 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
107 /// http://lemmy_gamma:8561/post/3
108 /// http://lemmy_delta:8571/comment/2
109 pub async fn search_by_apub_id(
110   query: &str,
111   context: &LemmyContext,
112 ) -> Result<SearchResponse, LemmyError> {
113   // Parse the shorthand query url
114   let query_url = if query.contains('@') {
115     debug!("Search for {}", query);
116     let split = query.split('@').collect::<Vec<&str>>();
117
118     // User type will look like ['', username, instance]
119     // Community will look like [!community, instance]
120     let (name, instance) = if split.len() == 3 {
121       (format!("/u/{}", split[1]), split[2])
122     } else if split.len() == 2 {
123       if split[0].contains('!') {
124         let split2 = split[0].split('!').collect::<Vec<&str>>();
125         (format!("/c/{}", split2[1]), split[1])
126       } else {
127         return Err(anyhow!("Invalid search query: {}", query).into());
128       }
129     } else {
130       return Err(anyhow!("Invalid search query: {}", query).into());
131     };
132
133     let url = format!(
134       "{}://{}{}",
135       Settings::get().get_protocol_string(),
136       instance,
137       name
138     );
139     Url::parse(&url)?
140   } else {
141     Url::parse(&query)?
142   };
143
144   let mut response = SearchResponse {
145     type_: SearchType::All.to_string(),
146     comments: vec![],
147     posts: vec![],
148     communities: vec![],
149     users: vec![],
150   };
151
152   let domain = query_url.domain().context("url has no domain")?;
153   let recursion_counter = &mut 0;
154   let response = match fetch_remote_object::<SearchAcceptedObjects>(
155     context.client(),
156     &query_url,
157     recursion_counter,
158   )
159   .await?
160   {
161     SearchAcceptedObjects::Person(p) => {
162       let user_uri = p.inner.id(domain)?.context("person has no id")?;
163
164       let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
165
166       response.users = vec![
167         blocking(context.pool(), move |conn| {
168           UserViewSafe::read(conn, user.id)
169         })
170         .await??,
171       ];
172
173       response
174     }
175     SearchAcceptedObjects::Group(g) => {
176       let community_uri = g.inner.id(domain)?.context("group has no id")?;
177
178       let community =
179         get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
180
181       response.communities = vec![
182         blocking(context.pool(), move |conn| {
183           CommunityView::read(conn, community.id, None)
184         })
185         .await??,
186       ];
187
188       response
189     }
190     SearchAcceptedObjects::Page(p) => {
191       let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
192
193       response.posts =
194         vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
195
196       response
197     }
198     SearchAcceptedObjects::Comment(c) => {
199       let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
200
201       response.comments = vec![
202         blocking(context.pool(), move |conn| {
203           CommentView::read(conn, c.id, None)
204         })
205         .await??,
206       ];
207
208       response
209     }
210   };
211
212   Ok(response)
213 }
214
215 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
216 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
217 ///
218 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
219 /// Otherwise it is fetched from the remote instance, stored and returned.
220 pub(crate) async fn get_or_fetch_and_upsert_actor(
221   apub_id: &Url,
222   context: &LemmyContext,
223   recursion_counter: &mut i32,
224 ) -> Result<Box<dyn ActorType>, LemmyError> {
225   let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
226   let actor: Box<dyn ActorType> = match community {
227     Ok(c) => Box::new(c),
228     Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
229   };
230   Ok(actor)
231 }
232
233 /// Get a user from its apub ID.
234 ///
235 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
236 /// Otherwise it is fetched from the remote instance, stored and returned.
237 pub(crate) async fn get_or_fetch_and_upsert_user(
238   apub_id: &Url,
239   context: &LemmyContext,
240   recursion_counter: &mut i32,
241 ) -> Result<User_, LemmyError> {
242   let apub_id_owned = apub_id.to_owned();
243   let user = blocking(context.pool(), move |conn| {
244     User_::read_from_apub_id(conn, apub_id_owned.as_ref())
245   })
246   .await?;
247
248   match user {
249     // If its older than a day, re-fetch it
250     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
251       debug!("Fetching and updating from remote user: {}", apub_id);
252       let person =
253         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
254       // If fetching failed, return the existing data.
255       if person.is_err() {
256         return Ok(u);
257       }
258
259       let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
260
261       let user_id = user.id;
262       blocking(context.pool(), move |conn| {
263         User_::mark_as_updated(conn, user_id)
264       })
265       .await??;
266
267       Ok(user)
268     }
269     Ok(u) => Ok(u),
270     Err(NotFound {}) => {
271       debug!("Fetching and creating remote user: {}", apub_id);
272       let person =
273         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
274
275       let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
276
277       Ok(user)
278     }
279     Err(e) => Err(e.into()),
280   }
281 }
282
283 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
284 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
285 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
286 ///
287 /// TODO it won't pick up new avatars, summaries etc until a day after.
288 /// Actors need an "update" activity pushed to other servers to fix this.
289 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
290   let update_interval = if cfg!(debug_assertions) {
291     // avoid infinite loop when fetching community outbox
292     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
293   } else {
294     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
295   };
296   last_refreshed.lt(&(naive_now() - update_interval))
297 }
298
299 /// Get a community from its apub ID.
300 ///
301 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
302 /// Otherwise it is fetched from the remote instance, stored and returned.
303 pub(crate) async fn get_or_fetch_and_upsert_community(
304   apub_id: &Url,
305   context: &LemmyContext,
306   recursion_counter: &mut i32,
307 ) -> Result<Community, LemmyError> {
308   let apub_id_owned = apub_id.to_owned();
309   let community = blocking(context.pool(), move |conn| {
310     Community::read_from_apub_id(conn, apub_id_owned.as_str())
311   })
312   .await?;
313
314   match community {
315     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
316       debug!("Fetching and updating from remote community: {}", apub_id);
317       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
318     }
319     Ok(c) => Ok(c),
320     Err(NotFound {}) => {
321       debug!("Fetching and creating remote community: {}", apub_id);
322       fetch_remote_community(apub_id, context, None, recursion_counter).await
323     }
324     Err(e) => Err(e.into()),
325   }
326 }
327
328 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
329 /// is set, this is an update for a community which is already known locally. If not, we don't know
330 /// the community yet and also pull the outbox, to get some initial posts.
331 async fn fetch_remote_community(
332   apub_id: &Url,
333   context: &LemmyContext,
334   old_community: Option<Community>,
335   recursion_counter: &mut i32,
336 ) -> Result<Community, LemmyError> {
337   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
338   // If fetching failed, return the existing data.
339   if let Some(ref c) = old_community {
340     if group.is_err() {
341       return Ok(c.to_owned());
342     }
343   }
344
345   let group = group?;
346   let community =
347     Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
348
349   // Also add the community moderators too
350   let attributed_to = group.inner.attributed_to().context(location_info!())?;
351   let creator_and_moderator_uris: Vec<&Url> = attributed_to
352     .as_many()
353     .context(location_info!())?
354     .iter()
355     .map(|a| a.as_xsd_any_uri().context(""))
356     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
357
358   let mut creator_and_moderators = Vec::new();
359
360   for uri in creator_and_moderator_uris {
361     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
362
363     creator_and_moderators.push(c_or_m);
364   }
365
366   // TODO: need to make this work to update mods of existing communities
367   if old_community.is_none() {
368     let community_id = community.id;
369     blocking(context.pool(), move |conn| {
370       for mod_ in creator_and_moderators {
371         let community_moderator_form = CommunityModeratorForm {
372           community_id,
373           user_id: mod_.id,
374         };
375
376         CommunityModerator::join(conn, &community_moderator_form)?;
377       }
378       Ok(()) as Result<(), LemmyError>
379     })
380     .await??;
381   }
382
383   // fetch outbox (maybe make this conditional)
384   let outbox = fetch_remote_object::<OrderedCollection>(
385     context.client(),
386     &community.get_outbox_url()?,
387     recursion_counter,
388   )
389   .await?;
390   let outbox_items = outbox.items().context(location_info!())?.clone();
391   let mut outbox_items = outbox_items.many().context(location_info!())?;
392   if outbox_items.len() > 20 {
393     outbox_items = outbox_items[0..20].to_vec();
394   }
395   for o in outbox_items {
396     let page = PageExt::from_any_base(o)?.context(location_info!())?;
397     let page_id = page.id_unchecked().context(location_info!())?;
398
399     // The post creator may be from a blocked instance, if it errors, then skip it
400     if check_is_apub_id_valid(page_id).is_err() {
401       continue;
402     }
403     Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
404     // TODO: we need to send a websocket update here
405   }
406
407   Ok(community)
408 }
409
410 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
411 /// pulled from its apub ID, inserted and returned.
412 ///
413 /// The parent community is also pulled if necessary. Comments are not pulled.
414 pub(crate) async fn get_or_fetch_and_insert_post(
415   post_ap_id: &Url,
416   context: &LemmyContext,
417   recursion_counter: &mut i32,
418 ) -> Result<Post, LemmyError> {
419   let post_ap_id_owned = post_ap_id.to_owned();
420   let post = blocking(context.pool(), move |conn| {
421     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
422   })
423   .await?;
424
425   match post {
426     Ok(p) => Ok(p),
427     Err(NotFound {}) => {
428       debug!("Fetching and creating remote post: {}", post_ap_id);
429       let page =
430         fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
431       let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
432
433       Ok(post)
434     }
435     Err(e) => Err(e.into()),
436   }
437 }
438
439 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
440 /// pulled from its apub ID, inserted and returned.
441 ///
442 /// The parent community, post and comment are also pulled if necessary.
443 pub(crate) async fn get_or_fetch_and_insert_comment(
444   comment_ap_id: &Url,
445   context: &LemmyContext,
446   recursion_counter: &mut i32,
447 ) -> Result<Comment, LemmyError> {
448   let comment_ap_id_owned = comment_ap_id.to_owned();
449   let comment = blocking(context.pool(), move |conn| {
450     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
451   })
452   .await?;
453
454   match comment {
455     Ok(p) => Ok(p),
456     Err(NotFound {}) => {
457       debug!(
458         "Fetching and creating remote comment and its parents: {}",
459         comment_ap_id
460       );
461       let comment =
462         fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
463       let comment = Comment::from_apub(
464         &comment,
465         context,
466         comment_ap_id.to_owned(),
467         recursion_counter,
468       )
469       .await?;
470
471       let post_id = comment.post_id;
472       let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
473       if post.locked {
474         return Err(anyhow!("Post is locked").into());
475       }
476
477       Ok(comment)
478     }
479     Err(e) => Err(e.into()),
480   }
481 }