2 check_is_apub_id_valid,
8 APUB_JSON_CONTENT_TYPE,
10 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
11 use anyhow::{anyhow, Context};
12 use chrono::NaiveDateTime;
13 use diesel::result::Error::NotFound;
15 comment::{Comment, CommentForm},
16 comment_view::CommentView,
17 community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
18 community_view::CommunityView,
20 post::{Post, PostForm},
22 user::{UserForm, User_},
28 use lemmy_structs::{blocking, site::SearchResponse};
31 request::{retry, RecvError},
35 use lemmy_websocket::LemmyContext;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
/// Refetch interval for remote actors, in seconds (24 hours). `should_refetch_actor()` uses
/// this value when `cfg!(debug_assertions)` is false.
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
/// Refetch interval for remote actors, in seconds, used by `should_refetch_actor()` when
/// `cfg!(debug_assertions)` is true.
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
45 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
46 /// fetch through the search).
///
/// `fetch_remote_object()` increments its `recursion_counter` argument on every call and returns
/// a "Maximum recursion depth reached" error once the counter exceeds this value.
///
48 /// Tests are passing with a value of 5, so 10 should be safe for production.
49 static MAX_REQUEST_NUMBER: i32 = 10;
51 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
///
/// `Response` is the type the fetched object is deserialized into. Each call adds one to
/// `recursion_counter`; when the counter exceeds `MAX_REQUEST_NUMBER` the call returns a
/// "Maximum recursion depth reached" error. The URL must also pass `check_is_apub_id_valid()`.
53 async fn fetch_remote_object<Response>(
56 recursion_counter: &mut i32,
57 ) -> Result<Response, LemmyError>
59 Response: for<'de> Deserialize<'de>,
// Count this call against the request budget shared by the whole call chain.
61 *recursion_counter += 1;
62 if *recursion_counter > MAX_REQUEST_NUMBER {
63 return Err(anyhow!("Maximum recursion depth reached").into());
// Propagate the error if the URL is not an acceptable apub ID.
65 check_is_apub_id_valid(&url)?;
// 60 second timeout value. NOTE(review): where it is applied is on lines not visible here.
67 let timeout = Duration::from_secs(60);
// Send the ActivityPub JSON content type in the Accept header.
72 .header("Accept", APUB_JSON_CONTENT_TYPE)
// Log the receive error at debug level, then wrap its message in a RecvError.
80 debug!("Receive error, {}", e);
81 RecvError(e.to_string())
87 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
///
/// `search_by_apub_id()` deserializes the fetched object into this enum and matches on its
/// `Person`, `Group`, `Page` and `Comment` variants.
89 #[derive(serde::Deserialize, Debug)]
90 pub enum SearchAcceptedObjects {
// A person (user) object, boxed.
91 Person(Box<PersonExt>),
97 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
///
/// A query containing `@` is treated as shorthand: three `@`-separated parts are mapped to a
/// `/u/{name}` path, two parts whose first part contains `!` are mapped to a `/c/{name}` path,
/// and any other shape returns an "Invalid search query" error.
///
/// The fetched object is deserialized as `SearchAcceptedObjects`. Depending on the variant, the
/// user, community, post or comment is stored locally and its view is read back from the
/// database.
///
99 /// Some working examples for use with the `docker/federation/` setup:
100 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
101 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
102 /// http://lemmy_gamma:8561/post/3
103 /// http://lemmy_delta:8571/comment/2
///
/// NOTE(review): in the second example the shorthand maps to `/u/lemmy_beta` while the URL
/// names `/u/lemmy_alpha` — confirm which user the example intends.
104 pub async fn search_by_apub_id(
106 context: &LemmyContext,
107 ) -> Result<SearchResponse, LemmyError> {
108 // Parse the shorthand query url
109 let query_url = if query.contains('@') {
110 debug!("Search for {}", query);
111 let split = query.split('@').collect::<Vec<&str>>();
113 // User type will look like ['', username, instance]
114 // Community will look like [!community, instance]
115 let (name, instance) = if split.len() == 3 {
116 (format!("/u/{}", split[1]), split[2])
117 } else if split.len() == 2 {
118 if split[0].contains('!') {
// Indexing split2[1] cannot panic: split[0] contains '!', so splitting on '!' yields at
// least two parts.
119 let split2 = split[0].split('!').collect::<Vec<&str>>();
120 (format!("/c/{}", split2[1]), split[1])
// Two parts but no '!' in the first one: not a recognised shorthand.
122 return Err(anyhow!("Invalid search query: {}", query).into());
// Neither two nor three '@'-separated parts: not a recognised shorthand.
125 return Err(anyhow!("Invalid search query: {}", query).into());
// The protocol string comes from the instance settings.
130 Settings::get().get_protocol_string(),
139 let mut response = SearchResponse {
140 type_: SearchType::All.to_string(),
// The URL's domain is passed to `id()` below when extracting the person / group ID.
147 let domain = query_url.domain().context("url has no domain")?;
// The request counter starts at zero for each search and is passed to every nested fetch.
148 let recursion_counter = &mut 0;
149 let response = match fetch_remote_object::<SearchAcceptedObjects>(
// Person: resolve the user by its ID, then put its view into `response.users`.
156 SearchAcceptedObjects::Person(p) => {
157 let user_uri = p.inner.id(domain)?.context("person has no id")?;
159 let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
161 response.users = vec![
162 blocking(context.pool(), move |conn| {
163 UserView::get_user_secure(conn, user.id)
// Group: resolve the community by its ID, then put its view into `response.communities`.
170 SearchAcceptedObjects::Group(g) => {
171 let community_uri = g.inner.id(domain)?.context("group has no id")?;
174 get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
176 response.communities = vec![
177 blocking(context.pool(), move |conn| {
178 CommunityView::read(conn, community.id, None)
// Page: build a post form from the object, upsert it, then read the post view.
185 SearchAcceptedObjects::Page(p) => {
186 let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
188 let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
190 vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
// Comment: build a comment form from the object, upsert it, then put its view into
// `response.comments`.
194 SearchAcceptedObjects::Comment(c) => {
196 CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
198 let c = blocking(context.pool(), move |conn| {
199 Comment::upsert(conn, &comment_form)
202 response.comments = vec![
203 blocking(context.pool(), move |conn| {
204 CommentView::read(conn, c.id, None)
216 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
217 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
///
/// The community lookup is tried first. If it fails for any reason, its error is discarded and
/// the same ID is tried as a user; only an error from that second attempt is returned. Both
/// attempts share the same `recursion_counter`.
///
219 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
220 /// Otherwise it is fetched from the remote instance, stored and returned.
221 pub(crate) async fn get_or_fetch_and_upsert_actor(
223 context: &LemmyContext,
224 recursion_counter: &mut i32,
225 ) -> Result<Box<dyn ActorType>, LemmyError> {
// No `?` here: a failure must fall through to the user lookup below.
226 let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
227 let actor: Box<dyn ActorType> = match community {
228 Ok(c) => Box::new(c),
// Any community error (not just "not found") leads to the user attempt.
229 Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
234 /// Get a user from its apub ID.
///
/// Three outcomes are visible for the local database lookup: a non-local user that is due for a
/// refetch is fetched again and updated by its local id; a `NotFound` result is fetched and
/// upserted; any other database error is converted and returned.
///
236 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
237 /// Otherwise it is fetched from the remote instance, stored and returned.
238 pub(crate) async fn get_or_fetch_and_upsert_user(
240 context: &LemmyContext,
241 recursion_counter: &mut i32,
242 ) -> Result<User_, LemmyError> {
// Owned copy of the ID so it can be moved into the blocking closure.
243 let apub_id_owned = apub_id.to_owned();
244 let user = blocking(context.pool(), move |conn| {
245 User_::read_from_actor_id(conn, apub_id_owned.as_ref())
250 // If it is a remote user and its last refresh is older than the refetch interval, re-fetch it
251 Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
252 debug!("Fetching and updating from remote user: {}", apub_id);
254 fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
256 let mut uf = UserForm::from_apub(
259 Some(apub_id.to_owned()),
// Record the refresh time so the next refetch check is measured from now.
263 uf.last_refreshed_at = Some(naive_now());
264 let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
// Not known locally yet: fetch the person object and insert it.
269 Err(NotFound {}) => {
270 debug!("Fetching and creating remote user: {}", apub_id);
272 fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
274 let uf = UserForm::from_apub(
277 Some(apub_id.to_owned()),
281 let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
// Any other database error is converted into the function's error type and returned.
285 Err(e) => Err(e.into()),
289 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
290 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
291 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
///
/// Returns `true` when `last_refreshed` is strictly earlier than the current time minus the
/// selected interval. The interval is selected by `cfg!(debug_assertions)`.
///
293 /// TODO it won't pick up new avatars, summaries etc until a day after.
294 /// Actors need an "update" activity pushed to other servers to fix this.
295 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
296 let update_interval = if cfg!(debug_assertions) {
297 // avoid infinite loop when fetching community outbox
298 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
300 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
// Strict less-than: an actor refreshed exactly one interval ago is not refetched yet.
302 last_refreshed.lt(&(naive_now() - update_interval))
305 /// Get a community from its apub ID.
///
/// Three outcomes are visible for the local database lookup: a non-local community that is due
/// for a refetch is fetched again with its local id passed along; a `NotFound` result is fetched
/// with no local id; any other database error is converted and returned.
///
307 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
308 /// Otherwise it is fetched from the remote instance, stored and returned.
309 pub(crate) async fn get_or_fetch_and_upsert_community(
311 context: &LemmyContext,
312 recursion_counter: &mut i32,
313 ) -> Result<Community, LemmyError> {
// Owned copy of the ID so it can be moved into the blocking closure.
314 let apub_id_owned = apub_id.to_owned();
315 let community = blocking(context.pool(), move |conn| {
316 Community::read_from_actor_id(conn, apub_id_owned.as_str())
// Known remote community whose last refresh is older than the refetch interval.
321 Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
322 debug!("Fetching and updating from remote community: {}", apub_id);
// `Some(c.id)` tells fetch_remote_community this community already exists locally.
323 fetch_remote_community(apub_id, context, Some(c.id), recursion_counter).await
// Not known locally yet: fetch it with no existing community id.
326 Err(NotFound {}) => {
327 debug!("Fetching and creating remote community: {}", apub_id);
328 fetch_remote_community(apub_id, context, None, recursion_counter).await
// Any other database error is converted into the function's error type and returned.
330 Err(e) => Err(e.into()),
334 /// Request a community by apub ID from a remote instance, including moderators. If `community_id`
335 /// is set, this is an update for a community which is already known locally. If not, we don't know
336 /// the community yet and also pull the outbox, to get some initial posts.
///
/// Visible steps: fetch the group object and upsert the community; resolve every `attributed_to`
/// URI to a user; when `community_id` is `None`, create moderator rows for those users; fetch
/// the outbox and store at most its first 20 items as posts.
///
/// NOTE(review): the visible lines do not show whether the outbox fetch is guarded by
/// `community_id.is_none()`; the inline "maybe make this conditional" comment suggests it is
/// not — confirm against the full function.
337 async fn fetch_remote_community(
339 context: &LemmyContext,
340 community_id: Option<i32>,
341 recursion_counter: &mut i32,
342 ) -> Result<Community, LemmyError> {
343 let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await?;
346 CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
347 let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
349 // Also add the community moderators
350 let attributed_to = group.inner.attributed_to().context(location_info!())?;
351 let creator_and_moderator_uris: Vec<&Url> = attributed_to
353 .context(location_info!())?
// NOTE(review): the empty context string gives an error with no message when an entry is not
// a URI.
355 .map(|a| a.as_xsd_any_uri().context(""))
356 .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
358 let mut creator_and_moderators = Vec::new();
// Resolve each URI to a user; the first failure aborts the whole community fetch.
360 for uri in creator_and_moderator_uris {
361 let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
363 creator_and_moderators.push(c_or_m);
366 // TODO: need to make this work to update mods of existing communities
// Moderator rows are only created when the community was not known locally before.
367 if community_id.is_none() {
// Shadows the `community_id` parameter with the id of the upserted community.
368 let community_id = community.id;
369 blocking(context.pool(), move |conn| {
370 for mod_ in creator_and_moderators {
371 let community_moderator_form = CommunityModeratorForm {
376 CommunityModerator::join(conn, &community_moderator_form)?;
378 Ok(()) as Result<(), LemmyError>
383 // fetch outbox (maybe make this conditional)
384 let outbox = fetch_remote_object::<OrderedCollection>(
386 &community.get_outbox_url()?,
390 let outbox_items = outbox.items().context(location_info!())?.clone();
391 let mut outbox_items = outbox_items.many().context(location_info!())?;
// Process at most the first 20 outbox items.
392 if outbox_items.len() > 20 {
393 outbox_items = outbox_items[0..20].to_vec();
395 for o in outbox_items {
396 let page = PageExt::from_any_base(o)?.context(location_info!())?;
398 // The post creator may be from a blocked instance,
399 // if it errors, then continue
400 let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
404 let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
405 // Check whether the post already exists in the local db
406 let existing = blocking(context.pool(), move |conn| {
407 Post::read_from_apub_id(conn, &post_ap_id)
// Existing post: update it by its local id. Any read error, not only "not found", falls back
// to upsert.
411 Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
412 Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
414 // TODO: we need to send a websocket update here
420 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
421 /// pulled from its apub ID, inserted and returned.
///
/// Only a `NotFound` result from the local lookup triggers the remote fetch; any other database
/// error is converted and returned.
///
423 /// The parent community is also pulled if necessary. Comments are not pulled.
424 pub(crate) async fn get_or_fetch_and_insert_post(
426 context: &LemmyContext,
427 recursion_counter: &mut i32,
428 ) -> Result<Post, LemmyError> {
// Owned copy of the ID so it can be moved into the blocking closure.
429 let post_ap_id_owned = post_ap_id.to_owned();
430 let post = blocking(context.pool(), move |conn| {
431 Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
// Not known locally yet: fetch the page object, build a post form from it and upsert.
437 Err(NotFound {}) => {
438 debug!("Fetching and creating remote post: {}", post_ap_id);
440 fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
441 let post_form = PostForm::from_apub(
444 Some(post_ap_id.to_owned()),
449 let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
// Any other database error is converted into the function's error type and returned.
453 Err(e) => Err(e.into()),
457 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
458 /// pulled from its apub ID, inserted and returned.
///
/// Only a `NotFound` result from the local lookup triggers the remote fetch; any other database
/// error is converted and returned.
///
460 /// The parent community, post and comment are also pulled if necessary.
461 pub(crate) async fn get_or_fetch_and_insert_comment(
463 context: &LemmyContext,
464 recursion_counter: &mut i32,
465 ) -> Result<Comment, LemmyError> {
// Owned copy of the ID so it can be moved into the blocking closure.
466 let comment_ap_id_owned = comment_ap_id.to_owned();
467 let comment = blocking(context.pool(), move |conn| {
468 Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
// Not known locally yet: fetch the note object, build a comment form from it and upsert.
474 Err(NotFound {}) => {
476 "Fetching and creating remote comment and its parents: {}",
480 fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
481 let comment_form = CommentForm::from_apub(
484 Some(comment_ap_id.to_owned()),
489 let comment = blocking(context.pool(), move |conn| {
490 Comment::upsert(conn, &comment_form)
// Any other database error is converted into the function's error type and returned.
496 Err(e) => Err(e.into()),