1 use clokwerk::{Scheduler, TimeUnits as CTimeUnits};
3 dsl::{now, IntervalDsl},
8 // Import week days and WeekDay
9 use diesel::{sql_query, PgConnection, RunQueryDsl};
10 use lemmy_api_common::context::LemmyContext;
11 use lemmy_db_schema::{
21 source::instance::{Instance, InstanceForm},
22 utils::{functions::hot_rank, naive_now},
24 use lemmy_routes::nodeinfo::NodeInfo;
25 use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
26 use reqwest::blocking::Client;
27 use std::{thread, time::Duration};
28 use tracing::{error, info};
30 /// Schedules various cleanup tasks for lemmy in a background thread
///
/// Runs the startup jobs once, registers the recurring jobs below on a
/// `clokwerk` scheduler, and then polls that scheduler about once a second.
///
/// Every database job opens its own `PgConnection` and uses `expect`, so a
/// failed connection panics inside that job's closure.
///
/// NOTE(review): no thread is spawned in the visible code; presumably the
/// caller runs this function on a dedicated thread -- confirm.
34 context_1: LemmyContext,
35 ) -> Result<(), LemmyError> {
36 // Set up the scheduler; each job below opens its own database connection
37 let mut scheduler = Scheduler::new();
// Run the one-off jobs before any recurring job is registered.
39 startup_jobs(&db_url);
41 // Update active counts and clear expired bans every hour
// The closure is `move`, so it gets its own copy of the database URL.
42 let url = db_url.clone();
43 scheduler.every(CTimeUnits::hour(1)).run(move || {
44 let mut conn = PgConnection::establish(&url).expect("could not establish connection");
45 active_counts(&mut conn);
46 update_banned_when_expired(&mut conn);
49 // Update hot ranks every 5 minutes
50 let url = db_url.clone();
51 scheduler.every(CTimeUnits::minutes(5)).run(move || {
52 let mut conn = PgConnection::establish(&url).expect("could not establish connection");
// `true` is the `last_week_only` flag of `update_hot_ranks`.
53 update_hot_ranks(&mut conn, true);
56 // Clear old activities every week
57 let url = db_url.clone();
58 scheduler.every(CTimeUnits::weeks(1)).run(move || {
59 let mut conn = PgConnection::establish(&url).expect("could not establish connection");
60 clear_old_activities(&mut conn);
63 // Remove old rate limit buckets after 1 to 2 hours of inactivity
// The job runs hourly and removes entries older than one hour, hence "1 to 2 hours".
// NOTE(review): the call goes through `settings_updated_channel()`; confirm
// that this really is the rate limit bucket store.
64 scheduler.every(CTimeUnits::hour(1)).run(move || {
65 let hour = Duration::from_secs(3600);
66 context_1.settings_updated_channel().remove_older_than(hour);
69 // Update the Instance Software
// Runs once a day; this last closure takes ownership of `db_url` and `user_agent`.
70 scheduler.every(CTimeUnits::days(1)).run(move || {
71 let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
72 update_instance_software(&mut conn, &user_agent);
75 // Manually run the scheduler in an event loop
// Run whatever jobs are due, then sleep one second before checking again.
77 scheduler.run_pending();
78 thread::sleep(Duration::from_millis(1000));
82 /// Run these on server startup
///
/// Opens a single connection to `db_url` and runs each maintenance job once,
/// in the order listed. Panics (via `expect`) if the connection cannot be
/// established.
83 fn startup_jobs(db_url: &str) {
84 let mut conn = PgConnection::establish(db_url).expect("could not establish connection");
85 active_counts(&mut conn);
// `false` is the `last_week_only` flag: at startup the ranks are not limited to the last week.
86 update_hot_ranks(&mut conn, false);
87 update_banned_when_expired(&mut conn);
88 clear_old_activities(&mut conn);
91 /// Update the hot_rank columns for the aggregates tables
///
/// Builds one update statement each for `post_aggregates`,
/// `comment_aggregates` and `community_aggregates`. The visible filters
/// restrict each statement to rows whose `published` is later than one week
/// ago. A failed statement is logged with `error!`.
///
/// NOTE(review): the branch that selects between "last week" and "all
/// history" is not fully visible here; presumably it is keyed on
/// `last_week_only` -- confirm.
92 fn update_hot_ranks(conn: &mut PgConnection, last_week_only: bool) {
// Boxed so that the filters below can be attached after the statements are created.
93 let mut post_update = diesel::update(post_aggregates::table).into_boxed();
94 let mut comment_update = diesel::update(comment_aggregates::table).into_boxed();
95 let mut community_update = diesel::update(community_aggregates::table).into_boxed();
97 // Only update for the last week of content
99 info!("Updating hot ranks for last week...");
100 let last_week = now - diesel::dsl::IntervalDsl::weeks(1);
102 post_update = post_update.filter(post_aggregates::published.gt(last_week));
103 comment_update = comment_update.filter(comment_aggregates::published.gt(last_week));
104 community_update = community_update.filter(community_aggregates::published.gt(last_week));
106 info!("Updating hot ranks for all history...");
// Posts: `hot_rank` is computed from score and publish time, while
// `hot_rank_active` uses `newest_comment_time_necro` in place of the publish time.
111 post_aggregates::hot_rank.eq(hot_rank(post_aggregates::score, post_aggregates::published)),
112 post_aggregates::hot_rank_active.eq(hot_rank(
113 post_aggregates::score,
114 post_aggregates::newest_comment_time_necro,
121 error!("Failed to update post_aggregates hot_ranks: {}", e)
// Comments: ranked from score and publish time.
126 .set(comment_aggregates::hot_rank.eq(hot_rank(
127 comment_aggregates::score,
128 comment_aggregates::published,
134 error!("Failed to update comment_aggregates hot_ranks: {}", e)
// Communities: ranked from subscriber count (not score) and publish time.
138 match community_update
139 .set(community_aggregates::hot_rank.eq(hot_rank(
140 community_aggregates::subscribers,
141 community_aggregates::published,
149 error!("Failed to update community_aggregates hot_ranks: {}", e)
154 /// Clear old activities (this table gets very large)
///
/// Deletes every `activity` row whose `published` timestamp is more than six
/// months before the database's `now`. A failure is logged with `error!`.
155 fn clear_old_activities(conn: &mut PgConnection) {
156 info!("Clearing old activities...");
// `now - 6.months()` is a diesel SQL expression, so the cutoff is computed by the database.
157 match diesel::delete(activity::table.filter(activity::published.lt(now - 6.months())))
164 error!("Failed to clear old activities: {}", e)
169 /// Re-calculate the site and community active counts
///
/// Called once at startup and from the hourly scheduler job in this file
/// (an earlier version of this comment said "every 12 hours").
///
/// For each interval it runs two raw SQL updates, one for `site_aggregates`
/// and one for `community_aggregates`. A failed statement is logged with
/// `error!`.
170 fn active_counts(conn: &mut PgConnection) {
171 info!("Updating active site and community aggregates ...");
// Each tuple is (SQL interval text passed to the *_activity function,
// suffix of the `users_active_*` column that receives the count).
173 let intervals = vec![
176 ("1 month", "month"),
177 ("6 months", "half_year"),
180 for i in &intervals {
// The SQL is assembled with `format!`; the visible inputs are the
// hard-coded tuples above.
181 let update_site_stmt = format!(
182 "update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
185 match sql_query(update_site_stmt).execute(conn) {
188 error!("Failed to update site stats: {}", e)
// `i.1` fills the column suffix and `i.0` the interval argument.
192 let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
193 match sql_query(update_community_stmt).execute(conn) {
196 error!("Failed to update community stats: {}", e)
204 /// Set banned to false after ban expires
///
/// Runs two statements. The first sets `person.banned` to false on rows that
/// are currently banned and whose `ban_expires` is earlier than `now`. The
/// second deletes `community_person_ban` rows whose `expires` is earlier
/// than `now`. A failed statement is logged with `error!`.
205 fn update_banned_when_expired(conn: &mut PgConnection) {
206 info!("Updating banned column if it expires ...");
// Site-wide bans: only rows that are still flagged as banned are matched.
208 match diesel::update(
210 .filter(person::banned.eq(true))
211 .filter(person::ban_expires.lt(now)),
213 .set(person::banned.eq(false))
218 error!("Failed to update person.banned when expires: {}", e)
// Community bans: expired rows are deleted.
221 match diesel::delete(community_person_ban::table.filter(community_person_ban::expires.lt(now)))
226 error!("Failed to remove community_ban expired rows: {}", e)
231 /// Updates the instance software and version
///
/// Loads every row of the `instance` table, requests
/// `https://<domain>/nodeinfo/2.0.json` for each one with a blocking HTTP
/// client, and writes the reported software name and version back to that
/// row together with a fresh `updated` timestamp.
///
/// `user_agent` is sent as the User-Agent header of the nodeinfo requests.
/// Failures to build the client, load the instances, or update a row are
/// logged with `error!`.
232 fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
233 info!("Updating instances software and versions...");
// One client is built up front, with the project-wide request timeout.
235 let client = match Client::builder()
236 .user_agent(user_agent)
237 .timeout(REQWEST_TIMEOUT)
240 Ok(client) => client,
242 error!("Failed to build reqwest client: {}", e);
247 let instances = match instance::table.get_results::<Instance>(conn) {
248 Ok(instances) => instances,
250 error!("Failed to get instances: {}", e);
255 for instance in instances {
256 let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
258 // Skip it if it can't connect
// A body that does not parse as `NodeInfo` also becomes `None` here.
263 .and_then(|t| t.json::<NodeInfo>().ok());
265 if let Some(node_info) = res {
// `software` is optional; a missing section yields `None` for both name and version.
266 let software = node_info.software.as_ref();
267 let form = InstanceForm::builder()
268 .domain(instance.domain)
269 .software(software.and_then(|s| s.name.clone()))
270 .version(software.and_then(|s| s.version.clone()))
271 .updated(Some(naive_now()))
274 match diesel::update(instance::table.find(instance.id))
282 error!("Failed to update site instance software: {}", e);
292 use lemmy_routes::nodeinfo::NodeInfo;
// Fetches the nodeinfo document from lemmy.ml and checks that the reported
// software name is "lemmy". This test needs network access to lemmy.ml.
297 async fn test_nodeinfo() {
298 let client = Client::builder().build().unwrap();
299 let lemmy_ml_nodeinfo = client
300 .get("https://lemmy.ml/nodeinfo/2.0.json")
// Both `software` and `name` are unwrapped, so a missing field fails the test with a panic.
308 assert_eq!(lemmy_ml_nodeinfo.software.unwrap().name.unwrap(), "lemmy");