X-Git-Url: http://these/git/?a=blobdiff_plain;f=crates%2Fapub%2Fsrc%2Fhttp%2Fmod.rs;h=c261d9e4929c363385e10756d491967a25f29148;hb=e9e76549a88cfbdab36f00d302cceabcaaa24f4c;hp=0d484e8f8b462d7605b95ec02f511a2fc5efd767;hpb=3fea5645f82606df4c2706f4e156f7d38f4f5e38;p=lemmy.git diff --git a/crates/apub/src/http/mod.rs b/crates/apub/src/http/mod.rs index 0d484e8f..c261d9e4 100644 --- a/crates/apub/src/http/mod.rs +++ b/crates/apub/src/http/mod.rs @@ -1,35 +1,22 @@ use crate::{ activity_lists::SharedInboxActivities, - check_is_apub_id_valid, - context::WithContext, fetcher::user_or_community::UserOrCommunity, - http::{community::receive_group_inbox, person::receive_person_inbox}, - insert_activity, + protocol::objects::tombstone::Tombstone, + CONTEXT, }; -use actix_web::{ - body::Body, - web, - web::{Bytes, BytesMut, Payload}, - HttpRequest, - HttpResponse, +use activitypub_federation::{ + actix_web::inbox::receive_activity, + config::Data, + protocol::context::WithContext, + FEDERATION_CONTENT_TYPE, }; -use anyhow::{anyhow, Context}; -use futures::StreamExt; +use actix_web::{web, web::Bytes, HttpRequest, HttpResponse}; use http::StatusCode; -use lemmy_api_common::blocking; -use lemmy_apub_lib::{ - data::Data, - object_id::ObjectId, - signatures::verify_signature, - traits::{ActivityHandler, ActorType}, - APUB_JSON_CONTENT_TYPE, -}; -use lemmy_db_schema::source::activity::Activity; -use lemmy_utils::{location_info, LemmyError}; -use lemmy_websocket::LemmyContext; +use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::activity::SentActivity; +use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult}; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, io::Read}; -use tracing::info; +use std::ops::Deref; use url::Url; mod comment; @@ -37,109 +24,48 @@ mod community; mod person; mod post; pub mod routes; +pub mod site; pub async fn shared_inbox( request: HttpRequest, - payload: Payload, - context: web::Data, -) -> Result { - let unparsed = payload_to_string(payload).await?; - info!("Received shared inbox activity {}", unparsed); - let activity_data: ActivityCommonFields = serde_json::from_str(&unparsed)?; - let activity = serde_json::from_str::>(&unparsed)?; - match activity.inner() { - SharedInboxActivities::GroupInboxActivities(g) => { - receive_group_inbox(*g, activity_data, request, &context).await - } - SharedInboxActivities::PersonInboxActivities(p) => { - receive_person_inbox(*p, activity_data, request, &context).await - } - } -} - -async fn payload_to_string(mut payload: Payload) -> Result { - let mut bytes = BytesMut::new(); - while let Some(item) = payload.next().await { - bytes.extend_from_slice(&item?); - } - let mut unparsed = String::new(); - Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?; - Ok(unparsed) -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub(crate) struct ActivityCommonFields { - pub(crate) id: Url, - pub(crate) actor: Url, -} - -// TODO: move most of this code to library -async fn receive_activity<'a, T>( - request: HttpRequest, - activity: T, - activity_data: ActivityCommonFields, - context: &LemmyContext, -) -> Result -where - T: ActivityHandler - + Clone - + Deserialize<'a> - + Serialize - + std::fmt::Debug - + Send - + 'static, -{ - check_is_apub_id_valid(&activity_data.actor, false, &context.settings())?; - let request_counter = &mut 0; - let actor = ObjectId::::new(activity_data.actor) - .dereference(context, request_counter) - .await?; - verify_signature(&request, &actor.public_key())?; - - info!("Verifying activity {}", activity_data.id.to_string()); - activity - .verify(&Data::new(context.clone()), request_counter) - .await?; - assert_activity_not_local(&activity_data.id, &context.settings().hostname)?; - - // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen - // if we receive the same activity twice in very quick succession. - let object_value = serde_json::to_value(&activity)?; - insert_activity(&activity_data.id, object_value, false, true, context.pool()).await?; - - info!("Receiving activity {}", activity_data.id.to_string()); - activity - .receive(&Data::new(context.clone()), request_counter) - .await?; - Ok(HttpResponse::Ok().finish()) + body: Bytes, + data: Data, +) -> LemmyResult { + receive_activity::(request, body, &data) + .await } /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub /// headers. -fn create_apub_response(data: &T) -> HttpResponse +/// +/// actix-web doesn't allow pretty-print for json so we need to do this manually. +fn create_apub_response(data: &T) -> LemmyResult where T: Serialize, { - HttpResponse::Ok() - .content_type(APUB_JSON_CONTENT_TYPE) - .json(WithContext::new(data)) + let json = serde_json::to_string_pretty(&WithContext::new(data, CONTEXT.clone()))?; + + Ok( + HttpResponse::Ok() + .content_type(FEDERATION_CONTENT_TYPE) + .body(json), + ) } -fn create_json_apub_response(data: serde_json::Value) -> HttpResponse { - HttpResponse::Ok() - .content_type(APUB_JSON_CONTENT_TYPE) - .json(data) +fn create_apub_tombstone_response>(id: T) -> LemmyResult { + let tombstone = Tombstone::new(id.into()); + let json = serde_json::to_string_pretty(&WithContext::new(tombstone, CONTEXT.deref().clone()))?; + + Ok( + HttpResponse::Gone() + .content_type(FEDERATION_CONTENT_TYPE) + .status(StatusCode::GONE) + .body(json), + ) } -fn create_apub_tombstone_response(data: &T) -> HttpResponse -where - T: Serialize, -{ - HttpResponse::Gone() - .content_type(APUB_JSON_CONTENT_TYPE) - .status(StatusCode::GONE) - .json(WithContext::new(data)) +fn err_object_not_local() -> LemmyError { + LemmyErrorType::ObjectNotLocal.into() } #[derive(Deserialize)] @@ -149,10 +75,11 @@ pub struct ActivityQuery { } /// Return the ActivityPub json representation of a local activity over HTTP. +#[tracing::instrument(skip_all)] pub(crate) async fn get_activity( info: web::Path, context: web::Data, -) -> Result, LemmyError> { +) -> Result { let settings = context.settings(); let activity_id = Url::parse(&format!( "{}/activities/{}/{}", @@ -161,30 +88,12 @@ pub(crate) async fn get_activity( info.id ))? .into(); - let activity = blocking(context.pool(), move |conn| { - Activity::read_from_apub_id(conn, &activity_id) - }) - .await??; + let activity = SentActivity::read_from_apub_id(&mut context.pool(), &activity_id).await?; - let sensitive = activity.sensitive.unwrap_or(true); - if !activity.local || sensitive { - Ok(HttpResponse::NotFound().finish()) + let sensitive = activity.sensitive; + if sensitive { + Ok(HttpResponse::Forbidden().finish()) } else { - Ok(create_json_apub_response(activity.data)) - } -} - -fn assert_activity_not_local(id: &Url, hostname: &str) -> Result<(), LemmyError> { - let activity_domain = id.domain().context(location_info!())?; - - if activity_domain == hostname { - return Err( - anyhow!( - "Error: received activity which was sent by local instance: {:?}", - id - ) - .into(), - ); + create_apub_response(&activity.data) } - Ok(()) }