* Adding some recurring lemmy tasks.
- Add active users by day, week, month, and half year to site and
community. Fixes #1195
- Periodically re-index the aggregates tables that use hot_rank.
Fixes #1384
- Clear out old activities (> 6 months). Fixes #1133
* Some cleanup, recalculating actives every hour.
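* A minimal sketch of the clokwerk pattern the new scheduled_tasks module
  relies on (illustrative only; names and task bodies are placeholders, the
  real jobs are defined further down in the diff):

    use clokwerk::{Scheduler, TimeUnits};
    use std::{thread, time::Duration};

    fn run_scheduler() {
      let mut scheduler = Scheduler::new();
      // Hourly job, e.g. recalculating active user counts
      scheduler.every(1.hour()).run(|| println!("hourly task"));
      // Weekly job, e.g. clearing out old activities
      scheduler.every(1.weeks()).run(|| println!("weekly task"));
      // Poll for pending jobs from a dedicated thread
      loop {
        scheduler.run_pending();
        thread::sleep(Duration::from_millis(1000));
      }
    }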
"generic-array 0.14.4",
]
+[[package]]
+name = "clokwerk"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9797a6d3acefa28d4cc62bf548b6ddc57b8ae51a43702d001cb46fba1dc48c1"
+dependencies = [
+ "chrono",
+]
+
[[package]]
name = "color_quant"
version = "1.1.0"
"awc",
"cargo-husky",
"chrono",
+ "clokwerk",
"diesel",
"diesel_migrations",
"env_logger",
activitystreams = "0.7.0-alpha.8"
actix-rt = { version = "1.1.1", default-features = false }
serde_json = { version = "1.0.60", features = ["preserve_order"] }
+clokwerk = "0.3.4"
[dev-dependencies.cargo-husky]
version = "1.5.0"
pub posts: i64,
pub comments: i64,
pub published: chrono::NaiveDateTime,
+ pub users_active_day: i64,
+ pub users_active_week: i64,
+ pub users_active_month: i64,
+ pub users_active_half_year: i64,
}
impl CommunityAggregates {
pub posts: i64,
pub comments: i64,
pub communities: i64,
+ pub users_active_day: i64,
+ pub users_active_week: i64,
+ pub users_active_month: i64,
+ pub users_active_half_year: i64,
}
impl SiteAggregates {
T: Serialize + Debug;
fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Activity, Error>;
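+ /// Deletes stored activities that are more than 6 months old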
+ fn delete_olds(conn: &PgConnection) -> Result<usize, Error>;
/// Returns up to 20 activities of type `Announce/Create/Page` from the community
fn read_community_outbox(
activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
+ fn delete_olds(conn: &PgConnection) -> Result<usize, Error> {
+ use lemmy_db_schema::schema::activity::dsl::*;
+ diesel::delete(activity.filter(published.lt(now - 6.months()))).execute(conn)
+ }
+
fn read_community_outbox(
conn: &PgConnection,
community_actor_id: &Url,
posts -> Int8,
comments -> Int8,
published -> Timestamp,
+ users_active_day -> Int8,
+ users_active_week -> Int8,
+ users_active_month -> Int8,
+ users_active_half_year -> Int8,
}
}
posts -> Int8,
comments -> Int8,
communities -> Int8,
+ users_active_day -> Int8,
+ users_active_week -> Int8,
+ users_active_month -> Int8,
+ users_active_half_year -> Int8,
}
}
--- /dev/null
+alter table site_aggregates
+ drop column users_active_day,
+ drop column users_active_week,
+ drop column users_active_month,
+ drop column users_active_half_year;
+
+alter table community_aggregates
+ drop column users_active_day,
+ drop column users_active_week,
+ drop column users_active_month,
+ drop column users_active_half_year;
+
+drop function site_aggregates_activity(i text);
+drop function community_aggregates_activity(i text);
--- /dev/null
+-- Add monthly and half yearly active columns for site and community aggregates
+
+-- These columns don't need to be updated with a trigger, so they're recalculated periodically by a scheduled task
+alter table site_aggregates add column users_active_day bigint not null default 0;
+alter table site_aggregates add column users_active_week bigint not null default 0;
+alter table site_aggregates add column users_active_month bigint not null default 0;
+alter table site_aggregates add column users_active_half_year bigint not null default 0;
+
+alter table community_aggregates add column users_active_day bigint not null default 0;
+alter table community_aggregates add column users_active_week bigint not null default 0;
+alter table community_aggregates add column users_active_month bigint not null default 0;
+alter table community_aggregates add column users_active_half_year bigint not null default 0;
+
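+-- Counts the distinct local users who posted or commented within the given interval (e.g. '1 day')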
+create or replace function site_aggregates_activity(i text)
+returns int
+language plpgsql
+as
+$$
+declare
+ count_ integer;
+begin
+ select count(*)
+ into count_
+ from (
+ select c.creator_id from comment c
+ inner join user_ u on c.creator_id = u.id
+ where c.published > ('now'::timestamp - i::interval)
+ and u.local = true
+ union
+ select p.creator_id from post p
+ inner join user_ u on p.creator_id = u.id
+ where p.published > ('now'::timestamp - i::interval)
+ and u.local = true
+ ) a;
+ return count_;
+end;
+$$;
+
+update site_aggregates
+set users_active_day = (select * from site_aggregates_activity('1 day'));
+
+update site_aggregates
+set users_active_week = (select * from site_aggregates_activity('1 week'));
+
+update site_aggregates
+set users_active_month = (select * from site_aggregates_activity('1 month'));
+
+update site_aggregates
+set users_active_half_year = (select * from site_aggregates_activity('6 months'));
+
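+-- For each community, counts the distinct users who posted or commented within the given interval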
+create or replace function community_aggregates_activity(i text)
+returns table(count_ bigint, community_id_ integer)
+language plpgsql
+as
+$$
+begin
+ return query
+ select count(*), community_id
+ from (
+ select c.creator_id, p.community_id from comment c
+ inner join post p on c.post_id = p.id
+ where c.published > ('now'::timestamp - i::interval)
+ union
+ select p.creator_id, p.community_id from post p
+ where p.published > ('now'::timestamp - i::interval)
+ ) a
+ group by community_id;
+end;
+$$;
+
+update community_aggregates ca
+set users_active_day = mv.count_
+from community_aggregates_activity('1 day') mv
+where ca.community_id = mv.community_id_;
+
+update community_aggregates ca
+set users_active_week = mv.count_
+from community_aggregates_activity('1 week') mv
+where ca.community_id = mv.community_id_;
+
+update community_aggregates ca
+set users_active_month = mv.count_
+from community_aggregates_activity('1 month') mv
+where ca.community_id = mv.community_id_;
+
+update community_aggregates ca
+set users_active_half_year = mv.count_
+from community_aggregates_activity('6 months') mv
+where ca.community_id = mv.community_id_;
extern crate lazy_static;
pub mod code_migrations;
pub mod routes;
+pub mod scheduled_tasks;
use lemmy_api::match_websocket_operation;
use lemmy_apub::activity_queue::create_activity_queue;
use lemmy_db_queries::get_database_url_from_env;
-use lemmy_server::{code_migrations::run_advanced_migrations, routes::*};
+use lemmy_server::{code_migrations::run_advanced_migrations, routes::*, scheduled_tasks};
use lemmy_structs::blocking;
use lemmy_utils::{
rate_limit::{rate_limiter::RateLimiter, RateLimit},
};
use lemmy_websocket::{chat_server::ChatServer, LemmyContext};
use reqwest::Client;
-use std::sync::Arc;
+use std::{sync::Arc, thread};
use tokio::sync::Mutex;
embed_migrations!();
})
.await??;
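+ // Run the scheduled tasks on a dedicated thread, since the scheduler loop blocks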
+ let pool2 = pool.clone();
+ thread::spawn(move || {
+ scheduled_tasks::setup(pool2);
+ });
+
// Set up the rate limiter
let rate_limiter = RateLimit {
rate_limiter: Arc::new(Mutex::new(RateLimiter::default())),
usage: NodeInfoUsage {
users: NodeInfoUsers {
total: site_view.counts.users,
+ active_half_year: site_view.counts.users_active_half_year,
+ active_month: site_view.counts.users_active_month,
},
local_posts: site_view.counts.posts,
local_comments: site_view.counts.comments,
#[derive(Serialize, Deserialize, Debug)]
struct NodeInfoUsers {
pub total: i64,
+ pub active_half_year: i64,
+ pub active_month: i64,
}
--- /dev/null
+// Scheduler, and trait for .seconds(), .minutes(), etc.
+use clokwerk::{Scheduler, TimeUnits};
+// Diesel imports for executing the raw SQL queries below
+use diesel::{sql_query, PgConnection, RunQueryDsl};
+use lemmy_db_queries::{source::activity::Activity_, DbPool};
+use lemmy_db_schema::source::activity::Activity;
+use log::info;
+use std::{thread, time::Duration};
+
+/// Schedules various cleanup tasks for lemmy in a background thread
+pub fn setup(pool: DbPool) {
+ let mut scheduler = Scheduler::new();
+
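+ // Update active user counts and reindex the hot_rank tables on startup, then every hour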
+ let conn = pool.get().unwrap();
+ active_counts(&conn);
+ reindex_aggregates_tables(&conn);
+ scheduler.every(1.hour()).run(move || {
+ active_counts(&conn);
+ reindex_aggregates_tables(&conn);
+ });
+
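+ // Clear out old activities on startup, then once a week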
+ let conn = pool.get().unwrap();
+ clear_old_activities(&conn);
+ scheduler.every(1.weeks()).run(move || {
+ clear_old_activities(&conn);
+ });
+
+ // Manually run the scheduler in an event loop
+ loop {
+ scheduler.run_pending();
+ thread::sleep(Duration::from_millis(1000));
+ }
+}
+
+/// Reindex the aggregates tables that use hot_rank (run hourly by the scheduler).
+/// This is necessary because hot_rank is actually a mutable function:
+/// https://dba.stackexchange.com/questions/284052/how-to-create-an-index-based-on-a-time-based-function-in-postgres?noredirect=1#comment555727_284052
+fn reindex_aggregates_tables(conn: &PgConnection) {
+ for table_name in &[
+ "post_aggregates",
+ "comment_aggregates",
+ "community_aggregates",
+ ] {
+ reindex_table(&conn, &table_name);
+ }
+}
+
+fn reindex_table(conn: &PgConnection, table_name: &str) {
+ info!("Reindexing table {} ...", table_name);
+ let query = format!("reindex table concurrently {}", table_name);
+ sql_query(query).execute(conn).unwrap();
+ info!("Done.");
+}
+
+/// Clear old activities (this table gets very large)
+fn clear_old_activities(conn: &PgConnection) {
+ info!("Clearing old activities...");
+ Activity::delete_olds(&conn).unwrap();
+ info!("Done.");
+}
+
+/// Re-calculate the site and community active user counts (run hourly by the scheduler)
+fn active_counts(conn: &PgConnection) {
+ info!("Updating active site and community aggregates ...");
+
+ let intervals = vec![
+ ("1 day", "day"),
+ ("1 week", "week"),
+ ("1 month", "month"),
+ ("6 months", "half_year"),
+ ];
+
+ for i in &intervals {
+ let update_site_stmt = format!(
+ "update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
+ i.1, i.0
+ );
+ sql_query(update_site_stmt).execute(conn).unwrap();
+
+ let update_community_stmt = format!(
+ "update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_",
+ i.1, i.0
+ );
+ sql_query(update_community_stmt).execute(conn).unwrap();
+ }
+
+ info!("Done.");
+}