]> Untitled Git - lemmy.git/blob - src/scheduled_tasks.rs
Move connection creation into scheduler. (#3120)
[lemmy.git] / src / scheduled_tasks.rs
1 use clokwerk::{Scheduler, TimeUnits as CTimeUnits};
2 use diesel::{
3   dsl::{now, IntervalDsl},
4   Connection,
5   ExpressionMethods,
6   QueryDsl,
7 };
8 // Import week days and WeekDay
9 use diesel::{sql_query, PgConnection, RunQueryDsl};
10 use lemmy_db_schema::{
11   schema::{
12     activity,
13     comment_aggregates,
14     community_aggregates,
15     community_person_ban,
16     instance,
17     person,
18     post_aggregates,
19   },
20   source::instance::{Instance, InstanceForm},
21   utils::{functions::hot_rank, naive_now},
22 };
23 use lemmy_routes::nodeinfo::NodeInfo;
24 use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
25 use reqwest::blocking::Client;
26 use std::{thread, time::Duration};
27 use tracing::{error, info};
28
29 /// Schedules various cleanup tasks for lemmy in a background thread
30 pub fn setup(db_url: String, user_agent: String) -> Result<(), LemmyError> {
31   // Setup the connections
32   let mut scheduler = Scheduler::new();
33
34   startup_jobs(&db_url);
35
36   // Update active counts every hour
37   let url = db_url.clone();
38   scheduler.every(CTimeUnits::hour(1)).run(move || {
39     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
40     active_counts(&mut conn);
41     update_banned_when_expired(&mut conn);
42   });
43
44   // Update hot ranks every 5 minutes
45   let url = db_url.clone();
46   scheduler.every(CTimeUnits::minutes(5)).run(move || {
47     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
48     update_hot_ranks(&mut conn, true);
49   });
50
51   // Clear old activities every week
52   let url = db_url.clone();
53   scheduler.every(CTimeUnits::weeks(1)).run(move || {
54     let mut conn = PgConnection::establish(&url).expect("could not establish connection");
55     clear_old_activities(&mut conn);
56   });
57
58   // Update the Instance Software
59   scheduler.every(CTimeUnits::days(1)).run(move || {
60     let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
61     update_instance_software(&mut conn, &user_agent);
62   });
63
64   // Manually run the scheduler in an event loop
65   loop {
66     scheduler.run_pending();
67     thread::sleep(Duration::from_millis(1000));
68   }
69 }
70
71 /// Run these on server startup
72 fn startup_jobs(db_url: &str) {
73   let mut conn = PgConnection::establish(db_url).expect("could not establish connection");
74   active_counts(&mut conn);
75   update_hot_ranks(&mut conn, false);
76   update_banned_when_expired(&mut conn);
77   clear_old_activities(&mut conn);
78 }
79
80 /// Update the hot_rank columns for the aggregates tables
81 fn update_hot_ranks(conn: &mut PgConnection, last_week_only: bool) {
82   let mut post_update = diesel::update(post_aggregates::table).into_boxed();
83   let mut comment_update = diesel::update(comment_aggregates::table).into_boxed();
84   let mut community_update = diesel::update(community_aggregates::table).into_boxed();
85
86   // Only update for the last week of content
87   if last_week_only {
88     info!("Updating hot ranks for last week...");
89     let last_week = now - diesel::dsl::IntervalDsl::weeks(1);
90
91     post_update = post_update.filter(post_aggregates::published.gt(last_week));
92     comment_update = comment_update.filter(comment_aggregates::published.gt(last_week));
93     community_update = community_update.filter(community_aggregates::published.gt(last_week));
94   } else {
95     info!("Updating hot ranks for all history...");
96   }
97
98   match post_update
99     .set((
100       post_aggregates::hot_rank.eq(hot_rank(post_aggregates::score, post_aggregates::published)),
101       post_aggregates::hot_rank_active.eq(hot_rank(
102         post_aggregates::score,
103         post_aggregates::newest_comment_time_necro,
104       )),
105     ))
106     .execute(conn)
107   {
108     Ok(_) => {}
109     Err(e) => {
110       error!("Failed to update post_aggregates hot_ranks: {}", e)
111     }
112   }
113
114   match comment_update
115     .set(comment_aggregates::hot_rank.eq(hot_rank(
116       comment_aggregates::score,
117       comment_aggregates::published,
118     )))
119     .execute(conn)
120   {
121     Ok(_) => {}
122     Err(e) => {
123       error!("Failed to update comment_aggregates hot_ranks: {}", e)
124     }
125   }
126
127   match community_update
128     .set(community_aggregates::hot_rank.eq(hot_rank(
129       community_aggregates::subscribers,
130       community_aggregates::published,
131     )))
132     .execute(conn)
133   {
134     Ok(_) => {
135       info!("Done.");
136     }
137     Err(e) => {
138       error!("Failed to update community_aggregates hot_ranks: {}", e)
139     }
140   }
141 }
142
143 /// Clear old activities (this table gets very large)
144 fn clear_old_activities(conn: &mut PgConnection) {
145   info!("Clearing old activities...");
146   match diesel::delete(activity::table.filter(activity::published.lt(now - 6.months())))
147     .execute(conn)
148   {
149     Ok(_) => {
150       info!("Done.");
151     }
152     Err(e) => {
153       error!("Failed to clear old activities: {}", e)
154     }
155   }
156 }
157
158 /// Re-calculate the site and community active counts every 12 hours
159 fn active_counts(conn: &mut PgConnection) {
160   info!("Updating active site and community aggregates ...");
161
162   let intervals = vec![
163     ("1 day", "day"),
164     ("1 week", "week"),
165     ("1 month", "month"),
166     ("6 months", "half_year"),
167   ];
168
169   for i in &intervals {
170     let update_site_stmt = format!(
171       "update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
172       i.1, i.0
173     );
174     match sql_query(update_site_stmt).execute(conn) {
175       Ok(_) => {}
176       Err(e) => {
177         error!("Failed to update site stats: {}", e)
178       }
179     }
180
181     let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
182     match sql_query(update_community_stmt).execute(conn) {
183       Ok(_) => {}
184       Err(e) => {
185         error!("Failed to update community stats: {}", e)
186       }
187     }
188   }
189
190   info!("Done.");
191 }
192
193 /// Set banned to false after ban expires
194 fn update_banned_when_expired(conn: &mut PgConnection) {
195   info!("Updating banned column if it expires ...");
196
197   match diesel::update(
198     person::table
199       .filter(person::banned.eq(true))
200       .filter(person::ban_expires.lt(now)),
201   )
202   .set(person::banned.eq(false))
203   .execute(conn)
204   {
205     Ok(_) => {}
206     Err(e) => {
207       error!("Failed to update person.banned when expires: {}", e)
208     }
209   }
210   match diesel::delete(community_person_ban::table.filter(community_person_ban::expires.lt(now)))
211     .execute(conn)
212   {
213     Ok(_) => {}
214     Err(e) => {
215       error!("Failed to remove community_ban expired rows: {}", e)
216     }
217   }
218 }
219
220 /// Updates the instance software and version
221 fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
222   info!("Updating instances software and versions...");
223
224   let client = match Client::builder()
225     .user_agent(user_agent)
226     .timeout(REQWEST_TIMEOUT)
227     .build()
228   {
229     Ok(client) => client,
230     Err(e) => {
231       error!("Failed to build reqwest client: {}", e);
232       return;
233     }
234   };
235
236   let instances = match instance::table.get_results::<Instance>(conn) {
237     Ok(instances) => instances,
238     Err(e) => {
239       error!("Failed to get instances: {}", e);
240       return;
241     }
242   };
243
244   for instance in instances {
245     let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
246
247     // Skip it if it can't connect
248     let res = client
249       .get(&node_info_url)
250       .send()
251       .ok()
252       .and_then(|t| t.json::<NodeInfo>().ok());
253
254     if let Some(node_info) = res {
255       let software = node_info.software.as_ref();
256       let form = InstanceForm::builder()
257         .domain(instance.domain)
258         .software(software.and_then(|s| s.name.clone()))
259         .version(software.and_then(|s| s.version.clone()))
260         .updated(Some(naive_now()))
261         .build();
262
263       match diesel::update(instance::table.find(instance.id))
264         .set(form)
265         .execute(conn)
266       {
267         Ok(_) => {
268           info!("Done.");
269         }
270         Err(e) => {
271           error!("Failed to update site instance software: {}", e);
272           return;
273         }
274       }
275     }
276   }
277 }
278
279 #[cfg(test)]
280 mod tests {
281   use lemmy_routes::nodeinfo::NodeInfo;
282   use reqwest::Client;
283
284   #[tokio::test]
285   #[ignore]
286   async fn test_nodeinfo() {
287     let client = Client::builder().build().unwrap();
288     let lemmy_ml_nodeinfo = client
289       .get("https://lemmy.ml/nodeinfo/2.0.json")
290       .send()
291       .await
292       .unwrap()
293       .json::<NodeInfo>()
294       .await
295       .unwrap();
296
297     assert_eq!(lemmy_ml_nodeinfo.software.unwrap().name.unwrap(), "lemmy");
298   }
299 }