]> Untitled Git - lemmy.git/commitdiff
Limit amount of HTTP requests to handle activities (fixes #1221)
authorFelix Ableitner <me@nutomic.com>
Thu, 22 Oct 2020 18:27:32 +0000 (20:27 +0200)
committerFelix Ableitner <me@nutomic.com>
Thu, 22 Oct 2020 18:27:32 +0000 (20:27 +0200)
18 files changed:
lemmy_apub/src/activities/receive/comment.rs
lemmy_apub/src/activities/receive/comment_undo.rs
lemmy_apub/src/activities/receive/community.rs
lemmy_apub/src/activities/receive/mod.rs
lemmy_apub/src/activities/receive/post.rs
lemmy_apub/src/activities/receive/post_undo.rs
lemmy_apub/src/activities/send/comment.rs
lemmy_apub/src/activities/send/community.rs
lemmy_apub/src/fetcher.rs
lemmy_apub/src/inbox/community_inbox.rs
lemmy_apub/src/inbox/shared_inbox.rs
lemmy_apub/src/inbox/user_inbox.rs
lemmy_apub/src/lib.rs
lemmy_apub/src/objects/comment.rs
lemmy_apub/src/objects/community.rs
lemmy_apub/src/objects/post.rs
lemmy_apub/src/objects/private_message.rs
lemmy_apub/src/objects/user.rs

index 971248c4093266642a71639854a42f7fc2cd1e81..e7efe81485f1e9242208597c71e84c10392fe7e0 100644 (file)
@@ -25,12 +25,14 @@ use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
 pub(crate) async fn receive_create_comment(
   create: Create,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(&create, context).await?;
+  let user = get_actor_as_user(&create, context, request_counter).await?;
   let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
 
-  let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?;
+  let comment =
+    CommentForm::from_apub(&note, context, Some(user.actor_id()?), request_counter).await?;
 
   let inserted_comment =
     blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??;
@@ -71,23 +73,26 @@ pub(crate) async fn receive_create_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(create, &user, context).await?;
+  announce_if_community_is_local(create, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
 pub(crate) async fn receive_update_comment(
   update: Update,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
-  let user = get_actor_as_user(&update, context).await?;
+  let user = get_actor_as_user(&update, context, request_counter).await?;
 
-  let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?;
+  let comment =
+    CommentForm::from_apub(&note, context, Some(user.actor_id()?), request_counter).await?;
 
-  let original_comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
-    .await?
-    .id;
+  let original_comment_id =
+    get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
+      .await?
+      .id;
 
   let updated_comment = blocking(context.pool(), move |conn| {
     Comment::update(conn, original_comment_id, &comment)
@@ -126,21 +131,22 @@ pub(crate) async fn receive_update_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(update, &user, context).await?;
+  announce_if_community_is_local(update, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
 pub(crate) async fn receive_like_comment(
   like: Like,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
-  let user = get_actor_as_user(&like, context).await?;
+  let user = get_actor_as_user(&like, context, request_counter).await?;
 
-  let comment = CommentForm::from_apub(&note, context, None).await?;
+  let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
 
-  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
+  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -177,13 +183,14 @@ pub(crate) async fn receive_like_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(like, &user, context).await?;
+  announce_if_community_is_local(like, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
 pub(crate) async fn receive_dislike_comment(
   dislike: Dislike,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let note = Note::from_any_base(
     dislike
@@ -193,11 +200,11 @@ pub(crate) async fn receive_dislike_comment(
       .context(location_info!())?,
   )?
   .context(location_info!())?;
-  let user = get_actor_as_user(&dislike, context).await?;
+  let user = get_actor_as_user(&dislike, context, request_counter).await?;
 
-  let comment = CommentForm::from_apub(&note, context, None).await?;
+  let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
 
-  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
+  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -234,7 +241,7 @@ pub(crate) async fn receive_dislike_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(dislike, &user, context).await?;
+  announce_if_community_is_local(dislike, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -242,6 +249,7 @@ pub(crate) async fn receive_delete_comment(
   context: &LemmyContext,
   delete: Delete,
   comment: Comment,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let deleted_comment = blocking(context.pool(), move |conn| {
     Comment::update_deleted(conn, comment.id, true)
@@ -268,8 +276,8 @@ pub(crate) async fn receive_delete_comment(
     websocket_id: None,
   });
 
-  let user = get_actor_as_user(&delete, context).await?;
-  announce_if_community_is_local(delete, &user, context).await?;
+  let user = get_actor_as_user(&delete, context, request_counter).await?;
+  announce_if_community_is_local(delete, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
index 4c046b6fe5172ddaf6f624f182324e3a33395aa7..4d6ca29b9a06fc1860ec2c7d44ad493e73dbac7b 100644 (file)
@@ -19,14 +19,15 @@ pub(crate) async fn receive_undo_like_comment(
   undo: Undo,
   like: &Like,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(like, context).await?;
+  let user = get_actor_as_user(like, context, request_counter).await?;
   let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
 
-  let comment = CommentForm::from_apub(&note, context, None).await?;
+  let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
 
-  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
+  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -56,7 +57,7 @@ pub(crate) async fn receive_undo_like_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, &user, context).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -64,8 +65,9 @@ pub(crate) async fn receive_undo_dislike_comment(
   undo: Undo,
   dislike: &Dislike,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(dislike, context).await?;
+  let user = get_actor_as_user(dislike, context, request_counter).await?;
   let note = Note::from_any_base(
     dislike
       .object()
@@ -75,9 +77,9 @@ pub(crate) async fn receive_undo_dislike_comment(
   )?
   .context(location_info!())?;
 
-  let comment = CommentForm::from_apub(&note, context, None).await?;
+  let comment = CommentForm::from_apub(&note, context, None, request_counter).await?;
 
-  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
+  let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -107,7 +109,7 @@ pub(crate) async fn receive_undo_dislike_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, &user, context).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -115,6 +117,7 @@ pub(crate) async fn receive_undo_delete_comment(
   context: &LemmyContext,
   undo: Undo,
   comment: Comment,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let deleted_comment = blocking(context.pool(), move |conn| {
     Comment::update_deleted(conn, comment.id, false)
@@ -142,8 +145,8 @@ pub(crate) async fn receive_undo_delete_comment(
     websocket_id: None,
   });
 
-  let user = get_actor_as_user(&undo, context).await?;
-  announce_if_community_is_local(undo, &user, context).await?;
+  let user = get_actor_as_user(&undo, context, request_counter).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -151,6 +154,7 @@ pub(crate) async fn receive_undo_remove_comment(
   context: &LemmyContext,
   undo: Undo,
   comment: Comment,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let removed_comment = blocking(context.pool(), move |conn| {
     Comment::update_removed(conn, comment.id, false)
@@ -178,7 +182,7 @@ pub(crate) async fn receive_undo_remove_comment(
     websocket_id: None,
   });
 
-  let mod_ = get_actor_as_user(&undo, context).await?;
-  announce_if_community_is_local(undo, &mod_, context).await?;
+  let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
+  announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
index 7ec9c2e719c013410430d588e363c79bb4aaff90..a79a85eeb56f96ed1da2e47f2e6c1dd568ddd3b7 100644 (file)
@@ -10,6 +10,7 @@ pub(crate) async fn receive_delete_community(
   context: &LemmyContext,
   delete: Delete,
   community: Community,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let deleted_community = blocking(context.pool(), move |conn| {
     Community::update_deleted(conn, community.id, true)
@@ -32,8 +33,8 @@ pub(crate) async fn receive_delete_community(
     websocket_id: None,
   });
 
-  let user = get_actor_as_user(&delete, context).await?;
-  announce_if_community_is_local(delete, &user, context).await?;
+  let user = get_actor_as_user(&delete, context, request_counter).await?;
+  announce_if_community_is_local(delete, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -63,6 +64,7 @@ pub(crate) async fn receive_remove_community(
     websocket_id: None,
   });
 
+  // TODO: this should probably also call announce_if_community_is_local()
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -70,6 +72,7 @@ pub(crate) async fn receive_undo_delete_community(
   context: &LemmyContext,
   undo: Undo,
   community: Community,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let deleted_community = blocking(context.pool(), move |conn| {
     Community::update_deleted(conn, community.id, false)
@@ -92,8 +95,8 @@ pub(crate) async fn receive_undo_delete_community(
     websocket_id: None,
   });
 
-  let user = get_actor_as_user(&undo, context).await?;
-  announce_if_community_is_local(undo, &user, context).await?;
+  let user = get_actor_as_user(&undo, context, request_counter).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -101,6 +104,7 @@ pub(crate) async fn receive_undo_remove_community(
   context: &LemmyContext,
   undo: Undo,
   community: Community,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let removed_community = blocking(context.pool(), move |conn| {
     Community::update_removed(conn, community.id, false)
@@ -124,7 +128,7 @@ pub(crate) async fn receive_undo_remove_community(
     websocket_id: None,
   });
 
-  let mod_ = get_actor_as_user(&undo, context).await?;
-  announce_if_community_is_local(undo, &mod_, context).await?;
+  let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
+  announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
index 0003cab939d32958059d12f074db88aa25a42ef4..59c555192b66572e1f516d43d41df6ab2303b69b 100644 (file)
@@ -41,6 +41,7 @@ async fn announce_if_community_is_local<T, Kind>(
   activity: T,
   user: &User_,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<(), LemmyError>
 where
   T: AsObject<Kind>,
@@ -55,7 +56,9 @@ where
     .context(location_info!())?
     .as_xsd_any_uri()
     .context(location_info!())?;
-  let community = get_or_fetch_and_upsert_community(&community_uri, context).await?;
+  // TODO: we could just read from the local db here (and ignore if the community is not found)
+  let community =
+    get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
 
   if community.local {
     community
@@ -69,13 +72,14 @@ where
 pub(crate) async fn get_actor_as_user<T, A>(
   activity: &T,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<User_, LemmyError>
 where
   T: AsBase<A> + ActorAndObjectRef,
 {
   let actor = activity.actor()?;
   let user_uri = actor.as_single_xsd_any_uri().context(location_info!())?;
-  get_or_fetch_and_upsert_user(&user_uri, context).await
+  get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
 }
 
 pub(crate) enum FindResults {
index b82b7922fb3fb13da6278f574ae0dbda3b85543f..60fa8db14ef6e5033e9c35aeb01c943c54cc8a89 100644 (file)
@@ -24,12 +24,13 @@ use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
 pub(crate) async fn receive_create_post(
   create: Create,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(&create, context).await?;
+  let user = get_actor_as_user(&create, context, request_counter).await?;
   let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
 
-  let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
+  let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?;
 
   // Using an upsert, since likes (which fetch the post), sometimes come in before the create
   // resulting in double posts.
@@ -50,21 +51,22 @@ pub(crate) async fn receive_create_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(create, &user, context).await?;
+  announce_if_community_is_local(create, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
 pub(crate) async fn receive_update_post(
   update: Update,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(&update, context).await?;
+  let user = get_actor_as_user(&update, context, request_counter).await?;
   let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
 
-  let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
+  let post = PostForm::from_apub(&page, context, Some(user.actor_id()?), request_counter).await?;
 
-  let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
+  let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -87,21 +89,22 @@ pub(crate) async fn receive_update_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(update, &user, context).await?;
+  announce_if_community_is_local(update, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
 pub(crate) async fn receive_like_post(
   like: Like,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(&like, context).await?;
+  let user = get_actor_as_user(&like, context, request_counter).await?;
   let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
 
-  let post = PostForm::from_apub(&page, context, None).await?;
+  let post = PostForm::from_apub(&page, context, None, request_counter).await?;
 
-  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
+  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -131,15 +134,16 @@ pub(crate) async fn receive_like_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(like, &user, context).await?;
+  announce_if_community_is_local(like, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
 pub(crate) async fn receive_dislike_post(
   dislike: Dislike,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(&dislike, context).await?;
+  let user = get_actor_as_user(&dislike, context, request_counter).await?;
   let page = PageExt::from_any_base(
     dislike
       .object()
@@ -149,9 +153,9 @@ pub(crate) async fn receive_dislike_post(
   )?
   .context(location_info!())?;
 
-  let post = PostForm::from_apub(&page, context, None).await?;
+  let post = PostForm::from_apub(&page, context, None, request_counter).await?;
 
-  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
+  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -181,7 +185,7 @@ pub(crate) async fn receive_dislike_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(dislike, &user, context).await?;
+  announce_if_community_is_local(dislike, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -189,6 +193,7 @@ pub(crate) async fn receive_delete_post(
   context: &LemmyContext,
   delete: Delete,
   post: Post,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let deleted_post = blocking(context.pool(), move |conn| {
     Post::update_deleted(conn, post.id, true)
@@ -209,8 +214,8 @@ pub(crate) async fn receive_delete_post(
     websocket_id: None,
   });
 
-  let user = get_actor_as_user(&delete, context).await?;
-  announce_if_community_is_local(delete, &user, context).await?;
+  let user = get_actor_as_user(&delete, context, request_counter).await?;
+  announce_if_community_is_local(delete, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
index e1638bc4cbb3059696775d24b9031073dc684eec..dea1a62131547c0bf93e8436d63d410af34f6920 100644 (file)
@@ -20,14 +20,15 @@ pub(crate) async fn receive_undo_like_post(
   undo: Undo,
   like: &Like,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(like, context).await?;
+  let user = get_actor_as_user(like, context, request_counter).await?;
   let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
 
-  let post = PostForm::from_apub(&page, context, None).await?;
+  let post = PostForm::from_apub(&page, context, None, request_counter).await?;
 
-  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
+  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -51,7 +52,7 @@ pub(crate) async fn receive_undo_like_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, &user, context).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -59,8 +60,9 @@ pub(crate) async fn receive_undo_dislike_post(
   undo: Undo,
   dislike: &Dislike,
   context: &LemmyContext,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
-  let user = get_actor_as_user(dislike, context).await?;
+  let user = get_actor_as_user(dislike, context, request_counter).await?;
   let page = PageExt::from_any_base(
     dislike
       .object()
@@ -70,9 +72,9 @@ pub(crate) async fn receive_undo_dislike_post(
   )?
   .context(location_info!())?;
 
-  let post = PostForm::from_apub(&page, context, None).await?;
+  let post = PostForm::from_apub(&page, context, None, request_counter).await?;
 
-  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
+  let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context, request_counter)
     .await?
     .id;
 
@@ -96,7 +98,7 @@ pub(crate) async fn receive_undo_dislike_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, &user, context).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -104,6 +106,7 @@ pub(crate) async fn receive_undo_delete_post(
   context: &LemmyContext,
   undo: Undo,
   post: Post,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let deleted_post = blocking(context.pool(), move |conn| {
     Post::update_deleted(conn, post.id, false)
@@ -124,8 +127,8 @@ pub(crate) async fn receive_undo_delete_post(
     websocket_id: None,
   });
 
-  let user = get_actor_as_user(&undo, context).await?;
-  announce_if_community_is_local(undo, &user, context).await?;
+  let user = get_actor_as_user(&undo, context, request_counter).await?;
+  announce_if_community_is_local(undo, &user, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
 
@@ -133,6 +136,7 @@ pub(crate) async fn receive_undo_remove_post(
   context: &LemmyContext,
   undo: Undo,
   post: Post,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let removed_post = blocking(context.pool(), move |conn| {
     Post::update_removed(conn, post.id, false)
@@ -154,7 +158,7 @@ pub(crate) async fn receive_undo_remove_post(
     websocket_id: None,
   });
 
-  let mod_ = get_actor_as_user(&undo, context).await?;
-  announce_if_community_is_local(undo, &mod_, context).await?;
+  let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
+  announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
   Ok(HttpResponse::Ok().finish())
 }
index bcfd779c8d73c8e6513e9a22386535b81a12fd52..185ceff2bdea8594b581714903bd4106d63b6c3e 100644 (file)
@@ -341,7 +341,7 @@ async fn collect_non_local_mentions_and_addresses(
       debug!("mention actor_id: {}", actor_id);
       addressed_ccs.push(actor_id.to_owned().to_string().parse()?);
 
-      let mention_user = get_or_fetch_and_upsert_user(&actor_id, context).await?;
+      let mention_user = get_or_fetch_and_upsert_user(&actor_id, context, &mut 0).await?;
       let shared_inbox = mention_user.get_shared_inbox_url()?;
 
       mention_inboxes.push(shared_inbox);
index 2f43f9c55f618d41bc48d6a62568e843c1b67296..f8d03579eea3708410157e54989625be23ee70a7 100644 (file)
@@ -72,7 +72,7 @@ impl ActorType for Community {
       .actor()?
       .as_single_xsd_any_uri()
       .context(location_info!())?;
-    let user = get_or_fetch_and_upsert_user(actor_uri, context).await?;
+    let user = get_or_fetch_and_upsert_user(actor_uri, context, &mut 0).await?;
 
     let mut accept = Accept::new(self.actor_id.to_owned(), follow.into_any_base()?);
     accept
index 908d1a5eacbbcc3f41df8fa7983c4afe87b2289b..43466a0f59db8444886c65e36e8570923cfeec82 100644 (file)
@@ -42,12 +42,26 @@ use url::Url;
 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
 
+/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
+/// fetch through the search).
+///
+/// Tests are passing with a value of 5, so 10 should be safe for production.
+static MAX_REQUEST_NUMBER: i32 = 10;
+
 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
 /// timeouts etc.
-async fn fetch_remote_object<Response>(client: &Client, url: &Url) -> Result<Response, LemmyError>
+async fn fetch_remote_object<Response>(
+  client: &Client,
+  url: &Url,
+  recursion_counter: &mut i32,
+) -> Result<Response, LemmyError>
 where
   Response: for<'de> Deserialize<'de>,
 {
+  *recursion_counter += 1;
+  if *recursion_counter > MAX_REQUEST_NUMBER {
+    return Err(anyhow!("Maximum recursion depth reached").into());
+  }
   check_is_apub_id_valid(&url)?;
 
   let timeout = Duration::from_secs(60);
@@ -131,62 +145,70 @@ pub async fn search_by_apub_id(
   };
 
   let domain = query_url.domain().context("url has no domain")?;
-  let response =
-    match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? {
-      SearchAcceptedObjects::Person(p) => {
-        let user_uri = p.inner.id(domain)?.context("person has no id")?;
+  let recursion_counter = &mut 0;
+  let response = match fetch_remote_object::<SearchAcceptedObjects>(
+    context.client(),
+    &query_url,
+    recursion_counter,
+  )
+  .await?
+  {
+    SearchAcceptedObjects::Person(p) => {
+      let user_uri = p.inner.id(domain)?.context("person has no id")?;
 
-        let user = get_or_fetch_and_upsert_user(&user_uri, context).await?;
+      let user = get_or_fetch_and_upsert_user(&user_uri, context, recursion_counter).await?;
 
-        response.users = vec![
-          blocking(context.pool(), move |conn| {
-            UserView::get_user_secure(conn, user.id)
-          })
-          .await??,
-        ];
+      response.users = vec![
+        blocking(context.pool(), move |conn| {
+          UserView::get_user_secure(conn, user.id)
+        })
+        .await??,
+      ];
 
-        response
-      }
-      SearchAcceptedObjects::Group(g) => {
-        let community_uri = g.inner.id(domain)?.context("group has no id")?;
+      response
+    }
+    SearchAcceptedObjects::Group(g) => {
+      let community_uri = g.inner.id(domain)?.context("group has no id")?;
 
-        let community = get_or_fetch_and_upsert_community(community_uri, context).await?;
+      let community =
+        get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?;
 
-        response.communities = vec![
-          blocking(context.pool(), move |conn| {
-            CommunityView::read(conn, community.id, None)
-          })
-          .await??,
-        ];
+      response.communities = vec![
+        blocking(context.pool(), move |conn| {
+          CommunityView::read(conn, community.id, None)
+        })
+        .await??,
+      ];
 
-        response
-      }
-      SearchAcceptedObjects::Page(p) => {
-        let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?;
+      response
+    }
+    SearchAcceptedObjects::Page(p) => {
+      let post_form = PostForm::from_apub(&p, context, Some(query_url), recursion_counter).await?;
 
-        let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
-        response.posts =
-          vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
+      let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
+      response.posts =
+        vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
 
-        response
-      }
-      SearchAcceptedObjects::Comment(c) => {
-        let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?;
+      response
+    }
+    SearchAcceptedObjects::Comment(c) => {
+      let comment_form =
+        CommentForm::from_apub(&c, context, Some(query_url), recursion_counter).await?;
 
-        let c = blocking(context.pool(), move |conn| {
-          Comment::upsert(conn, &comment_form)
+      let c = blocking(context.pool(), move |conn| {
+        Comment::upsert(conn, &comment_form)
+      })
+      .await??;
+      response.comments = vec![
+        blocking(context.pool(), move |conn| {
+          CommentView::read(conn, c.id, None)
         })
-        .await??;
-        response.comments = vec![
-          blocking(context.pool(), move |conn| {
-            CommentView::read(conn, c.id, None)
-          })
-          .await??,
-        ];
-
-        response
-      }
-    };
+        .await??,
+      ];
+
+      response
+    }
+  };
 
   Ok(response)
 }
@@ -199,11 +221,12 @@ pub async fn search_by_apub_id(
 pub(crate) async fn get_or_fetch_and_upsert_actor(
   apub_id: &Url,
   context: &LemmyContext,
+  recursion_counter: &mut i32,
 ) -> Result<Box<dyn ActorType>, LemmyError> {
-  let community = get_or_fetch_and_upsert_community(apub_id, context).await;
+  let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await;
   let actor: Box<dyn ActorType> = match community {
     Ok(c) => Box::new(c),
-    Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context).await?),
+    Err(_) => Box::new(get_or_fetch_and_upsert_user(apub_id, context, recursion_counter).await?),
   };
   Ok(actor)
 }
@@ -215,6 +238,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor(
 pub(crate) async fn get_or_fetch_and_upsert_user(
   apub_id: &Url,
   context: &LemmyContext,
+  recursion_counter: &mut i32,
 ) -> Result<User_, LemmyError> {
   let apub_id_owned = apub_id.to_owned();
   let user = blocking(context.pool(), move |conn| {
@@ -226,9 +250,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user(
     // If its older than a day, re-fetch it
     Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
       debug!("Fetching and updating from remote user: {}", apub_id);
-      let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
-
-      let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
+      let person =
+        fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
+
+      let mut uf = UserForm::from_apub(
+        &person,
+        context,
+        Some(apub_id.to_owned()),
+        recursion_counter,
+      )
+      .await?;
       uf.last_refreshed_at = Some(naive_now());
       let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
 
@@ -237,9 +268,16 @@ pub(crate) async fn get_or_fetch_and_upsert_user(
     Ok(u) => Ok(u),
     Err(NotFound {}) => {
       debug!("Fetching and creating remote user: {}", apub_id);
-      let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
-
-      let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
+      let person =
+        fetch_remote_object::<PersonExt>(context.client(), apub_id, recursion_counter).await?;
+
+      let uf = UserForm::from_apub(
+        &person,
+        context,
+        Some(apub_id.to_owned()),
+        recursion_counter,
+      )
+      .await?;
       let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
 
       Ok(user)
@@ -271,6 +309,7 @@ fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
 pub(crate) async fn get_or_fetch_and_upsert_community(
   apub_id: &Url,
   context: &LemmyContext,
+  recursion_counter: &mut i32,
 ) -> Result<Community, LemmyError> {
   let apub_id_owned = apub_id.to_owned();
   let community = blocking(context.pool(), move |conn| {
@@ -281,12 +320,12 @@ pub(crate) async fn get_or_fetch_and_upsert_community(
   match community {
     Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
       debug!("Fetching and updating from remote community: {}", apub_id);
-      fetch_remote_community(apub_id, context, Some(c.id)).await
+      fetch_remote_community(apub_id, context, Some(c.id), recursion_counter).await
     }
     Ok(c) => Ok(c),
     Err(NotFound {}) => {
       debug!("Fetching and creating remote community: {}", apub_id);
-      fetch_remote_community(apub_id, context, None).await
+      fetch_remote_community(apub_id, context, None, recursion_counter).await
     }
     Err(e) => Err(e.into()),
   }
@@ -299,10 +338,12 @@ async fn fetch_remote_community(
   apub_id: &Url,
   context: &LemmyContext,
   community_id: Option<i32>,
+  recursion_counter: &mut i32,
 ) -> Result<Community, LemmyError> {
-  let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
+  let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await?;
 
-  let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
+  let cf =
+    CommunityForm::from_apub(&group, context, Some(apub_id.to_owned()), recursion_counter).await?;
   let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
 
   // Also add the community moderators too
@@ -317,7 +358,7 @@ async fn fetch_remote_community(
   let mut creator_and_moderators = Vec::new();
 
   for uri in creator_and_moderator_uris {
-    let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?;
+    let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
 
     creator_and_moderators.push(c_or_m);
   }
@@ -340,9 +381,12 @@ async fn fetch_remote_community(
   }
 
   // fetch outbox (maybe make this conditional)
-  let outbox =
-    fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?)
-      .await?;
+  let outbox = fetch_remote_object::<OrderedCollection>(
+    context.client(),
+    &community.get_outbox_url()?,
+    recursion_counter,
+  )
+  .await?;
   let outbox_items = outbox.items().context(location_info!())?.clone();
   let mut outbox_items = outbox_items.many().context(location_info!())?;
   if outbox_items.len() > 20 {
@@ -353,7 +397,7 @@ async fn fetch_remote_community(
 
     // The post creator may be from a blocked instance,
     // if it errors, then continue
-    let post = match PostForm::from_apub(&page, context, None).await {
+    let post = match PostForm::from_apub(&page, context, None, recursion_counter).await {
       Ok(post) => post,
       Err(_) => continue,
     };
@@ -380,6 +424,7 @@ async fn fetch_remote_community(
 pub(crate) async fn get_or_fetch_and_insert_post(
   post_ap_id: &Url,
   context: &LemmyContext,
+  recursion_counter: &mut i32,
 ) -> Result<Post, LemmyError> {
   let post_ap_id_owned = post_ap_id.to_owned();
   let post = blocking(context.pool(), move |conn| {
@@ -391,8 +436,15 @@ pub(crate) async fn get_or_fetch_and_insert_post(
     Ok(p) => Ok(p),
     Err(NotFound {}) => {
       debug!("Fetching and creating remote post: {}", post_ap_id);
-      let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
-      let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
+      let post =
+        fetch_remote_object::<PageExt>(context.client(), post_ap_id, recursion_counter).await?;
+      let post_form = PostForm::from_apub(
+        &post,
+        context,
+        Some(post_ap_id.to_owned()),
+        recursion_counter,
+      )
+      .await?;
 
       let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
 
@@ -409,6 +461,7 @@ pub(crate) async fn get_or_fetch_and_insert_post(
 pub(crate) async fn get_or_fetch_and_insert_comment(
   comment_ap_id: &Url,
   context: &LemmyContext,
+  recursion_counter: &mut i32,
 ) -> Result<Comment, LemmyError> {
   let comment_ap_id_owned = comment_ap_id.to_owned();
   let comment = blocking(context.pool(), move |conn| {
@@ -423,9 +476,15 @@ pub(crate) async fn get_or_fetch_and_insert_comment(
         "Fetching and creating remote comment and its parents: {}",
         comment_ap_id
       );
-      let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?;
-      let comment_form =
-        CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
+      let comment =
+        fetch_remote_object::<Note>(context.client(), comment_ap_id, recursion_counter).await?;
+      let comment_form = CommentForm::from_apub(
+        &comment,
+        context,
+        Some(comment_ap_id.to_owned()),
+        recursion_counter,
+      )
+      .await?;
 
       let comment = blocking(context.pool(), move |conn| {
         Comment::upsert(conn, &comment_form)
index 11d09972b5392ae6f7c2ccece8746147f51f956a..6cef1d03494916e35f67c7b2678c0a35091f123f 100644 (file)
@@ -75,7 +75,8 @@ pub async fn community_inbox(
   );
   check_is_apub_id_valid(user_uri)?;
 
-  let user = get_or_fetch_and_upsert_user(&user_uri, &context).await?;
+  let request_counter = &mut 0;
+  let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?;
 
   verify_signature(&request, &user)?;
 
index 5184197ef830ae9dafb4a39fe181e8ab81438c7d..da26d74871698fa2b8171d6bab31f8896997e7aa 100644 (file)
@@ -100,20 +100,23 @@ pub async fn shared_inbox(
 
   check_is_apub_id_valid(&actor_id)?;
 
-  let actor = get_or_fetch_and_upsert_actor(&actor_id, &context).await?;
+  let request_counter = &mut 0;
+  let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
   verify_signature(&request, actor.as_ref())?;
 
   let any_base = activity.clone().into_any_base()?;
   let kind = activity.kind().context(location_info!())?;
   let res = match kind {
-    ValidTypes::Announce => receive_announce(&context, any_base, actor.as_ref()).await,
-    ValidTypes::Create => receive_create(&context, any_base, actor_id).await,
-    ValidTypes::Update => receive_update(&context, any_base, actor_id).await,
-    ValidTypes::Like => receive_like(&context, any_base, actor_id).await,
-    ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id).await,
+    ValidTypes::Announce => {
+      receive_announce(&context, any_base, actor.as_ref(), request_counter).await
+    }
+    ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await,
+    ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await,
+    ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await,
+    ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await,
     ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
-    ValidTypes::Delete => receive_delete(&context, any_base, actor_id).await,
-    ValidTypes::Undo => receive_undo(&context, any_base, actor_id).await,
+    ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await,
+    ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
   };
 
   insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
@@ -125,6 +128,7 @@ async fn receive_announce(
   context: &LemmyContext,
   activity: AnyBase,
   actor: &dyn ActorType,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
@@ -140,13 +144,13 @@ async fn receive_announce(
   check_is_apub_id_valid(&inner_id)?;
 
   match kind {
-    Some("Create") => receive_create(context, object, inner_id).await,
-    Some("Update") => receive_update(context, object, inner_id).await,
-    Some("Like") => receive_like(context, object, inner_id).await,
-    Some("Dislike") => receive_dislike(context, object, inner_id).await,
-    Some("Delete") => receive_delete(context, object, inner_id).await,
+    Some("Create") => receive_create(context, object, inner_id, request_counter).await,
+    Some("Update") => receive_update(context, object, inner_id, request_counter).await,
+    Some("Like") => receive_like(context, object, inner_id, request_counter).await,
+    Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await,
+    Some("Delete") => receive_delete(context, object, inner_id, request_counter).await,
     Some("Remove") => receive_remove(context, object, inner_id).await,
-    Some("Undo") => receive_undo(context, object, inner_id).await,
+    Some("Undo") => receive_undo(context, object, inner_id, request_counter).await,
     _ => receive_unhandled_activity(announce),
   }
 }
@@ -155,13 +159,14 @@ async fn receive_create(
   context: &LemmyContext,
   activity: AnyBase,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let create = Create::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&create, expected_domain, true)?;
 
   match create.object().as_single_kind_str() {
-    Some("Page") => receive_create_post(create, context).await,
-    Some("Note") => receive_create_comment(create, context).await,
+    Some("Page") => receive_create_post(create, context, request_counter).await,
+    Some("Note") => receive_create_comment(create, context, request_counter).await,
     _ => receive_unhandled_activity(create),
   }
 }
@@ -170,13 +175,14 @@ async fn receive_update(
   context: &LemmyContext,
   activity: AnyBase,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let update = Update::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&update, expected_domain, true)?;
 
   match update.object().as_single_kind_str() {
-    Some("Page") => receive_update_post(update, context).await,
-    Some("Note") => receive_update_comment(update, context).await,
+    Some("Page") => receive_update_post(update, context, request_counter).await,
+    Some("Note") => receive_update_comment(update, context, request_counter).await,
     _ => receive_unhandled_activity(update),
   }
 }
@@ -185,13 +191,14 @@ async fn receive_like(
   context: &LemmyContext,
   activity: AnyBase,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let like = Like::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&like, expected_domain, false)?;
 
   match like.object().as_single_kind_str() {
-    Some("Page") => receive_like_post(like, context).await,
-    Some("Note") => receive_like_comment(like, context).await,
+    Some("Page") => receive_like_post(like, context, request_counter).await,
+    Some("Note") => receive_like_comment(like, context, request_counter).await,
     _ => receive_unhandled_activity(like),
   }
 }
@@ -200,6 +207,7 @@ async fn receive_dislike(
   context: &LemmyContext,
   activity: AnyBase,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let enable_downvotes = blocking(context.pool(), move |conn| {
     Site::read(conn, 1).map(|s| s.enable_downvotes)
@@ -213,8 +221,8 @@ async fn receive_dislike(
   verify_activity_domains_valid(&dislike, expected_domain, false)?;
 
   match dislike.object().as_single_kind_str() {
-    Some("Page") => receive_dislike_post(dislike, context).await,
-    Some("Note") => receive_dislike_comment(dislike, context).await,
+    Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
+    Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
     _ => receive_unhandled_activity(dislike),
   }
 }
@@ -223,6 +231,7 @@ pub async fn receive_delete(
   context: &LemmyContext,
   activity: AnyBase,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let delete = Delete::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&delete, expected_domain, true)?;
@@ -234,9 +243,13 @@ pub async fn receive_delete(
     .context(location_info!())?;
 
   match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p).await,
-    Ok(FindResults::Comment(c)) => receive_delete_comment(context, delete, c).await,
-    Ok(FindResults::Community(c)) => receive_delete_community(context, delete, c).await,
+    Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await,
+    Ok(FindResults::Comment(c)) => {
+      receive_delete_comment(context, delete, c, request_counter).await
+    }
+    Ok(FindResults::Community(c)) => {
+      receive_delete_community(context, delete, c, request_counter).await
+    }
     // if we dont have the object, no need to do anything
     Err(_) => Ok(HttpResponse::Ok().finish()),
   }
@@ -283,15 +296,16 @@ async fn receive_undo(
   context: &LemmyContext,
   activity: AnyBase,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let undo = Undo::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
 
   match undo.object().as_single_kind_str() {
-    Some("Delete") => receive_undo_delete(context, undo, expected_domain).await,
-    Some("Remove") => receive_undo_remove(context, undo, expected_domain).await,
-    Some("Like") => receive_undo_like(context, undo, expected_domain).await,
-    Some("Dislike") => receive_undo_dislike(context, undo, expected_domain).await,
+    Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await,
+    Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await,
+    Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await,
+    Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await,
     _ => receive_unhandled_activity(undo),
   }
 }
@@ -300,6 +314,7 @@ async fn receive_undo_delete(
   context: &LemmyContext,
   undo: Undo,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -311,9 +326,13 @@ async fn receive_undo_delete(
     .single_xsd_any_uri()
     .context(location_info!())?;
   match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p).await,
-    Ok(FindResults::Comment(c)) => receive_undo_delete_comment(context, undo, c).await,
-    Ok(FindResults::Community(c)) => receive_undo_delete_community(context, undo, c).await,
+    Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await,
+    Ok(FindResults::Comment(c)) => {
+      receive_undo_delete_comment(context, undo, c, request_counter).await
+    }
+    Ok(FindResults::Community(c)) => {
+      receive_undo_delete_community(context, undo, c, request_counter).await
+    }
     // if we dont have the object, no need to do anything
     Err(_) => Ok(HttpResponse::Ok().finish()),
   }
@@ -323,6 +342,7 @@ async fn receive_undo_remove(
   context: &LemmyContext,
   undo: Undo,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -334,9 +354,13 @@ async fn receive_undo_remove(
     .single_xsd_any_uri()
     .context(location_info!())?;
   match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p).await,
-    Ok(FindResults::Comment(c)) => receive_undo_remove_comment(context, undo, c).await,
-    Ok(FindResults::Community(c)) => receive_undo_remove_community(context, undo, c).await,
+    Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await,
+    Ok(FindResults::Comment(c)) => {
+      receive_undo_remove_comment(context, undo, c, request_counter).await
+    }
+    Ok(FindResults::Community(c)) => {
+      receive_undo_remove_community(context, undo, c, request_counter).await
+    }
     // if we dont have the object, no need to do anything
     Err(_) => Ok(HttpResponse::Ok().finish()),
   }
@@ -346,6 +370,7 @@ async fn receive_undo_like(
   context: &LemmyContext,
   undo: Undo,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -356,8 +381,8 @@ async fn receive_undo_like(
     .as_single_kind_str()
     .context(location_info!())?;
   match type_ {
-    "Note" => receive_undo_like_comment(undo, &like, context).await,
-    "Page" => receive_undo_like_post(undo, &like, context).await,
+    "Note" => receive_undo_like_comment(undo, &like, context, request_counter).await,
+    "Page" => receive_undo_like_post(undo, &like, context, request_counter).await,
     d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
   }
 }
@@ -366,6 +391,7 @@ async fn receive_undo_dislike(
   context: &LemmyContext,
   undo: Undo,
   expected_domain: Url,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -376,8 +402,8 @@ async fn receive_undo_dislike(
     .as_single_kind_str()
     .context(location_info!())?;
   match type_ {
-    "Note" => receive_undo_dislike_comment(undo, &dislike, context).await,
-    "Page" => receive_undo_dislike_post(undo, &dislike, context).await,
+    "Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await,
+    "Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await,
     d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
   }
 }
index ebb11a56e3ca6f0056b06c681a24a8d6ee92ed94..e89aba148d24fe10b9dd8d5a64057792ecc84dad 100644 (file)
@@ -79,15 +79,22 @@ pub async fn user_inbox(
 
   check_is_apub_id_valid(actor_uri)?;
 
-  let actor = get_or_fetch_and_upsert_actor(actor_uri, &context).await?;
+  let request_counter = &mut 0;
+  let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
   verify_signature(&request, actor.as_ref())?;
 
   let any_base = activity.clone().into_any_base()?;
   let kind = activity.kind().context(location_info!())?;
   let res = match kind {
-    ValidTypes::Accept => receive_accept(&context, any_base, actor.as_ref(), user).await,
-    ValidTypes::Create => receive_create_private_message(&context, any_base, actor.as_ref()).await,
-    ValidTypes::Update => receive_update_private_message(&context, any_base, actor.as_ref()).await,
+    ValidTypes::Accept => {
+      receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
+    }
+    ValidTypes::Create => {
+      receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
+    }
+    ValidTypes::Update => {
+      receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
+    }
     ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
     ValidTypes::Undo => {
       receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
@@ -104,6 +111,7 @@ async fn receive_accept(
   activity: AnyBase,
   actor: &dyn ActorType,
   user: User_,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
@@ -120,7 +128,8 @@ async fn receive_accept(
     .single_xsd_any_uri()
     .context(location_info!())?;
 
-  let community = get_or_fetch_and_upsert_community(&community_uri, context).await?;
+  let community =
+    get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
 
   // Now you need to add this to the community follower
   let community_follower_form = CommunityFollowerForm {
@@ -141,6 +150,7 @@ async fn receive_create_private_message(
   context: &LemmyContext,
   activity: AnyBase,
   actor: &dyn ActorType,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let create = Create::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
@@ -155,7 +165,7 @@ async fn receive_create_private_message(
   .context(location_info!())?;
 
   let private_message =
-    PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?)).await?;
+    PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
 
   let inserted_private_message = blocking(&context.pool(), move |conn| {
     PrivateMessage::create(conn, &private_message)
@@ -185,6 +195,7 @@ async fn receive_update_private_message(
   context: &LemmyContext,
   activity: AnyBase,
   actor: &dyn ActorType,
+  request_counter: &mut i32,
 ) -> Result<HttpResponse, LemmyError> {
   let update = Update::from_any_base(activity)?.context(location_info!())?;
   verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
@@ -197,7 +208,7 @@ async fn receive_update_private_message(
   let note = Note::from_any_base(object)?.context(location_info!())?;
 
   let private_message_form =
-    PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?)).await?;
+    PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
 
   let private_message_ap_id = private_message_form
     .ap_id
index c93d6477c3cecd3e38c2bedff462546795066760..4a1413b33220046b4240a1332c6890b7b1e7060e 100644 (file)
@@ -122,6 +122,7 @@ pub trait FromApub {
     apub: &Self::ApubType,
     context: &LemmyContext,
     expected_domain: Option<Url>,
+    request_counter: &mut i32,
   ) -> Result<Self, LemmyError>
   where
     Self: Sized;
index efd2064d67cffd965617a6c7f47304c4dce831ce..ca0b0e8501aabcb0714b09dfaf7271833a1cfc16 100644 (file)
@@ -89,6 +89,7 @@ impl FromApub for CommentForm {
     note: &Note,
     context: &LemmyContext,
     expected_domain: Option<Url>,
+    request_counter: &mut i32,
   ) -> Result<CommentForm, LemmyError> {
     let creator_actor_id = &note
       .attributed_to()
@@ -96,7 +97,7 @@ impl FromApub for CommentForm {
       .as_single_xsd_any_uri()
       .context(location_info!())?;
 
-    let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?;
+    let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?;
 
     let mut in_reply_tos = note
       .in_reply_to()
@@ -109,7 +110,7 @@ impl FromApub for CommentForm {
     let post_ap_id = in_reply_tos.next().context(location_info!())??;
 
     // This post, or the parent comment might not yet exist on this server yet, fetch them.
-    let post = get_or_fetch_and_insert_post(&post_ap_id, context).await?;
+    let post = get_or_fetch_and_insert_post(&post_ap_id, context, request_counter).await?;
 
     // The 2nd item, if it exists, is the parent comment apub_id
     // For deeply nested comments, FromApub automatically gets called recursively
@@ -117,7 +118,7 @@ impl FromApub for CommentForm {
       Some(parent_comment_uri) => {
         let parent_comment_ap_id = &parent_comment_uri?;
         let parent_comment =
-          get_or_fetch_and_insert_comment(&parent_comment_ap_id, context).await?;
+          get_or_fetch_and_insert_comment(&parent_comment_ap_id, context, request_counter).await?;
 
         Some(parent_comment.id)
       }
index 0012e2bfab34203318ce44efdc75f83364308594..d697c70b0eb8ff37749e3228a8ba1ddc8be219a5 100644 (file)
@@ -111,6 +111,7 @@ impl FromApub for CommunityForm {
     group: &GroupExt,
     context: &LemmyContext,
     expected_domain: Option<Url>,
+    request_counter: &mut i32,
   ) -> Result<Self, LemmyError> {
     let creator_and_moderator_uris = group.inner.attributed_to().context(location_info!())?;
     let creator_uri = creator_and_moderator_uris
@@ -122,7 +123,7 @@ impl FromApub for CommunityForm {
       .as_xsd_any_uri()
       .context(location_info!())?;
 
-    let creator = get_or_fetch_and_upsert_user(creator_uri, context).await?;
+    let creator = get_or_fetch_and_upsert_user(creator_uri, context, request_counter).await?;
     let name = group
       .inner
       .preferred_username()
index 8796146d52e0cc65b60cc483e2ddeaaa17632363..6b42e690838935c3e7cf64947c47ffa9f1eb9daa 100644 (file)
@@ -101,6 +101,7 @@ impl FromApub for PostForm {
     page: &PageExt,
     context: &LemmyContext,
     expected_domain: Option<Url>,
+    request_counter: &mut i32,
   ) -> Result<PostForm, LemmyError> {
     let ext = &page.ext_one;
     let creator_actor_id = page
@@ -111,7 +112,7 @@ impl FromApub for PostForm {
       .as_single_xsd_any_uri()
       .context(location_info!())?;
 
-    let creator = get_or_fetch_and_upsert_user(creator_actor_id, context).await?;
+    let creator = get_or_fetch_and_upsert_user(creator_actor_id, context, request_counter).await?;
 
     let community_actor_id = page
       .inner
@@ -121,7 +122,8 @@ impl FromApub for PostForm {
       .as_single_xsd_any_uri()
       .context(location_info!())?;
 
-    let community = get_or_fetch_and_upsert_community(community_actor_id, context).await?;
+    let community =
+      get_or_fetch_and_upsert_community(community_actor_id, context, request_counter).await?;
 
     let thumbnail_url = match &page.inner.image() {
       Some(any_image) => Image::from_any_base(
index 119dfb56c2c970182b59e22088fca04d31d3ab68..64047963c088460f6f670a25ae0ec49fc0bfb2f2 100644 (file)
@@ -62,6 +62,7 @@ impl FromApub for PrivateMessageForm {
     note: &Note,
     context: &LemmyContext,
     expected_domain: Option<Url>,
+    request_counter: &mut i32,
   ) -> Result<PrivateMessageForm, LemmyError> {
     let creator_actor_id = note
       .attributed_to()
@@ -70,14 +71,15 @@ impl FromApub for PrivateMessageForm {
       .single_xsd_any_uri()
       .context(location_info!())?;
 
-    let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context).await?;
+    let creator = get_or_fetch_and_upsert_user(&creator_actor_id, context, request_counter).await?;
     let recipient_actor_id = note
       .to()
       .context(location_info!())?
       .clone()
       .single_xsd_any_uri()
       .context(location_info!())?;
-    let recipient = get_or_fetch_and_upsert_user(&recipient_actor_id, context).await?;
+    let recipient =
+      get_or_fetch_and_upsert_user(&recipient_actor_id, context, request_counter).await?;
     let ap_id = note.id_unchecked().context(location_info!())?.to_string();
     check_is_apub_id_valid(&Url::parse(&ap_id)?)?;
 
index fb8213e8f5ab6291afae37631e23693008fede00..5ef1ec8e9d7c20e9d3dee33578783967c91d6376 100644 (file)
@@ -76,6 +76,7 @@ impl FromApub for UserForm {
     person: &PersonExt,
     _context: &LemmyContext,
     expected_domain: Option<Url>,
+    _request_counter: &mut i32,
   ) -> Result<Self, LemmyError> {
     let avatar = match person.icon() {
       Some(any_image) => Some(