]> Untitled Git - lemmy.git/blob - src/scheduled_tasks.rs
891dca365da2daf8011998f081eee720ae5b7e49
[lemmy.git] / src / scheduled_tasks.rs
1 use clokwerk::{Scheduler, TimeUnits as CTimeUnits};
2 use diesel::{
3   dsl::{now, IntervalDsl},
4   Connection,
5   ExpressionMethods,
6   NullableExpressionMethods,
7   QueryDsl,
8 };
9 // Import week days and WeekDay
10 use diesel::{sql_query, PgConnection, RunQueryDsl};
11 use lemmy_api_common::context::LemmyContext;
12 use lemmy_db_schema::{
13   schema::{
14     activity,
15     comment,
16     comment_aggregates,
17     community_aggregates,
18     community_person_ban,
19     instance,
20     person,
21     post,
22     post_aggregates,
23   },
24   source::instance::{Instance, InstanceForm},
25   utils::{functions::hot_rank, naive_now, DELETED_REPLACEMENT_TEXT},
26 };
27 use lemmy_routes::nodeinfo::NodeInfo;
28 use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
29 use reqwest::blocking::Client;
30 use std::{thread, time::Duration};
31 use tracing::{error, info};
32
33 /// Schedules various cleanup tasks for lemmy in a background thread
34 pub fn setup(
35   db_url: String,
36   user_agent: String,
37   context_1: LemmyContext,
38 ) -> Result<(), LemmyError> {
39   // Setup the connections
40   let mut scheduler = Scheduler::new();
41
42   startup_jobs(&db_url);
43
44   // Update active counts every hour
45   let url = db_url.clone();
46   scheduler.every(CTimeUnits::hour(1)).run(move || {
47     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
48     active_counts(&mut conn);
49     update_banned_when_expired(&mut conn);
50   });
51
52   // Update hot ranks every 5 minutes
53   let url = db_url.clone();
54   scheduler.every(CTimeUnits::minutes(5)).run(move || {
55     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
56     update_hot_ranks(&mut conn, true);
57   });
58
59   // Clear old activities every week
60   let url = db_url.clone();
61   scheduler.every(CTimeUnits::weeks(1)).run(move || {
62     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
63     clear_old_activities(&mut conn);
64   });
65
66   // Remove old rate limit buckets after 1 to 2 hours of inactivity
67   scheduler.every(CTimeUnits::hour(1)).run(move || {
68     let hour = Duration::from_secs(3600);
69     context_1.settings_updated_channel().remove_older_than(hour);
70   });
71
72   // Overwrite deleted & removed posts and comments every day
73   let url = db_url.clone();
74   scheduler.every(CTimeUnits::days(1)).run(move || {
75     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
76     overwrite_deleted_posts_and_comments(&mut conn);
77   });
78
79   // Update the Instance Software
80   scheduler.every(CTimeUnits::days(1)).run(move || {
81     let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
82     update_instance_software(&mut conn, &user_agent);
83   });
84
85   // Manually run the scheduler in an event loop
86   loop {
87     scheduler.run_pending();
88     thread::sleep(Duration::from_millis(1000));
89   }
90 }
91
92 /// Run these on server startup
93 fn startup_jobs(db_url: &str) {
94   let mut conn = PgConnection::establish(db_url).expect("could not establish connection");
95   active_counts(&mut conn);
96   update_hot_ranks(&mut conn, false);
97   update_banned_when_expired(&mut conn);
98   clear_old_activities(&mut conn);
99   overwrite_deleted_posts_and_comments(&mut conn);
100 }
101
102 /// Update the hot_rank columns for the aggregates tables
103 fn update_hot_ranks(conn: &mut PgConnection, last_week_only: bool) {
104   let mut post_update = diesel::update(post_aggregates::table).into_boxed();
105   let mut comment_update = diesel::update(comment_aggregates::table).into_boxed();
106   let mut community_update = diesel::update(community_aggregates::table).into_boxed();
107
108   // Only update for the last week of content
109   if last_week_only {
110     info!("Updating hot ranks for last week...");
111     let last_week = now - diesel::dsl::IntervalDsl::weeks(1);
112
113     post_update = post_update.filter(post_aggregates::published.gt(last_week));
114     comment_update = comment_update.filter(comment_aggregates::published.gt(last_week));
115     community_update = community_update.filter(community_aggregates::published.gt(last_week));
116   } else {
117     info!("Updating hot ranks for all history...");
118   }
119
120   match post_update
121     .set((
122       post_aggregates::hot_rank.eq(hot_rank(post_aggregates::score, post_aggregates::published)),
123       post_aggregates::hot_rank_active.eq(hot_rank(
124         post_aggregates::score,
125         post_aggregates::newest_comment_time_necro,
126       )),
127     ))
128     .execute(conn)
129   {
130     Ok(_) => {}
131     Err(e) => {
132       error!("Failed to update post_aggregates hot_ranks: {}", e)
133     }
134   }
135
136   match comment_update
137     .set(comment_aggregates::hot_rank.eq(hot_rank(
138       comment_aggregates::score,
139       comment_aggregates::published,
140     )))
141     .execute(conn)
142   {
143     Ok(_) => {}
144     Err(e) => {
145       error!("Failed to update comment_aggregates hot_ranks: {}", e)
146     }
147   }
148
149   match community_update
150     .set(community_aggregates::hot_rank.eq(hot_rank(
151       community_aggregates::subscribers,
152       community_aggregates::published,
153     )))
154     .execute(conn)
155   {
156     Ok(_) => {
157       info!("Done.");
158     }
159     Err(e) => {
160       error!("Failed to update community_aggregates hot_ranks: {}", e)
161     }
162   }
163 }
164
165 /// Clear old activities (this table gets very large)
166 fn clear_old_activities(conn: &mut PgConnection) {
167   info!("Clearing old activities...");
168   match diesel::delete(activity::table.filter(activity::published.lt(now - 6.months())))
169     .execute(conn)
170   {
171     Ok(_) => {
172       info!("Done.");
173     }
174     Err(e) => {
175       error!("Failed to clear old activities: {}", e)
176     }
177   }
178 }
179
180 /// overwrite posts and comments 30d after deletion
181 fn overwrite_deleted_posts_and_comments(conn: &mut PgConnection) {
182   info!("Overwriting deleted posts...");
183   match diesel::update(
184     post::table
185       .filter(post::deleted.eq(true))
186       .filter(post::updated.lt(now.nullable() - 1.months()))
187       .filter(post::body.ne(DELETED_REPLACEMENT_TEXT)),
188   )
189   .set((
190     post::body.eq(DELETED_REPLACEMENT_TEXT),
191     post::name.eq(DELETED_REPLACEMENT_TEXT),
192   ))
193   .execute(conn)
194   {
195     Ok(_) => {
196       info!("Done.");
197     }
198     Err(e) => {
199       error!("Failed to overwrite deleted posts: {}", e)
200     }
201   }
202
203   info!("Overwriting deleted comments...");
204   match diesel::update(
205     comment::table
206       .filter(comment::deleted.eq(true))
207       .filter(comment::updated.lt(now.nullable() - 1.months()))
208       .filter(comment::content.ne(DELETED_REPLACEMENT_TEXT)),
209   )
210   .set(comment::content.eq(DELETED_REPLACEMENT_TEXT))
211   .execute(conn)
212   {
213     Ok(_) => {
214       info!("Done.");
215     }
216     Err(e) => {
217       error!("Failed to overwrite deleted comments: {}", e)
218     }
219   }
220 }
221
222 /// Re-calculate the site and community active counts every 12 hours
223 fn active_counts(conn: &mut PgConnection) {
224   info!("Updating active site and community aggregates ...");
225
226   let intervals = vec![
227     ("1 day", "day"),
228     ("1 week", "week"),
229     ("1 month", "month"),
230     ("6 months", "half_year"),
231   ];
232
233   for i in &intervals {
234     let update_site_stmt = format!(
235       "update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
236       i.1, i.0
237     );
238     match sql_query(update_site_stmt).execute(conn) {
239       Ok(_) => {}
240       Err(e) => {
241         error!("Failed to update site stats: {}", e)
242       }
243     }
244
245     let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
246     match sql_query(update_community_stmt).execute(conn) {
247       Ok(_) => {}
248       Err(e) => {
249         error!("Failed to update community stats: {}", e)
250       }
251     }
252   }
253
254   info!("Done.");
255 }
256
257 /// Set banned to false after ban expires
258 fn update_banned_when_expired(conn: &mut PgConnection) {
259   info!("Updating banned column if it expires ...");
260
261   match diesel::update(
262     person::table
263       .filter(person::banned.eq(true))
264       .filter(person::ban_expires.lt(now)),
265   )
266   .set(person::banned.eq(false))
267   .execute(conn)
268   {
269     Ok(_) => {}
270     Err(e) => {
271       error!("Failed to update person.banned when expires: {}", e)
272     }
273   }
274   match diesel::delete(community_person_ban::table.filter(community_person_ban::expires.lt(now)))
275     .execute(conn)
276   {
277     Ok(_) => {}
278     Err(e) => {
279       error!("Failed to remove community_ban expired rows: {}", e)
280     }
281   }
282 }
283
284 /// Updates the instance software and version
285 fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
286   info!("Updating instances software and versions...");
287
288   let client = match Client::builder()
289     .user_agent(user_agent)
290     .timeout(REQWEST_TIMEOUT)
291     .build()
292   {
293     Ok(client) => client,
294     Err(e) => {
295       error!("Failed to build reqwest client: {}", e);
296       return;
297     }
298   };
299
300   let instances = match instance::table.get_results::<Instance>(conn) {
301     Ok(instances) => instances,
302     Err(e) => {
303       error!("Failed to get instances: {}", e);
304       return;
305     }
306   };
307
308   for instance in instances {
309     let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
310
311     // Skip it if it can't connect
312     let res = client
313       .get(&node_info_url)
314       .send()
315       .ok()
316       .and_then(|t| t.json::<NodeInfo>().ok());
317
318     if let Some(node_info) = res {
319       let software = node_info.software.as_ref();
320       let form = InstanceForm::builder()
321         .domain(instance.domain)
322         .software(software.and_then(|s| s.name.clone()))
323         .version(software.and_then(|s| s.version.clone()))
324         .updated(Some(naive_now()))
325         .build();
326
327       match diesel::update(instance::table.find(instance.id))
328         .set(form)
329         .execute(conn)
330       {
331         Ok(_) => {
332           info!("Done.");
333         }
334         Err(e) => {
335           error!("Failed to update site instance software: {}", e);
336           return;
337         }
338       }
339     }
340   }
341 }
342
343 #[cfg(test)]
344 mod tests {
345   use lemmy_routes::nodeinfo::NodeInfo;
346   use reqwest::Client;
347
348   #[tokio::test]
349   #[ignore]
350   async fn test_nodeinfo() {
351     let client = Client::builder().build().unwrap();
352     let lemmy_ml_nodeinfo = client
353       .get("https://lemmy.ml/nodeinfo/2.0.json")
354       .send()
355       .await
356       .unwrap()
357       .json::<NodeInfo>()
358       .await
359       .unwrap();
360
361     assert_eq!(lemmy_ml_nodeinfo.software.unwrap().name.unwrap(), "lemmy");
362   }
363 }