]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Activitypub crate rewrite (#2782)
[lemmy.git] / src / api_routes_websocket.rs
1 use activitypub_federation::config::Data as ContextData;
2 use actix_web::{web, Error, HttpRequest, HttpResponse};
3 use actix_web_actors::ws;
4 use actix_ws::{MessageStream, Session};
5 use futures::stream::StreamExt;
6 use lemmy_api::Perform;
7 use lemmy_api_common::{
8   comment::{
9     CreateComment,
10     CreateCommentLike,
11     CreateCommentReport,
12     DeleteComment,
13     DistinguishComment,
14     EditComment,
15     GetComment,
16     GetComments,
17     ListCommentReports,
18     RemoveComment,
19     ResolveCommentReport,
20     SaveComment,
21   },
22   community::{
23     AddModToCommunity,
24     BanFromCommunity,
25     BlockCommunity,
26     CreateCommunity,
27     DeleteCommunity,
28     EditCommunity,
29     FollowCommunity,
30     GetCommunity,
31     ListCommunities,
32     RemoveCommunity,
33     TransferCommunity,
34   },
35   context::LemmyContext,
36   custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
37   person::{
38     AddAdmin,
39     BanPerson,
40     BlockPerson,
41     ChangePassword,
42     DeleteAccount,
43     GetBannedPersons,
44     GetCaptcha,
45     GetPersonDetails,
46     GetPersonMentions,
47     GetReplies,
48     GetReportCount,
49     GetUnreadCount,
50     Login,
51     MarkAllAsRead,
52     MarkCommentReplyAsRead,
53     MarkPersonMentionAsRead,
54     PasswordChangeAfterReset,
55     PasswordReset,
56     Register,
57     SaveUserSettings,
58     VerifyEmail,
59   },
60   post::{
61     CreatePost,
62     CreatePostLike,
63     CreatePostReport,
64     DeletePost,
65     EditPost,
66     FeaturePost,
67     GetPost,
68     GetPosts,
69     GetSiteMetadata,
70     ListPostReports,
71     LockPost,
72     MarkPostAsRead,
73     RemovePost,
74     ResolvePostReport,
75     SavePost,
76   },
77   private_message::{
78     CreatePrivateMessage,
79     CreatePrivateMessageReport,
80     DeletePrivateMessage,
81     EditPrivateMessage,
82     GetPrivateMessages,
83     ListPrivateMessageReports,
84     MarkPrivateMessageAsRead,
85     ResolvePrivateMessageReport,
86   },
87   site::{
88     ApproveRegistrationApplication,
89     CreateSite,
90     EditSite,
91     GetModlog,
92     GetSite,
93     GetUnreadRegistrationApplicationCount,
94     LeaveAdmin,
95     ListRegistrationApplications,
96     PurgeComment,
97     PurgeCommunity,
98     PurgePerson,
99     PurgePost,
100     ResolveObject,
101     Search,
102   },
103   websocket::{
104     serialize_websocket_message,
105     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
106     UserOperation,
107     UserOperationApub,
108     UserOperationCrud,
109   },
110 };
111 use lemmy_api_crud::PerformCrud;
112 use lemmy_apub::{api::PerformApub, SendActivity};
113 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
114 use serde::Deserialize;
115 use serde_json::Value;
116 use std::{
117   ops::Deref,
118   result,
119   str::FromStr,
120   sync::{Arc, Mutex},
121   time::{Duration, Instant},
122 };
123 use tracing::{debug, error, info};
124
125 /// Entry point for our route
126 pub async fn websocket(
127   req: HttpRequest,
128   body: web::Payload,
129   context: web::Data<LemmyContext>,
130   rate_limiter: web::Data<RateLimitCell>,
131   apub_data: ContextData<LemmyContext>,
132 ) -> Result<HttpResponse, Error> {
133   let (response, session, stream) = actix_ws::handle(&req, body)?;
134
135   let client_ip = IpAddr(
136     req
137       .connection_info()
138       .realip_remote_addr()
139       .unwrap_or("blank_ip")
140       .to_string(),
141   );
142
143   let check = rate_limiter.message().check(client_ip.clone());
144   if !check {
145     debug!(
146       "Websocket join with IP: {} has been rate limited.",
147       &client_ip
148     );
149     session.close(None).await.map_err(LemmyError::from)?;
150     return Ok(response);
151   }
152
153   let connection_id = context.chat_server().handle_connect(session.clone())?;
154   info!("{} joined", &client_ip);
155
156   let alive = Arc::new(Mutex::new(Instant::now()));
157   heartbeat(session.clone(), alive.clone());
158
159   actix_rt::spawn(handle_messages(
160     stream,
161     client_ip,
162     session,
163     connection_id,
164     alive,
165     rate_limiter,
166     apub_data,
167   ));
168
169   Ok(response)
170 }
171
172 async fn handle_messages(
173   mut stream: MessageStream,
174   client_ip: IpAddr,
175   mut session: Session,
176   connection_id: ConnectionId,
177   alive: Arc<Mutex<Instant>>,
178   rate_limiter: web::Data<RateLimitCell>,
179   context: ContextData<LemmyContext>,
180 ) -> Result<(), LemmyError> {
181   while let Some(Ok(msg)) = stream.next().await {
182     match msg {
183       ws::Message::Ping(bytes) => {
184         if session.pong(&bytes).await.is_err() {
185           break;
186         }
187       }
188       ws::Message::Pong(_) => {
189         let mut lock = alive
190           .lock()
191           .expect("Failed to acquire websocket heartbeat alive lock");
192         *lock = Instant::now();
193       }
194       ws::Message::Text(text) => {
195         let msg = text.trim().to_string();
196         let executed = parse_json_message(
197           msg,
198           client_ip.clone(),
199           connection_id,
200           rate_limiter.get_ref(),
201           context.reset_request_count(),
202         )
203         .await;
204
205         let res = executed.unwrap_or_else(|e| {
206           error!("Error during message handling {}", e);
207           e.to_json()
208             .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
209         });
210         session.text(res).await?;
211       }
212       ws::Message::Close(_) => {
213         session.close(None).await?;
214         context.chat_server().handle_disconnect(&connection_id)?;
215         break;
216       }
217       ws::Message::Binary(_) => info!("Unexpected binary"),
218       _ => {}
219     }
220   }
221   Ok(())
222 }
223
224 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
225   actix_rt::spawn(async move {
226     let mut interval = actix_rt::time::interval(Duration::from_secs(5));
227     loop {
228       if session.ping(b"").await.is_err() {
229         break;
230       }
231
232       let duration_since = {
233         let alive_lock = alive
234           .lock()
235           .expect("Failed to acquire websocket heartbeat alive lock");
236         Instant::now().duration_since(*alive_lock)
237       };
238       if duration_since > Duration::from_secs(10) {
239         let _ = session.close(None).await;
240         break;
241       }
242       interval.tick().await;
243     }
244   });
245 }
246
247 async fn parse_json_message(
248   msg: String,
249   ip: IpAddr,
250   connection_id: ConnectionId,
251   rate_limiter: &RateLimitCell,
252   context: ContextData<LemmyContext>,
253 ) -> Result<String, LemmyError> {
254   let json: Value = serde_json::from_str(&msg)?;
255   let data = json
256     .get("data")
257     .cloned()
258     .ok_or_else(|| LemmyError::from_message("missing data"))?;
259
260   let missing_op_err = || LemmyError::from_message("missing op");
261
262   let op = json
263     .get("op")
264     .ok_or_else(missing_op_err)?
265     .as_str()
266     .ok_or_else(missing_op_err)?;
267
268   // check if api call passes the rate limit, and generate future for later execution
269   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
270     let passed = match user_operation_crud {
271       UserOperationCrud::Register => rate_limiter.register().check(ip),
272       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
273       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
274       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
275       _ => rate_limiter.message().check(ip),
276     };
277     check_rate_limit_passed(passed)?;
278     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
279   } else if let Ok(user_operation) = UserOperation::from_str(op) {
280     let passed = match user_operation {
281       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
282       _ => rate_limiter.message().check(ip),
283     };
284     check_rate_limit_passed(passed)?;
285     match_websocket_operation(context, connection_id, user_operation, data).await
286   } else {
287     let user_operation = UserOperationApub::from_str(op)?;
288     let passed = match user_operation {
289       UserOperationApub::Search => rate_limiter.search().check(ip),
290       _ => rate_limiter.message().check(ip),
291     };
292     check_rate_limit_passed(passed)?;
293     match_websocket_operation_apub(context, connection_id, user_operation, data).await
294   }
295 }
296
297 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
298   if passed {
299     Ok(())
300   } else {
301     // if rate limit was hit, respond with message
302     Err(LemmyError::from_message("rate_limit_error"))
303   }
304 }
305
306 pub async fn match_websocket_operation_crud(
307   context: ContextData<LemmyContext>,
308   id: ConnectionId,
309   op: UserOperationCrud,
310   data: Value,
311 ) -> result::Result<String, LemmyError> {
312   match op {
313     // User ops
314     UserOperationCrud::Register => {
315       do_websocket_operation_crud::<Register>(context, id, op, data).await
316     }
317     UserOperationCrud::DeleteAccount => {
318       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
319     }
320
321     // Private Message ops
322     UserOperationCrud::CreatePrivateMessage => {
323       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
324     }
325     UserOperationCrud::EditPrivateMessage => {
326       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
327     }
328     UserOperationCrud::DeletePrivateMessage => {
329       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
330     }
331     UserOperationCrud::GetPrivateMessages => {
332       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
333     }
334
335     // Site ops
336     UserOperationCrud::CreateSite => {
337       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
338     }
339     UserOperationCrud::EditSite => {
340       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
341     }
342     UserOperationCrud::GetSite => {
343       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
344     }
345
346     // Community ops
347     UserOperationCrud::ListCommunities => {
348       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
349     }
350     UserOperationCrud::CreateCommunity => {
351       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
352     }
353     UserOperationCrud::EditCommunity => {
354       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
355     }
356     UserOperationCrud::DeleteCommunity => {
357       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
358     }
359     UserOperationCrud::RemoveCommunity => {
360       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
361     }
362
363     // Post ops
364     UserOperationCrud::CreatePost => {
365       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
366     }
367     UserOperationCrud::GetPost => {
368       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
369     }
370     UserOperationCrud::EditPost => {
371       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
372     }
373     UserOperationCrud::DeletePost => {
374       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
375     }
376     UserOperationCrud::RemovePost => {
377       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
378     }
379
380     // Comment ops
381     UserOperationCrud::CreateComment => {
382       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
383     }
384     UserOperationCrud::EditComment => {
385       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
386     }
387     UserOperationCrud::DeleteComment => {
388       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
389     }
390     UserOperationCrud::RemoveComment => {
391       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
392     }
393     UserOperationCrud::GetComment => {
394       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
395     }
396     // Emojis
397     UserOperationCrud::CreateCustomEmoji => {
398       do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
399     }
400     UserOperationCrud::EditCustomEmoji => {
401       do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
402     }
403     UserOperationCrud::DeleteCustomEmoji => {
404       do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
405     }
406   }
407 }
408
409 async fn do_websocket_operation_crud<'a, 'b, Data>(
410   context: ContextData<LemmyContext>,
411   id: ConnectionId,
412   op: UserOperationCrud,
413   data: Value,
414 ) -> result::Result<String, LemmyError>
415 where
416   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response> + Send,
417   for<'de> Data: Deserialize<'de>,
418 {
419   let parsed_data: Data = serde_json::from_value(data)?;
420   let res = parsed_data
421     .perform(&web::Data::new(context.deref().clone()), Some(id))
422     .await?;
423   SendActivity::send_activity(&parsed_data, &res, &context).await?;
424   serialize_websocket_message(&op, &res)
425 }
426
427 pub async fn match_websocket_operation_apub(
428   context: ContextData<LemmyContext>,
429   id: ConnectionId,
430   op: UserOperationApub,
431   data: Value,
432 ) -> result::Result<String, LemmyError> {
433   match op {
434     UserOperationApub::GetPersonDetails => {
435       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
436     }
437     UserOperationApub::GetCommunity => {
438       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
439     }
440     UserOperationApub::GetComments => {
441       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
442     }
443     UserOperationApub::GetPosts => {
444       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
445     }
446     UserOperationApub::ResolveObject => {
447       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
448     }
449     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
450   }
451 }
452
453 async fn do_websocket_operation_apub<'a, 'b, Data>(
454   context: ContextData<LemmyContext>,
455   id: ConnectionId,
456   op: UserOperationApub,
457   data: Value,
458 ) -> result::Result<String, LemmyError>
459 where
460   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response> + Send,
461   for<'de> Data: Deserialize<'de>,
462 {
463   let parsed_data: Data = serde_json::from_value(data)?;
464   let res = parsed_data.perform(&context, Some(id)).await?;
465   SendActivity::send_activity(&parsed_data, &res, &context).await?;
466   serialize_websocket_message(&op, &res)
467 }
468
469 pub async fn match_websocket_operation(
470   context: ContextData<LemmyContext>,
471   id: ConnectionId,
472   op: UserOperation,
473   data: Value,
474 ) -> result::Result<String, LemmyError> {
475   match op {
476     // User ops
477     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
478     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
479     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
480     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
481     UserOperation::GetUnreadRegistrationApplicationCount => {
482       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
483     }
484     UserOperation::ListRegistrationApplications => {
485       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
486     }
487     UserOperation::ApproveRegistrationApplication => {
488       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
489     }
490     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
491     UserOperation::GetBannedPersons => {
492       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
493     }
494     UserOperation::BlockPerson => {
495       do_websocket_operation::<BlockPerson>(context, id, op, data).await
496     }
497     UserOperation::GetPersonMentions => {
498       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
499     }
500     UserOperation::MarkPersonMentionAsRead => {
501       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
502     }
503     UserOperation::MarkCommentReplyAsRead => {
504       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
505     }
506     UserOperation::MarkAllAsRead => {
507       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
508     }
509     UserOperation::PasswordReset => {
510       do_websocket_operation::<PasswordReset>(context, id, op, data).await
511     }
512     UserOperation::PasswordChange => {
513       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
514     }
515     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
516     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
517     UserOperation::CommunityJoin => {
518       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
519     }
520     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
521     UserOperation::SaveUserSettings => {
522       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
523     }
524     UserOperation::ChangePassword => {
525       do_websocket_operation::<ChangePassword>(context, id, op, data).await
526     }
527     UserOperation::GetReportCount => {
528       do_websocket_operation::<GetReportCount>(context, id, op, data).await
529     }
530     UserOperation::GetUnreadCount => {
531       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
532     }
533     UserOperation::VerifyEmail => {
534       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
535     }
536
537     // Private Message ops
538     UserOperation::MarkPrivateMessageAsRead => {
539       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
540     }
541     UserOperation::CreatePrivateMessageReport => {
542       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
543     }
544     UserOperation::ResolvePrivateMessageReport => {
545       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
546     }
547     UserOperation::ListPrivateMessageReports => {
548       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
549     }
550
551     // Site ops
552     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
553     UserOperation::PurgePerson => {
554       do_websocket_operation::<PurgePerson>(context, id, op, data).await
555     }
556     UserOperation::PurgeCommunity => {
557       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
558     }
559     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
560     UserOperation::PurgeComment => {
561       do_websocket_operation::<PurgeComment>(context, id, op, data).await
562     }
563     UserOperation::TransferCommunity => {
564       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
565     }
566     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
567
568     // Community ops
569     UserOperation::FollowCommunity => {
570       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
571     }
572     UserOperation::BlockCommunity => {
573       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
574     }
575     UserOperation::BanFromCommunity => {
576       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
577     }
578     UserOperation::AddModToCommunity => {
579       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
580     }
581
582     // Post ops
583     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
584     UserOperation::FeaturePost => {
585       do_websocket_operation::<FeaturePost>(context, id, op, data).await
586     }
587     UserOperation::CreatePostLike => {
588       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
589     }
590     UserOperation::MarkPostAsRead => {
591       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
592     }
593     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
594     UserOperation::CreatePostReport => {
595       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
596     }
597     UserOperation::ListPostReports => {
598       do_websocket_operation::<ListPostReports>(context, id, op, data).await
599     }
600     UserOperation::ResolvePostReport => {
601       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
602     }
603     UserOperation::GetSiteMetadata => {
604       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
605     }
606
607     // Comment ops
608     UserOperation::SaveComment => {
609       do_websocket_operation::<SaveComment>(context, id, op, data).await
610     }
611     UserOperation::CreateCommentLike => {
612       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
613     }
614     UserOperation::DistinguishComment => {
615       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
616     }
617     UserOperation::CreateCommentReport => {
618       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
619     }
620     UserOperation::ListCommentReports => {
621       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
622     }
623     UserOperation::ResolveCommentReport => {
624       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
625     }
626   }
627 }
628
629 async fn do_websocket_operation<'a, 'b, Data>(
630   context: ContextData<LemmyContext>,
631   id: ConnectionId,
632   op: UserOperation,
633   data: Value,
634 ) -> result::Result<String, LemmyError>
635 where
636   Data: Perform + SendActivity<Response = <Data as Perform>::Response> + Send,
637   for<'de> Data: Deserialize<'de>,
638 {
639   let parsed_data: Data = serde_json::from_value(data)?;
640   let res = parsed_data
641     .perform(&web::Data::new(context.deref().clone()), Some(id))
642     .await?;
643   SendActivity::send_activity(&parsed_data, &res, &context).await?;
644   serialize_websocket_message(&op, &res)
645 }