2 check_is_apub_id_valid,
4 fetch::fetch_remote_object,
5 get_or_fetch_and_upsert_user,
14 use activitystreams::{
15 base::{BaseExt, ExtendsExt},
16 collection::{CollectionExt, OrderedCollection},
20 use diesel::result::Error::NotFound;
21 use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable};
22 use lemmy_db_schema::source::{
23 community::{Community, CommunityModerator, CommunityModeratorForm},
26 use lemmy_structs::blocking;
27 use lemmy_utils::{location_info, LemmyError};
28 use lemmy_websocket::LemmyContext;
32 /// Get a community from its apub ID.
34 /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database.
35 /// Otherwise it is fetched from the remote instance, stored and returned.
36 pub(crate) async fn get_or_fetch_and_upsert_community(
38 context: &LemmyContext,
39 recursion_counter: &mut i32,
40 ) -> Result<Community, LemmyError> {
41 let apub_id_owned = apub_id.to_owned();
42 let community = blocking(context.pool(), move |conn| {
43 Community::read_from_apub_id(conn, &apub_id_owned.into())
48 Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
49 debug!("Fetching and updating from remote community: {}", apub_id);
50 fetch_remote_community(apub_id, context, Some(c), recursion_counter).await
54 debug!("Fetching and creating remote community: {}", apub_id);
55 fetch_remote_community(apub_id, context, None, recursion_counter).await
57 Err(e) => Err(e.into()),
61 /// Request a community by apub ID from a remote instance, including moderators. If `old_community`,
62 /// is set, this is an update for a community which is already known locally. If not, we don't know
63 /// the community yet and also pull the outbox, to get some initial posts.
64 async fn fetch_remote_community(
66 context: &LemmyContext,
67 old_community: Option<Community>,
68 recursion_counter: &mut i32,
69 ) -> Result<Community, LemmyError> {
70 let group = fetch_remote_object::<GroupExt>(context.client(), apub_id, recursion_counter).await;
72 if let Some(c) = old_community.to_owned() {
73 if is_deleted(&group) {
74 blocking(context.pool(), move |conn| {
75 Community::update_deleted(conn, c.id, true)
78 } else if group.is_err() {
79 // If fetching failed, return the existing data.
86 Community::from_apub(&group, context, apub_id.to_owned(), recursion_counter).await?;
88 // Also add the community moderators too
89 let attributed_to = group.inner.attributed_to().context(location_info!())?;
90 let creator_and_moderator_uris: Vec<&Url> = attributed_to
92 .context(location_info!())?
94 .map(|a| a.as_xsd_any_uri().context(""))
95 .collect::<Result<Vec<&Url>, anyhow::Error>>()?;
97 let mut creator_and_moderators = Vec::new();
99 for uri in creator_and_moderator_uris {
100 let c_or_m = get_or_fetch_and_upsert_user(uri, context, recursion_counter).await?;
102 creator_and_moderators.push(c_or_m);
105 // TODO: need to make this work to update mods of existing communities
106 if old_community.is_none() {
107 let community_id = community.id;
108 blocking(context.pool(), move |conn| {
109 for mod_ in creator_and_moderators {
110 let community_moderator_form = CommunityModeratorForm {
115 CommunityModerator::join(conn, &community_moderator_form)?;
117 Ok(()) as Result<(), LemmyError>
122 // fetch outbox (maybe make this conditional)
123 let outbox = fetch_remote_object::<OrderedCollection>(
125 &community.get_outbox_url()?,
129 let outbox_items = outbox.items().context(location_info!())?.clone();
130 let mut outbox_items = outbox_items.many().context(location_info!())?;
131 if outbox_items.len() > 20 {
132 outbox_items = outbox_items[0..20].to_vec();
134 for o in outbox_items {
135 let page = PageExt::from_any_base(o)?.context(location_info!())?;
136 let page_id = page.id_unchecked().context(location_info!())?;
138 // The post creator may be from a blocked instance, if it errors, then skip it
139 if check_is_apub_id_valid(page_id).is_err() {
142 Post::from_apub(&page, context, page_id.to_owned(), recursion_counter).await?;
143 // TODO: we need to send a websocket update here