X-Git-Url: http://these/git/?a=blobdiff_plain;f=crates%2Fdb_schema%2Fsrc%2Fimpls%2Factivity.rs;h=8c4a0a15d81c74e8052acf179d132fa01b29bf4d;hb=92568956353f21649ed9aff68b42699c9d036f30;hp=5ec5d49db9c1ec8269a9342efc80ea94d7c02624;hpb=1b9414f292f3c48d95bb0cd3bbaf8220607f1f85;p=lemmy.git diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 5ec5d49d..8c4a0a15 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -1,148 +1,114 @@ -use crate::{newtypes::DbUrl, source::activity::*, traits::Crud}; -use diesel::{dsl::*, result::Error, *}; -use serde_json::Value; -use std::io::{Error as IoError, ErrorKind}; +use crate::{ + diesel::OptionalExtension, + newtypes::DbUrl, + source::activity::{ReceivedActivity, SentActivity, SentActivityForm}, + utils::{get_conn, DbPool}, +}; +use diesel::{ + dsl::insert_into, + result::{DatabaseErrorKind, Error, Error::DatabaseError}, + ExpressionMethods, + QueryDsl, +}; +use diesel_async::RunQueryDsl; -impl Crud for Activity { - type Form = ActivityForm; - type IdType = i32; - fn read(conn: &PgConnection, activity_id: i32) -> Result { - use crate::schema::activity::dsl::*; - activity.find(activity_id).first::(conn) - } - - fn create(conn: &PgConnection, new_activity: &ActivityForm) -> Result { - use crate::schema::activity::dsl::*; - insert_into(activity) - .values(new_activity) +impl SentActivity { + pub async fn create(pool: &mut DbPool<'_>, form: SentActivityForm) -> Result { + use crate::schema::sent_activity::dsl::sent_activity; + let conn = &mut get_conn(pool).await?; + insert_into(sent_activity) + .values(form) .get_result::(conn) + .await } - fn update( - conn: &PgConnection, - activity_id: i32, - new_activity: &ActivityForm, - ) -> Result { - use crate::schema::activity::dsl::*; - diesel::update(activity.find(activity_id)) - .set(new_activity) - .get_result::(conn) - } - fn delete(conn: &PgConnection, activity_id: i32) -> Result { - use crate::schema::activity::dsl::*; - diesel::delete(activity.find(activity_id)).execute(conn) + pub async fn read_from_apub_id(pool: &mut DbPool<'_>, object_id: &DbUrl) -> Result { + use crate::schema::sent_activity::dsl::{ap_id, sent_activity}; + let conn = &mut get_conn(pool).await?; + sent_activity + .filter(ap_id.eq(object_id)) + .first::(conn) + .await } } -impl Activity { - pub fn insert( - conn: &PgConnection, - ap_id: DbUrl, - data: Value, - local: bool, - sensitive: bool, - ) -> Result { - let activity_form = ActivityForm { - ap_id, - data, - local: Some(local), - sensitive, - updated: None, - }; - let result = Activity::create(conn, &activity_form); - match result { - Ok(s) => Ok(s), - Err(e) => Err(IoError::new( - ErrorKind::Other, - format!("Failed to insert activity into database: {}", e), - )), +impl ReceivedActivity { + pub async fn create(pool: &mut DbPool<'_>, ap_id_: &DbUrl) -> Result<(), Error> { + use crate::schema::received_activity::dsl::{ap_id, id, received_activity}; + let conn = &mut get_conn(pool).await?; + let res = insert_into(received_activity) + .values(ap_id.eq(ap_id_)) + .on_conflict_do_nothing() + .returning(id) + .get_result::(conn) + .await + .optional()?; + if res.is_some() { + // new activity inserted successfully + Ok(()) + } else { + // duplicate activity + Err(DatabaseError( + DatabaseErrorKind::UniqueViolation, + Box::::default(), + )) } } - - pub fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { - use crate::schema::activity::dsl::*; - activity.filter(ap_id.eq(object_id)).first::(conn) - } - - pub fn delete_olds(conn: &PgConnection) -> Result { - use crate::schema::activity::dsl::*; - diesel::delete(activity.filter(published.lt(now - 6.months()))).execute(conn) - } } #[cfg(test)] mod tests { + #![allow(clippy::unwrap_used)] + #![allow(clippy::indexing_slicing)] + use super::*; - use crate::{ - establish_unpooled_connection, - newtypes::DbUrl, - source::{ - activity::{Activity, ActivityForm}, - person::{Person, PersonForm}, - }, - }; - use serde_json::Value; + use crate::utils::build_db_pool_for_tests; + use serde_json::json; use serial_test::serial; use url::Url; - #[test] + #[tokio::test] #[serial] - fn test_crud() { - let conn = establish_unpooled_connection(); - - let creator_form = PersonForm { - name: "activity_creator_pm".into(), - ..PersonForm::default() - }; + async fn receive_activity_duplicate() { + let pool = &build_db_pool_for_tests().await; + let pool = &mut pool.into(); + let ap_id: DbUrl = Url::parse("http://example.com/activity/531") + .unwrap() + .into(); - let inserted_creator = Person::create(&conn, &creator_form).unwrap(); + // inserting activity for first time + let res = ReceivedActivity::create(pool, &ap_id).await; + assert!(res.is_ok()); - let ap_id: DbUrl = Url::parse( - "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c", - ) - .unwrap() - .into(); - let test_json: Value = serde_json::from_str( - r#"{ - "@context": "https://www.w3.org/ns/activitystreams", - "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c", - "type": "Delete", - "actor": "https://enterprise.lemmy.ml/u/riker", - "to": "https://www.w3.org/ns/activitystreams#Public", - "cc": [ - "https://enterprise.lemmy.ml/c/main/" - ], - "object": "https://enterprise.lemmy.ml/post/32" - }"#, - ) - .unwrap(); - let activity_form = ActivityForm { - ap_id: ap_id.clone(), - data: test_json.to_owned(), - local: Some(true), - sensitive: false, - updated: None, - }; + let res = ReceivedActivity::create(pool, &ap_id).await; + assert!(res.is_err()); + } - let inserted_activity = Activity::create(&conn, &activity_form).unwrap(); + #[tokio::test] + #[serial] + async fn sent_activity_write_read() { + let pool = &build_db_pool_for_tests().await; + let pool = &mut pool.into(); + let ap_id: DbUrl = Url::parse("http://example.com/activity/412") + .unwrap() + .into(); + let data = json!({ + "key1": "0xF9BA143B95FF6D82", + "key2": "42", + }); + let sensitive = false; - let expected_activity = Activity { - ap_id: Some(ap_id.clone()), - id: inserted_activity.id, - data: test_json, - local: true, - sensitive: Some(false), - published: inserted_activity.published, - updated: None, + let form = SentActivityForm { + ap_id: ap_id.clone(), + data: data.clone(), + sensitive, }; - let read_activity = Activity::read(&conn, inserted_activity.id).unwrap(); - let read_activity_by_apub_id = Activity::read_from_apub_id(&conn, &ap_id).unwrap(); - Person::delete(&conn, inserted_creator.id).unwrap(); - Activity::delete(&conn, inserted_activity.id).unwrap(); + SentActivity::create(pool, form).await.unwrap(); - assert_eq!(expected_activity, read_activity); - assert_eq!(expected_activity, read_activity_by_apub_id); - assert_eq!(expected_activity, inserted_activity); + let res = SentActivity::read_from_apub_id(pool, &ap_id).await.unwrap(); + assert_eq!(res.ap_id, ap_id); + assert_eq!(res.data, data); + assert_eq!(res.sensitive, sensitive); } }