Untitled Git - lemmy.git/blob - crates/apub/src/http/mod.rs
Merge crates db_schema and db_queries
[lemmy.git] / crates / apub / src / http / mod.rs
1 use crate::{
2   check_is_apub_id_valid,
3   fetcher::get_or_fetch_and_upsert_actor,
4   http::{
5     community::{receive_group_inbox, GroupInboxActivities},
6     person::{receive_person_inbox, PersonInboxActivities},
7   },
8   insert_activity,
9 };
10 use actix_web::{
11   body::Body,
12   web,
13   web::{Bytes, BytesMut, Payload},
14   HttpRequest,
15   HttpResponse,
16 };
17 use anyhow::{anyhow, Context};
18 use futures::StreamExt;
19 use http::StatusCode;
20 use lemmy_api_common::blocking;
21 use lemmy_apub_lib::{
22   data::Data,
23   signatures::verify_signature,
24   traits::{ActivityFields, ActivityHandler},
25   APUB_JSON_CONTENT_TYPE,
26 };
27 use lemmy_db_schema::{source::activity::Activity, DbPool};
28 use lemmy_utils::{location_info, LemmyError};
29 use lemmy_websocket::LemmyContext;
30 use log::{info, trace};
31 use serde::{Deserialize, Serialize};
32 use std::{fmt::Debug, io::Read};
33 use url::Url;
34
35 mod comment;
36 mod community;
37 mod person;
38 mod post;
39 pub mod routes;
40
/// Every activity that `shared_inbox` can deserialize: either a group inbox activity or a
/// person inbox activity.
///
/// The enum is `#[serde(untagged)]`, so the JSON carries no variant tag and the order in which
/// the variants are declared matters (see the note on `PersonInboxActivities` below).
41 #[derive(Clone, Debug, Deserialize, Serialize, ActivityHandler, ActivityFields)]
42 #[serde(untagged)]
43 #[activity_handler(LemmyContext)]
44 pub enum SharedInboxActivities {
45   GroupInboxActivities(GroupInboxActivities),
46   // Note, pm activities need to be at the end, otherwise comments will end up here. We can probably
47   // avoid this problem by replacing createpm.object with our own struct, instead of NoteExt.
48   PersonInboxActivities(PersonInboxActivities),
49 }
50
51 pub async fn shared_inbox(
52   request: HttpRequest,
53   payload: Payload,
54   context: web::Data<LemmyContext>,
55 ) -> Result<HttpResponse, LemmyError> {
56   let unparsed = payload_to_string(payload).await?;
57   trace!("Received shared inbox activity {}", unparsed);
58   let activity = serde_json::from_str::<SharedInboxActivities>(&unparsed)?;
59   match activity {
60     SharedInboxActivities::GroupInboxActivities(g) => {
61       receive_group_inbox(g, request, &context).await
62     }
63     SharedInboxActivities::PersonInboxActivities(p) => {
64       receive_person_inbox(p, request, &context).await
65     }
66   }
67 }
68
69 async fn payload_to_string(mut payload: Payload) -> Result<String, LemmyError> {
70   let mut bytes = BytesMut::new();
71   while let Some(item) = payload.next().await {
72     bytes.extend_from_slice(&item?);
73   }
74   let mut unparsed = String::new();
75   Bytes::from(bytes).as_ref().read_to_string(&mut unparsed)?;
76   Ok(unparsed)
77 }
78
79 // TODO: move most of this code to library
80 async fn receive_activity<'a, T>(
81   request: HttpRequest,
82   activity: T,
83   context: &LemmyContext,
84 ) -> Result<HttpResponse, LemmyError>
85 where
86   T: ActivityHandler<DataType = LemmyContext>
87     + ActivityFields
88     + Clone
89     + Deserialize<'a>
90     + Serialize
91     + std::fmt::Debug
92     + Send
93     + 'static,
94 {
95   let request_counter = &mut 0;
96   let actor =
97     get_or_fetch_and_upsert_actor(activity.actor().clone(), context, request_counter).await?;
98   verify_signature(&request, &actor.public_key().context(location_info!())?)?;
99
100   // Do nothing if we received the same activity before
101   if is_activity_already_known(context.pool(), activity.id_unchecked()).await? {
102     return Ok(HttpResponse::Ok().finish());
103   }
104   check_is_apub_id_valid(activity.actor(), false, &context.settings())?;
105   info!("Verifying activity {}", activity.id_unchecked().to_string());
106   activity
107     .verify(&Data::new(context.clone()), request_counter)
108     .await?;
109   assert_activity_not_local(&activity, &context.settings().hostname)?;
110
111   // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
112   // if we receive the same activity twice in very quick succession.
113   insert_activity(
114     activity.id_unchecked(),
115     activity.clone(),
116     false,
117     true,
118     context.pool(),
119   )
120   .await?;
121
122   info!("Receiving activity {}", activity.id_unchecked().to_string());
123   activity
124     .receive(&Data::new(context.clone()), request_counter)
125     .await?;
126   Ok(HttpResponse::Ok().finish())
127 }
128
129 /// Convert the data to json and turn it into an HTTP Response with the correct ActivityPub
130 /// headers.
131 fn create_apub_response<T>(data: &T) -> HttpResponse<Body>
132 where
133   T: Serialize,
134 {
135   HttpResponse::Ok()
136     .content_type(APUB_JSON_CONTENT_TYPE)
137     .json(data)
138 }
139
140 fn create_apub_tombstone_response<T>(data: &T) -> HttpResponse<Body>
141 where
142   T: Serialize,
143 {
144   HttpResponse::Gone()
145     .content_type(APUB_JSON_CONTENT_TYPE)
146     .status(StatusCode::GONE)
147     .json(data)
148 }
149
/// Path parameters of an activity URL, extracted through `web::Path` in `get_activity`, which
/// joins them as `{protocol_and_hostname}/activities/{type_}/{id}`.
150 #[derive(Deserialize)]
151 pub struct ActivityQuery {
  // Second-to-last path segment of the activity URL.
152   type_: String,
  // Last path segment of the activity URL.
153   id: String,
154 }
155
156 /// Return the ActivityPub json representation of a local activity over HTTP.
157 pub(crate) async fn get_activity(
158   info: web::Path<ActivityQuery>,
159   context: web::Data<LemmyContext>,
160 ) -> Result<HttpResponse<Body>, LemmyError> {
161   let settings = context.settings();
162   let activity_id = Url::parse(&format!(
163     "{}/activities/{}/{}",
164     settings.get_protocol_and_hostname(),
165     info.type_,
166     info.id
167   ))?
168   .into();
169   let activity = blocking(context.pool(), move |conn| {
170     Activity::read_from_apub_id(conn, &activity_id)
171   })
172   .await??;
173
174   let sensitive = activity.sensitive.unwrap_or(true);
175   if !activity.local || sensitive {
176     Ok(HttpResponse::NotFound().finish())
177   } else {
178     Ok(create_apub_response(&activity.data))
179   }
180 }
181
182 pub(crate) async fn is_activity_already_known(
183   pool: &DbPool,
184   activity_id: &Url,
185 ) -> Result<bool, LemmyError> {
186   let activity_id = activity_id.to_owned().into();
187   let existing = blocking(pool, move |conn| {
188     Activity::read_from_apub_id(conn, &activity_id)
189   })
190   .await?;
191   match existing {
192     Ok(_) => Ok(true),
193     Err(_) => Ok(false),
194   }
195 }
196
197 fn assert_activity_not_local<T: Debug + ActivityFields>(
198   activity: &T,
199   hostname: &str,
200 ) -> Result<(), LemmyError> {
201   let activity_domain = activity.id_unchecked().domain().context(location_info!())?;
202
203   if activity_domain == hostname {
204     return Err(
205       anyhow!(
206         "Error: received activity which was sent by local instance: {:?}",
207         activity
208       )
209       .into(),
210     );
211   }
212   Ok(())
213 }