]> Untitled Git - lemmy.git/blob - server/src/apub/fetcher.rs
Some apub fixes
[lemmy.git] / server / src / apub / fetcher.rs
1 use crate::{
2   api::site::SearchResponse,
3   apub::{
4     check_is_apub_id_valid,
5     ActorType,
6     FromApub,
7     GroupExt,
8     PageExt,
9     PersonExt,
10     APUB_JSON_CONTENT_TYPE,
11   },
12   blocking,
13   request::{retry, RecvError},
14   routes::nodeinfo::{NodeInfo, NodeInfoWellKnown},
15   DbPool,
16   LemmyError,
17 };
18 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
19 use actix_web::client::Client;
20 use anyhow::anyhow;
21 use chrono::NaiveDateTime;
22 use diesel::{result::Error::NotFound, PgConnection};
23 use lemmy_db::{
24   comment::{Comment, CommentForm},
25   comment_view::CommentView,
26   community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
27   community_view::CommunityView,
28   naive_now,
29   post::{Post, PostForm},
30   post_view::PostView,
31   user::{UserForm, User_},
32   user_view::UserView,
33   Crud,
34   Joinable,
35   SearchType,
36 };
37 use lemmy_utils::get_apub_protocol_string;
38 use log::debug;
39 use serde::Deserialize;
40 use std::{fmt::Debug, time::Duration};
41 use url::Url;
42
43 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
44 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
45
46 // Fetch nodeinfo metadata from a remote instance.
47 async fn _fetch_node_info(client: &Client, domain: &str) -> Result<NodeInfo, LemmyError> {
48   let well_known_uri = Url::parse(&format!(
49     "{}://{}/.well-known/nodeinfo",
50     get_apub_protocol_string(),
51     domain
52   ))?;
53
54   let well_known = fetch_remote_object::<NodeInfoWellKnown>(client, &well_known_uri).await?;
55   let nodeinfo = fetch_remote_object::<NodeInfo>(client, &well_known.links.href).await?;
56
57   Ok(nodeinfo)
58 }
59
60 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
61 /// timeouts etc.
62 pub async fn fetch_remote_object<Response>(
63   client: &Client,
64   url: &Url,
65 ) -> Result<Response, LemmyError>
66 where
67   Response: for<'de> Deserialize<'de>,
68 {
69   check_is_apub_id_valid(&url)?;
70
71   let timeout = Duration::from_secs(60);
72
73   let json = retry(|| {
74     client
75       .get(url.as_str())
76       .header("Accept", APUB_JSON_CONTENT_TYPE)
77       .timeout(timeout)
78       .send()
79   })
80   .await?
81   .json()
82   .await
83   .map_err(|e| {
84     debug!("Receive error, {}", e);
85     RecvError(e.to_string())
86   })?;
87
88   Ok(json)
89 }
90
91 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
92 #[serde(untagged)]
93 #[derive(serde::Deserialize, Debug)]
94 pub enum SearchAcceptedObjects {
95   Person(Box<PersonExt>),
96   Group(Box<GroupExt>),
97   Page(Box<PageExt>),
98   Comment(Box<Note>),
99 }
100
101 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
102 ///
103 /// Some working examples for use with the docker/federation/ setup:
104 /// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
105 /// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
106 /// http://lemmy_alpha:8540/post/3
107 /// http://lemmy_alpha:8540/comment/2
108 pub async fn search_by_apub_id(
109   query: &str,
110   client: &Client,
111   pool: &DbPool,
112 ) -> Result<SearchResponse, LemmyError> {
113   // Parse the shorthand query url
114   let query_url = if query.contains('@') {
115     debug!("{}", query);
116     let split = query.split('@').collect::<Vec<&str>>();
117
118     // User type will look like ['', username, instance]
119     // Community will look like [!community, instance]
120     let (name, instance) = if split.len() == 3 {
121       (format!("/u/{}", split[1]), split[2])
122     } else if split.len() == 2 {
123       if split[0].contains('!') {
124         let split2 = split[0].split('!').collect::<Vec<&str>>();
125         (format!("/c/{}", split2[1]), split[1])
126       } else {
127         return Err(anyhow!("Invalid search query: {}", query).into());
128       }
129     } else {
130       return Err(anyhow!("Invalid search query: {}", query).into());
131     };
132
133     let url = format!("{}://{}{}", get_apub_protocol_string(), instance, name);
134     Url::parse(&url)?
135   } else {
136     Url::parse(&query)?
137   };
138
139   let mut response = SearchResponse {
140     type_: SearchType::All.to_string(),
141     comments: vec![],
142     posts: vec![],
143     communities: vec![],
144     users: vec![],
145   };
146
147   let domain = query_url.domain().unwrap();
148   let response = match fetch_remote_object::<SearchAcceptedObjects>(client, &query_url).await? {
149     SearchAcceptedObjects::Person(p) => {
150       let user_uri = p.inner.id(domain)?.unwrap();
151
152       let user = get_or_fetch_and_upsert_user(&user_uri, client, pool).await?;
153
154       response.users = vec![blocking(pool, move |conn| UserView::read(conn, user.id)).await??];
155
156       response
157     }
158     SearchAcceptedObjects::Group(g) => {
159       let community_uri = g.inner.id(domain)?.unwrap();
160
161       let community = get_or_fetch_and_upsert_community(community_uri, client, pool).await?;
162
163       // TODO Maybe at some point in the future, fetch all the history of a community
164       // fetch_community_outbox(&c, conn)?;
165       response.communities = vec![
166         blocking(pool, move |conn| {
167           CommunityView::read(conn, community.id, None)
168         })
169         .await??,
170       ];
171
172       response
173     }
174     SearchAcceptedObjects::Page(p) => {
175       let post_form = PostForm::from_apub(&p, client, pool, Some(query_url)).await?;
176
177       let p = blocking(pool, move |conn| upsert_post(&post_form, conn)).await??;
178       response.posts = vec![blocking(pool, move |conn| PostView::read(conn, p.id, None)).await??];
179
180       response
181     }
182     SearchAcceptedObjects::Comment(c) => {
183       let post_url = c.in_reply_to().as_ref().unwrap().as_many().unwrap();
184
185       // TODO: also fetch parent comments if any
186       let x = post_url.first().unwrap().as_xsd_any_uri().unwrap();
187       let post = fetch_remote_object(client, x).await?;
188       let post_form = PostForm::from_apub(&post, client, pool, Some(query_url.clone())).await?;
189       let comment_form = CommentForm::from_apub(&c, client, pool, Some(query_url)).await?;
190
191       blocking(pool, move |conn| upsert_post(&post_form, conn)).await??;
192       let c = blocking(pool, move |conn| upsert_comment(&comment_form, conn)).await??;
193       response.comments =
194         vec![blocking(pool, move |conn| CommentView::read(conn, c.id, None)).await??];
195
196       response
197     }
198   };
199
200   Ok(response)
201 }
202
203 pub async fn get_or_fetch_and_upsert_actor(
204   apub_id: &Url,
205   client: &Client,
206   pool: &DbPool,
207 ) -> Result<Box<dyn ActorType>, LemmyError> {
208   let user = get_or_fetch_and_upsert_user(apub_id, client, pool).await;
209   let actor: Box<dyn ActorType> = match user {
210     Ok(u) => Box::new(u),
211     Err(_) => Box::new(get_or_fetch_and_upsert_community(apub_id, client, pool).await?),
212   };
213   Ok(actor)
214 }
215
216 /// Check if a remote user exists, create if not found, if its too old update it.Fetch a user, insert/update it in the database and return the user.
217 pub async fn get_or_fetch_and_upsert_user(
218   apub_id: &Url,
219   client: &Client,
220   pool: &DbPool,
221 ) -> Result<User_, LemmyError> {
222   let apub_id_owned = apub_id.to_owned();
223   let user = blocking(pool, move |conn| {
224     User_::read_from_actor_id(conn, apub_id_owned.as_ref())
225   })
226   .await?;
227
228   match user {
229     // If its older than a day, re-fetch it
230     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
231       debug!("Fetching and updating from remote user: {}", apub_id);
232       let person = fetch_remote_object::<PersonExt>(client, apub_id).await?;
233
234       let mut uf = UserForm::from_apub(&person, client, pool, Some(apub_id.to_owned())).await?;
235       uf.last_refreshed_at = Some(naive_now());
236       let user = blocking(pool, move |conn| User_::update(conn, u.id, &uf)).await??;
237
238       Ok(user)
239     }
240     Ok(u) => Ok(u),
241     Err(NotFound {}) => {
242       debug!("Fetching and creating remote user: {}", apub_id);
243       let person = fetch_remote_object::<PersonExt>(client, apub_id).await?;
244
245       let uf = UserForm::from_apub(&person, client, pool, Some(apub_id.to_owned())).await?;
246       let user = blocking(pool, move |conn| User_::create(conn, &uf)).await??;
247
248       Ok(user)
249     }
250     Err(e) => Err(e.into()),
251   }
252 }
253
254 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
255 /// ACTOR_REFETCH_INTERVAL_SECONDS after the last refetch, in debug builds always.
256 ///
257 /// TODO it won't pick up new avatars, summaries etc until a day after.
258 /// Actors need an "update" activity pushed to other servers to fix this.
259 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
260   let update_interval = if cfg!(debug_assertions) {
261     // avoid infinite loop when fetching community outbox
262     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
263   } else {
264     chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
265   };
266   last_refreshed.lt(&(naive_now() - update_interval))
267 }
268
269 /// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
270 pub async fn get_or_fetch_and_upsert_community(
271   apub_id: &Url,
272   client: &Client,
273   pool: &DbPool,
274 ) -> Result<Community, LemmyError> {
275   let apub_id_owned = apub_id.to_owned();
276   let community = blocking(pool, move |conn| {
277     Community::read_from_actor_id(conn, apub_id_owned.as_str())
278   })
279   .await?;
280
281   match community {
282     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
283       debug!("Fetching and updating from remote community: {}", apub_id);
284       fetch_remote_community(apub_id, client, pool, Some(c.id)).await
285     }
286     Ok(c) => Ok(c),
287     Err(NotFound {}) => {
288       debug!("Fetching and creating remote community: {}", apub_id);
289       fetch_remote_community(apub_id, client, pool, None).await
290     }
291     Err(e) => Err(e.into()),
292   }
293 }
294
295 async fn fetch_remote_community(
296   apub_id: &Url,
297   client: &Client,
298   pool: &DbPool,
299   community_id: Option<i32>,
300 ) -> Result<Community, LemmyError> {
301   let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
302
303   let cf = CommunityForm::from_apub(&group, client, pool, Some(apub_id.to_owned())).await?;
304   let community = blocking(pool, move |conn| {
305     if let Some(cid) = community_id {
306       Community::update(conn, cid, &cf)
307     } else {
308       Community::create(conn, &cf)
309     }
310   })
311   .await??;
312
313   // Also add the community moderators too
314   let attributed_to = group.inner.attributed_to().unwrap();
315   let creator_and_moderator_uris: Vec<&Url> = attributed_to
316     .as_many()
317     .unwrap()
318     .iter()
319     .map(|a| a.as_xsd_any_uri().unwrap())
320     .collect();
321
322   let mut creator_and_moderators = Vec::new();
323
324   for uri in creator_and_moderator_uris {
325     let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?;
326
327     creator_and_moderators.push(c_or_m);
328   }
329
330   // TODO: need to make this work to update mods of existing communities
331   if community_id.is_none() {
332     let community_id = community.id;
333     blocking(pool, move |conn| {
334       for mod_ in creator_and_moderators {
335         let community_moderator_form = CommunityModeratorForm {
336           community_id,
337           user_id: mod_.id,
338         };
339
340         CommunityModerator::join(conn, &community_moderator_form)?;
341       }
342       Ok(()) as Result<(), LemmyError>
343     })
344     .await??;
345   }
346
347   // fetch outbox (maybe make this conditional)
348   let outbox =
349     fetch_remote_object::<OrderedCollection>(client, &community.get_outbox_url()?).await?;
350   let outbox_items = outbox.items().unwrap().clone();
351   for o in outbox_items.many().unwrap() {
352     let page = PageExt::from_any_base(o)?.unwrap();
353     let post = PostForm::from_apub(&page, client, pool, None).await?;
354     let post_ap_id = post.ap_id.clone();
355     // Check whether the post already exists in the local db
356     let existing = blocking(pool, move |conn| Post::read_from_apub_id(conn, &post_ap_id)).await?;
357     match existing {
358       Ok(e) => blocking(pool, move |conn| Post::update(conn, e.id, &post)).await??,
359       Err(_) => blocking(pool, move |conn| Post::create(conn, &post)).await??,
360     };
361     // TODO: we need to send a websocket update here
362   }
363
364   Ok(community)
365 }
366
367 fn upsert_post(post_form: &PostForm, conn: &PgConnection) -> Result<Post, LemmyError> {
368   let existing = Post::read_from_apub_id(conn, &post_form.ap_id);
369   match existing {
370     Err(NotFound {}) => Ok(Post::create(conn, &post_form)?),
371     Ok(p) => Ok(Post::update(conn, p.id, &post_form)?),
372     Err(e) => Err(e.into()),
373   }
374 }
375
376 pub async fn get_or_fetch_and_insert_post(
377   post_ap_id: &Url,
378   client: &Client,
379   pool: &DbPool,
380 ) -> Result<Post, LemmyError> {
381   let post_ap_id_owned = post_ap_id.to_owned();
382   let post = blocking(pool, move |conn| {
383     Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
384   })
385   .await?;
386
387   match post {
388     Ok(p) => Ok(p),
389     Err(NotFound {}) => {
390       debug!("Fetching and creating remote post: {}", post_ap_id);
391       let post = fetch_remote_object::<PageExt>(client, post_ap_id).await?;
392       let post_form = PostForm::from_apub(&post, client, pool, Some(post_ap_id.to_owned())).await?;
393
394       let post = blocking(pool, move |conn| Post::create(conn, &post_form)).await??;
395
396       Ok(post)
397     }
398     Err(e) => Err(e.into()),
399   }
400 }
401
402 fn upsert_comment(comment_form: &CommentForm, conn: &PgConnection) -> Result<Comment, LemmyError> {
403   let existing = Comment::read_from_apub_id(conn, &comment_form.ap_id);
404   match existing {
405     Err(NotFound {}) => Ok(Comment::create(conn, &comment_form)?),
406     Ok(p) => Ok(Comment::update(conn, p.id, &comment_form)?),
407     Err(e) => Err(e.into()),
408   }
409 }
410
411 pub async fn get_or_fetch_and_insert_comment(
412   comment_ap_id: &Url,
413   client: &Client,
414   pool: &DbPool,
415 ) -> Result<Comment, LemmyError> {
416   let comment_ap_id_owned = comment_ap_id.to_owned();
417   let comment = blocking(pool, move |conn| {
418     Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
419   })
420   .await?;
421
422   match comment {
423     Ok(p) => Ok(p),
424     Err(NotFound {}) => {
425       debug!(
426         "Fetching and creating remote comment and its parents: {}",
427         comment_ap_id
428       );
429       let comment = fetch_remote_object::<Note>(client, comment_ap_id).await?;
430       let comment_form =
431         CommentForm::from_apub(&comment, client, pool, Some(comment_ap_id.to_owned())).await?;
432
433       let comment = blocking(pool, move |conn| Comment::create(conn, &comment_form)).await??;
434
435       Ok(comment)
436     }
437     Err(e) => Err(e.into()),
438   }
439 }
440
441 // TODO It should not be fetching data from a community outbox.
442 // All posts, comments, comment likes, etc should be posts to our community_inbox
443 // The only data we should be periodically fetching (if it hasn't been fetched in the last day
444 // maybe), is community and user actors
445 // and user actors
446 // Fetch all posts in the outbox of the given user, and insert them into the database.
447 // fn fetch_community_outbox(community: &Community, conn: &PgConnection) -> Result<Vec<Post>, LemmyError> {
448 //   let outbox_url = Url::parse(&community.get_outbox_url())?;
449 //   let outbox = fetch_remote_object::<OrderedCollection>(&outbox_url)?;
450 //   let items = outbox.collection_props.get_many_items_base_boxes();
451
452 //   Ok(
453 //     items
454 //       .unwrap()
455 //       .map(|obox: &BaseBox| -> Result<PostForm, LemmyError> {
456 //         let page = obox.clone().to_concrete::<Page>()?;
457 //         PostForm::from_page(&page, conn)
458 //       })
459 //       .map(|pf| upsert_post(&pf?, conn))
460 //       .collect::<Result<Vec<Post>, LemmyError>>()?,
461 //   )
462 // }