2 check_is_apub_id_valid,
8 APUB_JSON_CONTENT_TYPE,
10 use activitystreams::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
11 use anyhow::{anyhow, Context};
12 use chrono::NaiveDateTime;
13 use diesel::result::Error::NotFound;
15 comment::{Comment, CommentForm},
16 comment_view::CommentView,
17 community::{Community, CommunityForm, CommunityModerator, CommunityModeratorForm},
18 community_view::CommunityView,
20 post::{Post, PostForm},
22 user::{UserForm, User_},
28 use lemmy_structs::{blocking, site::SearchResponse};
30 apub::get_apub_protocol_string,
32 request::{retry, RecvError},
35 use lemmy_websocket::LemmyContext;
38 use serde::Deserialize;
39 use std::{fmt::Debug, time::Duration};
42 static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
43 static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
45 /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation,
47 pub async fn fetch_remote_object<Response>(
50 ) -> Result<Response, LemmyError>
52 Response: for<'de> Deserialize<'de>,
54 check_is_apub_id_valid(&url)?;
56 let timeout = Duration::from_secs(60);
64 .header("Accept", APUB_JSON_CONTENT_TYPE)
72 debug!("Receive error, {}", e);
73 RecvError(e.to_string())
79 /// The types of ActivityPub objects that can be fetched directly by searching for their ID.
81 #[derive(serde::Deserialize, Debug)]
82 pub enum SearchAcceptedObjects {
83 Person(Box<PersonExt>),
89 /// Attempt to parse the query as URL, and fetch an ActivityPub object from it.
91 /// Some working examples for use with the docker/federation/ setup:
92 /// http://lemmy_alpha:8540/c/main, or !main@lemmy_alpha:8540
93 /// http://lemmy_alpha:8540/u/lemmy_alpha, or @lemmy_alpha@lemmy_alpha:8540
94 /// http://lemmy_alpha:8540/post/3
95 /// http://lemmy_alpha:8540/comment/2
96 pub async fn search_by_apub_id(
98 context: &LemmyContext,
99 ) -> Result<SearchResponse, LemmyError> {
100 // Parse the shorthand query url
101 let query_url = if query.contains('@') {
103 let split = query.split('@').collect::<Vec<&str>>();
105 // User type will look like ['', username, instance]
106 // Community will look like [!community, instance]
107 let (name, instance) = if split.len() == 3 {
108 (format!("/u/{}", split[1]), split[2])
109 } else if split.len() == 2 {
110 if split[0].contains('!') {
111 let split2 = split[0].split('!').collect::<Vec<&str>>();
112 (format!("/c/{}", split2[1]), split[1])
114 return Err(anyhow!("Invalid search query: {}", query).into());
117 return Err(anyhow!("Invalid search query: {}", query).into());
120 let url = format!("{}://{}{}", get_apub_protocol_string(), instance, name);
126 let mut response = SearchResponse {
127 type_: SearchType::All.to_string(),
134 let domain = query_url.domain().context("url has no domain")?;
136 match fetch_remote_object::<SearchAcceptedObjects>(context.client(), &query_url).await? {
137 SearchAcceptedObjects::Person(p) => {
138 let user_uri = p.inner.id(domain)?.context("person has no id")?;
140 let user = get_or_fetch_and_upsert_user(&user_uri, context).await?;
142 response.users = vec![
143 blocking(context.pool(), move |conn| {
144 UserView::get_user_secure(conn, user.id)
151 SearchAcceptedObjects::Group(g) => {
152 let community_uri = g.inner.id(domain)?.context("group has no id")?;
154 let community = get_or_fetch_and_upsert_community(community_uri, context).await?;
156 response.communities = vec![
157 blocking(context.pool(), move |conn| {
158 CommunityView::read(conn, community.id, None)
165 SearchAcceptedObjects::Page(p) => {
166 let post_form = PostForm::from_apub(&p, context, Some(query_url)).await?;
168 let p = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
170 vec![blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)).await??];
174 SearchAcceptedObjects::Comment(c) => {
175 let comment_form = CommentForm::from_apub(&c, context, Some(query_url)).await?;
177 let c = blocking(context.pool(), move |conn| {
178 Comment::upsert(conn, &comment_form)
181 response.comments = vec![
182 blocking(context.pool(), move |conn| {
183 CommentView::read(conn, c.id, None)
195 pub async fn get_or_fetch_and_upsert_actor(
197 context: &LemmyContext,
198 ) -> Result<Box<dyn ActorType>, LemmyError> {
199 let user = get_or_fetch_and_upsert_user(apub_id, context).await;
200 let actor: Box<dyn ActorType> = match user {
201 Ok(u) => Box::new(u),
202 Err(_) => Box::new(get_or_fetch_and_upsert_community(apub_id, context).await?),
207 /// Check if a remote user exists, create it if not found, and update it if it is too old. Fetch a user, insert/update it in the database and return the user.
208 pub async fn get_or_fetch_and_upsert_user(
210 context: &LemmyContext,
211 ) -> Result<User_, LemmyError> {
212 let apub_id_owned = apub_id.to_owned();
213 let user = blocking(context.pool(), move |conn| {
214 User_::read_from_actor_id(conn, apub_id_owned.as_ref())
219 // If it was last refreshed longer ago than the refetch interval (a day in release builds), re-fetch it
220 Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => {
221 debug!("Fetching and updating from remote user: {}", apub_id);
222 let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
224 let mut uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
225 uf.last_refreshed_at = Some(naive_now());
226 let user = blocking(context.pool(), move |conn| User_::update(conn, u.id, &uf)).await??;
231 Err(NotFound {}) => {
232 debug!("Fetching and creating remote user: {}", apub_id);
233 let person = fetch_remote_object::<PersonExt>(context.client(), apub_id).await?;
235 let uf = UserForm::from_apub(&person, context, Some(apub_id.to_owned())).await?;
236 let user = blocking(context.pool(), move |conn| User_::upsert(conn, &uf)).await??;
240 Err(e) => Err(e.into()),
244 /// Determines when a remote actor should be refetched from its instance: once its last refetch is older
245 /// than ACTOR_REFETCH_INTERVAL_SECONDS (release builds) or ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG (debug builds).
247 /// TODO it won't pick up new avatars, summaries etc until a day after.
248 /// Actors need an "update" activity pushed to other servers to fix this.
249 fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
250 let update_interval = if cfg!(debug_assertions) {
251 // avoid infinite loop when fetching community outbox
252 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
254 chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
256 last_refreshed.lt(&(naive_now() - update_interval))
259 /// Check if a remote community exists, create it if not found, and update it if it is too old. Fetch a community, insert/update it in the database and return the community.
260 pub async fn get_or_fetch_and_upsert_community(
262 context: &LemmyContext,
263 ) -> Result<Community, LemmyError> {
264 let apub_id_owned = apub_id.to_owned();
265 let community = blocking(context.pool(), move |conn| {
266 Community::read_from_actor_id(conn, apub_id_owned.as_str())
271 Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
272 debug!("Fetching and updating from remote community: {}", apub_id);
273 fetch_remote_community(apub_id, context, Some(c.id)).await
276 Err(NotFound {}) => {
277 debug!("Fetching and creating remote community: {}", apub_id);
278 fetch_remote_community(apub_id, context, None).await
280 Err(e) => Err(e.into()),
284 async fn fetch_remote_community(
286 context: &LemmyContext,
287 community_id: Option<i32>,
288 ) -> Result<Community, LemmyError> {
289 let group = fetch_remote_object::<GroupExt>(context.client(), apub_id).await?;
291 let cf = CommunityForm::from_apub(&group, context, Some(apub_id.to_owned())).await?;
292 let community = blocking(context.pool(), move |conn| Community::upsert(conn, &cf)).await??;
294 // Also add the community's creator and moderators
295 let attributed_to = group.inner.attributed_to().context(location_info!())?;
296 let creator_and_moderator_uris: Vec<&Url> = attributed_to
298 .context(location_info!())?
300 .map(|a| a.as_xsd_any_uri().context(""))
301 .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
303 let mut creator_and_moderators = Vec::new();
305 for uri in creator_and_moderator_uris {
306 let c_or_m = get_or_fetch_and_upsert_user(uri, context).await?;
308 creator_and_moderators.push(c_or_m);
311 // TODO: need to make this work to update mods of existing communities
312 if community_id.is_none() {
313 let community_id = community.id;
314 blocking(context.pool(), move |conn| {
315 for mod_ in creator_and_moderators {
316 let community_moderator_form = CommunityModeratorForm {
321 CommunityModerator::join(conn, &community_moderator_form)?;
323 Ok(()) as Result<(), LemmyError>
328 // fetch outbox (maybe make this conditional)
330 fetch_remote_object::<OrderedCollection>(context.client(), &community.get_outbox_url()?)
332 let outbox_items = outbox.items().context(location_info!())?.clone();
333 let mut outbox_items = outbox_items.many().context(location_info!())?;
334 if outbox_items.len() > 20 {
335 outbox_items = outbox_items[0..20].to_vec();
337 for o in outbox_items {
338 let page = PageExt::from_any_base(o)?.context(location_info!())?;
339 let post = PostForm::from_apub(&page, context, None).await?;
340 let post_ap_id = post.ap_id.as_ref().context(location_info!())?.clone();
341 // Check whether the post already exists in the local db
342 let existing = blocking(context.pool(), move |conn| {
343 Post::read_from_apub_id(conn, &post_ap_id)
347 Ok(e) => blocking(context.pool(), move |conn| Post::update(conn, e.id, &post)).await??,
348 Err(_) => blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??,
350 // TODO: we need to send a websocket update here
356 pub async fn get_or_fetch_and_insert_post(
358 context: &LemmyContext,
359 ) -> Result<Post, LemmyError> {
360 let post_ap_id_owned = post_ap_id.to_owned();
361 let post = blocking(context.pool(), move |conn| {
362 Post::read_from_apub_id(conn, post_ap_id_owned.as_str())
368 Err(NotFound {}) => {
369 debug!("Fetching and creating remote post: {}", post_ap_id);
370 let post = fetch_remote_object::<PageExt>(context.client(), post_ap_id).await?;
371 let post_form = PostForm::from_apub(&post, context, Some(post_ap_id.to_owned())).await?;
373 let post = blocking(context.pool(), move |conn| Post::upsert(conn, &post_form)).await??;
377 Err(e) => Err(e.into()),
381 pub async fn get_or_fetch_and_insert_comment(
383 context: &LemmyContext,
384 ) -> Result<Comment, LemmyError> {
385 let comment_ap_id_owned = comment_ap_id.to_owned();
386 let comment = blocking(context.pool(), move |conn| {
387 Comment::read_from_apub_id(conn, comment_ap_id_owned.as_str())
393 Err(NotFound {}) => {
395 "Fetching and creating remote comment and its parents: {}",
398 let comment = fetch_remote_object::<Note>(context.client(), comment_ap_id).await?;
400 CommentForm::from_apub(&comment, context, Some(comment_ap_id.to_owned())).await?;
402 let comment = blocking(context.pool(), move |conn| {
403 Comment::upsert(conn, &comment_form)
409 Err(e) => Err(e.into()),