]> Untitled Git - lemmy.git/blob - src/api_routes_websocket.rs
Dont return error in case optional auth is invalid (#2879)
[lemmy.git] / src / api_routes_websocket.rs
1 use activitypub_federation::config::Data as ContextData;
2 use actix::{
3   fut,
4   Actor,
5   ActorContext,
6   ActorFutureExt,
7   AsyncContext,
8   ContextFutureSpawner,
9   Handler,
10   Running,
11   StreamHandler,
12   WrapFuture,
13 };
14 use actix_web::{web, Error, HttpRequest, HttpResponse};
15 use actix_web_actors::ws;
16 use lemmy_api::Perform;
17 use lemmy_api_common::{
18   comment::{
19     CreateComment,
20     CreateCommentLike,
21     CreateCommentReport,
22     DeleteComment,
23     DistinguishComment,
24     EditComment,
25     GetComment,
26     GetComments,
27     ListCommentReports,
28     RemoveComment,
29     ResolveCommentReport,
30     SaveComment,
31   },
32   community::{
33     AddModToCommunity,
34     BanFromCommunity,
35     BlockCommunity,
36     CreateCommunity,
37     DeleteCommunity,
38     EditCommunity,
39     FollowCommunity,
40     GetCommunity,
41     ListCommunities,
42     RemoveCommunity,
43     TransferCommunity,
44   },
45   context::LemmyContext,
46   custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
47   person::{
48     AddAdmin,
49     BanPerson,
50     BlockPerson,
51     ChangePassword,
52     DeleteAccount,
53     GetBannedPersons,
54     GetCaptcha,
55     GetPersonDetails,
56     GetPersonMentions,
57     GetReplies,
58     GetReportCount,
59     GetUnreadCount,
60     Login,
61     MarkAllAsRead,
62     MarkCommentReplyAsRead,
63     MarkPersonMentionAsRead,
64     PasswordChangeAfterReset,
65     PasswordReset,
66     Register,
67     SaveUserSettings,
68     VerifyEmail,
69   },
70   post::{
71     CreatePost,
72     CreatePostLike,
73     CreatePostReport,
74     DeletePost,
75     EditPost,
76     FeaturePost,
77     GetPost,
78     GetPosts,
79     GetSiteMetadata,
80     ListPostReports,
81     LockPost,
82     MarkPostAsRead,
83     RemovePost,
84     ResolvePostReport,
85     SavePost,
86   },
87   private_message::{
88     CreatePrivateMessage,
89     CreatePrivateMessageReport,
90     DeletePrivateMessage,
91     EditPrivateMessage,
92     GetPrivateMessages,
93     ListPrivateMessageReports,
94     MarkPrivateMessageAsRead,
95     ResolvePrivateMessageReport,
96   },
97   site::{
98     ApproveRegistrationApplication,
99     CreateSite,
100     EditSite,
101     GetFederatedInstances,
102     GetModlog,
103     GetSite,
104     GetUnreadRegistrationApplicationCount,
105     LeaveAdmin,
106     ListRegistrationApplications,
107     PurgeComment,
108     PurgeCommunity,
109     PurgePerson,
110     PurgePost,
111     ResolveObject,
112     Search,
113   },
114   websocket::{
115     handlers::{
116       connect::{Connect, Disconnect},
117       WsMessage,
118     },
119     serialize_websocket_message,
120     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
121     UserOperation,
122     UserOperationApub,
123     UserOperationCrud,
124   },
125 };
126 use lemmy_api_crud::PerformCrud;
127 use lemmy_apub::{api::PerformApub, SendActivity};
128 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
129 use serde::Deserialize;
130 use serde_json::Value;
131 use std::{
132   ops::Deref,
133   result,
134   str::FromStr,
135   time::{Duration, Instant},
136 };
137 use tracing::{debug, error};
138
139 /// How often heartbeat pings are sent
140 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
141
142 /// How long before lack of client response causes a timeout
143 const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
144
145 pub struct WsChatSession {
146   /// unique session id
147   pub id: ConnectionId,
148
149   pub ip: IpAddr,
150
151   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
152   /// otherwise we drop connection.
153   pub hb: Instant,
154
155   /// The context data
156   apub_data: ContextData<LemmyContext>,
157 }
158
159 pub async fn websocket(
160   req: HttpRequest,
161   body: web::Payload,
162   rate_limiter: web::Data<RateLimitCell>,
163   apub_data: ContextData<LemmyContext>,
164 ) -> Result<HttpResponse, Error> {
165   let client_ip = IpAddr(
166     req
167       .connection_info()
168       .realip_remote_addr()
169       .unwrap_or("blank_ip")
170       .to_string(),
171   );
172
173   let check = rate_limiter.message().check(client_ip.clone());
174   if !check {
175     debug!(
176       "Websocket join with IP: {} has been rate limited.",
177       &client_ip
178     );
179     return Ok(HttpResponse::TooManyRequests().finish());
180   }
181
182   ws::start(
183     WsChatSession {
184       id: 0,
185       ip: client_ip,
186       hb: Instant::now(),
187       apub_data,
188     },
189     &req,
190     body,
191   )
192 }
193
194 /// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL).
195 ///
196 /// also this method checks heartbeats from client
197 fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
198   ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
199     // check client heartbeats
200     if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
201       // heartbeat timed out
202
203       // notify chat server
204       act
205         .apub_data
206         .chat_server()
207         .do_send(Disconnect { id: act.id });
208
209       // stop actor
210       ctx.stop();
211
212       // don't try to send a ping
213       return;
214     }
215
216     ctx.ping(b"");
217   });
218 }
219
220 impl Actor for WsChatSession {
221   type Context = ws::WebsocketContext<Self>;
222
223   /// Method is called on actor start.
224   /// We register ws session with ChatServer
225   fn started(&mut self, ctx: &mut Self::Context) {
226     // we'll start heartbeat process on session start.
227     hb(ctx);
228
229     // register self in chat server. `AsyncContext::wait` register
230     // future within context, but context waits until this future resolves
231     // before processing any other events.
232     // HttpContext::state() is instance of WsChatSessionState, state is shared
233     // across all routes within application
234     let addr = ctx.address();
235     self
236       .apub_data
237       .chat_server()
238       .send(Connect {
239         addr: addr.recipient(),
240       })
241       .into_actor(self)
242       .then(|res, act, ctx| {
243         match res {
244           Ok(res) => act.id = res,
245           // something is wrong with chat server
246           _ => ctx.stop(),
247         }
248         fut::ready(())
249       })
250       .wait(ctx);
251   }
252   fn stopping(&mut self, _: &mut Self::Context) -> Running {
253     // notify chat server
254     self
255       .apub_data
256       .chat_server()
257       .do_send(Disconnect { id: self.id });
258     Running::Stop
259   }
260 }
261
262 /// Handle messages from chat server, we simply send it to peer websocket
263 impl Handler<WsMessage> for WsChatSession {
264   type Result = ();
265
266   fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
267     ctx.text(msg.0);
268   }
269 }
270
271 /// WebSocket message handler
272 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
273   fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
274     let msg = match msg {
275       Err(_) => {
276         ctx.stop();
277         return;
278       }
279       Ok(msg) => msg,
280     };
281
282     match msg {
283       ws::Message::Ping(msg) => {
284         self.hb = Instant::now();
285         ctx.pong(&msg);
286       }
287       ws::Message::Pong(_) => {
288         self.hb = Instant::now();
289       }
290       ws::Message::Text(text) => {
291         let ip_clone = self.ip.clone();
292         let id_clone = self.id.to_owned();
293         let context_clone = self.apub_data.reset_request_count();
294
295         let fut = Box::pin(async move {
296           let msg = text.trim().to_string();
297           parse_json_message(msg, ip_clone, id_clone, context_clone).await
298         });
299         fut
300           .into_actor(self)
301           .then(|res, _, ctx| {
302             match res {
303               Ok(res) => ctx.text(res),
304               Err(e) => error!("{}", &e),
305             }
306             actix::fut::ready(())
307           })
308           .spawn(ctx);
309       }
310       ws::Message::Binary(_) => println!("Unexpected binary"),
311       ws::Message::Close(reason) => {
312         ctx.close(reason);
313         ctx.stop();
314       }
315       ws::Message::Continuation(_) => {
316         ctx.stop();
317       }
318       ws::Message::Nop => (),
319     }
320   }
321 }
322
323 /// Entry point for our websocket route
324 async fn parse_json_message(
325   msg: String,
326   ip: IpAddr,
327   connection_id: ConnectionId,
328   context: ContextData<LemmyContext>,
329 ) -> Result<String, LemmyError> {
330   let rate_limiter = context.settings_updated_channel();
331   let json: Value = serde_json::from_str(&msg)?;
332   let data = json
333     .get("data")
334     .cloned()
335     .ok_or_else(|| LemmyError::from_message("missing data"))?;
336
337   let missing_op_err = || LemmyError::from_message("missing op");
338
339   let op = json
340     .get("op")
341     .ok_or_else(missing_op_err)?
342     .as_str()
343     .ok_or_else(missing_op_err)?;
344
345   // check if api call passes the rate limit, and generate future for later execution
346   if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
347     let passed = match user_operation_crud {
348       UserOperationCrud::Register => rate_limiter.register().check(ip),
349       UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
350       UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
351       UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
352       _ => rate_limiter.message().check(ip),
353     };
354     check_rate_limit_passed(passed)?;
355     match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
356   } else if let Ok(user_operation) = UserOperation::from_str(op) {
357     let passed = match user_operation {
358       UserOperation::GetCaptcha => rate_limiter.post().check(ip),
359       _ => rate_limiter.message().check(ip),
360     };
361     check_rate_limit_passed(passed)?;
362     match_websocket_operation(context, connection_id, user_operation, data).await
363   } else {
364     let user_operation = UserOperationApub::from_str(op)?;
365     let passed = match user_operation {
366       UserOperationApub::Search => rate_limiter.search().check(ip),
367       _ => rate_limiter.message().check(ip),
368     };
369     check_rate_limit_passed(passed)?;
370     match_websocket_operation_apub(context, connection_id, user_operation, data).await
371   }
372 }
373
374 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
375   if passed {
376     Ok(())
377   } else {
378     // if rate limit was hit, respond with message
379     Err(LemmyError::from_message("rate_limit_error"))
380   }
381 }
382
383 pub async fn match_websocket_operation_crud(
384   context: ContextData<LemmyContext>,
385   id: ConnectionId,
386   op: UserOperationCrud,
387   data: Value,
388 ) -> result::Result<String, LemmyError> {
389   match op {
390     // User ops
391     UserOperationCrud::Register => {
392       do_websocket_operation_crud::<Register>(context, id, op, data).await
393     }
394     UserOperationCrud::DeleteAccount => {
395       do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
396     }
397
398     // Private Message ops
399     UserOperationCrud::CreatePrivateMessage => {
400       do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
401     }
402     UserOperationCrud::EditPrivateMessage => {
403       do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
404     }
405     UserOperationCrud::DeletePrivateMessage => {
406       do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
407     }
408     UserOperationCrud::GetPrivateMessages => {
409       do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
410     }
411
412     // Site ops
413     UserOperationCrud::CreateSite => {
414       do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
415     }
416     UserOperationCrud::EditSite => {
417       do_websocket_operation_crud::<EditSite>(context, id, op, data).await
418     }
419     UserOperationCrud::GetSite => {
420       do_websocket_operation_crud::<GetSite>(context, id, op, data).await
421     }
422
423     // Community ops
424     UserOperationCrud::ListCommunities => {
425       do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
426     }
427     UserOperationCrud::CreateCommunity => {
428       do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
429     }
430     UserOperationCrud::EditCommunity => {
431       do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
432     }
433     UserOperationCrud::DeleteCommunity => {
434       do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
435     }
436     UserOperationCrud::RemoveCommunity => {
437       do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
438     }
439
440     // Post ops
441     UserOperationCrud::CreatePost => {
442       do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
443     }
444     UserOperationCrud::GetPost => {
445       do_websocket_operation_crud::<GetPost>(context, id, op, data).await
446     }
447     UserOperationCrud::EditPost => {
448       do_websocket_operation_crud::<EditPost>(context, id, op, data).await
449     }
450     UserOperationCrud::DeletePost => {
451       do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
452     }
453     UserOperationCrud::RemovePost => {
454       do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
455     }
456
457     // Comment ops
458     UserOperationCrud::CreateComment => {
459       do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
460     }
461     UserOperationCrud::EditComment => {
462       do_websocket_operation_crud::<EditComment>(context, id, op, data).await
463     }
464     UserOperationCrud::DeleteComment => {
465       do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
466     }
467     UserOperationCrud::RemoveComment => {
468       do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
469     }
470     UserOperationCrud::GetComment => {
471       do_websocket_operation_crud::<GetComment>(context, id, op, data).await
472     }
473     // Emojis
474     UserOperationCrud::CreateCustomEmoji => {
475       do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
476     }
477     UserOperationCrud::EditCustomEmoji => {
478       do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
479     }
480     UserOperationCrud::DeleteCustomEmoji => {
481       do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
482     }
483   }
484 }
485
486 async fn do_websocket_operation_crud<'a, 'b, Data>(
487   context: ContextData<LemmyContext>,
488   id: ConnectionId,
489   op: UserOperationCrud,
490   data: Value,
491 ) -> result::Result<String, LemmyError>
492 where
493   Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response> + Send,
494   for<'de> Data: Deserialize<'de>,
495 {
496   let parsed_data: Data = serde_json::from_value(data)?;
497   let res = parsed_data
498     .perform(&web::Data::new(context.deref().clone()), Some(id))
499     .await?;
500   SendActivity::send_activity(&parsed_data, &res, &context).await?;
501   serialize_websocket_message(&op, &res)
502 }
503
504 pub async fn match_websocket_operation_apub(
505   context: ContextData<LemmyContext>,
506   id: ConnectionId,
507   op: UserOperationApub,
508   data: Value,
509 ) -> result::Result<String, LemmyError> {
510   match op {
511     UserOperationApub::GetPersonDetails => {
512       do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
513     }
514     UserOperationApub::GetCommunity => {
515       do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
516     }
517     UserOperationApub::GetComments => {
518       do_websocket_operation_apub::<GetComments>(context, id, op, data).await
519     }
520     UserOperationApub::GetPosts => {
521       do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
522     }
523     UserOperationApub::ResolveObject => {
524       do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
525     }
526     UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
527   }
528 }
529
530 async fn do_websocket_operation_apub<'a, 'b, Data>(
531   context: ContextData<LemmyContext>,
532   id: ConnectionId,
533   op: UserOperationApub,
534   data: Value,
535 ) -> result::Result<String, LemmyError>
536 where
537   Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response> + Send,
538   for<'de> Data: Deserialize<'de>,
539 {
540   let parsed_data: Data = serde_json::from_value(data)?;
541   let res = parsed_data.perform(&context, Some(id)).await?;
542   SendActivity::send_activity(&parsed_data, &res, &context).await?;
543   serialize_websocket_message(&op, &res)
544 }
545
546 pub async fn match_websocket_operation(
547   context: ContextData<LemmyContext>,
548   id: ConnectionId,
549   op: UserOperation,
550   data: Value,
551 ) -> result::Result<String, LemmyError> {
552   match op {
553     // User ops
554     UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
555     UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
556     UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
557     UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
558     UserOperation::GetUnreadRegistrationApplicationCount => {
559       do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
560     }
561     UserOperation::ListRegistrationApplications => {
562       do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
563     }
564     UserOperation::ApproveRegistrationApplication => {
565       do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
566     }
567     UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
568     UserOperation::GetBannedPersons => {
569       do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
570     }
571     UserOperation::BlockPerson => {
572       do_websocket_operation::<BlockPerson>(context, id, op, data).await
573     }
574     UserOperation::GetPersonMentions => {
575       do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
576     }
577     UserOperation::MarkPersonMentionAsRead => {
578       do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
579     }
580     UserOperation::MarkCommentReplyAsRead => {
581       do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
582     }
583     UserOperation::MarkAllAsRead => {
584       do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
585     }
586     UserOperation::PasswordReset => {
587       do_websocket_operation::<PasswordReset>(context, id, op, data).await
588     }
589     UserOperation::PasswordChange => {
590       do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
591     }
592     UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
593     UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
594     UserOperation::CommunityJoin => {
595       do_websocket_operation::<CommunityJoin>(context, id, op, data).await
596     }
597     UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
598     UserOperation::SaveUserSettings => {
599       do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
600     }
601     UserOperation::ChangePassword => {
602       do_websocket_operation::<ChangePassword>(context, id, op, data).await
603     }
604     UserOperation::GetReportCount => {
605       do_websocket_operation::<GetReportCount>(context, id, op, data).await
606     }
607     UserOperation::GetUnreadCount => {
608       do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
609     }
610     UserOperation::VerifyEmail => {
611       do_websocket_operation::<VerifyEmail>(context, id, op, data).await
612     }
613
614     // Private Message ops
615     UserOperation::MarkPrivateMessageAsRead => {
616       do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
617     }
618     UserOperation::CreatePrivateMessageReport => {
619       do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
620     }
621     UserOperation::ResolvePrivateMessageReport => {
622       do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
623     }
624     UserOperation::ListPrivateMessageReports => {
625       do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
626     }
627
628     // Site ops
629     UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
630     UserOperation::PurgePerson => {
631       do_websocket_operation::<PurgePerson>(context, id, op, data).await
632     }
633     UserOperation::PurgeCommunity => {
634       do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
635     }
636     UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
637     UserOperation::PurgeComment => {
638       do_websocket_operation::<PurgeComment>(context, id, op, data).await
639     }
640     UserOperation::TransferCommunity => {
641       do_websocket_operation::<TransferCommunity>(context, id, op, data).await
642     }
643     UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
644     UserOperation::GetFederatedInstances => {
645       do_websocket_operation::<GetFederatedInstances>(context, id, op, data).await
646     }
647
648     // Community ops
649     UserOperation::FollowCommunity => {
650       do_websocket_operation::<FollowCommunity>(context, id, op, data).await
651     }
652     UserOperation::BlockCommunity => {
653       do_websocket_operation::<BlockCommunity>(context, id, op, data).await
654     }
655     UserOperation::BanFromCommunity => {
656       do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
657     }
658     UserOperation::AddModToCommunity => {
659       do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
660     }
661
662     // Post ops
663     UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
664     UserOperation::FeaturePost => {
665       do_websocket_operation::<FeaturePost>(context, id, op, data).await
666     }
667     UserOperation::CreatePostLike => {
668       do_websocket_operation::<CreatePostLike>(context, id, op, data).await
669     }
670     UserOperation::MarkPostAsRead => {
671       do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
672     }
673     UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
674     UserOperation::CreatePostReport => {
675       do_websocket_operation::<CreatePostReport>(context, id, op, data).await
676     }
677     UserOperation::ListPostReports => {
678       do_websocket_operation::<ListPostReports>(context, id, op, data).await
679     }
680     UserOperation::ResolvePostReport => {
681       do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
682     }
683     UserOperation::GetSiteMetadata => {
684       do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
685     }
686
687     // Comment ops
688     UserOperation::SaveComment => {
689       do_websocket_operation::<SaveComment>(context, id, op, data).await
690     }
691     UserOperation::CreateCommentLike => {
692       do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
693     }
694     UserOperation::DistinguishComment => {
695       do_websocket_operation::<DistinguishComment>(context, id, op, data).await
696     }
697     UserOperation::CreateCommentReport => {
698       do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
699     }
700     UserOperation::ListCommentReports => {
701       do_websocket_operation::<ListCommentReports>(context, id, op, data).await
702     }
703     UserOperation::ResolveCommentReport => {
704       do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
705     }
706   }
707 }
708
709 async fn do_websocket_operation<'a, 'b, Data>(
710   context: ContextData<LemmyContext>,
711   id: ConnectionId,
712   op: UserOperation,
713   data: Value,
714 ) -> result::Result<String, LemmyError>
715 where
716   Data: Perform + SendActivity<Response = <Data as Perform>::Response> + Send,
717   for<'de> Data: Deserialize<'de>,
718 {
719   let parsed_data: Data = serde_json::from_value(data)?;
720   let res = parsed_data
721     .perform(&web::Data::new(context.deref().clone()), Some(id))
722     .await?;
723   SendActivity::send_activity(&parsed_data, &res, &context).await?;
724   serialize_websocket_message(&op, &res)
725 }