]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Separate comment distinguish (#2740)
[lemmy.git] / src / api_routes_websocket.rs
1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
7   comment::{
8     CreateComment,
9     CreateCommentLike,
10     CreateCommentReport,
11     DeleteComment,
12     DistinguishComment,
13     EditComment,
14     GetComment,
15     GetComments,
16     ListCommentReports,
17     RemoveComment,
18     ResolveCommentReport,
19     SaveComment,
20   },
21   community::{
22     AddModToCommunity,
23     BanFromCommunity,
24     BlockCommunity,
25     CreateCommunity,
26     DeleteCommunity,
27     EditCommunity,
28     FollowCommunity,
29     GetCommunity,
30     ListCommunities,
31     RemoveCommunity,
32     TransferCommunity,
33   },
34   context::LemmyContext,
35   person::{
36     AddAdmin,
37     BanPerson,
38     BlockPerson,
39     ChangePassword,
40     DeleteAccount,
41     GetBannedPersons,
42     GetCaptcha,
43     GetPersonDetails,
44     GetPersonMentions,
45     GetReplies,
46     GetReportCount,
47     GetUnreadCount,
48     Login,
49     MarkAllAsRead,
50     MarkCommentReplyAsRead,
51     MarkPersonMentionAsRead,
52     PasswordChangeAfterReset,
53     PasswordReset,
54     Register,
55     SaveUserSettings,
56     VerifyEmail,
57   },
58   post::{
59     CreatePost,
60     CreatePostLike,
61     CreatePostReport,
62     DeletePost,
63     EditPost,
64     FeaturePost,
65     GetPost,
66     GetPosts,
67     GetSiteMetadata,
68     ListPostReports,
69     LockPost,
70     MarkPostAsRead,
71     RemovePost,
72     ResolvePostReport,
73     SavePost,
74   },
75   private_message::{
76     CreatePrivateMessage,
77     CreatePrivateMessageReport,
78     DeletePrivateMessage,
79     EditPrivateMessage,
80     GetPrivateMessages,
81     ListPrivateMessageReports,
82     MarkPrivateMessageAsRead,
83     ResolvePrivateMessageReport,
84   },
85   site::{
86     ApproveRegistrationApplication,
87     CreateSite,
88     EditSite,
89     GetModlog,
90     GetSite,
91     GetUnreadRegistrationApplicationCount,
92     LeaveAdmin,
93     ListRegistrationApplications,
94     PurgeComment,
95     PurgeCommunity,
96     PurgePerson,
97     PurgePost,
98     ResolveObject,
99     Search,
100   },
101   websocket::{
102     serialize_websocket_message,
103     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
104     UserOperation,
105     UserOperationApub,
106     UserOperationCrud,
107   },
108 };
109 use lemmy_api_crud::PerformCrud;
110 use lemmy_apub::{api::PerformApub, SendActivity};
111 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
112 use serde::Deserialize;
113 use serde_json::Value;
114 use std::{
115   result,
116   str::FromStr,
117   sync::{Arc, Mutex},
118   time::{Duration, Instant},
119 };
120 use tracing::{debug, error, info};
121
122 /// Entry point for our route
123 pub async fn websocket(
124   req: HttpRequest,
125   body: web::Payload,
126   context: web::Data<LemmyContext>,
127   rate_limiter: web::Data<RateLimitCell>,
128 ) -> Result<HttpResponse, Error> {
129   let (response, session, stream) = actix_ws::handle(&req, body)?;
130
131   let client_ip = IpAddr(
132     req
133       .connection_info()
134       .realip_remote_addr()
135       .unwrap_or("blank_ip")
136       .to_string(),
137   );
138
139   let check = rate_limiter.message().check(client_ip.clone());
140   if !check {
141     debug!(
142       "Websocket join with IP: {} has been rate limited.",
143       &client_ip
144     );
145     session.close(None).await.map_err(LemmyError::from)?;
146     return Ok(response);
147   }
148
149   let connection_id = context.chat_server().handle_connect(session.clone())?;
150   info!("{} joined", &client_ip);
151
152   let alive = Arc::new(Mutex::new(Instant::now()));
153   heartbeat(session.clone(), alive.clone());
154
155   actix_rt::spawn(handle_messages(
156     stream,
157     client_ip,
158     session,
159     connection_id,
160     alive,
161     rate_limiter,
162     context,
163   ));
164
165   Ok(response)
166 }
167
168 async fn handle_messages(
169   mut stream: MessageStream,
170   client_ip: IpAddr,
171   mut session: Session,
172   connection_id: ConnectionId,
173   alive: Arc<Mutex<Instant>>,
174   rate_limiter: web::Data<RateLimitCell>,
175   context: web::Data<LemmyContext>,
176 ) -> Result<(), LemmyError> {
177   while let Some(Ok(msg)) = stream.next().await {
178     match msg {
179       ws::Message::Ping(bytes) => {
180         if session.pong(&bytes).await.is_err() {
181           break;
182         }
183       }
184       ws::Message::Pong(_) => {
185         let mut lock = alive
186           .lock()
187           .expect("Failed to acquire websocket heartbeat alive lock");
188         *lock = Instant::now();
189       }
190       ws::Message::Text(text) => {
191         let msg = text.trim().to_string();
192         let executed = parse_json_message(
193           msg,
194           client_ip.clone(),
195           connection_id,
196           rate_limiter.get_ref(),
197           context.get_ref().clone(),
198         )
199         .await;
200
201         let res = executed.unwrap_or_else(|e| {
202           error!("Error during message handling {}", e);
203           e.to_json()
204             .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
205         });
206         session.text(res).await?;
207       }
208       ws::Message::Close(_) => {
209         session.close(None).await?;
210         context.chat_server().handle_disconnect(&connection_id)?;
211         break;
212       }
213       ws::Message::Binary(_) => info!("Unexpected binary"),
214       _ => {}
215     }
216   }
217   Ok(())
218 }
219
220 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
221   actix_rt::spawn(async move {
222     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
223     loop {
224       if session.ping(b"").await.is_err() {
225         break;
226       }
227
228       let duration_since = {
229         let alive_lock = alive
230           .lock()
231           .expect("Failed to acquire websocket heartbeat alive lock");
232         Instant::now().duration_since(*alive_lock)
233       };
234       if duration_since > Duration::from_secs(10) {
235         let _ = session.close(None).await;
236         break;
237       }
238       interval.tick().await;
239     }
240   });
241 }
242
243 async fn parse_json_message(
244   msg: String,
245   ip: IpAddr,
246   connection_id: ConnectionId,
247   rate_limiter: &RateLimitCell,
248   context: LemmyContext,
249 ) -> Result<String, LemmyError> {
250   let json: Value = serde_json::from_str(&msg)?;
251   let data = &json["data"].to_string();
252   let op = &json["op"]
253     .as_str()
254     .ok_or_else(|| LemmyError::from_message("missing op"))?;
255
256   // check if api call passes the rate limit, and generate future for later execution
257   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
258     let passed = match user_operation_crud {
259       UserOperationCrud::Register => rate_limiter.register().check(ip),
260       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
261       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
262       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
263       _ => rate_limiter.message().check(ip),
264     };
265     check_rate_limit_passed(passed)?;
266     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
267   } else if let Ok(user_operation) = UserOperation::from_str(op) {
268     let passed = match user_operation {
269       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
270       _ => rate_limiter.message().check(ip),
271     };
272     check_rate_limit_passed(passed)?;
273     match_websocket_operation(context, connection_id, user_operation, data).await
274   } else {
275     let user_operation = UserOperationApub::from_str(op)?;
276     let passed = match user_operation {
277       UserOperationApub::Search => rate_limiter.search().check(ip),
278       _ => rate_limiter.message().check(ip),
279     };
280     check_rate_limit_passed(passed)?;
281     match_websocket_operation_apub(context, connection_id, user_operation, data).await
282   }
283 }
284
285 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
286   if passed {
287     Ok(())
288   } else {
289     // if rate limit was hit, respond with message
290     Err(LemmyError::from_message("rate_limit_error"))
291   }
292 }
293
294 pub async fn match_websocket_operation_crud(
295   context: LemmyContext,
296   id: ConnectionId,
297   op: UserOperationCrud,
298   data: &str,
299 ) -> result::Result<String, LemmyError> {
300   match op {
301     // User ops
302     UserOperationCrud::Register => {
303       do_websocket_operation_crud::<Register>(context, id, op, data).await
304     }
305     UserOperationCrud::DeleteAccount => {
306       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
307     }
308
309     // Private Message ops
310     UserOperationCrud::CreatePrivateMessage => {
311       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
312     }
313     UserOperationCrud::EditPrivateMessage => {
314       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
315     }
316     UserOperationCrud::DeletePrivateMessage => {
317       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
318     }
319     UserOperationCrud::GetPrivateMessages => {
320       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
321     }
322
323     // Site ops
324     UserOperationCrud::CreateSite => {
325       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
326     }
327     UserOperationCrud::EditSite => {
328       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
329     }
330     UserOperationCrud::GetSite => {
331       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
332     }
333
334     // Community ops
335     UserOperationCrud::ListCommunities => {
336       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
337     }
338     UserOperationCrud::CreateCommunity => {
339       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
340     }
341     UserOperationCrud::EditCommunity => {
342       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
343     }
344     UserOperationCrud::DeleteCommunity => {
345       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
346     }
347     UserOperationCrud::RemoveCommunity => {
348       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
349     }
350
351     // Post ops
352     UserOperationCrud::CreatePost => {
353       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
354     }
355     UserOperationCrud::GetPost => {
356       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
357     }
358     UserOperationCrud::EditPost => {
359       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
360     }
361     UserOperationCrud::DeletePost => {
362       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
363     }
364     UserOperationCrud::RemovePost => {
365       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
366     }
367
368     // Comment ops
369     UserOperationCrud::CreateComment => {
370       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
371     }
372     UserOperationCrud::EditComment => {
373       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
374     }
375     UserOperationCrud::DeleteComment => {
376       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
377     }
378     UserOperationCrud::RemoveComment => {
379       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
380     }
381     UserOperationCrud::GetComment => {
382       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
383     }
384   }
385 }
386
387 async fn do_websocket_operation_crud<'a, 'b, Data>(
388   context: LemmyContext,
389   id: ConnectionId,
390   op: UserOperationCrud,
391   data: &str,
392 ) -> result::Result<String, LemmyError>
393 where
394   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
395   for<'de> Data: Deserialize<'de>,
396 {
397   let parsed_data: Data = serde_json::from_str(data)?;
398   let res = parsed_data
399     .perform(&web::Data::new(context.clone()), Some(id))
400     .await?;
401   SendActivity::send_activity(&parsed_data, &res, &context).await?;
402   serialize_websocket_message(&op, &res)
403 }
404
405 pub async fn match_websocket_operation_apub(
406   context: LemmyContext,
407   id: ConnectionId,
408   op: UserOperationApub,
409   data: &str,
410 ) -> result::Result<String, LemmyError> {
411   match op {
412     UserOperationApub::GetPersonDetails => {
413       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
414     }
415     UserOperationApub::GetCommunity => {
416       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
417     }
418     UserOperationApub::GetComments => {
419       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
420     }
421     UserOperationApub::GetPosts => {
422       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
423     }
424     UserOperationApub::ResolveObject => {
425       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
426     }
427     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
428   }
429 }
430
431 async fn do_websocket_operation_apub<'a, 'b, Data>(
432   context: LemmyContext,
433   id: ConnectionId,
434   op: UserOperationApub,
435   data: &str,
436 ) -> result::Result<String, LemmyError>
437 where
438   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
439   for<'de> Data: Deserialize<'de>,
440 {
441   let parsed_data: Data = serde_json::from_str(data)?;
442   let res = parsed_data
443     .perform(&web::Data::new(context.clone()), Some(id))
444     .await?;
445   SendActivity::send_activity(&parsed_data, &res, &context).await?;
446   serialize_websocket_message(&op, &res)
447 }
448
449 pub async fn match_websocket_operation(
450   context: LemmyContext,
451   id: ConnectionId,
452   op: UserOperation,
453   data: &str,
454 ) -> result::Result<String, LemmyError> {
455   match op {
456     // User ops
457     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
458     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
459     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
460     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
461     UserOperation::GetUnreadRegistrationApplicationCount => {
462       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
463     }
464     UserOperation::ListRegistrationApplications => {
465       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
466     }
467     UserOperation::ApproveRegistrationApplication => {
468       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
469     }
470     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
471     UserOperation::GetBannedPersons => {
472       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
473     }
474     UserOperation::BlockPerson => {
475       do_websocket_operation::<BlockPerson>(context, id, op, data).await
476     }
477     UserOperation::GetPersonMentions => {
478       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
479     }
480     UserOperation::MarkPersonMentionAsRead => {
481       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
482     }
483     UserOperation::MarkCommentReplyAsRead => {
484       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
485     }
486     UserOperation::MarkAllAsRead => {
487       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
488     }
489     UserOperation::PasswordReset => {
490       do_websocket_operation::<PasswordReset>(context, id, op, data).await
491     }
492     UserOperation::PasswordChange => {
493       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
494     }
495     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
496     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
497     UserOperation::CommunityJoin => {
498       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
499     }
500     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
501     UserOperation::SaveUserSettings => {
502       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
503     }
504     UserOperation::ChangePassword => {
505       do_websocket_operation::<ChangePassword>(context, id, op, data).await
506     }
507     UserOperation::GetReportCount => {
508       do_websocket_operation::<GetReportCount>(context, id, op, data).await
509     }
510     UserOperation::GetUnreadCount => {
511       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
512     }
513     UserOperation::VerifyEmail => {
514       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
515     }
516
517     // Private Message ops
518     UserOperation::MarkPrivateMessageAsRead => {
519       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
520     }
521     UserOperation::CreatePrivateMessageReport => {
522       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
523     }
524     UserOperation::ResolvePrivateMessageReport => {
525       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
526     }
527     UserOperation::ListPrivateMessageReports => {
528       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
529     }
530
531     // Site ops
532     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
533     UserOperation::PurgePerson => {
534       do_websocket_operation::<PurgePerson>(context, id, op, data).await
535     }
536     UserOperation::PurgeCommunity => {
537       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
538     }
539     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
540     UserOperation::PurgeComment => {
541       do_websocket_operation::<PurgeComment>(context, id, op, data).await
542     }
543     UserOperation::TransferCommunity => {
544       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
545     }
546     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
547
548     // Community ops
549     UserOperation::FollowCommunity => {
550       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
551     }
552     UserOperation::BlockCommunity => {
553       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
554     }
555     UserOperation::BanFromCommunity => {
556       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
557     }
558     UserOperation::AddModToCommunity => {
559       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
560     }
561
562     // Post ops
563     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
564     UserOperation::FeaturePost => {
565       do_websocket_operation::<FeaturePost>(context, id, op, data).await
566     }
567     UserOperation::CreatePostLike => {
568       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
569     }
570     UserOperation::MarkPostAsRead => {
571       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
572     }
573     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
574     UserOperation::CreatePostReport => {
575       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
576     }
577     UserOperation::ListPostReports => {
578       do_websocket_operation::<ListPostReports>(context, id, op, data).await
579     }
580     UserOperation::ResolvePostReport => {
581       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
582     }
583     UserOperation::GetSiteMetadata => {
584       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
585     }
586
587     // Comment ops
588     UserOperation::SaveComment => {
589       do_websocket_operation::<SaveComment>(context, id, op, data).await
590     }
591     UserOperation::CreateCommentLike => {
592       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
593     }
594     UserOperation::DistinguishComment => {
595       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
596     }
597     UserOperation::CreateCommentReport => {
598       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
599     }
600     UserOperation::ListCommentReports => {
601       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
602     }
603     UserOperation::ResolveCommentReport => {
604       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
605     }
606   }
607 }
608
609 async fn do_websocket_operation<'a, 'b, Data>(
610   context: LemmyContext,
611   id: ConnectionId,
612   op: UserOperation,
613   data: &str,
614 ) -> result::Result<String, LemmyError>
615 where
616   Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
617   for<'de> Data: Deserialize<'de>,
618 {
619   let parsed_data: Data = serde_json::from_str(data)?;
620   let res = parsed_data
621     .perform(&web::Data::new(context.clone()), Some(id))
622     .await?;
623   SendActivity::send_activity(&parsed_data, &res, &context).await?;
624   serialize_websocket_message(&op, &res)
625 }