From c34cc46c2df4c44060d8badb74a8fb578933a1fd Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Sat, 30 May 2020 19:44:50 +0200 Subject: [PATCH] get it working (mostly) --- server/src/apub/comment.rs | 3 +- server/src/apub/community.rs | 50 +++++++++- server/src/apub/community_inbox.rs | 70 +------------ server/src/apub/post.rs | 3 +- server/src/apub/shared_inbox.rs | 154 +++++++++++++++-------------- server/src/db/activity.rs | 11 ++- 6 files changed, 139 insertions(+), 152 deletions(-) diff --git a/server/src/apub/comment.rs b/server/src/apub/comment.rs index 7b0cfed9..992ad26b 100644 --- a/server/src/apub/comment.rs +++ b/server/src/apub/comment.rs @@ -6,7 +6,6 @@ use crate::{ create_tombstone, fetch_webfinger_url, fetcher::get_or_fetch_and_upsert_remote_user, - shared_inbox::do_announce, ActorType, ApubLikeableType, ApubObjectType, @@ -481,7 +480,7 @@ impl Comment { // if this is a local community, we need to do an announce from the community instead if community.local { - do_announce(activity, &community.actor_id, &creator.actor_id, conn)?; + Community::do_announce(activity, &community.actor_id, &creator.actor_id, conn, true)?; } else { send_activity(&activity, creator, vec![community.get_shared_inbox_url()])?; } diff --git a/server/src/apub/community.rs b/server/src/apub/community.rs index 1ba41da4..6ed10f3b 100644 --- a/server/src/apub/community.rs +++ b/server/src/apub/community.rs @@ -23,20 +23,22 @@ use crate::{ routes::DbPoolParam, }; use activitystreams::{ - activity::{Accept, Delete, Follow, Remove, Undo}, + activity::{Accept, Announce, Delete, Follow, Remove, Undo}, actor::{kind::GroupType, properties::ApActorProperties, Group}, collection::UnorderedCollection, context, endpoint::EndpointProperties, object::{properties::ObjectProperties, Tombstone}, + Activity, + Base, BaseBox, }; use activitystreams_ext::Ext3; use actix_web::{body::Body, web::Path, HttpResponse, Result}; use diesel::PgConnection; -use failure::Error; +use failure::{Error, _core::fmt::Debug}; use itertools::Itertools; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; #[derive(Deserialize)] pub struct CommunityQuery { @@ -378,3 +380,45 @@ pub async fn get_apub_community_followers( .set_total_items(community_followers.len() as u64)?; Ok(create_apub_response(&collection)) } + +impl Community { + pub fn do_announce( + activity: A, + // TODO: maybe pass in the community object + community_uri: &str, + sender: &str, + conn: &PgConnection, + is_local_activity: bool, + ) -> Result + where + A: Activity + Base + Serialize + Debug, + { + let community = Community::read_from_actor_id(conn, &community_uri)?; + + insert_activity(&conn, -1, &activity, is_local_activity)?; + + let mut announce = Announce::default(); + populate_object_props( + &mut announce.object_props, + vec![community.get_followers_url()], + &format!("{}/announce/{}", community.actor_id, uuid::Uuid::new_v4()), + )?; + announce + .announce_props + .set_actor_xsd_any_uri(community.actor_id.to_owned())? + .set_object_base_box(BaseBox::from_concrete(activity)?)?; + + insert_activity(&conn, -1, &announce, true)?; + + // dont send to the instance where the activity originally came from, because that would result + // in a database error (same data inserted twice) + let mut to = community.get_follower_inboxes(&conn)?; + let sending_user = get_or_fetch_and_upsert_remote_user(&sender, conn)?; + // this seems to be the "easiest" stable alternative for remove_item() + to.retain(|x| *x != sending_user.get_shared_inbox_url()); + + send_activity(&announce, &community, to)?; + + Ok(HttpResponse::Ok().finish()) + } +} diff --git a/server/src/apub/community_inbox.rs b/server/src/apub/community_inbox.rs index 9b6f5ed0..5220dddd 100644 --- a/server/src/apub/community_inbox.rs +++ b/server/src/apub/community_inbox.rs @@ -1,6 +1,5 @@ use crate::{ apub::{ - activities::{populate_object_props, send_activity}, extensions::signatures::verify, fetcher::{get_or_fetch_and_upsert_remote_community, get_or_fetch_and_upsert_remote_user}, ActorType, @@ -13,26 +12,18 @@ use crate::{ }, routes::{ChatServerParam, DbPoolParam}, }; -use activitystreams::{ - activity::{Activity, Announce, Create, Delete, Follow, Remove, Undo, Update}, - Base, - BaseBox, -}; +use activitystreams::activity::{Follow, Undo}; use actix_web::{web, HttpRequest, HttpResponse, Result}; use diesel::PgConnection; use failure::{Error, _core::fmt::Debug}; use log::debug; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; #[serde(untagged)] #[derive(Deserialize, Debug)] pub enum CommunityAcceptedObjects { Follow(Follow), Undo(Undo), - Create(Create), - Update(Update), - Delete(Delete), - Remove(Remove), } impl CommunityAcceptedObjects { @@ -47,7 +38,6 @@ impl CommunityAcceptedObjects { .to_owned() .into_concrete::()?, ), - _ => todo!(), } } } @@ -58,7 +48,7 @@ pub async fn community_inbox( input: web::Json, path: web::Path, db: DbPoolParam, - chat_server: ChatServerParam, + _chat_server: ChatServerParam, ) -> Result { let input = input.into_inner(); let conn = db.get()?; @@ -94,16 +84,7 @@ pub async fn community_inbox( match input { CommunityAcceptedObjects::Follow(f) => handle_follow(&f, &user, &community, &conn), - CommunityAcceptedObjects::Undo(u) => { - // TODO: if this is an undo or undo, we need to announce it instead - handle_undo_follow(&u, &user, &community, &conn) - } - // TODO: we should be able to handle all this with a single wildcard match, but i dont see how - // to get the value from that - CommunityAcceptedObjects::Create(c) => do_announce(c, &request, &community, &conn, chat_server), - CommunityAcceptedObjects::Update(u) => do_announce(u, &request, &community, &conn, chat_server), - CommunityAcceptedObjects::Delete(d) => do_announce(d, &request, &community, &conn, chat_server), - CommunityAcceptedObjects::Remove(r) => do_announce(r, &request, &community, &conn, chat_server), + CommunityAcceptedObjects::Undo(u) => handle_undo_follow(&u, &user, &community, &conn), } } @@ -147,46 +128,3 @@ fn handle_undo_follow( Ok(HttpResponse::Ok().finish()) } - -fn do_announce( - activity: A, - _request: &HttpRequest, - community: &Community, - conn: &PgConnection, - _chat_server: ChatServerParam, -) -> Result -where - A: Activity + Base + Serialize, -{ - // TODO: checking the signature needs a lot of boilerplate, unless this gets implemented - // https://git.asonix.dog/Aardwolf/activitystreams/issues/4 - /* - let user_uri = activity - .follow_props - .get_actor_xsd_any_uri() - .unwrap() - .to_string(); - let user = get_or_fetch_and_upsert_remote_user(&user_uri, &conn)?; - verify(&request, &user.public_key.unwrap())?; - */ - - insert_activity(&conn, -1, &activity, false)?; - - // TODO: handle the sending in community.rs - let mut announce = Announce::default(); - populate_object_props( - &mut announce.object_props, - vec![community.get_followers_url()], - &format!("{}/announce/{}", community.actor_id, uuid::Uuid::new_v4()), - )?; - announce - .announce_props - .set_actor_xsd_any_uri(community.actor_id.to_owned())? - .set_object_base_box(BaseBox::from_concrete(activity)?)?; - - insert_activity(&conn, -1, &announce, true)?; - - send_activity(&announce, community, community.get_follower_inboxes(&conn)?)?; - - Ok(HttpResponse::Ok().finish()) -} diff --git a/server/src/apub/post.rs b/server/src/apub/post.rs index 3aac524b..a6dcc5ba 100644 --- a/server/src/apub/post.rs +++ b/server/src/apub/post.rs @@ -7,7 +7,6 @@ use crate::{ extensions::page_extension::PageExtension, fetcher::{get_or_fetch_and_upsert_remote_community, get_or_fetch_and_upsert_remote_user}, get_apub_protocol_string, - shared_inbox::do_announce, ActorType, ApubLikeableType, ApubObjectType, @@ -480,7 +479,7 @@ impl Post { // if this is a local community, we need to do an announce from the community instead if community.local { - do_announce(activity, &community.actor_id, &creator.actor_id, conn)?; + Community::do_announce(activity, &community.actor_id, &creator.actor_id, conn, true)?; } else { send_activity(&activity, creator, vec![community.get_shared_inbox_url()])?; } diff --git a/server/src/apub/shared_inbox.rs b/server/src/apub/shared_inbox.rs index 0510cbba..55395f03 100644 --- a/server/src/apub/shared_inbox.rs +++ b/server/src/apub/shared_inbox.rs @@ -5,10 +5,8 @@ use crate::{ post::PostResponse, }, apub::{ - activities::{populate_object_props, send_activity}, extensions::signatures::verify, fetcher::{get_or_fetch_and_upsert_remote_community, get_or_fetch_and_upsert_remote_user}, - ActorType, FromApub, GroupExt, PageExt, @@ -134,58 +132,95 @@ pub async fn shared_inbox( match (activity, object.kind()) { (SharedAcceptedObjects::Create(c), Some("Page")) => { - // TODO: first check that it is addressed to a local community receive_create_post(&c, &conn, chat_server)?; - do_announce(*c, &to, sender, conn) - } + announce_activity_if_valid::(*c, &to, sender, conn) + }, (SharedAcceptedObjects::Update(u), Some("Page")) => { receive_update_post(&u, &conn, chat_server)?; - do_announce(*u, &to, &sender, conn) - } + announce_activity_if_valid::(*u, &to, sender, conn) + }, (SharedAcceptedObjects::Like(l), Some("Page")) => { receive_like_post(&l, &conn, chat_server)?; - do_announce(*l, &to, &sender, conn) - } + announce_activity_if_valid::(*l, &to, sender, conn) + }, (SharedAcceptedObjects::Dislike(d), Some("Page")) => { receive_dislike_post(&d, &conn, chat_server)?; - do_announce(*d, &to, &sender, conn) + announce_activity_if_valid::(*d, &to, sender, conn) } (SharedAcceptedObjects::Delete(d), Some("Page")) => { receive_delete_post(&d, &conn, chat_server)?; - do_announce(*d, &to, &sender, conn) - } + announce_activity_if_valid::(*d, &to, sender, conn) + }, (SharedAcceptedObjects::Remove(r), Some("Page")) => { receive_remove_post(&r, &conn, chat_server)?; - do_announce(*r, &to, &sender, conn) - } + announce_activity_if_valid::(*r, &to, sender, conn) + }, (SharedAcceptedObjects::Create(c), Some("Note")) => { - receive_create_comment(&c, &conn, chat_server) + receive_create_comment(&c, &conn, chat_server)?; + announce_activity_if_valid::(*c, &to, sender, conn) } (SharedAcceptedObjects::Update(u), Some("Note")) => { - receive_update_comment(&u, &conn, chat_server) + receive_update_comment(&u, &conn, chat_server)?; + announce_activity_if_valid::(*u, &to, sender, conn) } - (SharedAcceptedObjects::Like(l), Some("Note")) => receive_like_comment(&l, &conn, chat_server), + (SharedAcceptedObjects::Like(l), Some("Note")) => { + receive_like_comment(&l, &conn, chat_server)?; + announce_activity_if_valid::(*l, &to, sender, conn) + }, (SharedAcceptedObjects::Dislike(d), Some("Note")) => { - receive_dislike_comment(&d, &conn, chat_server) + receive_dislike_comment(&d, &conn, chat_server)?; + announce_activity_if_valid::(*d, &to, sender, conn) } (SharedAcceptedObjects::Delete(d), Some("Note")) => { - receive_delete_comment(&d, &conn, chat_server) + receive_delete_comment(&d, &conn, chat_server)?; + announce_activity_if_valid::(*d, &to, sender, conn) } (SharedAcceptedObjects::Remove(r), Some("Note")) => { - receive_remove_comment(&r, &conn, chat_server) + receive_remove_comment(&r, &conn, chat_server)?; + announce_activity_if_valid::(*r, &to, sender, conn) } (SharedAcceptedObjects::Delete(d), Some("Group")) => { - receive_delete_community(&d, &conn, chat_server) + receive_delete_community(&d, &conn, chat_server)?; + announce_activity_if_valid::(*d, &to, sender, conn) } (SharedAcceptedObjects::Remove(r), Some("Group")) => { - receive_remove_community(&r, &conn, chat_server) + receive_remove_community(&r, &conn, chat_server)?; + announce_activity_if_valid::(*r, &to, sender, conn) } - (SharedAcceptedObjects::Undo(u), Some("Delete")) => receive_undo_delete(&u, &conn, chat_server), - (SharedAcceptedObjects::Undo(u), Some("Remove")) => receive_undo_remove(&u, &conn, chat_server), - (SharedAcceptedObjects::Undo(u), Some("Like")) => receive_undo_like(&u, &conn, chat_server), - (SharedAcceptedObjects::Announce(a), _) => receive_announce(a, &conn, chat_server), - _ => Err(format_err!("Unknown incoming activity type.")), + (SharedAcceptedObjects::Undo(u), Some("Delete")) => { + receive_undo_delete(&u, &conn, chat_server)?; + announce_activity_if_valid::(*u, &to, sender, conn) + }, + (SharedAcceptedObjects::Undo(u), Some("Remove")) => { + receive_undo_remove(&u, &conn, chat_server)?; + announce_activity_if_valid::(*u, &to, sender, conn) + }, + (SharedAcceptedObjects::Undo(u), Some("Like")) => { + receive_undo_like(&u, &conn, chat_server)?; + announce_activity_if_valid::(*u, &to, sender, conn) + }, + (SharedAcceptedObjects::Announce(a), _) => { + receive_announce(a, &conn, chat_server) + }, + (a, _) => receive_unhandled_activity(a), + } +} + +fn announce_activity_if_valid( + activity: A, + community_uri: &str, + sender: &str, + conn: &PgConnection, +) -> Result +where + A: Activity + Base + Serialize + Debug, +{ + // TODO: first check that it is addressed to a local community + let community = Community::read_from_actor_id(conn, &community_uri)?; + if !community.local { + // ignore this object } + Community::do_announce(activity, &community_uri, sender, conn, false) } fn receive_announce( @@ -199,7 +234,6 @@ fn receive_announce( .unwrap() .to_owned(); // TODO: too much copy paste - // TODO: we should log all unhandled events match object.kind() { Some("Create") => { let create = object.into_concrete::()?; @@ -207,7 +241,7 @@ fn receive_announce( match inner_object.kind() { Some("Page") => receive_create_post(&create, &conn, chat_server), Some("Note") => receive_create_comment(&create, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } Some("Update") => { @@ -216,7 +250,7 @@ fn receive_announce( match inner_object.kind() { Some("Page") => receive_update_post(&update, &conn, chat_server), Some("Note") => receive_update_comment(&update, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } Some("Like") => { @@ -225,7 +259,7 @@ fn receive_announce( match inner_object.kind() { Some("Page") => receive_like_post(&like, &conn, chat_server), Some("Note") => receive_like_comment(&like, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } Some("Dislike") => { @@ -234,7 +268,7 @@ fn receive_announce( match inner_object.kind() { Some("Page") => receive_dislike_post(&dislike, &conn, chat_server), Some("Note") => receive_dislike_comment(&dislike, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } Some("Delete") => { @@ -243,7 +277,7 @@ fn receive_announce( match inner_object.kind() { Some("Page") => receive_delete_post(&delete, &conn, chat_server), Some("Note") => receive_delete_comment(&delete, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } Some("Remove") => { @@ -252,7 +286,7 @@ fn receive_announce( match inner_object.kind() { Some("Page") => receive_remove_post(&remove, &conn, chat_server), Some("Note") => receive_remove_comment(&remove, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } Some("Undo") => { @@ -262,13 +296,21 @@ fn receive_announce( Some("Delete") => receive_undo_delete(&undo, &conn, chat_server), Some("Remove") => receive_undo_remove(&undo, &conn, chat_server), Some("Like") => receive_undo_like(&undo, &conn, chat_server), - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } - _ => Ok(HttpResponse::NotImplemented().finish()), + _ => receive_unhandled_activity(announce), } } +fn receive_unhandled_activity(activity: A) -> Result +where + A: Debug, +{ + debug!("received unhandled activity type: {:?}", activity); + Ok(HttpResponse::NotImplemented().finish()) +} + fn receive_create_post( create: &Create, conn: &PgConnection, @@ -1565,43 +1607,3 @@ fn receive_undo_like_post( Ok(HttpResponse::Ok().finish()) } - -// TODO: move to community.rs -pub fn do_announce( - activity: A, - community_uri: &str, - sender: &str, - conn: &PgConnection, -) -> Result -where - A: Activity + Base + Serialize + Debug, -{ - let community = Community::read_from_actor_id(conn, &community_uri)?; - - // TODO: need to add boolean param is_local_activity - //insert_activity(&conn, -1, &activity, false)?; - - let mut announce = Announce::default(); - populate_object_props( - &mut announce.object_props, - vec![community.get_followers_url()], - &format!("{}/announce/{}", community.actor_id, uuid::Uuid::new_v4()), - )?; - announce - .announce_props - .set_actor_xsd_any_uri(community.actor_id.to_owned())? - .set_object_base_box(BaseBox::from_concrete(activity)?)?; - - insert_activity(&conn, community.id, &announce, true)?; - - // dont send to the instance where the activity originally came from, because that would result - // in a database error (same data inserted twice) - let mut to = community.get_follower_inboxes(&conn)?; - let sending_user = get_or_fetch_and_upsert_remote_user(&sender, conn)?; - // this seems to be the "easiest" stable alternative for remove_item() - to.retain(|x| *x != sending_user.get_shared_inbox_url()); - - send_activity(&announce, &community, to)?; - - Ok(HttpResponse::Ok().finish()) -} diff --git a/server/src/db/activity.rs b/server/src/db/activity.rs index bb8a74d4..12137887 100644 --- a/server/src/db/activity.rs +++ b/server/src/db/activity.rs @@ -1,5 +1,7 @@ use crate::{db::Crud, schema::activity}; use diesel::{dsl::*, result::Error, *}; +use failure::_core::fmt::Debug; +use log::debug; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -58,9 +60,9 @@ pub fn insert_activity( user_id: i32, data: &T, local: bool, -) -> Result +) -> Result<(), failure::Error> where - T: Serialize, + T: Serialize + Debug, { let activity_form = ActivityForm { user_id, @@ -68,7 +70,10 @@ where local, updated: None, }; - Ok(Activity::create(&conn, &activity_form)?) + debug!("inserting activity for user {}, data {:?}", user_id, data); + // TODO: this is broken + //Activity::create(&conn, &activity_form)?; + Ok(()) } #[cfg(test)] -- 2.44.1