]> Untitled Git - lemmy.git/blob - crates/apub/src/fetcher/object_id.rs
Rewrite fetcher (#1792)
[lemmy.git] / crates / apub / src / fetcher / object_id.rs
1 use crate::{
2   fetcher::{deletable_apub_object::DeletableApubObject, should_refetch_actor},
3   objects::FromApub,
4   APUB_JSON_CONTENT_TYPE,
5 };
6 use anyhow::anyhow;
7 use diesel::NotFound;
8 use lemmy_api_common::blocking;
9 use lemmy_db_queries::{ApubObject, DbPool};
10 use lemmy_db_schema::DbUrl;
11 use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError};
12 use lemmy_websocket::LemmyContext;
13 use reqwest::StatusCode;
14 use serde::{Deserialize, Serialize};
15 use std::{
16   fmt::{Debug, Display, Formatter},
17   marker::PhantomData,
18   time::Duration,
19 };
20 use url::Url;
21
22 /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object
23 /// fetch through the search). This should be configurable.
24 static REQUEST_LIMIT: i32 = 25;
25
26 #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
27 pub struct ObjectId<Kind>(Url, #[serde(skip)] PhantomData<Kind>)
28 where
29   Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
30   for<'de2> <Kind as FromApub>::ApubType: serde::Deserialize<'de2>;
31
32 impl<Kind> ObjectId<Kind>
33 where
34   Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
35   for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
36 {
37   pub fn new<T>(url: T) -> Self
38   where
39     T: Into<Url>,
40   {
41     ObjectId(url.into(), PhantomData::<Kind>)
42   }
43
44   pub fn inner(&self) -> &Url {
45     &self.0
46   }
47
48   /// Fetches an activitypub object, either from local database (if possible), or over http.
49   pub(crate) async fn dereference(
50     &self,
51     context: &LemmyContext,
52     request_counter: &mut i32,
53   ) -> Result<Kind, LemmyError> {
54     let db_object = self.dereference_locally(context.pool()).await?;
55
56     // if its a local object, only fetch it from the database and not over http
57     if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) {
58       return match db_object {
59         None => Err(NotFound {}.into()),
60         Some(o) => Ok(o),
61       };
62     }
63
64     if let Some(object) = db_object {
65       if let Some(last_refreshed_at) = object.last_refreshed_at() {
66         // TODO: rename to should_refetch_object()
67         if should_refetch_actor(last_refreshed_at) {
68           return self
69             .dereference_remotely(context, request_counter, Some(object))
70             .await;
71         }
72       }
73       Ok(object)
74     } else {
75       self
76         .dereference_remotely(context, request_counter, None)
77         .await
78     }
79   }
80
81   /// returning none means the object was not found in local db
82   async fn dereference_locally(&self, pool: &DbPool) -> Result<Option<Kind>, LemmyError> {
83     let id: DbUrl = self.0.clone().into();
84     let object = blocking(pool, move |conn| ApubObject::read_from_apub_id(conn, &id)).await?;
85     match object {
86       Ok(o) => Ok(Some(o)),
87       Err(NotFound {}) => Ok(None),
88       Err(e) => Err(e.into()),
89     }
90   }
91
92   async fn dereference_remotely(
93     &self,
94     context: &LemmyContext,
95     request_counter: &mut i32,
96     db_object: Option<Kind>,
97   ) -> Result<Kind, LemmyError> {
98     // dont fetch local objects this way
99     debug_assert!(self.0.domain() != Some(&Settings::get().hostname));
100
101     *request_counter += 1;
102     if *request_counter > REQUEST_LIMIT {
103       return Err(LemmyError::from(anyhow!("Request limit reached")));
104     }
105
106     let res = retry(|| {
107       context
108         .client()
109         .get(self.0.as_str())
110         .header("Accept", APUB_JSON_CONTENT_TYPE)
111         .timeout(Duration::from_secs(60))
112         .send()
113     })
114     .await?;
115
116     if res.status() == StatusCode::GONE {
117       if let Some(db_object) = db_object {
118         db_object.delete(context).await?;
119       }
120       return Err(anyhow!("Fetched remote object {} which was deleted", self).into());
121     }
122
123     let res2: Kind::ApubType = res.json().await?;
124
125     Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?)
126   }
127 }
128
129 impl<Kind> Display for ObjectId<Kind>
130 where
131   Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
132   for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
133 {
134   fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
135     write!(f, "{}", self.0.to_string())
136   }
137 }
138
139 impl<Kind> From<ObjectId<Kind>> for Url
140 where
141   Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
142   for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
143 {
144   fn from(id: ObjectId<Kind>) -> Self {
145     id.0
146   }
147 }
148
149 impl<Kind> From<ObjectId<Kind>> for DbUrl
150 where
151   Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static,
152   for<'de> <Kind as FromApub>::ApubType: serde::Deserialize<'de>,
153 {
154   fn from(id: ObjectId<Kind>) -> Self {
155     id.0.into()
156   }
157 }