Untitled Git - lemmy.git/commitdiff
Adding some recurring lemmy tasks. (#1386)
authorDessalines <dessalines@users.noreply.github.com>
Fri, 29 Jan 2021 16:38:27 +0000 (11:38 -0500)
committerGitHub <noreply@github.com>
Fri, 29 Jan 2021 16:38:27 +0000 (11:38 -0500)
* Adding some recurring lemmy tasks.

- Add active users by day, week, month, and half year to site and
  community. Fixes #1195
- Periodically re-index the aggregates tables that use hot_rank.
  Fixes #1384
- Clear out old activities (> 6 months). Fixes #1133

* Some cleanup, recalculating actives every hour.

12 files changed:
Cargo.lock
Cargo.toml
crates/db_queries/src/aggregates/community_aggregates.rs
crates/db_queries/src/aggregates/site_aggregates.rs
crates/db_queries/src/source/activity.rs
crates/db_schema/src/schema.rs
migrations/2021-01-27-202728_active_users_monthly/down.sql [new file with mode: 0644]
migrations/2021-01-27-202728_active_users_monthly/up.sql [new file with mode: 0644]
src/lib.rs
src/main.rs
src/routes/nodeinfo.rs
src/scheduled_tasks.rs [new file with mode: 0644]

index f15cf8e88a806c39ada7eef3bffe0ab886f85456..618ba83a73f0439095fa52906fe35fb7400a70bc 100644 (file)
@@ -759,6 +759,15 @@ dependencies = [
  "generic-array 0.14.4",
 ]
 
+[[package]]
+name = "clokwerk"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9797a6d3acefa28d4cc62bf548b6ddc57b8ae51a43702d001cb46fba1dc48c1"
+dependencies = [
+ "chrono",
+]
+
 [[package]]
 name = "color_quant"
 version = "1.1.0"
@@ -1877,6 +1886,7 @@ dependencies = [
  "awc",
  "cargo-husky",
  "chrono",
+ "clokwerk",
  "diesel",
  "diesel_migrations",
  "env_logger",
index 1488ed95234d61c3c1114ba10ff226c76421830b..bd565c7a1364774093ef0642b1cb69b619efbbf6 100644 (file)
@@ -55,6 +55,7 @@ reqwest = { version = "0.10.10", features = ["json"] }
 activitystreams = "0.7.0-alpha.8"
 actix-rt = { version = "1.1.1", default-features = false }
 serde_json = { version = "1.0.60", features = ["preserve_order"] }
+clokwerk = "0.3.4"
 
 [dev-dependencies.cargo-husky]
 version = "1.5.0"
index 3fb891c11ee6e1b764e3130495faf17824db8ce0..0f15453ad5d63412f63b5632e44d4b79b28bd739 100644 (file)
@@ -11,6 +11,10 @@ pub struct CommunityAggregates {
   pub posts: i64,
   pub comments: i64,
   pub published: chrono::NaiveDateTime,
+  pub users_active_day: i64,
+  pub users_active_week: i64,
+  pub users_active_month: i64,
+  pub users_active_half_year: i64,
 }
 
 impl CommunityAggregates {
index b12e2b60af45ed7ef6ef74821e9c993f741b916c..ce9f2f7615e2b60183b4a0001f3b6886044d6300 100644 (file)
@@ -11,6 +11,10 @@ pub struct SiteAggregates {
   pub posts: i64,
   pub comments: i64,
   pub communities: i64,
+  pub users_active_day: i64,
+  pub users_active_week: i64,
+  pub users_active_month: i64,
+  pub users_active_half_year: i64,
 }
 
 impl SiteAggregates {
index 964e50424e221cd46b9f174d6c7eaf403fc426cd..d47bc256ff0f82c6a831c8d739553a6cdd4fe3f6 100644 (file)
@@ -50,6 +50,7 @@ pub trait Activity_ {
     T: Serialize + Debug;
 
   fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
+  fn delete_olds(conn: &PgConnection) -> Result<usize, Error>;
 
   /// Returns up to 20 activities of type `Announce/Create/Page` from the community
   fn read_community_outbox(
@@ -92,6 +93,11 @@ impl Activity_ for Activity {
     activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
   }
 
+  fn delete_olds(conn: &PgConnection) -> Result<usize, Error> {
+    use lemmy_db_schema::schema::activity::dsl::*;
+    diesel::delete(activity.filter(published.lt(now - 6.months()))).execute(conn)
+  }
+
   fn read_community_outbox(
     conn: &PgConnection,
     community_actor_id: &Url,
index bbc2e7b80ea9471fcef71b01b58f0b9127e2335c..9ff73eccbca23572de9e8e3f76dff239035dd1ef 100644 (file)
@@ -110,6 +110,10 @@ table! {
         posts -> Int8,
         comments -> Int8,
         published -> Timestamp,
+        users_active_day -> Int8,
+        users_active_week -> Int8,
+        users_active_month -> Int8,
+        users_active_half_year -> Int8,
     }
 }
 
@@ -371,6 +375,10 @@ table! {
         posts -> Int8,
         comments -> Int8,
         communities -> Int8,
+        users_active_day -> Int8,
+        users_active_week -> Int8,
+        users_active_month -> Int8,
+        users_active_half_year -> Int8,
     }
 }
 
diff --git a/migrations/2021-01-27-202728_active_users_monthly/down.sql b/migrations/2021-01-27-202728_active_users_monthly/down.sql
new file mode 100644 (file)
index 0000000..bc3d46a
--- /dev/null
@@ -0,0 +1,14 @@
+alter table site_aggregates 
+  drop column users_active_day,
+  drop column users_active_week,
+  drop column users_active_month,
+  drop column users_active_half_year;
+
+alter table community_aggregates 
+  drop column users_active_day,
+  drop column users_active_week,
+  drop column users_active_month,
+  drop column users_active_half_year;
+
+drop function site_aggregates_activity(i text);
+drop function community_aggregates_activity(i text);
diff --git a/migrations/2021-01-27-202728_active_users_monthly/up.sql b/migrations/2021-01-27-202728_active_users_monthly/up.sql
new file mode 100644 (file)
index 0000000..9248ae8
--- /dev/null
@@ -0,0 +1,89 @@
+-- Add monthly and half yearly active columns for site and community aggregates
+
+-- These columns don't need to be updated with a trigger, so they're recalculated periodically by a scheduled task
+alter table site_aggregates add column users_active_day bigint not null default 0;
+alter table site_aggregates add column users_active_week bigint not null default 0;
+alter table site_aggregates add column users_active_month bigint not null default 0;
+alter table site_aggregates add column users_active_half_year bigint not null default 0;
+
+alter table community_aggregates add column users_active_day bigint not null default 0;
+alter table community_aggregates add column users_active_week bigint not null default 0;
+alter table community_aggregates add column users_active_month bigint not null default 0;
+alter table community_aggregates add column users_active_half_year bigint not null default 0;
+
+create or replace function site_aggregates_activity(i text)
+returns int
+language plpgsql
+as
+$$
+declare
+   count_ integer;
+begin
+  select count(*) 
+  into count_
+  from (
+    select c.creator_id from comment c
+    inner join user_ u on c.creator_id = u.id
+    where c.published > ('now'::timestamp - i::interval) 
+    and u.local = true
+    union
+    select p.creator_id from post p
+    inner join user_ u on p.creator_id = u.id
+    where p.published > ('now'::timestamp - i::interval)
+    and u.local = true
+  ) a;
+  return count_;
+end;
+$$;
+
+update site_aggregates 
+set users_active_day = (select * from site_aggregates_activity('1 day'));
+
+update site_aggregates 
+set users_active_week = (select * from site_aggregates_activity('1 week'));
+
+update site_aggregates 
+set users_active_month = (select * from site_aggregates_activity('1 month'));
+
+update site_aggregates 
+set users_active_half_year = (select * from site_aggregates_activity('6 months'));
+
+create or replace function community_aggregates_activity(i text)
+returns table(count_ bigint, community_id_ integer)
+language plpgsql
+as
+$$
+begin
+  return query 
+  select count(*), community_id
+  from (
+    select c.creator_id, p.community_id from comment c
+    inner join post p on c.post_id = p.id
+    where c.published > ('now'::timestamp - i::interval)
+    union
+    select p.creator_id, p.community_id from post p
+    where p.published > ('now'::timestamp - i::interval)  
+  ) a
+  group by community_id;
+end;
+$$;
+
+update community_aggregates ca
+set users_active_day = mv.count_
+from community_aggregates_activity('1 day') mv
+where ca.community_id = mv.community_id_;
+
+update community_aggregates ca
+set users_active_week = mv.count_
+from community_aggregates_activity('1 week') mv
+where ca.community_id = mv.community_id_;
+
+update community_aggregates ca
+set users_active_month = mv.count_
+from community_aggregates_activity('1 month') mv
+where ca.community_id = mv.community_id_;
+
+update community_aggregates ca
+set users_active_half_year = mv.count_
+from community_aggregates_activity('6 months') mv
+where ca.community_id = mv.community_id_;
index e6ea900ee7627335efc119dd635606463a06275d..e867a26b320d81180b946549c1203be6d2ad3936 100644 (file)
@@ -3,3 +3,4 @@
 extern crate lazy_static;
 pub mod code_migrations;
 pub mod routes;
+pub mod scheduled_tasks;
index fad3680bba12bacb1adc6a6ac30afcd0179da067..f00f724a69444283a4f19abfe3f98ba32ec960b7 100644 (file)
@@ -10,7 +10,7 @@ use diesel::{
 use lemmy_api::match_websocket_operation;
 use lemmy_apub::activity_queue::create_activity_queue;
 use lemmy_db_queries::get_database_url_from_env;
-use lemmy_server::{code_migrations::run_advanced_migrations, routes::*};
+use lemmy_server::{code_migrations::run_advanced_migrations, routes::*, scheduled_tasks};
 use lemmy_structs::blocking;
 use lemmy_utils::{
   rate_limit::{rate_limiter::RateLimiter, RateLimit},
@@ -19,7 +19,7 @@ use lemmy_utils::{
 };
 use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
 use reqwest::Client;
-use std::sync::Arc;
+use std::{sync::Arc, thread};
 use tokio::sync::Mutex;
 
 embed_migrations!();
@@ -48,6 +48,11 @@ async fn main() -> Result<(), LemmyError> {
   })
   .await??;
 
+  let pool2 = pool.clone();
+  thread::spawn(move || {
+    scheduled_tasks::setup(pool2);
+  });
+
   // Set up the rate limiter
   let rate_limiter = RateLimit {
     rate_limiter: Arc::new(Mutex::new(RateLimiter::default())),
index df0064ecc6f50dc6cb0881b283b334c1f38c38ae..e2f038fd2ed82f80d87d232aa21f14f7e05e6e05 100644 (file)
@@ -48,6 +48,8 @@ async fn node_info(context: web::Data<LemmyContext>) -> Result<HttpResponse, Err
     usage: NodeInfoUsage {
       users: NodeInfoUsers {
         total: site_view.counts.users,
+        active_half_year: site_view.counts.users_active_half_year,
+        active_month: site_view.counts.users_active_month,
       },
       local_posts: site_view.counts.posts,
       local_comments: site_view.counts.comments,
@@ -96,4 +98,6 @@ struct NodeInfoUsage {
 #[derive(Serialize, Deserialize, Debug)]
 struct NodeInfoUsers {
   pub total: i64,
+  pub active_half_year: i64,
+  pub active_month: i64,
 }
diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs
new file mode 100644 (file)
index 0000000..53dfd6b
--- /dev/null
@@ -0,0 +1,85 @@
+// Scheduler, and trait for .seconds(), .minutes(), etc.
+use clokwerk::{Scheduler, TimeUnits};
+// Diesel query helpers, the DB pool, the Activity model, logging, and std thread/time utilities
+use diesel::{sql_query, PgConnection, RunQueryDsl};
+use lemmy_db_queries::{source::activity::Activity_, DbPool};
+use lemmy_db_schema::source::activity::Activity;
+use log::info;
+use std::{thread, time::Duration};
+
+/// Schedules various cleanup tasks for lemmy in a background thread
+pub fn setup(pool: DbPool) {
+  let mut scheduler = Scheduler::new();
+
+  let conn = pool.get().unwrap();
+  active_counts(&conn);
+  reindex_aggregates_tables(&conn);
+  scheduler.every(1.hour()).run(move || {
+    active_counts(&conn);
+    reindex_aggregates_tables(&conn);
+  });
+
+  let conn = pool.get().unwrap();
+  clear_old_activities(&conn);
+  scheduler.every(1.weeks()).run(move || {
+    clear_old_activities(&conn);
+  });
+
+  // Manually run the scheduler in an event loop
+  loop {
+    scheduler.run_pending();
+    thread::sleep(Duration::from_millis(1000));
+  }
+}
+
+/// Reindex the aggregates tables every hour
+/// This is necessary because hot_rank is actually a mutable function:
+/// https://dba.stackexchange.com/questions/284052/how-to-create-an-index-based-on-a-time-based-function-in-postgres?noredirect=1#comment555727_284052
+fn reindex_aggregates_tables(conn: &PgConnection) {
+  for table_name in &[
+    "post_aggregates",
+    "comment_aggregates",
+    "community_aggregates",
+  ] {
+    reindex_table(&conn, &table_name);
+  }
+}
+
+fn reindex_table(conn: &PgConnection, table_name: &str) {
+  info!("Reindexing table {} ...", table_name);
+  let query = format!("reindex table concurrently {}", table_name);
+  sql_query(query).execute(conn).unwrap();
+  info!("Done.");
+}
+
+/// Clear old activities (this table gets very large)
+fn clear_old_activities(conn: &PgConnection) {
+  info!("Clearing old activities...");
+  Activity::delete_olds(&conn).unwrap();
+  info!("Done.");
+}
+
+/// Re-calculate the site and community active counts every hour
+fn active_counts(conn: &PgConnection) {
+  info!("Updating active site and community aggregates ...");
+
+  let intervals = vec![
+    ("1 day", "day"),
+    ("1 week", "week"),
+    ("1 month", "month"),
+    ("6 months", "half_year"),
+  ];
+
+  for i in &intervals {
+    let update_site_stmt = format!(
+      "update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
+      i.1, i.0
+    );
+    sql_query(update_site_stmt).execute(conn).unwrap();
+
+    let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
+    sql_query(update_community_stmt).execute(conn).unwrap();
+  }
+
+  info!("Done.");
+}