1 use activitypub_federation::config::Data as ContextData;
14 use actix_web::{web, Error, HttpRequest, HttpResponse};
15 use actix_web_actors::ws;
16 use lemmy_api::Perform;
17 use lemmy_api_common::{
45 context::LemmyContext,
46 custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
62 MarkCommentReplyAsRead,
63 MarkPersonMentionAsRead,
64 PasswordChangeAfterReset,
89 CreatePrivateMessageReport,
93 ListPrivateMessageReports,
94 MarkPrivateMessageAsRead,
95 ResolvePrivateMessageReport,
98 ApproveRegistrationApplication,
101 GetFederatedInstances,
104 GetUnreadRegistrationApplicationCount,
106 ListRegistrationApplications,
116 connect::{Connect, Disconnect},
119 serialize_websocket_message,
120 structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
126 use lemmy_api_crud::PerformCrud;
127 use lemmy_apub::{api::PerformApub, SendActivity};
128 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
129 use serde::Deserialize;
130 use serde_json::Value;
135 time::{Duration, Instant},
137 use tracing::{debug, error};
139 /// How often heartbeat pings are sent (every 25 seconds); used as the `run_interval` period in `hb`.
140 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
142 /// How long the client may stay silent before the heartbeat check treats it as timed out (60 seconds).
143 const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// State of a single websocket session; it is run as an actix actor
/// (see `impl Actor for WsChatSession`).
145 pub struct WsChatSession {
146 /// Unique session id; it is included in the `Disconnect` message sent to the chat server.
147 pub id: ConnectionId,
151 /// Client must send ping at least once per 60 seconds (CLIENT_TIMEOUT),
152 /// otherwise we drop connection.
/// Federation request data; `reset_request_count()` is called on it for every
/// incoming text message.
156 apub_data: ContextData<LemmyContext>,
/// Handles a websocket join request.
///
/// The client IP is taken from `realip_remote_addr()`, falling back to the
/// placeholder "blank_ip", and is checked against the `message` rate limit.
/// A join that is rate limited is answered with `429 Too Many Requests`.
159 pub async fn websocket(
162 rate_limiter: web::Data<RateLimitCell>,
163 apub_data: ContextData<LemmyContext>,
164 ) -> Result<HttpResponse, Error> {
165 let client_ip = IpAddr(
168 .realip_remote_addr()
// NOTE(review): every client without a resolvable address ends up with the same
// "blank_ip" value and so presumably shares one rate limit entry; confirm.
169 .unwrap_or("blank_ip")
// the IP is cloned because `client_ip` is still needed after the check (e.g. for the log line)
173 let check = rate_limiter.message().check(client_ip.clone());
176 "Websocket join with IP: {} has been rate limited.",
// rate limited joins are rejected before any websocket session is set up
179 return Ok(HttpResponse::TooManyRequests().finish());
194 /// Helper that sends a ping to the client every `HEARTBEAT_INTERVAL`.
196 /// On every tick it also checks how long ago the client was last heard from.
197 fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
198 ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
199 // check client heartbeats: `act.hb` is the instant of the last ping/pong from the client
200 if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
201 // heartbeat timed out
203 // notify chat server that this session id is gone
207 .do_send(Disconnect { id: act.id });
212 // don't try to send a ping
/// Actor lifecycle hooks for a websocket session.
220 impl Actor for WsChatSession {
221 type Context = ws::WebsocketContext<Self>;
223 /// Method is called on actor start.
224 /// We register ws session with ChatServer
225 fn started(&mut self, ctx: &mut Self::Context) {
226 // we'll start heartbeat process on session start.
229 // register self in chat server. `AsyncContext::wait` register
230 // future within context, but context waits until this future resolves
231 // before processing any other events.
232 // HttpContext::state() is instance of WsChatSessionState, state is shared
233 // across all routes within application
// NOTE(review): `HttpContext::state()` and `WsChatSessionState` are not referenced by any
// visible code in this file; the two lines above look inherited from the actix
// example. Confirm and remove if stale.
234 let addr = ctx.address();
// the chat server receives a recipient handle for this session actor
239 addr: addr.recipient(),
242 .then(|res, act, ctx| {
// the chat server's reply becomes this session's id
244 Ok(res) => act.id = res,
245 // something is wrong with chat server
/// Called when the actor is stopping; reports this session id to the chat server
/// as disconnected.
252 fn stopping(&mut self, _: &mut Self::Context) -> Running {
253 // notify chat server
257 .do_send(Disconnect { id: self.id });
262 /// Handles messages from the chat server; they are simply sent on to the peer websocket.
263 impl Handler<WsMessage> for WsChatSession {
266 fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
271 /// WebSocket message handler: reacts to each frame received from the client.
272 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
273 fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
274 let msg = match msg {
283 ws::Message::Ping(msg) => {
// a ping from the client counts as a sign of life
284 self.hb = Instant::now();
287 ws::Message::Pong(_) => {
// so does a pong
288 self.hb = Instant::now();
290 ws::Message::Text(text) => {
// copy what the `async move` block below needs out of `self`
291 let ip_clone = self.ip.clone();
292 let id_clone = self.id.to_owned();
// NOTE(review): presumably yields request data with a fresh request counter for
// this message; confirm against the activitypub_federation docs.
293 let context_clone = self.apub_data.reset_request_count();
295 let fut = Box::pin(async move {
296 let msg = text.trim().to_string();
297 parse_json_message(msg, ip_clone, id_clone, context_clone).await
301 .then(|res, _, ctx| {
// success: send the serialized response back over the websocket
303 Ok(res) => ctx.text(res),
// failure: the error is only logged in this arm
304 Err(e) => error!("{}", &e),
306 actix::fut::ready(())
// NOTE(review): this prints to stdout while the rest of the file logs through
// `tracing` (`debug!` / `error!`); consider using the same logging here.
310 ws::Message::Binary(_) => println!("Unexpected binary"),
311 ws::Message::Close(reason) => {
315 ws::Message::Continuation(_) => {
318 ws::Message::Nop => (),
323 /// Entry point for a single text message received over the websocket.
///
/// The message is parsed as JSON and its `op` string is tried against
/// `UserOperationCrud`, then `UserOperation`, and finally `UserOperationApub`.
/// The rate limit that belongs to the operation is checked before the matching
/// handler is awaited; the handler's serialized response is returned.
///
/// Errors: invalid JSON, the messages "missing data" / "missing op",
/// "rate_limit_error" (via `check_rate_limit_passed`), an `op` that none of the
/// three enums can parse, and whatever the operation itself returns.
324 async fn parse_json_message(
327 connection_id: ConnectionId,
328 context: ContextData<LemmyContext>,
329 ) -> Result<String, LemmyError> {
// NOTE(review): despite its name, the value returned by `settings_updated_channel()`
// is used below as the rate limiter (`.register()`, `.post()`, `.message()`, ...).
330 let rate_limiter = context.settings_updated_channel();
331 let json: Value = serde_json::from_str(&msg)?;
335 .ok_or_else(|| LemmyError::from_message("missing data"))?;
// the same error is used for both lookups in the `op` extraction below
337 let missing_op_err = || LemmyError::from_message("missing op");
341 .ok_or_else(missing_op_err)?
343 .ok_or_else(missing_op_err)?;
345 // check if api call passes the rate limit, and generate future for later execution
346 if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
347 let passed = match user_operation_crud {
348 UserOperationCrud::Register => rate_limiter.register().check(ip),
349 UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
// community creation is checked against the `register` bucket
350 UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
351 UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
// every other CRUD operation uses the general `message` bucket
352 _ => rate_limiter.message().check(ip),
354 check_rate_limit_passed(passed)?;
355 match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
356 } else if let Ok(user_operation) = UserOperation::from_str(op) {
357 let passed = match user_operation {
// captcha requests are checked against the `post` bucket
358 UserOperation::GetCaptcha => rate_limiter.post().check(ip),
359 _ => rate_limiter.message().check(ip),
361 check_rate_limit_passed(passed)?;
362 match_websocket_operation(context, connection_id, user_operation, data).await
// last attempt: an `op` that is not a `UserOperationApub` either is returned as an error
364 let user_operation = UserOperationApub::from_str(op)?;
365 let passed = match user_operation {
366 UserOperationApub::Search => rate_limiter.search().check(ip),
367 _ => rate_limiter.message().check(ip),
369 check_rate_limit_passed(passed)?;
370 match_websocket_operation_apub(context, connection_id, user_operation, data).await
/// Turns the boolean outcome of a rate limit check into a `Result`, so callers can
/// bail out with `?`. A hit limit is reported as the "rate_limit_error" message.
374 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
378 // if rate limit was hit, respond with message
379 Err(LemmyError::from_message("rate_limit_error"))
/// Dispatches a CRUD websocket operation: each `UserOperationCrud` variant is
/// forwarded to `do_websocket_operation_crud`, instantiated with the request type
/// of the same name. Returns whatever that call returns.
383 pub async fn match_websocket_operation_crud(
384 context: ContextData<LemmyContext>,
386 op: UserOperationCrud,
388 ) -> result::Result<String, LemmyError> {
// User ops
391 UserOperationCrud::Register => {
392 do_websocket_operation_crud::<Register>(context, id, op, data).await
394 UserOperationCrud::DeleteAccount => {
395 do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
398 // Private Message ops
399 UserOperationCrud::CreatePrivateMessage => {
400 do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
402 UserOperationCrud::EditPrivateMessage => {
403 do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
405 UserOperationCrud::DeletePrivateMessage => {
406 do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
408 UserOperationCrud::GetPrivateMessages => {
409 do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
// Site ops
413 UserOperationCrud::CreateSite => {
414 do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
416 UserOperationCrud::EditSite => {
417 do_websocket_operation_crud::<EditSite>(context, id, op, data).await
419 UserOperationCrud::GetSite => {
420 do_websocket_operation_crud::<GetSite>(context, id, op, data).await
// Community ops
424 UserOperationCrud::ListCommunities => {
425 do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
427 UserOperationCrud::CreateCommunity => {
428 do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
430 UserOperationCrud::EditCommunity => {
431 do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
433 UserOperationCrud::DeleteCommunity => {
434 do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
436 UserOperationCrud::RemoveCommunity => {
437 do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
// Post ops
441 UserOperationCrud::CreatePost => {
442 do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
444 UserOperationCrud::GetPost => {
445 do_websocket_operation_crud::<GetPost>(context, id, op, data).await
447 UserOperationCrud::EditPost => {
448 do_websocket_operation_crud::<EditPost>(context, id, op, data).await
450 UserOperationCrud::DeletePost => {
451 do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
453 UserOperationCrud::RemovePost => {
454 do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
// Comment ops
458 UserOperationCrud::CreateComment => {
459 do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
461 UserOperationCrud::EditComment => {
462 do_websocket_operation_crud::<EditComment>(context, id, op, data).await
464 UserOperationCrud::DeleteComment => {
465 do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
467 UserOperationCrud::RemoveComment => {
468 do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
470 UserOperationCrud::GetComment => {
471 do_websocket_operation_crud::<GetComment>(context, id, op, data).await
// Custom emoji ops
474 UserOperationCrud::CreateCustomEmoji => {
475 do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
477 UserOperationCrud::EditCustomEmoji => {
478 do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
480 UserOperationCrud::DeleteCustomEmoji => {
481 do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
/// Runs one CRUD operation of request type `Data`.
///
/// `data` is deserialized into `Data`, the request is performed, the matching
/// activity is sent via `SendActivity::send_activity`, and the response is
/// serialized together with `op` by `serialize_websocket_message`.
/// Deserialization, perform and send errors are propagated with `?`.
// NOTE(review): the lifetimes `'a` and `'b` are not used by any visible part of the
// signature; confirm and drop them if unused.
486 async fn do_websocket_operation_crud<'a, 'b, Data>(
487 context: ContextData<LemmyContext>,
489 op: UserOperationCrud,
491 ) -> result::Result<String, LemmyError>
// the request's `SendActivity::Response` must be the same type as its `PerformCrud::Response`
493 Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response> + Send,
494 for<'de> Data: Deserialize<'de>,
496 let parsed_data: Data = serde_json::from_value(data)?;
497 let res = parsed_data
// `perform` is handed a `web::Data` wrapping a clone of the inner `LemmyContext`
498 .perform(&web::Data::new(context.deref().clone()), Some(id))
500 SendActivity::send_activity(&parsed_data, &res, &context).await?;
501 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperationApub` websocket operation: each variant is forwarded
/// to `do_websocket_operation_apub`, instantiated with the request type of the same
/// name. Returns whatever that call returns.
504 pub async fn match_websocket_operation_apub(
505 context: ContextData<LemmyContext>,
507 op: UserOperationApub,
509 ) -> result::Result<String, LemmyError> {
511 UserOperationApub::GetPersonDetails => {
512 do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
514 UserOperationApub::GetCommunity => {
515 do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
517 UserOperationApub::GetComments => {
518 do_websocket_operation_apub::<GetComments>(context, id, op, data).await
520 UserOperationApub::GetPosts => {
521 do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
523 UserOperationApub::ResolveObject => {
524 do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
526 UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
/// Runs one `PerformApub` operation of request type `Data`.
///
/// `data` is deserialized into `Data`, the request is performed, the matching
/// activity is sent via `SendActivity::send_activity`, and the response is
/// serialized together with `op` by `serialize_websocket_message`.
/// Deserialization, perform and send errors are propagated with `?`.
// NOTE(review): the lifetimes `'a` and `'b` are not used by any visible part of the
// signature; confirm and drop them if unused.
530 async fn do_websocket_operation_apub<'a, 'b, Data>(
531 context: ContextData<LemmyContext>,
533 op: UserOperationApub,
535 ) -> result::Result<String, LemmyError>
// the request's `SendActivity::Response` must be the same type as its `PerformApub::Response`
537 Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response> + Send,
538 for<'de> Data: Deserialize<'de>,
540 let parsed_data: Data = serde_json::from_value(data)?;
// unlike the `Perform` / `PerformCrud` helpers, `perform` takes the federation context directly here
541 let res = parsed_data.perform(&context, Some(id)).await?;
542 SendActivity::send_activity(&parsed_data, &res, &context).await?;
543 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperation` websocket operation: each variant is forwarded to
/// `do_websocket_operation`, instantiated with the request type that carries its
/// payload. Returns whatever that call returns.
546 pub async fn match_websocket_operation(
547 context: ContextData<LemmyContext>,
551 ) -> result::Result<String, LemmyError> {
554 UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
555 UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
556 UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
557 UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
558 UserOperation::GetUnreadRegistrationApplicationCount => {
559 do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
561 UserOperation::ListRegistrationApplications => {
562 do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
564 UserOperation::ApproveRegistrationApplication => {
565 do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
567 UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
568 UserOperation::GetBannedPersons => {
569 do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
571 UserOperation::BlockPerson => {
572 do_websocket_operation::<BlockPerson>(context, id, op, data).await
574 UserOperation::GetPersonMentions => {
575 do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
577 UserOperation::MarkPersonMentionAsRead => {
578 do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
580 UserOperation::MarkCommentReplyAsRead => {
581 do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
583 UserOperation::MarkAllAsRead => {
584 do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
586 UserOperation::PasswordReset => {
587 do_websocket_operation::<PasswordReset>(context, id, op, data).await
// the only arm where operation name and request type differ:
// `PasswordChange` is carried by `PasswordChangeAfterReset`
589 UserOperation::PasswordChange => {
590 do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
// Join ops
592 UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
593 UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
594 UserOperation::CommunityJoin => {
595 do_websocket_operation::<CommunityJoin>(context, id, op, data).await
597 UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
598 UserOperation::SaveUserSettings => {
599 do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
601 UserOperation::ChangePassword => {
602 do_websocket_operation::<ChangePassword>(context, id, op, data).await
604 UserOperation::GetReportCount => {
605 do_websocket_operation::<GetReportCount>(context, id, op, data).await
607 UserOperation::GetUnreadCount => {
608 do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
610 UserOperation::VerifyEmail => {
611 do_websocket_operation::<VerifyEmail>(context, id, op, data).await
614 // Private Message ops
615 UserOperation::MarkPrivateMessageAsRead => {
616 do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
618 UserOperation::CreatePrivateMessageReport => {
619 do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
621 UserOperation::ResolvePrivateMessageReport => {
622 do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
624 UserOperation::ListPrivateMessageReports => {
625 do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
// Modlog, purge and admin ops
629 UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
630 UserOperation::PurgePerson => {
631 do_websocket_operation::<PurgePerson>(context, id, op, data).await
633 UserOperation::PurgeCommunity => {
634 do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
636 UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
637 UserOperation::PurgeComment => {
638 do_websocket_operation::<PurgeComment>(context, id, op, data).await
640 UserOperation::TransferCommunity => {
641 do_websocket_operation::<TransferCommunity>(context, id, op, data).await
643 UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
644 UserOperation::GetFederatedInstances => {
645 do_websocket_operation::<GetFederatedInstances>(context, id, op, data).await
// Community ops
649 UserOperation::FollowCommunity => {
650 do_websocket_operation::<FollowCommunity>(context, id, op, data).await
652 UserOperation::BlockCommunity => {
653 do_websocket_operation::<BlockCommunity>(context, id, op, data).await
655 UserOperation::BanFromCommunity => {
656 do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
658 UserOperation::AddModToCommunity => {
659 do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
// Post ops
663 UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
664 UserOperation::FeaturePost => {
665 do_websocket_operation::<FeaturePost>(context, id, op, data).await
667 UserOperation::CreatePostLike => {
668 do_websocket_operation::<CreatePostLike>(context, id, op, data).await
670 UserOperation::MarkPostAsRead => {
671 do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
673 UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
674 UserOperation::CreatePostReport => {
675 do_websocket_operation::<CreatePostReport>(context, id, op, data).await
677 UserOperation::ListPostReports => {
678 do_websocket_operation::<ListPostReports>(context, id, op, data).await
680 UserOperation::ResolvePostReport => {
681 do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
683 UserOperation::GetSiteMetadata => {
684 do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
// Comment ops
688 UserOperation::SaveComment => {
689 do_websocket_operation::<SaveComment>(context, id, op, data).await
691 UserOperation::CreateCommentLike => {
692 do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
694 UserOperation::DistinguishComment => {
695 do_websocket_operation::<DistinguishComment>(context, id, op, data).await
697 UserOperation::CreateCommentReport => {
698 do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
700 UserOperation::ListCommentReports => {
701 do_websocket_operation::<ListCommentReports>(context, id, op, data).await
703 UserOperation::ResolveCommentReport => {
704 do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
/// Runs one `Perform` operation of request type `Data`.
///
/// `data` is deserialized into `Data`, the request is performed, the matching
/// activity is sent via `SendActivity::send_activity`, and the response is
/// serialized together with `op` by `serialize_websocket_message`.
/// Deserialization and send errors are propagated with `?`.
// NOTE(review): the lifetimes `'a` and `'b` are not used by any visible part of the
// signature; confirm and drop them if unused.
709 async fn do_websocket_operation<'a, 'b, Data>(
710 context: ContextData<LemmyContext>,
714 ) -> result::Result<String, LemmyError>
// the request's `SendActivity::Response` must be the same type as its `Perform::Response`
716 Data: Perform + SendActivity<Response = <Data as Perform>::Response> + Send,
717 for<'de> Data: Deserialize<'de>,
719 let parsed_data: Data = serde_json::from_value(data)?;
720 let res = parsed_data
// `perform` is handed a `web::Data` wrapping a clone of the inner `LemmyContext`
721 .perform(&web::Data::new(context.deref().clone()), Some(id))
723 SendActivity::send_activity(&parsed_data, &res, &context).await?;
724 serialize_websocket_message(&op, &res)