]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Adding a vector indexing check to prevent panics. Fixes #2753 (#2754)
[lemmy.git] / src / api_routes_websocket.rs
1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
7   comment::{
8     CreateComment,
9     CreateCommentLike,
10     CreateCommentReport,
11     DeleteComment,
12     DistinguishComment,
13     EditComment,
14     GetComment,
15     GetComments,
16     ListCommentReports,
17     RemoveComment,
18     ResolveCommentReport,
19     SaveComment,
20   },
21   community::{
22     AddModToCommunity,
23     BanFromCommunity,
24     BlockCommunity,
25     CreateCommunity,
26     DeleteCommunity,
27     EditCommunity,
28     FollowCommunity,
29     GetCommunity,
30     ListCommunities,
31     RemoveCommunity,
32     TransferCommunity,
33   },
34   context::LemmyContext,
35   person::{
36     AddAdmin,
37     BanPerson,
38     BlockPerson,
39     ChangePassword,
40     DeleteAccount,
41     GetBannedPersons,
42     GetCaptcha,
43     GetPersonDetails,
44     GetPersonMentions,
45     GetReplies,
46     GetReportCount,
47     GetUnreadCount,
48     Login,
49     MarkAllAsRead,
50     MarkCommentReplyAsRead,
51     MarkPersonMentionAsRead,
52     PasswordChangeAfterReset,
53     PasswordReset,
54     Register,
55     SaveUserSettings,
56     VerifyEmail,
57   },
58   post::{
59     CreatePost,
60     CreatePostLike,
61     CreatePostReport,
62     DeletePost,
63     EditPost,
64     FeaturePost,
65     GetPost,
66     GetPosts,
67     GetSiteMetadata,
68     ListPostReports,
69     LockPost,
70     MarkPostAsRead,
71     RemovePost,
72     ResolvePostReport,
73     SavePost,
74   },
75   private_message::{
76     CreatePrivateMessage,
77     CreatePrivateMessageReport,
78     DeletePrivateMessage,
79     EditPrivateMessage,
80     GetPrivateMessages,
81     ListPrivateMessageReports,
82     MarkPrivateMessageAsRead,
83     ResolvePrivateMessageReport,
84   },
85   site::{
86     ApproveRegistrationApplication,
87     CreateSite,
88     EditSite,
89     GetModlog,
90     GetSite,
91     GetUnreadRegistrationApplicationCount,
92     LeaveAdmin,
93     ListRegistrationApplications,
94     PurgeComment,
95     PurgeCommunity,
96     PurgePerson,
97     PurgePost,
98     ResolveObject,
99     Search,
100   },
101   websocket::{
102     serialize_websocket_message,
103     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
104     UserOperation,
105     UserOperationApub,
106     UserOperationCrud,
107   },
108 };
109 use lemmy_api_crud::PerformCrud;
110 use lemmy_apub::{api::PerformApub, SendActivity};
111 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
112 use serde::Deserialize;
113 use serde_json::Value;
114 use std::{
115   result,
116   str::FromStr,
117   sync::{Arc, Mutex},
118   time::{Duration, Instant},
119 };
120 use tracing::{debug, error, info};
121
122 /// Entry point for our route
123 pub async fn websocket(
124   req: HttpRequest,
125   body: web::Payload,
126   context: web::Data<LemmyContext>,
127   rate_limiter: web::Data<RateLimitCell>,
128 ) -> Result<HttpResponse, Error> {
129   let (response, session, stream) = actix_ws::handle(&req, body)?;
130
131   let client_ip = IpAddr(
132     req
133       .connection_info()
134       .realip_remote_addr()
135       .unwrap_or("blank_ip")
136       .to_string(),
137   );
138
139   let check = rate_limiter.message().check(client_ip.clone());
140   if !check {
141     debug!(
142       "Websocket join with IP: {} has been rate limited.",
143       &client_ip
144     );
145     session.close(None).await.map_err(LemmyError::from)?;
146     return Ok(response);
147   }
148
149   let connection_id = context.chat_server().handle_connect(session.clone())?;
150   info!("{} joined", &client_ip);
151
152   let alive = Arc::new(Mutex::new(Instant::now()));
153   heartbeat(session.clone(), alive.clone());
154
155   actix_rt::spawn(handle_messages(
156     stream,
157     client_ip,
158     session,
159     connection_id,
160     alive,
161     rate_limiter,
162     context,
163   ));
164
165   Ok(response)
166 }
167
168 async fn handle_messages(
169   mut stream: MessageStream,
170   client_ip: IpAddr,
171   mut session: Session,
172   connection_id: ConnectionId,
173   alive: Arc<Mutex<Instant>>,
174   rate_limiter: web::Data<RateLimitCell>,
175   context: web::Data<LemmyContext>,
176 ) -> Result<(), LemmyError> {
177   while let Some(Ok(msg)) = stream.next().await {
178     match msg {
179       ws::Message::Ping(bytes) => {
180         if session.pong(&bytes).await.is_err() {
181           break;
182         }
183       }
184       ws::Message::Pong(_) => {
185         let mut lock = alive
186           .lock()
187           .expect("Failed to acquire websocket heartbeat alive lock");
188         *lock = Instant::now();
189       }
190       ws::Message::Text(text) => {
191         let msg = text.trim().to_string();
192         let executed = parse_json_message(
193           msg,
194           client_ip.clone(),
195           connection_id,
196           rate_limiter.get_ref(),
197           context.get_ref().clone(),
198         )
199         .await;
200
201         let res = executed.unwrap_or_else(|e| {
202           error!("Error during message handling {}", e);
203           e.to_json()
204             .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
205         });
206         session.text(res).await?;
207       }
208       ws::Message::Close(_) => {
209         session.close(None).await?;
210         context.chat_server().handle_disconnect(&connection_id)?;
211         break;
212       }
213       ws::Message::Binary(_) => info!("Unexpected binary"),
214       _ => {}
215     }
216   }
217   Ok(())
218 }
219
220 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
221   actix_rt::spawn(async move {
222     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
223     loop {
224       if session.ping(b"").await.is_err() {
225         break;
226       }
227
228       let duration_since = {
229         let alive_lock = alive
230           .lock()
231           .expect("Failed to acquire websocket heartbeat alive lock");
232         Instant::now().duration_since(*alive_lock)
233       };
234       if duration_since > Duration::from_secs(10) {
235         let _ = session.close(None).await;
236         break;
237       }
238       interval.tick().await;
239     }
240   });
241 }
242
243 async fn parse_json_message(
244   msg: String,
245   ip: IpAddr,
246   connection_id: ConnectionId,
247   rate_limiter: &RateLimitCell,
248   context: LemmyContext,
249 ) -> Result<String, LemmyError> {
250   let json: Value = serde_json::from_str(&msg)?;
251   let data = &json
252     .get("data")
253     .ok_or_else(|| LemmyError::from_message("missing data"))?
254     .to_string();
255   let op = &json
256     .get("op")
257     .ok_or_else(|| LemmyError::from_message("missing op"))?
258     .to_string();
259
260   // check if api call passes the rate limit, and generate future for later execution
261   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
262     let passed = match user_operation_crud {
263       UserOperationCrud::Register => rate_limiter.register().check(ip),
264       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
265       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
266       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
267       _ => rate_limiter.message().check(ip),
268     };
269     check_rate_limit_passed(passed)?;
270     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
271   } else if let Ok(user_operation) = UserOperation::from_str(op) {
272     let passed = match user_operation {
273       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
274       _ => rate_limiter.message().check(ip),
275     };
276     check_rate_limit_passed(passed)?;
277     match_websocket_operation(context, connection_id, user_operation, data).await
278   } else {
279     let user_operation = UserOperationApub::from_str(op)?;
280     let passed = match user_operation {
281       UserOperationApub::Search => rate_limiter.search().check(ip),
282       _ => rate_limiter.message().check(ip),
283     };
284     check_rate_limit_passed(passed)?;
285     match_websocket_operation_apub(context, connection_id, user_operation, data).await
286   }
287 }
288
289 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
290   if passed {
291     Ok(())
292   } else {
293     // if rate limit was hit, respond with message
294     Err(LemmyError::from_message("rate_limit_error"))
295   }
296 }
297
298 pub async fn match_websocket_operation_crud(
299   context: LemmyContext,
300   id: ConnectionId,
301   op: UserOperationCrud,
302   data: &str,
303 ) -> result::Result<String, LemmyError> {
304   match op {
305     // User ops
306     UserOperationCrud::Register => {
307       do_websocket_operation_crud::<Register>(context, id, op, data).await
308     }
309     UserOperationCrud::DeleteAccount => {
310       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
311     }
312
313     // Private Message ops
314     UserOperationCrud::CreatePrivateMessage => {
315       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
316     }
317     UserOperationCrud::EditPrivateMessage => {
318       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
319     }
320     UserOperationCrud::DeletePrivateMessage => {
321       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
322     }
323     UserOperationCrud::GetPrivateMessages => {
324       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
325     }
326
327     // Site ops
328     UserOperationCrud::CreateSite => {
329       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
330     }
331     UserOperationCrud::EditSite => {
332       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
333     }
334     UserOperationCrud::GetSite => {
335       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
336     }
337
338     // Community ops
339     UserOperationCrud::ListCommunities => {
340       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
341     }
342     UserOperationCrud::CreateCommunity => {
343       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
344     }
345     UserOperationCrud::EditCommunity => {
346       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
347     }
348     UserOperationCrud::DeleteCommunity => {
349       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
350     }
351     UserOperationCrud::RemoveCommunity => {
352       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
353     }
354
355     // Post ops
356     UserOperationCrud::CreatePost => {
357       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
358     }
359     UserOperationCrud::GetPost => {
360       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
361     }
362     UserOperationCrud::EditPost => {
363       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
364     }
365     UserOperationCrud::DeletePost => {
366       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
367     }
368     UserOperationCrud::RemovePost => {
369       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
370     }
371
372     // Comment ops
373     UserOperationCrud::CreateComment => {
374       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
375     }
376     UserOperationCrud::EditComment => {
377       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
378     }
379     UserOperationCrud::DeleteComment => {
380       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
381     }
382     UserOperationCrud::RemoveComment => {
383       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
384     }
385     UserOperationCrud::GetComment => {
386       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
387     }
388   }
389 }
390
391 async fn do_websocket_operation_crud<'a, 'b, Data>(
392   context: LemmyContext,
393   id: ConnectionId,
394   op: UserOperationCrud,
395   data: &str,
396 ) -> result::Result<String, LemmyError>
397 where
398   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
399   for<'de> Data: Deserialize<'de>,
400 {
401   let parsed_data: Data = serde_json::from_str(data)?;
402   let res = parsed_data
403     .perform(&web::Data::new(context.clone()), Some(id))
404     .await?;
405   SendActivity::send_activity(&parsed_data, &res, &context).await?;
406   serialize_websocket_message(&op, &res)
407 }
408
409 pub async fn match_websocket_operation_apub(
410   context: LemmyContext,
411   id: ConnectionId,
412   op: UserOperationApub,
413   data: &str,
414 ) -> result::Result<String, LemmyError> {
415   match op {
416     UserOperationApub::GetPersonDetails => {
417       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
418     }
419     UserOperationApub::GetCommunity => {
420       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
421     }
422     UserOperationApub::GetComments => {
423       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
424     }
425     UserOperationApub::GetPosts => {
426       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
427     }
428     UserOperationApub::ResolveObject => {
429       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
430     }
431     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
432   }
433 }
434
435 async fn do_websocket_operation_apub<'a, 'b, Data>(
436   context: LemmyContext,
437   id: ConnectionId,
438   op: UserOperationApub,
439   data: &str,
440 ) -> result::Result<String, LemmyError>
441 where
442   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
443   for<'de> Data: Deserialize<'de>,
444 {
445   let parsed_data: Data = serde_json::from_str(data)?;
446   let res = parsed_data
447     .perform(&web::Data::new(context.clone()), Some(id))
448     .await?;
449   SendActivity::send_activity(&parsed_data, &res, &context).await?;
450   serialize_websocket_message(&op, &res)
451 }
452
453 pub async fn match_websocket_operation(
454   context: LemmyContext,
455   id: ConnectionId,
456   op: UserOperation,
457   data: &str,
458 ) -> result::Result<String, LemmyError> {
459   match op {
460     // User ops
461     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
462     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
463     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
464     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
465     UserOperation::GetUnreadRegistrationApplicationCount => {
466       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
467     }
468     UserOperation::ListRegistrationApplications => {
469       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
470     }
471     UserOperation::ApproveRegistrationApplication => {
472       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
473     }
474     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
475     UserOperation::GetBannedPersons => {
476       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
477     }
478     UserOperation::BlockPerson => {
479       do_websocket_operation::<BlockPerson>(context, id, op, data).await
480     }
481     UserOperation::GetPersonMentions => {
482       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
483     }
484     UserOperation::MarkPersonMentionAsRead => {
485       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
486     }
487     UserOperation::MarkCommentReplyAsRead => {
488       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
489     }
490     UserOperation::MarkAllAsRead => {
491       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
492     }
493     UserOperation::PasswordReset => {
494       do_websocket_operation::<PasswordReset>(context, id, op, data).await
495     }
496     UserOperation::PasswordChange => {
497       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
498     }
499     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
500     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
501     UserOperation::CommunityJoin => {
502       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
503     }
504     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
505     UserOperation::SaveUserSettings => {
506       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
507     }
508     UserOperation::ChangePassword => {
509       do_websocket_operation::<ChangePassword>(context, id, op, data).await
510     }
511     UserOperation::GetReportCount => {
512       do_websocket_operation::<GetReportCount>(context, id, op, data).await
513     }
514     UserOperation::GetUnreadCount => {
515       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
516     }
517     UserOperation::VerifyEmail => {
518       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
519     }
520
521     // Private Message ops
522     UserOperation::MarkPrivateMessageAsRead => {
523       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
524     }
525     UserOperation::CreatePrivateMessageReport => {
526       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
527     }
528     UserOperation::ResolvePrivateMessageReport => {
529       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
530     }
531     UserOperation::ListPrivateMessageReports => {
532       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
533     }
534
535     // Site ops
536     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
537     UserOperation::PurgePerson => {
538       do_websocket_operation::<PurgePerson>(context, id, op, data).await
539     }
540     UserOperation::PurgeCommunity => {
541       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
542     }
543     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
544     UserOperation::PurgeComment => {
545       do_websocket_operation::<PurgeComment>(context, id, op, data).await
546     }
547     UserOperation::TransferCommunity => {
548       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
549     }
550     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
551
552     // Community ops
553     UserOperation::FollowCommunity => {
554       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
555     }
556     UserOperation::BlockCommunity => {
557       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
558     }
559     UserOperation::BanFromCommunity => {
560       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
561     }
562     UserOperation::AddModToCommunity => {
563       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
564     }
565
566     // Post ops
567     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
568     UserOperation::FeaturePost => {
569       do_websocket_operation::<FeaturePost>(context, id, op, data).await
570     }
571     UserOperation::CreatePostLike => {
572       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
573     }
574     UserOperation::MarkPostAsRead => {
575       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
576     }
577     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
578     UserOperation::CreatePostReport => {
579       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
580     }
581     UserOperation::ListPostReports => {
582       do_websocket_operation::<ListPostReports>(context, id, op, data).await
583     }
584     UserOperation::ResolvePostReport => {
585       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
586     }
587     UserOperation::GetSiteMetadata => {
588       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
589     }
590
591     // Comment ops
592     UserOperation::SaveComment => {
593       do_websocket_operation::<SaveComment>(context, id, op, data).await
594     }
595     UserOperation::CreateCommentLike => {
596       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
597     }
598     UserOperation::DistinguishComment => {
599       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
600     }
601     UserOperation::CreateCommentReport => {
602       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
603     }
604     UserOperation::ListCommentReports => {
605       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
606     }
607     UserOperation::ResolveCommentReport => {
608       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
609     }
610   }
611 }
612
613 async fn do_websocket_operation<'a, 'b, Data>(
614   context: LemmyContext,
615   id: ConnectionId,
616   op: UserOperation,
617   data: &str,
618 ) -> result::Result<String, LemmyError>
619 where
620   Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
621   for<'de> Data: Deserialize<'de>,
622 {
623   let parsed_data: Data = serde_json::from_str(data)?;
624   let res = parsed_data
625     .perform(&web::Data::new(context.clone()), Some(id))
626     .await?;
627   SendActivity::send_activity(&parsed_data, &res, &context).await?;
628   serialize_websocket_message(&op, &res)
629 }