]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
fdaba6ba545fd04953cb6c951cc130310866a0f5
[lemmy.git] / src / api_routes_websocket.rs
1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
7   comment::{
8     CreateComment,
9     CreateCommentLike,
10     CreateCommentReport,
11     DeleteComment,
12     DistinguishComment,
13     EditComment,
14     GetComment,
15     GetComments,
16     ListCommentReports,
17     RemoveComment,
18     ResolveCommentReport,
19     SaveComment,
20   },
21   community::{
22     AddModToCommunity,
23     BanFromCommunity,
24     BlockCommunity,
25     CreateCommunity,
26     DeleteCommunity,
27     EditCommunity,
28     FollowCommunity,
29     GetCommunity,
30     ListCommunities,
31     RemoveCommunity,
32     TransferCommunity,
33   },
34   context::LemmyContext,
35   person::{
36     AddAdmin,
37     BanPerson,
38     BlockPerson,
39     ChangePassword,
40     DeleteAccount,
41     GetBannedPersons,
42     GetCaptcha,
43     GetPersonDetails,
44     GetPersonMentions,
45     GetReplies,
46     GetReportCount,
47     GetUnreadCount,
48     Login,
49     MarkAllAsRead,
50     MarkCommentReplyAsRead,
51     MarkPersonMentionAsRead,
52     PasswordChangeAfterReset,
53     PasswordReset,
54     Register,
55     SaveUserSettings,
56     VerifyEmail,
57   },
58   post::{
59     CreatePost,
60     CreatePostLike,
61     CreatePostReport,
62     DeletePost,
63     EditPost,
64     FeaturePost,
65     GetPost,
66     GetPosts,
67     GetSiteMetadata,
68     ListPostReports,
69     LockPost,
70     MarkPostAsRead,
71     RemovePost,
72     ResolvePostReport,
73     SavePost,
74   },
75   private_message::{
76     CreatePrivateMessage,
77     CreatePrivateMessageReport,
78     DeletePrivateMessage,
79     EditPrivateMessage,
80     GetPrivateMessages,
81     ListPrivateMessageReports,
82     MarkPrivateMessageAsRead,
83     ResolvePrivateMessageReport,
84   },
85   site::{
86     ApproveRegistrationApplication,
87     CreateSite,
88     EditSite,
89     GetModlog,
90     GetSite,
91     GetUnreadRegistrationApplicationCount,
92     LeaveAdmin,
93     ListRegistrationApplications,
94     PurgeComment,
95     PurgeCommunity,
96     PurgePerson,
97     PurgePost,
98     ResolveObject,
99     Search,
100   },
101   websocket::{
102     serialize_websocket_message,
103     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
104     UserOperation,
105     UserOperationApub,
106     UserOperationCrud,
107   },
108 };
109 use lemmy_api_crud::PerformCrud;
110 use lemmy_apub::{api::PerformApub, SendActivity};
111 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
112 use serde::Deserialize;
113 use serde_json::Value;
114 use std::{
115   result,
116   str::FromStr,
117   sync::{Arc, Mutex},
118   time::{Duration, Instant},
119 };
120 use tracing::{debug, error, info};
121
122 /// Entry point for our route
123 pub async fn websocket(
124   req: HttpRequest,
125   body: web::Payload,
126   context: web::Data<LemmyContext>,
127   rate_limiter: web::Data<RateLimitCell>,
128 ) -> Result<HttpResponse, Error> {
129   let (response, session, stream) = actix_ws::handle(&req, body)?;
130
131   let client_ip = IpAddr(
132     req
133       .connection_info()
134       .realip_remote_addr()
135       .unwrap_or("blank_ip")
136       .to_string(),
137   );
138
139   let check = rate_limiter.message().check(client_ip.clone());
140   if !check {
141     debug!(
142       "Websocket join with IP: {} has been rate limited.",
143       &client_ip
144     );
145     session.close(None).await.map_err(LemmyError::from)?;
146     return Ok(response);
147   }
148
149   let connection_id = context.chat_server().handle_connect(session.clone())?;
150   info!("{} joined", &client_ip);
151
152   let alive = Arc::new(Mutex::new(Instant::now()));
153   heartbeat(session.clone(), alive.clone());
154
155   actix_rt::spawn(handle_messages(
156     stream,
157     client_ip,
158     session,
159     connection_id,
160     alive,
161     rate_limiter,
162     context,
163   ));
164
165   Ok(response)
166 }
167
168 async fn handle_messages(
169   mut stream: MessageStream,
170   client_ip: IpAddr,
171   mut session: Session,
172   connection_id: ConnectionId,
173   alive: Arc<Mutex<Instant>>,
174   rate_limiter: web::Data<RateLimitCell>,
175   context: web::Data<LemmyContext>,
176 ) -> Result<(), LemmyError> {
177   while let Some(Ok(msg)) = stream.next().await {
178     match msg {
179       ws::Message::Ping(bytes) => {
180         if session.pong(&bytes).await.is_err() {
181           break;
182         }
183       }
184       ws::Message::Pong(_) => {
185         let mut lock = alive
186           .lock()
187           .expect("Failed to acquire websocket heartbeat alive lock");
188         *lock = Instant::now();
189       }
190       ws::Message::Text(text) => {
191         let msg = text.trim().to_string();
192         let executed = parse_json_message(
193           msg,
194           client_ip.clone(),
195           connection_id,
196           rate_limiter.get_ref(),
197           context.get_ref().clone(),
198         )
199         .await;
200
201         let res = executed.unwrap_or_else(|e| {
202           error!("Error during message handling {}", e);
203           e.to_json()
204             .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
205         });
206         session.text(res).await?;
207       }
208       ws::Message::Close(_) => {
209         session.close(None).await?;
210         context.chat_server().handle_disconnect(&connection_id)?;
211         break;
212       }
213       ws::Message::Binary(_) => info!("Unexpected binary"),
214       _ => {}
215     }
216   }
217   Ok(())
218 }
219
220 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
221   actix_rt::spawn(async move {
222     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
223     loop {
224       if session.ping(b"").await.is_err() {
225         break;
226       }
227
228       let duration_since = {
229         let alive_lock = alive
230           .lock()
231           .expect("Failed to acquire websocket heartbeat alive lock");
232         Instant::now().duration_since(*alive_lock)
233       };
234       if duration_since > Duration::from_secs(10) {
235         let _ = session.close(None).await;
236         break;
237       }
238       interval.tick().await;
239     }
240   });
241 }
242
243 async fn parse_json_message(
244   msg: String,
245   ip: IpAddr,
246   connection_id: ConnectionId,
247   rate_limiter: &RateLimitCell,
248   context: LemmyContext,
249 ) -> Result<String, LemmyError> {
250   let json: Value = serde_json::from_str(&msg)?;
251   let data = json
252     .get("data")
253     .cloned()
254     .ok_or_else(|| LemmyError::from_message("missing data"))?;
255
256   let missing_op_err = || LemmyError::from_message("missing op");
257
258   let op = json
259     .get("op")
260     .ok_or_else(missing_op_err)?
261     .as_str()
262     .ok_or_else(missing_op_err)?;
263
264   // check if api call passes the rate limit, and generate future for later execution
265   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
266     let passed = match user_operation_crud {
267       UserOperationCrud::Register => rate_limiter.register().check(ip),
268       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
269       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
270       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
271       _ => rate_limiter.message().check(ip),
272     };
273     check_rate_limit_passed(passed)?;
274     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
275   } else if let Ok(user_operation) = UserOperation::from_str(op) {
276     let passed = match user_operation {
277       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
278       _ => rate_limiter.message().check(ip),
279     };
280     check_rate_limit_passed(passed)?;
281     match_websocket_operation(context, connection_id, user_operation, data).await
282   } else {
283     let user_operation = UserOperationApub::from_str(op)?;
284     let passed = match user_operation {
285       UserOperationApub::Search => rate_limiter.search().check(ip),
286       _ => rate_limiter.message().check(ip),
287     };
288     check_rate_limit_passed(passed)?;
289     match_websocket_operation_apub(context, connection_id, user_operation, data).await
290   }
291 }
292
293 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
294   if passed {
295     Ok(())
296   } else {
297     // if rate limit was hit, respond with message
298     Err(LemmyError::from_message("rate_limit_error"))
299   }
300 }
301
302 pub async fn match_websocket_operation_crud(
303   context: LemmyContext,
304   id: ConnectionId,
305   op: UserOperationCrud,
306   data: Value,
307 ) -> result::Result<String, LemmyError> {
308   match op {
309     // User ops
310     UserOperationCrud::Register => {
311       do_websocket_operation_crud::<Register>(context, id, op, data).await
312     }
313     UserOperationCrud::DeleteAccount => {
314       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
315     }
316
317     // Private Message ops
318     UserOperationCrud::CreatePrivateMessage => {
319       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
320     }
321     UserOperationCrud::EditPrivateMessage => {
322       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
323     }
324     UserOperationCrud::DeletePrivateMessage => {
325       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
326     }
327     UserOperationCrud::GetPrivateMessages => {
328       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
329     }
330
331     // Site ops
332     UserOperationCrud::CreateSite => {
333       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
334     }
335     UserOperationCrud::EditSite => {
336       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
337     }
338     UserOperationCrud::GetSite => {
339       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
340     }
341
342     // Community ops
343     UserOperationCrud::ListCommunities => {
344       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
345     }
346     UserOperationCrud::CreateCommunity => {
347       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
348     }
349     UserOperationCrud::EditCommunity => {
350       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
351     }
352     UserOperationCrud::DeleteCommunity => {
353       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
354     }
355     UserOperationCrud::RemoveCommunity => {
356       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
357     }
358
359     // Post ops
360     UserOperationCrud::CreatePost => {
361       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
362     }
363     UserOperationCrud::GetPost => {
364       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
365     }
366     UserOperationCrud::EditPost => {
367       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
368     }
369     UserOperationCrud::DeletePost => {
370       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
371     }
372     UserOperationCrud::RemovePost => {
373       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
374     }
375
376     // Comment ops
377     UserOperationCrud::CreateComment => {
378       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
379     }
380     UserOperationCrud::EditComment => {
381       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
382     }
383     UserOperationCrud::DeleteComment => {
384       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
385     }
386     UserOperationCrud::RemoveComment => {
387       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
388     }
389     UserOperationCrud::GetComment => {
390       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
391     }
392   }
393 }
394
395 async fn do_websocket_operation_crud<'a, 'b, Data>(
396   context: LemmyContext,
397   id: ConnectionId,
398   op: UserOperationCrud,
399   data: Value,
400 ) -> result::Result<String, LemmyError>
401 where
402   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
403   for<'de> Data: Deserialize<'de>,
404 {
405   let parsed_data: Data = serde_json::from_value(data)?;
406   let res = parsed_data
407     .perform(&web::Data::new(context.clone()), Some(id))
408     .await?;
409   SendActivity::send_activity(&parsed_data, &res, &context).await?;
410   serialize_websocket_message(&op, &res)
411 }
412
413 pub async fn match_websocket_operation_apub(
414   context: LemmyContext,
415   id: ConnectionId,
416   op: UserOperationApub,
417   data: Value,
418 ) -> result::Result<String, LemmyError> {
419   match op {
420     UserOperationApub::GetPersonDetails => {
421       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
422     }
423     UserOperationApub::GetCommunity => {
424       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
425     }
426     UserOperationApub::GetComments => {
427       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
428     }
429     UserOperationApub::GetPosts => {
430       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
431     }
432     UserOperationApub::ResolveObject => {
433       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
434     }
435     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
436   }
437 }
438
439 async fn do_websocket_operation_apub<'a, 'b, Data>(
440   context: LemmyContext,
441   id: ConnectionId,
442   op: UserOperationApub,
443   data: Value,
444 ) -> result::Result<String, LemmyError>
445 where
446   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
447   for<'de> Data: Deserialize<'de>,
448 {
449   let parsed_data: Data = serde_json::from_value(data)?;
450   let res = parsed_data
451     .perform(&web::Data::new(context.clone()), Some(id))
452     .await?;
453   SendActivity::send_activity(&parsed_data, &res, &context).await?;
454   serialize_websocket_message(&op, &res)
455 }
456
457 pub async fn match_websocket_operation(
458   context: LemmyContext,
459   id: ConnectionId,
460   op: UserOperation,
461   data: Value,
462 ) -> result::Result<String, LemmyError> {
463   match op {
464     // User ops
465     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
466     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
467     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
468     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
469     UserOperation::GetUnreadRegistrationApplicationCount => {
470       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
471     }
472     UserOperation::ListRegistrationApplications => {
473       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
474     }
475     UserOperation::ApproveRegistrationApplication => {
476       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
477     }
478     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
479     UserOperation::GetBannedPersons => {
480       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
481     }
482     UserOperation::BlockPerson => {
483       do_websocket_operation::<BlockPerson>(context, id, op, data).await
484     }
485     UserOperation::GetPersonMentions => {
486       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
487     }
488     UserOperation::MarkPersonMentionAsRead => {
489       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
490     }
491     UserOperation::MarkCommentReplyAsRead => {
492       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
493     }
494     UserOperation::MarkAllAsRead => {
495       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
496     }
497     UserOperation::PasswordReset => {
498       do_websocket_operation::<PasswordReset>(context, id, op, data).await
499     }
500     UserOperation::PasswordChange => {
501       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
502     }
503     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
504     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
505     UserOperation::CommunityJoin => {
506       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
507     }
508     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
509     UserOperation::SaveUserSettings => {
510       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
511     }
512     UserOperation::ChangePassword => {
513       do_websocket_operation::<ChangePassword>(context, id, op, data).await
514     }
515     UserOperation::GetReportCount => {
516       do_websocket_operation::<GetReportCount>(context, id, op, data).await
517     }
518     UserOperation::GetUnreadCount => {
519       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
520     }
521     UserOperation::VerifyEmail => {
522       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
523     }
524
525     // Private Message ops
526     UserOperation::MarkPrivateMessageAsRead => {
527       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
528     }
529     UserOperation::CreatePrivateMessageReport => {
530       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
531     }
532     UserOperation::ResolvePrivateMessageReport => {
533       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
534     }
535     UserOperation::ListPrivateMessageReports => {
536       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
537     }
538
539     // Site ops
540     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
541     UserOperation::PurgePerson => {
542       do_websocket_operation::<PurgePerson>(context, id, op, data).await
543     }
544     UserOperation::PurgeCommunity => {
545       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
546     }
547     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
548     UserOperation::PurgeComment => {
549       do_websocket_operation::<PurgeComment>(context, id, op, data).await
550     }
551     UserOperation::TransferCommunity => {
552       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
553     }
554     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
555
556     // Community ops
557     UserOperation::FollowCommunity => {
558       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
559     }
560     UserOperation::BlockCommunity => {
561       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
562     }
563     UserOperation::BanFromCommunity => {
564       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
565     }
566     UserOperation::AddModToCommunity => {
567       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
568     }
569
570     // Post ops
571     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
572     UserOperation::FeaturePost => {
573       do_websocket_operation::<FeaturePost>(context, id, op, data).await
574     }
575     UserOperation::CreatePostLike => {
576       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
577     }
578     UserOperation::MarkPostAsRead => {
579       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
580     }
581     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
582     UserOperation::CreatePostReport => {
583       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
584     }
585     UserOperation::ListPostReports => {
586       do_websocket_operation::<ListPostReports>(context, id, op, data).await
587     }
588     UserOperation::ResolvePostReport => {
589       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
590     }
591     UserOperation::GetSiteMetadata => {
592       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
593     }
594
595     // Comment ops
596     UserOperation::SaveComment => {
597       do_websocket_operation::<SaveComment>(context, id, op, data).await
598     }
599     UserOperation::CreateCommentLike => {
600       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
601     }
602     UserOperation::DistinguishComment => {
603       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
604     }
605     UserOperation::CreateCommentReport => {
606       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
607     }
608     UserOperation::ListCommentReports => {
609       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
610     }
611     UserOperation::ResolveCommentReport => {
612       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
613     }
614   }
615 }
616
617 async fn do_websocket_operation<'a, 'b, Data>(
618   context: LemmyContext,
619   id: ConnectionId,
620   op: UserOperation,
621   data: Value,
622 ) -> result::Result<String, LemmyError>
623 where
624   Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
625   for<'de> Data: Deserialize<'de>,
626 {
627   let parsed_data: Data = serde_json::from_value(data)?;
628   let res = parsed_data
629     .perform(&web::Data::new(context.clone()), Some(id))
630     .await?;
631   SendActivity::send_activity(&parsed_data, &res, &context).await?;
632   serialize_websocket_message(&op, &res)
633 }