]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Add support for Featured Posts (#2585)
[lemmy.git] / src / api_routes_websocket.rs
1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
7   comment::{
8     CreateComment,
9     CreateCommentLike,
10     CreateCommentReport,
11     DeleteComment,
12     EditComment,
13     GetComment,
14     GetComments,
15     ListCommentReports,
16     RemoveComment,
17     ResolveCommentReport,
18     SaveComment,
19   },
20   community::{
21     AddModToCommunity,
22     BanFromCommunity,
23     BlockCommunity,
24     CreateCommunity,
25     DeleteCommunity,
26     EditCommunity,
27     FollowCommunity,
28     GetCommunity,
29     ListCommunities,
30     RemoveCommunity,
31     TransferCommunity,
32   },
33   context::LemmyContext,
34   person::{
35     AddAdmin,
36     BanPerson,
37     BlockPerson,
38     ChangePassword,
39     DeleteAccount,
40     GetBannedPersons,
41     GetCaptcha,
42     GetPersonDetails,
43     GetPersonMentions,
44     GetReplies,
45     GetReportCount,
46     GetUnreadCount,
47     Login,
48     MarkAllAsRead,
49     MarkCommentReplyAsRead,
50     MarkPersonMentionAsRead,
51     PasswordChangeAfterReset,
52     PasswordReset,
53     Register,
54     SaveUserSettings,
55     VerifyEmail,
56   },
57   post::{
58     CreatePost,
59     CreatePostLike,
60     CreatePostReport,
61     DeletePost,
62     EditPost,
63     FeaturePost,
64     GetPost,
65     GetPosts,
66     GetSiteMetadata,
67     ListPostReports,
68     LockPost,
69     MarkPostAsRead,
70     RemovePost,
71     ResolvePostReport,
72     SavePost,
73   },
74   private_message::{
75     CreatePrivateMessage,
76     CreatePrivateMessageReport,
77     DeletePrivateMessage,
78     EditPrivateMessage,
79     GetPrivateMessages,
80     ListPrivateMessageReports,
81     MarkPrivateMessageAsRead,
82     ResolvePrivateMessageReport,
83   },
84   site::{
85     ApproveRegistrationApplication,
86     CreateSite,
87     EditSite,
88     GetModlog,
89     GetSite,
90     GetUnreadRegistrationApplicationCount,
91     LeaveAdmin,
92     ListRegistrationApplications,
93     PurgeComment,
94     PurgeCommunity,
95     PurgePerson,
96     PurgePost,
97     ResolveObject,
98     Search,
99   },
100   websocket::{
101     serialize_websocket_message,
102     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
103     UserOperation,
104     UserOperationApub,
105     UserOperationCrud,
106   },
107 };
108 use lemmy_api_crud::PerformCrud;
109 use lemmy_apub::{api::PerformApub, SendActivity};
110 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
111 use serde::Deserialize;
112 use serde_json::Value;
113 use std::{
114   result,
115   str::FromStr,
116   sync::{Arc, Mutex},
117   time::{Duration, Instant},
118 };
119 use tracing::{debug, error, info};
120
121 /// Entry point for our route
122 pub async fn websocket(
123   req: HttpRequest,
124   body: web::Payload,
125   context: web::Data<LemmyContext>,
126   rate_limiter: web::Data<RateLimitCell>,
127 ) -> Result<HttpResponse, Error> {
128   let (response, session, stream) = actix_ws::handle(&req, body)?;
129
130   let client_ip = IpAddr(
131     req
132       .connection_info()
133       .realip_remote_addr()
134       .unwrap_or("blank_ip")
135       .to_string(),
136   );
137
138   let check = rate_limiter.message().check(client_ip.clone());
139   if !check {
140     debug!(
141       "Websocket join with IP: {} has been rate limited.",
142       &client_ip
143     );
144     session.close(None).await.map_err(LemmyError::from)?;
145     return Ok(response);
146   }
147
148   let connection_id = context.chat_server().handle_connect(session.clone())?;
149   info!("{} joined", &client_ip);
150
151   let alive = Arc::new(Mutex::new(Instant::now()));
152   heartbeat(session.clone(), alive.clone());
153
154   actix_rt::spawn(handle_messages(
155     stream,
156     client_ip,
157     session,
158     connection_id,
159     alive,
160     rate_limiter,
161     context,
162   ));
163
164   Ok(response)
165 }
166
167 async fn handle_messages(
168   mut stream: MessageStream,
169   client_ip: IpAddr,
170   mut session: Session,
171   connection_id: ConnectionId,
172   alive: Arc<Mutex<Instant>>,
173   rate_limiter: web::Data<RateLimitCell>,
174   context: web::Data<LemmyContext>,
175 ) -> Result<(), LemmyError> {
176   while let Some(Ok(msg)) = stream.next().await {
177     match msg {
178       ws::Message::Ping(bytes) => {
179         if session.pong(&bytes).await.is_err() {
180           break;
181         }
182       }
183       ws::Message::Pong(_) => {
184         let mut lock = alive
185           .lock()
186           .expect("Failed to acquire websocket heartbeat alive lock");
187         *lock = Instant::now();
188       }
189       ws::Message::Text(text) => {
190         let msg = text.trim().to_string();
191         let executed = parse_json_message(
192           msg,
193           client_ip.clone(),
194           connection_id,
195           rate_limiter.get_ref(),
196           context.get_ref().clone(),
197         )
198         .await;
199
200         let res = executed.unwrap_or_else(|e| {
201           error!("Error during message handling {}", e);
202           e.to_json()
203             .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
204         });
205         session.text(res).await?;
206       }
207       ws::Message::Close(_) => {
208         session.close(None).await?;
209         context.chat_server().handle_disconnect(&connection_id)?;
210         break;
211       }
212       ws::Message::Binary(_) => info!("Unexpected binary"),
213       _ => {}
214     }
215   }
216   Ok(())
217 }
218
219 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
220   actix_rt::spawn(async move {
221     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
222     loop {
223       if session.ping(b"").await.is_err() {
224         break;
225       }
226
227       let duration_since = {
228         let alive_lock = alive
229           .lock()
230           .expect("Failed to acquire websocket heartbeat alive lock");
231         Instant::now().duration_since(*alive_lock)
232       };
233       if duration_since > Duration::from_secs(10) {
234         let _ = session.close(None).await;
235         break;
236       }
237       interval.tick().await;
238     }
239   });
240 }
241
242 async fn parse_json_message(
243   msg: String,
244   ip: IpAddr,
245   connection_id: ConnectionId,
246   rate_limiter: &RateLimitCell,
247   context: LemmyContext,
248 ) -> Result<String, LemmyError> {
249   let json: Value = serde_json::from_str(&msg)?;
250   let data = &json["data"].to_string();
251   let op = &json["op"]
252     .as_str()
253     .ok_or_else(|| LemmyError::from_message("missing op"))?;
254
255   // check if api call passes the rate limit, and generate future for later execution
256   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
257     let passed = match user_operation_crud {
258       UserOperationCrud::Register => rate_limiter.register().check(ip),
259       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
260       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
261       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
262       _ => rate_limiter.message().check(ip),
263     };
264     check_rate_limit_passed(passed)?;
265     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
266   } else if let Ok(user_operation) = UserOperation::from_str(op) {
267     let passed = match user_operation {
268       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
269       _ => rate_limiter.message().check(ip),
270     };
271     check_rate_limit_passed(passed)?;
272     match_websocket_operation(context, connection_id, user_operation, data).await
273   } else {
274     let user_operation = UserOperationApub::from_str(op)?;
275     let passed = match user_operation {
276       UserOperationApub::Search => rate_limiter.search().check(ip),
277       _ => rate_limiter.message().check(ip),
278     };
279     check_rate_limit_passed(passed)?;
280     match_websocket_operation_apub(context, connection_id, user_operation, data).await
281   }
282 }
283
284 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
285   if passed {
286     Ok(())
287   } else {
288     // if rate limit was hit, respond with message
289     Err(LemmyError::from_message("rate_limit_error"))
290   }
291 }
292
293 pub async fn match_websocket_operation_crud(
294   context: LemmyContext,
295   id: ConnectionId,
296   op: UserOperationCrud,
297   data: &str,
298 ) -> result::Result<String, LemmyError> {
299   match op {
300     // User ops
301     UserOperationCrud::Register => {
302       do_websocket_operation_crud::<Register>(context, id, op, data).await
303     }
304     UserOperationCrud::DeleteAccount => {
305       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
306     }
307
308     // Private Message ops
309     UserOperationCrud::CreatePrivateMessage => {
310       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
311     }
312     UserOperationCrud::EditPrivateMessage => {
313       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
314     }
315     UserOperationCrud::DeletePrivateMessage => {
316       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
317     }
318     UserOperationCrud::GetPrivateMessages => {
319       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
320     }
321
322     // Site ops
323     UserOperationCrud::CreateSite => {
324       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
325     }
326     UserOperationCrud::EditSite => {
327       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
328     }
329     UserOperationCrud::GetSite => {
330       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
331     }
332
333     // Community ops
334     UserOperationCrud::ListCommunities => {
335       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
336     }
337     UserOperationCrud::CreateCommunity => {
338       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
339     }
340     UserOperationCrud::EditCommunity => {
341       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
342     }
343     UserOperationCrud::DeleteCommunity => {
344       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
345     }
346     UserOperationCrud::RemoveCommunity => {
347       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
348     }
349
350     // Post ops
351     UserOperationCrud::CreatePost => {
352       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
353     }
354     UserOperationCrud::GetPost => {
355       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
356     }
357     UserOperationCrud::EditPost => {
358       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
359     }
360     UserOperationCrud::DeletePost => {
361       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
362     }
363     UserOperationCrud::RemovePost => {
364       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
365     }
366
367     // Comment ops
368     UserOperationCrud::CreateComment => {
369       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
370     }
371     UserOperationCrud::EditComment => {
372       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
373     }
374     UserOperationCrud::DeleteComment => {
375       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
376     }
377     UserOperationCrud::RemoveComment => {
378       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
379     }
380     UserOperationCrud::GetComment => {
381       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
382     }
383   }
384 }
385
386 async fn do_websocket_operation_crud<'a, 'b, Data>(
387   context: LemmyContext,
388   id: ConnectionId,
389   op: UserOperationCrud,
390   data: &str,
391 ) -> result::Result<String, LemmyError>
392 where
393   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
394   for<'de> Data: Deserialize<'de>,
395 {
396   let parsed_data: Data = serde_json::from_str(data)?;
397   let res = parsed_data
398     .perform(&web::Data::new(context.clone()), Some(id))
399     .await?;
400   SendActivity::send_activity(&parsed_data, &res, &context).await?;
401   serialize_websocket_message(&op, &res)
402 }
403
404 pub async fn match_websocket_operation_apub(
405   context: LemmyContext,
406   id: ConnectionId,
407   op: UserOperationApub,
408   data: &str,
409 ) -> result::Result<String, LemmyError> {
410   match op {
411     UserOperationApub::GetPersonDetails => {
412       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
413     }
414     UserOperationApub::GetCommunity => {
415       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
416     }
417     UserOperationApub::GetComments => {
418       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
419     }
420     UserOperationApub::GetPosts => {
421       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
422     }
423     UserOperationApub::ResolveObject => {
424       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
425     }
426     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
427   }
428 }
429
430 async fn do_websocket_operation_apub<'a, 'b, Data>(
431   context: LemmyContext,
432   id: ConnectionId,
433   op: UserOperationApub,
434   data: &str,
435 ) -> result::Result<String, LemmyError>
436 where
437   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
438   for<'de> Data: Deserialize<'de>,
439 {
440   let parsed_data: Data = serde_json::from_str(data)?;
441   let res = parsed_data
442     .perform(&web::Data::new(context.clone()), Some(id))
443     .await?;
444   SendActivity::send_activity(&parsed_data, &res, &context).await?;
445   serialize_websocket_message(&op, &res)
446 }
447
448 pub async fn match_websocket_operation(
449   context: LemmyContext,
450   id: ConnectionId,
451   op: UserOperation,
452   data: &str,
453 ) -> result::Result<String, LemmyError> {
454   match op {
455     // User ops
456     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
457     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
458     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
459     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
460     UserOperation::GetUnreadRegistrationApplicationCount => {
461       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
462     }
463     UserOperation::ListRegistrationApplications => {
464       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
465     }
466     UserOperation::ApproveRegistrationApplication => {
467       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
468     }
469     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
470     UserOperation::GetBannedPersons => {
471       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
472     }
473     UserOperation::BlockPerson => {
474       do_websocket_operation::<BlockPerson>(context, id, op, data).await
475     }
476     UserOperation::GetPersonMentions => {
477       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
478     }
479     UserOperation::MarkPersonMentionAsRead => {
480       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
481     }
482     UserOperation::MarkCommentReplyAsRead => {
483       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
484     }
485     UserOperation::MarkAllAsRead => {
486       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
487     }
488     UserOperation::PasswordReset => {
489       do_websocket_operation::<PasswordReset>(context, id, op, data).await
490     }
491     UserOperation::PasswordChange => {
492       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
493     }
494     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
495     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
496     UserOperation::CommunityJoin => {
497       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
498     }
499     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
500     UserOperation::SaveUserSettings => {
501       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
502     }
503     UserOperation::ChangePassword => {
504       do_websocket_operation::<ChangePassword>(context, id, op, data).await
505     }
506     UserOperation::GetReportCount => {
507       do_websocket_operation::<GetReportCount>(context, id, op, data).await
508     }
509     UserOperation::GetUnreadCount => {
510       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
511     }
512     UserOperation::VerifyEmail => {
513       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
514     }
515
516     // Private Message ops
517     UserOperation::MarkPrivateMessageAsRead => {
518       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
519     }
520     UserOperation::CreatePrivateMessageReport => {
521       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
522     }
523     UserOperation::ResolvePrivateMessageReport => {
524       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
525     }
526     UserOperation::ListPrivateMessageReports => {
527       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
528     }
529
530     // Site ops
531     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
532     UserOperation::PurgePerson => {
533       do_websocket_operation::<PurgePerson>(context, id, op, data).await
534     }
535     UserOperation::PurgeCommunity => {
536       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
537     }
538     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
539     UserOperation::PurgeComment => {
540       do_websocket_operation::<PurgeComment>(context, id, op, data).await
541     }
542     UserOperation::TransferCommunity => {
543       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
544     }
545     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
546
547     // Community ops
548     UserOperation::FollowCommunity => {
549       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
550     }
551     UserOperation::BlockCommunity => {
552       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
553     }
554     UserOperation::BanFromCommunity => {
555       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
556     }
557     UserOperation::AddModToCommunity => {
558       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
559     }
560
561     // Post ops
562     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
563     UserOperation::FeaturePost => {
564       do_websocket_operation::<FeaturePost>(context, id, op, data).await
565     }
566     UserOperation::CreatePostLike => {
567       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
568     }
569     UserOperation::MarkPostAsRead => {
570       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
571     }
572     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
573     UserOperation::CreatePostReport => {
574       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
575     }
576     UserOperation::ListPostReports => {
577       do_websocket_operation::<ListPostReports>(context, id, op, data).await
578     }
579     UserOperation::ResolvePostReport => {
580       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
581     }
582     UserOperation::GetSiteMetadata => {
583       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
584     }
585
586     // Comment ops
587     UserOperation::SaveComment => {
588       do_websocket_operation::<SaveComment>(context, id, op, data).await
589     }
590     UserOperation::CreateCommentLike => {
591       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
592     }
593     UserOperation::CreateCommentReport => {
594       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
595     }
596     UserOperation::ListCommentReports => {
597       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
598     }
599     UserOperation::ResolveCommentReport => {
600       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
601     }
602   }
603 }
604
605 async fn do_websocket_operation<'a, 'b, Data>(
606   context: LemmyContext,
607   id: ConnectionId,
608   op: UserOperation,
609   data: &str,
610 ) -> result::Result<String, LemmyError>
611 where
612   Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
613   for<'de> Data: Deserialize<'de>,
614 {
615   let parsed_data: Data = serde_json::from_str(data)?;
616   let res = parsed_data
617     .perform(&web::Data::new(context.clone()), Some(id))
618     .await?;
619   SendActivity::send_activity(&parsed_data, &res, &context).await?;
620   serialize_websocket_message(&op, &res)
621 }