* Delete local object on fetch when receiving HTTP 410, split fetcher (fixes #1256)
* Removing submodules
* Trying to re-init submodule
* Trying to re-init submodule 2
* Trying to re-init submodule 3
* Logging line.
* Removing submodules
* Adding again.
* Adding again 2.
* Adding again 3.
* Adding again 4.
* Adding again 5.
* Adding again 6.
* Adding again 7.
* Adding again 8.
* Adding again 9.
* Add more clippy lints, remove dbg!() statement
* Adding again 10.
* Adding again 11.
* Adding again 12.
Co-authored-by: Felix Ableitner <me@nutomic.com>
image: node:15-alpine3.12
commands:
- apk add git
- - git submodule init
- - git submodule update --recursive --remote
+ - git submodule update --init --recursive --remote
+ - find docs/
- name: chown repo
image: ekidd/rust-musl-builder:1.47.0
commands:
- chown 1000:1000 . -R
+ - name: check documentation build
+ image: ekidd/rust-musl-builder:1.47.0
+ commands:
+ - cargo install mdbook --git https://github.com/Ruin0x11/mdBook.git --branch localization --rev d06249b
+ - mdbook build docs/
+
- name: check formatting
image: rustdocker/rust:nightly
commands:
- name: cargo clippy
image: ekidd/rust-musl-builder:1.47.0
commands:
- - cargo clippy --workspace --tests --all-targets --all-features -- -D warnings
-
- - name: check documentation build
- image: ekidd/rust-musl-builder:1.47.0
- commands:
- - cargo install mdbook --git https://github.com/Ruin0x11/mdBook.git --branch localization --rev d06249b
- - mdbook build docs/
+ - cargo clippy --workspace --tests --all-targets --all-features -- -D warnings -D deprecated -D clippy::perf -D clippy::complexity -D clippy::dbg_macro
- name: cargo test
image: ekidd/rust-musl-builder:1.47.0
[submodule "docs"]
path = docs
- url = http://github.com/LemmyNet/lemmy-docs
- branch = master
+ url = https://github.com/LemmyNet/lemmy-docs
+ branch = main
sudo chown -R 991:991 volumes/pictrs_$Item
done
-sudo docker-compose up -d
-
-echo "Waiting for Lemmy to start..."
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8541/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8551/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8561/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8571/api/v1/site')" != "200" ]]; do sleep 1; done
-while [[ "$(curl -s -o /dev/null -w '%{http_code}' 'localhost:8581/api/v1/site')" != "200" ]]; do sleep 1; done
-echo "All instances started."
-
-sudo docker-compose logs -f
+sudo docker-compose up
};
use actix_web::web::Data;
use anyhow::Context;
-use lemmy_apub::fetcher::search_by_apub_id;
+use lemmy_apub::fetcher::search::search_by_apub_id;
use lemmy_db_queries::{
diesel_option_overwrite,
source::{category::Category_, site::Site_},
-use crate::fetcher::get_or_fetch_and_upsert_user;
+use crate::fetcher::user::get_or_fetch_and_upsert_user;
use activitystreams::{
activity::{ActorAndObjectRef, ActorAndObjectRefExt},
base::{AsBase, BaseExt},
use crate::{
activities::receive::verify_activity_domains_valid,
check_is_apub_id_valid,
- fetcher::get_or_fetch_and_upsert_user,
+ fetcher::user::get_or_fetch_and_upsert_user,
inbox::get_activity_to_and_cc,
objects::FromApub,
NoteExt,
activities::send::generate_activity_id,
activity_queue::{send_comment_mentions, send_to_community},
extensions::context::lemmy_context,
- fetcher::get_or_fetch_and_upsert_user,
+ fetcher::user::get_or_fetch_and_upsert_user,
objects::ToApub,
ActorType,
ApubLikeableType,
activity_queue::{send_activity_single_dest, send_to_community_followers},
check_is_apub_id_valid,
extensions::context::lemmy_context,
- fetcher::get_or_fetch_and_upsert_user,
+ fetcher::user::get_or_fetch_and_upsert_user,
ActorType,
};
use activitystreams::{
+++ /dev/null
-use crate::{
- check_is_apub_id_valid,
- objects::FromApub,
- ActorType,
- GroupExt,
- NoteExt,
- PageExt,
- PersonExt,
- APUB_JSON_CONTENT_TYPE,
-};
-use activitystreams::{base::BaseExt, collection::OrderedCollection, prelude::*};
-use anyhow::{anyhow, Context};
-use chrono::NaiveDateTime;
-use diesel::result::Error::NotFound;
-use lemmy_db_queries::{source::user::User, ApubObject, Crud, Joinable, SearchType};
-use lemmy_db_schema::{
- naive_now,
- source::{
- comment::Comment,
- community::{Community, CommunityModerator, CommunityModeratorForm},
- post::Post,
- user::User_,
- },
-};
-use lemmy_db_views::{comment_view::CommentView, post_view::PostView};
-use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe};
-use lemmy_structs::{blocking, site::SearchResponse};
-use lemmy_utils::{
- location_info,
- request::{retry, RecvError},
- settings::Settings,
- LemmyError,
-};
-use lemmy_websocket::LemmyContext;
-use log::debug;
-use reqwest::Client;
-use serde::Deserialize;
-use std::{fmt::Debug, time::Duration};
-use url::Url;
-
-static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
-static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
-
-/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
-/// fetch through the search).
-///
-/// Tests are passing with a value of 5, so 10 should be safe for production.
-static MAX_REQUEST_NUMBER: i32 = 10;
-
-/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
-/// timeouts etc.
-async fn fetch_remote_object<Response>(
- client: &Client,
- url: &Url,
- recursion_counter: &mut i32,
-) -> Result<Response, LemmyError>
-where
- Response: for<'de> Deserialize<'de>,
-{
- *recursion_counter += 1;
- if *recursion_counter > MAX_REQUEST_NUMBER {
- return Err(anyhow!("Maximum recursion depth reached").into());
- }
- check_is_apub_id_valid(&url)?;
-
- let timeout = Duration::from_secs(60);
-
- let json = retry(|| {
- client
- .get(url.as_str())
- .header("Accept", APUB_JSON_CONTENT_TYPE)
- .timeout(timeout)
- .send()
- })
- .await?
- .json()
- .await
- .map_err(|e| {
- debug!("Receive error, {}", e);
- RecvError(e.to_string())
- })?;
-
- Ok(json)
-}
-
-/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
-#[serde(untagged)]
-#[derive(serde::Deserialize, Debug)]
-enum SearchAcceptedObjects {
- Person(Box<PersonExt>),
- Group(Box<GroupExt>),
- Page(Box<PageExt>),
- Comment(Box<NoteExt>),
-}
-
-/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
-///
-/// Some working examples for use with the `docker/federation/` setup:
-/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
-/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
-/// http://lemmy_gamma:8561/post/3
-/// http://lemmy_delta:8571/comment/2
-pub async fn search_by_apub_id(
- query: &str,
- context: &LemmyContext,
-) -> Result<SearchResponse, LemmyError> {
- // Parse the shorthand query url
- let query_url = if query.contains('@') {
- debug!("Search for {}", query);
- let split = query.split('@').collect::<Vec<&str>>();
-
- // User type will look like ['', username, instance]
- // Community will look like [!community, instance]
- let (name, instance) = if split.len() == 3 {
- (format!("/u/{}", split[1]), split[2])
- } else if split.len() == 2 {
- if split[0].contains('!') {
- let split2 = split[0].split('!').collect::<Vec<&str>>();
- (format!("/c/{}", split2[1]), split[1])
- } else {
- return Err(anyhow!("Invalid search query: {}", query).into());
- }
- } else {
- return Err(anyhow!("Invalid search query: {}", query).into());
- };
-
- let url = format!(
- "{}://{}{}",
- Settings::get().get_protocol_string(),
- instance,
- name
- );
- Url::parse(&url)?
- } else {
- Url::parse(&query)?
- };
-
- let mut response = SearchResponse {
- type_: SearchType::All.to_string(),
- comments: vec![],
- posts: vec![],
- communities: vec![],
- users: vec![],
- };
-
- let domain = query_url.domain().context("url has no domain")?;
- let recursion_counter = &mut 0;
- let response = match fetch_remote_object::<SearchAcceptedObjects>(
- context.client(),
- &query_url,
- recursion_counter,
- )
- .await?
- {
- SearchAcceptedObjects::Person(p) => {
- let user_uri = p.inner.id(domain)?.context("person has no id")?;
-
- let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
-
- response.users = vec![
- blocking(context.pool(), move |conn| {
- UserViewSafe::read(conn, user.id)
- })
- .await??,
- ];
-
- response
- }
- SearchAcceptedObjects::Group(g) => {
- let community_uri = g.inner.id(domain)?.context("group has no id")?;
-
- let community =
- get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
-
- response.communities = vec![
- blocking(context.pool(), move |conn| {
- CommunityView::read(conn, community.id, None)
- })
- .await??,
- ];
-
- response
- }
- SearchAcceptedObjects::Page(p) => {
- let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
-
- response.posts =
- vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
-
- response
- }
- SearchAcceptedObjects::Comment(c) => {
- let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
-
- response.comments = vec![
- blocking(context.pool(), move |conn| {
- CommentView::read(conn, c.id, None)
- })
- .await??,
- ];
-
- response
- }
- };
-
- Ok(response)
-}
-
-/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
-/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_actor(
- apub_id: &Url,
- context: &LemmyContext,
- recursion_counter: &mut i32,
-) -> Result<Box<dyn ActorType>, LemmyError> {
- let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
- let actor: Box<dyn ActorType> = match community {
- Ok(c) => Box::new(c),
- Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
- };
- Ok(actor)
-}
-
-/// Get a user from its apub ID.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_user(
- apub_id: &Url,
- context: &LemmyContext,
- recursion_counter: &mut i32,
-) -> Result<User_, LemmyError> {
- let apub_id_owned = apub_id.to_owned();
- let user = blocking(context.pool(), move |conn| {
- User_::read_from_apub_id(conn, apub_id_owned.as_ref())
- })
- .await?;
-
- match user {
- // If its older than a day, re-fetch it
- Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
- debug!("Fetching and updating from remote user: {}", apub_id);
- let person =
- fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
- // If fetching failed, return the existing data.
- if person.is_err() {
- return Ok(u);
- }
-
- let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
-
- let user_id = user.id;
- blocking(context.pool(), move |conn| {
- User_::mark_as_updated(conn, user_id)
- })
- .await??;
-
- Ok(user)
- }
- Ok(u) => Ok(u),
- Err(NotFound {}) => {
- debug!("Fetching and creating remote user: {}", apub_id);
- let person =
- fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
-
- let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
-
- Ok(user)
- }
- Err(e) => Err(e.into()),
- }
-}
-
-/// Determines when a remote actor should be refetched from its instance. In release builds, this is
-/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
-/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
-///
-/// TODO it won't pick up new avatars, summaries etc until a day after.
-/// Actors need an "update" activity pushed to other servers to fix this.
-fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
- let update_interval = if cfg!(debug_assertions) {
- // avoid infinite loop when fetching community outbox
- chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
- } else {
- chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
- };
- last_refreshed.lt(&(naive_now() - update_interval))
-}
-
-/// Get a community from its apub ID.
-///
-/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
-/// Otherwise it is fetched from the remote instance, stored and returned.
-pub(crate) async fn get_or_fetch_and_upsert_community(
- apub_id: &Url,
- context: &LemmyContext,
- recursion_counter: &mut i32,
-) -> Result<Community, LemmyError> {
- let apub_id_owned = apub_id.to_owned();
- let community = blocking(context.pool(), move |conn| {
- Community::read_from_apub_id(conn, apub_id_owned.as_str())
- })
- .await?;
-
- match community {
- Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
- debug!("Fetching and updating from remote community: {}", apub_id);
- fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
- }
- Ok(c) => Ok(c),
- Err(NotFound {}) => {
- debug!("Fetching and creating remote community: {}", apub_id);
- fetch_remote_community(apub_id, context, None, recursion_counter).await
- }
- Err(e) => Err(e.into()),
- }
-}
-
-/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
-/// is set, this is an update for a community which is already known locally. If not, we don't know
-/// the community yet and also pull the outbox, to get some initial posts.
-async fn fetch_remote_community(
- apub_id: &Url,
- context: &LemmyContext,
- old_community: Option<Community>,
- recursion_counter: &mut i32,
-) -> Result<Community, LemmyError> {
- let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
- // If fetching failed, return the existing data.
- if let Some(ref c) = old_community {
- if group.is_err() {
- return Ok(c.to_owned());
- }
- }
-
- let group = group?;
- let community =
- Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
-
- // Also add the community moderators too
- let attributed_to = group.inner.attributed_to().context(location_info!())?;
- let creator_and_moderator_uris: Vec<&Url> = attributed_to
- .as_many()
- .context(location_info!())?
- .iter()
- .map(|a| a.as_xsd_any_uri().context(""))
- .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
-
- let mut creator_and_moderators = Vec::new();
-
- for uri in creator_and_moderator_uris {
- let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
-
- creator_and_moderators.push(c_or_m);
- }
-
- // TODO: need to make this work to update mods of existing communities
- if old_community.is_none() {
- let community_id = community.id;
- blocking(context.pool(), move |conn| {
- for mod_ in creator_and_moderators {
- let community_moderator_form = CommunityModeratorForm {
- community_id,
- user_id: mod_.id,
- };
-
- CommunityModerator::join(conn, &community_moderator_form)?;
- }
- Ok(()) as Result<(), LemmyError>
- })
- .await??;
- }
-
- // fetch outbox (maybe make this conditional)
- let outbox = fetch_remote_object::<OrderedCollection>(
- context.client(),
- &community.get_outbox_url()?,
- recursion_counter,
- )
- .await?;
- let outbox_items = outbox.items().context(location_info!())?.clone();
- let mut outbox_items = outbox_items.many().context(location_info!())?;
- if outbox_items.len() > 20 {
- outbox_items = outbox_items[0..20].to_vec();
- }
- for o in outbox_items {
- let page = PageExt::from_any_base(o)?.context(location_info!())?;
- let page_id = page.id_unchecked().context(location_info!())?;
-
- // The post creator may be from a blocked instance, if it errors, then skip it
- if check_is_apub_id_valid(page_id).is_err() {
- continue;
- }
- Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
- // TODO: we need to send a websocket update here
- }
-
- Ok(community)
-}
-
-/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
-/// pulled from its apub ID, inserted and returned.
-///
-/// The parent community is also pulled if necessary. Comments are not pulled.
-pub(crate) async fn get_or_fetch_and_insert_post(
- post_ap_id: &Url,
- context: &LemmyContext,
- recursion_counter: &mut i32,
-) -> Result<Post, LemmyError> {
- let post_ap_id_owned = post_ap_id.to_owned();
- let post = blocking(context.pool(), move |conn| {
- Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
- })
- .await?;
-
- match post {
- Ok(p) => Ok(p),
- Err(NotFound {}) => {
- debug!("Fetching and creating remote post: {}", post_ap_id);
- let page =
- fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
- let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
-
- Ok(post)
- }
- Err(e) => Err(e.into()),
- }
-}
-
-/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
-/// pulled from its apub ID, inserted and returned.
-///
-/// The parent community, post and comment are also pulled if necessary.
-pub(crate) async fn get_or_fetch_and_insert_comment(
- comment_ap_id: &Url,
- context: &LemmyContext,
- recursion_counter: &mut i32,
-) -> Result<Comment, LemmyError> {
- let comment_ap_id_owned = comment_ap_id.to_owned();
- let comment = blocking(context.pool(), move |conn| {
- Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
- })
- .await?;
-
- match comment {
- Ok(p) => Ok(p),
- Err(NotFound {}) => {
- debug!(
- "Fetching and creating remote comment and its parents: {}",
- comment_ap_id
- );
- let comment =
- fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
- let comment = Comment::from_apub(
- &comment,
- context,
- comment_ap_id.to_owned(),
- recursion_counter,
- )
- .await?;
-
- let post_id = comment.post_id;
- let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
- if post.locked {
- return Err(anyhow!("Post is locked").into());
- }
-
- Ok(comment)
- }
- Err(e) => Err(e.into()),
- }
-}
--- /dev/null
+use crate::{
+ check_is_apub_id_valid,
+ fetcher::{
+ fetch::fetch_remote_object,
+ get_or_fetch_and_upsert_user,
+ is_deleted,
+ should_refetch_actor,
+ },
+ objects::FromApub,
+ ActorType,
+ GroupExt,
+ PageExt,
+};
+use activitystreams::{
+ base::{BaseExt, ExtendsExt},
+ collection::{CollectionExt, OrderedCollection},
+ object::ObjectExt,
+};
+use anyhow::Context;
+use diesel::result::Error::NotFound;
+use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
+use lemmy_db_schema::source::{
+ community::{Community, CommunityModerator, CommunityModeratorForm},
+ post::Post,
+};
+use lemmy_structs::blocking;
+use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// Get a community from its apub ID.
+///
+/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
+/// Otherwise it is fetched from the remote instance, stored and returned.
+pub(crate) async fn get_or_fetch_and_upsert_community(
+ apub_id: &Url,
+ context: &LemmyContext,
+ recursion_counter: &mut i32,
+) -> Result<Community, LemmyError> {
+ let apub_id_owned = apub_id.to_owned();
+ let community = blocking(context.pool(), move |conn| {
+ Community::read_from_apub_id(conn, apub_id_owned.as_str())
+ })
+ .await?;
+
+ match community {
+ Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
+ debug!("Fetching and updating from remote community: {}", apub_id);
+ fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
+ }
+ Ok(c) => Ok(c),
+ Err(NotFound {}) => {
+ debug!("Fetching and creating remote community: {}", apub_id);
+ fetch_remote_community(apub_id, context, None, recursion_counter).await
+ }
+ Err(e) => Err(e.into()),
+ }
+}
+
+/// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
+/// is set, this is an update for a community which is already known locally. If not, we don't know
+/// the community yet and also pull the outbox, to get some initial posts.
+async fn fetch_remote_community(
+ apub_id: &Url,
+ context: &LemmyContext,
+ old_community: Option<Community>,
+ recursion_counter: &mut i32,
+) -> Result<Community, LemmyError> {
+ let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
+
+ if let Some(c) = old_community.to_owned() {
+ if is_deleted(&group) {
+ blocking(context.pool(), move |conn| {
+ Community::update_deleted(conn, c.id, true)
+ })
+ .await??;
+ } else if group.is_err() {
+ // If fetching failed, return the existing data.
+ return Ok(c);
+ }
+ }
+
+ let group = group?;
+ let community =
+ Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
+
+ // Also add the community moderators too
+ let attributed_to = group.inner.attributed_to().context(location_info!())?;
+ let creator_and_moderator_uris: Vec<&Url> = attributed_to
+ .as_many()
+ .context(location_info!())?
+ .iter()
+ .map(|a| a.as_xsd_any_uri().context(""))
+ .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
+
+ let mut creator_and_moderators = Vec::new();
+
+ for uri in creator_and_moderator_uris {
+ let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
+
+ creator_and_moderators.push(c_or_m);
+ }
+
+ // TODO: need to make this work to update mods of existing communities
+ if old_community.is_none() {
+ let community_id = community.id;
+ blocking(context.pool(), move |conn| {
+ for mod_ in creator_and_moderators {
+ let community_moderator_form = CommunityModeratorForm {
+ community_id,
+ user_id: mod_.id,
+ };
+
+ CommunityModerator::join(conn, &community_moderator_form)?;
+ }
+ Ok(()) as Result<(), LemmyError>
+ })
+ .await??;
+ }
+
+ // fetch outbox (maybe make this conditional)
+ let outbox = fetch_remote_object::<OrderedCollection>(
+ context.client(),
+ &community.get_outbox_url()?,
+ recursion_counter,
+ )
+ .await?;
+ let outbox_items = outbox.items().context(location_info!())?.clone();
+ let mut outbox_items = outbox_items.many().context(location_info!())?;
+ if outbox_items.len() > 20 {
+ outbox_items = outbox_items[0..20].to_vec();
+ }
+ for o in outbox_items {
+ let page = PageExt::from_any_base(o)?.context(location_info!())?;
+ let page_id = page.id_unchecked().context(location_info!())?;
+
+ // The post creator may be from a blocked instance, if it errors, then skip it
+ if check_is_apub_id_valid(page_id).is_err() {
+ continue;
+ }
+ Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
+ // TODO: we need to send a websocket update here
+ }
+
+ Ok(community)
+}
--- /dev/null
+use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE};
+use anyhow::anyhow;
+use lemmy_utils::{request::retry, LemmyError};
+use reqwest::{Client, StatusCode};
+use serde::Deserialize;
+use std::time::Duration;
+use thiserror::Error;
+use url::Url;
+
+/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
+/// fetch through the search).
+///
+/// Tests are passing with a value of 5, so 10 should be safe for production.
+static MAX_REQUEST_NUMBER: i32 = 10;
+
+#[derive(Debug, Error)]
+pub(in crate::fetcher) struct FetchError {
+ pub inner: anyhow::Error,
+ pub status_code: Option<StatusCode>,
+}
+
+impl From<LemmyError> for FetchError {
+ fn from(t: LemmyError) -> Self {
+ FetchError {
+ inner: t.inner,
+ status_code: None,
+ }
+ }
+}
+
+impl From<reqwest::Error> for FetchError {
+ fn from(t: reqwest::Error) -> Self {
+ let status = t.status();
+ FetchError {
+ inner: t.into(),
+ status_code: status,
+ }
+ }
+}
+
+impl std::fmt::Display for FetchError {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ std::fmt::Display::fmt(&self, f)
+ }
+}
+
+/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
+/// timeouts etc.
+pub(in crate::fetcher) async fn fetch_remote_object<Response>(
+ client: &Client,
+ url: &Url,
+ recursion_counter: &mut i32,
+) -> Result<Response, FetchError>
+where
+ Response: for<'de> Deserialize<'de> + std::fmt::Debug,
+{
+ *recursion_counter += 1;
+ if *recursion_counter > MAX_REQUEST_NUMBER {
+ return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into());
+ }
+ check_is_apub_id_valid(&url)?;
+
+ let timeout = Duration::from_secs(60);
+
+ let res = retry(|| {
+ client
+ .get(url.as_str())
+ .header("Accept", APUB_JSON_CONTENT_TYPE)
+ .timeout(timeout)
+ .send()
+ })
+ .await?;
+
+ if res.status() == StatusCode::GONE {
+ return Err(FetchError {
+ inner: anyhow!("Remote object {} was deleted", url),
+ status_code: Some(res.status()),
+ });
+ }
+
+ Ok(res.json().await?)
+}
--- /dev/null
+pub(crate) mod community;
+mod fetch;
+pub(crate) mod objects;
+pub mod search;
+pub(crate) mod user;
+
+use crate::{
+ fetcher::{
+ community::get_or_fetch_and_upsert_community,
+ fetch::FetchError,
+ user::get_or_fetch_and_upsert_user,
+ },
+ ActorType,
+};
+use chrono::NaiveDateTime;
+use http::StatusCode;
+use lemmy_db_schema::naive_now;
+use lemmy_utils::LemmyError;
+use lemmy_websocket::LemmyContext;
+use serde::Deserialize;
+use url::Url;
+
+static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
+static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
+
+fn is_deleted<Response>(fetch_response: &Result<Response, FetchError>) -> bool
+where
+ Response: for<'de> Deserialize<'de>,
+{
+ if let Err(e) = fetch_response {
+ if let Some(status) = e.status_code {
+ if status == StatusCode::GONE {
+ return true;
+ }
+ }
+ }
+ false
+}
+
+/// Get a remote actor from its apub ID (either a user or a community). Thin wrapper around
+/// `get_or_fetch_and_upsert_user()` and `get_or_fetch_and_upsert_community()`.
+///
+/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
+/// Otherwise it is fetched from the remote instance, stored and returned.
+pub(crate) async fn get_or_fetch_and_upsert_actor(
+ apub_id: &Url,
+ context: &LemmyContext,
+ recursion_counter: &mut i32,
+) -> Result<Box<dyn ActorType>, LemmyError> {
+ let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
+ let actor: Box<dyn ActorType> = match community {
+ Ok(c) => Box::new(c),
+ Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
+ };
+ Ok(actor)
+}
+
+/// Determines when a remote actor should be refetched from its instance. In release builds, this is
+/// `ACTOR_REFETCH_INTERVAL_SECONDS` after the last refetch, in debug builds
+/// `ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG`.
+///
+/// TODO it won't pick up new avatars, summaries etc until a day after.
+/// Actors need an "update" activity pushed to other servers to fix this.
+fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
+ let update_interval = if cfg!(debug_assertions) {
+ // avoid infinite loop when fetching community outbox
+ chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
+ } else {
+ chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
+ };
+ last_refreshed.lt(&(naive_now() - update_interval))
+}
--- /dev/null
+use crate::{fetcher::fetch::fetch_remote_object, objects::FromApub, NoteExt, PageExt};
+use anyhow::anyhow;
+use diesel::result::Error::NotFound;
+use lemmy_db_queries::{ApubObject, Crud};
+use lemmy_db_schema::source::{comment::Comment, post::Post};
+use lemmy_structs::blocking;
+use lemmy_utils::LemmyError;
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is
+/// pulled from its apub ID, inserted and returned.
+///
+/// The parent community is also pulled if necessary. Comments are not pulled.
+pub(crate) async fn get_or_fetch_and_insert_post(
+ post_ap_id: &Url,
+ context: &LemmyContext,
+ recursion_counter: &mut i32,
+) -> Result<Post, LemmyError> {
+ let post_ap_id_owned = post_ap_id.to_owned();
+ let post = blocking(context.pool(), move |conn| {
+ Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
+ })
+ .await?;
+
+ match post {
+ Ok(p) => Ok(p),
+ Err(NotFound {}) => {
+ debug!("Fetching and creating remote post: {}", post_ap_id);
+ let page =
+ fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
+ let post = Post::from_apub(&page, context, post_ap_id.to_owned(), recursion_counter).await?;
+
+ Ok(post)
+ }
+ Err(e) => Err(e.into()),
+ }
+}
+
+/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is
+/// pulled from its apub ID, inserted and returned.
+///
+/// The parent community, post and comment are also pulled if necessary.
+pub(crate) async fn get_or_fetch_and_insert_comment(
+ comment_ap_id: &Url,
+ context: &LemmyContext,
+ recursion_counter: &mut i32,
+) -> Result<Comment, LemmyError> {
+ let comment_ap_id_owned = comment_ap_id.to_owned();
+ let comment = blocking(context.pool(), move |conn| {
+ Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
+ })
+ .await?;
+
+ match comment {
+ Ok(p) => Ok(p),
+ Err(NotFound {}) => {
+ debug!(
+ "Fetching and creating remote comment and its parents: {}",
+ comment_ap_id
+ );
+ let comment =
+ fetch_remote_object::<NoteExt>(context.client(), comment_ap_id, recursion_counter).await?;
+ let comment = Comment::from_apub(
+ &comment,
+ context,
+ comment_ap_id.to_owned(),
+ recursion_counter,
+ )
+ .await?;
+
+ let post_id = comment.post_id;
+ let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
+ if post.locked {
+ return Err(anyhow!("Post is locked").into());
+ }
+
+ Ok(comment)
+ }
+ Err(e) => Err(e.into()),
+ }
+}
--- /dev/null
+use crate::{
+ fetcher::{
+ fetch::fetch_remote_object,
+ get_or_fetch_and_upsert_community,
+ get_or_fetch_and_upsert_user,
+ is_deleted,
+ },
+ find_object_by_id,
+ objects::FromApub,
+ GroupExt,
+ NoteExt,
+ Object,
+ PageExt,
+ PersonExt,
+};
+use activitystreams::base::BaseExt;
+use anyhow::{anyhow, Context};
+use lemmy_db_queries::{
+ source::{
+ comment::Comment_,
+ community::Community_,
+ post::Post_,
+ private_message::PrivateMessage_,
+ user::User,
+ },
+ SearchType,
+};
+use lemmy_db_schema::source::{
+ comment::Comment,
+ community::Community,
+ post::Post,
+ private_message::PrivateMessage,
+ user::User_,
+};
+use lemmy_db_views::{comment_view::CommentView, post_view::PostView};
+use lemmy_db_views_actor::{community_view::CommunityView, user_view::UserViewSafe};
+use lemmy_structs::{blocking, site::SearchResponse};
+use lemmy_utils::{settings::Settings, LemmyError};
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// The types of ActivityPub objects that can be fetched directly by searching for their ID.
+#[serde(untagged)]
+#[derive(serde::Deserialize, Debug)]
+enum SearchAcceptedObjects {
+ Person(Box<PersonExt>),
+ Group(Box<GroupExt>),
+ Page(Box<PageExt>),
+ Comment(Box<NoteExt>),
+}
+
+/// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
+///
+/// Some working examples for use with the `docker/federation/` setup:
+/// http://lemmy_alpha:8541/c/main, or !main@lemmy_alpha:8541
+/// http://lemmy_beta:8551/u/lemmy_alpha, or @lemmy_beta@lemmy_beta:8551
+/// http://lemmy_gamma:8561/post/3
+/// http://lemmy_delta:8571/comment/2
+pub async fn search_by_apub_id(
+ query: &str,
+ context: &LemmyContext,
+) -> Result<SearchResponse, LemmyError> {
+ // Parse the shorthand query url
+ let query_url = if query.contains('@') {
+ debug!("Search for {}", query);
+ let split = query.split('@').collect::<Vec<&str>>();
+
+ // User type will look like ['', username, instance]
+ // Community will look like [!community, instance]
+ let (name, instance) = if split.len() == 3 {
+ (format!("/u/{}", split[1]), split[2])
+ } else if split.len() == 2 {
+ if split[0].contains('!') {
+ let split2 = split[0].split('!').collect::<Vec<&str>>();
+ (format!("/c/{}", split2[1]), split[1])
+ } else {
+ return Err(anyhow!("Invalid search query: {}", query).into());
+ }
+ } else {
+ return Err(anyhow!("Invalid search query: {}", query).into());
+ };
+
+ let url = format!(
+ "{}://{}{}",
+ Settings::get().get_protocol_string(),
+ instance,
+ name
+ );
+ Url::parse(&url)?
+ } else {
+ Url::parse(&query)?
+ };
+
+ let recursion_counter = &mut 0;
+ let fetch_response =
+ fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url, recursion_counter)
+ .await;
+ if is_deleted(&fetch_response) {
+ delete_object_locally(&query_url, context).await?;
+ }
+
+ build_response(fetch_response?, query_url, recursion_counter, context).await
+}
+
+async fn build_response(
+ fetch_response: SearchAcceptedObjects,
+ query_url: Url,
+ recursion_counter: &mut i32,
+ context: &LemmyContext,
+) -> Result<SearchResponse, LemmyError> {
+ let domain = query_url.domain().context("url has no domain")?;
+ let mut response = SearchResponse {
+ type_: SearchType::All.to_string(),
+ comments: vec![],
+ posts: vec![],
+ communities: vec![],
+ users: vec![],
+ };
+
+ match fetch_response {
+ SearchAcceptedObjects::Person(p) => {
+ let user_uri = p.inner.id(domain)?.context("person has no id")?;
+
+ let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
+
+ response.users = vec![
+ blocking(context.pool(), move |conn| {
+ UserViewSafe::read(conn, user.id)
+ })
+ .await??,
+ ];
+ }
+ SearchAcceptedObjects::Group(g) => {
+ let community_uri = g.inner.id(domain)?.context("group has no id")?;
+
+ let community =
+ get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
+
+ response.communities = vec![
+ blocking(context.pool(), move |conn| {
+ CommunityView::read(conn, community.id, None)
+ })
+ .await??,
+ ];
+ }
+ SearchAcceptedObjects::Page(p) => {
+ let p = Post::from_apub(&p, context, query_url, recursion_counter).await?;
+
+ response.posts =
+ vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
+ }
+ SearchAcceptedObjects::Comment(c) => {
+ let c = Comment::from_apub(&c, context, query_url, recursion_counter).await?;
+
+ response.comments = vec![
+ blocking(context.pool(), move |conn| {
+ CommentView::read(conn, c.id, None)
+ })
+ .await??,
+ ];
+ }
+ };
+
+ Ok(response)
+}
+
+async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> {
+ let res = find_object_by_id(context, query_url.to_owned()).await?;
+ match res {
+ Object::Comment(c) => {
+ blocking(context.pool(), move |conn| {
+ Comment::update_deleted(conn, c.id, true)
+ })
+ .await??;
+ }
+ Object::Post(p) => {
+ blocking(context.pool(), move |conn| {
+ Post::update_deleted(conn, p.id, true)
+ })
+ .await??;
+ }
+ Object::User(u) => {
+ // TODO: implement update_deleted() for user, move it to ApubObject trait
+ blocking(context.pool(), move |conn| {
+ User_::delete_account(conn, u.id)
+ })
+ .await??;
+ }
+ Object::Community(c) => {
+ blocking(context.pool(), move |conn| {
+ Community::update_deleted(conn, c.id, true)
+ })
+ .await??;
+ }
+ Object::PrivateMessage(pm) => {
+ blocking(context.pool(), move |conn| {
+ PrivateMessage::update_deleted(conn, pm.id, true)
+ })
+ .await??;
+ }
+ }
+ Err(anyhow!("Object was deleted").into())
+}
--- /dev/null
+use crate::{
+ fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor},
+ objects::FromApub,
+ PersonExt,
+};
+use anyhow::anyhow;
+use diesel::result::Error::NotFound;
+use lemmy_db_queries::{source::user::User, ApubObject};
+use lemmy_db_schema::source::user::User_;
+use lemmy_structs::blocking;
+use lemmy_utils::LemmyError;
+use lemmy_websocket::LemmyContext;
+use log::debug;
+use url::Url;
+
+/// Get a user from its apub ID.
+///
+/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
+/// Otherwise it is fetched from the remote instance, stored and returned.
+pub(crate) async fn get_or_fetch_and_upsert_user(
+ apub_id: &Url,
+ context: &LemmyContext,
+ recursion_counter: &mut i32,
+) -> Result<User_, LemmyError> {
+ let apub_id_owned = apub_id.to_owned();
+ let user = blocking(context.pool(), move |conn| {
+ User_::read_from_apub_id(conn, apub_id_owned.as_ref())
+ })
+ .await?;
+
+ match user {
+ // If its older than a day, re-fetch it
+ Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
+ debug!("Fetching and updating from remote user: {}", apub_id);
+ let person =
+ fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await;
+
+ if is_deleted(&person) {
+ // TODO: use User_::update_deleted() once implemented
+ blocking(context.pool(), move |conn| {
+ User_::delete_account(conn, u.id)
+ })
+ .await??;
+ return Err(anyhow!("User was deleted by remote instance").into());
+ } else if person.is_err() {
+ return Ok(u);
+ }
+
+ let user = User_::from_apub(&person?, context, apub_id.to_owned(), recursion_counter).await?;
+
+ let user_id = user.id;
+ blocking(context.pool(), move |conn| {
+ User_::mark_as_updated(conn, user_id)
+ })
+ .await??;
+
+ Ok(user)
+ }
+ Ok(u) => Ok(u),
+ Err(NotFound {}) => {
+ debug!("Fetching and creating remote user: {}", apub_id);
+ let person =
+ fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
+
+ let user = User_::from_apub(&person, context, apub_id.to_owned(), recursion_counter).await?;
+
+ Ok(user)
+ }
+ Err(e) => Err(e.into()),
+ }
+}
use crate::{
extensions::context::lemmy_context,
- http::create_apub_response,
+ http::{create_apub_response, create_apub_tombstone_response},
objects::ToApub,
ActorType,
};
context: web::Data<LemmyContext>,
) -> Result<HttpResponse<Body>, LemmyError> {
let user_name = info.into_inner().user_name;
+ // TODO: this needs to be able to read deleted users, so that it can send tombstones
let user = blocking(context.pool(), move |conn| {
User_::find_by_email_or_username(conn, &user_name)
})
.await??;
- let u = user.to_apub(context.pool()).await?;
- Ok(create_apub_response(&u))
+
+ if !user.deleted {
+ let apub = user.to_apub(context.pool()).await?;
+
+ Ok(create_apub_response(&apub))
+ } else {
+ Ok(create_apub_tombstone_response(&user.to_tombstone()?))
+ }
}
pub async fn get_apub_user_outbox(
receive_unhandled_activity,
verify_activity_domains_valid,
},
- fetcher::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
+ fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
+ find_post_or_comment_by_id,
inbox::is_addressed_to_public,
+ PostOrComment,
};
use activitystreams::{
activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
};
use anyhow::Context;
use diesel::result::Error::NotFound;
-use lemmy_db_queries::{ApubObject, Crud};
-use lemmy_db_schema::source::{comment::Comment, post::Post, site::Site};
+use lemmy_db_queries::Crud;
+use lemmy_db_schema::source::site::Site;
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
}
}
-enum PostOrComment {
- Comment(Comment),
- Post(Post),
-}
-
-/// Tries to find a post or comment in the local database, without any network requests.
-/// This is used to handle deletions and removals, because in case we dont have the object, we can
-/// simply ignore the activity.
-async fn find_post_or_comment_by_id(
- context: &LemmyContext,
- apub_id: Url,
-) -> Result<PostOrComment, LemmyError> {
- let ap_id = apub_id.to_string();
- let post = blocking(context.pool(), move |conn| {
- Post::read_from_apub_id(conn, &ap_id)
- })
- .await?;
- if let Ok(p) = post {
- return Ok(PostOrComment::Post(p));
- }
-
- let ap_id = apub_id.to_string();
- let comment = blocking(context.pool(), move |conn| {
- Comment::read_from_apub_id(conn, &ap_id)
- })
- .await?;
- if let Ok(c) = comment {
- return Ok(PostOrComment::Comment(c));
- }
-
- Err(NotFound.into())
-}
-
async fn fetch_post_or_comment_by_id(
apub_id: &Url,
context: &LemmyContext,
verify_activity_domains_valid,
},
check_is_apub_id_valid,
- fetcher::get_or_fetch_and_upsert_community,
+ fetcher::community::get_or_fetch_and_upsert_community,
inbox::{
assert_activity_not_local,
get_activity_id,
};
use activitystreams_ext::{Ext1, Ext2};
use anyhow::{anyhow, Context};
-use lemmy_db_queries::{source::activity::Activity_, DbPool};
-use lemmy_db_schema::source::{activity::Activity, user::User_};
+use diesel::NotFound;
+use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool};
+use lemmy_db_schema::source::{
+ activity::Activity,
+ comment::Comment,
+ community::Community,
+ post::Post,
+ private_message::PrivateMessage,
+ user::User_,
+};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
.await??;
Ok(())
}
+
+pub(crate) enum PostOrComment {
+ Comment(Comment),
+ Post(Post),
+}
+
+/// Tries to find a post or comment in the local database, without any network requests.
+/// This is used to handle deletions and removals, because in case we dont have the object, we can
+/// simply ignore the activity.
+pub(crate) async fn find_post_or_comment_by_id(
+ context: &LemmyContext,
+ apub_id: Url,
+) -> Result<PostOrComment, LemmyError> {
+ let ap_id = apub_id.to_string();
+ let post = blocking(context.pool(), move |conn| {
+ Post::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(p) = post {
+ return Ok(PostOrComment::Post(p));
+ }
+
+ let ap_id = apub_id.to_string();
+ let comment = blocking(context.pool(), move |conn| {
+ Comment::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(c) = comment {
+ return Ok(PostOrComment::Comment(c));
+ }
+
+ Err(NotFound.into())
+}
+
+pub(crate) enum Object {
+ Comment(Comment),
+ Post(Post),
+ Community(Community),
+ User(User_),
+ PrivateMessage(PrivateMessage),
+}
+
+pub(crate) async fn find_object_by_id(
+ context: &LemmyContext,
+ apub_id: Url,
+) -> Result<Object, LemmyError> {
+ if let Ok(pc) = find_post_or_comment_by_id(context, apub_id.to_owned()).await {
+ return Ok(match pc {
+ PostOrComment::Post(p) => Object::Post(p),
+ PostOrComment::Comment(c) => Object::Comment(c),
+ });
+ }
+
+ let ap_id = apub_id.to_string();
+ let user = blocking(context.pool(), move |conn| {
+ User_::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(u) = user {
+ return Ok(Object::User(u));
+ }
+
+ let ap_id = apub_id.to_string();
+ let community = blocking(context.pool(), move |conn| {
+ Community::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(c) = community {
+ return Ok(Object::Community(c));
+ }
+
+ let ap_id = apub_id.to_string();
+ let private_message = blocking(context.pool(), move |conn| {
+ PrivateMessage::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(pm) = private_message {
+ return Ok(Object::PrivateMessage(pm));
+ }
+
+ Err(NotFound.into())
+}
use crate::{
extensions::context::lemmy_context,
- fetcher::{
- get_or_fetch_and_insert_comment,
- get_or_fetch_and_insert_post,
- get_or_fetch_and_upsert_user,
- },
+ fetcher::objects::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
objects::{
check_object_domain,
check_object_for_community_or_site_ban,
create_tombstone,
get_object_from_apub,
+ get_or_fetch_and_upsert_user,
get_source_markdown_value,
set_content_and_source,
FromApub,
use crate::{
extensions::{context::lemmy_context, group_extensions::GroupExtension},
- fetcher::get_or_fetch_and_upsert_user,
+ fetcher::user::get_or_fetch_and_upsert_user,
objects::{
check_object_domain,
create_tombstone,
use crate::{
check_is_apub_id_valid,
- fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
+ fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user},
inbox::community_inbox::check_community_or_site_ban,
};
use activitystreams::{
use crate::{
extensions::{context::lemmy_context, page_extension::PageExtension},
- fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
+ fetcher::{community::get_or_fetch_and_upsert_community, user::get_or_fetch_and_upsert_user},
objects::{
check_object_domain,
check_object_for_community_or_site_ban,
use crate::{
check_is_apub_id_valid,
extensions::context::lemmy_context,
- fetcher::get_or_fetch_and_upsert_user,
+ fetcher::user::get_or_fetch_and_upsert_user,
objects::{
check_object_domain,
create_tombstone,
#[error("Error receiving response, {0}")]
pub struct RecvError(pub String);
-pub async fn retry<F, Fut, T>(f: F) -> Result<T, LemmyError>
+pub async fn retry<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, reqwest::Error>>,
retry_custom(|| async { Ok((f)().await) }).await
}
-async fn retry_custom<F, Fut, T>(f: F) -> Result<T, LemmyError>
+async fn retry_custom<F, Fut, T>(f: F) -> Result<T, reqwest::Error>
where
F: Fn() -> Fut,
- Fut: Future<Output = Result<Result<T, reqwest::Error>, LemmyError>>,
+ Fut: Future<Output = Result<Result<T, reqwest::Error>, reqwest::Error>>,
{
- let mut response = Err(anyhow!("connect timeout").into());
+ let mut response: Option<Result<T, reqwest::Error>> = None;
for _ in 0u8..3 {
match (f)().await? {
Ok(t) => return Ok(t),
Err(e) => {
if e.is_timeout() {
- response = Err(SendError(e.to_string()).into());
+ response = Some(Err(e));
continue;
}
- return Err(SendError(e.to_string()).into());
+ return Err(e);
}
}
}
- response
+ response.unwrap()
}
#[derive(Deserialize, Debug)]