]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Add Custom Emojis Support (#2616)
[lemmy.git] / src / api_routes_websocket.rs
1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
7   comment::{
8     CreateComment,
9     CreateCommentLike,
10     CreateCommentReport,
11     DeleteComment,
12     DistinguishComment,
13     EditComment,
14     GetComment,
15     GetComments,
16     ListCommentReports,
17     RemoveComment,
18     ResolveCommentReport,
19     SaveComment,
20   },
21   community::{
22     AddModToCommunity,
23     BanFromCommunity,
24     BlockCommunity,
25     CreateCommunity,
26     DeleteCommunity,
27     EditCommunity,
28     FollowCommunity,
29     GetCommunity,
30     ListCommunities,
31     RemoveCommunity,
32     TransferCommunity,
33   },
34   context::LemmyContext,
35   custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
36   person::{
37     AddAdmin,
38     BanPerson,
39     BlockPerson,
40     ChangePassword,
41     DeleteAccount,
42     GetBannedPersons,
43     GetCaptcha,
44     GetPersonDetails,
45     GetPersonMentions,
46     GetReplies,
47     GetReportCount,
48     GetUnreadCount,
49     Login,
50     MarkAllAsRead,
51     MarkCommentReplyAsRead,
52     MarkPersonMentionAsRead,
53     PasswordChangeAfterReset,
54     PasswordReset,
55     Register,
56     SaveUserSettings,
57     VerifyEmail,
58   },
59   post::{
60     CreatePost,
61     CreatePostLike,
62     CreatePostReport,
63     DeletePost,
64     EditPost,
65     FeaturePost,
66     GetPost,
67     GetPosts,
68     GetSiteMetadata,
69     ListPostReports,
70     LockPost,
71     MarkPostAsRead,
72     RemovePost,
73     ResolvePostReport,
74     SavePost,
75   },
76   private_message::{
77     CreatePrivateMessage,
78     CreatePrivateMessageReport,
79     DeletePrivateMessage,
80     EditPrivateMessage,
81     GetPrivateMessages,
82     ListPrivateMessageReports,
83     MarkPrivateMessageAsRead,
84     ResolvePrivateMessageReport,
85   },
86   site::{
87     ApproveRegistrationApplication,
88     CreateSite,
89     EditSite,
90     GetModlog,
91     GetSite,
92     GetUnreadRegistrationApplicationCount,
93     LeaveAdmin,
94     ListRegistrationApplications,
95     PurgeComment,
96     PurgeCommunity,
97     PurgePerson,
98     PurgePost,
99     ResolveObject,
100     Search,
101   },
102   websocket::{
103     serialize_websocket_message,
104     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
105     UserOperation,
106     UserOperationApub,
107     UserOperationCrud,
108   },
109 };
110 use lemmy_api_crud::PerformCrud;
111 use lemmy_apub::{api::PerformApub, SendActivity};
112 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
113 use serde::Deserialize;
114 use serde_json::Value;
115 use std::{
116   result,
117   str::FromStr,
118   sync::{Arc, Mutex},
119   time::{Duration, Instant},
120 };
121 use tracing::{debug, error, info};
122
123 /// Entry point for our route
124 pub async fn websocket(
125   req: HttpRequest,
126   body: web::Payload,
127   context: web::Data<LemmyContext>,
128   rate_limiter: web::Data<RateLimitCell>,
129 ) -> Result<HttpResponse, Error> {
130   let (response, session, stream) = actix_ws::handle(&req, body)?;
131
132   let client_ip = IpAddr(
133     req
134       .connection_info()
135       .realip_remote_addr()
136       .unwrap_or("blank_ip")
137       .to_string(),
138   );
139
140   let check = rate_limiter.message().check(client_ip.clone());
141   if !check {
142     debug!(
143       "Websocket join with IP: {} has been rate limited.",
144       &client_ip
145     );
146     session.close(None).await.map_err(LemmyError::from)?;
147     return Ok(response);
148   }
149
150   let connection_id = context.chat_server().handle_connect(session.clone())?;
151   info!("{} joined", &client_ip);
152
153   let alive = Arc::new(Mutex::new(Instant::now()));
154   heartbeat(session.clone(), alive.clone());
155
156   actix_rt::spawn(handle_messages(
157     stream,
158     client_ip,
159     session,
160     connection_id,
161     alive,
162     rate_limiter,
163     context,
164   ));
165
166   Ok(response)
167 }
168
169 async fn handle_messages(
170   mut stream: MessageStream,
171   client_ip: IpAddr,
172   mut session: Session,
173   connection_id: ConnectionId,
174   alive: Arc<Mutex<Instant>>,
175   rate_limiter: web::Data<RateLimitCell>,
176   context: web::Data<LemmyContext>,
177 ) -> Result<(), LemmyError> {
178   while let Some(Ok(msg)) = stream.next().await {
179     match msg {
180       ws::Message::Ping(bytes) => {
181         if session.pong(&bytes).await.is_err() {
182           break;
183         }
184       }
185       ws::Message::Pong(_) => {
186         let mut lock = alive
187           .lock()
188           .expect("Failed to acquire websocket heartbeat alive lock");
189         *lock = Instant::now();
190       }
191       ws::Message::Text(text) => {
192         let msg = text.trim().to_string();
193         let executed = parse_json_message(
194           msg,
195           client_ip.clone(),
196           connection_id,
197           rate_limiter.get_ref(),
198           context.get_ref().clone(),
199         )
200         .await;
201
202         let res = executed.unwrap_or_else(|e| {
203           error!("Error during message handling {}", e);
204           e.to_json()
205             .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
206         });
207         session.text(res).await?;
208       }
209       ws::Message::Close(_) => {
210         session.close(None).await?;
211         context.chat_server().handle_disconnect(&connection_id)?;
212         break;
213       }
214       ws::Message::Binary(_) => info!("Unexpected binary"),
215       _ => {}
216     }
217   }
218   Ok(())
219 }
220
221 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
222   actix_rt::spawn(async move {
223     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
224     loop {
225       if session.ping(b"").await.is_err() {
226         break;
227       }
228
229       let duration_since = {
230         let alive_lock = alive
231           .lock()
232           .expect("Failed to acquire websocket heartbeat alive lock");
233         Instant::now().duration_since(*alive_lock)
234       };
235       if duration_since > Duration::from_secs(10) {
236         let _ = session.close(None).await;
237         break;
238       }
239       interval.tick().await;
240     }
241   });
242 }
243
244 async fn parse_json_message(
245   msg: String,
246   ip: IpAddr,
247   connection_id: ConnectionId,
248   rate_limiter: &RateLimitCell,
249   context: LemmyContext,
250 ) -> Result<String, LemmyError> {
251   let json: Value = serde_json::from_str(&msg)?;
252   let data = json
253     .get("data")
254     .cloned()
255     .ok_or_else(|| LemmyError::from_message("missing data"))?;
256
257   let missing_op_err = || LemmyError::from_message("missing op");
258
259   let op = json
260     .get("op")
261     .ok_or_else(missing_op_err)?
262     .as_str()
263     .ok_or_else(missing_op_err)?;
264
265   // check if api call passes the rate limit, and generate future for later execution
266   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
267     let passed = match user_operation_crud {
268       UserOperationCrud::Register => rate_limiter.register().check(ip),
269       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
270       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
271       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
272       _ => rate_limiter.message().check(ip),
273     };
274     check_rate_limit_passed(passed)?;
275     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
276   } else if let Ok(user_operation) = UserOperation::from_str(op) {
277     let passed = match user_operation {
278       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
279       _ => rate_limiter.message().check(ip),
280     };
281     check_rate_limit_passed(passed)?;
282     match_websocket_operation(context, connection_id, user_operation, data).await
283   } else {
284     let user_operation = UserOperationApub::from_str(op)?;
285     let passed = match user_operation {
286       UserOperationApub::Search => rate_limiter.search().check(ip),
287       _ => rate_limiter.message().check(ip),
288     };
289     check_rate_limit_passed(passed)?;
290     match_websocket_operation_apub(context, connection_id, user_operation, data).await
291   }
292 }
293
294 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
295   if passed {
296     Ok(())
297   } else {
298     // if rate limit was hit, respond with message
299     Err(LemmyError::from_message("rate_limit_error"))
300   }
301 }
302
303 pub async fn match_websocket_operation_crud(
304   context: LemmyContext,
305   id: ConnectionId,
306   op: UserOperationCrud,
307   data: Value,
308 ) -> result::Result<String, LemmyError> {
309   match op {
310     // User ops
311     UserOperationCrud::Register => {
312       do_websocket_operation_crud::<Register>(context, id, op, data).await
313     }
314     UserOperationCrud::DeleteAccount => {
315       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
316     }
317
318     // Private Message ops
319     UserOperationCrud::CreatePrivateMessage => {
320       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
321     }
322     UserOperationCrud::EditPrivateMessage => {
323       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
324     }
325     UserOperationCrud::DeletePrivateMessage => {
326       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
327     }
328     UserOperationCrud::GetPrivateMessages => {
329       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
330     }
331
332     // Site ops
333     UserOperationCrud::CreateSite => {
334       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
335     }
336     UserOperationCrud::EditSite => {
337       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
338     }
339     UserOperationCrud::GetSite => {
340       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
341     }
342
343     // Community ops
344     UserOperationCrud::ListCommunities => {
345       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
346     }
347     UserOperationCrud::CreateCommunity => {
348       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
349     }
350     UserOperationCrud::EditCommunity => {
351       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
352     }
353     UserOperationCrud::DeleteCommunity => {
354       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
355     }
356     UserOperationCrud::RemoveCommunity => {
357       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
358     }
359
360     // Post ops
361     UserOperationCrud::CreatePost => {
362       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
363     }
364     UserOperationCrud::GetPost => {
365       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
366     }
367     UserOperationCrud::EditPost => {
368       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
369     }
370     UserOperationCrud::DeletePost => {
371       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
372     }
373     UserOperationCrud::RemovePost => {
374       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
375     }
376
377     // Comment ops
378     UserOperationCrud::CreateComment => {
379       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
380     }
381     UserOperationCrud::EditComment => {
382       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
383     }
384     UserOperationCrud::DeleteComment => {
385       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
386     }
387     UserOperationCrud::RemoveComment => {
388       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
389     }
390     UserOperationCrud::GetComment => {
391       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
392     }
393     // Emojis
394     UserOperationCrud::CreateCustomEmoji => {
395       do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
396     }
397     UserOperationCrud::EditCustomEmoji => {
398       do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
399     }
400     UserOperationCrud::DeleteCustomEmoji => {
401       do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
402     }
403   }
404 }
405
406 async fn do_websocket_operation_crud<'a, 'b, Data>(
407   context: LemmyContext,
408   id: ConnectionId,
409   op: UserOperationCrud,
410   data: Value,
411 ) -> result::Result<String, LemmyError>
412 where
413   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
414   for<'de> Data: Deserialize<'de>,
415 {
416   let parsed_data: Data = serde_json::from_value(data)?;
417   let res = parsed_data
418     .perform(&web::Data::new(context.clone()), Some(id))
419     .await?;
420   SendActivity::send_activity(&parsed_data, &res, &context).await?;
421   serialize_websocket_message(&op, &res)
422 }
423
424 pub async fn match_websocket_operation_apub(
425   context: LemmyContext,
426   id: ConnectionId,
427   op: UserOperationApub,
428   data: Value,
429 ) -> result::Result<String, LemmyError> {
430   match op {
431     UserOperationApub::GetPersonDetails => {
432       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
433     }
434     UserOperationApub::GetCommunity => {
435       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
436     }
437     UserOperationApub::GetComments => {
438       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
439     }
440     UserOperationApub::GetPosts => {
441       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
442     }
443     UserOperationApub::ResolveObject => {
444       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
445     }
446     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
447   }
448 }
449
450 async fn do_websocket_operation_apub<'a, 'b, Data>(
451   context: LemmyContext,
452   id: ConnectionId,
453   op: UserOperationApub,
454   data: Value,
455 ) -> result::Result<String, LemmyError>
456 where
457   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
458   for<'de> Data: Deserialize<'de>,
459 {
460   let parsed_data: Data = serde_json::from_value(data)?;
461   let res = parsed_data
462     .perform(&web::Data::new(context.clone()), Some(id))
463     .await?;
464   SendActivity::send_activity(&parsed_data, &res, &context).await?;
465   serialize_websocket_message(&op, &res)
466 }
467
468 pub async fn match_websocket_operation(
469   context: LemmyContext,
470   id: ConnectionId,
471   op: UserOperation,
472   data: Value,
473 ) -> result::Result<String, LemmyError> {
474   match op {
475     // User ops
476     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
477     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
478     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
479     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
480     UserOperation::GetUnreadRegistrationApplicationCount => {
481       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
482     }
483     UserOperation::ListRegistrationApplications => {
484       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
485     }
486     UserOperation::ApproveRegistrationApplication => {
487       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
488     }
489     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
490     UserOperation::GetBannedPersons => {
491       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
492     }
493     UserOperation::BlockPerson => {
494       do_websocket_operation::<BlockPerson>(context, id, op, data).await
495     }
496     UserOperation::GetPersonMentions => {
497       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
498     }
499     UserOperation::MarkPersonMentionAsRead => {
500       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
501     }
502     UserOperation::MarkCommentReplyAsRead => {
503       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
504     }
505     UserOperation::MarkAllAsRead => {
506       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
507     }
508     UserOperation::PasswordReset => {
509       do_websocket_operation::<PasswordReset>(context, id, op, data).await
510     }
511     UserOperation::PasswordChange => {
512       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
513     }
514     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
515     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
516     UserOperation::CommunityJoin => {
517       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
518     }
519     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
520     UserOperation::SaveUserSettings => {
521       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
522     }
523     UserOperation::ChangePassword => {
524       do_websocket_operation::<ChangePassword>(context, id, op, data).await
525     }
526     UserOperation::GetReportCount => {
527       do_websocket_operation::<GetReportCount>(context, id, op, data).await
528     }
529     UserOperation::GetUnreadCount => {
530       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
531     }
532     UserOperation::VerifyEmail => {
533       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
534     }
535
536     // Private Message ops
537     UserOperation::MarkPrivateMessageAsRead => {
538       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
539     }
540     UserOperation::CreatePrivateMessageReport => {
541       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
542     }
543     UserOperation::ResolvePrivateMessageReport => {
544       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
545     }
546     UserOperation::ListPrivateMessageReports => {
547       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
548     }
549
550     // Site ops
551     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
552     UserOperation::PurgePerson => {
553       do_websocket_operation::<PurgePerson>(context, id, op, data).await
554     }
555     UserOperation::PurgeCommunity => {
556       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
557     }
558     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
559     UserOperation::PurgeComment => {
560       do_websocket_operation::<PurgeComment>(context, id, op, data).await
561     }
562     UserOperation::TransferCommunity => {
563       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
564     }
565     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
566
567     // Community ops
568     UserOperation::FollowCommunity => {
569       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
570     }
571     UserOperation::BlockCommunity => {
572       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
573     }
574     UserOperation::BanFromCommunity => {
575       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
576     }
577     UserOperation::AddModToCommunity => {
578       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
579     }
580
581     // Post ops
582     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
583     UserOperation::FeaturePost => {
584       do_websocket_operation::<FeaturePost>(context, id, op, data).await
585     }
586     UserOperation::CreatePostLike => {
587       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
588     }
589     UserOperation::MarkPostAsRead => {
590       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
591     }
592     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
593     UserOperation::CreatePostReport => {
594       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
595     }
596     UserOperation::ListPostReports => {
597       do_websocket_operation::<ListPostReports>(context, id, op, data).await
598     }
599     UserOperation::ResolvePostReport => {
600       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
601     }
602     UserOperation::GetSiteMetadata => {
603       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
604     }
605
606     // Comment ops
607     UserOperation::SaveComment => {
608       do_websocket_operation::<SaveComment>(context, id, op, data).await
609     }
610     UserOperation::CreateCommentLike => {
611       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
612     }
613     UserOperation::DistinguishComment => {
614       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
615     }
616     UserOperation::CreateCommentReport => {
617       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
618     }
619     UserOperation::ListCommentReports => {
620       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
621     }
622     UserOperation::ResolveCommentReport => {
623       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
624     }
625   }
626 }
627
628 async fn do_websocket_operation<'a, 'b, Data>(
629   context: LemmyContext,
630   id: ConnectionId,
631   op: UserOperation,
632   data: Value,
633 ) -> result::Result<String, LemmyError>
634 where
635   Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
636   for<'de> Data: Deserialize<'de>,
637 {
638   let parsed_data: Data = serde_json::from_value(data)?;
639   let res = parsed_data
640     .perform(&web::Data::new(context.clone()), Some(id))
641     .await?;
642   SendActivity::send_activity(&parsed_data, &res, &context).await?;
643   serialize_websocket_message(&op, &res)
644 }