1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
34 context::LemmyContext,
35 custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
51 MarkCommentReplyAsRead,
52 MarkPersonMentionAsRead,
53 PasswordChangeAfterReset,
78 CreatePrivateMessageReport,
82 ListPrivateMessageReports,
83 MarkPrivateMessageAsRead,
84 ResolvePrivateMessageReport,
87 ApproveRegistrationApplication,
92 GetUnreadRegistrationApplicationCount,
94 ListRegistrationApplications,
103 serialize_websocket_message,
104 structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
110 use lemmy_api_crud::PerformCrud;
111 use lemmy_apub::{api::PerformApub, SendActivity};
112 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
113 use serde::Deserialize;
114 use serde_json::Value;
119 time::{Duration, Instant},
121 use tracing::{debug, error, info};
123 /// Entry point for our route
///
/// Passes the request to `actix_ws::handle` to obtain the HTTP response, the
/// websocket session and the message stream, checks the client IP against the
/// `message` rate limit, registers the session with the chat server, then
/// starts the heartbeat and spawns `handle_messages`.
124 pub async fn websocket(
127 context: web::Data<LemmyContext>,
128 rate_limiter: web::Data<RateLimitCell>,
129 ) -> Result<HttpResponse, Error> {
130 let (response, session, stream) = actix_ws::handle(&req, body)?;
// Client address taken from `realip_remote_addr()`; the literal "blank_ip"
// is substituted when no address is available.
132 let client_ip = IpAddr(
135 .realip_remote_addr()
136 .unwrap_or("blank_ip")
// Joining a websocket is checked against the `message` rate-limit bucket.
140 let check = rate_limiter.message().check(client_ip.clone());
143 "Websocket join with IP: {} has been rate limited.",
// NOTE(review): presumably reached only when `check` failed (the guarding
// condition is not visible in this excerpt) — the session is closed.
146 session.close(None).await.map_err(LemmyError::from)?;
150 let connection_id = context.chat_server().handle_connect(session.clone())?;
151 info!("{} joined", &client_ip);
// Shared timestamp handed to both the heartbeat task and the message handler.
153 let alive = Arc::new(Mutex::new(Instant::now()));
154 heartbeat(session.clone(), alive.clone());
156 actix_rt::spawn(handle_messages(
/// Reads frames from the websocket stream and reacts to each kind: answers
/// pings with a pong, records pongs, runs text frames through
/// `parse_json_message` and sends the resulting string back, closes the
/// session and deregisters the connection on a close frame, and only logs
/// binary frames.
169 async fn handle_messages(
170 mut stream: MessageStream,
172 mut session: Session,
173 connection_id: ConnectionId,
174 alive: Arc<Mutex<Instant>>,
175 rate_limiter: web::Data<RateLimitCell>,
176 context: web::Data<LemmyContext>,
177 ) -> Result<(), LemmyError> {
// The loop ends as soon as the stream yields `None` or an `Err` item.
178 while let Some(Ok(msg)) = stream.next().await {
180 ws::Message::Ping(bytes) => {
181 if session.pong(&bytes).await.is_err() {
// A pong stores the current time behind the heartbeat lock
// (presumably the `alive` mutex; the acquisition line is not visible here).
185 ws::Message::Pong(_) => {
188 .expect("Failed to acquire websocket heartbeat alive lock");
189 *lock = Instant::now();
191 ws::Message::Text(text) => {
// Surrounding whitespace is trimmed before the payload is parsed.
192 let msg = text.trim().to_string();
193 let executed = parse_json_message(
197 rate_limiter.get_ref(),
198 context.get_ref().clone(),
// On failure the error is logged and a string derived from it is sent
// instead; the JSON literal below is the fallback when producing that
// string fails.
202 let res = executed.unwrap_or_else(|e| {
203 error!("Error during message handling {}", e);
205 .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
207 session.text(res).await?;
// A close frame closes our side and removes the connection from the chat server.
209 ws::Message::Close(_) => {
210 session.close(None).await?;
211 context.chat_server().handle_disconnect(&connection_id)?;
214 ws::Message::Binary(_) => info!("Unexpected binary"),
/// Spawns a background task that pings the session on a 5-second interval and
/// closes it once more than 10 seconds have elapsed since the timestamp stored
/// in `alive`.
221 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
222 actix_rt::spawn(async move {
223 let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// An empty ping payload is sent; a send failure is detected here.
225 if session.ping(b"").await.is_err() {
// Computed inside its own block, so the lock guard is a local of that block
// rather than of the surrounding task body.
229 let duration_since = {
230 let alive_lock = alive
232 .expect("Failed to acquire websocket heartbeat alive lock");
233 Instant::now().duration_since(*alive_lock)
235 if duration_since > Duration::from_secs(10) {
// The result of closing is deliberately discarded.
236 let _ = session.close(None).await;
239 interval.tick().await;
/// Parses one websocket text message as JSON, resolves its operation name
/// against the three operation enums in turn, applies the rate limit that
/// belongs to the operation and awaits the matching dispatcher.
///
/// Returns the dispatcher's serialized response. Fails on invalid JSON, with
/// "missing data" / "missing op" when those parts cannot be extracted, when
/// `check_rate_limit_passed` rejects the call, or when the operation name
/// matches none of the enums.
244 async fn parse_json_message(
247 connection_id: ConnectionId,
248 rate_limiter: &RateLimitCell,
249 context: LemmyContext,
250 ) -> Result<String, LemmyError> {
251 let json: Value = serde_json::from_str(&msg)?;
255 .ok_or_else(|| LemmyError::from_message("missing data"))?;
// The same error is used for both extraction steps on the operation name.
257 let missing_op_err = || LemmyError::from_message("missing op");
261 .ok_or_else(missing_op_err)?
263 .ok_or_else(missing_op_err)?;
265 // Check that the api call passes its rate limit, then dispatch and await it.
// Names are tried as UserOperationCrud first, then UserOperation, and finally
// UserOperationApub, whose `from_str(op)?` propagates the error for unknown names.
266 if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
267 let passed = match user_operation_crud {
268 UserOperationCrud::Register => rate_limiter.register().check(ip),
269 UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
// Community creation is checked against the same bucket as Register.
270 UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
271 UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
272 _ => rate_limiter.message().check(ip),
274 check_rate_limit_passed(passed)?;
275 match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
276 } else if let Ok(user_operation) = UserOperation::from_str(op) {
277 let passed = match user_operation {
// Captcha requests are checked against the post bucket.
278 UserOperation::GetCaptcha => rate_limiter.post().check(ip),
279 _ => rate_limiter.message().check(ip),
281 check_rate_limit_passed(passed)?;
282 match_websocket_operation(context, connection_id, user_operation, data).await
284 let user_operation = UserOperationApub::from_str(op)?;
285 let passed = match user_operation {
286 UserOperationApub::Search => rate_limiter.search().check(ip),
287 _ => rate_limiter.message().check(ip),
289 check_rate_limit_passed(passed)?;
290 match_websocket_operation_apub(context, connection_id, user_operation, data).await
/// Converts the boolean outcome of a rate-limit check into a `Result`; the
/// error carries the message "rate_limit_error".
// NOTE(review): the condition on `passed` is not visible in this excerpt;
// presumably the error is returned when `passed` is false.
294 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
298 // if rate limit was hit, respond with message
299 Err(LemmyError::from_message("rate_limit_error"))
/// Dispatches a `UserOperationCrud` variant to `do_websocket_operation_crud`,
/// instantiated with the request type for that variant. In every arm shown
/// here the request type has the same name as the variant.
303 pub async fn match_websocket_operation_crud(
304 context: LemmyContext,
306 op: UserOperationCrud,
308 ) -> result::Result<String, LemmyError> {
311 UserOperationCrud::Register => {
312 do_websocket_operation_crud::<Register>(context, id, op, data).await
314 UserOperationCrud::DeleteAccount => {
315 do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
318 // Private Message ops
319 UserOperationCrud::CreatePrivateMessage => {
320 do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
322 UserOperationCrud::EditPrivateMessage => {
323 do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
325 UserOperationCrud::DeletePrivateMessage => {
326 do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
328 UserOperationCrud::GetPrivateMessages => {
329 do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
333 UserOperationCrud::CreateSite => {
334 do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
336 UserOperationCrud::EditSite => {
337 do_websocket_operation_crud::<EditSite>(context, id, op, data).await
339 UserOperationCrud::GetSite => {
340 do_websocket_operation_crud::<GetSite>(context, id, op, data).await
344 UserOperationCrud::ListCommunities => {
345 do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
347 UserOperationCrud::CreateCommunity => {
348 do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
350 UserOperationCrud::EditCommunity => {
351 do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
353 UserOperationCrud::DeleteCommunity => {
354 do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
356 UserOperationCrud::RemoveCommunity => {
357 do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
361 UserOperationCrud::CreatePost => {
362 do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
364 UserOperationCrud::GetPost => {
365 do_websocket_operation_crud::<GetPost>(context, id, op, data).await
367 UserOperationCrud::EditPost => {
368 do_websocket_operation_crud::<EditPost>(context, id, op, data).await
370 UserOperationCrud::DeletePost => {
371 do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
373 UserOperationCrud::RemovePost => {
374 do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
378 UserOperationCrud::CreateComment => {
379 do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
381 UserOperationCrud::EditComment => {
382 do_websocket_operation_crud::<EditComment>(context, id, op, data).await
384 UserOperationCrud::DeleteComment => {
385 do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
387 UserOperationCrud::RemoveComment => {
388 do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
390 UserOperationCrud::GetComment => {
391 do_websocket_operation_crud::<GetComment>(context, id, op, data).await
394 UserOperationCrud::CreateCustomEmoji => {
395 do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
397 UserOperationCrud::EditCustomEmoji => {
398 do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
400 UserOperationCrud::DeleteCustomEmoji => {
401 do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
/// Deserializes `data` into the request type `Data`, performs it, sends the
/// associated activity, and returns the response serialized together with `op`.
///
/// Errors from deserialization and from `send_activity` are propagated with `?`.
// NOTE(review): the lifetime parameters 'a and 'b are not used in any line
// visible in this excerpt — confirm whether they are still needed.
406 async fn do_websocket_operation_crud<'a, 'b, Data>(
407 context: LemmyContext,
409 op: UserOperationCrud,
411 ) -> result::Result<String, LemmyError>
// The bound ties SendActivity's Response to PerformCrud's Response, so the
// value produced by `perform` can be passed straight to `send_activity`.
413 Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
414 for<'de> Data: Deserialize<'de>,
416 let parsed_data: Data = serde_json::from_value(data)?;
// `perform` takes a `web::Data` reference, so a clone of the context is wrapped for the call.
417 let res = parsed_data
418 .perform(&web::Data::new(context.clone()), Some(id))
420 SendActivity::send_activity(&parsed_data, &res, &context).await?;
421 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperationApub` variant to `do_websocket_operation_apub`,
/// instantiated with the request type for that variant. In every arm shown
/// here the request type has the same name as the variant.
424 pub async fn match_websocket_operation_apub(
425 context: LemmyContext,
427 op: UserOperationApub,
429 ) -> result::Result<String, LemmyError> {
431 UserOperationApub::GetPersonDetails => {
432 do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
434 UserOperationApub::GetCommunity => {
435 do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
437 UserOperationApub::GetComments => {
438 do_websocket_operation_apub::<GetComments>(context, id, op, data).await
440 UserOperationApub::GetPosts => {
441 do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
443 UserOperationApub::ResolveObject => {
444 do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
446 UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
/// Deserializes `data` into the request type `Data`, performs it, sends the
/// associated activity, and returns the response serialized together with `op`.
///
/// Errors from deserialization and from `send_activity` are propagated with `?`.
// NOTE(review): the lifetime parameters 'a and 'b are not used in any line
// visible in this excerpt — confirm whether they are still needed.
450 async fn do_websocket_operation_apub<'a, 'b, Data>(
451 context: LemmyContext,
453 op: UserOperationApub,
455 ) -> result::Result<String, LemmyError>
// The bound ties SendActivity's Response to PerformApub's Response, so the
// value produced by `perform` can be passed straight to `send_activity`.
457 Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
458 for<'de> Data: Deserialize<'de>,
460 let parsed_data: Data = serde_json::from_value(data)?;
// `perform` takes a `web::Data` reference, so a clone of the context is wrapped for the call.
461 let res = parsed_data
462 .perform(&web::Data::new(context.clone()), Some(id))
464 SendActivity::send_activity(&parsed_data, &res, &context).await?;
465 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperation` variant to `do_websocket_operation`,
/// instantiated with the request type for that variant. In the arms shown
/// here the request type has the same name as the variant, with the single
/// exception of `PasswordChange`.
468 pub async fn match_websocket_operation(
469 context: LemmyContext,
473 ) -> result::Result<String, LemmyError> {
476 UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
477 UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
478 UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
479 UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
480 UserOperation::GetUnreadRegistrationApplicationCount => {
481 do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
483 UserOperation::ListRegistrationApplications => {
484 do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
486 UserOperation::ApproveRegistrationApplication => {
487 do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
489 UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
490 UserOperation::GetBannedPersons => {
491 do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
493 UserOperation::BlockPerson => {
494 do_websocket_operation::<BlockPerson>(context, id, op, data).await
496 UserOperation::GetPersonMentions => {
497 do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
499 UserOperation::MarkPersonMentionAsRead => {
500 do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
502 UserOperation::MarkCommentReplyAsRead => {
503 do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
505 UserOperation::MarkAllAsRead => {
506 do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
508 UserOperation::PasswordReset => {
509 do_websocket_operation::<PasswordReset>(context, id, op, data).await
// The request type for this variant is `PasswordChangeAfterReset`; this is
// the only visible arm where the type and variant names differ.
511 UserOperation::PasswordChange => {
512 do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
514 UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
515 UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
516 UserOperation::CommunityJoin => {
517 do_websocket_operation::<CommunityJoin>(context, id, op, data).await
519 UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
520 UserOperation::SaveUserSettings => {
521 do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
523 UserOperation::ChangePassword => {
524 do_websocket_operation::<ChangePassword>(context, id, op, data).await
526 UserOperation::GetReportCount => {
527 do_websocket_operation::<GetReportCount>(context, id, op, data).await
529 UserOperation::GetUnreadCount => {
530 do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
532 UserOperation::VerifyEmail => {
533 do_websocket_operation::<VerifyEmail>(context, id, op, data).await
536 // Private Message ops
537 UserOperation::MarkPrivateMessageAsRead => {
538 do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
540 UserOperation::CreatePrivateMessageReport => {
541 do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
543 UserOperation::ResolvePrivateMessageReport => {
544 do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
546 UserOperation::ListPrivateMessageReports => {
547 do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
551 UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
552 UserOperation::PurgePerson => {
553 do_websocket_operation::<PurgePerson>(context, id, op, data).await
555 UserOperation::PurgeCommunity => {
556 do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
558 UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
559 UserOperation::PurgeComment => {
560 do_websocket_operation::<PurgeComment>(context, id, op, data).await
562 UserOperation::TransferCommunity => {
563 do_websocket_operation::<TransferCommunity>(context, id, op, data).await
565 UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
568 UserOperation::FollowCommunity => {
569 do_websocket_operation::<FollowCommunity>(context, id, op, data).await
571 UserOperation::BlockCommunity => {
572 do_websocket_operation::<BlockCommunity>(context, id, op, data).await
574 UserOperation::BanFromCommunity => {
575 do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
577 UserOperation::AddModToCommunity => {
578 do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
582 UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
583 UserOperation::FeaturePost => {
584 do_websocket_operation::<FeaturePost>(context, id, op, data).await
586 UserOperation::CreatePostLike => {
587 do_websocket_operation::<CreatePostLike>(context, id, op, data).await
589 UserOperation::MarkPostAsRead => {
590 do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
592 UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
593 UserOperation::CreatePostReport => {
594 do_websocket_operation::<CreatePostReport>(context, id, op, data).await
596 UserOperation::ListPostReports => {
597 do_websocket_operation::<ListPostReports>(context, id, op, data).await
599 UserOperation::ResolvePostReport => {
600 do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
602 UserOperation::GetSiteMetadata => {
603 do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
607 UserOperation::SaveComment => {
608 do_websocket_operation::<SaveComment>(context, id, op, data).await
610 UserOperation::CreateCommentLike => {
611 do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
613 UserOperation::DistinguishComment => {
614 do_websocket_operation::<DistinguishComment>(context, id, op, data).await
616 UserOperation::CreateCommentReport => {
617 do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
619 UserOperation::ListCommentReports => {
620 do_websocket_operation::<ListCommentReports>(context, id, op, data).await
622 UserOperation::ResolveCommentReport => {
623 do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
/// Deserializes `data` into the request type `Data`, performs it, sends the
/// associated activity, and returns the response serialized together with `op`.
///
/// Errors from deserialization and from `send_activity` are propagated with `?`.
// NOTE(review): the lifetime parameters 'a and 'b are not used in any line
// visible in this excerpt — confirm whether they are still needed.
628 async fn do_websocket_operation<'a, 'b, Data>(
629 context: LemmyContext,
633 ) -> result::Result<String, LemmyError>
// The bound ties SendActivity's Response to Perform's Response, so the value
// produced by `perform` can be passed straight to `send_activity`.
635 Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
636 for<'de> Data: Deserialize<'de>,
638 let parsed_data: Data = serde_json::from_value(data)?;
// `perform` takes a `web::Data` reference, so a clone of the context is wrapped for the call.
639 let res = parsed_data
640 .perform(&web::Data::new(context.clone()), Some(id))
642 SendActivity::send_activity(&parsed_data, &res, &context).await?;
643 serialize_websocket_message(&op, &res)