verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::community::ApubCommunity,
protocol::{
activities::community::announce::{AnnounceActivity, RawAnnouncableActivities},
IdOrNestedObject,
InCommunity,
},
- ActorType,
};
-use activitypub_federation::{core::object_id::ObjectId, data::Data, traits::ActivityHandler};
-use activitystreams_kinds::{activity::AnnounceType, public};
+use activitypub_federation::{
+ config::Data,
+ kinds::{activity::AnnounceType, public},
+ traits::{ActivityHandler, Actor},
+};
use lemmy_api_common::context::LemmyContext;
-use lemmy_utils::error::LemmyError;
+use lemmy_utils::error::{LemmyError, LemmyErrorType};
use serde_json::Value;
-use tracing::debug;
use url::Url;
-#[async_trait::async_trait(?Send)]
+#[async_trait::async_trait]
impl ActivityHandler for RawAnnouncableActivities {
type DataType = LemmyContext;
type Error = LemmyError;
}
#[tracing::instrument(skip_all)]
- async fn verify(
- &self,
- _data: &Data<Self::DataType>,
- _request_counter: &mut i32,
- ) -> Result<(), Self::Error> {
+ async fn verify(&self, _data: &Data<Self::DataType>) -> Result<(), Self::Error> {
Ok(())
}
#[tracing::instrument(skip_all)]
- async fn receive(
- self,
- data: &Data<Self::DataType>,
- request_counter: &mut i32,
- ) -> Result<(), Self::Error> {
+ async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
let activity: AnnouncableActivities = self.clone().try_into()?;
// This is only for sending, not receiving so we reject it.
if let AnnouncableActivities::Page(_) = activity {
- return Err(LemmyError::from_message("Cant receive page"));
+ return Err(LemmyErrorType::CannotReceivePage)?;
}
- let community = activity.community(data, &mut 0).await?;
- let actor_id = ObjectId::new(activity.actor().clone());
+ let community = activity.community(data).await?;
+ let actor_id = activity.actor().clone().into();
// verify and receive activity
- activity.verify(data, request_counter).await?;
- activity.receive(data, request_counter).await?;
+ activity.verify(data).await?;
+ activity.receive(data).await?;
// send to community followers
if community.local {
- verify_person_in_community(&actor_id, &community, data, &mut 0).await?;
+ verify_person_in_community(&actor_id, &community, data).await?;
AnnounceActivity::send(self, &community, data).await?;
}
Ok(())
pub(crate) fn new(
object: RawAnnouncableActivities,
community: &ApubCommunity,
- context: &LemmyContext,
+ context: &Data<LemmyContext>,
) -> Result<AnnounceActivity, LemmyError> {
Ok(AnnounceActivity {
- actor: ObjectId::new(community.actor_id()),
+ actor: community.id().into(),
to: vec![public()],
object: IdOrNestedObject::NestedObject(object),
cc: vec![community.followers_url.clone().into()],
pub async fn send(
object: RawAnnouncableActivities,
community: &ApubCommunity,
- context: &LemmyContext,
+ context: &Data<LemmyContext>,
) -> Result<(), LemmyError> {
let announce = AnnounceActivity::new(object.clone(), community, context)?;
let inboxes = community.get_follower_inboxes(context).await?;
// Hack: need to convert Page into a format which can be sent as activity, which requires
// adding actor field.
let announcable_page = RawAnnouncableActivities {
- id: c.object.id.clone().into_inner(),
+ id: generate_activity_id(
+ AnnounceType::Announce,
+ &context.settings().get_protocol_and_hostname(),
+ )?,
actor: c.actor.clone().into_inner(),
other: serde_json::to_value(c.object)?
.as_object()
}
}
-#[async_trait::async_trait(?Send)]
+#[async_trait::async_trait]
impl ActivityHandler for AnnounceActivity {
type DataType = LemmyContext;
type Error = LemmyError;
}
#[tracing::instrument(skip_all)]
- async fn verify(
- &self,
- _context: &Data<LemmyContext>,
- _request_counter: &mut i32,
- ) -> Result<(), LemmyError> {
+ async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
Ok(())
}
#[tracing::instrument(skip_all)]
- async fn receive(
- self,
- context: &Data<LemmyContext>,
- request_counter: &mut i32,
- ) -> Result<(), LemmyError> {
- let object: AnnouncableActivities = self
- .object
- .object(context, request_counter)
- .await?
- .try_into()?;
+ async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ let object: AnnouncableActivities = self.object.object(context).await?.try_into()?;
// This is only for sending, not receiving so we reject it.
if let AnnouncableActivities::Page(_) = object {
- return Err(LemmyError::from_message("Cant receive page"));
+ return Err(LemmyErrorType::CannotReceivePage)?;
}
- // we have to verify this here in order to avoid fetching the object twice over http
- object.verify(context, request_counter).await?;
-
- let object_value = serde_json::to_value(&object)?;
- let insert = insert_activity(object.id(), object_value, false, true, context.pool()).await?;
- if !insert {
- debug!(
- "Received duplicate activity in announce {}",
- object.id().to_string()
- );
- return Ok(());
- }
- object.receive(context, request_counter).await
+ // verify here in order to avoid fetching the object twice over http
+ object.verify(context).await?;
+ object.receive(context).await
}
}