use crate::{ fetcher::{deletable_apub_object::DeletableApubObject, should_refetch_actor}, objects::FromApub, APUB_JSON_CONTENT_TYPE, }; use anyhow::anyhow; use diesel::NotFound; use lemmy_api_common::blocking; use lemmy_db_queries::{ApubObject, DbPool}; use lemmy_db_schema::DbUrl; use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError}; use lemmy_websocket::LemmyContext; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use std::{ fmt::{Debug, Display, Formatter}, marker::PhantomData, time::Duration, }; use url::Url; /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object /// fetch through the search). This should be configurable. static REQUEST_LIMIT: i32 = 25; #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] pub struct ObjectId(Url, #[serde(skip)] PhantomData) where Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, for<'de2> ::ApubType: serde::Deserialize<'de2>; impl ObjectId where Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, for<'de> ::ApubType: serde::Deserialize<'de>, { pub fn new(url: T) -> Self where T: Into, { ObjectId(url.into(), PhantomData::) } pub fn inner(&self) -> &Url { &self.0 } /// Fetches an activitypub object, either from local database (if possible), or over http. pub(crate) async fn dereference( &self, context: &LemmyContext, request_counter: &mut i32, ) -> Result { let db_object = self.dereference_locally(context.pool()).await?; // if its a local object, only fetch it from the database and not over http if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) { return match db_object { None => Err(NotFound {}.into()), Some(o) => Ok(o), }; } if let Some(object) = db_object { if let Some(last_refreshed_at) = object.last_refreshed_at() { // TODO: rename to should_refetch_object() if should_refetch_actor(last_refreshed_at) { return self .dereference_remotely(context, request_counter, Some(object)) .await; } } Ok(object) } else { self .dereference_remotely(context, request_counter, None) .await } } /// returning none means the object was not found in local db async fn dereference_locally(&self, pool: &DbPool) -> Result, LemmyError> { let id: DbUrl = self.0.clone().into(); let object = blocking(pool, move |conn| ApubObject::read_from_apub_id(conn, &id)).await?; match object { Ok(o) => Ok(Some(o)), Err(NotFound {}) => Ok(None), Err(e) => Err(e.into()), } } async fn dereference_remotely( &self, context: &LemmyContext, request_counter: &mut i32, db_object: Option, ) -> Result { // dont fetch local objects this way debug_assert!(self.0.domain() != Some(&Settings::get().hostname)); *request_counter += 1; if *request_counter > REQUEST_LIMIT { return Err(LemmyError::from(anyhow!("Request limit reached"))); } let res = retry(|| { context .client() .get(self.0.as_str()) .header("Accept", APUB_JSON_CONTENT_TYPE) .timeout(Duration::from_secs(60)) .send() }) .await?; if res.status() == StatusCode::GONE { if let Some(db_object) = db_object { db_object.delete(context).await?; } return Err(anyhow!("Fetched remote object {} which was deleted", self).into()); } let res2: Kind::ApubType = res.json().await?; Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?) } } impl Display for ObjectId where Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, for<'de> ::ApubType: serde::Deserialize<'de>, { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0.to_string()) } } impl From> for Url where Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, for<'de> ::ApubType: serde::Deserialize<'de>, { fn from(id: ObjectId) -> Self { id.0 } } impl From> for DbUrl where Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, for<'de> ::ApubType: serde::Deserialize<'de>, { fn from(id: ObjectId) -> Self { id.0.into() } }