2 check_is_apub_id_valid,
3 extensions::signatures::verify_signature,
4 fetcher::get_or_fetch_and_upsert_actor,
5 http::inbox_enums::SharedInboxActivities,
7 APUB_JSON_CONTENT_TYPE,
12 web::{Bytes, BytesMut, Payload},
16 use anyhow::{anyhow, Context};
17 use futures::StreamExt;
19 use lemmy_api_common::blocking;
20 use lemmy_apub_lib::ActivityHandler;
21 use lemmy_db_queries::{source::activity::Activity_, DbPool};
22 use lemmy_db_schema::source::activity::Activity;
23 use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
24 use lemmy_websocket::LemmyContext;
25 use serde::{Deserialize, Serialize};
26 use std::{fmt::Debug, io::Read};
/// Shared inbox endpoint: buffers the request body into a string and passes it to
/// `receive_activity`, which deserializes it as `SharedInboxActivities`.
///
/// Returns the result of `receive_activity`; an error while reading the body is
/// propagated through `?`.
36 pub async fn shared_inbox(
39 context: web::Data<LemmyContext>,
40 ) -> Result<HttpResponse, LemmyError> {
// `receive_activity` takes the body as `&str`, so the payload is read in full first.
41 let unparsed = payload_to_string(payload).await?;
42 receive_activity::<SharedInboxActivities>(request, &unparsed, context).await
/// Collects a streamed request payload into a single `String`.
///
/// Chunks are awaited one at a time and appended to a growable byte buffer. An
/// error on any chunk, or a failed conversion of the buffered bytes to a string,
/// is returned to the caller through `?`.
45 async fn payload_to_string(mut payload: Payload) -> Result<String, LemmyError> {
46 let mut bytes = BytesMut::new();
// Drain the stream until it yields `None`, appending every chunk to `bytes`.
47 while let Some(item) = payload.next().await {
48 bytes.extend_from_slice(&item?);
50 let mut unparsed = String::new();
// Uses `std::io::Read` on the byte slice; `read_to_string` errors if the bytes are
// not valid UTF-8.
// NOTE(review): `String::from_utf8` on the buffer would express the same conversion
// more directly — consider in a follow-up.
51 Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?;
55 // TODO: move most of this code to library
/// Common pipeline for an incoming activity of type `T`, supplied as a JSON string.
///
/// Steps, in the order they appear below:
/// 1. deserialize the JSON into `T` and take its common data;
/// 2. resolve the sending actor with `get_or_fetch_and_upsert_actor` and check the
///    request's signature against that actor's public key;
/// 3. answer `200 OK` immediately if the activity id is already known;
/// 4. validate the actor id, call `verify` on the activity, and run
///    `assert_activity_not_local`;
/// 5. call `receive` on the activity and answer `200 OK`.
///
/// Every fallible step uses `?`, so the first failure aborts the pipeline and is
/// returned as the error.
56 async fn receive_activity<'a, T>(
59 context: web::Data<LemmyContext>,
60 ) -> Result<HttpResponse, LemmyError>
62 T: ActivityHandler + Clone + Deserialize<'a> + Serialize + std::fmt::Debug + Send + 'static,
64 let activity = serde_json::from_str::<T>(activity)?;
65 let activity_data = activity.common();
// One mutable counter is shared by the actor lookup, `verify` and `receive` calls.
// NOTE(review): presumably this caps the number of outgoing fetches per incoming
// activity — confirm against the fetcher.
67 let request_counter = &mut 0;
69 get_or_fetch_and_upsert_actor(&activity_data.actor, &context, request_counter).await?;
// The signature is checked before the duplicate check below, so even a repeated
// activity must carry a valid signature to get a response.
70 verify_signature(&request, &actor.public_key().context(location_info!())?)?;
72 // Do nothing if we received the same activity before
73 if is_activity_already_known(context.pool(), activity_data.id_unchecked()).await? {
74 return Ok(HttpResponse::Ok().finish());
76 check_is_apub_id_valid(&activity_data.actor, false)?;
// NOTE(review): `.to_string()` looks redundant for a `{}` format argument if the id
// type implements `Display` — confirm before removing.
78 "Verifying activity {}",
79 activity_data.id_unchecked().to_string()
81 activity.verify(&context, request_counter).await?;
82 assert_activity_not_local(&activity)?;
84 // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
85 // if we receive the same activity twice in very quick succession.
87 activity_data.id_unchecked(),
96 "Receiving activity {}",
97 activity_data.id_unchecked().to_string()
// Side effects of the activity are applied only after every check above has passed.
99 activity.receive(&context, request_counter).await?;
100 Ok(HttpResponse::Ok().finish())
103 /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub
105 fn create_apub_response<T>(data: &T) -> HttpResponse<Body>
// The response is tagged with `APUB_JSON_CONTENT_TYPE` as its content type.
110 .content_type(APUB_JSON_CONTENT_TYPE)
/// Builds an `HttpResponse<Body>` with the ActivityPub JSON content type and the
/// HTTP status `410 Gone`.
///
/// NOTE(review): presumably `data` is serialized to JSON for the body, mirroring
/// `create_apub_response` — confirm.
114 fn create_apub_tombstone_response<T>(data: &T) -> HttpResponse<Body>
119 .content_type(APUB_JSON_CONTENT_TYPE)
// `GONE` marks the resource as permanently removed rather than merely missing.
120 .status(StatusCode::GONE)
/// Deserializable request parameters for `get_activity`, which receives them as
/// `web::Path<ActivityQuery>`.
124 #[derive(Deserialize)]
125 pub struct ActivityQuery {
130 /// Return the ActivityPub json representation of a local activity over HTTP.
///
/// Answers `404 Not Found` for activities that are not local or that count as
/// sensitive; otherwise the stored activity data is returned through
/// `create_apub_response`.
131 pub(crate) async fn get_activity(
132 info: web::Path<ActivityQuery>,
133 context: web::Data<LemmyContext>,
134 ) -> Result<HttpResponse<Body>, LemmyError> {
135 let settings = Settings::get();
// Rebuild the activity's apub id under this instance's protocol and hostname:
// `<protocol+hostname>/activities/<..>/<..>`.
// NOTE(review): the two trailing segments presumably come from `info` — confirm.
136 let activity_id = Url::parse(&format!(
137 "{}/activities/{}/{}",
138 settings.get_protocol_and_hostname(),
// The database lookup by apub id is run through `blocking`.
143 let activity = blocking(context.pool(), move |conn| {
144 Activity::read_from_apub_id(conn, &activity_id)
// A missing `sensitive` flag is treated as sensitive.
148 let sensitive = activity.sensitive.unwrap_or(true);
// Only local, non-sensitive activities are served.
149 if !activity.local || sensitive {
150 Ok(HttpResponse::NotFound().finish())
152 Ok(create_apub_response(&activity.data))
/// Looks an activity id up in the database with `Activity::read_from_apub_id` and
/// reports the outcome as a `bool`.
///
/// NOTE(review): presumably `true` means a matching row was found — confirm.
156 pub(crate) async fn is_activity_already_known(
159 ) -> Result<bool, LemmyError> {
// Take an owned, converted copy of the id so it can be moved into the closure below.
160 let activity_id = activity_id.to_owned().into();
161 let existing = blocking(pool, move |conn| {
162 Activity::read_from_apub_id(conn, &activity_id)
171 fn assert_activity_not_local<T: Debug + ActivityHandler>(activity: &T) -> Result<(), LemmyError> {
172 let activity_domain = activity
176 .context(location_info!())?;
178 if activity_domain == Settings::get().hostname() {
181 "Error: received activity which was sent by local instance: {:?}",