From: Dessalines Date: Mon, 24 Aug 2020 11:58:24 +0000 (-0400) Subject: Fixing broken websocket sends. Removing WebSocketInfo (#1098) X-Git-Url: http://these/git/%7B%60%24%7BwebArchiveUrl%7D/%22%7B%7D/%22https:/nerdica.net/search?a=commitdiff_plain;h=2d5a50e80fb8bac8ab8ee9b4e5ce86cdaab51257;p=lemmy.git Fixing broken websocket sends. Removing WebSocketInfo (#1098) --- diff --git a/server/src/api/comment.rs b/server/src/api/comment.rs index 24055d4d..3384993f 100644 --- a/server/src/api/comment.rs +++ b/server/src/api/comment.rs @@ -13,8 +13,8 @@ use crate::{ websocket::{ server::{JoinCommunityRoom, SendComment}, UserOperation, - WebsocketInfo, }, + ConnectionId, DbPool, LemmyContext, LemmyError, @@ -129,7 +129,7 @@ impl Perform for CreateComment { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &CreateComment = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -226,17 +226,15 @@ impl Perform for CreateComment { form_id: data.form_id.to_owned(), }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendComment { - op: UserOperation::CreateComment, - comment: res.clone(), - my_id: ws.id, - }); + context.chat_server().do_send(SendComment { + op: UserOperation::CreateComment, + comment: res.clone(), + websocket_id, + }); - // strip out the recipient_ids, so that - // users don't get double notifs - res.recipient_ids = Vec::new(); - } + // strip out the recipient_ids, so that + // users don't get double notifs + res.recipient_ids = Vec::new(); Ok(res) } @@ -249,7 +247,7 @@ impl Perform for EditComment { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &EditComment = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -311,17 +309,15 @@ impl Perform for EditComment { form_id: data.form_id.to_owned(), }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendComment { - op: UserOperation::EditComment, - comment: res.clone(), - my_id: ws.id, - }); + context.chat_server().do_send(SendComment { + op: UserOperation::EditComment, + comment: res.clone(), + websocket_id, + }); - // strip out the recipient_ids, so that - // users don't get double notifs - res.recipient_ids = Vec::new(); - } + // strip out the recipient_ids, so that + // users don't get double notifs + res.recipient_ids = Vec::new(); Ok(res) } @@ -334,7 +330,7 @@ impl Perform for DeleteComment { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &DeleteComment = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -398,17 +394,15 @@ impl Perform for DeleteComment { form_id: None, }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendComment { - op: UserOperation::DeleteComment, - comment: res.clone(), - my_id: ws.id, - }); + context.chat_server().do_send(SendComment { + op: UserOperation::DeleteComment, + comment: res.clone(), + websocket_id, + }); - // strip out the recipient_ids, so that - // users don't get double notifs - res.recipient_ids = Vec::new(); - } + // strip out the recipient_ids, so that + // users don't get double notifs + res.recipient_ids = Vec::new(); Ok(res) } @@ -421,7 +415,7 @@ impl Perform for RemoveComment { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &RemoveComment = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -495,17 +489,15 @@ impl Perform for RemoveComment { form_id: None, }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendComment { - op: UserOperation::RemoveComment, - comment: res.clone(), - my_id: ws.id, - }); + context.chat_server().do_send(SendComment { + op: UserOperation::RemoveComment, + comment: res.clone(), + websocket_id, + }); - // strip out the recipient_ids, so that - // users don't get double notifs - res.recipient_ids = Vec::new(); - } + // strip out the recipient_ids, so that + // users don't get double notifs + res.recipient_ids = Vec::new(); Ok(res) } @@ -518,7 +510,7 @@ impl Perform for MarkCommentAsRead { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &MarkCommentAsRead = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -590,7 +582,7 @@ impl Perform for SaveComment { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &SaveComment = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -634,7 +626,7 @@ impl Perform for CreateCommentLike { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &CreateCommentLike = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -726,17 +718,15 @@ impl Perform for CreateCommentLike { form_id: None, }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendComment { - op: UserOperation::CreateCommentLike, - comment: res.clone(), - my_id: ws.id, - }); + context.chat_server().do_send(SendComment { + op: UserOperation::CreateCommentLike, + comment: res.clone(), + websocket_id, + }); - // strip out the recipient_ids, so that - // users don't get double notifs - res.recipient_ids = Vec::new(); - } + // strip out the recipient_ids, so that + // users don't get double notifs + res.recipient_ids = Vec::new(); Ok(res) } @@ -749,7 +739,7 @@ impl Perform for GetComments { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &GetComments = &self; let user = get_user_from_jwt_opt(&data.auth, context.pool()).await?; @@ -777,17 +767,15 @@ impl Perform for GetComments { Err(_) => return Err(APIError::err("couldnt_get_comments").into()), }; - if let Some(ws) = websocket_info { + if let Some(id) = websocket_id { // You don't need to join the specific community room, bc this is already handled by // GetCommunity if data.community_id.is_none() { - if let Some(id) = ws.id { - // 0 is the "all" community - ws.chatserver.do_send(JoinCommunityRoom { - community_id: 0, - id, - }); - } + // 0 is the "all" community + context.chat_server().do_send(JoinCommunityRoom { + community_id: 0, + id, + }); } } diff --git a/server/src/api/community.rs b/server/src/api/community.rs index f5fffb7c..7b63c672 100644 --- a/server/src/api/community.rs +++ b/server/src/api/community.rs @@ -6,8 +6,8 @@ use crate::{ websocket::{ server::{GetCommunityUsersOnline, JoinCommunityRoom, SendCommunityRoomMessage}, UserOperation, - WebsocketInfo, }, + ConnectionId, }; use anyhow::Context; use lemmy_db::{ @@ -166,7 +166,7 @@ impl Perform for GetCommunity { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &GetCommunity = &self; let user = get_user_from_jwt_opt(&data.auth, context.pool()).await?; @@ -205,20 +205,17 @@ impl Perform for GetCommunity { Err(_e) => return Err(APIError::err("couldnt_find_community").into()), }; - let online = if let Some(ws) = websocket_info { - if let Some(id) = ws.id { - ws.chatserver.do_send(JoinCommunityRoom { - community_id: community.id, - id, - }); - } - ws.chatserver - .send(GetCommunityUsersOnline { community_id }) - .await - .unwrap_or(1) - } else { - 0 - }; + if let Some(id) = websocket_id { + context + .chat_server() + .do_send(JoinCommunityRoom { community_id, id }); + } + + let online = context + .chat_server() + .send(GetCommunityUsersOnline { community_id }) + .await + .unwrap_or(1); let res = GetCommunityResponse { community: community_view, @@ -238,7 +235,7 @@ impl Perform for CreateCommunity { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &CreateCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -333,7 +330,7 @@ impl Perform for EditCommunity { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &EditCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -403,7 +400,7 @@ impl Perform for EditCommunity { community: community_view, }; - send_community_websocket(&res, websocket_info, UserOperation::EditCommunity); + send_community_websocket(&res, context, websocket_id, UserOperation::EditCommunity); Ok(res) } @@ -416,7 +413,7 @@ impl Perform for DeleteCommunity { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &DeleteCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -459,7 +456,7 @@ impl Perform for DeleteCommunity { community: community_view, }; - send_community_websocket(&res, websocket_info, UserOperation::DeleteCommunity); + send_community_websocket(&res, context, websocket_id, UserOperation::DeleteCommunity); Ok(res) } @@ -472,7 +469,7 @@ impl Perform for RemoveCommunity { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &RemoveCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -527,7 +524,7 @@ impl Perform for RemoveCommunity { community: community_view, }; - send_community_websocket(&res, websocket_info, UserOperation::RemoveCommunity); + send_community_websocket(&res, context, websocket_id, UserOperation::RemoveCommunity); Ok(res) } @@ -540,7 +537,7 @@ impl Perform for ListCommunities { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &ListCommunities = &self; let user = get_user_from_jwt_opt(&data.auth, context.pool()).await?; @@ -582,7 +579,7 @@ impl Perform for FollowCommunity { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &FollowCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -643,7 +640,7 @@ impl Perform for GetFollowedCommunities { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetFollowedCommunities = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -670,7 +667,7 @@ impl Perform for BanFromCommunity { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &BanFromCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -757,14 +754,12 @@ impl Perform for BanFromCommunity { banned: data.ban, }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendCommunityRoomMessage { - op: UserOperation::BanFromCommunity, - response: res.clone(), - community_id: data.community_id, - my_id: ws.id, - }); - } + context.chat_server().do_send(SendCommunityRoomMessage { + op: UserOperation::BanFromCommunity, + response: res.clone(), + community_id, + websocket_id, + }); Ok(res) } @@ -777,7 +772,7 @@ impl Perform for AddModToCommunity { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &AddModToCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -824,14 +819,12 @@ impl Perform for AddModToCommunity { let res = AddModToCommunityResponse { moderators }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendCommunityRoomMessage { - op: UserOperation::AddModToCommunity, - response: res.clone(), - community_id: data.community_id, - my_id: ws.id, - }); - } + context.chat_server().do_send(SendCommunityRoomMessage { + op: UserOperation::AddModToCommunity, + response: res.clone(), + community_id, + websocket_id, + }); Ok(res) } @@ -844,7 +837,7 @@ impl Perform for TransferCommunity { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &TransferCommunity = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -957,20 +950,19 @@ impl Perform for TransferCommunity { pub fn send_community_websocket( res: &CommunityResponse, - websocket_info: Option, + context: &Data, + websocket_id: Option, op: UserOperation, ) { - if let Some(ws) = websocket_info { - // Strip out the user id and subscribed when sending to others - let mut res_sent = res.clone(); - res_sent.community.user_id = None; - res_sent.community.subscribed = None; - - ws.chatserver.do_send(SendCommunityRoomMessage { - op, - response: res_sent, - community_id: res.community.id, - my_id: ws.id, - }); - } + // Strip out the user id and subscribed when sending to others + let mut res_sent = res.clone(); + res_sent.community.user_id = None; + res_sent.community.subscribed = None; + + context.chat_server().do_send(SendCommunityRoomMessage { + op, + response: res_sent, + community_id: res.community.id, + websocket_id, + }); } diff --git a/server/src/api/mod.rs b/server/src/api/mod.rs index c64707f9..5f8706e0 100644 --- a/server/src/api/mod.rs +++ b/server/src/api/mod.rs @@ -1,11 +1,4 @@ -use crate::{ - api::claims::Claims, - blocking, - websocket::WebsocketInfo, - DbPool, - LemmyContext, - LemmyError, -}; +use crate::{api::claims::Claims, blocking, ConnectionId, DbPool, LemmyContext, LemmyError}; use actix_web::web::Data; use lemmy_db::{ community::*, @@ -48,7 +41,7 @@ pub trait Perform { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result; } diff --git a/server/src/api/post.rs b/server/src/api/post.rs index fa3c73be..5cb7e322 100644 --- a/server/src/api/post.rs +++ b/server/src/api/post.rs @@ -15,8 +15,8 @@ use crate::{ websocket::{ server::{GetPostUsersOnline, JoinCommunityRoom, JoinPostRoom, SendPost}, UserOperation, - WebsocketInfo, }, + ConnectionId, LemmyContext, LemmyError, }; @@ -146,7 +146,7 @@ impl Perform for CreatePost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &CreatePost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -247,13 +247,11 @@ impl Perform for CreatePost { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::CreatePost, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::CreatePost, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -266,7 +264,7 @@ impl Perform for GetPost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &GetPost = &self; let user = get_user_from_jwt_opt(&data.auth, context.pool()).await?; @@ -304,20 +302,18 @@ impl Perform for GetPost { }) .await??; - let online = if let Some(ws) = websocket_info { - if let Some(id) = ws.id { - ws.chatserver.do_send(JoinPostRoom { - post_id: data.id, - id, - }); - } - ws.chatserver - .send(GetPostUsersOnline { post_id: data.id }) - .await - .unwrap_or(1) - } else { - 0 - }; + if let Some(id) = websocket_id { + context.chat_server().do_send(JoinPostRoom { + post_id: data.id, + id, + }); + } + + let online = context + .chat_server() + .send(GetPostUsersOnline { post_id: data.id }) + .await + .unwrap_or(1); // Return the jwt Ok(GetPostResponse { @@ -337,7 +333,7 @@ impl Perform for GetPosts { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &GetPosts = &self; let user = get_user_from_jwt_opt(&data.auth, context.pool()).await?; @@ -377,17 +373,15 @@ impl Perform for GetPosts { Err(_e) => return Err(APIError::err("couldnt_get_posts").into()), }; - if let Some(ws) = websocket_info { + if let Some(id) = websocket_id { // You don't need to join the specific community room, bc this is already handled by // GetCommunity if data.community_id.is_none() { - if let Some(id) = ws.id { - // 0 is the "all" community - ws.chatserver.do_send(JoinCommunityRoom { - community_id: 0, - id, - }); - } + // 0 is the "all" community + context.chat_server().do_send(JoinCommunityRoom { + community_id: 0, + id, + }); } } @@ -402,7 +396,7 @@ impl Perform for CreatePostLike { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &CreatePostLike = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -465,13 +459,11 @@ impl Perform for CreatePostLike { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::CreatePostLike, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::CreatePostLike, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -484,7 +476,7 @@ impl Perform for EditPost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &EditPost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -560,13 +552,11 @@ impl Perform for EditPost { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::EditPost, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::EditPost, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -579,7 +569,7 @@ impl Perform for DeletePost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &DeletePost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -618,13 +608,11 @@ impl Perform for DeletePost { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::DeletePost, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::DeletePost, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -637,7 +625,7 @@ impl Perform for RemovePost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &RemovePost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -687,13 +675,11 @@ impl Perform for RemovePost { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::RemovePost, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::RemovePost, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -706,7 +692,7 @@ impl Perform for LockPost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &LockPost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -747,13 +733,11 @@ impl Perform for LockPost { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::LockPost, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::LockPost, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -766,7 +750,7 @@ impl Perform for StickyPost { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &StickyPost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -811,13 +795,11 @@ impl Perform for StickyPost { let res = PostResponse { post: post_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendPost { - op: UserOperation::StickyPost, - post: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendPost { + op: UserOperation::StickyPost, + post: res.clone(), + websocket_id, + }); Ok(res) } @@ -830,7 +812,7 @@ impl Perform for SavePost { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &SavePost = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; diff --git a/server/src/api/site.rs b/server/src/api/site.rs index a9e393d8..8f5f0e93 100644 --- a/server/src/api/site.rs +++ b/server/src/api/site.rs @@ -15,8 +15,8 @@ use crate::{ websocket::{ server::{GetUsersOnline, SendAllMessage}, UserOperation, - WebsocketInfo, }, + ConnectionId, LemmyContext, LemmyError, }; @@ -167,7 +167,7 @@ impl Perform for ListCategories { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let _data: &ListCategories = &self; @@ -185,7 +185,7 @@ impl Perform for GetModlog { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetModlog = &self; @@ -259,7 +259,7 @@ impl Perform for CreateSite { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &CreateSite = &self; @@ -300,7 +300,7 @@ impl Perform for EditSite { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &EditSite = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -337,13 +337,11 @@ impl Perform for EditSite { let res = SiteResponse { site: site_view }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendAllMessage { - op: UserOperation::EditSite, - response: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendAllMessage { + op: UserOperation::EditSite, + response: res.clone(), + websocket_id, + }); Ok(res) } @@ -356,7 +354,7 @@ impl Perform for GetSite { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &GetSite = &self; @@ -375,7 +373,7 @@ impl Perform for GetSite { captcha_uuid: None, captcha_answer: None, }; - let login_response = register.perform(context, websocket_info.clone()).await?; + let login_response = register.perform(context, websocket_id).await?; info!("Admin {} created", setup.admin_username); let create_site = CreateSite { @@ -388,7 +386,7 @@ impl Perform for GetSite { enable_nsfw: true, auth: login_response.jwt, }; - create_site.perform(context, websocket_info.clone()).await?; + create_site.perform(context, websocket_id).await?; info!("Site {} created", setup.site_name); Some(blocking(context.pool(), move |conn| SiteView::read(conn)).await??) } else { @@ -410,11 +408,11 @@ impl Perform for GetSite { let banned = blocking(context.pool(), move |conn| UserView::banned(conn)).await??; - let online = if let Some(ws) = websocket_info { - ws.chatserver.send(GetUsersOnline).await.unwrap_or(1) - } else { - 0 - }; + let online = context + .chat_server() + .send(GetUsersOnline) + .await + .unwrap_or(1); let my_user = get_user_from_jwt_opt(&data.auth, context.pool()) .await? @@ -444,7 +442,7 @@ impl Perform for Search { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &Search = &self; @@ -608,7 +606,7 @@ impl Perform for TransferSite { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &TransferSite = &self; let mut user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -671,7 +669,7 @@ impl Perform for GetSiteConfig { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetSiteConfig = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -692,7 +690,7 @@ impl Perform for SaveSiteConfig { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &SaveSiteConfig = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; diff --git a/server/src/api/user.rs b/server/src/api/user.rs index a84b8987..e97a6d33 100644 --- a/server/src/api/user.rs +++ b/server/src/api/user.rs @@ -14,8 +14,8 @@ use crate::{ websocket::{ server::{CaptchaItem, CheckCaptcha, JoinUserRoom, SendAllMessage, SendUserRoomMessage}, UserOperation, - WebsocketInfo, }, + ConnectionId, LemmyContext, LemmyError, }; @@ -303,7 +303,7 @@ impl Perform for Login { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &Login = &self; @@ -338,7 +338,7 @@ impl Perform for Register { async fn perform( &self, context: &Data, - websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &Register = &self; @@ -357,27 +357,22 @@ impl Perform for Register { // If its not the admin, check the captcha if !data.admin && Settings::get().captcha.enabled { - match websocket_info { - Some(ws) => { - let check = ws - .chatserver - .send(CheckCaptcha { - uuid: data - .captcha_uuid - .to_owned() - .unwrap_or_else(|| "".to_string()), - answer: data - .captcha_answer - .to_owned() - .unwrap_or_else(|| "".to_string()), - }) - .await?; - if !check { - return Err(APIError::err("captcha_incorrect").into()); - } - } - None => return Err(APIError::err("captcha_incorrect").into()), - }; + let check = context + .chat_server() + .send(CheckCaptcha { + uuid: data + .captcha_uuid + .to_owned() + .unwrap_or_else(|| "".to_string()), + answer: data + .captcha_answer + .to_owned() + .unwrap_or_else(|| "".to_string()), + }) + .await?; + if !check { + return Err(APIError::err("captcha_incorrect").into()); + } } check_slurs(&data.username)?; @@ -515,8 +510,8 @@ impl Perform for GetCaptcha { async fn perform( &self, - _context: &Data, - websocket_info: Option, + context: &Data, + _websocket_id: Option, ) -> Result { let captcha_settings = Settings::get().captcha; @@ -547,9 +542,8 @@ impl Perform for GetCaptcha { expires: naive_now() + Duration::minutes(10), // expires in 10 minutes }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(captcha_item); - } + // Stores the captcha item on the queue + context.chat_server().do_send(captcha_item); Ok(GetCaptchaResponse { ok: Some(CaptchaResponse { png, uuid, wav }), @@ -564,7 +558,7 @@ impl Perform for SaveUserSettings { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &SaveUserSettings = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -690,7 +684,7 @@ impl Perform for GetUserDetails { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetUserDetails = &self; let user = get_user_from_jwt_opt(&data.auth, context.pool()).await?; @@ -788,7 +782,7 @@ impl Perform for AddAdmin { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &AddAdmin = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -827,13 +821,11 @@ impl Perform for AddAdmin { let res = AddAdminResponse { admins }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendAllMessage { - op: UserOperation::AddAdmin, - response: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendAllMessage { + op: UserOperation::AddAdmin, + response: res.clone(), + websocket_id, + }); Ok(res) } @@ -846,7 +838,7 @@ impl Perform for BanUser { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &BanUser = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -909,13 +901,11 @@ impl Perform for BanUser { banned: data.ban, }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendAllMessage { - op: UserOperation::BanUser, - response: res.clone(), - my_id: ws.id, - }); - } + context.chat_server().do_send(SendAllMessage { + op: UserOperation::BanUser, + response: res.clone(), + websocket_id, + }); Ok(res) } @@ -928,7 +918,7 @@ impl Perform for GetReplies { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetReplies = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -960,7 +950,7 @@ impl Perform for GetUserMentions { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetUserMentions = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -992,7 +982,7 @@ impl Perform for MarkUserMentionAsRead { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &MarkUserMentionAsRead = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1034,7 +1024,7 @@ impl Perform for MarkAllAsRead { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &MarkAllAsRead = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1086,7 +1076,7 @@ impl Perform for DeleteAccount { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &DeleteAccount = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1123,7 +1113,7 @@ impl Perform for PasswordReset { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &PasswordReset = &self; @@ -1171,7 +1161,7 @@ impl Perform for PasswordChange { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &PasswordChange = &self; @@ -1212,7 +1202,7 @@ impl Perform for CreatePrivateMessage { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &CreatePrivateMessage = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1290,14 +1280,12 @@ impl Perform for CreatePrivateMessage { let res = PrivateMessageResponse { message }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendUserRoomMessage { - op: UserOperation::CreatePrivateMessage, - response: res.clone(), - recipient_id: recipient_user.id, - my_id: ws.id, - }); - } + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::CreatePrivateMessage, + response: res.clone(), + recipient_id, + websocket_id, + }); Ok(res) } @@ -1310,7 +1298,7 @@ impl Perform for EditPrivateMessage { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &EditPrivateMessage = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1349,14 +1337,12 @@ impl Perform for EditPrivateMessage { let res = PrivateMessageResponse { message }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendUserRoomMessage { - op: UserOperation::EditPrivateMessage, - response: res.clone(), - recipient_id, - my_id: ws.id, - }); - } + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::EditPrivateMessage, + response: res.clone(), + recipient_id, + websocket_id, + }); Ok(res) } @@ -1369,7 +1355,7 @@ impl Perform for DeletePrivateMessage { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &DeletePrivateMessage = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1414,14 +1400,12 @@ impl Perform for DeletePrivateMessage { let res = PrivateMessageResponse { message }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendUserRoomMessage { - op: UserOperation::DeletePrivateMessage, - response: res.clone(), - recipient_id, - my_id: ws.id, - }); - } + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::DeletePrivateMessage, + response: res.clone(), + recipient_id, + websocket_id, + }); Ok(res) } @@ -1434,7 +1418,7 @@ impl Perform for MarkPrivateMessageAsRead { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &MarkPrivateMessageAsRead = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1472,14 +1456,12 @@ impl Perform for MarkPrivateMessageAsRead { let res = PrivateMessageResponse { message }; - if let Some(ws) = websocket_info { - ws.chatserver.do_send(SendUserRoomMessage { - op: UserOperation::MarkPrivateMessageAsRead, - response: res.clone(), - recipient_id, - my_id: ws.id, - }); - } + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::MarkPrivateMessageAsRead, + response: res.clone(), + recipient_id, + websocket_id, + }); Ok(res) } @@ -1492,7 +1474,7 @@ impl Perform for GetPrivateMessages { async fn perform( &self, context: &Data, - _websocket_info: Option, + _websocket_id: Option, ) -> Result { let data: &GetPrivateMessages = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; @@ -1521,18 +1503,16 @@ impl Perform for UserJoin { async fn perform( &self, context: &Data, - websocket_info: Option, + websocket_id: Option, ) -> Result { let data: &UserJoin = &self; let user = get_user_from_jwt(&data.auth, context.pool()).await?; - if let Some(ws) = websocket_info { - if let Some(id) = ws.id { - ws.chatserver.do_send(JoinUserRoom { - user_id: user.id, - id, - }); - } + if let Some(ws_id) = websocket_id { + context.chat_server().do_send(JoinUserRoom { + user_id: user.id, + id: ws_id, + }); } Ok(UserJoinResponse { user_id: user.id }) diff --git a/server/src/apub/inbox/activities/create.rs b/server/src/apub/inbox/activities/create.rs index 696aca9e..caba560d 100644 --- a/server/src/apub/inbox/activities/create.rs +++ b/server/src/apub/inbox/activities/create.rs @@ -75,7 +75,7 @@ async fn receive_create_post( context.chat_server().do_send(SendPost { op: UserOperation::CreatePost, post: res, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(create, &user, context).await?; @@ -128,7 +128,7 @@ async fn receive_create_comment( context.chat_server().do_send(SendComment { op: UserOperation::CreateComment, comment: res, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(create, &user, context).await?; diff --git a/server/src/apub/inbox/activities/delete.rs b/server/src/apub/inbox/activities/delete.rs index c9ab042a..2a6689db 100644 --- a/server/src/apub/inbox/activities/delete.rs +++ b/server/src/apub/inbox/activities/delete.rs @@ -100,7 +100,7 @@ async fn receive_delete_post( context.chat_server().do_send(SendPost { op: UserOperation::EditPost, post: res, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(delete, &user, context).await?; @@ -158,7 +158,7 @@ async fn receive_delete_comment( context.chat_server().do_send(SendComment { op: UserOperation::EditComment, comment: res, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(delete, &user, context).await?; @@ -222,7 +222,7 @@ async fn receive_delete_community( op: UserOperation::EditCommunity, response: res, community_id, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(delete, &user, context).await?; diff --git a/server/src/apub/inbox/activities/dislike.rs b/server/src/apub/inbox/activities/dislike.rs index e1d3006a..4d59dd47 100644 --- a/server/src/apub/inbox/activities/dislike.rs +++ b/server/src/apub/inbox/activities/dislike.rs @@ -85,7 +85,7 @@ async fn receive_dislike_post( context.chat_server().do_send(SendPost { op: UserOperation::CreatePostLike, post: res, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(dislike, &user, context).await?; @@ -142,7 +142,7 @@ async fn receive_dislike_comment( context.chat_server().do_send(SendComment { op: UserOperation::CreateCommentLike, comment: res, - my_id: None, + websocket_id: None, }); announce_if_community_is_local(dislike, &user, context).await?; diff --git a/server/src/apub/inbox/activities/like.rs b/server/src/apub/inbox/activities/like.rs index 804c33c3..a3f19b3c 100644 --- a/server/src/apub/inbox/activities/like.rs +++ b/server/src/apub/inbox/activities/like.rs @@ -76,7 +76,7 @@ async fn receive_like_post(like: Like, context: &LemmyContext) -> Result Result<(), LemmyError> { settings.bind, settings.port ); + let chat_server = + ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start(); + // Create Http server with websocket support HttpServer::new(move || { - let chat_server = - ChatServer::startup(pool.clone(), rate_limiter.clone(), Client::default()).start(); - let context = LemmyContext::create(pool.clone(), chat_server, Client::default()); + let context = LemmyContext::create(pool.clone(), chat_server.to_owned(), Client::default()); let settings = Settings::get(); let rate_limiter = rate_limiter.clone(); App::new() diff --git a/server/src/routes/api.rs b/server/src/routes/api.rs index 20190862..f2ee38d2 100644 --- a/server/src/routes/api.rs +++ b/server/src/routes/api.rs @@ -1,7 +1,6 @@ use crate::{ api::{comment::*, community::*, post::*, site::*, user::*, Perform}, rate_limit::RateLimit, - websocket::WebsocketInfo, LemmyContext, }; use actix_web::{error::ErrorBadRequest, *}; @@ -182,13 +181,8 @@ where Request: Perform, Request: Send + 'static, { - let ws_info = WebsocketInfo { - chatserver: context.chat_server().to_owned(), - id: None, - }; - let res = data - .perform(&context, Some(ws_info)) + .perform(&context, None) .await .map(|json| HttpResponse::Ok().json(json)) .map_err(ErrorBadRequest)?; diff --git a/server/src/websocket/mod.rs b/server/src/websocket/mod.rs index 2b0cd1bd..1430d89a 100644 --- a/server/src/websocket/mod.rs +++ b/server/src/websocket/mod.rs @@ -1,6 +1,5 @@ pub mod server; -use crate::ConnectionId; use actix::prelude::*; use diesel::{ r2d2::{ConnectionManager, Pool}, @@ -10,7 +9,6 @@ use log::{error, info}; use rand::{rngs::ThreadRng, Rng}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use server::ChatServer; use std::{ collections::{HashMap, HashSet}, str::FromStr, @@ -77,9 +75,3 @@ pub enum UserOperation { GetSiteConfig, SaveSiteConfig, } - -#[derive(Clone)] -pub struct WebsocketInfo { - pub chatserver: Addr, - pub id: Option, -} diff --git a/server/src/websocket/server.rs b/server/src/websocket/server.rs index bfc55f63..4d0a1c4d 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/server.rs @@ -58,7 +58,7 @@ pub struct StandardMessage { pub struct SendAllMessage { pub op: UserOperation, pub response: Response, - pub my_id: Option, + pub websocket_id: Option, } #[derive(Message)] @@ -67,7 +67,7 @@ pub struct SendUserRoomMessage { pub op: UserOperation, pub response: Response, pub recipient_id: UserId, - pub my_id: Option, + pub websocket_id: Option, } #[derive(Message)] @@ -76,7 +76,7 @@ pub struct SendCommunityRoomMessage { pub op: UserOperation, pub response: Response, pub community_id: CommunityId, - pub my_id: Option, + pub websocket_id: Option, } #[derive(Message)] @@ -84,7 +84,7 @@ pub struct SendCommunityRoomMessage { pub struct SendPost { pub op: UserOperation, pub post: PostResponse, - pub my_id: Option, + pub websocket_id: Option, } #[derive(Message)] @@ -92,7 +92,7 @@ pub struct SendPost { pub struct SendComment { pub op: UserOperation, pub comment: CommentResponse, - pub my_id: Option, + pub websocket_id: Option, } #[derive(Message)] @@ -285,7 +285,7 @@ impl ChatServer { op: &UserOperation, response: &Response, post_id: PostId, - my_id: Option, + websocket_id: Option, ) -> Result<(), LemmyError> where Response: Serialize, @@ -293,7 +293,7 @@ impl ChatServer { let res_str = &to_json_string(op, response)?; if let Some(sessions) = self.post_rooms.get(&post_id) { for id in sessions { - if let Some(my_id) = my_id { + if let Some(my_id) = websocket_id { if *id == my_id { continue; } @@ -309,7 +309,7 @@ impl ChatServer { op: &UserOperation, response: &Response, community_id: CommunityId, - my_id: Option, + websocket_id: Option, ) -> Result<(), LemmyError> where Response: Serialize, @@ -317,7 +317,7 @@ impl ChatServer { let res_str = &to_json_string(op, response)?; if let Some(sessions) = self.community_rooms.get(&community_id) { for id in sessions { - if let Some(my_id) = my_id { + if let Some(my_id) = websocket_id { if *id == my_id { continue; } @@ -332,14 +332,14 @@ impl ChatServer { &self, op: &UserOperation, response: &Response, - my_id: Option, + websocket_id: Option, ) -> Result<(), LemmyError> where Response: Serialize, { let res_str = &to_json_string(op, response)?; for id in self.sessions.keys() { - if let Some(my_id) = my_id { + if let Some(my_id) = websocket_id { if *id == my_id { continue; } @@ -354,7 +354,7 @@ impl ChatServer { op: &UserOperation, response: &Response, recipient_id: UserId, - my_id: Option, + websocket_id: Option, ) -> Result<(), LemmyError> where Response: Serialize, @@ -362,7 +362,7 @@ impl ChatServer { let res_str = &to_json_string(op, response)?; if let Some(sessions) = self.user_rooms.get(&recipient_id) { for id in sessions { - if let Some(my_id) = my_id { + if let Some(my_id) = websocket_id { if *id == my_id { continue; } @@ -377,7 +377,7 @@ impl ChatServer { &self, user_operation: &UserOperation, comment: &CommentResponse, - my_id: Option, + websocket_id: Option, ) -> Result<(), LemmyError> { let mut comment_reply_sent = comment.clone(); comment_reply_sent.comment.my_vote = None; @@ -391,21 +391,26 @@ impl ChatServer { user_operation, &comment_post_sent, comment_post_sent.comment.post_id, - my_id, + websocket_id, )?; // Send it to the recipient(s) including the mentioned users for recipient_id in &comment_reply_sent.recipient_ids { - self.send_user_room_message(user_operation, &comment_reply_sent, *recipient_id, my_id)?; + self.send_user_room_message( + user_operation, + &comment_reply_sent, + *recipient_id, + websocket_id, + )?; } // Send it to the community too - self.send_community_room_message(user_operation, &comment_post_sent, 0, my_id)?; + self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?; self.send_community_room_message( user_operation, &comment_post_sent, comment.comment.community_id, - my_id, + websocket_id, )?; Ok(()) @@ -415,7 +420,7 @@ impl ChatServer { &self, user_operation: &UserOperation, post: &PostResponse, - my_id: Option, + websocket_id: Option, ) -> Result<(), LemmyError> { let community_id = post.post.community_id; @@ -425,11 +430,11 @@ impl ChatServer { post_sent.post.user_id = None; // Send it to /c/all and that community - self.send_community_room_message(user_operation, &post_sent, 0, my_id)?; - self.send_community_room_message(user_operation, &post_sent, community_id, my_id)?; + self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?; + self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?; // Send it to the post room - self.send_post_room_message(user_operation, &post_sent, post.post.id, my_id)?; + self.send_post_room_message(user_operation, &post_sent, post.post.id, websocket_id)?; Ok(()) } @@ -471,7 +476,7 @@ impl ChatServer { client, }; let args = Args { - context: &context, + context, rate_limiter, id: msg.id, ip, @@ -565,7 +570,7 @@ impl ChatServer { } struct Args<'a> { - context: &'a LemmyContext, + context: LemmyContext, rate_limiter: RateLimit, id: ConnectionId, ip: IPAddr, @@ -587,18 +592,13 @@ where data, } = args; - let ws_info = WebsocketInfo { - chatserver: context.chat_server().to_owned(), - id: Some(id), - }; - let data = data.to_string(); let op2 = op.clone(); let fut = async move { let parsed_data: Data = serde_json::from_str(&data)?; let res = parsed_data - .perform(&web::Data::new(context.to_owned()), Some(ws_info)) + .perform(&web::Data::new(context), Some(id)) .await?; to_json_string(&op, &res) }; @@ -692,7 +692,7 @@ where fn handle(&mut self, msg: SendAllMessage, _: &mut Context) { self - .send_all_message(&msg.op, &msg.response, msg.my_id) + .send_all_message(&msg.op, &msg.response, msg.websocket_id) .ok(); } } @@ -705,7 +705,7 @@ where fn handle(&mut self, msg: SendUserRoomMessage, _: &mut Context) { self - .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.my_id) + .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id) .ok(); } } @@ -718,7 +718,7 @@ where fn handle(&mut self, msg: SendCommunityRoomMessage, _: &mut Context) { self - .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.my_id) + .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id) .ok(); } } @@ -727,7 +727,7 @@ impl Handler for ChatServer { type Result = (); fn handle(&mut self, msg: SendPost, _: &mut Context) { - self.send_post(&msg.op, &msg.post, msg.my_id).ok(); + self.send_post(&msg.op, &msg.post, msg.websocket_id).ok(); } } @@ -735,7 +735,9 @@ impl Handler for ChatServer { type Result = (); fn handle(&mut self, msg: SendComment, _: &mut Context) { - self.send_comment(&msg.op, &msg.comment, msg.my_id).ok(); + self + .send_comment(&msg.op, &msg.comment, msg.websocket_id) + .ok(); } }