]> Untitled Git - lemmy.git/commitdiff
Merge branch 'main' into no-send-blocked-dess
authorDessalines <tyhou13@gmx.com>
Thu, 1 Oct 2020 20:57:47 +0000 (15:57 -0500)
committerDessalines <tyhou13@gmx.com>
Thu, 1 Oct 2020 20:57:47 +0000 (15:57 -0500)
1  2 
lemmy_apub/src/activity_queue.rs

index 960e126b871122b91a862996c89869c7cbce42a3,ca026332032ce7aaba776bc0bdb36730ca072940..7602fe2f7a858cae2eaf54508d579c80a6503f96
@@@ -1,10 -1,4 +1,10 @@@
 -use crate::{check_is_apub_id_valid, extensions::signatures::sign_and_send, ActorType};
 +use crate::{
 +  check_is_apub_id_valid,
 +  community::do_announce,
-   extensions::signatures::sign,
++  extensions::signatures::sign_and_send,
 +  insert_activity,
 +  ActorType,
 +};
  use activitystreams::{
    base::{Extends, ExtendsExt},
    object::AsObject,
@@@ -19,148 -13,22 +19,148 @@@ use background_jobs::
    QueueHandle,
    WorkerConfig,
  };
 +use itertools::Itertools;
 +use lemmy_db::{community::Community, user::User_, DbPool};
  use lemmy_utils::{location_info, settings::Settings, LemmyError};
 +use lemmy_websocket::LemmyContext;
  use log::warn;
  use reqwest::Client;
  use serde::{Deserialize, Serialize};
  use std::{collections::BTreeMap, future::Future, pin::Pin};
  use url::Url;
  
 -pub fn send_activity<T, Kind>(
 +pub async fn send_activity_single_dest<T, Kind>(
 +  activity: T,
 +  creator: &dyn ActorType,
 +  to: Url,
 +  context: &LemmyContext,
 +) -> Result<(), LemmyError>
 +where
 +  T: AsObject<Kind> + Extends<Kind>,
 +  Kind: Serialize,
 +  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
 +{
 +  if check_is_apub_id_valid(&to).is_ok() {
 +    send_activity_internal(
 +      context.activity_queue(),
 +      activity,
 +      creator,
 +      vec![to],
 +      context.pool(),
 +    )
 +    .await?;
 +  }
 +
 +  Ok(())
 +}
 +
 +pub async fn send_to_community_followers<T, Kind>(
 +  activity: T,
 +  community: &Community,
 +  context: &LemmyContext,
 +  sender_shared_inbox: Option<Url>,
 +) -> Result<(), LemmyError>
 +where
 +  T: AsObject<Kind> + Extends<Kind>,
 +  Kind: Serialize,
 +  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
 +{
 +  // dont send to the local instance, nor to the instance where the activity originally came from,
 +  // because that would result in a database error (same data inserted twice)
 +  let community_shared_inbox = community.get_shared_inbox_url()?;
 +  let to: Vec<Url> = community
 +    .get_follower_inboxes(context.pool())
 +    .await?
 +    .iter()
 +    .filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
 +    .filter(|inbox| inbox != &&community_shared_inbox)
 +    .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
 +    .unique()
 +    .map(|inbox| inbox.to_owned())
 +    .collect();
 +
 +  send_activity_internal(
 +    context.activity_queue(),
 +    activity,
 +    community,
 +    to,
 +    context.pool(),
 +  )
 +  .await?;
 +
 +  Ok(())
 +}
 +
 +pub async fn send_to_community<T, Kind>(
 +  creator: &User_,
 +  community: &Community,
 +  activity: T,
 +  context: &LemmyContext,
 +) -> Result<(), LemmyError>
 +where
 +  T: AsObject<Kind> + Extends<Kind>,
 +  Kind: Serialize,
 +  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
 +{
 +  // if this is a local community, we need to do an announce from the community instead
 +  if community.local {
 +    do_announce(activity.into_any_base()?, &community, creator, context).await?;
 +  } else {
 +    let inbox = community.get_shared_inbox_url()?;
 +    check_is_apub_id_valid(&inbox)?;
 +    send_activity_internal(
 +      context.activity_queue(),
 +      activity,
 +      creator,
 +      vec![inbox],
 +      context.pool(),
 +    )
 +    .await?;
 +  }
 +
 +  Ok(())
 +}
 +
 +pub async fn send_comment_mentions<T, Kind>(
 +  creator: &User_,
 +  mentions: Vec<Url>,
 +  activity: T,
 +  context: &LemmyContext,
 +) -> Result<(), LemmyError>
 +where
 +  T: AsObject<Kind> + Extends<Kind>,
 +  Kind: Serialize,
 +  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
 +{
 +  let mentions = mentions
 +    .iter()
 +    .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
 +    .map(|i| i.to_owned())
 +    .collect();
 +  send_activity_internal(
 +    context.activity_queue(),
 +    activity,
 +    creator,
 +    mentions,
 +    context.pool(),
 +  )
 +  .await?;
 +  Ok(())
 +}
 +
 +/// Asynchronously sends the given `activity` from `actor` to every inbox URL in `to`.
 +///
 +/// The caller of this function needs to remove any blocked domains from `to`,
 +/// using `check_is_apub_id_valid()`.
 +async fn send_activity_internal<T, Kind>(
    activity_sender: &QueueHandle,
    activity: T,
    actor: &dyn ActorType,
    to: Vec<Url>,
 +  pool: &DbPool,
  ) -> Result<(), LemmyError>
  where
 -  T: AsObject<Kind>,
 -  T: Extends<Kind>,
 +  T: AsObject<Kind> + Extends<Kind>,
    Kind: Serialize,
    <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
  {
      return Ok(());
    }
  
 -  let activity = activity.into_any_base()?;
 -  let serialised_activity = serde_json::to_string(&activity)?;
 -
    for to_url in &to {
 -    check_is_apub_id_valid(&to_url)?;
 +    assert!(check_is_apub_id_valid(&to_url).is_ok());
    }
  
 +  let activity = activity.into_any_base()?;
 +  let serialised_activity = serde_json::to_string(&activity)?;
 +  insert_activity(actor.user_id(), serialised_activity.clone(), true, pool).await?;
 +
    // TODO: it would make sense to create a separate task for each destination server
    let message = SendActivityTask {
      activity: serialised_activity,
@@@ -210,7 -77,7 +210,7 @@@ impl ActixJob for SendActivityTask 
        for to_url in &self.to {
          let mut headers = BTreeMap::<String, String>::new();
          headers.insert("Content-Type".into(), "application/json".into());
-         let result = sign(
+         let result = sign_and_send(
            &state.client,
            headers,
            to_url,