]> Untitled Git - lemmy.git/blob - crates/apub/src/http/mod.rs
Move activity structs to protocol folder
[lemmy.git] / crates / apub / src / http / mod.rs
1 use crate::{
2   activity_lists::SharedInboxActivities,
3   check_is_apub_id_valid,
4   context::WithContext,
5   fetcher::get_or_fetch_and_upsert_actor,
6   http::{community::receive_group_inbox, person::receive_person_inbox},
7   insert_activity,
8 };
9 use actix_web::{
10   body::Body,
11   web,
12   web::{Bytes, BytesMut, Payload},
13   HttpRequest,
14   HttpResponse,
15 };
16 use anyhow::{anyhow, Context};
17 use futures::StreamExt;
18 use http::StatusCode;
19 use lemmy_api_common::blocking;
20 use lemmy_apub_lib::{
21   data::Data,
22   signatures::verify_signature,
23   traits::{ActivityFields, ActivityHandler},
24   APUB_JSON_CONTENT_TYPE,
25 };
26 use lemmy_db_schema::{source::activity::Activity, DbPool};
27 use lemmy_utils::{location_info, LemmyError};
28 use lemmy_websocket::LemmyContext;
29 use log::info;
30 use serde::{Deserialize, Serialize};
31 use std::{fmt::Debug, io::Read};
32 use url::Url;
33
34 mod comment;
35 mod community;
36 mod person;
37 mod post;
38 pub mod routes;
39
40 pub async fn shared_inbox(
41   request: HttpRequest,
42   payload: Payload,
43   context: web::Data<LemmyContext>,
44 ) -> Result<HttpResponse, LemmyError> {
45   let unparsed = payload_to_string(payload).await?;
46   info!("Received shared inbox activity {}", unparsed);
47   let activity = serde_json::from_str::<WithContext<SharedInboxActivities>>(&unparsed)?;
48   match activity.inner() {
49     SharedInboxActivities::GroupInboxActivities(g) => {
50       receive_group_inbox(g, request, &context).await
51     }
52     SharedInboxActivities::PersonInboxActivities(p) => {
53       receive_person_inbox(p, request, &context).await
54     }
55   }
56 }
57
58 async fn payload_to_string(mut payload: Payload) -> Result<String, LemmyError> {
59   let mut bytes = BytesMut::new();
60   while let Some(item) = payload.next().await {
61     bytes.extend_from_slice(&item?);
62   }
63   let mut unparsed = String::new();
64   Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?;
65   Ok(unparsed)
66 }
67
68 // TODO: move most of this code to library
69 async fn receive_activity<'a, T>(
70   request: HttpRequest,
71   activity: T,
72   context: &LemmyContext,
73 ) -> Result<HttpResponse, LemmyError>
74 where
75   T: ActivityHandler<DataType = LemmyContext>
76     + ActivityFields
77     + Clone
78     + Deserialize<'a>
79     + Serialize
80     + std::fmt::Debug
81     + Send
82     + 'static,
83 {
84   let request_counter = &mut 0;
85   let actor =
86     get_or_fetch_and_upsert_actor(activity.actor().clone(), context, request_counter).await?;
87   verify_signature(&request, &actor.public_key().context(location_info!())?)?;
88
89   // Do nothing if we received the same activity before
90   if is_activity_already_known(context.pool(), activity.id_unchecked()).await? {
91     return Ok(HttpResponse::Ok().finish());
92   }
93   check_is_apub_id_valid(activity.actor(), false, &context.settings())?;
94   info!("Verifying activity {}", activity.id_unchecked().to_string());
95   activity
96     .verify(&Data::new(context.clone()), request_counter)
97     .await?;
98   assert_activity_not_local(&activity, &context.settings().hostname)?;
99
100   // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
101   // if we receive the same activity twice in very quick succession.
102   insert_activity(
103     activity.id_unchecked(),
104     activity.clone(),
105     false,
106     true,
107     context.pool(),
108   )
109   .await?;
110
111   info!("Receiving activity {}", activity.id_unchecked().to_string());
112   activity
113     .receive(&Data::new(context.clone()), request_counter)
114     .await?;
115   Ok(HttpResponse::Ok().finish())
116 }
117
118 /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub
119 /// headers.
120 fn create_apub_response<T>(data: &T) -> HttpResponse<Body>
121 where
122   T: Serialize,
123 {
124   HttpResponse::Ok()
125     .content_type(APUB_JSON_CONTENT_TYPE)
126     .json(WithContext::new(data))
127 }
128
129 fn create_apub_tombstone_response<T>(data: &T) -> HttpResponse<Body>
130 where
131   T: Serialize,
132 {
133   HttpResponse::Gone()
134     .content_type(APUB_JSON_CONTENT_TYPE)
135     .status(StatusCode::GONE)
136     .json(WithContext::new(data))
137 }
138
139 #[derive(Deserialize)]
140 pub struct ActivityQuery {
141   type_: String,
142   id: String,
143 }
144
145 /// Return the ActivityPub json representation of a local activity over HTTP.
146 pub(crate) async fn get_activity(
147   info: web::Path<ActivityQuery>,
148   context: web::Data<LemmyContext>,
149 ) -> Result<HttpResponse<Body>, LemmyError> {
150   let settings = context.settings();
151   let activity_id = Url::parse(&format!(
152     "{}/activities/{}/{}",
153     settings.get_protocol_and_hostname(),
154     info.type_,
155     info.id
156   ))?
157   .into();
158   let activity = blocking(context.pool(), move |conn| {
159     Activity::read_from_apub_id(conn, &activity_id)
160   })
161   .await??;
162
163   let sensitive = activity.sensitive.unwrap_or(true);
164   if !activity.local || sensitive {
165     Ok(HttpResponse::NotFound().finish())
166   } else {
167     Ok(create_apub_response(&activity.data))
168   }
169 }
170
171 pub(crate) async fn is_activity_already_known(
172   pool: &DbPool,
173   activity_id: &Url,
174 ) -> Result<bool, LemmyError> {
175   let activity_id = activity_id.to_owned().into();
176   let existing = blocking(pool, move |conn| {
177     Activity::read_from_apub_id(conn, &activity_id)
178   })
179   .await?;
180   match existing {
181     Ok(_) => Ok(true),
182     Err(_) => Ok(false),
183   }
184 }
185
186 fn assert_activity_not_local<T: Debug + ActivityFields>(
187   activity: &T,
188   hostname: &str,
189 ) -> Result<(), LemmyError> {
190   let activity_domain = activity.id_unchecked().domain().context(location_info!())?;
191
192   if activity_domain == hostname {
193     return Err(
194       anyhow!(
195         "Error: received activity which was sent by local instance: {:?}",
196         activity
197       )
198       .into(),
199     );
200   }
201   Ok(())
202 }