Untitled Git - lemmy.git/commitdiff
Deletion on fetch (#1345)
author    Dessalines <dessalines@users.noreply.github.com>
          Tue, 12 Jan 2021 16:12:41 +0000 (11:12 -0500)
committer GitHub <noreply@github.com>
          Tue, 12 Jan 2021 16:12:41 +0000 (11:12 -0500)
* Delete local object on fetch when receiving HTTP 410, split fetcher (fixes #1256)

* Removing submodules

* Trying to re-init submodule

* Trying to re-init submodule 2

* Trying to re-init submodule 3

* Logging line.

* Removing submodules

* Adding again.

* Adding again 2.

* Adding again 3.

* Adding again 4.

* Adding again 5.

* Adding again 6.

* Adding again 7.

* Adding again 8.

* Adding again 9.

* Add more clippy lints, remove dbg!() statement

* Adding again 10.

* Adding again 11.

* Adding again 12.

Co-authored-by: Felix Ableitner <me@nutomic.com>
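
In short: when fetching a remote ActivityPub object now returns HTTP 410 Gone, the local copy is marked deleted instead of being kept stale, and the old monolithic fetcher.rs is split into per-concern modules (community, user, objects, search, fetch). The snippet below is a minimal, self-contained sketch of that 410 check only; it reuses the names introduced in the diff (FetchError, is_deleted) but is illustrative, not the code from this commit.

```rust
use reqwest::StatusCode;

/// Simplified stand-in for the FetchError type added in lemmy_apub/src/fetcher/fetch.rs.
struct FetchError {
    status_code: Option<StatusCode>,
}

/// Mirrors fetcher::is_deleted(): a failed fetch counts as a deletion
/// when the remote instance answered with 410 Gone.
fn is_deleted<T>(fetch_response: &Result<T, FetchError>) -> bool {
    matches!(fetch_response, Err(e) if e.status_code == Some(StatusCode::GONE))
}

fn main() {
    let gone: Result<(), FetchError> = Err(FetchError {
        status_code: Some(StatusCode::GONE),
    });
    // Callers such as fetch_remote_community() react to this by flagging the
    // local row as deleted (e.g. Community::update_deleted) and bailing out.
    assert!(is_deleted(&gone));
}
```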
25 files changed:
.drone.yml
.gitmodules
docker/federation/start-local-instances.bash
lemmy_api/src/site.rs
lemmy_apub/src/activities/receive/mod.rs
lemmy_apub/src/activities/receive/private_message.rs
lemmy_apub/src/activities/send/comment.rs
lemmy_apub/src/activities/send/community.rs
lemmy_apub/src/fetcher.rs [deleted file]
lemmy_apub/src/fetcher/community.rs [new file with mode: 0644]
lemmy_apub/src/fetcher/fetch.rs [new file with mode: 0644]
lemmy_apub/src/fetcher/mod.rs [new file with mode: 0644]
lemmy_apub/src/fetcher/objects.rs [new file with mode: 0644]
lemmy_apub/src/fetcher/search.rs [new file with mode: 0644]
lemmy_apub/src/fetcher/user.rs [new file with mode: 0644]
lemmy_apub/src/http/user.rs
lemmy_apub/src/inbox/receive_for_community.rs
lemmy_apub/src/inbox/user_inbox.rs
lemmy_apub/src/lib.rs
lemmy_apub/src/objects/comment.rs
lemmy_apub/src/objects/community.rs
lemmy_apub/src/objects/mod.rs
lemmy_apub/src/objects/post.rs
lemmy_apub/src/objects/private_message.rs
lemmy_utils/src/request.rs

diff --git a/.drone.yml b/.drone.yml
index 11abb8dae4f85c60d7944eb0552ec9d62f9b4b8c..6e0c4214137e516d6f40fc9b6351148ffbfdcbe1 100644 (file)
@@ -6,8 +6,8 @@ steps:
     image: node:15-alpine3.12
     commands:
       - apk add git
-      - git submodule init
-      - git submodule update --recursive --remote
+      - git submodule update --init --recursive --remote
+      - find docs/
 
   - name: chown repo
     image: ekidd/rust-musl-builder:1.47.0
@@ -15,6 +15,12 @@ steps:
     commands:
       - chown 1000:1000 . -R
 
+  - name: check documentation build
+    image: ekidd/rust-musl-builder:1.47.0
+    commands:
+      - cargo install mdbook --git https://github.com/Ruin0x11/mdBook.git --branch localization --rev d06249b
+      - mdbook build docs/
+
   - name: check formatting
     image: rustdocker/rust:nightly
     commands:
@@ -23,13 +29,7 @@ steps:
   - name: cargo clippy
     image: ekidd/rust-musl-builder:1.47.0
     commands:
-      - cargo clippy --workspace --tests --all-targets --all-features -- -D warnings
-
-  - name: check documentation build
-    image: ekidd/rust-musl-builder:1.47.0
-    commands:
-      - cargo install mdbook --git https://github.com/Ruin0x11/mdBook.git --branch localization --rev d06249b
-      - mdbook build docs/
+      - cargo clippy --workspace --tests --all-targets --all-features -- -D warnings -D deprecated -D clippy::perf -D clippy::complexity -D clippy::dbg_macro
 
   - name: cargo test
     image: ekidd/rust-musl-builder:1.47.0
diff --git a/.gitmodules b/.gitmodules
index 335de3282cbfae2f141fd0205f97363d4b7002b9..90f9e09f83eb315765681328d3815b73bc75b867 100644 (file)
@@ -1,4 +1,4 @@
 [submodule "docs"]
        path = docs
-       url = http://github.com/LemmyNet/lemmy-docs
-       branch = master
+       url = https://github.com/LemmyNet/lemmy-docs
+       branch = main
diff --git a/docker/federation/start-local-instances.bash b/docker/federation/start-local-instances.bash
index ec2712f14be2f8c38f4bd4c791da87e69f198b13..e963792adf60cb33f54990f7f1dfdb1c3d722730 100755 (executable)
@@ -8,14 +8,4 @@ for Item in alpha beta gamma delta epsilon ; do
   sudo chown -R 991:991 volumes/pictrs_$Item
 done
 
-sudo docker-compose up -d
-
-echo "Waiting for Lemmy to start..."
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8541/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8551/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8561/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8571/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8581/api/v1/site')" != "200" ]]; do sleep 1; done
-echo "All instances started."
-
-sudo docker-compose logs -f
+sudo docker-compose up
diff --git a/lemmy_api/src/site.rs b/lemmy_api/src/site.rs
index 2f5f4a3a9fef92ec7fd94e86d1fba468fe967c07..ff032562fce84469e4373b90c3f1983cbf8bb9dd 100644 (file)
@@ -8,7 +8,7 @@ use crate::{
 };
 use actix_web::web::Data;
 use anyhow::Context;
-use lemmy_apub::fetcher::search_by_apub_id;
+use lemmy_apub::fetcher::search::search_by_apub_id;
 use lemmy_db_queries::{
   diesel_option_overwrite,
   source::{category::Category_, site::Site_},
diff --git a/lemmy_apub/src/activities/receive/mod.rs b/lemmy_apub/src/activities/receive/mod.rs
index f52bbea1a310704b8a196746f99fdd10947407ab..a03e1ef529fcbe26d31bf8d6cf5ac11ad203c0a1 100644 (file)
@@ -1,4 +1,4 @@
-use crate::fetcher::get_or_fetch_and_upsert_user;
+use crate::fetcher::user::get_or_fetch_and_upsert_user;
 use activitystreams::{
   activity::{ActorAndObjectRef, ActorAndObjectRefExt},
   base::{AsBase, BaseExt},
diff --git a/lemmy_apub/src/activities/receive/private_message.rs b/lemmy_apub/src/activities/receive/private_message.rs
index bd21f4c7fbf4d3842da63cfe50b2f49cda7297d1..160b20ece50b6deb9007b3272a74763e98fe3525 100644 (file)
@@ -1,7 +1,7 @@
 use crate::{
   activities::receive::verify_activity_domains_valid,
   check_is_apub_id_valid,
-  fetcher::get_or_fetch_and_upsert_user,
+  fetcher::user::get_or_fetch_and_upsert_user,
   inbox::get_activity_to_and_cc,
   objects::FromApub,
   NoteExt,
diff --git a/lemmy_apub/src/activities/send/comment.rs b/lemmy_apub/src/activities/send/comment.rs
index 3f7a59de3731001ea16480cea2ec2442955b906d..36917ecb2731bc3c1700af53e9c184172aaa5b50 100644 (file)
@@ -2,7 +2,7 @@ use crate::{
   activities::send::generate_activity_id,
   activity_queue::{send_comment_mentions, send_to_community},
   extensions::context::lemmy_context,
-  fetcher::get_or_fetch_and_upsert_user,
+  fetcher::user::get_or_fetch_and_upsert_user,
   objects::ToApub,
   ActorType,
   ApubLikeableType,
diff --git a/lemmy_apub/src/activities/send/community.rs b/lemmy_apub/src/activities/send/community.rs
index e148b4e94d47e3d7dc56bf7e878db09e5622422b..c2f22fa2dc35193c668b0c03c7b853774352776c 100644 (file)
@@ -3,7 +3,7 @@ use crate::{
   activity_queue::{send_activity_single_dest, send_to_community_followers},
   check_is_apub_id_valid,
   extensions::context::lemmy_context,
-  fetcher::get_or_fetch_and_upsert_user,
+  fetcher::user::get_or_fetch_and_upsert_user,
   ActorType,
 };
 use activitystreams::{
diff --git a/lemmy_apub/src/fetcher.rs b/lemmy_apub/src/fetcher.rs
deleted file mode 100644 (file)
index 4e1fa98..0000000
+++ /dev/null
@@ -1,475 +0,0 @@
-use crate::{
-  check_is_apub_id_valid,
-  objects::FromApub,
-  ActorType,
-  GroupExt,
-  NoteExt,
-  PageExt,
-  PersonExt,
-  APUB_JSON_CONTENT_TYPE,
-};
-use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
-use anyhow::{anyhow, Context};
-use chrono::NaiveDateTime;
-use diesel::result::Error::NotFound;
-use lemmy_db_queries::{source::user::User, ApubObject, Crud, Joinable, SearchType};
-use lemmy_db_schema::{
-  naive_now,
-  source::{
-    comment::Comment,
-    community::{Community, CommunityModerator, CommunityModeratorForm},
-    post::Post,
-    user::User_,
-  },
-};
-use lemmy_db_views::{comment_view::CommentView, post_view::PostView};
-use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe};
-use lemmy_structs::{blocking, site::SearchResponse};
-use lemmy_utils::{
-  location_info,
-  request::{retry, RecvError},
-  settings::Settings,
-  LemmyError,
-};
-use lemmy_websocket::LemmyContext;
-use log::debug;
-use reqwest::Client;
-use serde::Deserialize;
-use std::{fmt::Debug, time::Duration};
-use url::Url;
-
-static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
-static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
-
-/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
-/// fetch through the search).
-///
-/// Tests are passing with a value of 5, so 10 should be safe for production.
-static MAX_REQUEST_NUMBER: i32 = 10;
-
-/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
-/// timeouts etc.
-async fn fetch_remote_object<Response>(
-  client: &Client,
-  url: &Url,
-  recursion_counter: &mut i32,
-) -> Result<Response, LemmyError>
-where
-  Response: for<'de> Deserialize<'de>,
-{
-  *recursion_counter += 1;
-  if *recursion_counter > MAX_REQUEST_NUMBER {
-    return Err(anyhow!("Maximum recursion depth reached").into());
-  }
-  check_is_apub_id_valid(&url)?;
-
-  let timeout = Duration::from_secs(60);
-
-  let json = retry(|| {
-    client
-      .get(url.as_str())
-      .header("Accept", APUB_JSON_CONTENT_TYPE)
-      .timeout(timeout)
-      .send()
-  })
-  .await?
-  .json()
-  .await
-  .map_err(|e| {
-    debug!("Receive error, {}", e);
-    RecvError(e.to_string())
-  })?;
-
-  Ok(json)
-}
-
-/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
-#[serde(untagged)]
-#[derive(serde::Deserialize, Debug)]
-enum SearchAcceptedObjects {
-  Person(Box<PersonExt>),
-  Group(Box<GroupExt>),
-  Page(Box<PageExt>),
-  Comment(Box<NoteExt>),
-}
-
-/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
-///
-/// Some working examples for use with the `docker/federation/` setup:
-/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
-/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
-/// http://lemmy_gamma:8561/post/3
-/// http://lemmy_delta:8571/comment/2
-pub async fn search_by_apub_id(
-  query: &str,
-  context: &LemmyContext,
-) -> Result<SearchResponse, LemmyError> {
-  // Parse the shorthand query url
-  let query_url = if query.contains('@') {
-    debug!("Search for {}", query);
-    let split = query.split('@').collect::<Vec<&str>>();
-
-    // User type will look like ['', username, instance]
-    // Community will look like [!community, instance]
-    let (name, instance) = if split.len() == 3 {
-      (format!("/u/{}", split[1]), split[2])
-    } else if split.len() == 2 {
-      if split[0].contains('!') {
-        let split2 = split[0].split('!').collect::<Vec<&str>>();
-        (format!("/c/{}", split2[1]), split[1])
-      } else {
-        return Err(anyhow!("Invalid search query: {}", query).into());
-      }
-    } else {
-      return Err(anyhow!("Invalid search query: {}", query).into());
-    };
-
-    let url = format!(
-      "{}://{}{}",
-      Settings::get().get_protocol_string(),
-      instance,
-      name
-    );
-    Url::parse(&url)?
-  } else {
-    Url::parse(&query)?
-  };
-
-  let mut response = SearchResponse {
-    type_: SearchType::All.to_string(),
-    comments: vec![],
-    posts: vec![],
-    communities: vec![],
-    users: vec![],
-  };
-
-  let domain = query_url.domain().context("url has no domain")?;
-  let recursion_counter = &mut 0;
-  let response = match fetch_remote_object::<SearchAcceptedObjects>(
-    context.client(),
-    &query_url,
-    recursion_counter,
-  )
-  .await?
-  {
-    SearchAcceptedObjects::Person(p) => {
-      let user_uri = p.inner.id(domain)?.context("person has no id")?;
-
-      let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
-
-      response.users = vec![
-        blocking(context.pool(), move |conn| {
-          UserViewSafe::read(conn, user.id)
-        })
-        .await??,
-      ];
-
-      response
-    }
-    SearchAcceptedObjects::Group(g) => {
-      let community_uri = g.inner.id(domain)?.context("group has no id")?;
-
-      let community =
-        get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
-
-      response.communities = vec![
-        blocking(context.pool(), move |conn| {
-          CommunityView::read(conn, community.id, None)
-        })
-        .await??,
-      ];
-
-      response
-    }
-    SearchAcceptedObjects::Page(p) => {
-      let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
-
-      response.posts =
-        vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
-
-      response
-    }
-    SearchAcceptedObjects::Comment(c) => {
-      let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
-
-      response.comments = vec![
-        blocking(context.pool(), move |conn| {
-          CommentView::read(conn, c.id, None)
-        })
-        .await??,
-      ];
-
-      response
-    }
-  };
-
-  Ok(response)
-}
-
-/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
-/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_actor(
-  apub_id: &Url,
-  context: &LemmyContext,
-  recursion_counter: &mut i32,
-) -> Result<Box<dyn ActorType>, LemmyError> {
-  let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
-  let actor: Box<dyn ActorType> = match community {
-    Ok(c) => Box::new(c),
-    Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
-  };
-  Ok(actor)
-}
-
-/// Get a user from its apub ID.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_user(
-  apub_id: &Url,
-  context: &LemmyContext,
-  recursion_counter: &mut i32,
-) -> Result<User_, LemmyError> {
-  let apub_id_owned = apub_id.to_owned();
-  let user = blocking(context.pool(), move |conn| {
-    User_::read_from_apub_id(conn, apub_id_owned.as_ref())
-  })
-  .await?;
-
-  match user {
-    // If its older than a day, re-fetch it
-    Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
-      debug!("Fetching and updating from remote user: {}", apub_id);
-      let person =
-        fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
-      // If fetching failed, return the existing data.
-      if person.is_err() {
-        return Ok(u);
-      }
-
-      let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
-
-      let user_id = user.id;
-      blocking(context.pool(), move |conn| {
-        User_::mark_as_updated(conn, user_id)
-      })
-      .await??;
-
-      Ok(user)
-    }
-    Ok(u) => Ok(u),
-    Err(NotFound {}) => {
-      debug!("Fetching and creating remote user: {}", apub_id);
-      let person =
-        fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
-
-      let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
-
-      Ok(user)
-    }
-    Err(e) => Err(e.into()),
-  }
-}
-
-/// Determines when a remote actor should be refetched from its instance. In release builds, this is
-/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
-/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
-///
-/// TODO it won't pick up new avatars, summaries etc until a day after.
-/// Actors need an "update" activity pushed to other servers to fix this.
-fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
-  let update_interval = if cfg!(debug_assertions) {
-    // avoid infinite loop when fetching community outbox
-    chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
-  } else {
-    chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
-  };
-  last_refreshed.lt(&(naive_now() - update_interval))
-}
-
-/// Get a community from its apub ID.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_community(
-  apub_id: &Url,
-  context: &LemmyContext,
-  recursion_counter: &mut i32,
-) -> Result<Community, LemmyError> {
-  let apub_id_owned = apub_id.to_owned();
-  let community = blocking(context.pool(), move |conn| {
-    Community::read_from_apub_id(conn, apub_id_owned.as_str())
-  })
-  .await?;
-
-  match community {
-    Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
-      debug!("Fetching and updating from remote community: {}", apub_id);
-      fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
-    }
-    Ok(c) => Ok(c),
-    Err(NotFound {}) => {
-      debug!("Fetching and creating remote community: {}", apub_id);
-      fetch_remote_community(apub_id, context, None, recursion_counter).await
-    }
-    Err(e) => Err(e.into()),
-  }
-}
-
-/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
-/// is set, this is an update for a community which is already known locally. If not, we don't know
-/// the community yet and also pull the outbox, to get some initial posts.
-async fn fetch_remote_community(
-  apub_id: &Url,
-  context: &LemmyContext,
-  old_community: Option<Community>,
-  recursion_counter: &mut i32,
-) -> Result<Community, LemmyError> {
-  let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
-  // If fetching failed, return the existing data.
-  if let Some(ref c) = old_community {
-    if group.is_err() {
-      return Ok(c.to_owned());
-    }
-  }
-
-  let group = group?;
-  let community =
-    Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
-
-  // Also add the community moderators too
-  let attributed_to = group.inner.attributed_to().context(location_info!())?;
-  let creator_and_moderator_uris: Vec<&Url> = attributed_to
-    .as_many()
-    .context(location_info!())?
-    .iter()
-    .map(|a| a.as_xsd_any_uri().context(""))
-    .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
-
-  let mut creator_and_moderators = Vec::new();
-
-  for uri in creator_and_moderator_uris {
-    let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
-
-    creator_and_moderators.push(c_or_m);
-  }
-
-  // TODO: need to make this work to update mods of existing communities
-  if old_community.is_none() {
-    let community_id = community.id;
-    blocking(context.pool(), move |conn| {
-      for mod_ in creator_and_moderators {
-        let community_moderator_form = CommunityModeratorForm {
-          community_id,
-          user_id: mod_.id,
-        };
-
-        CommunityModerator::join(conn, &community_moderator_form)?;
-      }
-      Ok(()) as Result<(), LemmyError>
-    })
-    .await??;
-  }
-
-  // fetch outbox (maybe make this conditional)
-  let outbox = fetch_remote_object::<OrderedCollection>(
-    context.client(),
-    &community.get_outbox_url()?,
-    recursion_counter,
-  )
-  .await?;
-  let outbox_items = outbox.items().context(location_info!())?.clone();
-  let mut outbox_items = outbox_items.many().context(location_info!())?;
-  if outbox_items.len() > 20 {
-    outbox_items = outbox_items[0..20].to_vec();
-  }
-  for o in outbox_items {
-    let page = PageExt::from_any_base(o)?.context(location_info!())?;
-    let page_id = page.id_unchecked().context(location_info!())?;
-
-    // The post creator may be from a blocked instance, if it errors, then skip it
-    if check_is_apub_id_valid(page_id).is_err() {
-      continue;
-    }
-    Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
-    // TODO: we need to send a websocket update here
-  }
-
-  Ok(community)
-}
-
-/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
-/// pulled from its apub ID, inserted and returned.
-///
-/// The parent community is also pulled if necessary. Comments are not pulled.
-pub(crate) async fn get_or_fetch_and_insert_post(
-  post_ap_id: &Url,
-  context: &LemmyContext,
-  recursion_counter: &mut i32,
-) -> Result<Post, LemmyError> {
-  let post_ap_id_owned = post_ap_id.to_owned();
-  let post = blocking(context.pool(), move |conn| {
-    Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
-  })
-  .await?;
-
-  match post {
-    Ok(p) => Ok(p),
-    Err(NotFound {}) => {
-      debug!("Fetching and creating remote post: {}", post_ap_id);
-      let page =
-        fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
-      let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
-
-      Ok(post)
-    }
-    Err(e) => Err(e.into()),
-  }
-}
-
-/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
-/// pulled from its apub ID, inserted and returned.
-///
-/// The parent community, post and comment are also pulled if necessary.
-pub(crate) async fn get_or_fetch_and_insert_comment(
-  comment_ap_id: &Url,
-  context: &LemmyContext,
-  recursion_counter: &mut i32,
-) -> Result<Comment, LemmyError> {
-  let comment_ap_id_owned = comment_ap_id.to_owned();
-  let comment = blocking(context.pool(), move |conn| {
-    Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
-  })
-  .await?;
-
-  match comment {
-    Ok(p) => Ok(p),
-    Err(NotFound {}) => {
-      debug!(
-        "Fetching and creating remote comment and its parents: {}",
-        comment_ap_id
-      );
-      let comment =
-        fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
-      let comment = Comment::from_apub(
-        &comment,
-        context,
-        comment_ap_id.to_owned(),
-        recursion_counter,
-      )
-      .await?;
-
-      let post_id = comment.post_id;
-      let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
-      if post.locked {
-        return Err(anyhow!("Post is locked").into());
-      }
-
-      Ok(comment)
-    }
-    Err(e) => Err(e.into()),
-  }
-}
diff --git a/lemmy_apub/src/fetcher/community.rs b/lemmy_apub/src/fetcher/community.rs
new file mode 100644 (file)
index 0000000..2a25641
--- /dev/null
@@ -0,0 +1,147 @@
+use crate::{
+  check_is_apub_id_valid,
+  fetcher::{
+    fetch::fetch_remote_object,
+    get_or_fetch_and_upsert_user,
+    is_deleted,
+    should_refetch_actor,
+  },
+  objects::FromApub,
+  ActorType,
+  GroupExt,
+  PageExt,
+};
+use activitystreams::{
+  base::{BaseExt, ExtendsExt},
+  collection::{CollectionExt, OrderedCollection},
+  object::ObjectExt,
+};
+use anyhow::Context;
+use diesel::result::Error::NotFound;
+use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
+use lemmy_db_schema::source::{
+  community::{Community, CommunityModerator, CommunityModeratorForm},
+  post::Post,
+};
+use lemmy_structs::blocking;
+use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// Get a community from its apub ID.
+///
+/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
+/// Otherwise it is fetched from the remote instance, stored and returned.
+pub(crate) async fn get_or_fetch_and_upsert_community(
+  apub_id: &Url,
+  context: &LemmyContext,
+  recursion_counter: &mut i32,
+) -> Result<Community, LemmyError> {
+  let apub_id_owned = apub_id.to_owned();
+  let community = blocking(context.pool(), move |conn| {
+    Community::read_from_apub_id(conn, apub_id_owned.as_str())
+  })
+  .await?;
+
+  match community {
+    Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
+      debug!("Fetching and updating from remote community: {}", apub_id);
+      fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
+    }
+    Ok(c) => Ok(c),
+    Err(NotFound {}) => {
+      debug!("Fetching and creating remote community: {}", apub_id);
+      fetch_remote_community(apub_id, context, None, recursion_counter).await
+    }
+    Err(e) => Err(e.into()),
+  }
+}
+
+/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
+/// is set, this is an update for a community which is already known locally. If not, we don't know
+/// the community yet and also pull the outbox, to get some initial posts.
+async fn fetch_remote_community(
+  apub_id: &Url,
+  context: &LemmyContext,
+  old_community: Option<Community>,
+  recursion_counter: &mut i32,
+) -> Result<Community, LemmyError> {
+  let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
+
+  if let Some(c) = old_community.to_owned() {
+    if is_deleted(&group) {
+      blocking(context.pool(), move |conn| {
+        Community::update_deleted(conn, c.id, true)
+      })
+      .await??;
+    } else if group.is_err() {
+      // If fetching failed, return the existing data.
+      return Ok(c);
+    }
+  }
+
+  let group = group?;
+  let community =
+    Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
+
+  // Also add the community moderators too
+  let attributed_to = group.inner.attributed_to().context(location_info!())?;
+  let creator_and_moderator_uris: Vec<&Url> = attributed_to
+    .as_many()
+    .context(location_info!())?
+    .iter()
+    .map(|a| a.as_xsd_any_uri().context(""))
+    .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
+
+  let mut creator_and_moderators = Vec::new();
+
+  for uri in creator_and_moderator_uris {
+    let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
+
+    creator_and_moderators.push(c_or_m);
+  }
+
+  // TODO: need to make this work to update mods of existing communities
+  if old_community.is_none() {
+    let community_id = community.id;
+    blocking(context.pool(), move |conn| {
+      for mod_ in creator_and_moderators {
+        let community_moderator_form = CommunityModeratorForm {
+          community_id,
+          user_id: mod_.id,
+        };
+
+        CommunityModerator::join(conn, &community_moderator_form)?;
+      }
+      Ok(()) as Result<(), LemmyError>
+    })
+    .await??;
+  }
+
+  // fetch outbox (maybe make this conditional)
+  let outbox = fetch_remote_object::<OrderedCollection>(
+    context.client(),
+    &community.get_outbox_url()?,
+    recursion_counter,
+  )
+  .await?;
+  let outbox_items = outbox.items().context(location_info!())?.clone();
+  let mut outbox_items = outbox_items.many().context(location_info!())?;
+  if outbox_items.len() > 20 {
+    outbox_items = outbox_items[0..20].to_vec();
+  }
+  for o in outbox_items {
+    let page = PageExt::from_any_base(o)?.context(location_info!())?;
+    let page_id = page.id_unchecked().context(location_info!())?;
+
+    // The post creator may be from a blocked instance, if it errors, then skip it
+    if check_is_apub_id_valid(page_id).is_err() {
+      continue;
+    }
+    Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
+    // TODO: we need to send a websocket update here
+  }
+
+  Ok(community)
+}
diff --git a/lemmy_apub/src/fetcher/fetch.rs b/lemmy_apub/src/fetcher/fetch.rs
new file mode 100644 (file)
index 0000000..e7e2907
--- /dev/null
@@ -0,0 +1,82 @@
+use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE};
+use anyhow::anyhow;
+use lemmy_utils::{request::retry, LemmyError};
+use reqwest::{Client, StatusCode};
+use serde::Deserialize;
+use std::time::Duration;
+use thiserror::Error;
+use url::Url;
+
+/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
+/// fetch through the search).
+///
+/// Tests are passing with a value of 5, so 10 should be safe for production.
+static MAX_REQUEST_NUMBER: i32 = 10;
+
+#[derive(Debug, Error)]
+pub(in crate::fetcher) struct FetchError {
+  pub inner: anyhow::Error,
+  pub status_code: Option<StatusCode>,
+}
+
+impl From<LemmyError> for FetchError {
+  fn from(t: LemmyError) -> Self {
+    FetchError {
+      inner: t.inner,
+      status_code: None,
+    }
+  }
+}
+
+impl From<reqwest::Error> for FetchError {
+  fn from(t: reqwest::Error) -> Self {
+    let status = t.status();
+    FetchError {
+      inner: t.into(),
+      status_code: status,
+    }
+  }
+}
+
+impl std::fmt::Display for FetchError {
+  fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+    std::fmt::Display::fmt(&self, f)
+  }
+}
+
+/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
+/// timeouts etc.
+pub(in crate::fetcher) async fn fetch_remote_object<Response>(
+  client: &Client,
+  url: &Url,
+  recursion_counter: &mut i32,
+) -> Result<Response, FetchError>
+where
+  Response: for<'de> Deserialize<'de> + std::fmt::Debug,
+{
+  *recursion_counter += 1;
+  if *recursion_counter > MAX_REQUEST_NUMBER {
+    return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into());
+  }
+  check_is_apub_id_valid(&url)?;
+
+  let timeout = Duration::from_secs(60);
+
+  let res = retry(|| {
+    client
+      .get(url.as_str())
+      .header("Accept", APUB_JSON_CONTENT_TYPE)
+      .timeout(timeout)
+      .send()
+  })
+  .await?;
+
+  if res.status() == StatusCode::GONE {
+    return Err(FetchError {
+      inner: anyhow!("Remote object {} was deleted", url),
+      status_code: Some(res.status()),
+    });
+  }
+
+  Ok(res.json().await?)
+}
diff --git a/lemmy_apub/src/fetcher/mod.rs b/lemmy_apub/src/fetcher/mod.rs
new file mode 100644 (file)
index 0000000..593b163
--- /dev/null
@@ -0,0 +1,72 @@
+pub(crate) mod community;
+mod fetch;
+pub(crate) mod objects;
+pub mod search;
+pub(crate) mod user;
+
+use crate::{
+  fetcher::{
+    community::get_or_fetch_and_upsert_community,
+    fetch::FetchError,
+    user::get_or_fetch_and_upsert_user,
+  },
+  ActorType,
+};
+use chrono::NaiveDateTime;
+use http::StatusCode;
+use lemmy_db_schema::naive_now;
+use lemmy_utils::LemmyError;
+use lemmy_websocket::LemmyContext;
+use serde::Deserialize;
+use url::Url;
+
+static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
+static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
+
+fn is_deleted<Response>(fetch_response: &Result<Response, FetchError>) -> bool
+where
+  Response: for<'de> Deserialize<'de>,
+{
+  if let Err(e) = fetch_response {
+    if let Some(status) = e.status_code {
+      if status == StatusCode::GONE {
+        return true;
+      }
+    }
+  }
+  false
+}
+
+/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
+/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
+///
+/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
+/// Otherwise it is fetched from the remote instance, stored and returned.
+pub(crate) async fn get_or_fetch_and_upsert_actor(
+  apub_id: &Url,
+  context: &LemmyContext,
+  recursion_counter: &mut i32,
+) -> Result<Box<dyn ActorType>, LemmyError> {
+  let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
+  let actor: Box<dyn ActorType> = match community {
+    Ok(c) => Box::new(c),
+    Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
+  };
+  Ok(actor)
+}
+
+/// Determines when a remote actor should be refetched from its instance. In release builds, this is
+/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
+/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
+///
+/// TODO it won't pick up new avatars, summaries etc until a day after.
+/// Actors need an "update" activity pushed to other servers to fix this.
+fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
+  let update_interval = if cfg!(debug_assertions) {
+    // avoid infinite loop when fetching community outbox
+    chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
+  } else {
+    chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
+  };
+  last_refreshed.lt(&(naive_now() - update_interval))
+}
diff --git a/lemmy_apub/src/fetcher/objects.rs b/lemmy_apub/src/fetcher/objects.rs
new file mode 100644 (file)
index 0000000..269b27e
--- /dev/null
@@ -0,0 +1,83 @@
+use crate::{fetcher::fetch::fetch_remote_object, objects::FromApub, NoteExt, PageExt};
+use anyhow::anyhow;
+use diesel::result::Error::NotFound;
+use lemmy_db_queries::{ApubObject, Crud};
+use lemmy_db_schema::source::{comment::Comment, post::Post};
+use lemmy_structs::blocking;
+use lemmy_utils::LemmyError;
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
+/// pulled from its apub ID, inserted and returned.
+///
+/// The parent community is also pulled if necessary. Comments are not pulled.
+pub(crate) async fn get_or_fetch_and_insert_post(
+  post_ap_id: &Url,
+  context: &LemmyContext,
+  recursion_counter: &mut i32,
+) -> Result<Post, LemmyError> {
+  let post_ap_id_owned = post_ap_id.to_owned();
+  let post = blocking(context.pool(), move |conn| {
+    Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
+  })
+  .await?;
+
+  match post {
+    Ok(p) => Ok(p),
+    Err(NotFound {}) => {
+      debug!("Fetching and creating remote post: {}", post_ap_id);
+      let page =
+        fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
+      let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
+
+      Ok(post)
+    }
+    Err(e) => Err(e.into()),
+  }
+}
+
+/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
+/// pulled from its apub ID, inserted and returned.
+///
+/// The parent community, post and comment are also pulled if necessary.
+pub(crate) async fn get_or_fetch_and_insert_comment(
+  comment_ap_id: &Url,
+  context: &LemmyContext,
+  recursion_counter: &mut i32,
+) -> Result<Comment, LemmyError> {
+  let comment_ap_id_owned = comment_ap_id.to_owned();
+  let comment = blocking(context.pool(), move |conn| {
+    Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
+  })
+  .await?;
+
+  match comment {
+    Ok(p) => Ok(p),
+    Err(NotFound {}) => {
+      debug!(
+        "Fetching and creating remote comment and its parents: {}",
+        comment_ap_id
+      );
+      let comment =
+        fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
+      let comment = Comment::from_apub(
+        &comment,
+        context,
+        comment_ap_id.to_owned(),
+        recursion_counter,
+      )
+      .await?;
+
+      let post_id = comment.post_id;
+      let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
+      if post.locked {
+        return Err(anyhow!("Post is locked").into());
+      }
+
+      Ok(comment)
+    }
+    Err(e) => Err(e.into()),
+  }
+}
diff --git a/lemmy_apub/src/fetcher/search.rs b/lemmy_apub/src/fetcher/search.rs
new file mode 100644 (file)
index 0000000..13187b0
--- /dev/null
@@ -0,0 +1,204 @@
+use crate::{
+  fetcher::{
+    fetch::fetch_remote_object,
+    get_or_fetch_and_upsert_community,
+    get_or_fetch_and_upsert_user,
+    is_deleted,
+  },
+  find_object_by_id,
+  objects::FromApub,
+  GroupExt,
+  NoteExt,
+  Object,
+  PageExt,
+  PersonExt,
+};
+use activitystreams::base::BaseExt;
+use anyhow::{anyhow, Context};
+use lemmy_db_queries::{
+  source::{
+    comment::Comment_,
+    community::Community_,
+    post::Post_,
+    private_message::PrivateMessage_,
+    user::User,
+  },
+  SearchType,
+};
+use lemmy_db_schema::source::{
+  comment::Comment,
+  community::Community,
+  post::Post,
+  private_message::PrivateMessage,
+  user::User_,
+};
+use lemmy_db_views::{comment_view::CommentView, post_view::PostView};
+use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe};
+use lemmy_structs::{blocking, site::SearchResponse};
+use lemmy_utils::{settings::Settings, LemmyError};
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
+#[serde(untagged)]
+#[derive(serde::Deserialize, Debug)]
+enum SearchAcceptedObjects {
+  Person(Box<PersonExt>),
+  Group(Box<GroupExt>),
+  Page(Box<PageExt>),
+  Comment(Box<NoteExt>),
+}
+
+/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
+///
+/// Some working examples for use with the `docker/federation/` setup:
+/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
+/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
+/// http://lemmy_gamma:8561/post/3
+/// http://lemmy_delta:8571/comment/2
+pub async fn search_by_apub_id(
+  query: &str,
+  context: &LemmyContext,
+) -> Result<SearchResponse, LemmyError> {
+  // Parse the shorthand query url
+  let query_url = if query.contains('@') {
+    debug!("Search for {}", query);
+    let split = query.split('@').collect::<Vec<&str>>();
+
+    // User type will look like ['', username, instance]
+    // Community will look like [!community, instance]
+    let (name, instance) = if split.len() == 3 {
+      (format!("/u/{}", split[1]), split[2])
+    } else if split.len() == 2 {
+      if split[0].contains('!') {
+        let split2 = split[0].split('!').collect::<Vec<&str>>();
+        (format!("/c/{}", split2[1]), split[1])
+      } else {
+        return Err(anyhow!("Invalid search query: {}", query).into());
+      }
+    } else {
+      return Err(anyhow!("Invalid search query: {}", query).into());
+    };
+
+    let url = format!(
+      "{}://{}{}",
+      Settings::get().get_protocol_string(),
+      instance,
+      name
+    );
+    Url::parse(&url)?
+  } else {
+    Url::parse(&query)?
+  };
+
+  let recursion_counter = &mut 0;
+  let fetch_response =
+    fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url, recursion_counter)
+      .await;
+  if is_deleted(&fetch_response) {
+    delete_object_locally(&query_url, context).await?;
+  }
+
+  build_response(fetch_response?, query_url, recursion_counter, context).await
+}
+
+async fn build_response(
+  fetch_response: SearchAcceptedObjects,
+  query_url: Url,
+  recursion_counter: &mut i32,
+  context: &LemmyContext,
+) -> Result<SearchResponse, LemmyError> {
+  let domain = query_url.domain().context("url has no domain")?;
+  let mut response = SearchResponse {
+    type_: SearchType::All.to_string(),
+    comments: vec![],
+    posts: vec![],
+    communities: vec![],
+    users: vec![],
+  };
+
+  match fetch_response {
+    SearchAcceptedObjects::Person(p) => {
+      let user_uri = p.inner.id(domain)?.context("person has no id")?;
+
+      let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
+
+      response.users = vec![
+        blocking(context.pool(), move |conn| {
+          UserViewSafe::read(conn, user.id)
+        })
+        .await??,
+      ];
+    }
+    SearchAcceptedObjects::Group(g) => {
+      let community_uri = g.inner.id(domain)?.context("group has no id")?;
+
+      let community =
+        get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
+
+      response.communities = vec![
+        blocking(context.pool(), move |conn| {
+          CommunityView::read(conn, community.id, None)
+        })
+        .await??,
+      ];
+    }
+    SearchAcceptedObjects::Page(p) => {
+      let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
+
+      response.posts =
+        vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
+    }
+    SearchAcceptedObjects::Comment(c) => {
+      let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
+
+      response.comments = vec![
+        blocking(context.pool(), move |conn| {
+          CommentView::read(conn, c.id, None)
+        })
+        .await??,
+      ];
+    }
+  };
+
+  Ok(response)
+}
+
+async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> {
+  let res = find_object_by_id(context, query_url.to_owned()).await?;
+  match res {
+    Object::Comment(c) => {
+      blocking(context.pool(), move |conn| {
+        Comment::update_deleted(conn, c.id, true)
+      })
+      .await??;
+    }
+    Object::Post(p) => {
+      blocking(context.pool(), move |conn| {
+        Post::update_deleted(conn, p.id, true)
+      })
+      .await??;
+    }
+    Object::User(u) => {
+      // TODO: implement update_deleted() for user, move it to ApubObject trait
+      blocking(context.pool(), move |conn| {
+        User_::delete_account(conn, u.id)
+      })
+      .await??;
+    }
+    Object::Community(c) => {
+      blocking(context.pool(), move |conn| {
+        Community::update_deleted(conn, c.id, true)
+      })
+      .await??;
+    }
+    Object::PrivateMessage(pm) => {
+      blocking(context.pool(), move |conn| {
+        PrivateMessage::update_deleted(conn, pm.id, true)
+      })
+      .await??;
+    }
+  }
+  Err(anyhow!("Object was deleted").into())
+}
diff --git a/lemmy_apub/src/fetcher/user.rs b/lemmy_apub/src/fetcher/user.rs
new file mode 100644 (file)
index 0000000..8442519
--- /dev/null
@@ -0,0 +1,71 @@
+use crate::{
+  fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor},
+  objects::FromApub,
+  PersonExt,
+};
+use anyhow::anyhow;
+use diesel::result::Error::NotFound;
+use lemmy_db_queries::{source::user::User, ApubObject};
+use lemmy_db_schema::source::user::User_;
+use lemmy_structs::blocking;
+use lemmy_utils::LemmyError;
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// Get a user from its apub ID.
+///
+/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
+/// Otherwise it is fetched from the remote instance, stored and returned.
+pub(crate) async fn get_or_fetch_and_upsert_user(
+  apub_id: &Url,
+  context: &LemmyContext,
+  recursion_counter: &mut i32,
+) -> Result<User_, LemmyError> {
+  let apub_id_owned = apub_id.to_owned();
+  let user = blocking(context.pool(), move |conn| {
+    User_::read_from_apub_id(conn, apub_id_owned.as_ref())
+  })
+  .await?;
+
+  match user {
+    // If its older than a day, re-fetch it
+    Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
+      debug!("Fetching and updating from remote user: {}", apub_id);
+      let person =
+        fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
+
+      if is_deleted(&person) {
+        // TODO: use User_::update_deleted() once implemented
+        blocking(context.pool(), move |conn| {
+          User_::delete_account(conn, u.id)
+        })
+        .await??;
+        return Err(anyhow!("User was deleted by remote instance").into());
+      } else if person.is_err() {
+        return Ok(u);
+      }
+
+      let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
+
+      let user_id = user.id;
+      blocking(context.pool(), move |conn| {
+        User_::mark_as_updated(conn, user_id)
+      })
+      .await??;
+
+      Ok(user)
+    }
+    Ok(u) => Ok(u),
+    Err(NotFound {}) => {
+      debug!("Fetching and creating remote user: {}", apub_id);
+      let person =
+        fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
+
+      let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
+
+      Ok(user)
+    }
+    Err(e) => Err(e.into()),
+  }
+}
diff --git a/lemmy_apub/src/http/user.rs b/lemmy_apub/src/http/user.rs
index d0ce9251ff2df0d3a7b9885688f3ac873f90af05..3005b8b59bccc3e6608b26372636281334e203a7 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   extensions::context::lemmy_context,
-  http::create_apub_response,
+  http::{create_apub_response, create_apub_tombstone_response},
   objects::ToApub,
   ActorType,
 };
@@ -28,12 +28,19 @@ pub async fn get_apub_user_http(
   context: web::Data<LemmyContext>,
 ) -> Result<HttpResponse<Body>, LemmyError> {
   let user_name = info.into_inner().user_name;
+  // TODO: this needs to be able to read deleted users, so that it can send tombstones
   let user = blocking(context.pool(), move |conn| {
     User_::find_by_email_or_username(conn, &user_name)
   })
   .await??;
-  let u = user.to_apub(context.pool()).await?;
-  Ok(create_apub_response(&u))
+
+  if !user.deleted {
+    let apub = user.to_apub(context.pool()).await?;
+
+    Ok(create_apub_response(&apub))
+  } else {
+    Ok(create_apub_tombstone_response(&user.to_tombstone()?))
+  }
 }
 
 pub async fn get_apub_user_outbox(
diff --git a/lemmy_apub/src/inbox/receive_for_community.rs b/lemmy_apub/src/inbox/receive_for_community.rs
index 934404242ed0a65ff74c72ec7c8859fab4a11c42..e0248cb6d6e2cd9970cb2fb4b74485e578fd9e8a 100644 (file)
@@ -31,8 +31,10 @@ use crate::{
     receive_unhandled_activity,
     verify_activity_domains_valid,
   },
-  fetcher::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
+  fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
+  find_post_or_comment_by_id,
   inbox::is_addressed_to_public,
+  PostOrComment,
 };
 use activitystreams::{
   activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
@@ -41,8 +43,8 @@ use activitystreams::{
 };
 use anyhow::Context;
 use diesel::result::Error::NotFound;
-use lemmy_db_queries::{ApubObject, Crud};
-use lemmy_db_schema::source::{comment::Comment, post::Post, site::Site};
+use lemmy_db_queries::Crud;
+use lemmy_db_schema::source::site::Site;
 use lemmy_structs::blocking;
 use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::LemmyContext;
@@ -317,39 +319,6 @@ pub(in crate::inbox) async fn receive_undo_dislike_for_community(
   }
 }
 
-enum PostOrComment {
-  Comment(Comment),
-  Post(Post),
-}
-
-/// Tries to find a post or comment in the local database, without any network requests.
-/// This is used to handle deletions and removals, because in case we dont have the object, we can
-/// simply ignore the activity.
-async fn find_post_or_comment_by_id(
-  context: &LemmyContext,
-  apub_id: Url,
-) -> Result<PostOrComment, LemmyError> {
-  let ap_id = apub_id.to_string();
-  let post = blocking(context.pool(), move |conn| {
-    Post::read_from_apub_id(conn, &ap_id)
-  })
-  .await?;
-  if let Ok(p) = post {
-    return Ok(PostOrComment::Post(p));
-  }
-
-  let ap_id = apub_id.to_string();
-  let comment = blocking(context.pool(), move |conn| {
-    Comment::read_from_apub_id(conn, &ap_id)
-  })
-  .await?;
-  if let Ok(c) = comment {
-    return Ok(PostOrComment::Comment(c));
-  }
-
-  Err(NotFound.into())
-}
-
 async fn fetch_post_or_comment_by_id(
   apub_id: &Url,
   context: &LemmyContext,
diff --git a/lemmy_apub/src/inbox/user_inbox.rs b/lemmy_apub/src/inbox/user_inbox.rs
index 49c66dc7aa7b81676787a5f677c41abca45bd852..353a296e31fcedb4f64ed146a1a265c8e48939be 100644 (file)
@@ -17,7 +17,7 @@ use crate::{
     verify_activity_domains_valid,
   },
   check_is_apub_id_valid,
-  fetcher::get_or_fetch_and_upsert_community,
+  fetcher::community::get_or_fetch_and_upsert_community,
   inbox::{
     assert_activity_not_local,
     get_activity_id,
diff --git a/lemmy_apub/src/lib.rs b/lemmy_apub/src/lib.rs
index 6f0d41c8aed6b15e5d6a17155a10205f5fa14080..bbf1bbbc7967295a89fb1c9382f7cc0cf1386487 100644 (file)
@@ -22,8 +22,16 @@ use activitystreams::{
 };
 use activitystreams_ext::{Ext1, Ext2};
 use anyhow::{anyhow, Context};
-use lemmy_db_queries::{source::activity::Activity_, DbPool};
-use lemmy_db_schema::source::{activity::Activity, user::User_};
+use diesel::NotFound;
+use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool};
+use lemmy_db_schema::source::{
+  activity::Activity,
+  comment::Comment,
+  community::Community,
+  post::Post,
+  private_message::PrivateMessage,
+  user::User_,
+};
 use lemmy_structs::blocking;
 use lemmy_utils::{location_info, settings::Settings, LemmyError};
 use lemmy_websocket::LemmyContext;
@@ -239,3 +247,85 @@ where
   .await??;
   Ok(())
 }
+
+pub(crate) enum PostOrComment {
+  Comment(Comment),
+  Post(Post),
+}
+
+/// Tries to find a post or comment in the local database, without any network requests.
+/// This is used to handle deletions and removals, because in case we dont have the object, we can
+/// simply ignore the activity.
+pub(crate) async fn find_post_or_comment_by_id(
+  context: &LemmyContext,
+  apub_id: Url,
+) -> Result<PostOrComment, LemmyError> {
+  let ap_id = apub_id.to_string();
+  let post = blocking(context.pool(), move |conn| {
+    Post::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(p) = post {
+    return Ok(PostOrComment::Post(p));
+  }
+
+  let ap_id = apub_id.to_string();
+  let comment = blocking(context.pool(), move |conn| {
+    Comment::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(c) = comment {
+    return Ok(PostOrComment::Comment(c));
+  }
+
+  Err(NotFound.into())
+}
+
+pub(crate) enum Object {
+  Comment(Comment),
+  Post(Post),
+  Community(Community),
+  User(User_),
+  PrivateMessage(PrivateMessage),
+}
+
+pub(crate) async fn find_object_by_id(
+  context: &LemmyContext,
+  apub_id: Url,
+) -> Result<Object, LemmyError> {
+  if let Ok(pc) = find_post_or_comment_by_id(context, apub_id.to_owned()).await {
+    return Ok(match pc {
+      PostOrComment::Post(p) => Object::Post(p),
+      PostOrComment::Comment(c) => Object::Comment(c),
+    });
+  }
+
+  let ap_id = apub_id.to_string();
+  let user = blocking(context.pool(), move |conn| {
+    User_::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(u) = user {
+    return Ok(Object::User(u));
+  }
+
+  let ap_id = apub_id.to_string();
+  let community = blocking(context.pool(), move |conn| {
+    Community::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(c) = community {
+    return Ok(Object::Community(c));
+  }
+
+  let ap_id = apub_id.to_string();
+  let private_message = blocking(context.pool(), move |conn| {
+    PrivateMessage::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(pm) = private_message {
+    return Ok(Object::PrivateMessage(pm));
+  }
+
+  Err(NotFound.into())
+}
diff --git a/lemmy_apub/src/objects/comment.rs b/lemmy_apub/src/objects/comment.rs
index c02055c4534071c090a257c1cf3a0958574bf5b8..5dba4149bda4614890b98f46ff34af84c45fc9ee 100644 (file)
@@ -1,15 +1,12 @@
 use crate::{
   extensions::context::lemmy_context,
-  fetcher::{
-    get_or_fetch_and_insert_comment,
-    get_or_fetch_and_insert_post,
-    get_or_fetch_and_upsert_user,
-  },
+  fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
   objects::{
     check_object_domain,
     check_object_for_community_or_site_ban,
     create_tombstone,
     get_object_from_apub,
+    get_or_fetch_and_upsert_user,
     get_source_markdown_value,
     set_content_and_source,
     FromApub,
diff --git a/lemmy_apub/src/objects/community.rs b/lemmy_apub/src/objects/community.rs
index 39abcd1f31354425f0691d44f31e7bcb0771ba6a..4d7a235cce7018d1807ec18a642332273add4909 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   extensions::{context::lemmy_context, group_extensions::GroupExtension},
-  fetcher::get_or_fetch_and_upsert_user,
+  fetcher::user::get_or_fetch_and_upsert_user,
   objects::{
     check_object_domain,
     create_tombstone,
diff --git a/lemmy_apub/src/objects/mod.rs b/lemmy_apub/src/objects/mod.rs
index 9e13782c95c0caf85b3598fb079b5d86ee55a839..d9eea762e0191076cebd7d3e80c0e10852a46c0c 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   check_is_apub_id_valid,
-  fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
+  fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user},
   inbox::community_inbox::check_community_or_site_ban,
 };
 use activitystreams::{
diff --git a/lemmy_apub/src/objects/post.rs b/lemmy_apub/src/objects/post.rs
index fa1adfc8437bd8984ba2cc5ce15f6886813ee099..6d5ed8e23eaf2dcf572d6f8dbaca439552180b7e 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   extensions::{context::lemmy_context, page_extension::PageExtension},
-  fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
+  fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user},
   objects::{
     check_object_domain,
     check_object_for_community_or_site_ban,
diff --git a/lemmy_apub/src/objects/private_message.rs b/lemmy_apub/src/objects/private_message.rs
index db5a06109a83264e526e4c316c3118b171c8422b..1a7b5e32752c6bd98def477a638add54d06e055a 100644 (file)
@@ -1,7 +1,7 @@
 use crate::{
   check_is_apub_id_valid,
   extensions::context::lemmy_context,
-  fetcher::get_or_fetch_and_upsert_user,
+  fetcher::user::get_or_fetch_and_upsert_user,
   objects::{
     check_object_domain,
     create_tombstone,
diff --git a/lemmy_utils/src/request.rs b/lemmy_utils/src/request.rs
index 36baa4d424afe4c7fec0cf34df1a5858abaff66c..411d4342763bde852e746b47fcfc115fbb9f5b54 100644 (file)
@@ -15,7 +15,7 @@ struct SendError(pub String);
 #[error("Error receiving response, {0}")]
 pub struct RecvError(pub String);
 
-pub async fn retry<F, Fut, T>(f: F) -> Result<T, LemmyError>
+pub async fn retry<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
 where
   F: Fn() -> Fut,
   Fut: Future<Output = Result<T, reqwest::Error>>,
@@ -23,27 +23,27 @@ where
   retry_custom(|| async { Ok((f)().await) }).await
 }
 
-async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
+async fn retry_custom<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
 where
   F: Fn() -> Fut,
-  Fut: Future<Output = Result<Result<T, reqwest::Error>, LemmyError>>,
+  Fut: Future<Output = Result<Result<T, reqwest::Error>, reqwest::Error>>,
 {
-  let mut response = Err(anyhow!("connect timeout").into());
+  let mut response: Option<Result<T, reqwest::Error>> = None;
 
   for _ in 0u8..3 {
     match (f)().await? {
       Ok(t) => return Ok(t),
       Err(e) => {
         if e.is_timeout() {
-          response = Err(SendError(e.to_string()).into());
+          response = Some(Err(e));
           continue;
         }
-        return Err(SendError(e.to_string()).into());
+        return Err(e);
       }
     }
   }
 
-  response
+  response.unwrap()
 }
 
 #[derive(Deserialize, Debug)]
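
For context on the retry() signature change above (LemmyError -> reqwest::Error): the helper now hands the raw transport error back to the caller, which converts it itself (fetch.rs does this via its From<reqwest::Error> impl for FetchError). Below is a generic sketch of that retry-on-timeout shape; the name retry_sketch and the caller-supplied is_timeout predicate are stand-ins for illustration, not the actual Lemmy helper.

```rust
use std::future::Future;

/// Sketch of the pattern in lemmy_utils::request::retry_custom: try up to three
/// times, but only keep retrying while the failure looks like a timeout;
/// any other error is returned to the caller immediately.
async fn retry_sketch<F, Fut, T, E>(f: F, is_timeout: impl Fn(&E) -> bool) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut last_timeout = None;
    for _ in 0u8..3 {
        match f().await {
            Ok(t) => return Ok(t),
            Err(e) if is_timeout(&e) => last_timeout = Some(e),
            Err(e) => return Err(e),
        }
    }
    // All three attempts timed out; surface the last timeout error.
    Err(last_timeout.expect("loop ran at least once"))
}
```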