]> Untitled Git - lemmy.git/blob - src/scheduled_tasks.rs
5d98baf9b4479f3c17b2afd5671179d019c33580
[lemmy.git] / src / scheduled_tasks.rs
1 use clokwerk::{Scheduler, TimeUnits as CTimeUnits};
2 use diesel::{
3   dsl::{now, IntervalDsl},
4   Connection,
5   ExpressionMethods,
6   QueryDsl,
7 };
8 // Import week days and WeekDay
9 use diesel::{sql_query, PgConnection, RunQueryDsl};
10 use lemmy_db_schema::{
11   schema::{activity, community_person_ban, instance, person},
12   source::instance::{Instance, InstanceForm},
13   utils::naive_now,
14 };
15 use lemmy_routes::nodeinfo::NodeInfo;
16 use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
17 use reqwest::blocking::Client;
18 use std::{thread, time::Duration};
19 use tracing::info;
20
21 /// Schedules various cleanup tasks for lemmy in a background thread
22 pub fn setup(db_url: String, user_agent: String) -> Result<(), LemmyError> {
23   // Setup the connections
24   let mut scheduler = Scheduler::new();
25
26   let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
27
28   let mut conn_2 = PgConnection::establish(&db_url).expect("could not establish connection");
29
30   active_counts(&mut conn);
31   update_banned_when_expired(&mut conn);
32
33   // On startup, reindex the tables non-concurrently
34   // TODO remove this for now, since it slows down startup a lot on lemmy.ml
35   reindex_aggregates_tables(&mut conn, true);
36   scheduler.every(CTimeUnits::hour(1)).run(move || {
37     let conn = &mut PgConnection::establish(&db_url)
38       .unwrap_or_else(|_| panic!("Error connecting to {db_url}"));
39     active_counts(conn);
40     update_banned_when_expired(conn);
41     reindex_aggregates_tables(conn, true);
42     drop_ccnew_indexes(conn);
43   });
44
45   clear_old_activities(&mut conn);
46   scheduler.every(CTimeUnits::weeks(1)).run(move || {
47     clear_old_activities(&mut conn);
48   });
49
50   update_instance_software(&mut conn_2, &user_agent);
51   scheduler.every(CTimeUnits::days(1)).run(move || {
52     update_instance_software(&mut conn_2, &user_agent);
53   });
54
55   // Manually run the scheduler in an event loop
56   loop {
57     scheduler.run_pending();
58     thread::sleep(Duration::from_millis(1000));
59   }
60 }
61
62 /// Reindex the aggregates tables every one hour
63 /// This is necessary because hot_rank is actually a mutable function:
64 /// https://dba.stackexchange.com/questions/284052/how-to-create-an-index-based-on-a-time-based-function-in-postgres?noredirect=1#comment555727_284052
65 fn reindex_aggregates_tables(conn: &mut PgConnection, concurrently: bool) {
66   for table_name in &[
67     "post_aggregates",
68     "comment_aggregates",
69     "community_aggregates",
70   ] {
71     reindex_table(conn, table_name, concurrently);
72   }
73 }
74
75 fn reindex_table(conn: &mut PgConnection, table_name: &str, concurrently: bool) {
76   let concurrently_str = if concurrently { "concurrently" } else { "" };
77   info!("Reindexing table {} {} ...", concurrently_str, table_name);
78   let query = format!("reindex table {concurrently_str} {table_name}");
79   sql_query(query).execute(conn).expect("reindex table");
80   info!("Done.");
81 }
82
83 /// Clear old activities (this table gets very large)
84 fn clear_old_activities(conn: &mut PgConnection) {
85   info!("Clearing old activities...");
86   diesel::delete(activity::table.filter(activity::published.lt(now - 6.months())))
87     .execute(conn)
88     .expect("clear old activities");
89   info!("Done.");
90 }
91
92 /// Re-calculate the site and community active counts every 12 hours
93 fn active_counts(conn: &mut PgConnection) {
94   info!("Updating active site and community aggregates ...");
95
96   let intervals = vec![
97     ("1 day", "day"),
98     ("1 week", "week"),
99     ("1 month", "month"),
100     ("6 months", "half_year"),
101   ];
102
103   for i in &intervals {
104     let update_site_stmt = format!(
105       "update site_aggregates set users_active_{} = (select * from site_aggregates_activity('{}'))",
106       i.1, i.0
107     );
108     sql_query(update_site_stmt)
109       .execute(conn)
110       .expect("update site stats");
111
112     let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
113     sql_query(update_community_stmt)
114       .execute(conn)
115       .expect("update community stats");
116   }
117
118   info!("Done.");
119 }
120
121 /// Set banned to false after ban expires
122 fn update_banned_when_expired(conn: &mut PgConnection) {
123   info!("Updating banned column if it expires ...");
124
125   diesel::update(
126     person::table
127       .filter(person::banned.eq(true))
128       .filter(person::ban_expires.lt(now)),
129   )
130   .set(person::banned.eq(false))
131   .execute(conn)
132   .expect("update person.banned when expires");
133
134   diesel::delete(community_person_ban::table.filter(community_person_ban::expires.lt(now)))
135     .execute(conn)
136     .expect("remove community_ban expired rows");
137 }
138
139 /// Drops the phantom CCNEW indexes created by postgres
140 /// https://github.com/LemmyNet/lemmy/issues/2431
141 fn drop_ccnew_indexes(conn: &mut PgConnection) {
142   info!("Dropping phantom ccnew indexes...");
143   let drop_stmt = "select drop_ccnew_indexes()";
144   sql_query(drop_stmt)
145     .execute(conn)
146     .expect("drop ccnew indexes");
147 }
148
149 /// Updates the instance software and version
150 fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
151   info!("Updating instances software and versions...");
152
153   let client = Client::builder()
154     .user_agent(user_agent)
155     .timeout(REQWEST_TIMEOUT)
156     .build()
157     .expect("couldnt build reqwest client");
158
159   let instances = instance::table
160     .get_results::<Instance>(conn)
161     .expect("no instances found");
162
163   for instance in instances {
164     let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);
165
166     // Skip it if it can't connect
167     let res = client
168       .get(&node_info_url)
169       .send()
170       .ok()
171       .and_then(|t| t.json::<NodeInfo>().ok());
172
173     if let Some(node_info) = res {
174       let software = node_info.software.as_ref();
175       let form = InstanceForm::builder()
176         .domain(instance.domain)
177         .software(software.and_then(|s| s.name.clone()))
178         .version(software.and_then(|s| s.version.clone()))
179         .updated(Some(naive_now()))
180         .build();
181
182       diesel::update(instance::table.find(instance.id))
183         .set(form)
184         .execute(conn)
185         .expect("update site instance software");
186     }
187   }
188   info!("Done.");
189 }
190
191 #[cfg(test)]
192 mod tests {
193   use lemmy_routes::nodeinfo::NodeInfo;
194   use reqwest::Client;
195
196   #[tokio::test]
197   #[ignore]
198   async fn test_nodeinfo() {
199     let client = Client::builder().build().unwrap();
200     let lemmy_ml_nodeinfo = client
201       .get("https://lemmy.ml/nodeinfo/2.0.json")
202       .send()
203       .await
204       .unwrap()
205       .json::<NodeInfo>()
206       .await
207       .unwrap();
208
209     assert_eq!(lemmy_ml_nodeinfo.software.unwrap().name.unwrap(), "lemmy");
210   }
211 }