]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Making the chat server an actor. (#2793)
[lemmy.git] / src / api_routes_websocket.rs
1 use activitypub_federation::config::Data as ContextData;
2 use actix::{
3   fut,
4   Actor,
5   ActorContext,
6   ActorFutureExt,
7   AsyncContext,
8   ContextFutureSpawner,
9   Handler,
10   Running,
11   StreamHandler,
12   WrapFuture,
13 };
14 use actix_web::{web, Error, HttpRequest, HttpResponse};
15 use actix_web_actors::ws;
16 use lemmy_api::Perform;
17 use lemmy_api_common::{
18   comment::{
19     CreateComment,
20     CreateCommentLike,
21     CreateCommentReport,
22     DeleteComment,
23     DistinguishComment,
24     EditComment,
25     GetComment,
26     GetComments,
27     ListCommentReports,
28     RemoveComment,
29     ResolveCommentReport,
30     SaveComment,
31   },
32   community::{
33     AddModToCommunity,
34     BanFromCommunity,
35     BlockCommunity,
36     CreateCommunity,
37     DeleteCommunity,
38     EditCommunity,
39     FollowCommunity,
40     GetCommunity,
41     ListCommunities,
42     RemoveCommunity,
43     TransferCommunity,
44   },
45   context::LemmyContext,
46   custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
47   person::{
48     AddAdmin,
49     BanPerson,
50     BlockPerson,
51     ChangePassword,
52     DeleteAccount,
53     GetBannedPersons,
54     GetCaptcha,
55     GetPersonDetails,
56     GetPersonMentions,
57     GetReplies,
58     GetReportCount,
59     GetUnreadCount,
60     Login,
61     MarkAllAsRead,
62     MarkCommentReplyAsRead,
63     MarkPersonMentionAsRead,
64     PasswordChangeAfterReset,
65     PasswordReset,
66     Register,
67     SaveUserSettings,
68     VerifyEmail,
69   },
70   post::{
71     CreatePost,
72     CreatePostLike,
73     CreatePostReport,
74     DeletePost,
75     EditPost,
76     FeaturePost,
77     GetPost,
78     GetPosts,
79     GetSiteMetadata,
80     ListPostReports,
81     LockPost,
82     MarkPostAsRead,
83     RemovePost,
84     ResolvePostReport,
85     SavePost,
86   },
87   private_message::{
88     CreatePrivateMessage,
89     CreatePrivateMessageReport,
90     DeletePrivateMessage,
91     EditPrivateMessage,
92     GetPrivateMessages,
93     ListPrivateMessageReports,
94     MarkPrivateMessageAsRead,
95     ResolvePrivateMessageReport,
96   },
97   site::{
98     ApproveRegistrationApplication,
99     CreateSite,
100     EditSite,
101     GetModlog,
102     GetSite,
103     GetUnreadRegistrationApplicationCount,
104     LeaveAdmin,
105     ListRegistrationApplications,
106     PurgeComment,
107     PurgeCommunity,
108     PurgePerson,
109     PurgePost,
110     ResolveObject,
111     Search,
112   },
113   websocket::{
114     handlers::{
115       connect::{Connect, Disconnect},
116       WsMessage,
117     },
118     serialize_websocket_message,
119     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
120     UserOperation,
121     UserOperationApub,
122     UserOperationCrud,
123   },
124 };
125 use lemmy_api_crud::PerformCrud;
126 use lemmy_apub::{api::PerformApub, SendActivity};
127 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
128 use serde::Deserialize;
129 use serde_json::Value;
130 use std::{
131   ops::Deref,
132   result,
133   str::FromStr,
134   time::{Duration, Instant},
135 };
136 use tracing::{debug, error};
137
138 /// How often heartbeat pings are sent
139 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
140
141 /// How long before lack of client response causes a timeout
142 const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
143
144 pub struct WsChatSession {
145   /// unique session id
146   pub id: ConnectionId,
147
148   pub ip: IpAddr,
149
150   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
151   /// otherwise we drop connection.
152   pub hb: Instant,
153
154   /// The context data
155   apub_data: ContextData<LemmyContext>,
156 }
157
158 pub async fn websocket(
159   req: HttpRequest,
160   body: web::Payload,
161   rate_limiter: web::Data<RateLimitCell>,
162   apub_data: ContextData<LemmyContext>,
163 ) -> Result<HttpResponse, Error> {
164   let client_ip = IpAddr(
165     req
166       .connection_info()
167       .realip_remote_addr()
168       .unwrap_or("blank_ip")
169       .to_string(),
170   );
171
172   let check = rate_limiter.message().check(client_ip.clone());
173   if !check {
174     debug!(
175       "Websocket join with IP: {} has been rate limited.",
176       &client_ip
177     );
178     return Ok(HttpResponse::TooManyRequests().finish());
179   }
180
181   ws::start(
182     WsChatSession {
183       id: 0,
184       ip: client_ip,
185       hb: Instant::now(),
186       apub_data,
187     },
188     &req,
189     body,
190   )
191 }
192
193 /// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL).
194 ///
195 /// also this method checks heartbeats from client
196 fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
197   ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
198     // check client heartbeats
199     if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
200       // heartbeat timed out
201
202       // notify chat server
203       act
204         .apub_data
205         .chat_server()
206         .do_send(Disconnect { id: act.id });
207
208       // stop actor
209       ctx.stop();
210
211       // don't try to send a ping
212       return;
213     }
214
215     ctx.ping(b"");
216   });
217 }
218
219 impl Actor for WsChatSession {
220   type Context = ws::WebsocketContext<Self>;
221
222   /// Method is called on actor start.
223   /// We register ws session with ChatServer
224   fn started(&mut self, ctx: &mut Self::Context) {
225     // we'll start heartbeat process on session start.
226     hb(ctx);
227
228     // register self in chat server. `AsyncContext::wait` register
229     // future within context, but context waits until this future resolves
230     // before processing any other events.
231     // HttpContext::state() is instance of WsChatSessionState, state is shared
232     // across all routes within application
233     let addr = ctx.address();
234     self
235       .apub_data
236       .chat_server()
237       .send(Connect {
238         addr: addr.recipient(),
239       })
240       .into_actor(self)
241       .then(|res, act, ctx| {
242         match res {
243           Ok(res) => act.id = res,
244           // something is wrong with chat server
245           _ => ctx.stop(),
246         }
247         fut::ready(())
248       })
249       .wait(ctx);
250   }
251   fn stopping(&mut self, _: &mut Self::Context) -> Running {
252     // notify chat server
253     self
254       .apub_data
255       .chat_server()
256       .do_send(Disconnect { id: self.id });
257     Running::Stop
258   }
259 }
260
261 /// Handle messages from chat server, we simply send it to peer websocket
262 impl Handler<WsMessage> for WsChatSession {
263   type Result = ();
264
265   fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
266     ctx.text(msg.0);
267   }
268 }
269
270 /// WebSocket message handler
271 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
272   fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
273     let msg = match msg {
274       Err(_) => {
275         ctx.stop();
276         return;
277       }
278       Ok(msg) => msg,
279     };
280
281     match msg {
282       ws::Message::Ping(msg) => {
283         self.hb = Instant::now();
284         ctx.pong(&msg);
285       }
286       ws::Message::Pong(_) => {
287         self.hb = Instant::now();
288       }
289       ws::Message::Text(text) => {
290         let ip_clone = self.ip.clone();
291         let id_clone = self.id.to_owned();
292         let context_clone = self.apub_data.reset_request_count();
293
294         let fut = Box::pin(async move {
295           let msg = text.trim().to_string();
296           parse_json_message(msg, ip_clone, id_clone, context_clone).await
297         });
298         fut
299           .into_actor(self)
300           .then(|res, _, ctx| {
301             match res {
302               Ok(res) => ctx.text(res),
303               Err(e) => error!("{}", &e),
304             }
305             actix::fut::ready(())
306           })
307           .spawn(ctx);
308       }
309       ws::Message::Binary(_) => println!("Unexpected binary"),
310       ws::Message::Close(reason) => {
311         ctx.close(reason);
312         ctx.stop();
313       }
314       ws::Message::Continuation(_) => {
315         ctx.stop();
316       }
317       ws::Message::Nop => (),
318     }
319   }
320 }
321
322 /// Entry point for our websocket route
323 async fn parse_json_message(
324   msg: String,
325   ip: IpAddr,
326   connection_id: ConnectionId,
327   context: ContextData<LemmyContext>,
328 ) -> Result<String, LemmyError> {
329   let rate_limiter = context.settings_updated_channel();
330   let json: Value = serde_json::from_str(&msg)?;
331   let data = json
332     .get("data")
333     .cloned()
334     .ok_or_else(|| LemmyError::from_message("missing data"))?;
335
336   let missing_op_err = || LemmyError::from_message("missing op");
337
338   let op = json
339     .get("op")
340     .ok_or_else(missing_op_err)?
341     .as_str()
342     .ok_or_else(missing_op_err)?;
343
344   // check if api call passes the rate limit, and generate future for later execution
345   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
346     let passed = match user_operation_crud {
347       UserOperationCrud::Register => rate_limiter.register().check(ip),
348       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
349       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
350       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
351       _ => rate_limiter.message().check(ip),
352     };
353     check_rate_limit_passed(passed)?;
354     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
355   } else if let Ok(user_operation) = UserOperation::from_str(op) {
356     let passed = match user_operation {
357       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
358       _ => rate_limiter.message().check(ip),
359     };
360     check_rate_limit_passed(passed)?;
361     match_websocket_operation(context, connection_id, user_operation, data).await
362   } else {
363     let user_operation = UserOperationApub::from_str(op)?;
364     let passed = match user_operation {
365       UserOperationApub::Search => rate_limiter.search().check(ip),
366       _ => rate_limiter.message().check(ip),
367     };
368     check_rate_limit_passed(passed)?;
369     match_websocket_operation_apub(context, connection_id, user_operation, data).await
370   }
371 }
372
373 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
374   if passed {
375     Ok(())
376   } else {
377     // if rate limit was hit, respond with message
378     Err(LemmyError::from_message("rate_limit_error"))
379   }
380 }
381
382 pub async fn match_websocket_operation_crud(
383   context: ContextData<LemmyContext>,
384   id: ConnectionId,
385   op: UserOperationCrud,
386   data: Value,
387 ) -> result::Result<String, LemmyError> {
388   match op {
389     // User ops
390     UserOperationCrud::Register => {
391       do_websocket_operation_crud::<Register>(context, id, op, data).await
392     }
393     UserOperationCrud::DeleteAccount => {
394       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
395     }
396
397     // Private Message ops
398     UserOperationCrud::CreatePrivateMessage => {
399       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
400     }
401     UserOperationCrud::EditPrivateMessage => {
402       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
403     }
404     UserOperationCrud::DeletePrivateMessage => {
405       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
406     }
407     UserOperationCrud::GetPrivateMessages => {
408       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
409     }
410
411     // Site ops
412     UserOperationCrud::CreateSite => {
413       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
414     }
415     UserOperationCrud::EditSite => {
416       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
417     }
418     UserOperationCrud::GetSite => {
419       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
420     }
421
422     // Community ops
423     UserOperationCrud::ListCommunities => {
424       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
425     }
426     UserOperationCrud::CreateCommunity => {
427       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
428     }
429     UserOperationCrud::EditCommunity => {
430       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
431     }
432     UserOperationCrud::DeleteCommunity => {
433       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
434     }
435     UserOperationCrud::RemoveCommunity => {
436       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
437     }
438
439     // Post ops
440     UserOperationCrud::CreatePost => {
441       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
442     }
443     UserOperationCrud::GetPost => {
444       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
445     }
446     UserOperationCrud::EditPost => {
447       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
448     }
449     UserOperationCrud::DeletePost => {
450       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
451     }
452     UserOperationCrud::RemovePost => {
453       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
454     }
455
456     // Comment ops
457     UserOperationCrud::CreateComment => {
458       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
459     }
460     UserOperationCrud::EditComment => {
461       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
462     }
463     UserOperationCrud::DeleteComment => {
464       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
465     }
466     UserOperationCrud::RemoveComment => {
467       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
468     }
469     UserOperationCrud::GetComment => {
470       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
471     }
472     // Emojis
473     UserOperationCrud::CreateCustomEmoji => {
474       do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
475     }
476     UserOperationCrud::EditCustomEmoji => {
477       do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
478     }
479     UserOperationCrud::DeleteCustomEmoji => {
480       do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
481     }
482   }
483 }
484
485 async fn do_websocket_operation_crud<'a, 'b, Data>(
486   context: ContextData<LemmyContext>,
487   id: ConnectionId,
488   op: UserOperationCrud,
489   data: Value,
490 ) -> result::Result<String, LemmyError>
491 where
492   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response> + Send,
493   for<'de> Data: Deserialize<'de>,
494 {
495   let parsed_data: Data = serde_json::from_value(data)?;
496   let res = parsed_data
497     .perform(&web::Data::new(context.deref().clone()), Some(id))
498     .await?;
499   SendActivity::send_activity(&parsed_data, &res, &context).await?;
500   serialize_websocket_message(&op, &res)
501 }
502
503 pub async fn match_websocket_operation_apub(
504   context: ContextData<LemmyContext>,
505   id: ConnectionId,
506   op: UserOperationApub,
507   data: Value,
508 ) -> result::Result<String, LemmyError> {
509   match op {
510     UserOperationApub::GetPersonDetails => {
511       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
512     }
513     UserOperationApub::GetCommunity => {
514       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
515     }
516     UserOperationApub::GetComments => {
517       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
518     }
519     UserOperationApub::GetPosts => {
520       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
521     }
522     UserOperationApub::ResolveObject => {
523       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
524     }
525     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
526   }
527 }
528
529 async fn do_websocket_operation_apub<'a, 'b, Data>(
530   context: ContextData<LemmyContext>,
531   id: ConnectionId,
532   op: UserOperationApub,
533   data: Value,
534 ) -> result::Result<String, LemmyError>
535 where
536   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response> + Send,
537   for<'de> Data: Deserialize<'de>,
538 {
539   let parsed_data: Data = serde_json::from_value(data)?;
540   let res = parsed_data.perform(&context, Some(id)).await?;
541   SendActivity::send_activity(&parsed_data, &res, &context).await?;
542   serialize_websocket_message(&op, &res)
543 }
544
545 pub async fn match_websocket_operation(
546   context: ContextData<LemmyContext>,
547   id: ConnectionId,
548   op: UserOperation,
549   data: Value,
550 ) -> result::Result<String, LemmyError> {
551   match op {
552     // User ops
553     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
554     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
555     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
556     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
557     UserOperation::GetUnreadRegistrationApplicationCount => {
558       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
559     }
560     UserOperation::ListRegistrationApplications => {
561       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
562     }
563     UserOperation::ApproveRegistrationApplication => {
564       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
565     }
566     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
567     UserOperation::GetBannedPersons => {
568       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
569     }
570     UserOperation::BlockPerson => {
571       do_websocket_operation::<BlockPerson>(context, id, op, data).await
572     }
573     UserOperation::GetPersonMentions => {
574       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
575     }
576     UserOperation::MarkPersonMentionAsRead => {
577       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
578     }
579     UserOperation::MarkCommentReplyAsRead => {
580       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
581     }
582     UserOperation::MarkAllAsRead => {
583       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
584     }
585     UserOperation::PasswordReset => {
586       do_websocket_operation::<PasswordReset>(context, id, op, data).await
587     }
588     UserOperation::PasswordChange => {
589       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
590     }
591     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
592     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
593     UserOperation::CommunityJoin => {
594       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
595     }
596     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
597     UserOperation::SaveUserSettings => {
598       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
599     }
600     UserOperation::ChangePassword => {
601       do_websocket_operation::<ChangePassword>(context, id, op, data).await
602     }
603     UserOperation::GetReportCount => {
604       do_websocket_operation::<GetReportCount>(context, id, op, data).await
605     }
606     UserOperation::GetUnreadCount => {
607       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
608     }
609     UserOperation::VerifyEmail => {
610       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
611     }
612
613     // Private Message ops
614     UserOperation::MarkPrivateMessageAsRead => {
615       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
616     }
617     UserOperation::CreatePrivateMessageReport => {
618       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
619     }
620     UserOperation::ResolvePrivateMessageReport => {
621       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
622     }
623     UserOperation::ListPrivateMessageReports => {
624       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
625     }
626
627     // Site ops
628     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
629     UserOperation::PurgePerson => {
630       do_websocket_operation::<PurgePerson>(context, id, op, data).await
631     }
632     UserOperation::PurgeCommunity => {
633       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
634     }
635     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
636     UserOperation::PurgeComment => {
637       do_websocket_operation::<PurgeComment>(context, id, op, data).await
638     }
639     UserOperation::TransferCommunity => {
640       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
641     }
642     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
643
644     // Community ops
645     UserOperation::FollowCommunity => {
646       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
647     }
648     UserOperation::BlockCommunity => {
649       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
650     }
651     UserOperation::BanFromCommunity => {
652       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
653     }
654     UserOperation::AddModToCommunity => {
655       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
656     }
657
658     // Post ops
659     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
660     UserOperation::FeaturePost => {
661       do_websocket_operation::<FeaturePost>(context, id, op, data).await
662     }
663     UserOperation::CreatePostLike => {
664       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
665     }
666     UserOperation::MarkPostAsRead => {
667       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
668     }
669     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
670     UserOperation::CreatePostReport => {
671       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
672     }
673     UserOperation::ListPostReports => {
674       do_websocket_operation::<ListPostReports>(context, id, op, data).await
675     }
676     UserOperation::ResolvePostReport => {
677       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
678     }
679     UserOperation::GetSiteMetadata => {
680       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
681     }
682
683     // Comment ops
684     UserOperation::SaveComment => {
685       do_websocket_operation::<SaveComment>(context, id, op, data).await
686     }
687     UserOperation::CreateCommentLike => {
688       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
689     }
690     UserOperation::DistinguishComment => {
691       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
692     }
693     UserOperation::CreateCommentReport => {
694       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
695     }
696     UserOperation::ListCommentReports => {
697       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
698     }
699     UserOperation::ResolveCommentReport => {
700       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
701     }
702   }
703 }
704
705 async fn do_websocket_operation<'a, 'b, Data>(
706   context: ContextData<LemmyContext>,
707   id: ConnectionId,
708   op: UserOperation,
709   data: Value,
710 ) -> result::Result<String, LemmyError>
711 where
712   Data: Perform + SendActivity<Response = <Data as Perform>::Response> + Send,
713   for<'de> Data: Deserialize<'de>,
714 {
715   let parsed_data: Data = serde_json::from_value(data)?;
716   let res = parsed_data
717     .perform(&web::Data::new(context.deref().clone()), Some(id))
718     .await?;
719   SendActivity::send_activity(&parsed_data, &res, &context).await?;
720   serialize_websocket_message(&op, &res)
721 }