]> Untitled Git - lemmy.git/blob - crates/apub/src/http/mod.rs
Move @context out of object/activity definitions
[lemmy.git] / crates / apub / src / http / mod.rs
1 use crate::{
2   check_is_apub_id_valid,
3   context::WithContext,
4   fetcher::get_or_fetch_and_upsert_actor,
5   http::{
6     community::{receive_group_inbox, GroupInboxActivities},
7     person::{receive_person_inbox, PersonInboxActivities},
8   },
9   insert_activity,
10 };
11 use actix_web::{
12   body::Body,
13   web,
14   web::{Bytes, BytesMut, Payload},
15   HttpRequest,
16   HttpResponse,
17 };
18 use anyhow::{anyhow, Context};
19 use futures::StreamExt;
20 use http::StatusCode;
21 use lemmy_api_common::blocking;
22 use lemmy_apub_lib::{
23   data::Data,
24   signatures::verify_signature,
25   traits::{ActivityFields, ActivityHandler},
26   APUB_JSON_CONTENT_TYPE,
27 };
28 use lemmy_db_schema::{source::activity::Activity, DbPool};
29 use lemmy_utils::{location_info, LemmyError};
30 use lemmy_websocket::LemmyContext;
31 use log::info;
32 use serde::{Deserialize, Serialize};
33 use std::{fmt::Debug, io::Read};
34 use url::Url;
35
36 mod comment;
37 mod community;
38 mod person;
39 mod post;
40 pub mod routes;
41
42 #[derive(Clone, Debug, Deserialize, Serialize, ActivityHandler, ActivityFields)]
43 #[serde(untagged)]
44 #[activity_handler(LemmyContext)]
45 pub enum SharedInboxActivities {
46   GroupInboxActivities(GroupInboxActivities),
47   // Note, pm activities need to be at the end, otherwise comments will end up here. We can probably
48   // avoid this problem by replacing createpm.object with our own struct, instead of NoteExt.
49   PersonInboxActivities(PersonInboxActivities),
50 }
51
52 pub async fn shared_inbox(
53   request: HttpRequest,
54   payload: Payload,
55   context: web::Data<LemmyContext>,
56 ) -> Result<HttpResponse, LemmyError> {
57   let unparsed = payload_to_string(payload).await?;
58   info!("Received shared inbox activity {}", unparsed);
59   let activity = serde_json::from_str::<WithContext<SharedInboxActivities>>(&unparsed)?;
60   match activity.inner() {
61     SharedInboxActivities::GroupInboxActivities(g) => {
62       receive_group_inbox(g, request, &context).await
63     }
64     SharedInboxActivities::PersonInboxActivities(p) => {
65       receive_person_inbox(p, request, &context).await
66     }
67   }
68 }
69
70 async fn payload_to_string(mut payload: Payload) -> Result<String, LemmyError> {
71   let mut bytes = BytesMut::new();
72   while let Some(item) = payload.next().await {
73     bytes.extend_from_slice(&item?);
74   }
75   let mut unparsed = String::new();
76   Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?;
77   Ok(unparsed)
78 }
79
80 // TODO: move most of this code to library
81 async fn receive_activity<'a, T>(
82   request: HttpRequest,
83   activity: T,
84   context: &LemmyContext,
85 ) -> Result<HttpResponse, LemmyError>
86 where
87   T: ActivityHandler<DataType = LemmyContext>
88     + ActivityFields
89     + Clone
90     + Deserialize<'a>
91     + Serialize
92     + std::fmt::Debug
93     + Send
94     + 'static,
95 {
96   let request_counter = &mut 0;
97   let actor =
98     get_or_fetch_and_upsert_actor(activity.actor().clone(), context, request_counter).await?;
99   verify_signature(&request, &actor.public_key().context(location_info!())?)?;
100
101   // Do nothing if we received the same activity before
102   if is_activity_already_known(context.pool(), activity.id_unchecked()).await? {
103     return Ok(HttpResponse::Ok().finish());
104   }
105   check_is_apub_id_valid(activity.actor(), false, &context.settings())?;
106   info!("Verifying activity {}", activity.id_unchecked().to_string());
107   activity
108     .verify(&Data::new(context.clone()), request_counter)
109     .await?;
110   assert_activity_not_local(&activity, &context.settings().hostname)?;
111
112   // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
113   // if we receive the same activity twice in very quick succession.
114   insert_activity(
115     activity.id_unchecked(),
116     activity.clone(),
117     false,
118     true,
119     context.pool(),
120   )
121   .await?;
122
123   info!("Receiving activity {}", activity.id_unchecked().to_string());
124   activity
125     .receive(&Data::new(context.clone()), request_counter)
126     .await?;
127   Ok(HttpResponse::Ok().finish())
128 }
129
130 /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub
131 /// headers.
132 fn create_apub_response<T>(data: &T) -> HttpResponse<Body>
133 where
134   T: Serialize,
135 {
136   HttpResponse::Ok()
137     .content_type(APUB_JSON_CONTENT_TYPE)
138     .json(WithContext::new(data))
139 }
140
141 fn create_apub_tombstone_response<T>(data: &T) -> HttpResponse<Body>
142 where
143   T: Serialize,
144 {
145   HttpResponse::Gone()
146     .content_type(APUB_JSON_CONTENT_TYPE)
147     .status(StatusCode::GONE)
148     .json(WithContext::new(data))
149 }
150
151 #[derive(Deserialize)]
152 pub struct ActivityQuery {
153   type_: String,
154   id: String,
155 }
156
157 /// Return the ActivityPub json representation of a local activity over HTTP.
158 pub(crate) async fn get_activity(
159   info: web::Path<ActivityQuery>,
160   context: web::Data<LemmyContext>,
161 ) -> Result<HttpResponse<Body>, LemmyError> {
162   let settings = context.settings();
163   let activity_id = Url::parse(&format!(
164     "{}/activities/{}/{}",
165     settings.get_protocol_and_hostname(),
166     info.type_,
167     info.id
168   ))?
169   .into();
170   let activity = blocking(context.pool(), move |conn| {
171     Activity::read_from_apub_id(conn, &activity_id)
172   })
173   .await??;
174
175   let sensitive = activity.sensitive.unwrap_or(true);
176   if !activity.local || sensitive {
177     Ok(HttpResponse::NotFound().finish())
178   } else {
179     Ok(create_apub_response(&activity.data))
180   }
181 }
182
183 pub(crate) async fn is_activity_already_known(
184   pool: &DbPool,
185   activity_id: &Url,
186 ) -> Result<bool, LemmyError> {
187   let activity_id = activity_id.to_owned().into();
188   let existing = blocking(pool, move |conn| {
189     Activity::read_from_apub_id(conn, &activity_id)
190   })
191   .await?;
192   match existing {
193     Ok(_) => Ok(true),
194     Err(_) => Ok(false),
195   }
196 }
197
198 fn assert_activity_not_local<T: Debug + ActivityFields>(
199   activity: &T,
200   hostname: &str,
201 ) -> Result<(), LemmyError> {
202   let activity_domain = activity.id_unchecked().domain().context(location_info!())?;
203
204   if activity_domain == hostname {
205     return Err(
206       anyhow!(
207         "Error: received activity which was sent by local instance: {:?}",
208         activity
209       )
210       .into(),
211     );
212   }
213   Ok(())
214 }