]> Untitled Git - lemmy.git/blob - lemmy_apub/src/fetcher.rs
Remove logging to find bug (yerbamate.ml/LemmyNet/lemmy/pulls/146)
[lemmy.git] / lemmy_apub / src / fetcher.rs
1 use crate::{
2   check_is_apub_id_valid,
3   objects::FromApub,
4   ActorType,
5   GroupExt,
6   NoteExt,
7   PageExt,
8   PersonExt,
9   APUB_JSON_CONTENT_TYPE,
10 };
11 use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
12 use anyhow::{anyhow, Context};
13 use chrono::NaiveDateTime;
14 use diesel::result::Error::NotFound;
15 use lemmy_db::{
16   comment::Comment,
17   comment_view::CommentView,
18   community::{Community, CommunityModerator, CommunityModeratorForm},
19   community_view::CommunityView,
20   naive_now,
21   post::Post,
22   post_view::PostView,
23   user::User_,
24   user_view::UserView,
25   ApubObject,
26   Joinable,
27   SearchType,
28 };
29 use lemmy_structs::{blocking, site::SearchResponse};
30 use lemmy_utils::{
31   location_info,
32   request::{retry, RecvError},
33   settings::Settings,
34   LemmyError,
35 };
36 use lemmy_websocket::LemmyContext;
37 use log::debug;
38 use reqwest::Client;
39 use serde::Deserialize;
40 use std::{fmt::Debug, time::Duration};
41 use url::Url;
42
43 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
44 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
45
46 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
47 /// fetch through the search).
48 ///
49 /// Tests are passing with a value of 5, so 10 should be safe for production.
50 static MAX_REQUEST_NUMBER: i32 = 10;
51
52 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
53 /// timeouts etc.
54 async fn fetch_remote_object<Response>(
55   client: &Client,
56   url: &Url,
57   recursion_counter: &mut i32,
58 ) -> Result<Response, LemmyError>
59 where
60   Response: for<'de> Deserialize<'de>,
61 {
62   *recursion_counter += 1;
63   if *recursion_counter > MAX_REQUEST_NUMBER {
64     return Err(anyhow!("Maximum recursion depth reached").into());
65   }
66   check_is_apub_id_valid(&url)?;
67
68   let timeout = Duration::from_secs(60);
69
70   let json = retry(|| {
71     client
72       .get(url.as_str())
73       .header("Accept", APUB_JSON_CONTENT_TYPE)
74       .timeout(timeout)
75       .send()
76   })
77   .await?
78   .json()
79   .await
80   .map_err(|e| {
81     debug!("Receive error, {}", e);
82     RecvError(e.to_string())
83   })?;
84
85   Ok(json)
86 }
87
88 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
89 #[serde(untagged)]
90 #[derive(serde::Deserialize, Debug)]
91 enum SearchAcceptedObjects {
92   Person(Box<PersonExt>),
93   Group(Box<GroupExt>),
94   Page(Box<PageExt>),
95   Comment(Box<NoteExt>),
96 }
97
98 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
99 ///
100 /// Some working examples for use with the `docker/federation/` setup:
101 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
102 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
103 /// http://lemmy_gamma:8561/post/3
104 /// http://lemmy_delta:8571/comment/2
105 pub async fn search_by_apub_id(
106   query: &str,
107   context: &LemmyContext,
108 ) -> Result<SearchResponse, LemmyError> {
109   // Parse the shorthand query url
110   let query_url = if query.contains('@') {
111     debug!("Search for {}", query);
112     let split = query.split('@').collect::<Vec<&str>>();
113
114     // User type will look like ['', username, instance]
115     // Community will look like [!community, instance]
116     let (name, instance) = if split.len() == 3 {
117       (format!("/u/{}", split[1]), split[2])
118     } else if split.len() == 2 {
119       if split[0].contains('!') {
120         let split2 = split[0].split('!').collect::<Vec<&str>>();
121         (format!("/c/{}", split2[1]), split[1])
122       } else {
123         return Err(anyhow!("Invalid search query: {}", query).into());
124       }
125     } else {
126       return Err(anyhow!("Invalid search query: {}", query).into());
127     };
128
129     let url = format!(
130       "{}://{}{}",
131       Settings::get().get_protocol_string(),
132       instance,
133       name
134     );
135     Url::parse(&url)?
136   } else {
137     Url::parse(&query)?
138   };
139
140   let mut response = SearchResponse {
141     type_: SearchType::All.to_string(),
142     comments: vec![],
143     posts: vec![],
144     communities: vec![],
145     users: vec![],
146   };
147
148   let domain = query_url.domain().context("url has no domain")?;
149   let recursion_counter = &mut 0;
150   let response = match fetch_remote_object::<SearchAcceptedObjects>(
151     context.client(),
152     &query_url,
153     recursion_counter,
154   )
155   .await?
156   {
157     SearchAcceptedObjects::Person(p) => {
158       let user_uri = p.inner.id(domain)?.context("person has no id")?;
159
160       let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
161
162       response.users = vec![
163         blocking(context.pool(), move |conn| {
164           UserView::get_user_secure(conn, user.id)
165         })
166         .await??,
167       ];
168
169       response
170     }
171     SearchAcceptedObjects::Group(g) => {
172       let community_uri = g.inner.id(domain)?.context("group has no id")?;
173
174       let community =
175         get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
176
177       response.communities = vec![
178         blocking(context.pool(), move |conn| {
179           CommunityView::read(conn, community.id, None)
180         })
181         .await??,
182       ];
183
184       response
185     }
186     SearchAcceptedObjects::Page(p) => {
187       let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
188
189       response.posts =
190         vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
191
192       response
193     }
194     SearchAcceptedObjects::Comment(c) => {
195       let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
196
197       response.comments = vec![
198         blocking(context.pool(), move |conn| {
199           CommentView::read(conn, c.id, None)
200         })
201         .await??,
202       ];
203
204       response
205     }
206   };
207
208   Ok(response)
209 }
210
211 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
212 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
213 ///
214 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
215 /// Otherwise it is fetched from the remote instance, stored and returned.
216 pub(crate) async fn get_or_fetch_and_upsert_actor(
217   apub_id: &Url,
218   context: &LemmyContext,
219   recursion_counter: &mut i32,
220 ) -> Result<Box<dyn ActorType>, LemmyError> {
221   let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
222   let actor: Box<dyn ActorType> = match community {
223     Ok(c) => Box::new(c),
224     Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
225   };
226   Ok(actor)
227 }
228
229 /// Get a user from its apub ID.
230 ///
231 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
232 /// Otherwise it is fetched from the remote instance, stored and returned.
233 pub(crate) async fn get_or_fetch_and_upsert_user(
234   apub_id: &Url,
235   context: &LemmyContext,
236   recursion_counter: &mut i32,
237 ) -> Result<User_, LemmyError> {
238   let apub_id_owned = apub_id.to_owned();
239   let user = blocking(context.pool(), move |conn| {
240     User_::read_from_apub_id(conn, apub_id_owned.as_ref())
241   })
242   .await?;
243
244   match user {
245     // If its older than a day, re-fetch it
246     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
247       debug!("Fetching and updating from remote user: {}", apub_id);
248       let person =
249         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
250       // If fetching failed, return the existing data.
251       if person.is_err() {
252         return Ok(u);
253       }
254
255       let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
256
257       let user_id = user.id;
258       blocking(context.pool(), move |conn| {
259         User_::mark_as_updated(conn, user_id)
260       })
261       .await??;
262
263       Ok(user)
264     }
265     Ok(u) => Ok(u),
266     Err(NotFound {}) => {
267       debug!("Fetching and creating remote user: {}", apub_id);
268       let person =
269         fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
270
271       let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
272
273       Ok(user)
274     }
275     Err(e) => Err(e.into()),
276   }
277 }
278
279 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
280 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
281 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
282 ///
283 /// TODO it won't pick up new avatars, summaries etc until a day after.
284 /// Actors need an "update" activity pushed to other servers to fix this.
285 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
286   let update_interval = if cfg!(debug_assertions) {
287     // avoid infinite loop when fetching community outbox
288     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
289   } else {
290     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
291   };
292   last_refreshed.lt(&(naive_now() - update_interval))
293 }
294
295 /// Get a community from its apub ID.
296 ///
297 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
298 /// Otherwise it is fetched from the remote instance, stored and returned.
299 pub(crate) async fn get_or_fetch_and_upsert_community(
300   apub_id: &Url,
301   context: &LemmyContext,
302   recursion_counter: &mut i32,
303 ) -> Result<Community, LemmyError> {
304   let apub_id_owned = apub_id.to_owned();
305   let community = blocking(context.pool(), move |conn| {
306     Community::read_from_apub_id(conn, apub_id_owned.as_str())
307   })
308   .await?;
309
310   match community {
311     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
312       debug!("Fetching and updating from remote community: {}", apub_id);
313       fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
314     }
315     Ok(c) => Ok(c),
316     Err(NotFound {}) => {
317       debug!("Fetching and creating remote community: {}", apub_id);
318       fetch_remote_community(apub_id, context, None, recursion_counter).await
319     }
320     Err(e) => Err(e.into()),
321   }
322 }
323
324 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
325 /// is set, this is an update for a community which is already known locally. If not, we don't know
326 /// the community yet and also pull the outbox, to get some initial posts.
327 async fn fetch_remote_community(
328   apub_id: &Url,
329   context: &LemmyContext,
330   old_community: Option<Community>,
331   recursion_counter: &mut i32,
332 ) -> Result<Community, LemmyError> {
333   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
334   // If fetching failed, return the existing data.
335   if let Some(ref c) = old_community {
336     if group.is_err() {
337       return Ok(c.to_owned());
338     }
339   }
340
341   let group = group?;
342   let community =
343     Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
344
345   // Also add the community moderators too
346   let attributed_to = group.inner.attributed_to().context(location_info!())?;
347   let creator_and_moderator_uris: Vec<&Url> = attributed_to
348     .as_many()
349     .context(location_info!())?
350     .iter()
351     .map(|a| a.as_xsd_any_uri().context(""))
352     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
353
354   let mut creator_and_moderators = Vec::new();
355
356   for uri in creator_and_moderator_uris {
357     let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
358
359     creator_and_moderators.push(c_or_m);
360   }
361
362   // TODO: need to make this work to update mods of existing communities
363   if old_community.is_none() {
364     let community_id = community.id;
365     blocking(context.pool(), move |conn| {
366       for mod_ in creator_and_moderators {
367         let community_moderator_form = CommunityModeratorForm {
368           community_id,
369           user_id: mod_.id,
370         };
371
372         CommunityModerator::join(conn, &community_moderator_form)?;
373       }
374       Ok(()) as Result<(), LemmyError>
375     })
376     .await??;
377   }
378
379   // fetch outbox (maybe make this conditional)
380   let outbox = fetch_remote_object::<OrderedCollection>(
381     context.client(),
382     &community.get_outbox_url()?,
383     recursion_counter,
384   )
385   .await?;
386   let outbox_items = outbox.items().context(location_info!())?.clone();
387   let mut outbox_items = outbox_items.many().context(location_info!())?;
388   if outbox_items.len() > 20 {
389     outbox_items = outbox_items[0..20].to_vec();
390   }
391   for o in outbox_items {
392     let page = PageExt::from_any_base(o)?.context(location_info!())?;
393     let page_id = page.id_unchecked().context(location_info!())?;
394
395     // The post creator may be from a blocked instance, if it errors, then skip it
396     if check_is_apub_id_valid(page_id).is_err() {
397       continue;
398     }
399     Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
400     // TODO: we need to send a websocket update here
401   }
402
403   Ok(community)
404 }
405
406 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
407 /// pulled from its apub ID, inserted and returned.
408 ///
409 /// The parent community is also pulled if necessary. Comments are not pulled.
410 pub(crate) async fn get_or_fetch_and_insert_post(
411   post_ap_id: &Url,
412   context: &LemmyContext,
413   recursion_counter: &mut i32,
414 ) -> Result<Post, LemmyError> {
415   let post_ap_id_owned = post_ap_id.to_owned();
416   let post = blocking(context.pool(), move |conn| {
417     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
418   })
419   .await?;
420
421   match post {
422     Ok(p) => Ok(p),
423     Err(NotFound {}) => {
424       debug!("Fetching and creating remote post: {}", post_ap_id);
425       let page =
426         fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
427       let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
428
429       Ok(post)
430     }
431     Err(e) => Err(e.into()),
432   }
433 }
434
435 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
436 /// pulled from its apub ID, inserted and returned.
437 ///
438 /// The parent community, post and comment are also pulled if necessary.
439 pub(crate) async fn get_or_fetch_and_insert_comment(
440   comment_ap_id: &Url,
441   context: &LemmyContext,
442   recursion_counter: &mut i32,
443 ) -> Result<Comment, LemmyError> {
444   let comment_ap_id_owned = comment_ap_id.to_owned();
445   let comment = blocking(context.pool(), move |conn| {
446     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
447   })
448   .await?;
449
450   match comment {
451     Ok(p) => Ok(p),
452     Err(NotFound {}) => {
453       debug!(
454         "Fetching and creating remote comment and its parents: {}",
455         comment_ap_id
456       );
457       let comment =
458         fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
459       let comment = Comment::from_apub(
460         &comment,
461         context,
462         comment_ap_id.to_owned(),
463         recursion_counter,
464       )
465       .await?;
466
467       let post_id = comment.post_id;
468       let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
469       if post.locked {
470         return Err(anyhow!("Post is locked").into());
471       }
472
473       Ok(comment)
474     }
475     Err(e) => Err(e.into()),
476   }
477 }