use crate::{
- activities::{generate_activity_id, send_lemmy_activity, verify_activity, verify_is_public},
+ activities::{
+ generate_activity_id,
+ send_lemmy_activity,
+ verify_is_public,
+ verify_person_in_community,
+ },
activity_lists::AnnouncableActivities,
- fetcher::object_id::ObjectId,
- http::is_activity_already_known,
- insert_activity,
+ insert_received_activity,
objects::community::ApubCommunity,
- protocol::activities::community::announce::AnnounceActivity,
+ protocol::{
+ activities::community::announce::{AnnounceActivity, RawAnnouncableActivities},
+ Id,
+ IdOrNestedObject,
+ InCommunity,
+ },
};
-use activitystreams::{activity::kind::AnnounceType, public};
-use lemmy_apub_lib::{
- data::Data,
- traits::{ActivityFields, ActivityHandler, ActorType},
+use activitypub_federation::{
+ config::Data,
+ kinds::{activity::AnnounceType, public},
+ traits::{ActivityHandler, Actor},
};
-use lemmy_utils::LemmyError;
-use lemmy_websocket::LemmyContext;
+use lemmy_api_common::context::LemmyContext;
+use lemmy_utils::error::{LemmyError, LemmyErrorType};
+use serde_json::Value;
use url::Url;
-#[async_trait::async_trait(?Send)]
-pub(crate) trait GetCommunity {
- async fn get_community(
- &self,
- context: &LemmyContext,
- request_counter: &mut i32,
- ) -> Result<ApubCommunity, LemmyError>;
+#[async_trait::async_trait]
+impl ActivityHandler for RawAnnouncableActivities {
+ type DataType = LemmyContext;
+ type Error = LemmyError;
+
+ fn id(&self) -> &Url {
+ &self.id
+ }
+
+ fn actor(&self) -> &Url {
+ &self.actor
+ }
+
+ #[tracing::instrument(skip_all)]
+ async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
+ Ok(())
+ }
+
+ #[tracing::instrument(skip_all)]
+ async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
+ let activity: AnnouncableActivities = self.clone().try_into()?;
+ // This is only for sending, not receiving so we reject it.
+ if let AnnouncableActivities::Page(_) = activity {
+ return Err(LemmyErrorType::CannotReceivePage)?;
+ }
+
+ // verify and receive activity
+ activity.verify(data).await?;
+ activity.clone().receive(data).await?;
+
+ // if activity is in a community, send to followers
+ let community = activity.community(data).await;
+ if let Ok(community) = community {
+ if community.local {
+ let actor_id = activity.actor().clone().into();
+ verify_person_in_community(&actor_id, &community, data).await?;
+ AnnounceActivity::send(self, &community, data).await?;
+ }
+ }
+ Ok(())
+ }
}
impl AnnounceActivity {
- pub async fn send(
- object: AnnouncableActivities,
+ pub(crate) fn new(
+ object: RawAnnouncableActivities,
community: &ApubCommunity,
- additional_inboxes: Vec<Url>,
- context: &LemmyContext,
- ) -> Result<(), LemmyError> {
- let announce = AnnounceActivity {
- actor: ObjectId::new(community.actor_id()),
+ context: &Data<LemmyContext>,
+ ) -> Result<AnnounceActivity, LemmyError> {
+ Ok(AnnounceActivity {
+ actor: community.id().into(),
to: vec![public()],
- object,
- cc: vec![community.followers_url.clone().into_inner()],
+ object: IdOrNestedObject::NestedObject(object),
+ cc: vec![community.followers_url.clone().into()],
kind: AnnounceType::Announce,
id: generate_activity_id(
&AnnounceType::Announce,
&context.settings().get_protocol_and_hostname(),
)?,
- unparsed: Default::default(),
- };
- let inboxes = community
- .get_follower_inboxes(additional_inboxes, context)
- .await?;
- send_lemmy_activity(context, &announce, &announce.id, community, inboxes, false).await
+ })
+ }
+
+ #[tracing::instrument(skip_all)]
+ pub async fn send(
+ object: RawAnnouncableActivities,
+ community: &ApubCommunity,
+ context: &Data<LemmyContext>,
+ ) -> Result<(), LemmyError> {
+ let announce = AnnounceActivity::new(object.clone(), community, context)?;
+ let inboxes = community.get_follower_inboxes(context).await?;
+ send_lemmy_activity(context, announce, community, inboxes.clone(), false).await?;
+
+ // Pleroma and Mastodon can't handle activities like Announce/Create/Page. So for
+ // compatibility, we also send Announce/Page so that they can follow Lemmy communities.
+ let object_parsed = object.try_into()?;
+ if let AnnouncableActivities::CreateOrUpdatePost(c) = object_parsed {
+ // Hack: need to convert Page into a format which can be sent as activity, which requires
+ // adding actor field.
+ let announcable_page = RawAnnouncableActivities {
+ id: generate_activity_id(
+ AnnounceType::Announce,
+ &context.settings().get_protocol_and_hostname(),
+ )?,
+ actor: c.actor.clone().into_inner(),
+ other: serde_json::to_value(c.object)?
+ .as_object()
+ .expect("is object")
+ .clone(),
+ };
+ let announce_compat = AnnounceActivity::new(announcable_page, community, context)?;
+ send_lemmy_activity(context, announce_compat, community, inboxes, false).await?;
+ }
+ Ok(())
}
}
-#[async_trait::async_trait(?Send)]
+#[async_trait::async_trait]
impl ActivityHandler for AnnounceActivity {
type DataType = LemmyContext;
- async fn verify(
- &self,
- context: &Data<LemmyContext>,
- request_counter: &mut i32,
- ) -> Result<(), LemmyError> {
- verify_is_public(&self.to)?;
- verify_activity(self, &context.settings())?;
- self.object.verify(context, request_counter).await?;
+ type Error = LemmyError;
+
+ fn id(&self) -> &Url {
+ &self.id
+ }
+
+ fn actor(&self) -> &Url {
+ self.actor.inner()
+ }
+
+ #[tracing::instrument(skip_all)]
+ async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
+ verify_is_public(&self.to, &self.cc)?;
Ok(())
}
- async fn receive(
- self,
- context: &Data<LemmyContext>,
- request_counter: &mut i32,
- ) -> Result<(), LemmyError> {
- if is_activity_already_known(context.pool(), self.object.id_unchecked()).await? {
- return Ok(());
+ #[tracing::instrument(skip_all)]
+ async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ let object: AnnouncableActivities = self.object.object(context).await?.try_into()?;
+ // This is only for sending, not receiving so we reject it.
+ if let AnnouncableActivities::Page(_) = object {
+ return Err(LemmyErrorType::CannotReceivePage)?;
}
- insert_activity(
- self.object.id_unchecked(),
- self.object.clone(),
- false,
- true,
- context.pool(),
- )
- .await?;
- self.object.receive(context, request_counter).await
+
+ // verify here in order to avoid fetching the object twice over http
+ object.verify(context).await?;
+ object.receive(context).await
+ }
+}
+
+impl Id for RawAnnouncableActivities {
+ fn object_id(&self) -> &Url {
+ ActivityHandler::id(self)
+ }
+}
+
+impl TryFrom<RawAnnouncableActivities> for AnnouncableActivities {
+ type Error = serde_json::error::Error;
+
+ fn try_from(value: RawAnnouncableActivities) -> Result<Self, Self::Error> {
+ let mut map = value.other.clone();
+ map.insert("id".to_string(), Value::String(value.id.to_string()));
+ map.insert("actor".to_string(), Value::String(value.actor.to_string()));
+ serde_json::from_value(Value::Object(map))
+ }
+}
+
+impl TryFrom<AnnouncableActivities> for RawAnnouncableActivities {
+ type Error = serde_json::error::Error;
+
+ fn try_from(value: AnnouncableActivities) -> Result<Self, Self::Error> {
+ serde_json::from_value(serde_json::to_value(value)?)
}
}