3 check_is_apub_id_valid,
9 APUB_JSON_CONTENT_TYPE,
11 request::{retry, RecvError},
14 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
15 use anyhow::{anyhow, Context};
16 use chrono::NaiveDateTime;
17 use diesel::result::Error::NotFound;
18 use lemmy_api_structs::{blocking, site::SearchResponse};
20 comment::{Comment, CommentForm},
21 comment_view::CommentView,
22 community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
23 community_view::CommunityView,
25 post::{Post, PostForm},
27 user::{UserForm, User_},
33 use lemmy_utils::{apub::get_apub_protocol_string, location_info, LemmyError};
36 use serde::Deserialize;
37 use std::{fmt::Debug, time::Duration};
40 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
41 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
43 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
45 pub async fn fetch_remote_object<Response>(
48 ) -> Result<Response, LemmyError>
50 Response: for<'de> Deserialize<'de>,
52 check_is_apub_id_valid(&url)?;
54 let timeout = Duration::from_secs(60);
62 .header("Accept", APUB_JSON_CONTENT_TYPE)
70 debug!("Receive error, {}", e);
71 RecvError(e.to_string())
77 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
79 #[derive(serde::Deserialize, Debug)]
80 pub enum SearchAcceptedObjects {
81 Person(Box<PersonExt>),
87 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
89 /// Some working examples for use with the docker/federation/ setup:
90 /// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
91 /// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
92 /// http://lemmy_alpha:8540/post/3
93 /// http://lemmy_alpha:8540/comment/2
94 pub async fn search_by_apub_id(
96 context: &LemmyContext,
97 ) -> Result<SearchResponse, LemmyError> {
98 // Parse the shorthand query url
99 let query_url = if query.contains('@') {
101 let split = query.split('@').collect::<Vec<&str>>();
103 // User type will look like ['', username, instance]
104 // Community will look like [!community, instance]
105 let (name, instance) = if split.len() == 3 {
106 (format!("/u/{}", split[1]), split[2])
107 } else if split.len() == 2 {
108 if split[0].contains('!') {
109 let split2 = split[0].split('!').collect::<Vec<&str>>();
110 (format!("/c/{}", split2[1]), split[1])
112 return Err(anyhow!("Invalid search query: {}", query).into());
115 return Err(anyhow!("Invalid search query: {}", query).into());
118 let url = format!("{}://{}{}", get_apub_protocol_string(), instance, name);
124 let mut response = SearchResponse {
125 type_: SearchType::All.to_string(),
132 let domain = query_url.domain().context("url has no domain")?;
134 match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? {
135 SearchAcceptedObjects::Person(p) => {
136 let user_uri = p.inner.id(domain)?.context("person has no id")?;
138 let user = get_or_fetch_and_upsert_user(&user_uri, context).await?;
140 response.users = vec![
141 blocking(context.pool(), move |conn| {
142 UserView::get_user_secure(conn, user.id)
149 SearchAcceptedObjects::Group(g) => {
150 let community_uri = g.inner.id(domain)?.context("group has no id")?;
152 let community = get_or_fetch_and_upsert_community(community_uri, context).await?;
154 response.communities = vec![
155 blocking(context.pool(), move |conn| {
156 CommunityView::read(conn, community.id, None)
163 SearchAcceptedObjects::Page(p) => {
164 let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?;
166 let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
168 vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
172 SearchAcceptedObjects::Comment(c) => {
173 let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?;
175 let c = blocking(context.pool(), move |conn| {
176 Comment::upsert(conn, &comment_form)
179 response.comments = vec![
180 blocking(context.pool(), move |conn| {
181 CommentView::read(conn, c.id, None)
193 pub async fn get_or_fetch_and_upsert_actor(
195 context: &LemmyContext,
196 ) -> Result<Box<dyn ActorType>, LemmyError> {
197 let user = get_or_fetch_and_upsert_user(apub_id, context).await;
198 let actor: Box<dyn ActorType> = match user {
199 Ok(u) => Box::new(u),
200 Err(_) => Box::new(get_or_fetch_and_upsert_community(apub_id, context).await?),
205 /// Check if a remote user exists, create it if not found, and update it if it is too old. Fetches the user, inserts/updates it in the database and returns it.
206 pub async fn get_or_fetch_and_upsert_user(
208 context: &LemmyContext,
209 ) -> Result<User_, LemmyError> {
210 let apub_id_owned = apub_id.to_owned();
211 let user = blocking(context.pool(), move |conn| {
212 User_::read_from_actor_id(conn, apub_id_owned.as_ref())
217 // If it's older than a day, re-fetch it
218 Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
219 debug!("Fetching and updating from remote user: {}", apub_id);
220 let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
222 let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
223 uf.last_refreshed_at = Some(naive_now());
224 let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
229 Err(NotFound {}) => {
230 debug!("Fetching and creating remote user: {}", apub_id);
231 let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
233 let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
234 let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
238 Err(e) => Err(e.into()),
242 /// Determines when a remote actor should be refetched from its instance: once the last refetch is
243 /// older than ACTOR_REFETCH_INTERVAL_SECONDS (release) or ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG (debug).
245 /// TODO it won't pick up new avatars, summaries etc until a day after.
246 /// Actors need an "update" activity pushed to other servers to fix this.
247 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
248 let update_interval = if cfg!(debug_assertions) {
249 // avoid infinite loop when fetching community outbox
250 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
252 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
254 last_refreshed.lt(&(naive_now() - update_interval))
257 /// Check if a remote community exists, create it if not found, and update it if it is too old. Fetches the community, inserts/updates it in the database and returns it.
258 pub async fn get_or_fetch_and_upsert_community(
260 context: &LemmyContext,
261 ) -> Result<Community, LemmyError> {
262 let apub_id_owned = apub_id.to_owned();
263 let community = blocking(context.pool(), move |conn| {
264 Community::read_from_actor_id(conn, apub_id_owned.as_str())
269 Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
270 debug!("Fetching and updating from remote community: {}", apub_id);
271 fetch_remote_community(apub_id, context, Some(c.id)).await
274 Err(NotFound {}) => {
275 debug!("Fetching and creating remote community: {}", apub_id);
276 fetch_remote_community(apub_id, context, None).await
278 Err(e) => Err(e.into()),
282 async fn fetch_remote_community(
284 context: &LemmyContext,
285 community_id: Option<i32>,
286 ) -> Result<Community, LemmyError> {
287 let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
289 let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
290 let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
292 // Also add the community moderators
293 let attributed_to = group.inner.attributed_to().context(location_info!())?;
294 let creator_and_moderator_uris: Vec<&Url> = attributed_to
296 .context(location_info!())?
298 .map(|a| a.as_xsd_any_uri().context(""))
299 .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
301 let mut creator_and_moderators = Vec::new();
303 for uri in creator_and_moderator_uris {
304 let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?;
306 creator_and_moderators.push(c_or_m);
309 // TODO: need to make this work to update mods of existing communities
310 if community_id.is_none() {
311 let community_id = community.id;
312 blocking(context.pool(), move |conn| {
313 for mod_ in creator_and_moderators {
314 let community_moderator_form = CommunityModeratorForm {
319 CommunityModerator::join(conn, &community_moderator_form)?;
321 Ok(()) as Result<(), LemmyError>
326 // fetch outbox (maybe make this conditional)
328 fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?)
330 let outbox_items = outbox.items().context(location_info!())?.clone();
331 let mut outbox_items = outbox_items.many().context(location_info!())?;
332 if outbox_items.len() > 20 {
333 outbox_items = outbox_items[0..20].to_vec();
335 for o in outbox_items {
336 let page = PageExt::from_any_base(o)?.context(location_info!())?;
337 let post = PostForm::from_apub(&page, context, None).await?;
338 let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
339 // Check whether the post already exists in the local db
340 let existing = blocking(context.pool(), move |conn| {
341 Post::read_from_apub_id(conn, &post_ap_id)
345 Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
346 Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
348 // TODO: we need to send a websocket update here
354 pub async fn get_or_fetch_and_insert_post(
356 context: &LemmyContext,
357 ) -> Result<Post, LemmyError> {
358 let post_ap_id_owned = post_ap_id.to_owned();
359 let post = blocking(context.pool(), move |conn| {
360 Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
366 Err(NotFound {}) => {
367 debug!("Fetching and creating remote post: {}", post_ap_id);
368 let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
369 let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
371 let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
375 Err(e) => Err(e.into()),
379 pub async fn get_or_fetch_and_insert_comment(
381 context: &LemmyContext,
382 ) -> Result<Comment, LemmyError> {
383 let comment_ap_id_owned = comment_ap_id.to_owned();
384 let comment = blocking(context.pool(), move |conn| {
385 Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
391 Err(NotFound {}) => {
393 "Fetching and creating remote comment and its parents: {}",
396 let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?;
398 CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
400 let comment = blocking(context.pool(), move |conn| {
401 Comment::upsert(conn, &comment_form)
407 Err(e) => Err(e.into()),