]> Untitled Git - lemmy.git/blob - src/apub/fetcher.rs
Isomorphic docker (#1124)
[lemmy.git] / src / apub / fetcher.rs
1 use crate::{
2   apub::{
3     check_is_apub_id_valid,
4     ActorType,
5     FromApub,
6     GroupExt,
7     PageExt,
8     PersonExt,
9     APUB_JSON_CONTENT_TYPE,
10   },
11   request::{retry, RecvError},
12   LemmyContext,
13 };
14 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
15 use anyhow::{anyhow, Context};
16 use chrono::NaiveDateTime;
17 use diesel::result::Error::NotFound;
18 use lemmy_api_structs::{blocking, site::SearchResponse};
19 use lemmy_db::{
20   comment::{Comment, CommentForm},
21   comment_view::CommentView,
22   community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
23   community_view::CommunityView,
24   naive_now,
25   post::{Post, PostForm},
26   post_view::PostView,
27   user::{UserForm, User_},
28   user_view::UserView,
29   Crud,
30   Joinable,
31   SearchType,
32 };
33 use lemmy_utils::{apub::get_apub_protocol_string, location_info, LemmyError};
34 use log::debug;
35 use reqwest::Client;
36 use serde::Deserialize;
37 use std::{fmt::Debug, time::Duration};
38 use url::Url;
39
40 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
41 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
42
43 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
44 /// timeouts etc.
45 pub async fn fetch_remote_object<Response>(
46   client: &Client,
47   url: &Url,
48 ) -> Result<Response, LemmyError>
49 where
50   Response: for<'de> Deserialize<'de>,
51 {
52   check_is_apub_id_valid(&url)?;
53
54   let timeout = Duration::from_secs(60);
55
56   // speed up tests
57   // before: 305s
58   // after: 240s
59   let json = retry(|| {
60     client
61       .get(url.as_str())
62       .header("Accept", APUB_JSON_CONTENT_TYPE)
63       .timeout(timeout)
64       .send()
65   })
66   .await?
67   .json()
68   .await
69   .map_err(|e| {
70     debug!("Receive error, {}", e);
71     RecvError(e.to_string())
72   })?;
73
74   Ok(json)
75 }
76
77 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
78 #[serde(untagged)]
79 #[derive(serde::Deserialize, Debug)]
80 pub enum SearchAcceptedObjects {
81   Person(Box<PersonExt>),
82   Group(Box<GroupExt>),
83   Page(Box<PageExt>),
84   Comment(Box<Note>),
85 }
86
87 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
88 ///
89 /// Some working examples for use with the docker/federation/ setup:
90 /// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
91 /// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
92 /// http://lemmy_alpha:8540/post/3
93 /// http://lemmy_alpha:8540/comment/2
94 pub async fn search_by_apub_id(
95   query: &str,
96   context: &LemmyContext,
97 ) -> Result<SearchResponse, LemmyError> {
98   // Parse the shorthand query url
99   let query_url = if query.contains('@') {
100     debug!("{}", query);
101     let split = query.split('@').collect::<Vec<&str>>();
102
103     // User type will look like ['', username, instance]
104     // Community will look like [!community, instance]
105     let (name, instance) = if split.len() == 3 {
106       (format!("/u/{}", split[1]), split[2])
107     } else if split.len() == 2 {
108       if split[0].contains('!') {
109         let split2 = split[0].split('!').collect::<Vec<&str>>();
110         (format!("/c/{}", split2[1]), split[1])
111       } else {
112         return Err(anyhow!("Invalid search query: {}", query).into());
113       }
114     } else {
115       return Err(anyhow!("Invalid search query: {}", query).into());
116     };
117
118     let url = format!("{}://{}{}", get_apub_protocol_string(), instance, name);
119     Url::parse(&url)?
120   } else {
121     Url::parse(&query)?
122   };
123
124   let mut response = SearchResponse {
125     type_: SearchType::All.to_string(),
126     comments: vec![],
127     posts: vec![],
128     communities: vec![],
129     users: vec![],
130   };
131
132   let domain = query_url.domain().context("url has no domain")?;
133   let response =
134     match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? {
135       SearchAcceptedObjects::Person(p) => {
136         let user_uri = p.inner.id(domain)?.context("person has no id")?;
137
138         let user = get_or_fetch_and_upsert_user(&user_uri, context).await?;
139
140         response.users = vec![
141           blocking(context.pool(), move |conn| {
142             UserView::get_user_secure(conn, user.id)
143           })
144           .await??,
145         ];
146
147         response
148       }
149       SearchAcceptedObjects::Group(g) => {
150         let community_uri = g.inner.id(domain)?.context("group has no id")?;
151
152         let community = get_or_fetch_and_upsert_community(community_uri, context).await?;
153
154         response.communities = vec![
155           blocking(context.pool(), move |conn| {
156             CommunityView::read(conn, community.id, None)
157           })
158           .await??,
159         ];
160
161         response
162       }
163       SearchAcceptedObjects::Page(p) => {
164         let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?;
165
166         let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
167         response.posts =
168           vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
169
170         response
171       }
172       SearchAcceptedObjects::Comment(c) => {
173         let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?;
174
175         let c = blocking(context.pool(), move |conn| {
176           Comment::upsert(conn, &comment_form)
177         })
178         .await??;
179         response.comments = vec![
180           blocking(context.pool(), move |conn| {
181             CommentView::read(conn, c.id, None)
182           })
183           .await??,
184         ];
185
186         response
187       }
188     };
189
190   Ok(response)
191 }
192
193 pub async fn get_or_fetch_and_upsert_actor(
194   apub_id: &Url,
195   context: &LemmyContext,
196 ) -> Result<Box<dyn ActorType>, LemmyError> {
197   let user = get_or_fetch_and_upsert_user(apub_id, context).await;
198   let actor: Box<dyn ActorType> = match user {
199     Ok(u) => Box::new(u),
200     Err(_) => Box::new(get_or_fetch_and_upsert_community(apub_id, context).await?),
201   };
202   Ok(actor)
203 }
204
205 /// Check if a remote user exists, create if not found, if its too old update it.Fetch a user, insert/update it in the database and return the user.
206 pub async fn get_or_fetch_and_upsert_user(
207   apub_id: &Url,
208   context: &LemmyContext,
209 ) -> Result<User_, LemmyError> {
210   let apub_id_owned = apub_id.to_owned();
211   let user = blocking(context.pool(), move |conn| {
212     User_::read_from_actor_id(conn, apub_id_owned.as_ref())
213   })
214   .await?;
215
216   match user {
217     // If its older than a day, re-fetch it
218     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
219       debug!("Fetching and updating from remote user: {}", apub_id);
220       let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
221
222       let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
223       uf.last_refreshed_at = Some(naive_now());
224       let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
225
226       Ok(user)
227     }
228     Ok(u) => Ok(u),
229     Err(NotFound {}) => {
230       debug!("Fetching and creating remote user: {}", apub_id);
231       let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
232
233       let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
234       let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
235
236       Ok(user)
237     }
238     Err(e) => Err(e.into()),
239   }
240 }
241
242 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
243 /// ACTOR_REFETCH_INTERVAL_SECONDS after the last refetch, in debug builds always.
244 ///
245 /// TODO it won't pick up new avatars, summaries etc until a day after.
246 /// Actors need an "update" activity pushed to other servers to fix this.
247 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
248   let update_interval = if cfg!(debug_assertions) {
249     // avoid infinite loop when fetching community outbox
250     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
251   } else {
252     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
253   };
254   last_refreshed.lt(&(naive_now() - update_interval))
255 }
256
257 /// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
258 pub async fn get_or_fetch_and_upsert_community(
259   apub_id: &Url,
260   context: &LemmyContext,
261 ) -> Result<Community, LemmyError> {
262   let apub_id_owned = apub_id.to_owned();
263   let community = blocking(context.pool(), move |conn| {
264     Community::read_from_actor_id(conn, apub_id_owned.as_str())
265   })
266   .await?;
267
268   match community {
269     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
270       debug!("Fetching and updating from remote community: {}", apub_id);
271       fetch_remote_community(apub_id, context, Some(c.id)).await
272     }
273     Ok(c) => Ok(c),
274     Err(NotFound {}) => {
275       debug!("Fetching and creating remote community: {}", apub_id);
276       fetch_remote_community(apub_id, context, None).await
277     }
278     Err(e) => Err(e.into()),
279   }
280 }
281
282 async fn fetch_remote_community(
283   apub_id: &Url,
284   context: &LemmyContext,
285   community_id: Option<i32>,
286 ) -> Result<Community, LemmyError> {
287   let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
288
289   let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
290   let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
291
292   // Also add the community moderators too
293   let attributed_to = group.inner.attributed_to().context(location_info!())?;
294   let creator_and_moderator_uris: Vec<&Url> = attributed_to
295     .as_many()
296     .context(location_info!())?
297     .iter()
298     .map(|a| a.as_xsd_any_uri().context(""))
299     .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
300
301   let mut creator_and_moderators = Vec::new();
302
303   for uri in creator_and_moderator_uris {
304     let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?;
305
306     creator_and_moderators.push(c_or_m);
307   }
308
309   // TODO: need to make this work to update mods of existing communities
310   if community_id.is_none() {
311     let community_id = community.id;
312     blocking(context.pool(), move |conn| {
313       for mod_ in creator_and_moderators {
314         let community_moderator_form = CommunityModeratorForm {
315           community_id,
316           user_id: mod_.id,
317         };
318
319         CommunityModerator::join(conn, &community_moderator_form)?;
320       }
321       Ok(()) as Result<(), LemmyError>
322     })
323     .await??;
324   }
325
326   // fetch outbox (maybe make this conditional)
327   let outbox =
328     fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?)
329       .await?;
330   let outbox_items = outbox.items().context(location_info!())?.clone();
331   let mut outbox_items = outbox_items.many().context(location_info!())?;
332   if outbox_items.len() > 20 {
333     outbox_items = outbox_items[0..20].to_vec();
334   }
335   for o in outbox_items {
336     let page = PageExt::from_any_base(o)?.context(location_info!())?;
337     let post = PostForm::from_apub(&page, context, None).await?;
338     let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
339     // Check whether the post already exists in the local db
340     let existing = blocking(context.pool(), move |conn| {
341       Post::read_from_apub_id(conn, &post_ap_id)
342     })
343     .await?;
344     match existing {
345       Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
346       Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
347     };
348     // TODO: we need to send a websocket update here
349   }
350
351   Ok(community)
352 }
353
354 pub async fn get_or_fetch_and_insert_post(
355   post_ap_id: &Url,
356   context: &LemmyContext,
357 ) -> Result<Post, LemmyError> {
358   let post_ap_id_owned = post_ap_id.to_owned();
359   let post = blocking(context.pool(), move |conn| {
360     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
361   })
362   .await?;
363
364   match post {
365     Ok(p) => Ok(p),
366     Err(NotFound {}) => {
367       debug!("Fetching and creating remote post: {}", post_ap_id);
368       let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
369       let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
370
371       let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
372
373       Ok(post)
374     }
375     Err(e) => Err(e.into()),
376   }
377 }
378
379 pub async fn get_or_fetch_and_insert_comment(
380   comment_ap_id: &Url,
381   context: &LemmyContext,
382 ) -> Result<Comment, LemmyError> {
383   let comment_ap_id_owned = comment_ap_id.to_owned();
384   let comment = blocking(context.pool(), move |conn| {
385     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
386   })
387   .await?;
388
389   match comment {
390     Ok(p) => Ok(p),
391     Err(NotFound {}) => {
392       debug!(
393         "Fetching and creating remote comment and its parents: {}",
394         comment_ap_id
395       );
396       let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?;
397       let comment_form =
398         CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
399
400       let comment = blocking(context.pool(), move |conn| {
401         Comment::upsert(conn, &comment_form)
402       })
403       .await??;
404
405       Ok(comment)
406     }
407     Err(e) => Err(e.into()),
408   }
409 }