},
actor::{kind::GroupType, ApActor, Endpoints, Group},
base::{AnyBase, BaseExt},
- collection::UnorderedCollection,
+ collection::{OrderedCollection, UnorderedCollection},
context,
object::Tombstone,
prelude::*,
community::{Community, CommunityForm},
community_view::{CommunityFollowerView, CommunityModeratorView},
naive_now,
+ post::Post,
user::User_,
};
use lemmy_utils::convert_datetime;
group.set_content(d);
}
- let mut ap_actor = ApActor::new(self.get_inbox_url().parse()?, group);
+ let mut ap_actor = ApActor::new(self.get_inbox_url()?, group);
ap_actor
.set_preferred_username(self.title.to_owned())
- .set_outbox(self.get_outbox_url().parse()?)
+ .set_outbox(self.get_outbox_url()?)
.set_followers(self.get_followers_url().parse()?)
.set_following(self.get_following_url().parse()?)
.set_liked(self.get_liked_url().parse()?)
Ok(create_apub_response(&collection))
}
+pub async fn get_apub_community_outbox(
+ info: web::Path<CommunityQuery>,
+ db: DbPoolParam,
+) -> Result<HttpResponse<Body>, LemmyError> {
+ let community = blocking(&db, move |conn| {
+ Community::read_from_name(&conn, &info.community_name)
+ })
+ .await??;
+
+ let community_id = community.id;
+ let posts = blocking(&db, move |conn| {
+ Post::list_for_community(conn, community_id)
+ })
+ .await??;
+
+ let mut pages: Vec<AnyBase> = vec![];
+ for p in posts {
+ pages.push(p.to_apub(&db).await?.into_any_base()?);
+ }
+
+ let len = pages.len();
+ let mut collection = OrderedCollection::new(pages);
+ collection
+ .set_context(context())
+ .set_id(community.get_outbox_url()?)
+ .set_total_items(len as u64);
+ Ok(create_apub_response(&collection))
+}
+
pub async fn do_announce(
activity: AnyBase,
community: &Community,
DbPool,
LemmyError,
};
-use activitystreams_new::{base::BaseExt, object::Note, prelude::*};
+use activitystreams_new::{base::BaseExt, collection::OrderedCollection, object::Note, prelude::*};
use actix_web::client::Client;
use chrono::NaiveDateTime;
use diesel::{result::Error::NotFound, PgConnection};
use url::Url;
static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60;
+static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10;
// Fetch nodeinfo metadata from a remote instance.
async fn _fetch_node_info(client: &Client, domain: &str) -> Result<NodeInfo, LemmyError> {
/// TODO it won't pick up new avatars, summaries etc until a day after.
/// Actors need an "update" activity pushed to other servers to fix this.
fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool {
- if cfg!(debug_assertions) {
- true
+ let update_interval = if cfg!(debug_assertions) {
+ // avoid infinite loop when fetching community outbox
+ chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG)
} else {
- let update_interval = chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS);
- last_refreshed.lt(&(naive_now() - update_interval))
- }
+ chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS)
+ };
+ last_refreshed.lt(&(naive_now() - update_interval))
}
/// Check if a remote community exists, create if not found, if its too old update it.Fetch a community, insert/update it in the database and return the community.
match community {
Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => {
debug!("Fetching and updating from remote community: {}", apub_id);
- let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
-
- let mut cf = CommunityForm::from_apub(&group, client, pool).await?;
- cf.last_refreshed_at = Some(naive_now());
- let community = blocking(pool, move |conn| Community::update(conn, c.id, &cf)).await??;
-
- Ok(community)
+ fetch_remote_community(apub_id, client, pool, Some(c.id)).await
}
Ok(c) => Ok(c),
Err(NotFound {}) => {
debug!("Fetching and creating remote community: {}", apub_id);
- let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
+ fetch_remote_community(apub_id, client, pool, None).await
+ }
+ Err(e) => Err(e.into()),
+ }
+}
+
+async fn fetch_remote_community(
+ apub_id: &Url,
+ client: &Client,
+ pool: &DbPool,
+ community_id: Option<i32>,
+) -> Result<Community, LemmyError> {
+ let group = fetch_remote_object::<GroupExt>(client, apub_id).await?;
+
+ let cf = CommunityForm::from_apub(&group, client, pool).await?;
+ let community = blocking(pool, move |conn| {
+ if let Some(cid) = community_id {
+ Community::update(conn, cid, &cf)
+ } else {
+ Community::create(conn, &cf)
+ }
+ })
+ .await??;
- let cf = CommunityForm::from_apub(&group, client, pool).await?;
- let community = blocking(pool, move |conn| Community::create(conn, &cf)).await??;
+ // Also add the community moderators too
+ let attributed_to = group.inner.attributed_to().unwrap();
+ let creator_and_moderator_uris: Vec<&Url> = attributed_to
+ .as_many()
+ .unwrap()
+ .iter()
+ .map(|a| a.as_xsd_any_uri().unwrap())
+ .collect();
- // Also add the community moderators too
- let attributed_to = group.inner.attributed_to().unwrap();
- let creator_and_moderator_uris: Vec<&Url> = attributed_to
- .as_many()
- .unwrap()
- .iter()
- .map(|a| a.as_xsd_any_uri().unwrap())
- .collect();
+ let mut creator_and_moderators = Vec::new();
- let mut creator_and_moderators = Vec::new();
+ for uri in creator_and_moderator_uris {
+ let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?;
- for uri in creator_and_moderator_uris {
- let c_or_m = get_or_fetch_and_upsert_user(uri, client, pool).await?;
+ creator_and_moderators.push(c_or_m);
+ }
- creator_and_moderators.push(c_or_m);
+ // TODO: need to make this work to update mods of existing communities
+ if community_id.is_none() {
+ let community_id = community.id;
+ blocking(pool, move |conn| {
+ for mod_ in creator_and_moderators {
+ let community_moderator_form = CommunityModeratorForm {
+ community_id,
+ user_id: mod_.id,
+ };
+
+ CommunityModerator::join(conn, &community_moderator_form)?;
}
+ Ok(()) as Result<(), LemmyError>
+ })
+ .await??;
+ }
- let community_id = community.id;
- blocking(pool, move |conn| {
- for mod_ in creator_and_moderators {
- let community_moderator_form = CommunityModeratorForm {
- community_id,
- user_id: mod_.id,
- };
-
- CommunityModerator::join(conn, &community_moderator_form)?;
- }
- Ok(()) as Result<(), LemmyError>
- })
- .await??;
-
- Ok(community)
- }
- Err(e) => Err(e.into()),
+ // fetch outbox (maybe make this conditional)
+ let outbox =
+ fetch_remote_object::<OrderedCollection>(client, &community.get_outbox_url()?).await?;
+ let outbox_items = outbox.items().clone();
+ for o in outbox_items.many().unwrap() {
+ let page = PageExt::from_any_base(o)?.unwrap();
+ let post = PostForm::from_apub(&page, client, pool).await?;
+ let post_ap_id = post.ap_id.clone();
+ // Check whether the post already exists in the local db
+ let existing = blocking(pool, move |conn| Post::read_from_apub_id(conn, &post_ap_id)).await?;
+ match existing {
+ Ok(e) => blocking(pool, move |conn| Post::update(conn, e.id, &post)).await??,
+ Err(_) => blocking(pool, move |conn| Post::create(conn, &post)).await??,
+ };
}
+
+ Ok(community)
}
fn upsert_post(post_form: &PostForm, conn: &PgConnection) -> Result<Post, LemmyError> {