1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
34 context::LemmyContext,
50 MarkCommentReplyAsRead,
51 MarkPersonMentionAsRead,
52 PasswordChangeAfterReset,
77 CreatePrivateMessageReport,
81 ListPrivateMessageReports,
82 MarkPrivateMessageAsRead,
83 ResolvePrivateMessageReport,
86 ApproveRegistrationApplication,
91 GetUnreadRegistrationApplicationCount,
93 ListRegistrationApplications,
102 serialize_websocket_message,
103 structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
109 use lemmy_api_crud::PerformCrud;
110 use lemmy_apub::{api::PerformApub, SendActivity};
111 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
112 use serde::Deserialize;
113 use serde_json::Value;
118 time::{Duration, Instant},
120 use tracing::{debug, error, info};
122 /// Entry point for the websocket route.
///
/// Starts websocket handling for the request via `actix_ws::handle`, checks the
/// client IP against the `message` rate limit, registers the session with the
/// chat server, then starts the heartbeat and the message-handling task.
///
/// NOTE(review): not every line of this function is visible in this excerpt
/// (e.g. the `req`/`body` parameters and the tail of the body) — confirm the
/// details below against the full file.
123 pub async fn websocket(
126 context: web::Data<LemmyContext>,
127 rate_limiter: web::Data<RateLimitCell>,
128 ) -> Result<HttpResponse, Error> {
// Yields the HTTP response, a session handle and the stream of incoming frames.
129 let (response, session, stream) = actix_ws::handle(&req, body)?;
// The client address falls back to the literal "blank_ip" when no remote
// address is available.
131 let client_ip = IpAddr(
134 .realip_remote_addr()
135 .unwrap_or("blank_ip")
// Joining is counted against the `message` rate-limit bucket.
139 let check = rate_limiter.message().check(client_ip.clone());
142 "Websocket join with IP: {} has been rate limited.",
// presumably only reached when `check` failed (the guard is not visible here):
// the session is closed and a close error is converted into a LemmyError.
145 session.close(None).await.map_err(LemmyError::from)?;
// Register a clone of the session with the chat server; the returned id
// identifies this connection from here on.
149 let connection_id = context.chat_server().handle_connect(session.clone())?;
150 info!("{} joined", &client_ip);
// Timestamp initialised to "now" and shared with `heartbeat` through the Arc.
152 let alive = Arc::new(Mutex::new(Instant::now()));
153 heartbeat(session.clone(), alive.clone());
// Incoming frames are processed on a separately spawned task.
155 actix_rt::spawn(handle_messages(
/// Consumes frames from `stream` and reacts to each websocket message type.
///
/// Text frames are handed to `parse_json_message` and the resulting string is
/// sent back over `session`; Ping/Pong/Close/Binary frames are handled inline.
///
/// NOTE(review): some lines of this function (the `match` header, one
/// parameter, closing braces) are not visible in this excerpt.
168 async fn handle_messages(
169 mut stream: MessageStream,
171 mut session: Session,
172 connection_id: ConnectionId,
173 alive: Arc<Mutex<Instant>>,
174 rate_limiter: web::Data<RateLimitCell>,
175 context: web::Data<LemmyContext>,
176 ) -> Result<(), LemmyError> {
// The loop ends on the first `None` or `Err` item produced by the stream.
// NOTE(review): no `handle_disconnect` call is visible on that exit path —
// confirm the connection is not left registered when the stream errors out.
177 while let Some(Ok(msg)) = stream.next().await {
// Ping: answer with a Pong carrying the same payload.
179 ws::Message::Ping(bytes) => {
180 if session.pong(&bytes).await.is_err() {
// Pong: store the current time in the shared `alive` timestamp.
184 ws::Message::Pong(_) => {
187 .expect("Failed to acquire websocket heartbeat alive lock");
188 *lock = Instant::now();
// Text: the trimmed payload is passed on to `parse_json_message`.
190 ws::Message::Text(text) => {
191 let msg = text.trim().to_string();
192 let executed = parse_json_message(
196 rate_limiter.get_ref(),
197 context.get_ref().clone(),
// On failure the error is logged and a replacement string is sent instead;
// the literal JSON below is the last-resort fallback.
201 let res = executed.unwrap_or_else(|e| {
202 error!("Error during message handling {}", e);
204 .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
206 session.text(res).await?;
// Close: close our side of the session and deregister the connection.
208 ws::Message::Close(_) => {
209 session.close(None).await?;
210 context.chat_server().handle_disconnect(&connection_id)?;
// Binary frames are not processed, only logged.
213 ws::Message::Binary(_) => info!("Unexpected binary"),
/// Spawns a background task that pings the client and closes the session when
/// the `alive` timestamp is more than 10 seconds old.
///
/// `alive` is the shared timestamp; `session` is used to send pings and to
/// close the connection.
///
/// NOTE(review): the loop header and the bodies of the `if` branches are not
/// visible in this excerpt.
220 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
221 actix_rt::spawn(async move {
// Timer with a 5 second period.
222 let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// Send a ping with an empty payload; a send error is detected here.
224 if session.ping(b"").await.is_err() {
// Time elapsed since `alive` was last written. The lock is taken inside this
// inner block, presumably so the guard is dropped before the next `.await`.
228 let duration_since = {
229 let alive_lock = alive
231 .expect("Failed to acquire websocket heartbeat alive lock");
232 Instant::now().duration_since(*alive_lock)
// More than 10 seconds since the timestamp was refreshed: close the session,
// deliberately ignoring any error from `close`.
234 if duration_since > Duration::from_secs(10) {
235 let _ = session.close(None).await;
// Wait for the next timer tick.
238 interval.tick().await;
/// Parses one websocket text message, applies the rate limit that matches the
/// requested operation and dispatches it to the corresponding handler.
///
/// Returns the string produced by the handler, or a `LemmyError` when the JSON
/// is invalid, `data`/`op` are missing, the op name is unknown, or the rate
/// limit check fails.
///
/// NOTE(review): the `msg` and `ip` parameters and the extraction of `data`
/// and `op` from the JSON sit on lines not visible in this excerpt.
243 async fn parse_json_message(
246 connection_id: ConnectionId,
247 rate_limiter: &RateLimitCell,
248 context: LemmyContext,
249 ) -> Result<String, LemmyError> {
250 let json: Value = serde_json::from_str(&msg)?;
254 .ok_or_else(|| LemmyError::from_message("missing data"))?;
// The same "missing op" error is used for both lookup steps below.
256 let missing_op_err = || LemmyError::from_message("missing op");
260 .ok_or_else(missing_op_err)?
262 .ok_or_else(missing_op_err)?;
264 // check if api call passes the rate limit, and generate future for later execution
// The op name is tried against the three operation enums in order:
// UserOperationCrud, then UserOperation, then UserOperationApub.
265 if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
266 let passed = match user_operation_crud {
267 UserOperationCrud::Register => rate_limiter.register().check(ip),
268 UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
// NOTE(review): CreateCommunity is checked against the `register` bucket —
// looks intentional, confirm.
269 UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
270 UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
// Every other CRUD operation uses the general `message` bucket.
271 _ => rate_limiter.message().check(ip),
273 check_rate_limit_passed(passed)?;
274 match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
275 } else if let Ok(user_operation) = UserOperation::from_str(op) {
276 let passed = match user_operation {
// GetCaptcha is checked against the `post` bucket; everything else uses `message`.
277 UserOperation::GetCaptcha => rate_limiter.post().check(ip),
278 _ => rate_limiter.message().check(ip),
280 check_rate_limit_passed(passed)?;
281 match_websocket_operation(context, connection_id, user_operation, data).await
// Last attempt: if the name is not a UserOperationApub either, the `?`
// propagates the parse error to the caller.
283 let user_operation = UserOperationApub::from_str(op)?;
284 let passed = match user_operation {
285 UserOperationApub::Search => rate_limiter.search().check(ip),
286 _ => rate_limiter.message().check(ip),
288 check_rate_limit_passed(passed)?;
289 match_websocket_operation_apub(context, connection_id, user_operation, data).await
/// Turns the boolean result of a rate-limit check into a `Result`.
///
/// When the limit was hit, the error carries the message "rate_limit_error".
/// NOTE(review): the branch on `passed` and the success return are not visible
/// in this excerpt — presumably `Ok(())` when `passed` is true.
293 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
297 // if rate limit was hit, respond with message
298 Err(LemmyError::from_message("rate_limit_error"))
/// Dispatches a `UserOperationCrud` to `do_websocket_operation_crud`,
/// selecting the request type to deserialize from the operation variant.
///
/// Every visible arm forwards the same `(context, id, op, data)` arguments; the
/// only thing that changes is the type parameter.
///
/// NOTE(review): the `id` and `data` parameters and the `match` header are on
/// lines not visible in this excerpt.
302 pub async fn match_websocket_operation_crud(
303 context: LemmyContext,
305 op: UserOperationCrud,
307 ) -> result::Result<String, LemmyError> {
// Account ops
310 UserOperationCrud::Register => {
311 do_websocket_operation_crud::<Register>(context, id, op, data).await
313 UserOperationCrud::DeleteAccount => {
314 do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
317 // Private Message ops
318 UserOperationCrud::CreatePrivateMessage => {
319 do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
321 UserOperationCrud::EditPrivateMessage => {
322 do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
324 UserOperationCrud::DeletePrivateMessage => {
325 do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
327 UserOperationCrud::GetPrivateMessages => {
328 do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
// Site operations
332 UserOperationCrud::CreateSite => {
333 do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
335 UserOperationCrud::EditSite => {
336 do_websocket_operation_crud::<EditSite>(context, id, op, data).await
338 UserOperationCrud::GetSite => {
339 do_websocket_operation_crud::<GetSite>(context, id, op, data).await
// Community operations
343 UserOperationCrud::ListCommunities => {
344 do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
346 UserOperationCrud::CreateCommunity => {
347 do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
349 UserOperationCrud::EditCommunity => {
350 do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
352 UserOperationCrud::DeleteCommunity => {
353 do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
355 UserOperationCrud::RemoveCommunity => {
356 do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
// Post operations
360 UserOperationCrud::CreatePost => {
361 do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
363 UserOperationCrud::GetPost => {
364 do_websocket_operation_crud::<GetPost>(context, id, op, data).await
366 UserOperationCrud::EditPost => {
367 do_websocket_operation_crud::<EditPost>(context, id, op, data).await
369 UserOperationCrud::DeletePost => {
370 do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
372 UserOperationCrud::RemovePost => {
373 do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
// Comment operations
377 UserOperationCrud::CreateComment => {
378 do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
380 UserOperationCrud::EditComment => {
381 do_websocket_operation_crud::<EditComment>(context, id, op, data).await
383 UserOperationCrud::DeleteComment => {
384 do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
386 UserOperationCrud::RemoveComment => {
387 do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
389 UserOperationCrud::GetComment => {
390 do_websocket_operation_crud::<GetComment>(context, id, op, data).await
/// Runs one CRUD operation: deserializes `data` into `Data`, calls `perform`,
/// calls `SendActivity::send_activity` and serializes the result.
///
/// `Data` must implement `PerformCrud`, `SendActivity` (with the same
/// `Response` type as its `PerformCrud` impl) and `Deserialize`.
///
/// NOTE(review): the lifetimes `'a` and `'b` are not used on any line visible
/// in this excerpt — check whether they are needed. The `id` and `data`
/// parameters are on lines not shown here.
395 async fn do_websocket_operation_crud<'a, 'b, Data>(
396 context: LemmyContext,
398 op: UserOperationCrud,
400 ) -> result::Result<String, LemmyError>
402 Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
403 for<'de> Data: Deserialize<'de>,
// A JSON value that does not match `Data` is reported to the caller via `?`.
405 let parsed_data: Data = serde_json::from_value(data)?;
// `perform` receives a fresh `web::Data` wrapping a clone of the context,
// plus the connection id.
406 let res = parsed_data
407 .perform(&web::Data::new(context.clone()), Some(id))
// Called with both the request and its response.
409 SendActivity::send_activity(&parsed_data, &res, &context).await?;
// The reply string is built from the operation and the response.
410 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperationApub` to `do_websocket_operation_apub`,
/// selecting the request type to deserialize from the operation variant.
///
/// NOTE(review): the `id` and `data` parameters and the `match` header are on
/// lines not visible in this excerpt.
413 pub async fn match_websocket_operation_apub(
414 context: LemmyContext,
416 op: UserOperationApub,
418 ) -> result::Result<String, LemmyError> {
// Each arm forwards the same arguments; only the type parameter differs.
420 UserOperationApub::GetPersonDetails => {
421 do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
423 UserOperationApub::GetCommunity => {
424 do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
426 UserOperationApub::GetComments => {
427 do_websocket_operation_apub::<GetComments>(context, id, op, data).await
429 UserOperationApub::GetPosts => {
430 do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
432 UserOperationApub::ResolveObject => {
433 do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
435 UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
/// Runs one `PerformApub` operation: deserializes `data` into `Data`, calls
/// `perform`, calls `SendActivity::send_activity` and serializes the result.
///
/// `Data` must implement `PerformApub`, `SendActivity` (with the same
/// `Response` type as its `PerformApub` impl) and `Deserialize`.
///
/// NOTE(review): the lifetimes `'a` and `'b` are not used on any line visible
/// in this excerpt — check whether they are needed. The `id` and `data`
/// parameters are on lines not shown here.
439 async fn do_websocket_operation_apub<'a, 'b, Data>(
440 context: LemmyContext,
442 op: UserOperationApub,
444 ) -> result::Result<String, LemmyError>
446 Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
447 for<'de> Data: Deserialize<'de>,
// A JSON value that does not match `Data` is reported to the caller via `?`.
449 let parsed_data: Data = serde_json::from_value(data)?;
// `perform` receives a fresh `web::Data` wrapping a clone of the context,
// plus the connection id.
450 let res = parsed_data
451 .perform(&web::Data::new(context.clone()), Some(id))
// Called with both the request and its response.
453 SendActivity::send_activity(&parsed_data, &res, &context).await?;
// The reply string is built from the operation and the response.
454 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperation` to `do_websocket_operation`, selecting the
/// request type to deserialize from the operation variant.
///
/// Every visible arm forwards the same `(context, id, op, data)` arguments; the
/// only thing that changes is the type parameter.
///
/// NOTE(review): the `id`, `op` and `data` parameters and the `match` header
/// are on lines not visible in this excerpt.
457 pub async fn match_websocket_operation(
458 context: LemmyContext,
462 ) -> result::Result<String, LemmyError> {
// Account, login and registration operations
465 UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
466 UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
467 UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
468 UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
469 UserOperation::GetUnreadRegistrationApplicationCount => {
470 do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
472 UserOperation::ListRegistrationApplications => {
473 do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
475 UserOperation::ApproveRegistrationApplication => {
476 do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
478 UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
479 UserOperation::GetBannedPersons => {
480 do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
482 UserOperation::BlockPerson => {
483 do_websocket_operation::<BlockPerson>(context, id, op, data).await
485 UserOperation::GetPersonMentions => {
486 do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
488 UserOperation::MarkPersonMentionAsRead => {
489 do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
491 UserOperation::MarkCommentReplyAsRead => {
492 do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
494 UserOperation::MarkAllAsRead => {
495 do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
497 UserOperation::PasswordReset => {
498 do_websocket_operation::<PasswordReset>(context, id, op, data).await
// Note: the `PasswordChange` operation is served by the
// `PasswordChangeAfterReset` request type (the names differ).
500 UserOperation::PasswordChange => {
501 do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
// Join operations
503 UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
504 UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
505 UserOperation::CommunityJoin => {
506 do_websocket_operation::<CommunityJoin>(context, id, op, data).await
508 UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
509 UserOperation::SaveUserSettings => {
510 do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
512 UserOperation::ChangePassword => {
513 do_websocket_operation::<ChangePassword>(context, id, op, data).await
515 UserOperation::GetReportCount => {
516 do_websocket_operation::<GetReportCount>(context, id, op, data).await
518 UserOperation::GetUnreadCount => {
519 do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
521 UserOperation::VerifyEmail => {
522 do_websocket_operation::<VerifyEmail>(context, id, op, data).await
525 // Private Message ops
526 UserOperation::MarkPrivateMessageAsRead => {
527 do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
529 UserOperation::CreatePrivateMessageReport => {
530 do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
532 UserOperation::ResolvePrivateMessageReport => {
533 do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
535 UserOperation::ListPrivateMessageReports => {
536 do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
// Modlog, purge and admin operations
540 UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
541 UserOperation::PurgePerson => {
542 do_websocket_operation::<PurgePerson>(context, id, op, data).await
544 UserOperation::PurgeCommunity => {
545 do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
547 UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
548 UserOperation::PurgeComment => {
549 do_websocket_operation::<PurgeComment>(context, id, op, data).await
551 UserOperation::TransferCommunity => {
552 do_websocket_operation::<TransferCommunity>(context, id, op, data).await
554 UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
// Community operations
557 UserOperation::FollowCommunity => {
558 do_websocket_operation::<FollowCommunity>(context, id, op, data).await
560 UserOperation::BlockCommunity => {
561 do_websocket_operation::<BlockCommunity>(context, id, op, data).await
563 UserOperation::BanFromCommunity => {
564 do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
566 UserOperation::AddModToCommunity => {
567 do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
// Post operations
571 UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
572 UserOperation::FeaturePost => {
573 do_websocket_operation::<FeaturePost>(context, id, op, data).await
575 UserOperation::CreatePostLike => {
576 do_websocket_operation::<CreatePostLike>(context, id, op, data).await
578 UserOperation::MarkPostAsRead => {
579 do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
581 UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
582 UserOperation::CreatePostReport => {
583 do_websocket_operation::<CreatePostReport>(context, id, op, data).await
585 UserOperation::ListPostReports => {
586 do_websocket_operation::<ListPostReports>(context, id, op, data).await
588 UserOperation::ResolvePostReport => {
589 do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
591 UserOperation::GetSiteMetadata => {
592 do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
// Comment operations
596 UserOperation::SaveComment => {
597 do_websocket_operation::<SaveComment>(context, id, op, data).await
599 UserOperation::CreateCommentLike => {
600 do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
602 UserOperation::DistinguishComment => {
603 do_websocket_operation::<DistinguishComment>(context, id, op, data).await
605 UserOperation::CreateCommentReport => {
606 do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
608 UserOperation::ListCommentReports => {
609 do_websocket_operation::<ListCommentReports>(context, id, op, data).await
611 UserOperation::ResolveCommentReport => {
612 do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
/// Runs one `Perform` operation: deserializes `data` into `Data`, calls
/// `perform`, calls `SendActivity::send_activity` and serializes the result.
///
/// `Data` must implement `Perform`, `SendActivity` (with the same `Response`
/// type as its `Perform` impl) and `Deserialize`.
///
/// NOTE(review): the lifetimes `'a` and `'b` are not used on any line visible
/// in this excerpt — check whether they are needed. The `id`, `op` and `data`
/// parameters are on lines not shown here.
617 async fn do_websocket_operation<'a, 'b, Data>(
618 context: LemmyContext,
622 ) -> result::Result<String, LemmyError>
624 Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
625 for<'de> Data: Deserialize<'de>,
// A JSON value that does not match `Data` is reported to the caller via `?`.
627 let parsed_data: Data = serde_json::from_value(data)?;
// `perform` receives a fresh `web::Data` wrapping a clone of the context,
// plus the connection id.
628 let res = parsed_data
629 .perform(&web::Data::new(context.clone()), Some(id))
// Called with both the request and its response.
631 SendActivity::send_activity(&parsed_data, &res, &context).await?;
// The reply string is built from the operation and the response.
632 serialize_websocket_message(&op, &res)