-use crate::{newtypes::DbUrl, source::activity::*, traits::Crud};
+use crate::{
+ diesel::OptionalExtension,
+ newtypes::DbUrl,
+ source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
+ utils::{get_conn, DbPool},
+};
use diesel::{
- dsl::*,
- result::{DatabaseErrorKind, Error},
- *,
+ dsl::insert_into,
+ result::{DatabaseErrorKind, Error, Error::DatabaseError},
+ ExpressionMethods,
+ QueryDsl,
};
-use serde_json::Value;
-
-impl Crud for Activity {
- type Form = ActivityForm;
- type IdType = i32;
- fn read(conn: &PgConnection, activity_id: i32) -> Result<Self, Error> {
- use crate::schema::activity::dsl::*;
- activity.find(activity_id).first::<Self>(conn)
- }
+use diesel_async::RunQueryDsl;
- fn create(conn: &PgConnection, new_activity: &ActivityForm) -> Result<Self, Error> {
- use crate::schema::activity::dsl::*;
- insert_into(activity)
- .values(new_activity)
+impl SentActivity {
+ pub async fn create(pool: &mut DbPool<'_>, form: SentActivityForm) -> Result<Self, Error> {
+ use crate::schema::sent_activity::dsl::sent_activity;
+ let conn = &mut get_conn(pool).await?;
+ insert_into(sent_activity)
+ .values(form)
.get_result::<Self>(conn)
+ .await
}
- fn update(
- conn: &PgConnection,
- activity_id: i32,
- new_activity: &ActivityForm,
- ) -> Result<Self, Error> {
- use crate::schema::activity::dsl::*;
- diesel::update(activity.find(activity_id))
- .set(new_activity)
- .get_result::<Self>(conn)
- }
- fn delete(conn: &PgConnection, activity_id: i32) -> Result<usize, Error> {
- use crate::schema::activity::dsl::*;
- diesel::delete(activity.find(activity_id)).execute(conn)
+ pub async fn read_from_apub_id(pool: &mut DbPool<'_>, object_id: &DbUrl) -> Result<Self, Error> {
+ use crate::schema::sent_activity::dsl::{ap_id, sent_activity};
+ let conn = &mut get_conn(pool).await?;
+ sent_activity
+ .filter(ap_id.eq(object_id))
+ .first::<Self>(conn)
+ .await
}
}
-impl Activity {
- /// Returns true if the insert was successful
- pub fn insert(
- conn: &PgConnection,
- ap_id: DbUrl,
- data: Value,
- local: bool,
- sensitive: bool,
- ) -> Result<bool, Error> {
- let activity_form = ActivityForm {
- ap_id,
- data,
- local: Some(local),
- sensitive,
- updated: None,
- };
- match Activity::create(conn, &activity_form) {
- Ok(_) => Ok(true),
- Err(e) => {
- if let Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) = e {
- return Ok(false);
- }
- Err(e)
- }
+impl ReceivedActivity {
+ pub async fn create(pool: &mut DbPool<'_>, ap_id_: &DbUrl) -> Result<(), Error> {
+ use crate::schema::received_activity::dsl::{ap_id, id, received_activity};
+ let conn = &mut get_conn(pool).await?;
+ let res = insert_into(received_activity)
+ .values(ap_id.eq(ap_id_))
+ .on_conflict_do_nothing()
+ .returning(id)
+ .get_result::<i64>(conn)
+ .await
+ .optional()?;
+ if res.is_some() {
+ // new activity inserted successfully
+ Ok(())
+ } else {
+ // duplicate activity
+ Err(DatabaseError(
+ DatabaseErrorKind::UniqueViolation,
+ Box::<String>::default(),
+ ))
}
}
-
- pub fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result<Activity, Error> {
- use crate::schema::activity::dsl::*;
- activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
- }
-
- pub fn delete_olds(conn: &PgConnection) -> Result<usize, Error> {
- use crate::schema::activity::dsl::*;
- diesel::delete(activity.filter(published.lt(now - 6.months()))).execute(conn)
- }
}
#[cfg(test)]
mod tests {
+ #![allow(clippy::unwrap_used)]
+ #![allow(clippy::indexing_slicing)]
+
use super::*;
- use crate::{
- newtypes::DbUrl,
- source::{
- activity::{Activity, ActivityForm},
- person::{Person, PersonForm},
- },
- utils::establish_unpooled_connection,
- };
- use serde_json::Value;
+ use crate::utils::build_db_pool_for_tests;
+ use serde_json::json;
use serial_test::serial;
use url::Url;
- #[test]
+ #[tokio::test]
#[serial]
- fn test_crud() {
- let conn = establish_unpooled_connection();
+ async fn receive_activity_duplicate() {
+ let pool = &build_db_pool_for_tests().await;
+ let pool = &mut pool.into();
+ let ap_id: DbUrl = Url::parse("http://example.com/activity/531")
+ .unwrap()
+ .into();
- let creator_form = PersonForm {
- name: "activity_creator_pm".into(),
- public_key: Some("pubkey".to_string()),
- ..PersonForm::default()
- };
+ // inserting activity for first time
+ let res = ReceivedActivity::create(pool, &ap_id).await;
+ assert!(res.is_ok());
- let inserted_creator = Person::create(&conn, &creator_form).unwrap();
-
- let ap_id: DbUrl = Url::parse(
- "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
- )
- .unwrap()
- .into();
- let test_json: Value = serde_json::from_str(
- r#"{
- "@context": "https://www.w3.org/ns/activitystreams",
- "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
- "type": "Delete",
- "actor": "https://enterprise.lemmy.ml/u/riker",
- "to": "https://www.w3.org/ns/activitystreams#Public",
- "cc": [
- "https://enterprise.lemmy.ml/c/main/"
- ],
- "object": "https://enterprise.lemmy.ml/post/32"
- }"#,
- )
- .unwrap();
- let activity_form = ActivityForm {
- ap_id: ap_id.clone(),
- data: test_json.to_owned(),
- local: Some(true),
- sensitive: false,
- updated: None,
- };
+ let res = ReceivedActivity::create(pool, &ap_id).await;
+ assert!(res.is_err());
+ }
- let inserted_activity = Activity::create(&conn, &activity_form).unwrap();
+ #[tokio::test]
+ #[serial]
+ async fn sent_activity_write_read() {
+ let pool = &build_db_pool_for_tests().await;
+ let pool = &mut pool.into();
+ let ap_id: DbUrl = Url::parse("http://example.com/activity/412")
+ .unwrap()
+ .into();
+ let data = json!({
+ "key1": "0xF9BA143B95FF6D82",
+ "key2": "42",
+ });
+ let sensitive = false;
- let expected_activity = Activity {
+ let form = SentActivityForm {
ap_id: ap_id.clone(),
- id: inserted_activity.id,
- data: test_json,
- local: true,
- sensitive: Some(false),
- published: inserted_activity.published,
- updated: None,
+ data: data.clone(),
+ sensitive,
};
- let read_activity = Activity::read(&conn, inserted_activity.id).unwrap();
- let read_activity_by_apub_id = Activity::read_from_apub_id(&conn, &ap_id).unwrap();
- Person::delete(&conn, inserted_creator.id).unwrap();
- Activity::delete(&conn, inserted_activity.id).unwrap();
+ SentActivity::create(pool, form).await.unwrap();
- assert_eq!(expected_activity, read_activity);
- assert_eq!(expected_activity, read_activity_by_apub_id);
- assert_eq!(expected_activity, inserted_activity);
+ let res = SentActivity::read_from_apub_id(pool, &ap_id).await.unwrap();
+ assert_eq!(res.ap_id, ap_id);
+ assert_eq!(res.data, data);
+ assert_eq!(res.sensitive, sensitive);
}
}