* Making the chat server an actor.
- Fixes #2778
- #2787
* Forgot to add handlers folder.
* Some cleanup.
* Forgot to remove a comment.
* Address PR comments.
* Using ToString for enum operations.
checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5"
dependencies = [
"actix-rt",
+ "actix_derive",
"bitflags",
"bytes",
"crossbeam-channel",
]
[[package]]
-name = "actix-ws"
-version = "0.2.5"
+name = "actix_derive"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3"
+checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7"
dependencies = [
- "actix-codec",
- "actix-http",
- "actix-web",
- "futures-core",
- "tokio",
+ "proc-macro2 1.0.47",
+ "quote 1.0.21",
+ "syn 1.0.103",
]
[[package]]
name = "lemmy_api_common"
version = "0.17.1"
dependencies = [
+ "actix",
"actix-rt",
"actix-web",
- "actix-ws",
"anyhow",
"chrono",
"encoding",
version = "0.17.1"
dependencies = [
"activitypub_federation",
+ "actix",
"actix-rt",
"actix-web",
"anyhow",
version = "0.17.1"
dependencies = [
"activitypub_federation",
+ "actix",
"actix-rt",
"actix-web",
"actix-web-actors",
- "actix-ws",
"clokwerk",
"console-subscriber",
"diesel",
rand = "0.8.5"
opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
tracing-opentelemetry = { version = "0.17.2" }
-actix-ws = "0.2.0"
+actix = "0.13"
[dependencies]
lemmy_api = { workspace = true }
reqwest-retry = { workspace = true }
serde_json = { workspace = true }
futures = { workspace = true }
-actix-ws = { workspace = true }
+actix = { workspace = true }
tracing-opentelemetry = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
actix-web-actors = { version = "4.1.0", default-features = false }
comment::{CommentResponse, CreateCommentLike},
context::LemmyContext,
utils::{check_community_ban, check_downvotes_enabled, get_local_user_view_from_jwt},
- websocket::{send::send_comment_ws_message, UserOperation},
+ websocket::UserOperation,
};
use lemmy_db_schema::{
newtypes::LocalUserId,
.map_err(|e| LemmyError::from_error_message(e, "couldnt_like_comment"))?;
}
- send_comment_ws_message(
- data.comment_id,
- UserOperation::CreateCommentLike,
- websocket_id,
- None,
- Some(local_user_view.person.id),
- recipient_ids,
- context,
- )
- .await
+ context
+ .send_comment_ws_message(
+ &UserOperation::CreateCommentLike,
+ data.comment_id,
+ websocket_id,
+ None,
+ Some(local_user_view.person.id),
+ recipient_ids,
+ )
+ .await
}
}
comment_report_view,
};
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::CreateCommentReport,
- &res,
- comment_view.community.id,
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::CreateCommentReport,
+ &res,
+ comment_view.community.id,
+ websocket_id,
+ )?;
Ok(res)
}
comment_report_view,
};
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::ResolveCommentReport,
- &res,
- report.community.id,
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::ResolveCommentReport,
+ &res,
+ report.community.id,
+ websocket_id,
+ )?;
Ok(res)
}
let moderators = CommunityModeratorView::for_community(context.pool(), community_id).await?;
let res = AddModToCommunityResponse { moderators };
- context
- .chat_server()
- .send_community_room_message(
- &UserOperation::AddModToCommunity,
- &res,
- community_id,
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::AddModToCommunity,
+ &res,
+ community_id,
+ websocket_id,
+ )?;
+
Ok(res)
}
}
community::{BanFromCommunity, BanFromCommunityResponse},
context::LemmyContext,
utils::{get_local_user_view_from_jwt, is_mod_or_admin, remove_user_data_in_community},
- websocket::UserOperation,
+ websocket::{
+ handlers::messages::SendCommunityRoomMessage,
+ serialize_websocket_message,
+ UserOperation,
+ },
};
use lemmy_db_schema::{
source::{
banned: data.ban,
};
- context
- .chat_server()
- .send_community_room_message(
- &UserOperation::BanFromCommunity,
- &res,
- community_id,
- websocket_id,
- )
- .await?;
+ // A custom ban message
+ let message = serialize_websocket_message(&UserOperation::BanFromCommunity, &res)?;
+ context.chat_server().do_send(SendCommunityRoomMessage {
+ community_id,
+ message,
+ websocket_id,
+ });
Ok(res)
}
community::{CommunityResponse, HideCommunity},
context::LemmyContext,
utils::{get_local_user_view_from_jwt, is_admin},
- websocket::{send::send_community_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
ModHideCommunity::create(context.pool(), &mod_hide_community_form).await?;
- let op = UserOperationCrud::EditCommunity;
- send_community_ws_message(data.community_id, op, websocket_id, None, context).await
+ context
+ .send_community_ws_message(
+ &UserOperationCrud::EditCommunity,
+ data.community_id,
+ websocket_id,
+ None,
+ )
+ .await
}
}
let res = AddAdminResponse { admins };
- context
- .chat_server()
- .send_all_message(UserOperation::AddAdmin, &res, websocket_id)
- .await?;
+ context.send_all_ws_message(&UserOperation::AddAdmin, &res, websocket_id)?;
Ok(res)
}
banned: data.ban,
};
- context
- .chat_server()
- .send_all_message(UserOperation::BanPerson, &res, websocket_id)
- .await?;
+ context.send_all_ws_message(&UserOperation::BanPerson, &res, websocket_id)?;
Ok(res)
}
use lemmy_api_common::{
context::LemmyContext,
person::{CaptchaResponse, GetCaptcha, GetCaptchaResponse},
- websocket::structs::CaptchaItem,
+ websocket::{handlers::captcha::AddCaptcha, structs::CaptchaItem},
};
use lemmy_db_schema::{source::local_site::LocalSite, utils::naive_now};
use lemmy_utils::{error::LemmyError, ConnectionId};
};
// Stores the captcha item on the queue
- context.chat_server().add_captcha(captcha_item)?;
+ context.chat_server().do_send(AddCaptcha {
+ captcha: captcha_item,
+ });
Ok(GetCaptchaResponse {
ok: Some(CaptchaResponse { png, wav, uuid }),
is_admin,
is_mod_or_admin,
},
- websocket::{send::send_post_ws_message, UserOperation},
+ websocket::UserOperation,
};
use lemmy_db_schema::{
source::{
ModFeaturePost::create(context.pool(), &form).await?;
- send_post_ws_message(
- data.post_id,
- UserOperation::FeaturePost,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await
+ context
+ .send_post_ws_message(
+ &UserOperation::FeaturePost,
+ data.post_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await
}
}
get_local_user_view_from_jwt,
mark_post_as_read,
},
- websocket::{send::send_post_ws_message, UserOperation},
+ websocket::UserOperation,
};
use lemmy_db_schema::{
source::{
// Mark the post as read
mark_post_as_read(person_id, post_id, context.pool()).await?;
- send_post_ws_message(
- data.post_id,
- UserOperation::CreatePostLike,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await
+ context
+ .send_post_ws_message(
+ &UserOperation::CreatePostLike,
+ data.post_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await
}
}
get_local_user_view_from_jwt,
is_mod_or_admin,
},
- websocket::{send::send_post_ws_message, UserOperation},
+ websocket::UserOperation,
};
use lemmy_db_schema::{
source::{
};
ModLockPost::create(context.pool(), &form).await?;
- send_post_ws_message(
- data.post_id,
- UserOperation::LockPost,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await
+ context
+ .send_post_ws_message(
+ &UserOperation::LockPost,
+ data.post_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await
}
}
let res = PostReportResponse { post_report_view };
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::CreatePostReport,
- &res,
- post_view.community.id,
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::CreatePostReport,
+ &res,
+ post_view.community.id,
+ websocket_id,
+ )?;
Ok(res)
}
let res = PostReportResponse { post_report_view };
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::ResolvePostReport,
- &res,
- report.community.id,
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::ResolvePostReport,
+ &res,
+ report.community.id,
+ websocket_id,
+ )?;
Ok(res)
}
context::LemmyContext,
private_message::{MarkPrivateMessageAsRead, PrivateMessageResponse},
utils::get_local_user_view_from_jwt,
- websocket::{send::send_pm_ws_message, UserOperation},
+ websocket::UserOperation,
};
use lemmy_db_schema::{
source::private_message::{PrivateMessage, PrivateMessageUpdateForm},
.map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?;
// No need to send an apub update
- let op = UserOperation::MarkPrivateMessageAsRead;
- send_pm_ws_message(data.private_message_id, op, websocket_id, context).await
+ context
+ .send_pm_ws_message(
+ &UserOperation::MarkPrivateMessageAsRead,
+ data.private_message_id,
+ websocket_id,
+ )
+ .await
}
}
private_message_report_view,
};
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::CreatePrivateMessageReport,
- &res,
- CommunityId(0),
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::CreatePrivateMessageReport,
+ &res,
+ CommunityId(0),
+ websocket_id,
+ )?;
// TODO: consider federating this
private_message_report_view,
};
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::ResolvePrivateMessageReport,
- &res,
- CommunityId(0),
- websocket_id,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::ResolvePrivateMessageReport,
+ &res,
+ CommunityId(0),
+ websocket_id,
+ )?;
Ok(res)
}
use lemmy_api_common::{
context::LemmyContext,
utils::get_local_user_view_from_jwt,
- websocket::structs::{
- CommunityJoin,
- CommunityJoinResponse,
- ModJoin,
- ModJoinResponse,
- PostJoin,
- PostJoinResponse,
- UserJoin,
- UserJoinResponse,
+ websocket::{
+ handlers::join_rooms::{JoinCommunityRoom, JoinModRoom, JoinPostRoom, JoinUserRoom},
+ structs::{
+ CommunityJoin,
+ CommunityJoinResponse,
+ ModJoin,
+ ModJoinResponse,
+ PostJoin,
+ PostJoinResponse,
+ UserJoin,
+ UserJoinResponse,
+ },
},
};
use lemmy_utils::{error::LemmyError, ConnectionId};
let local_user_view =
get_local_user_view_from_jwt(&data.auth, context.pool(), context.secret()).await?;
- if let Some(ws_id) = websocket_id {
- context
- .chat_server()
- .join_user_room(local_user_view.local_user.id, ws_id)?;
+ if let Some(id) = websocket_id {
+ context.chat_server().do_send(JoinUserRoom {
+ user_id: local_user_view.local_user.id,
+ id,
+ });
}
Ok(UserJoinResponse { joined: true })
) -> Result<CommunityJoinResponse, LemmyError> {
let data: &CommunityJoin = self;
- if let Some(ws_id) = websocket_id {
- context
- .chat_server()
- .join_community_room(data.community_id, ws_id)?;
+ if let Some(id) = websocket_id {
+ context.chat_server().do_send(JoinCommunityRoom {
+ community_id: data.community_id,
+ id,
+ });
}
Ok(CommunityJoinResponse { joined: true })
) -> Result<ModJoinResponse, LemmyError> {
let data: &ModJoin = self;
- if let Some(ws_id) = websocket_id {
- context
- .chat_server()
- .join_mod_room(data.community_id, ws_id)?;
+ if let Some(id) = websocket_id {
+ context.chat_server().do_send(JoinModRoom {
+ community_id: data.community_id,
+ id,
+ });
}
Ok(ModJoinResponse { joined: true })
) -> Result<PostJoinResponse, LemmyError> {
let data: &PostJoin = self;
- if let Some(ws_id) = websocket_id {
- context.chat_server().join_post_room(data.post_id, ws_id)?;
+ if let Some(id) = websocket_id {
+ context.chat_server().do_send(JoinPostRoom {
+ post_id: data.post_id,
+ id,
+ });
}
Ok(PostJoinResponse { joined: true })
anyhow = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
-actix-ws = { workspace = true }
+actix = { workspace = true }
futures = { workspace = true }
uuid = { workspace = true }
actix-rt = { workspace = true }
use crate::websocket::chat_server::ChatServer;
+use actix::Addr;
use lemmy_db_schema::{source::secret::Secret, utils::DbPool};
use lemmy_utils::{
rate_limit::RateLimitCell,
#[derive(Clone)]
pub struct LemmyContext {
pool: DbPool,
- chat_server: Arc<ChatServer>,
+ chat_server: Addr<ChatServer>,
client: Arc<ClientWithMiddleware>,
secret: Arc<Secret>,
rate_limit_cell: RateLimitCell,
impl LemmyContext {
pub fn create(
pool: DbPool,
- chat_server: Arc<ChatServer>,
+ chat_server: Addr<ChatServer>,
client: ClientWithMiddleware,
secret: Secret,
rate_limit_cell: RateLimitCell,
pub fn pool(&self) -> &DbPool {
&self.pool
}
- pub fn chat_server(&self) -> &Arc<ChatServer> {
+ pub fn chat_server(&self) -> &Addr<ChatServer> {
&self.chat_server
}
pub fn client(&self) -> &ClientWithMiddleware {
-use crate::{
- comment::CommentResponse,
- post::PostResponse,
- websocket::{serialize_websocket_message, structs::CaptchaItem, OperationType},
+use crate::websocket::{
+ handlers::{SessionInfo, WsMessage},
+ structs::CaptchaItem,
};
-use actix_ws::Session;
-use anyhow::Context as acontext;
-use futures::future::join_all;
+use actix::{Actor, Context};
use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId};
-use lemmy_utils::{error::LemmyError, location_info, ConnectionId};
+use lemmy_utils::ConnectionId;
use rand::{rngs::StdRng, SeedableRng};
-use serde::Serialize;
-use std::{
- collections::{HashMap, HashSet},
- sync::{Mutex, MutexGuard},
-};
-use tracing::log::warn;
+use std::collections::{HashMap, HashSet};
-/// `ChatServer` manages chat rooms and responsible for coordinating chat
-/// session.
pub struct ChatServer {
- inner: Mutex<ChatServerInner>,
-}
-
-pub struct ChatServerInner {
/// A map from generated random ID to session addr
- pub sessions: HashMap<ConnectionId, Session>,
+ pub sessions: HashMap<ConnectionId, SessionInfo>,
/// A map from post_id to set of connectionIDs
pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
/// And manages available rooms. Peers send messages to other peers in same
/// room through `ChatServer`.
impl ChatServer {
- pub fn startup() -> ChatServer {
+ pub fn new() -> ChatServer {
ChatServer {
- inner: Mutex::new(ChatServerInner {
- sessions: Default::default(),
- post_rooms: Default::default(),
- community_rooms: Default::default(),
- mod_rooms: Default::default(),
- user_rooms: Default::default(),
- rng: StdRng::from_entropy(),
- captchas: vec![],
- }),
- }
- }
-
- pub fn join_community_room(
- &self,
- community_id: CommunityId,
- id: ConnectionId,
- ) -> Result<(), LemmyError> {
- let mut inner = self.inner()?;
- // remove session from all rooms
- for sessions in inner.community_rooms.values_mut() {
- sessions.remove(&id);
- }
-
- // Also leave all post rooms
- // This avoids double messages
- for sessions in inner.post_rooms.values_mut() {
- sessions.remove(&id);
- }
-
- // If the room doesn't exist yet
- if inner.community_rooms.get_mut(&community_id).is_none() {
- inner.community_rooms.insert(community_id, HashSet::new());
- }
-
- inner
- .community_rooms
- .get_mut(&community_id)
- .context(location_info!())?
- .insert(id);
- Ok(())
- }
-
- pub fn join_mod_room(
- &self,
- community_id: CommunityId,
- id: ConnectionId,
- ) -> Result<(), LemmyError> {
- let mut inner = self.inner()?;
- // remove session from all rooms
- for sessions in inner.mod_rooms.values_mut() {
- sessions.remove(&id);
- }
-
- // If the room doesn't exist yet
- if inner.mod_rooms.get_mut(&community_id).is_none() {
- inner.mod_rooms.insert(community_id, HashSet::new());
- }
-
- inner
- .mod_rooms
- .get_mut(&community_id)
- .context(location_info!())?
- .insert(id);
- Ok(())
- }
-
- pub fn join_post_room(&self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
- let mut inner = self.inner()?;
- // remove session from all rooms
- for sessions in inner.post_rooms.values_mut() {
- sessions.remove(&id);
- }
-
- // Also leave all communities
- // This avoids double messages
- // TODO found a bug, whereby community messages like
- // delete and remove aren't sent, because
- // you left the community room
- for sessions in inner.community_rooms.values_mut() {
- sessions.remove(&id);
- }
-
- // If the room doesn't exist yet
- if inner.post_rooms.get_mut(&post_id).is_none() {
- inner.post_rooms.insert(post_id, HashSet::new());
- }
-
- inner
- .post_rooms
- .get_mut(&post_id)
- .context(location_info!())?
- .insert(id);
-
- Ok(())
- }
-
- pub fn join_user_room(&self, user_id: LocalUserId, id: ConnectionId) -> Result<(), LemmyError> {
- let mut inner = self.inner()?;
- // remove session from all rooms
- for sessions in inner.user_rooms.values_mut() {
- sessions.remove(&id);
- }
-
- // If the room doesn't exist yet
- if inner.user_rooms.get_mut(&user_id).is_none() {
- inner.user_rooms.insert(user_id, HashSet::new());
- }
-
- inner
- .user_rooms
- .get_mut(&user_id)
- .context(location_info!())?
- .insert(id);
- Ok(())
- }
-
- async fn send_post_room_message<OP, Response>(
- &self,
- op: &OP,
- response: &Response,
- post_id: PostId,
- websocket_id: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- Response: Serialize,
- {
- let msg = serialize_websocket_message(op, response)?;
- let room = self.inner()?.post_rooms.get(&post_id).cloned();
- self.send_message_in_room(&msg, room, websocket_id).await?;
- Ok(())
- }
-
- /// Send message to all users viewing the given community.
- pub async fn send_community_room_message<OP, Response>(
- &self,
- op: &OP,
- response: &Response,
- community_id: CommunityId,
- websocket_id: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- Response: Serialize,
- {
- let msg = serialize_websocket_message(op, response)?;
- let room = self.inner()?.community_rooms.get(&community_id).cloned();
- self.send_message_in_room(&msg, room, websocket_id).await?;
- Ok(())
- }
-
- /// Send message to mods of a given community. Set community_id = 0 to send to site admins.
- pub async fn send_mod_room_message<OP, Response>(
- &self,
- op: OP,
- response: &Response,
- community_id: CommunityId,
- websocket_id: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- Response: Serialize,
- {
- let msg = serialize_websocket_message(&op, response)?;
- let room = self.inner()?.mod_rooms.get(&community_id).cloned();
- self.send_message_in_room(&msg, room, websocket_id).await?;
- Ok(())
- }
-
- pub async fn send_all_message<OP, Response>(
- &self,
- op: OP,
- response: &Response,
- exclude_connection: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- Response: Serialize,
- {
- let msg = &serialize_websocket_message(&op, response)?;
- let sessions = self.inner()?.sessions.clone();
- // Note, this will ignore any errors, such as closed connections
- join_all(
- sessions
- .into_iter()
- .filter(|(id, _)| Some(id) != exclude_connection.as_ref())
- .map(|(_, mut s): (_, Session)| async move { s.text(msg).await }),
- )
- .await;
- Ok(())
- }
-
- pub async fn send_user_room_message<OP, Response>(
- &self,
- op: &OP,
- response: &Response,
- recipient_id: LocalUserId,
- websocket_id: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- Response: Serialize,
- {
- let msg = serialize_websocket_message(op, response)?;
- let room = self.inner()?.user_rooms.get(&recipient_id).cloned();
- self.send_message_in_room(&msg, room, websocket_id).await?;
- Ok(())
- }
-
- pub async fn send_comment<OP>(
- &self,
- user_operation: &OP,
- comment: &CommentResponse,
- websocket_id: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- {
- let mut comment_reply_sent = comment.clone();
-
- // Strip out my specific user info
- comment_reply_sent.comment_view.my_vote = None;
-
- // Send it to the post room
- let mut comment_post_sent = comment_reply_sent.clone();
- // Remove the recipients here to separate mentions / user messages from post or community comments
- comment_post_sent.recipient_ids = Vec::new();
- self
- .send_post_room_message(
- user_operation,
- &comment_post_sent,
- comment_post_sent.comment_view.post.id,
- websocket_id,
- )
- .await?;
-
- // Send it to the community too
- self
- .send_community_room_message(
- user_operation,
- &comment_post_sent,
- CommunityId(0),
- websocket_id,
- )
- .await?;
- self
- .send_community_room_message(
- user_operation,
- &comment_post_sent,
- comment.comment_view.community.id,
- websocket_id,
- )
- .await?;
-
- // Send it to the recipient(s) including the mentioned users
- for recipient_id in &comment_reply_sent.recipient_ids {
- self
- .send_user_room_message(
- user_operation,
- &comment_reply_sent,
- *recipient_id,
- websocket_id,
- )
- .await?;
+ sessions: Default::default(),
+ post_rooms: Default::default(),
+ community_rooms: Default::default(),
+ mod_rooms: Default::default(),
+ user_rooms: Default::default(),
+ rng: StdRng::from_entropy(),
+ captchas: vec![],
}
-
- Ok(())
- }
-
- pub async fn send_post<OP>(
- &self,
- user_operation: &OP,
- post_res: &PostResponse,
- websocket_id: Option<ConnectionId>,
- ) -> Result<(), LemmyError>
- where
- OP: OperationType + ToString,
- {
- let community_id = post_res.post_view.community.id;
-
- // Don't send my data with it
- let mut post_sent = post_res.clone();
- post_sent.post_view.my_vote = None;
-
- // Send it to /c/all and that community
- self
- .send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id)
- .await?;
- self
- .send_community_room_message(user_operation, &post_sent, community_id, websocket_id)
- .await?;
-
- // Send it to the post room
- self
- .send_post_room_message(
- user_operation,
- &post_sent,
- post_res.post_view.post.id,
- websocket_id,
- )
- .await?;
-
- Ok(())
}
- /// Send websocket message in all sessions which joined a specific room.
- ///
- /// `message` - The json message body to send
- /// `room` - Connection IDs which should receive the message
- /// `exclude_connection` - Dont send to user who initiated the api call, as that
- /// would result in duplicate notification
- async fn send_message_in_room(
+ pub fn send_message(
&self,
+ connections: &HashSet<ConnectionId>,
message: &str,
- room: Option<HashSet<ConnectionId>>,
exclude_connection: Option<ConnectionId>,
- ) -> Result<(), LemmyError> {
- let mut session = self.inner()?.sessions.clone();
- if let Some(room) = room {
- // Note, this will ignore any errors, such as closed connections
- join_all(
- room
- .into_iter()
- .filter(|c| Some(c) != exclude_connection.as_ref())
- .filter_map(|c| session.remove(&c))
- .map(|mut s: Session| async move { s.text(message).await }),
- )
- .await;
+ ) {
+ for id in connections
+ .iter()
+ .filter(|c| Some(*c) != exclude_connection.as_ref())
+ {
+ if let Some(session) = self.sessions.get(id) {
+ session.addr.do_send(WsMessage(message.to_owned()));
+ }
}
- Ok(())
}
+}
- pub(in crate::websocket) fn inner(&self) -> Result<MutexGuard<'_, ChatServerInner>, LemmyError> {
- match self.inner.lock() {
- Ok(g) => Ok(g),
- Err(e) => {
- warn!("Failed to lock chatserver mutex: {}", e);
- Err(LemmyError::from_message("Failed to lock chatserver mutex"))
- }
- }
+impl Default for ChatServer {
+ fn default() -> Self {
+ Self::new()
}
}
+
+/// Make actor from `ChatServer`
+impl Actor for ChatServer {
+ /// We are going to use simple Context, we just need ability to communicate
+ /// with other actors.
+ type Context = Context<Self>;
+}
+++ /dev/null
-use crate::websocket::{chat_server::ChatServer, structs::CaptchaItem};
-use actix_ws::Session;
-use lemmy_db_schema::{
- newtypes::{CommunityId, PostId},
- utils::naive_now,
-};
-use lemmy_utils::{error::LemmyError, ConnectionId};
-use rand::Rng;
-
-impl ChatServer {
- /// Handler for Connect message.
- ///
- /// Register new session and assign unique id to this session
- pub fn handle_connect(&self, session: Session) -> Result<ConnectionId, LemmyError> {
- let mut inner = self.inner()?;
- // register session with random id
- let id = inner.rng.gen::<usize>();
-
- inner.sessions.insert(id, session);
- Ok(id)
- }
-
- /// Handler for Disconnect message.
- pub fn handle_disconnect(&self, connection_id: &ConnectionId) -> Result<(), LemmyError> {
- let mut inner = self.inner()?;
- // Remove connections from sessions and all 3 scopes
- if inner.sessions.remove(connection_id).is_some() {
- for sessions in inner.user_rooms.values_mut() {
- sessions.remove(connection_id);
- }
-
- for sessions in inner.post_rooms.values_mut() {
- sessions.remove(connection_id);
- }
-
- for sessions in inner.community_rooms.values_mut() {
- sessions.remove(connection_id);
- }
- }
- Ok(())
- }
-
- pub fn get_users_online(&self) -> Result<usize, LemmyError> {
- Ok(self.inner()?.sessions.len())
- }
-
- pub fn get_post_users_online(&self, post_id: PostId) -> Result<usize, LemmyError> {
- if let Some(users) = self.inner()?.post_rooms.get(&post_id) {
- Ok(users.len())
- } else {
- Ok(0)
- }
- }
-
- pub fn get_community_users_online(&self, community_id: CommunityId) -> Result<usize, LemmyError> {
- if let Some(users) = self.inner()?.community_rooms.get(&community_id) {
- Ok(users.len())
- } else {
- Ok(0)
- }
- }
-
- pub fn add_captcha(&self, captcha: CaptchaItem) -> Result<(), LemmyError> {
- self.inner()?.captchas.push(captcha);
- Ok(())
- }
-
- pub fn check_captcha(&self, uuid: String, answer: String) -> Result<bool, LemmyError> {
- let mut inner = self.inner()?;
- // Remove all the ones that are past the expire time
- inner.captchas.retain(|x| x.expires.gt(&naive_now()));
-
- let check = inner
- .captchas
- .iter()
- .any(|r| r.uuid == uuid && r.answer.to_lowercase() == answer.to_lowercase());
-
- // Remove this uuid so it can't be re-checked (Checks only work once)
- inner.captchas.retain(|x| x.uuid != uuid);
-
- Ok(check)
- }
-}
--- /dev/null
+use crate::websocket::{chat_server::ChatServer, structs::CaptchaItem};
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::utils::naive_now;
+
+/// Adding a Captcha
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct AddCaptcha {
+ pub captcha: CaptchaItem,
+}
+
+impl Handler<AddCaptcha> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: AddCaptcha, _: &mut Context<Self>) -> Self::Result {
+ self.captchas.push(msg.captcha);
+ }
+}
+
+/// Checking a Captcha
+#[derive(Message)]
+#[rtype(bool)]
+pub struct CheckCaptcha {
+ pub uuid: String,
+ pub answer: String,
+}
+
+impl Handler<CheckCaptcha> for ChatServer {
+ type Result = bool;
+
+ fn handle(&mut self, msg: CheckCaptcha, _: &mut Context<Self>) -> Self::Result {
+ // Remove all the ones that are past the expire time
+ self.captchas.retain(|x| x.expires.gt(&naive_now()));
+
+ let check = self
+ .captchas
+ .iter()
+ .any(|r| r.uuid == msg.uuid && r.answer.to_lowercase() == msg.answer.to_lowercase());
+
+ // Remove this uuid so it can't be re-checked (Checks only work once)
+ self.captchas.retain(|x| x.uuid != msg.uuid);
+
+ check
+ }
+}
--- /dev/null
+use crate::websocket::{
+ chat_server::ChatServer,
+ handlers::{SessionInfo, WsMessage},
+};
+use actix::{Context, Handler, Message, Recipient};
+use lemmy_utils::ConnectionId;
+use rand::Rng;
+
+/// New chat session is created
+#[derive(Message)]
+#[rtype(ConnectionId)]
+pub struct Connect {
+ pub addr: Recipient<WsMessage>,
+}
+
+/// Handler for Connect message.
+///
+/// Register new session and assign unique id to this session
+impl Handler<Connect> for ChatServer {
+ type Result = ConnectionId;
+
+ fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
+ // register session with random id
+ let id = self.rng.gen::<usize>();
+ let session = SessionInfo { addr: msg.addr };
+ self.sessions.insert(id, session);
+
+ // send id back
+ id
+ }
+}
+
+/// Session is disconnected
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct Disconnect {
+ pub id: ConnectionId,
+}
+
+/// Handler for Disconnect message.
+impl Handler<Disconnect> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
+ // remove address
+ if self.sessions.remove(&msg.id).is_some() {
+ // remove session from all rooms
+ for sessions in self.user_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+ for sessions in self.post_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+ for sessions in self.community_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+ for sessions in self.mod_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+ }
+ }
+}
--- /dev/null
+use crate::websocket::chat_server::ChatServer;
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId};
+use lemmy_utils::ConnectionId;
+use std::collections::HashSet;
+
+/// Joining a Post room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinPostRoom {
+ pub post_id: PostId,
+ pub id: ConnectionId,
+}
+
+impl Handler<JoinPostRoom> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: JoinPostRoom, _: &mut Context<Self>) -> Self::Result {
+ // remove session from all rooms
+ for sessions in self.post_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+
+ // Also leave all communities
+ // This avoids double messages
+ // TODO found a bug, whereby community messages like
+ // delete and remove aren't sent, because
+ // you left the community room
+ for sessions in self.community_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+
+ self
+ .post_rooms
+ .entry(msg.post_id)
+ .or_insert_with(HashSet::new)
+ .insert(msg.id);
+ }
+}
+
+/// Joining a Community Room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinCommunityRoom {
+ pub community_id: CommunityId,
+ pub id: ConnectionId,
+}
+
+impl Handler<JoinCommunityRoom> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context<Self>) -> Self::Result {
+ // remove session from all rooms
+ for sessions in self.community_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+
+ // Also leave all post rooms
+ // This avoids double messages
+ for sessions in self.post_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+
+ self
+ .community_rooms
+ .entry(msg.community_id)
+ .or_insert_with(HashSet::new)
+ .insert(msg.id);
+ }
+}
+
+/// Joining a Mod room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinModRoom {
+ pub community_id: CommunityId,
+ pub id: ConnectionId,
+}
+
+impl Handler<JoinModRoom> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: JoinModRoom, _: &mut Context<Self>) -> Self::Result {
+ // remove session from all rooms
+ for sessions in self.mod_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+
+ self
+ .mod_rooms
+ .entry(msg.community_id)
+ .or_insert_with(HashSet::new)
+ .insert(msg.id);
+ }
+}
+
+/// Joining a User room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinUserRoom {
+ pub user_id: LocalUserId,
+ pub id: ConnectionId,
+}
+
+impl Handler<JoinUserRoom> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: JoinUserRoom, _: &mut Context<Self>) -> Self::Result {
+ // remove session from all rooms
+ for sessions in self.user_rooms.values_mut() {
+ sessions.remove(&msg.id);
+ }
+
+ self
+ .user_rooms
+ .entry(msg.user_id)
+ .or_insert_with(HashSet::new)
+ .insert(msg.id);
+ }
+}
--- /dev/null
+use crate::websocket::chat_server::ChatServer;
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId};
+use lemmy_utils::ConnectionId;
+use std::collections::HashSet;
+
+/// Sending a post room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendPostRoomMessage {
+ pub post_id: PostId,
+ pub message: String,
+ pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendPostRoomMessage> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: SendPostRoomMessage, _: &mut Context<Self>) -> Self::Result {
+ let room_connections = self.post_rooms.get(&msg.post_id);
+ if let Some(connections) = room_connections {
+ self.send_message(connections, &msg.message, msg.websocket_id);
+ }
+ }
+}
+
+/// Sending a community room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendCommunityRoomMessage {
+ pub community_id: CommunityId,
+ pub message: String,
+ pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendCommunityRoomMessage> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: SendCommunityRoomMessage, _: &mut Context<Self>) -> Self::Result {
+ let room_connections = self.community_rooms.get(&msg.community_id);
+ if let Some(connections) = room_connections {
+ self.send_message(connections, &msg.message, msg.websocket_id);
+ }
+ }
+}
+
+/// Sending a mod room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendModRoomMessage {
+ pub community_id: CommunityId,
+ pub message: String,
+ pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendModRoomMessage> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: SendModRoomMessage, _: &mut Context<Self>) -> Self::Result {
+ let room_connections = self.community_rooms.get(&msg.community_id);
+ if let Some(connections) = room_connections {
+ self.send_message(connections, &msg.message, msg.websocket_id);
+ }
+ }
+}
+
+/// Sending a user room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendUserRoomMessage {
+ pub recipient_id: LocalUserId,
+ pub message: String,
+ pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendUserRoomMessage> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: SendUserRoomMessage, _: &mut Context<Self>) -> Self::Result {
+ let room_connections = self.user_rooms.get(&msg.recipient_id);
+ if let Some(connections) = room_connections {
+ self.send_message(connections, &msg.message, msg.websocket_id);
+ }
+ }
+}
+
+/// Sending a message to every session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendAllMessage {
+ pub message: String,
+ pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendAllMessage> for ChatServer {
+ type Result = ();
+
+ fn handle(&mut self, msg: SendAllMessage, _: &mut Context<Self>) -> Self::Result {
+ let connections: HashSet<ConnectionId> = self.sessions.keys().cloned().collect();
+ self.send_message(&connections, &msg.message, msg.websocket_id);
+ }
+}
+
+///// Send websocket message in all sessions which joined a specific room.
+/////
+///// `message` - The json message body to send
+///// `room` - Connection IDs which should receive the message
+///// `exclude_connection` - Dont send to user who initiated the api call, as that
+///// would result in duplicate notification
+//async fn send_message_in_room(
+// &self,
+// message: &str,
+// room: Option<HashSet<ConnectionId>>,
+// exclude_connection: Option<ConnectionId>,
+//) -> Result<(), LemmyError> {
+// let mut session = self.inner()?.sessions.clone();
+// if let Some(room) = room {
+// // Note, this will ignore any errors, such as closed connections
+// join_all(
+// room
+// .into_iter()
+// .filter(|c| Some(c) != exclude_connection.as_ref())
+// .filter_map(|c| session.remove(&c))
+// .map(|mut s: Session| async move { s.text(message).await }),
+// )
+// .await;
+// }
+// Ok(())
+//}
+//}
--- /dev/null
+use actix::{Message, Recipient};
+
+pub mod captcha;
+pub mod connect;
+pub mod join_rooms;
+pub mod messages;
+pub mod online_users;
+
+/// A string message sent to a websocket session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct WsMessage(pub String);
+
+// TODO move this?
+pub struct SessionInfo {
+ pub addr: Recipient<WsMessage>,
+ // pub ip: IpAddr
+}
--- /dev/null
+use crate::websocket::chat_server::ChatServer;
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::newtypes::{CommunityId, PostId};
+
+/// Getting the number of online connections
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetUsersOnline;
+
+/// Handler for Disconnect message.
+impl Handler<GetUsersOnline> for ChatServer {
+ type Result = usize;
+
+ fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context<Self>) -> Self::Result {
+ self.sessions.len()
+ }
+}
+
+/// Getting the number of post users online
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetPostUsersOnline {
+ pub post_id: PostId,
+}
+
+/// Handler for Disconnect message.
+impl Handler<GetPostUsersOnline> for ChatServer {
+ type Result = usize;
+
+ fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context<Self>) -> Self::Result {
+ self
+ .post_rooms
+ .get(&msg.post_id)
+ .map_or(1, std::collections::HashSet::len)
+ }
+}
+
+/// Getting the number of post users online
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetCommunityUsersOnline {
+ pub community_id: CommunityId,
+}
+
+/// Handler for Disconnect message.
+impl Handler<GetCommunityUsersOnline> for ChatServer {
+ type Result = usize;
+
+ fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context<Self>) -> Self::Result {
+ self
+ .community_rooms
+ .get(&msg.community_id)
+ .map_or(1, std::collections::HashSet::len)
+ }
+}
+use actix::{Message, Recipient};
use lemmy_utils::error::LemmyError;
use serde::Serialize;
pub mod send;
pub mod structs;
+/// A string message sent to a websocket session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct WsMessage(pub String);
+
+pub struct SessionInfo {
+ pub addr: Recipient<WsMessage>,
+}
+
#[derive(Serialize)]
struct WebsocketResponse<T> {
op: String,
data: T,
}
-pub fn serialize_websocket_message<OP, Response>(
+pub fn serialize_websocket_message<Response, OP>(
op: &OP,
data: &Response,
) -> Result<String, LemmyError>
Search,
ResolveObject,
}
-
-pub trait OperationType {}
-
-impl OperationType for UserOperationCrud {}
-
-impl OperationType for UserOperation {}
-
-impl OperationType for UserOperationApub {}
+use super::{
+ handlers::messages::{
+ SendAllMessage,
+ SendCommunityRoomMessage,
+ SendModRoomMessage,
+ SendPostRoomMessage,
+ SendUserRoomMessage,
+ },
+ serialize_websocket_message,
+};
use crate::{
comment::CommentResponse,
community::CommunityResponse,
post::PostResponse,
private_message::PrivateMessageResponse,
utils::{check_person_block, get_interface_language, send_email_to_user},
- websocket::OperationType,
};
use lemmy_db_schema::{
newtypes::{CommentId, CommunityId, LocalUserId, PersonId, PostId, PrivateMessageId},
use lemmy_db_views::structs::{CommentView, LocalUserView, PostView, PrivateMessageView};
use lemmy_db_views_actor::structs::CommunityView;
use lemmy_utils::{error::LemmyError, utils::mention::MentionData, ConnectionId};
+use serde::Serialize;
+
+impl LemmyContext {
+ #[tracing::instrument(skip_all)]
+ pub async fn send_post_ws_message<OP>(
+ &self,
+ op: &OP,
+ post_id: PostId,
+ websocket_id: Option<ConnectionId>,
+ person_id: Option<PersonId>,
+ ) -> Result<PostResponse, LemmyError>
+ where
+ OP: ToString,
+ {
+ let post_view = PostView::read(self.pool(), post_id, person_id, Some(true)).await?;
+
+ let res = PostResponse { post_view };
+
+ // Send it to the post room
+ // Don't send my data with it
+ let mut post_sent = res.clone();
+ post_sent.post_view.my_vote = None;
+ let message = serialize_websocket_message(op, &post_sent)?;
+
+ self.chat_server().do_send(SendPostRoomMessage {
+ post_id,
+ message: message.clone(),
+ websocket_id,
+ });
+
+ // Send it to /c/all and that community
+ self.chat_server().do_send(SendCommunityRoomMessage {
+ community_id: CommunityId(0),
+ message: message.clone(),
+ websocket_id,
+ });
+
+ self.chat_server().do_send(SendCommunityRoomMessage {
+ community_id: post_sent.post_view.community.id,
+ message,
+ websocket_id,
+ });
+
+ Ok(res)
+ }
-#[tracing::instrument(skip_all)]
-pub async fn send_post_ws_message<OP: ToString + Send + OperationType + 'static>(
- post_id: PostId,
- op: OP,
- websocket_id: Option<ConnectionId>,
- person_id: Option<PersonId>,
- context: &LemmyContext,
-) -> Result<PostResponse, LemmyError> {
- let post_view = PostView::read(context.pool(), post_id, person_id, Some(true)).await?;
-
- let res = PostResponse { post_view };
-
- context
- .chat_server()
- .send_post(&op, &res, websocket_id)
- .await?;
-
- Ok(res)
-}
-
-// TODO: in many call sites in apub crate, we are setting an empty vec for recipient_ids,
-// we should get the actual recipient actors from somewhere
-#[tracing::instrument(skip_all)]
-pub async fn send_comment_ws_message_simple<OP: ToString + Send + OperationType + 'static>(
- comment_id: CommentId,
- op: OP,
- context: &LemmyContext,
-) -> Result<CommentResponse, LemmyError> {
- send_comment_ws_message(comment_id, op, None, None, None, vec![], context).await
-}
-
-#[tracing::instrument(skip_all)]
-pub async fn send_comment_ws_message<OP: ToString + Send + OperationType + 'static>(
- comment_id: CommentId,
- op: OP,
- websocket_id: Option<ConnectionId>,
- form_id: Option<String>,
- person_id: Option<PersonId>,
- recipient_ids: Vec<LocalUserId>,
- context: &LemmyContext,
-) -> Result<CommentResponse, LemmyError> {
- let view = CommentView::read(context.pool(), comment_id, person_id).await?;
-
- let mut res = CommentResponse {
- comment_view: view,
- recipient_ids,
- // The sent out form id should be null
- form_id: None,
- };
-
- context
- .chat_server()
- .send_comment(&op, &res, websocket_id)
- .await?;
-
- // The recipient_ids should be empty for returns
- res.recipient_ids = Vec::new();
- res.form_id = form_id;
-
- Ok(res)
-}
+ // TODO: in many call sites in apub crate, we are setting an empty vec for recipient_ids,
+ // we should get the actual recipient actors from somewhere
+ #[tracing::instrument(skip_all)]
+ pub async fn send_comment_ws_message_simple<OP>(
+ &self,
+ op: &OP,
+ comment_id: CommentId,
+ ) -> Result<CommentResponse, LemmyError>
+ where
+ OP: ToString,
+ {
+ self
+ .send_comment_ws_message(op, comment_id, None, None, None, vec![])
+ .await
+ }
-#[tracing::instrument(skip_all)]
-pub async fn send_community_ws_message<OP: ToString + Send + OperationType + 'static>(
- community_id: CommunityId,
- op: OP,
- websocket_id: Option<ConnectionId>,
- person_id: Option<PersonId>,
- context: &LemmyContext,
-) -> Result<CommunityResponse, LemmyError> {
- let community_view =
- CommunityView::read(context.pool(), community_id, person_id, Some(true)).await?;
- let discussion_languages = CommunityLanguage::read(context.pool(), community_id).await?;
-
- let mut res = CommunityResponse {
- community_view,
- discussion_languages,
- };
-
- // Strip out the person id and subscribed when sending to others
- res.community_view.subscribed = SubscribedType::NotSubscribed;
-
- context
- .chat_server()
- .send_community_room_message(&op, &res, res.community_view.community.id, websocket_id)
- .await?;
-
- Ok(res)
-}
+ #[tracing::instrument(skip_all)]
+ pub async fn send_comment_ws_message<OP>(
+ &self,
+ op: &OP,
+ comment_id: CommentId,
+ websocket_id: Option<ConnectionId>,
+ form_id: Option<String>,
+ person_id: Option<PersonId>,
+ recipient_ids: Vec<LocalUserId>,
+ ) -> Result<CommentResponse, LemmyError>
+ where
+ OP: ToString,
+ {
+ let view = CommentView::read(self.pool(), comment_id, person_id).await?;
+
+ let mut res = CommentResponse {
+ comment_view: view,
+ recipient_ids,
+ form_id,
+ };
+
+ // Strip out my specific user info
+ let mut sent_recipient_comment = res.clone();
+ sent_recipient_comment.form_id = None;
+ sent_recipient_comment.comment_view.my_vote = None;
+ let recipient_message = serialize_websocket_message(op, &sent_recipient_comment)?;
+
+ // Send it to the recipient(s) including the mentioned users
+ for recipient_id in &sent_recipient_comment.recipient_ids {
+ self.chat_server().do_send(SendUserRoomMessage {
+ recipient_id: *recipient_id,
+ message: recipient_message.clone(),
+ websocket_id,
+ });
+ }
-#[tracing::instrument(skip_all)]
-pub async fn send_pm_ws_message<OP: ToString + Send + OperationType + 'static>(
- private_message_id: PrivateMessageId,
- op: OP,
- websocket_id: Option<ConnectionId>,
- context: &LemmyContext,
-) -> Result<PrivateMessageResponse, LemmyError> {
- let view = PrivateMessageView::read(context.pool(), private_message_id).await?;
-
- let res = PrivateMessageResponse {
- private_message_view: view,
- };
-
- // Send notifications to the local recipient, if one exists
- if res.private_message_view.recipient.local {
- let recipient_id = res.private_message_view.recipient.id;
- let local_recipient = LocalUserView::read_person(context.pool(), recipient_id).await?;
-
- context
- .chat_server()
- .send_user_room_message(&op, &res, local_recipient.local_user.id, websocket_id)
- .await?;
+ // Remove the recipients here to separate mentions / user messages from post or community comments
+ let mut sent_post_comment = sent_recipient_comment;
+ sent_post_comment.recipient_ids = Vec::new();
+ let post_message = serialize_websocket_message(op, &sent_post_comment)?;
+
+ // Send it to the post room
+ self.chat_server().do_send(SendPostRoomMessage {
+ post_id: sent_post_comment.comment_view.post.id,
+ message: post_message.clone(),
+ websocket_id,
+ });
+
+ // Send it to the community too
+ self.chat_server().do_send(SendCommunityRoomMessage {
+ community_id: sent_post_comment.comment_view.community.id,
+ message: post_message,
+ websocket_id,
+ });
+ // TODO should I send it to all? Seems excessive
+ // self
+ // .send_community_room_message(
+ // user_operation,
+ // &comment_post_sent,
+ // CommunityId(0),
+ // websocket_id,
+ // )
+ // .await?;
+
+ // No need to return recipients
+ res.recipient_ids = Vec::new();
+
+ Ok(res)
}
- Ok(res)
-}
-
-#[tracing::instrument(skip_all)]
-pub async fn send_local_notifs(
- mentions: Vec<MentionData>,
- comment: &Comment,
- person: &Person,
- post: &Post,
- do_send_email: bool,
- context: &LemmyContext,
-) -> Result<Vec<LocalUserId>, LemmyError> {
- let mut recipient_ids = Vec::new();
- let inbox_link = format!("{}/inbox", context.settings().get_protocol_and_hostname());
-
- // Send the local mentions
- for mention in mentions
- .iter()
- .filter(|m| m.is_local(&context.settings().hostname) && m.name.ne(&person.name))
- .collect::<Vec<&MentionData>>()
+ #[tracing::instrument(skip_all)]
+ pub async fn send_community_ws_message<OP>(
+ &self,
+ op: &OP,
+ community_id: CommunityId,
+ websocket_id: Option<ConnectionId>,
+ person_id: Option<PersonId>,
+ ) -> Result<CommunityResponse, LemmyError>
+ where
+ OP: ToString,
{
- let mention_name = mention.name.clone();
- let user_view = LocalUserView::read_from_name(context.pool(), &mention_name).await;
- if let Ok(mention_user_view) = user_view {
- // TODO
- // At some point, make it so you can't tag the parent creator either
- // This can cause two notifications, one for reply and the other for mention
- recipient_ids.push(mention_user_view.local_user.id);
-
- let user_mention_form = PersonMentionInsertForm {
- recipient_id: mention_user_view.person.id,
- comment_id: comment.id,
- read: None,
- };
-
- // Allow this to fail softly, since comment edits might re-update or replace it
- // Let the uniqueness handle this fail
- PersonMention::create(context.pool(), &user_mention_form)
- .await
- .ok();
-
- // Send an email to those local users that have notifications on
- if do_send_email {
- let lang = get_interface_language(&mention_user_view);
- send_email_to_user(
- &mention_user_view,
- &lang.notification_mentioned_by_subject(&person.name),
- &lang.notification_mentioned_by_body(&comment.content, &inbox_link, &person.name),
- context.settings(),
- )
- }
- }
+ let community_view =
+ CommunityView::read(self.pool(), community_id, person_id, Some(true)).await?;
+ let discussion_languages = CommunityLanguage::read(self.pool(), community_id).await?;
+
+ let mut res = CommunityResponse {
+ community_view,
+ discussion_languages,
+ };
+
+ // Strip out the person id and subscribed when sending to others
+ res.community_view.subscribed = SubscribedType::NotSubscribed;
+ let message = serialize_websocket_message(op, &res)?;
+
+ self.chat_server().do_send(SendCommunityRoomMessage {
+ community_id: res.community_view.community.id,
+ message,
+ websocket_id,
+ });
+
+ Ok(res)
}
- // Send comment_reply to the parent commenter / poster
- if let Some(parent_comment_id) = comment.parent_comment_id() {
- let parent_comment = Comment::read(context.pool(), parent_comment_id).await?;
+ #[tracing::instrument(skip_all)]
+ pub async fn send_pm_ws_message<OP>(
+ &self,
+ op: &OP,
+ private_message_id: PrivateMessageId,
+ websocket_id: Option<ConnectionId>,
+ ) -> Result<PrivateMessageResponse, LemmyError>
+ where
+ OP: ToString,
+ {
+ let view = PrivateMessageView::read(self.pool(), private_message_id).await?;
- // Get the parent commenter local_user
- let parent_creator_id = parent_comment.creator_id;
+ let res = PrivateMessageResponse {
+ private_message_view: view,
+ };
- // Only add to recipients if that person isn't blocked
- let creator_blocked = check_person_block(person.id, parent_creator_id, context.pool())
- .await
- .is_err();
+ // Send notifications to the local recipient, if one exists
+ if res.private_message_view.recipient.local {
+ let recipient_id = res.private_message_view.recipient.id;
+ let local_recipient = LocalUserView::read_person(self.pool(), recipient_id).await?;
- // Don't send a notif to yourself
- if parent_comment.creator_id != person.id && !creator_blocked {
- let user_view = LocalUserView::read_person(context.pool(), parent_creator_id).await;
- if let Ok(parent_user_view) = user_view {
- recipient_ids.push(parent_user_view.local_user.id);
+ let message = serialize_websocket_message(op, &res)?;
- let comment_reply_form = CommentReplyInsertForm {
- recipient_id: parent_user_view.person.id,
+ self.chat_server().do_send(SendUserRoomMessage {
+ recipient_id: local_recipient.local_user.id,
+ message,
+ websocket_id,
+ });
+ }
+
+ Ok(res)
+ }
+
+ #[tracing::instrument(skip_all)]
+ pub async fn send_local_notifs(
+ &self,
+ mentions: Vec<MentionData>,
+ comment: &Comment,
+ person: &Person,
+ post: &Post,
+ do_send_email: bool,
+ ) -> Result<Vec<LocalUserId>, LemmyError> {
+ let mut recipient_ids = Vec::new();
+ let inbox_link = format!("{}/inbox", self.settings().get_protocol_and_hostname());
+
+ // Send the local mentions
+ for mention in mentions
+ .iter()
+ .filter(|m| m.is_local(&self.settings().hostname) && m.name.ne(&person.name))
+ .collect::<Vec<&MentionData>>()
+ {
+ let mention_name = mention.name.clone();
+ let user_view = LocalUserView::read_from_name(self.pool(), &mention_name).await;
+ if let Ok(mention_user_view) = user_view {
+ // TODO
+ // At some point, make it so you can't tag the parent creator either
+ // This can cause two notifications, one for reply and the other for mention
+ recipient_ids.push(mention_user_view.local_user.id);
+
+ let user_mention_form = PersonMentionInsertForm {
+ recipient_id: mention_user_view.person.id,
comment_id: comment.id,
read: None,
};
// Allow this to fail softly, since comment edits might re-update or replace it
// Let the uniqueness handle this fail
- CommentReply::create(context.pool(), &comment_reply_form)
+ PersonMention::create(self.pool(), &user_mention_form)
.await
.ok();
+ // Send an email to those local users that have notifications on
if do_send_email {
- let lang = get_interface_language(&parent_user_view);
+ let lang = get_interface_language(&mention_user_view);
send_email_to_user(
- &parent_user_view,
- &lang.notification_comment_reply_subject(&person.name),
- &lang.notification_comment_reply_body(&comment.content, &inbox_link, &person.name),
- context.settings(),
+ &mention_user_view,
+ &lang.notification_mentioned_by_subject(&person.name),
+ &lang.notification_mentioned_by_body(&comment.content, &inbox_link, &person.name),
+ self.settings(),
)
}
}
}
- } else {
- // If there's no parent, its the post creator
- // Only add to recipients if that person isn't blocked
- let creator_blocked = check_person_block(person.id, post.creator_id, context.pool())
- .await
- .is_err();
- if post.creator_id != person.id && !creator_blocked {
- let creator_id = post.creator_id;
- let parent_user = LocalUserView::read_person(context.pool(), creator_id).await;
- if let Ok(parent_user_view) = parent_user {
- recipient_ids.push(parent_user_view.local_user.id);
+ // Send comment_reply to the parent commenter / poster
+ if let Some(parent_comment_id) = comment.parent_comment_id() {
+ let parent_comment = Comment::read(self.pool(), parent_comment_id).await?;
- let comment_reply_form = CommentReplyInsertForm {
- recipient_id: parent_user_view.person.id,
- comment_id: comment.id,
- read: None,
- };
+ // Get the parent commenter local_user
+ let parent_creator_id = parent_comment.creator_id;
- // Allow this to fail softly, since comment edits might re-update or replace it
- // Let the uniqueness handle this fail
- CommentReply::create(context.pool(), &comment_reply_form)
- .await
- .ok();
-
- if do_send_email {
- let lang = get_interface_language(&parent_user_view);
- send_email_to_user(
- &parent_user_view,
- &lang.notification_post_reply_subject(&person.name),
- &lang.notification_post_reply_body(&comment.content, &inbox_link, &person.name),
- context.settings(),
- )
+ // Only add to recipients if that person isn't blocked
+ let creator_blocked = check_person_block(person.id, parent_creator_id, self.pool())
+ .await
+ .is_err();
+
+ // Don't send a notif to yourself
+ if parent_comment.creator_id != person.id && !creator_blocked {
+ let user_view = LocalUserView::read_person(self.pool(), parent_creator_id).await;
+ if let Ok(parent_user_view) = user_view {
+ recipient_ids.push(parent_user_view.local_user.id);
+
+ let comment_reply_form = CommentReplyInsertForm {
+ recipient_id: parent_user_view.person.id,
+ comment_id: comment.id,
+ read: None,
+ };
+
+ // Allow this to fail softly, since comment edits might re-update or replace it
+ // Let the uniqueness handle this fail
+ CommentReply::create(self.pool(), &comment_reply_form)
+ .await
+ .ok();
+
+ if do_send_email {
+ let lang = get_interface_language(&parent_user_view);
+ send_email_to_user(
+ &parent_user_view,
+ &lang.notification_comment_reply_subject(&person.name),
+ &lang.notification_comment_reply_body(&comment.content, &inbox_link, &person.name),
+ self.settings(),
+ )
+ }
+ }
+ }
+ } else {
+ // If there's no parent, its the post creator
+ // Only add to recipients if that person isn't blocked
+ let creator_blocked = check_person_block(person.id, post.creator_id, self.pool())
+ .await
+ .is_err();
+
+ if post.creator_id != person.id && !creator_blocked {
+ let creator_id = post.creator_id;
+ let parent_user = LocalUserView::read_person(self.pool(), creator_id).await;
+ if let Ok(parent_user_view) = parent_user {
+ recipient_ids.push(parent_user_view.local_user.id);
+
+ let comment_reply_form = CommentReplyInsertForm {
+ recipient_id: parent_user_view.person.id,
+ comment_id: comment.id,
+ read: None,
+ };
+
+ // Allow this to fail softly, since comment edits might re-update or replace it
+ // Let the uniqueness handle this fail
+ CommentReply::create(self.pool(), &comment_reply_form)
+ .await
+ .ok();
+
+ if do_send_email {
+ let lang = get_interface_language(&parent_user_view);
+ send_email_to_user(
+ &parent_user_view,
+ &lang.notification_post_reply_subject(&person.name),
+ &lang.notification_post_reply_body(&comment.content, &inbox_link, &person.name),
+ self.settings(),
+ )
+ }
}
}
}
+
+ Ok(recipient_ids)
+ }
+
+ #[tracing::instrument(skip_all)]
+ pub fn send_all_ws_message<Data, OP>(
+ &self,
+ op: &OP,
+ data: Data,
+ websocket_id: Option<ConnectionId>,
+ ) -> Result<(), LemmyError>
+ where
+ Data: Serialize,
+ OP: ToString,
+ {
+ let message = serialize_websocket_message(op, &data)?;
+ self.chat_server().do_send(SendAllMessage {
+ message,
+ websocket_id,
+ });
+ Ok(())
}
- Ok(recipient_ids)
+ #[tracing::instrument(skip_all)]
+ pub fn send_mod_ws_message<Data, OP>(
+ &self,
+ op: &OP,
+ data: Data,
+ community_id: CommunityId,
+ websocket_id: Option<ConnectionId>,
+ ) -> Result<(), LemmyError>
+ where
+ Data: Serialize,
+ OP: ToString,
+ {
+ let message = serialize_websocket_message(op, &data)?;
+ self.chat_server().do_send(SendModRoomMessage {
+ community_id,
+ message,
+ websocket_id,
+ });
+ Ok(())
+ }
}
local_site_to_slur_regex,
EndpointType,
},
- websocket::{
- send::{send_comment_ws_message, send_local_notifs},
- UserOperationCrud,
- },
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
// Scan the comment for user mentions, add those rows
let post_id = post.id;
let mentions = scrape_text_for_mentions(&content_slurs_removed);
- let recipient_ids = send_local_notifs(
- mentions,
- &updated_comment,
- &local_user_view.person,
- &post,
- true,
- context,
- )
- .await?;
+ let recipient_ids = context
+ .send_local_notifs(
+ mentions,
+ &updated_comment,
+ &local_user_view.person,
+ &post,
+ true,
+ )
+ .await?;
// You like your own comment by default
let like_form = CommentLikeForm {
}
}
- send_comment_ws_message(
- inserted_comment.id,
- UserOperationCrud::CreateComment,
- websocket_id,
- data.form_id.clone(),
- Some(local_user_view.person.id),
- recipient_ids,
- context,
- )
- .await
+ context
+ .send_comment_ws_message(
+ &UserOperationCrud::CreateComment,
+ inserted_comment.id,
+ websocket_id,
+ data.form_id.clone(),
+ Some(local_user_view.person.id),
+ recipient_ids,
+ )
+ .await
}
}
comment::{CommentResponse, DeleteComment},
context::LemmyContext,
utils::{check_community_ban, get_local_user_view_from_jwt},
- websocket::{
- send::{send_comment_ws_message, send_local_notifs},
- UserOperationCrud,
- },
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
let post_id = updated_comment.post_id;
let post = Post::read(context.pool(), post_id).await?;
- let recipient_ids = send_local_notifs(
- vec![],
- &updated_comment,
- &local_user_view.person,
- &post,
- false,
- context,
- )
- .await?;
+ let recipient_ids = context
+ .send_local_notifs(
+ vec![],
+ &updated_comment,
+ &local_user_view.person,
+ &post,
+ false,
+ )
+ .await?;
- let res = send_comment_ws_message(
- data.comment_id,
- UserOperationCrud::DeleteComment,
- websocket_id,
- None, // TODO a comment delete might clear forms?
- Some(local_user_view.person.id),
- recipient_ids,
- context,
- )
- .await?;
+ let res = context
+ .send_comment_ws_message(
+ &UserOperationCrud::DeleteComment,
+ data.comment_id,
+ websocket_id,
+ None,
+ Some(local_user_view.person.id),
+ recipient_ids,
+ )
+ .await?;
Ok(res)
}
comment::{CommentResponse, RemoveComment},
context::LemmyContext,
utils::{check_community_ban, get_local_user_view_from_jwt, is_mod_or_admin},
- websocket::{
- send::{send_comment_ws_message, send_local_notifs},
- UserOperationCrud,
- },
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
let post_id = updated_comment.post_id;
let post = Post::read(context.pool(), post_id).await?;
- let recipient_ids = send_local_notifs(
- vec![],
- &updated_comment,
- &local_user_view.person.clone(),
- &post,
- false,
- context,
- )
- .await?;
+ let recipient_ids = context
+ .send_local_notifs(
+ vec![],
+ &updated_comment,
+ &local_user_view.person.clone(),
+ &post,
+ false,
+ )
+ .await?;
- let res = send_comment_ws_message(
- data.comment_id,
- UserOperationCrud::RemoveComment,
- websocket_id,
- None, // TODO maybe this might clear other forms
- Some(local_user_view.person.id),
- recipient_ids,
- context,
- )
- .await?;
+ let res = context
+ .send_comment_ws_message(
+ &UserOperationCrud::RemoveComment,
+ data.comment_id,
+ websocket_id,
+ None, // TODO maybe this might clear other forms
+ Some(local_user_view.person.id),
+ recipient_ids,
+ )
+ .await?;
Ok(res)
}
comment::{CommentResponse, EditComment},
context::LemmyContext,
utils::{check_community_ban, get_local_user_view_from_jwt, local_site_to_slur_regex},
- websocket::{
- send::{send_comment_ws_message, send_local_notifs},
- UserOperationCrud,
- },
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
// Do the mentions / recipients
let updated_comment_content = updated_comment.content.clone();
let mentions = scrape_text_for_mentions(&updated_comment_content);
- let recipient_ids = send_local_notifs(
- mentions,
- &updated_comment,
- &local_user_view.person,
- &orig_comment.post,
- false,
- context,
- )
- .await?;
+ let recipient_ids = context
+ .send_local_notifs(
+ mentions,
+ &updated_comment,
+ &local_user_view.person,
+ &orig_comment.post,
+ false,
+ )
+ .await?;
- send_comment_ws_message(
- data.comment_id,
- UserOperationCrud::EditComment,
- websocket_id,
- data.form_id.clone(),
- None,
- recipient_ids,
- context,
- )
- .await
+ context
+ .send_comment_ws_message(
+ &UserOperationCrud::EditComment,
+ data.comment_id,
+ websocket_id,
+ data.form_id.clone(),
+ None,
+ recipient_ids,
+ )
+ .await
}
}
community::{CommunityResponse, DeleteCommunity},
context::LemmyContext,
utils::{get_local_user_view_from_jwt, is_top_mod},
- websocket::{send::send_community_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::community::{Community, CommunityUpdateForm},
.await
.map_err(|e| LemmyError::from_error_message(e, "couldnt_update_community"))?;
- let res = send_community_ws_message(
- data.community_id,
- UserOperationCrud::DeleteCommunity,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await?;
+ let res = context
+ .send_community_ws_message(
+ &UserOperationCrud::DeleteCommunity,
+ data.community_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await?;
Ok(res)
}
community::{CommunityResponse, RemoveCommunity},
context::LemmyContext,
utils::{get_local_user_view_from_jwt, is_admin},
- websocket::{send::send_community_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
};
ModRemoveCommunity::create(context.pool(), &form).await?;
- let res = send_community_ws_message(
- data.community_id,
- UserOperationCrud::RemoveCommunity,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await?;
+ let res = context
+ .send_community_ws_message(
+ &UserOperationCrud::RemoveCommunity,
+ data.community_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await?;
Ok(res)
}
community::{CommunityResponse, EditCommunity},
context::LemmyContext,
utils::{get_local_user_view_from_jwt, local_site_to_slur_regex},
- websocket::{send::send_community_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
newtypes::PersonId,
.await
.map_err(|e| LemmyError::from_error_message(e, "couldnt_update_community"))?;
- let op = UserOperationCrud::EditCommunity;
- send_community_ws_message(data.community_id, op, websocket_id, None, context).await
+ context
+ .send_community_ws_message(
+ &UserOperationCrud::EditCommunity,
+ data.community_id,
+ websocket_id,
+ None,
+ )
+ .await
}
}
mark_post_as_read,
EndpointType,
},
- websocket::{send::send_post_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
impls::actor_language::default_post_language,
}
}
- send_post_ws_message(
- inserted_post.id,
- UserOperationCrud::CreatePost,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await
+ context
+ .send_post_ws_message(
+ &UserOperationCrud::CreatePost,
+ inserted_post.id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await
}
}
context::LemmyContext,
post::{DeletePost, PostResponse},
utils::{check_community_ban, check_community_deleted_or_removed, get_local_user_view_from_jwt},
- websocket::{send::send_post_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::post::{Post, PostUpdateForm},
)
.await?;
- let res = send_post_ws_message(
- data.post_id,
- UserOperationCrud::DeletePost,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await?;
+ let res = context
+ .send_post_ws_message(
+ &UserOperationCrud::DeletePost,
+ data.post_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await?;
Ok(res)
}
is_mod_or_admin_opt,
mark_post_as_read,
},
+ websocket::handlers::online_users::GetPostUsersOnline,
};
use lemmy_db_schema::{
aggregates::structs::{PersonPostAggregates, PersonPostAggregatesForm},
let moderators = CommunityModeratorView::for_community(context.pool(), community_id).await?;
- let online = context.chat_server().get_post_users_online(post_id)?;
+ let online = context
+ .chat_server()
+ .send(GetPostUsersOnline { post_id })
+ .await?;
// Return the jwt
Ok(GetPostResponse {
context::LemmyContext,
post::{PostResponse, RemovePost},
utils::{check_community_ban, get_local_user_view_from_jwt, is_mod_or_admin},
- websocket::{send::send_post_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
};
ModRemovePost::create(context.pool(), &form).await?;
- let res = send_post_ws_message(
- data.post_id,
- UserOperationCrud::RemovePost,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await?;
+ let res = context
+ .send_post_ws_message(
+ &UserOperationCrud::RemovePost,
+ data.post_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await?;
Ok(res)
}
post::{EditPost, PostResponse},
request::fetch_site_data,
utils::{check_community_ban, get_local_user_view_from_jwt, local_site_to_slur_regex},
- websocket::{send::send_post_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
return Err(LemmyError::from_error_message(e, err_type));
}
- send_post_ws_message(
- data.post_id,
- UserOperationCrud::EditPost,
- websocket_id,
- Some(local_user_view.person.id),
- context,
- )
- .await
+ context
+ .send_post_ws_message(
+ &UserOperationCrud::EditPost,
+ data.post_id,
+ websocket_id,
+ Some(local_user_view.person.id),
+ )
+ .await
}
}
send_email_to_user,
EndpointType,
},
- websocket::{send::send_pm_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
.await
.map_err(|e| LemmyError::from_error_message(e, "couldnt_create_private_message"))?;
- let res = send_pm_ws_message(
- inserted_private_message.id,
- UserOperationCrud::CreatePrivateMessage,
- websocket_id,
- context,
- )
- .await?;
+ let res = context
+ .send_pm_ws_message(
+ &UserOperationCrud::CreatePrivateMessage,
+ inserted_private_message.id,
+ websocket_id,
+ )
+ .await?;
// Send email to the local recipient, if one exists
if res.private_message_view.recipient.local {
context::LemmyContext,
private_message::{DeletePrivateMessage, PrivateMessageResponse},
utils::get_local_user_view_from_jwt,
- websocket::{send::send_pm_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::private_message::{PrivateMessage, PrivateMessageUpdateForm},
.await
.map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?;
- let op = UserOperationCrud::DeletePrivateMessage;
- send_pm_ws_message(data.private_message_id, op, websocket_id, context).await
+ context
+ .send_pm_ws_message(
+ &UserOperationCrud::DeletePrivateMessage,
+ data.private_message_id,
+ websocket_id,
+ )
+ .await
}
}
context::LemmyContext,
private_message::{EditPrivateMessage, PrivateMessageResponse},
utils::{get_local_user_view_from_jwt, local_site_to_slur_regex},
- websocket::{send::send_pm_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
.await
.map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?;
- let op = UserOperationCrud::EditPrivateMessage;
- send_pm_ws_message(data.private_message_id, op, websocket_id, context).await
+ context
+ .send_pm_ws_message(
+ &UserOperationCrud::EditPrivateMessage,
+ data.private_message_id,
+ websocket_id,
+ )
+ .await
}
}
context::LemmyContext,
site::{GetSite, GetSiteResponse, MyUserInfo},
utils::{build_federated_instances, get_local_user_settings_view_from_jwt_opt},
+ websocket::handlers::online_users::GetUsersOnline,
};
use lemmy_db_schema::source::{
actor_language::{LocalUserLanguage, SiteLanguage},
let admins = PersonView::admins(context.pool()).await?;
- let online = context.chat_server().get_users_online()?;
+ let online = context.chat_server().send(GetUsersOnline).await?;
// Build the local user
let my_user = if let Some(local_user_view) = get_local_user_settings_view_from_jwt_opt(
let res = SiteResponse { site_view };
- context
- .chat_server()
- .send_all_message(UserOperationCrud::EditSite, &res, websocket_id)
- .await?;
+ context.send_all_ws_message(&UserOperationCrud::EditSite, &res, websocket_id)?;
Ok(res)
}
send_verification_email,
EndpointType,
},
+ websocket::handlers::captcha::CheckCaptcha,
};
use lemmy_db_schema::{
aggregates::structs::PersonAggregates,
// If the site is set up, check the captcha
if local_site.site_setup && local_site.captcha_enabled {
- let check = context.chat_server().check_captcha(
- data.captcha_uuid.clone().unwrap_or_default(),
- data.captcha_answer.clone().unwrap_or_default(),
- )?;
+ let check = context
+ .chat_server()
+ .send(CheckCaptcha {
+ uuid: data.captcha_uuid.clone().unwrap_or_default(),
+ answer: data.captcha_answer.clone().unwrap_or_default(),
+ })
+ .await?;
if !check {
return Err(LemmyError::from_message("captcha_incorrect"));
}
serde = { workspace = true }
actix-web = { workspace = true }
actix-rt = { workspace = true }
+actix = { workspace = true }
tracing = { workspace = true }
strum_macros = { workspace = true }
url = { workspace = true }
let post_report_view = PostReportView::read(context.pool(), report.id, actor.id).await?;
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::CreateCommentReport,
- &PostReportResponse { post_report_view },
- post.community_id,
- None,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::CreateCommentReport,
+ &PostReportResponse { post_report_view },
+ post.community_id,
+ None,
+ )?;
}
PostOrComment::Comment(comment) => {
let report_form = CommentReportForm {
CommentReportView::read(context.pool(), report.id, actor.id).await?;
let community_id = comment_report_view.community.id;
- context
- .chat_server()
- .send_mod_room_message(
- UserOperation::CreateCommentReport,
- &CommentReportResponse {
- comment_report_view,
- },
- community_id,
- None,
- )
- .await?;
+ context.send_mod_ws_message(
+ &UserOperation::CreateCommentReport,
+ &CommentReportResponse {
+ comment_report_view,
+ },
+ community_id,
+ None,
+ )?;
}
};
Ok(())
community::{CommunityResponse, EditCommunity, HideCommunity},
context::LemmyContext,
utils::get_local_user_view_from_jwt,
- websocket::{send::send_community_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{source::community::Community, traits::Crud};
use lemmy_utils::error::LemmyError;
let updated_community =
Community::update(context.pool(), community.id, &community_update_form).await?;
- send_community_ws_message(
- updated_community.id,
- UserOperationCrud::EditCommunity,
- None,
- None,
- context,
- )
- .await?;
+ context
+ .send_community_ws_message(
+ &UserOperationCrud::EditCommunity,
+ updated_community.id,
+ None,
+ None,
+ )
+ .await?;
Ok(())
}
}
comment::{CommentResponse, CreateComment, EditComment},
context::LemmyContext,
utils::{check_post_deleted_or_removed, is_mod_or_admin},
- websocket::{send::send_comment_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
newtypes::PersonId,
CreateOrUpdateType::Create => UserOperationCrud::CreateComment,
CreateOrUpdateType::Update => UserOperationCrud::EditComment,
};
- send_comment_ws_message(
- comment.id, notif_type, None, None, None, recipients, context,
- )
- .await?;
+ context
+ .send_comment_ws_message(¬if_type, comment.id, None, None, None, recipients)
+ .await?;
Ok(())
}
}
use crate::objects::person::ApubPerson;
use activitypub_federation::{config::Data, fetch::object_id::ObjectId};
-use lemmy_api_common::{context::LemmyContext, websocket::send::send_local_notifs};
+use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
newtypes::LocalUserId,
source::{comment::Comment, post::Post},
// anyway.
// TODO: for compatibility with other projects, it would be much better to read this from cc or tags
let mentions = scrape_text_for_mentions(&comment.content);
- send_local_notifs(mentions, comment, &actor, &post, do_send_email, context).await
+ context
+ .send_local_notifs(mentions, comment, &actor, &post, do_send_email)
+ .await
}
use lemmy_api_common::{
context::LemmyContext,
post::{CreatePost, EditPost, PostResponse},
- websocket::{send::send_post_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
newtypes::PersonId,
CreateOrUpdateType::Create => UserOperationCrud::CreatePost,
CreateOrUpdateType::Update => UserOperationCrud::EditPost,
};
- send_post_ws_message(post.id, notif_type, None, None, context).await?;
+ context
+ .send_post_ws_message(¬if_type, post.id, None, None)
+ .await?;
Ok(())
}
}
use lemmy_api_common::{
context::LemmyContext,
private_message::{CreatePrivateMessage, EditPrivateMessage, PrivateMessageResponse},
- websocket::{send::send_pm_ws_message, UserOperationCrud},
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
newtypes::PersonId,
CreateOrUpdateType::Create => UserOperationCrud::CreatePrivateMessage,
CreateOrUpdateType::Update => UserOperationCrud::EditPrivateMessage,
};
- send_pm_ws_message(private_message.id, notif_type, None, context).await?;
+ context
+ .send_pm_ws_message(¬if_type, private_message.id, None)
+ .await?;
Ok(())
}
protocol::{activities::deletion::delete::Delete, IdOrNestedObject},
};
use activitypub_federation::{config::Data, kinds::activity::DeleteType, traits::ActivityHandler};
-use lemmy_api_common::{
- context::LemmyContext,
- websocket::{
- send::{send_comment_ws_message_simple, send_community_ws_message, send_post_ws_message},
- UserOperationCrud,
- },
-};
+use lemmy_api_common::{context::LemmyContext, websocket::UserOperationCrud};
use lemmy_db_schema::{
source::{
comment::{Comment, CommentUpdateForm},
)
.await?;
- send_community_ws_message(deleted_community.id, RemoveCommunity, None, None, context).await?;
+ context
+ .send_community_ws_message(&RemoveCommunity, deleted_community.id, None, None)
+ .await?;
}
DeletableObjects::Post(post) => {
let form = ModRemovePostForm {
)
.await?;
- send_post_ws_message(removed_post.id, RemovePost, None, None, context).await?;
+ context
+ .send_post_ws_message(&RemovePost, removed_post.id, None, None)
+ .await?;
}
DeletableObjects::Comment(comment) => {
let form = ModRemoveCommentForm {
)
.await?;
- send_comment_ws_message_simple(removed_comment.id, RemoveComment, context).await?;
+ context
+ .send_comment_ws_message_simple(&RemoveComment, removed_comment.id)
+ .await?;
}
DeletableObjects::PrivateMessage(_) => unimplemented!(),
}
post::{DeletePost, PostResponse, RemovePost},
private_message::{DeletePrivateMessage, PrivateMessageResponse},
utils::get_local_user_view_from_jwt,
- websocket::{
- send::{
- send_comment_ws_message_simple,
- send_community_ws_message,
- send_pm_ws_message,
- send_post_ws_message,
- },
- UserOperationCrud,
- },
+ websocket::UserOperationCrud,
};
use lemmy_db_schema::{
source::{
.build(),
)
.await?;
- send_community_ws_message(
- community.id,
- UserOperationCrud::DeleteCommunity,
- None,
- None,
- context,
- )
- .await?;
+ context
+ .send_community_ws_message(
+ &UserOperationCrud::DeleteCommunity,
+ community.id,
+ None,
+ None,
+ )
+ .await?;
}
DeletableObjects::Post(post) => {
if deleted != post.deleted {
&PostUpdateForm::builder().deleted(Some(deleted)).build(),
)
.await?;
- send_post_ws_message(
- deleted_post.id,
- UserOperationCrud::DeletePost,
- None,
- None,
- context,
- )
- .await?;
+ context
+ .send_post_ws_message(&UserOperationCrud::DeletePost, deleted_post.id, None, None)
+ .await?;
}
}
DeletableObjects::Comment(comment) => {
&CommentUpdateForm::builder().deleted(Some(deleted)).build(),
)
.await?;
- send_comment_ws_message_simple(
- deleted_comment.id,
- UserOperationCrud::DeleteComment,
- context,
- )
- .await?;
+ context
+ .send_comment_ws_message_simple(&UserOperationCrud::DeleteComment, deleted_comment.id)
+ .await?;
}
}
DeletableObjects::PrivateMessage(pm) => {
)
.await?;
- send_pm_ws_message(
- deleted_private_message.id,
- UserOperationCrud::DeletePrivateMessage,
- None,
- context,
- )
- .await?;
+ context
+ .send_pm_ws_message(
+ &UserOperationCrud::DeletePrivateMessage,
+ deleted_private_message.id,
+ None,
+ )
+ .await?;
}
}
Ok(())
protocol::activities::deletion::{delete::Delete, undo_delete::UndoDelete},
};
use activitypub_federation::{config::Data, kinds::activity::UndoType, traits::ActivityHandler};
-use lemmy_api_common::{
- context::LemmyContext,
- websocket::{
- send::{send_comment_ws_message_simple, send_community_ws_message, send_post_ws_message},
- UserOperationCrud,
- },
-};
+use lemmy_api_common::{context::LemmyContext, websocket::UserOperationCrud};
use lemmy_db_schema::{
source::{
comment::{Comment, CommentUpdateForm},
&CommunityUpdateForm::builder().removed(Some(false)).build(),
)
.await?;
- send_community_ws_message(deleted_community.id, EditCommunity, None, None, context).await?;
+ context
+ .send_community_ws_message(&EditCommunity, deleted_community.id, None, None)
+ .await?;
}
DeletableObjects::Post(post) => {
let form = ModRemovePostForm {
&PostUpdateForm::builder().removed(Some(false)).build(),
)
.await?;
- send_post_ws_message(removed_post.id, EditPost, None, None, context).await?;
+ context
+ .send_post_ws_message(&EditPost, removed_post.id, None, None)
+ .await?;
}
DeletableObjects::Comment(comment) => {
let form = ModRemoveCommentForm {
&CommentUpdateForm::builder().removed(Some(false)).build(),
)
.await?;
- send_comment_ws_message_simple(removed_comment.id, EditComment, context).await?;
+ context
+ .send_comment_ws_message_simple(&EditComment, removed_comment.id)
+ .await?;
}
DeletableObjects::PrivateMessage(_) => unimplemented!(),
}
use lemmy_api_common::{
community::CommunityResponse,
context::LemmyContext,
- websocket::UserOperation,
+ websocket::{
+ handlers::messages::SendUserRoomMessage,
+ serialize_websocket_message,
+ UserOperation,
+ },
};
use lemmy_db_schema::{
source::{actor_language::CommunityLanguage, community::CommunityFollower},
.id;
let discussion_languages = CommunityLanguage::read(context.pool(), community_id).await?;
- let response = CommunityResponse {
+ let res = CommunityResponse {
community_view,
discussion_languages,
};
- context
- .chat_server()
- .send_user_room_message(
- &UserOperation::FollowCommunity,
- &response,
- local_recipient_id,
- None,
- )
- .await?;
+ let message = serialize_websocket_message(&UserOperation::FollowCommunity, &res)?;
+
+ context.chat_server().do_send(SendUserRoomMessage {
+ recipient_id: local_recipient_id,
+ message,
+ websocket_id: None,
+ });
Ok(())
}
post::{CreatePostLike, PostResponse},
sensitive::Sensitive,
utils::get_local_user_view_from_jwt,
- websocket::{
- send::{send_comment_ws_message_simple, send_post_ws_message},
- UserOperation,
- },
+ websocket::UserOperation,
};
use lemmy_db_schema::{
newtypes::CommunityId,
CommentLike::remove(context.pool(), person_id, comment_id).await?;
CommentLike::like(context.pool(), &like_form).await?;
- send_comment_ws_message_simple(comment_id, UserOperation::CreateCommentLike, context).await?;
+ context
+ .send_comment_ws_message_simple(&UserOperation::CreateCommentLike, comment_id)
+ .await?;
Ok(())
}
PostLike::remove(context.pool(), person_id, post_id).await?;
PostLike::like(context.pool(), &like_form).await?;
- send_post_ws_message(post.id, UserOperation::CreatePostLike, None, None, context).await?;
+ context
+ .send_post_ws_message(&UserOperation::CreatePostLike, post.id, None, None)
+ .await?;
Ok(())
}
let person_id = actor.id;
CommentLike::remove(context.pool(), person_id, comment_id).await?;
- send_comment_ws_message_simple(comment_id, UserOperation::CreateCommentLike, context).await?;
+ context
+ .send_comment_ws_message_simple(&UserOperation::CreateCommentLike, comment_id)
+ .await?;
Ok(())
}
let person_id = actor.id;
PostLike::remove(context.pool(), person_id, post_id).await?;
- send_post_ws_message(post_id, UserOperation::CreatePostLike, None, None, context).await?;
+ context
+ .send_post_ws_message(&UserOperation::CreatePostLike, post_id, None, None)
+ .await?;
Ok(())
}
community::{GetCommunity, GetCommunityResponse},
context::LemmyContext,
utils::{check_private_instance, get_local_user_view_from_jwt_opt, is_mod_or_admin_opt},
+ websocket::handlers::online_users::GetCommunityUsersOnline,
};
use lemmy_db_schema::{
impls::actor_language::default_post_language,
let online = context
.chat_server()
- .get_community_users_online(community_id)?;
+ .send(GetCommunityUsersOnline { community_id })
+ .await?;
let site_id =
Site::instance_actor_id_from_url(community_view.community.actor_id.clone().into());
#[cfg(test)]
pub(crate) mod tests {
use activitypub_federation::config::{Data, FederationConfig};
+ use actix::Actor;
use anyhow::anyhow;
use lemmy_api_common::{
context::LemmyContext,
};
use reqwest::{Client, Request, Response};
use reqwest_middleware::{ClientBuilder, Middleware, Next};
- use std::sync::Arc;
use task_local_extensions::Extensions;
struct BlockedMiddleware;
let rate_limit_config = RateLimitConfig::builder().build();
let rate_limit_cell = RateLimitCell::new(rate_limit_config).await;
- let chat_server = Arc::new(ChatServer::startup());
+ let chat_server = ChatServer::default().start();
let context = LemmyContext::create(pool, chat_server, client, secret, rate_limit_cell.clone());
let config = FederationConfig::builder()
.domain("example.com")
use activitypub_federation::config::Data as ContextData;
+use actix::{
+ fut,
+ Actor,
+ ActorContext,
+ ActorFutureExt,
+ AsyncContext,
+ ContextFutureSpawner,
+ Handler,
+ Running,
+ StreamHandler,
+ WrapFuture,
+};
use actix_web::{web, Error, HttpRequest, HttpResponse};
use actix_web_actors::ws;
-use actix_ws::{MessageStream, Session};
-use futures::stream::StreamExt;
use lemmy_api::Perform;
use lemmy_api_common::{
comment::{
Search,
},
websocket::{
+ handlers::{
+ connect::{Connect, Disconnect},
+ WsMessage,
+ },
serialize_websocket_message,
structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
UserOperation,
ops::Deref,
result,
str::FromStr,
- sync::{Arc, Mutex},
time::{Duration, Instant},
};
-use tracing::{debug, error, info};
+use tracing::{debug, error};
+
+/// How often heartbeat pings are sent
+const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
+
+/// How long before lack of client response causes a timeout
+const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
+
+pub struct WsChatSession {
+ /// unique session id
+ pub id: ConnectionId,
+
+ pub ip: IpAddr,
+
+ /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
+ /// otherwise we drop connection.
+ pub hb: Instant,
+
+ /// The context data
+ apub_data: ContextData<LemmyContext>,
+}
-/// Entry point for our route
pub async fn websocket(
req: HttpRequest,
body: web::Payload,
- context: web::Data<LemmyContext>,
rate_limiter: web::Data<RateLimitCell>,
apub_data: ContextData<LemmyContext>,
) -> Result<HttpResponse, Error> {
- let (response, session, stream) = actix_ws::handle(&req, body)?;
-
let client_ip = IpAddr(
req
.connection_info()
"Websocket join with IP: {} has been rate limited.",
&client_ip
);
- session.close(None).await.map_err(LemmyError::from)?;
- return Ok(response);
+ return Ok(HttpResponse::TooManyRequests().finish());
}
- let connection_id = context.chat_server().handle_connect(session.clone())?;
- info!("{} joined", &client_ip);
+ ws::start(
+ WsChatSession {
+ id: 0,
+ ip: client_ip,
+ hb: Instant::now(),
+ apub_data,
+ },
+ &req,
+ body,
+ )
+}
+
+/// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL).
+///
+/// also this method checks heartbeats from client
+fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
+ ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
+ // check client heartbeats
+ if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
+ // heartbeat timed out
- let alive = Arc::new(Mutex::new(Instant::now()));
- heartbeat(session.clone(), alive.clone());
+ // notify chat server
+ act
+ .apub_data
+ .chat_server()
+ .do_send(Disconnect { id: act.id });
- actix_rt::spawn(handle_messages(
- stream,
- client_ip,
- session,
- connection_id,
- alive,
- rate_limiter,
- apub_data,
- ));
+ // stop actor
+ ctx.stop();
- Ok(response)
+ // don't try to send a ping
+ return;
+ }
+
+ ctx.ping(b"");
+ });
}
-async fn handle_messages(
- mut stream: MessageStream,
- client_ip: IpAddr,
- mut session: Session,
- connection_id: ConnectionId,
- alive: Arc<Mutex<Instant>>,
- rate_limiter: web::Data<RateLimitCell>,
- context: ContextData<LemmyContext>,
-) -> Result<(), LemmyError> {
- while let Some(Ok(msg)) = stream.next().await {
- match msg {
- ws::Message::Ping(bytes) => {
- if session.pong(&bytes).await.is_err() {
- break;
+impl Actor for WsChatSession {
+ type Context = ws::WebsocketContext<Self>;
+
+ /// Method is called on actor start.
+ /// We register ws session with ChatServer
+ fn started(&mut self, ctx: &mut Self::Context) {
+ // we'll start heartbeat process on session start.
+ hb(ctx);
+
+ // register self in chat server. `AsyncContext::wait` register
+ // future within context, but context waits until this future resolves
+ // before processing any other events.
+ // HttpContext::state() is instance of WsChatSessionState, state is shared
+ // across all routes within application
+ let addr = ctx.address();
+ self
+ .apub_data
+ .chat_server()
+ .send(Connect {
+ addr: addr.recipient(),
+ })
+ .into_actor(self)
+ .then(|res, act, ctx| {
+ match res {
+ Ok(res) => act.id = res,
+ // something is wrong with chat server
+ _ => ctx.stop(),
}
+ fut::ready(())
+ })
+ .wait(ctx);
+ }
+ fn stopping(&mut self, _: &mut Self::Context) -> Running {
+ // notify chat server
+ self
+ .apub_data
+ .chat_server()
+ .do_send(Disconnect { id: self.id });
+ Running::Stop
+ }
+}
+
+/// Handle messages from chat server, we simply send it to peer websocket
+impl Handler<WsMessage> for WsChatSession {
+ type Result = ();
+
+ fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
+ ctx.text(msg.0);
+ }
+}
+
+/// WebSocket message handler
+impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
+ fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
+ let msg = match msg {
+ Err(_) => {
+ ctx.stop();
+ return;
+ }
+ Ok(msg) => msg,
+ };
+
+ match msg {
+ ws::Message::Ping(msg) => {
+ self.hb = Instant::now();
+ ctx.pong(&msg);
}
ws::Message::Pong(_) => {
- let mut lock = alive
- .lock()
- .expect("Failed to acquire websocket heartbeat alive lock");
- *lock = Instant::now();
+ self.hb = Instant::now();
}
ws::Message::Text(text) => {
- let msg = text.trim().to_string();
- let executed = parse_json_message(
- msg,
- client_ip.clone(),
- connection_id,
- rate_limiter.get_ref(),
- context.reset_request_count(),
- )
- .await;
-
- let res = executed.unwrap_or_else(|e| {
- error!("Error during message handling {}", e);
- e.to_json()
- .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
+ let ip_clone = self.ip.clone();
+ let id_clone = self.id.to_owned();
+ let context_clone = self.apub_data.reset_request_count();
+
+ let fut = Box::pin(async move {
+ let msg = text.trim().to_string();
+ parse_json_message(msg, ip_clone, id_clone, context_clone).await
});
- session.text(res).await?;
- }
- ws::Message::Close(_) => {
- session.close(None).await?;
- context.chat_server().handle_disconnect(&connection_id)?;
- break;
+ fut
+ .into_actor(self)
+ .then(|res, _, ctx| {
+ match res {
+ Ok(res) => ctx.text(res),
+ Err(e) => error!("{}", &e),
+ }
+ actix::fut::ready(())
+ })
+ .spawn(ctx);
}
- ws::Message::Binary(_) => info!("Unexpected binary"),
- _ => {}
- }
- }
- Ok(())
-}
-
-fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
- actix_rt::spawn(async move {
- let mut interval = actix_rt::time::interval(Duration::from_secs(5));
- loop {
- if session.ping(b"").await.is_err() {
- break;
+ ws::Message::Binary(_) => println!("Unexpected binary"),
+ ws::Message::Close(reason) => {
+ ctx.close(reason);
+ ctx.stop();
}
-
- let duration_since = {
- let alive_lock = alive
- .lock()
- .expect("Failed to acquire websocket heartbeat alive lock");
- Instant::now().duration_since(*alive_lock)
- };
- if duration_since > Duration::from_secs(10) {
- let _ = session.close(None).await;
- break;
+ ws::Message::Continuation(_) => {
+ ctx.stop();
}
- interval.tick().await;
+ ws::Message::Nop => (),
}
- });
+ }
}
+/// Entry point for our websocket route
async fn parse_json_message(
msg: String,
ip: IpAddr,
connection_id: ConnectionId,
- rate_limiter: &RateLimitCell,
context: ContextData<LemmyContext>,
) -> Result<String, LemmyError> {
+ let rate_limiter = context.settings_updated_channel();
let json: Value = serde_json::from_str(&msg)?;
let data = json
.get("data")
use crate::{code_migrations::run_advanced_migrations, root_span_builder::QuieterRootSpanBuilder};
use activitypub_federation::config::{FederationConfig, FederationMiddleware};
+use actix::Actor;
use actix_web::{middleware, web::Data, App, HttpServer, Result};
use doku::json::{AutoComments, CommentsStyle, Formatting, ObjectsStyle};
use lemmy_api_common::{
use reqwest_middleware::ClientBuilder;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use reqwest_tracing::TracingMiddleware;
-use std::{env, sync::Arc, thread, time::Duration};
+use std::{env, thread, time::Duration};
use tracing::subscriber::set_global_default;
use tracing_actix_web::TracingLogger;
use tracing_error::ErrorLayer;
scheduled_tasks::setup(db_url, user_agent).expect("Couldn't set up scheduled_tasks");
});
- let chat_server = Arc::new(ChatServer::startup());
+ let chat_server = ChatServer::default().start();
// Create Http server with websocket support
let settings_bind = settings.clone();