1 use actix_web::{web, Error, HttpRequest, HttpResponse};
2 use actix_web_actors::ws;
3 use actix_ws::{MessageStream, Session};
4 use futures::stream::StreamExt;
5 use lemmy_api::Perform;
6 use lemmy_api_common::{
33 context::LemmyContext,
49 MarkCommentReplyAsRead,
50 MarkPersonMentionAsRead,
51 PasswordChangeAfterReset,
76 CreatePrivateMessageReport,
80 ListPrivateMessageReports,
81 MarkPrivateMessageAsRead,
82 ResolvePrivateMessageReport,
85 ApproveRegistrationApplication,
90 GetUnreadRegistrationApplicationCount,
92 ListRegistrationApplications,
101 serialize_websocket_message,
102 structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
108 use lemmy_api_crud::PerformCrud;
109 use lemmy_apub::{api::PerformApub, SendActivity};
110 use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
111 use serde::Deserialize;
112 use serde_json::Value;
117 time::{Duration, Instant},
119 use tracing::{debug, error, info};
121 /// Entry point for our route
///
/// Hands the request to `actix_ws::handle`, checks the `message` rate limit
/// for the client IP, registers the session with the chat server, starts the
/// heartbeat and spawns `handle_messages` for the new connection.
122 pub async fn websocket(
125 context: web::Data<LemmyContext>,
126 rate_limiter: web::Data<RateLimitCell>,
127 ) -> Result<HttpResponse, Error> {
128 let (response, session, stream) = actix_ws::handle(&req, body)?;
// Client address taken from `realip_remote_addr`, with "blank_ip" as the
// fallback when it yields nothing.
130 let client_ip = IpAddr(
133 .realip_remote_addr()
134 .unwrap_or("blank_ip")
// Joining the websocket is charged against the `message` rate-limit bucket.
138 let check = rate_limiter.message().check(client_ip.clone());
// NOTE(review): the guard around this rejection path is not visible in this
// view; presumably it runs only when `check` is false. Confirm.
141 "Websocket join with IP: {} has been rate limited.",
144 session.close(None).await.map_err(LemmyError::from)?;
// Register a clone of the session with the chat server to get the connection id.
148 let connection_id = context.chat_server().handle_connect(session.clone())?;
149 info!("{} joined", &client_ip);
// Shared timestamp handed to `heartbeat`, initialised to the current time.
151 let alive = Arc::new(Mutex::new(Instant::now()));
152 heartbeat(session.clone(), alive.clone());
// Message handling runs on its own spawned task.
154 actix_rt::spawn(handle_messages(
/// Processes the frames of one websocket connection.
///
/// Text frames are trimmed and handed to `parse_json_message`, and the
/// resulting string is sent back on `session`. Ping frames are answered with a
/// pong. Pong frames store the current time behind the heartbeat lock. A close
/// frame closes the session and reports the disconnect to the chat server.
/// Binary frames are only logged.
167 async fn handle_messages(
168 mut stream: MessageStream,
170 mut session: Session,
171 connection_id: ConnectionId,
172 alive: Arc<Mutex<Instant>>,
173 rate_limiter: web::Data<RateLimitCell>,
174 context: web::Data<LemmyContext>,
175 ) -> Result<(), LemmyError> {
// The pattern matches only `Some(Ok(_))`, so both the end of the stream and an
// `Err` item stop the loop.
176 while let Some(Ok(msg)) = stream.next().await {
178 ws::Message::Ping(bytes) => {
// Reply with a pong carrying the same bytes; a send error is checked here.
179 if session.pong(&bytes).await.is_err() {
183 ws::Message::Pong(_) => {
186 .expect("Failed to acquire websocket heartbeat alive lock");
// Record when the latest pong arrived.
187 *lock = Instant::now();
189 ws::Message::Text(text) => {
190 let msg = text.trim().to_string();
191 let executed = parse_json_message(
195 rate_limiter.get_ref(),
196 context.get_ref().clone(),
// An operation error is logged and turned into a response string rather than
// being propagated out of this function.
200 let res = executed.unwrap_or_else(|e| {
201 error!("Error during message handling {}", e);
203 .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
205 session.text(res).await?;
207 ws::Message::Close(_) => {
208 session.close(None).await?;
209 context.chat_server().handle_disconnect(&connection_id)?;
212 ws::Message::Binary(_) => info!("Unexpected binary"),
/// Spawns a task that pings `session` and closes it once the `alive` timestamp
/// is more than 10 seconds old.
///
/// The task waits on a 5-second `actix_rt` interval.
/// NOTE(review): the loop construct and the exit statements are not visible in
/// this view; confirm the task stops after a failed ping or after closing.
219 fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
220 actix_rt::spawn(async move {
221 let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// Ping with an empty payload and check whether sending failed.
223 if session.ping(b"").await.is_err() {
// `alive_lock` is local to this block expression, which keeps the guard's
// scope short.
227 let duration_since = {
228 let alive_lock = alive
230 .expect("Failed to acquire websocket heartbeat alive lock");
231 Instant::now().duration_since(*alive_lock)
// The timestamp is older than 10 seconds: close the session, ignoring the result.
233 if duration_since > Duration::from_secs(10) {
234 let _ = session.close(None).await;
237 interval.tick().await;
/// Parses one websocket text message and dispatches it to the matching
/// operation family.
///
/// The `op` string is tried as `UserOperationCrud`, then as `UserOperation`,
/// and finally as `UserOperationApub`; a failure of that last parse is
/// returned through `?`. Each family picks a rate-limit bucket for the
/// operation and passes the outcome to `check_rate_limit_passed` before the
/// operation is dispatched.
///
/// Returns the string produced by the selected dispatcher, or a `LemmyError`
/// from JSON parsing, the missing-`op` check, the rate limit or the operation.
242 async fn parse_json_message(
245 connection_id: ConnectionId,
246 rate_limiter: &RateLimitCell,
247 context: LemmyContext,
248 ) -> Result<String, LemmyError> {
249 let json: Value = serde_json::from_str(&msg)?;
// The "data" member is re-serialized to a string; the dispatchers deserialize
// it into the concrete request type.
250 let data = &json["data"].to_string();
253 .ok_or_else(|| LemmyError::from_message("missing op"))?;
255 // check if api call passes the rate limit, and generate future for later execution
256 if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
257 let passed = match user_operation_crud {
258 UserOperationCrud::Register => rate_limiter.register().check(ip),
259 UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
// NOTE(review): CreateCommunity is charged to the `register` bucket, the same
// one used for Register. Confirm this is intended.
260 UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
261 UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
// Every other CRUD operation uses the general `message` bucket.
262 _ => rate_limiter.message().check(ip),
264 check_rate_limit_passed(passed)?;
265 match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
266 } else if let Ok(user_operation) = UserOperation::from_str(op) {
267 let passed = match user_operation {
// Captcha requests are charged to the `post` bucket.
268 UserOperation::GetCaptcha => rate_limiter.post().check(ip),
269 _ => rate_limiter.message().check(ip),
271 check_rate_limit_passed(passed)?;
272 match_websocket_operation(context, connection_id, user_operation, data).await
// Last candidate family: a parse failure here is propagated to the caller.
274 let user_operation = UserOperationApub::from_str(op)?;
275 let passed = match user_operation {
276 UserOperationApub::Search => rate_limiter.search().check(ip),
277 _ => rate_limiter.message().check(ip),
279 check_rate_limit_passed(passed)?;
280 match_websocket_operation_apub(context, connection_id, user_operation, data).await
/// Turns the boolean outcome of a rate-limit check into a `Result`.
///
/// The failure path visible here returns a `LemmyError` built from the message
/// `rate_limit_error`.
/// NOTE(review): the branch on `passed` and the success value are not visible
/// in this view; presumably `Ok(())` is returned when `passed` is true. Confirm.
284 fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
288 // if rate limit was hit, respond with message
289 Err(LemmyError::from_message("rate_limit_error"))
/// Dispatches a `UserOperationCrud` variant to `do_websocket_operation_crud`,
/// instantiated with the request type of the same name.
///
/// Every arm forwards `context`, `id`, `op` and `data` unchanged and returns
/// the awaited result of the helper.
293 pub async fn match_websocket_operation_crud(
294 context: LemmyContext,
296 op: UserOperationCrud,
298 ) -> result::Result<String, LemmyError> {
301 UserOperationCrud::Register => {
302 do_websocket_operation_crud::<Register>(context, id, op, data).await
304 UserOperationCrud::DeleteAccount => {
305 do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
308 // Private Message ops
309 UserOperationCrud::CreatePrivateMessage => {
310 do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
312 UserOperationCrud::EditPrivateMessage => {
313 do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
315 UserOperationCrud::DeletePrivateMessage => {
316 do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
318 UserOperationCrud::GetPrivateMessages => {
319 do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
323 UserOperationCrud::CreateSite => {
324 do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
326 UserOperationCrud::EditSite => {
327 do_websocket_operation_crud::<EditSite>(context, id, op, data).await
329 UserOperationCrud::GetSite => {
330 do_websocket_operation_crud::<GetSite>(context, id, op, data).await
334 UserOperationCrud::ListCommunities => {
335 do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
337 UserOperationCrud::CreateCommunity => {
338 do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
340 UserOperationCrud::EditCommunity => {
341 do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
343 UserOperationCrud::DeleteCommunity => {
344 do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
346 UserOperationCrud::RemoveCommunity => {
347 do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
351 UserOperationCrud::CreatePost => {
352 do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
354 UserOperationCrud::GetPost => {
355 do_websocket_operation_crud::<GetPost>(context, id, op, data).await
357 UserOperationCrud::EditPost => {
358 do_websocket_operation_crud::<EditPost>(context, id, op, data).await
360 UserOperationCrud::DeletePost => {
361 do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
363 UserOperationCrud::RemovePost => {
364 do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
368 UserOperationCrud::CreateComment => {
369 do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
371 UserOperationCrud::EditComment => {
372 do_websocket_operation_crud::<EditComment>(context, id, op, data).await
374 UserOperationCrud::DeleteComment => {
375 do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
377 UserOperationCrud::RemoveComment => {
378 do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
380 UserOperationCrud::GetComment => {
381 do_websocket_operation_crud::<GetComment>(context, id, op, data).await
/// Runs a single CRUD operation for the request type `Data`.
///
/// Deserializes `data` into `Data`, calls `perform` with the context and
/// `Some(id)`, passes the request and its result to
/// `SendActivity::send_activity`, and returns what
/// `serialize_websocket_message` produces for `op` and the result.
///
/// NOTE(review): the `'a` and `'b` lifetime parameters are not used on any line
/// visible in this view; confirm whether they are needed.
386 async fn do_websocket_operation_crud<'a, 'b, Data>(
387 context: LemmyContext,
389 op: UserOperationCrud,
391 ) -> result::Result<String, LemmyError>
// `SendActivity::Response` must be the same type as `PerformCrud::Response`.
393 Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response>,
// Higher-ranked bound: `Data` must deserialize from input of any lifetime.
394 for<'de> Data: Deserialize<'de>,
396 let parsed_data: Data = serde_json::from_str(data)?;
397 let res = parsed_data
// `perform` receives a fresh `web::Data` wrapping a clone of the context.
398 .perform(&web::Data::new(context.clone()), Some(id))
400 SendActivity::send_activity(&parsed_data, &res, &context).await?;
401 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperationApub` variant to `do_websocket_operation_apub`,
/// instantiated with the request type of the same name.
///
/// Every arm forwards `context`, `id`, `op` and `data` unchanged and returns
/// the awaited result of the helper.
404 pub async fn match_websocket_operation_apub(
405 context: LemmyContext,
407 op: UserOperationApub,
409 ) -> result::Result<String, LemmyError> {
411 UserOperationApub::GetPersonDetails => {
412 do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
414 UserOperationApub::GetCommunity => {
415 do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
417 UserOperationApub::GetComments => {
418 do_websocket_operation_apub::<GetComments>(context, id, op, data).await
420 UserOperationApub::GetPosts => {
421 do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
423 UserOperationApub::ResolveObject => {
424 do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
426 UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
/// Runs a single `PerformApub` operation for the request type `Data`.
///
/// Deserializes `data` into `Data`, calls `perform` with the context and
/// `Some(id)`, passes the request and its result to
/// `SendActivity::send_activity`, and returns what
/// `serialize_websocket_message` produces for `op` and the result.
///
/// NOTE(review): the `'a` and `'b` lifetime parameters are not used on any line
/// visible in this view; confirm whether they are needed.
430 async fn do_websocket_operation_apub<'a, 'b, Data>(
431 context: LemmyContext,
433 op: UserOperationApub,
435 ) -> result::Result<String, LemmyError>
// `SendActivity::Response` must be the same type as `PerformApub::Response`.
437 Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response>,
// Higher-ranked bound: `Data` must deserialize from input of any lifetime.
438 for<'de> Data: Deserialize<'de>,
440 let parsed_data: Data = serde_json::from_str(data)?;
441 let res = parsed_data
// `perform` receives a fresh `web::Data` wrapping a clone of the context.
442 .perform(&web::Data::new(context.clone()), Some(id))
444 SendActivity::send_activity(&parsed_data, &res, &context).await?;
445 serialize_websocket_message(&op, &res)
/// Dispatches a `UserOperation` variant to `do_websocket_operation`,
/// instantiated with the request type for that variant.
///
/// Every arm forwards `context`, `id`, `op` and `data` unchanged and returns
/// the awaited result of the helper. The request type has the same name as the
/// variant, except for `PasswordChange`, which is handled by
/// `PasswordChangeAfterReset`.
448 pub async fn match_websocket_operation(
449 context: LemmyContext,
453 ) -> result::Result<String, LemmyError> {
456 UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
457 UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
458 UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
459 UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
460 UserOperation::GetUnreadRegistrationApplicationCount => {
461 do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
463 UserOperation::ListRegistrationApplications => {
464 do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
466 UserOperation::ApproveRegistrationApplication => {
467 do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
469 UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
470 UserOperation::GetBannedPersons => {
471 do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
473 UserOperation::BlockPerson => {
474 do_websocket_operation::<BlockPerson>(context, id, op, data).await
476 UserOperation::GetPersonMentions => {
477 do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
479 UserOperation::MarkPersonMentionAsRead => {
480 do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
482 UserOperation::MarkCommentReplyAsRead => {
483 do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
485 UserOperation::MarkAllAsRead => {
486 do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
488 UserOperation::PasswordReset => {
489 do_websocket_operation::<PasswordReset>(context, id, op, data).await
// The request type for this variant is `PasswordChangeAfterReset`, not a type
// named after the variant.
491 UserOperation::PasswordChange => {
492 do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
494 UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
495 UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
496 UserOperation::CommunityJoin => {
497 do_websocket_operation::<CommunityJoin>(context, id, op, data).await
499 UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
500 UserOperation::SaveUserSettings => {
501 do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
503 UserOperation::ChangePassword => {
504 do_websocket_operation::<ChangePassword>(context, id, op, data).await
506 UserOperation::GetReportCount => {
507 do_websocket_operation::<GetReportCount>(context, id, op, data).await
509 UserOperation::GetUnreadCount => {
510 do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
512 UserOperation::VerifyEmail => {
513 do_websocket_operation::<VerifyEmail>(context, id, op, data).await
516 // Private Message ops
517 UserOperation::MarkPrivateMessageAsRead => {
518 do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
520 UserOperation::CreatePrivateMessageReport => {
521 do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
523 UserOperation::ResolvePrivateMessageReport => {
524 do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
526 UserOperation::ListPrivateMessageReports => {
527 do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
531 UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
532 UserOperation::PurgePerson => {
533 do_websocket_operation::<PurgePerson>(context, id, op, data).await
535 UserOperation::PurgeCommunity => {
536 do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
538 UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
539 UserOperation::PurgeComment => {
540 do_websocket_operation::<PurgeComment>(context, id, op, data).await
542 UserOperation::TransferCommunity => {
543 do_websocket_operation::<TransferCommunity>(context, id, op, data).await
545 UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
548 UserOperation::FollowCommunity => {
549 do_websocket_operation::<FollowCommunity>(context, id, op, data).await
551 UserOperation::BlockCommunity => {
552 do_websocket_operation::<BlockCommunity>(context, id, op, data).await
554 UserOperation::BanFromCommunity => {
555 do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
557 UserOperation::AddModToCommunity => {
558 do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
562 UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
563 UserOperation::StickyPost => do_websocket_operation::<StickyPost>(context, id, op, data).await,
564 UserOperation::CreatePostLike => {
565 do_websocket_operation::<CreatePostLike>(context, id, op, data).await
567 UserOperation::MarkPostAsRead => {
568 do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
570 UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
571 UserOperation::CreatePostReport => {
572 do_websocket_operation::<CreatePostReport>(context, id, op, data).await
574 UserOperation::ListPostReports => {
575 do_websocket_operation::<ListPostReports>(context, id, op, data).await
577 UserOperation::ResolvePostReport => {
578 do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
580 UserOperation::GetSiteMetadata => {
581 do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
585 UserOperation::SaveComment => {
586 do_websocket_operation::<SaveComment>(context, id, op, data).await
588 UserOperation::CreateCommentLike => {
589 do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
591 UserOperation::CreateCommentReport => {
592 do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
594 UserOperation::ListCommentReports => {
595 do_websocket_operation::<ListCommentReports>(context, id, op, data).await
597 UserOperation::ResolveCommentReport => {
598 do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
/// Runs a single `Perform` operation for the request type `Data`.
///
/// Deserializes `data` into `Data`, calls `perform` with the context and
/// `Some(id)`, passes the request and its result to
/// `SendActivity::send_activity`, and returns what
/// `serialize_websocket_message` produces for `op` and the result.
///
/// NOTE(review): the `'a` and `'b` lifetime parameters are not used on any line
/// visible in this view; confirm whether they are needed.
603 async fn do_websocket_operation<'a, 'b, Data>(
604 context: LemmyContext,
608 ) -> result::Result<String, LemmyError>
// `SendActivity::Response` must be the same type as `Perform::Response`.
610 Data: Perform + SendActivity<Response = <Data as Perform>::Response>,
// Higher-ranked bound: `Data` must deserialize from input of any lifetime.
611 for<'de> Data: Deserialize<'de>,
613 let parsed_data: Data = serde_json::from_str(data)?;
614 let res = parsed_data
// `perform` receives a fresh `web::Data` wrapping a clone of the context.
615 .perform(&web::Data::new(context.clone()), Some(id))
617 SendActivity::send_activity(&parsed_data, &res, &context).await?;
618 serialize_websocket_message(&op, &res)