2 check_is_apub_id_valid,
8 APUB_JSON_CONTENT_TYPE,
10 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
11 use anyhow::{anyhow, Context};
12 use chrono::NaiveDateTime;
13 use diesel::result::Error::NotFound;
15 comment::{Comment, CommentForm},
16 comment_view::CommentView,
17 community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
18 community_view::CommunityView,
20 post::{Post, PostForm},
22 user::{UserForm, User_},
28 use lemmy_structs::{blocking, site::SearchResponse};
31 request::{retry, RecvError},
35 use lemmy_websocket::LemmyContext;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
/// How long (in seconds) a remote actor's stored data is used before it is refetched, in release
/// builds: 24 hours. Read by `should_refetch_actor()` when `debug_assertions` is off.
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
/// Refetch interval (10 seconds) used by `should_refetch_actor()` in debug builds
/// (`cfg!(debug_assertions)`).
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
45 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
46 /// fetch through the search).
48 /// Tests are passing with a value of 5, so 10 should be safe for production.
///
/// Enforced by `fetch_remote_object()`, which increments the caller-supplied `recursion_counter`
/// on every call and returns an error once the counter exceeds this value.
49 static MAX_REQUEST_NUMBER: i32 = 10;
51 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
///
/// Each call increments `recursion_counter`. Once the counter exceeds `MAX_REQUEST_NUMBER`,
/// the call fails with "Maximum recursion depth reached" before doing anything else.
/// The URL is validated with `check_is_apub_id_valid()` before the request is made.
53 async fn fetch_remote_object<Response>(
56 recursion_counter: &mut i32,
57 ) -> Result<Response, LemmyError>
59 Response: for<'de> Deserialize<'de>,
// Count this request against the budget shared by the whole activity / search.
61 *recursion_counter += 1;
62 if *recursion_counter > MAX_REQUEST_NUMBER {
63 return Err(anyhow!("Maximum recursion depth reached").into());
// Validate the target ID before any network I/O.
65 check_is_apub_id_valid(&url)?;
// NOTE(review): 60-second timeout; presumably applied to the request built below
// (those lines are not visible here) — confirm.
67 let timeout = Duration::from_secs(60);
72 .header("Accept", APUB_JSON_CONTENT_TYPE)
// Receive errors are logged at debug level and wrapped in `RecvError`.
80 debug!("Receive error, {}", e);
81 RecvError(e.to_string())
87 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
///
/// `search_by_apub_id()` matches on the `Person`, `Group`, `Page` and `Comment` variants.
89 #[derive(serde::Deserialize, Debug)]
90 pub enum SearchAcceptedObjects {
91 Person(Box<PersonExt>),
97 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
99 /// Some working examples for use with the `docker/federation/` setup:
100 /// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
101 /// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
102 /// http://lemmy_gamma:8561/post/3
103 /// http://lemmy_delta:8571/comment/2
///
/// All fetches triggered by one search share a single `recursion_counter`, so the whole search
/// is bounded by `MAX_REQUEST_NUMBER` requests in total.
104 pub async fn search_by_apub_id(
106 context: &LemmyContext,
107 ) -> Result<SearchResponse, LemmyError> {
108 // Parse the shorthand query url
109 let query_url = if query.contains('@') {
110 debug!("Search for {}", query);
111 let split = query.split('@').collect::<Vec<&str>>();
113 // User type will look like ['', username, instance]
114 // Community will look like [!community, instance]
115 let (name, instance) = if split.len() == 3 {
116 (format!("/u/{}", split[1]), split[2])
117 } else if split.len() == 2 {
// NOTE(review): any text before the first '!' is ignored, so `x!main@host` resolves
// the same as `!main@host`.
118 if split[0].contains('!') {
119 let split2 = split[0].split('!').collect::<Vec<&str>>();
120 (format!("/c/{}", split2[1]), split[1])
122 return Err(anyhow!("Invalid search query: {}", query).into());
125 return Err(anyhow!("Invalid search query: {}", query).into());
130 Settings::get().get_protocol_string(),
139 let mut response = SearchResponse {
140 type_: SearchType::All.to_string(),
// Passed to the `id(domain)` calls below. Presumably this rejects objects whose id is on a
// different domain than the query URL — confirm against activitystreams `BaseExt::id`.
147 let domain = query_url.domain().context("url has no domain")?;
// One request budget for the whole search (see `MAX_REQUEST_NUMBER`).
148 let recursion_counter = &mut 0;
149 let response = match fetch_remote_object::<SearchAcceptedObjects>(
156 SearchAcceptedObjects::Person(p) => {
157 let user_uri = p.inner.id(domain)?.context("person has no id")?;
159 let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
161 response.users = vec![
162 blocking(context.pool(), move |conn| {
163 UserView::get_user_secure(conn, user.id)
170 SearchAcceptedObjects::Group(g) => {
171 let community_uri = g.inner.id(domain)?.context("group has no id")?;
174 get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
176 response.communities = vec![
177 blocking(context.pool(), move |conn| {
178 CommunityView::read(conn, community.id, None)
// Posts and comments found by a search are converted and upserted directly, rather than
// going through the get_or_fetch_* helpers used for users and communities above.
185 SearchAcceptedObjects::Page(p) => {
186 let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
188 let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
190 vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
194 SearchAcceptedObjects::Comment(c) => {
196 CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
198 let c = blocking(context.pool(), move |conn| {
199 Comment::upsert(conn, &comment_form)
202 response.comments = vec![
203 blocking(context.pool(), move |conn| {
204 CommentView::read(conn, c.id, None)
216 /// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
217 /// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
219 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
220 /// Otherwise it is fetched from the remote instance, stored and returned.
///
/// The community lookup is tried first. If it fails for any reason, its error is discarded and
/// a user lookup is attempted instead. Both attempts draw on the same `recursion_counter`.
221 pub(crate) async fn get_or_fetch_and_upsert_actor(
223 context: &LemmyContext,
224 recursion_counter: &mut i32,
225 ) -> Result<Box<dyn ActorType>, LemmyError> {
226 let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
227 let actor: Box<dyn ActorType> = match community {
228 Ok(c) => Box::new(c),
// NOTE(review): this also swallows network/DB errors from the community attempt, not only
// "this ID is not a community". Only the user attempt's error reaches the caller.
229 Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
234 /// Get a user from its apub ID.
236 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
237 /// Otherwise it is fetched from the remote instance, stored and returned.
///
/// Local users (`u.local`) never take the refetch path.
238 pub(crate) async fn get_or_fetch_and_upsert_user(
240 context: &LemmyContext,
241 recursion_counter: &mut i32,
242 ) -> Result<User_, LemmyError> {
// Owned copy so it can be moved into the `blocking` closure.
243 let apub_id_owned = apub_id.to_owned();
244 let user = blocking(context.pool(), move |conn| {
245 User_::read_from_actor_id(conn, apub_id_owned.as_ref())
250 // If its older than a day, re-fetch it
251 Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
252 debug!("Fetching and updating from remote user: {}", apub_id);
254 fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
255 // If fetching failed, return the existing data.
260 let mut uf = UserForm::from_apub(
263 Some(apub_id.to_owned()),
// Record the refresh so `should_refetch_actor()` waits a full interval before the next one.
267 uf.last_refreshed_at = Some(naive_now());
268 let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
273 Err(NotFound {}) => {
274 debug!("Fetching and creating remote user: {}", apub_id);
// Unlike the refresh path above, a failed fetch is returned as an error here: there is no
// stored copy to fall back on.
276 fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
278 let uf = UserForm::from_apub(
281 Some(apub_id.to_owned()),
285 let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
// Any other database error is propagated unchanged.
289 Err(e) => Err(e.into()),
293 /// Determines when a remote actor should be refetched from its instance. In release builds, this is
294 /// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
295 /// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
297 /// TODO it won't pick up new avatars, summaries etc until a day after.
298 /// Actors need an "update" activity pushed to other servers to fix this.
///
/// Returns `true` when `last_refreshed` is strictly earlier than `naive_now()` minus the interval.
299 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
300 let update_interval = if cfg!(debug_assertions) {
301 // avoid infinite loop when fetching community outbox
302 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
304 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
306 last_refreshed.lt(&(naive_now() - update_interval))
309 /// Get a community from its apub ID.
311 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
312 /// Otherwise it is fetched from the remote instance, stored and returned.
///
/// Both refreshing and first-time creation are delegated to `fetch_remote_community()`. The stored
/// row is passed as `old_community` only when refreshing. Local communities never take the
/// refetch path.
313 pub(crate) async fn get_or_fetch_and_upsert_community(
315 context: &LemmyContext,
316 recursion_counter: &mut i32,
317 ) -> Result<Community, LemmyError> {
// Owned copy so it can be moved into the `blocking` closure.
318 let apub_id_owned = apub_id.to_owned();
319 let community = blocking(context.pool(), move |conn| {
320 Community::read_from_actor_id(conn, apub_id_owned.as_str())
325 Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
326 debug!("Fetching and updating from remote community: {}", apub_id);
327 fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
330 Err(NotFound {}) => {
331 debug!("Fetching and creating remote community: {}", apub_id);
332 fetch_remote_community(apub_id, context, None, recursion_counter).await
// Any other database error is propagated unchanged.
334 Err(e) => Err(e.into()),
338 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
339 /// is set, this is an update for a community which is already known locally. If not, we don't know
340 /// the community yet and also pull the outbox, to get some initial posts.
///
/// Creators/moderators are read from the group's `attributedTo` and fetched or upserted as users.
/// A failure on any of them aborts the whole call. At most the first 20 outbox items are processed.
341 async fn fetch_remote_community(
343 context: &LemmyContext,
344 old_community: Option<Community>,
345 recursion_counter: &mut i32,
346 ) -> Result<Community, LemmyError> {
347 let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
348 // If fetching failed, return the existing data.
349 if let Some(ref c) = old_community {
351 return Ok(c.to_owned());
357 CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
358 let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
360 // Also add the community moderators too
361 let attributed_to = group.inner.attributed_to().context(location_info!())?;
362 let creator_and_moderator_uris: Vec<&Url> = attributed_to
364 .context(location_info!())?
// NOTE(review): the empty context string yields an unhelpful error message.
// `location_info!()` would match the surrounding calls.
366 .map(|a| a.as_xsd_any_uri().context(""))
367 .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
369 let mut creator_and_moderators = Vec::new();
371 for uri in creator_and_moderator_uris {
372 let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
374 creator_and_moderators.push(c_or_m);
377 // TODO: need to make this work to update mods of existing communities
378 if old_community.is_none() {
379 let community_id = community.id;
380 blocking(context.pool(), move |conn| {
381 for mod_ in creator_and_moderators {
382 let community_moderator_form = CommunityModeratorForm {
387 CommunityModerator::join(conn, &community_moderator_form)?;
389 Ok(()) as Result<(), LemmyError>
// NOTE(review): the doc comment says the outbox is pulled only for communities not yet known
// locally, but the comment below suggests this fetch is unconditional. The lines deciding
// this are not visible here — confirm.
394 // fetch outbox (maybe make this conditional)
395 let outbox = fetch_remote_object::<OrderedCollection>(
397 &community.get_outbox_url()?,
401 let outbox_items = outbox.items().context(location_info!())?.clone();
402 let mut outbox_items = outbox_items.many().context(location_info!())?;
// Only the first 20 outbox items are processed.
// NOTE(review): `outbox_items.truncate(20)` would avoid the slice copy.
403 if outbox_items.len() > 20 {
404 outbox_items = outbox_items[0..20].to_vec();
406 for o in outbox_items {
407 let page = PageExt::from_any_base(o)?.context(location_info!())?;
409 // The post creator may be from a blocked instance,
410 // if it errors, then continue
411 let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
415 let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
416 // Check whether the post already exists in the local db
417 let existing = blocking(context.pool(), move |conn| {
418 Post::read_from_apub_id(conn, &post_ap_id)
// Known post: update in place. Otherwise upsert.
// NOTE(review): `Err(_)` also catches DB errors other than "not found".
422 Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
423 Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
425 // TODO: we need to send a websocket update here
431 /// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
432 /// pulled from its apub ID, inserted and returned.
434 /// The parent community is also pulled if necessary. Comments are not pulled.
435 pub(crate) async fn get_or_fetch_and_insert_post(
437 context: &LemmyContext,
438 recursion_counter: &mut i32,
439 ) -> Result<Post, LemmyError> {
// Owned copy so it can be moved into the `blocking` closure.
440 let post_ap_id_owned = post_ap_id.to_owned();
441 let post = blocking(context.pool(), move |conn| {
442 Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
// Not stored yet: fetch the remote Page, convert it and upsert it.
// A failed fetch is returned as an error.
448 Err(NotFound {}) => {
449 debug!("Fetching and creating remote post: {}", post_ap_id);
451 fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
452 let post_form = PostForm::from_apub(
455 Some(post_ap_id.to_owned()),
460 let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
// Any other database error is propagated unchanged.
464 Err(e) => Err(e.into()),
468 /// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
469 /// pulled from its apub ID, inserted and returned.
471 /// The parent community, post and comment are also pulled if necessary.
472 pub(crate) async fn get_or_fetch_and_insert_comment(
474 context: &LemmyContext,
475 recursion_counter: &mut i32,
476 ) -> Result<Comment, LemmyError> {
477 let comment_ap_id_owned = comment_ap_id.to_owned();
478 let comment = blocking(context.pool(), move |conn| {
479 Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
485 Err(NotFound {}) => {
487 "Fetching and creating remote comment and its parents: {}",
491 fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
492 let comment_form = CommentForm::from_apub(
495 Some(comment_ap_id.to_owned()),
500 let comment = blocking(context.pool(), move |conn| {
501 Comment::upsert(conn, &comment_form)
507 Err(e) => Err(e.into()),