3 websocket::chat_server::{ChatServer, SessionInfo},
6 use actix::{Actor, Context, Handler, ResponseFuture};
8 use lemmy_structs::websocket::*;
9 use lemmy_db::naive_now;
10 use lemmy_rate_limit::RateLimit;
11 use lemmy_utils::{ConnectionId, IPAddr, LemmyError};
12 use log::{error, info};
14 use serde::{Deserialize, Serialize};
/// Inputs for a single websocket user operation, handed to `do_user_operation`.
///
/// `'a` is the lifetime of the borrowed raw payload in `data`.
16 pub(super) struct Args<'a> {
/// Application context; `do_user_operation` wraps a `context` value in
/// `web::Data` before calling `perform` (presumably this field).
17 pub(super) context: LemmyContext,
/// Rate limiter; `do_user_operation` selects its `register`, `post` or
/// `message` limiter depending on the operation.
18 pub(super) rate_limiter: RateLimit,
/// Connection id of the requesting websocket session (passed on to
/// `perform` as `Some(id)`).
19 pub(super) id: ConnectionId,
/// Client IP address handed to the rate limiter's `wrap`.
20 pub(super) ip: IPAddr,
/// The requested operation; drives the rate-limiter choice and is passed
/// to `to_json_string` together with the result.
21 pub(super) op: UserOperation,
/// Raw JSON payload, deserialized into the operation's `Data` type.
22 pub(super) data: &'a str,
/// Executes one websocket user operation under a rate limit.
///
/// The raw payload is deserialized into `Data`, `perform` is invoked on it,
/// and the result is serialized with `to_json_string`. That work is built
/// as a future and handed to the rate limiter selected by the operation.
///
/// # Errors
///
/// Returns a `LemmyError`; a payload that fails to deserialize is
/// propagated via `?`.
25 pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result<String, LemmyError>
27 for<'de> Data: Deserialize<'de> + 'a,
// Take an owned copy of the payload; the `async move` block below captures
// it by value (presumably so the future does not borrow from `args`).
39 let data = data.to_string();
// Build the actual work as a future; it is passed to the rate limiter's
// `wrap` in the match below.
42 let fut = async move {
43 let parsed_data: Data = serde_json::from_str(&data)?;
// Run the operation with the shared context and the caller's connection id.
45 .perform(&web::Data::new(context), Some(id))
47 to_json_string(&op, &res)
// Choose the rate limiter by operation type.
51 UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
52 UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
// Community creation uses the same `register` limiter as registration.
53 UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
// Every other operation falls under the general `message` limiter.
54 _ => rate_limiter.message().wrap(ip, fut).await,
58 /// Makes `ChatServer` an actix actor.
59 impl Actor for ChatServer {
60 /// We use the plain actix `Context`; we only need the ability to communicate
61 /// with other actors.
62 type Context = Context<Self>;
65 /// Handler for the `Connect` message.
67 /// Registers a new session and assigns it a randomly generated id.
68 impl Handler<Connect> for ChatServer {
71 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
72 // Register the session under a random id.
// NOTE(review): the id comes straight from the RNG; confirm whether a
// collision with an already-registered session id needs handling.
73 let id = self.rng.gen::<usize>();
// Log the IP address of the connecting client.
74 info!("{} joined", &msg.ip);
88 /// Handler for the `Disconnect` message.
///
/// Drops the session and removes its id from every user, post and
/// community room.
89 impl Handler<Disconnect> for ChatServer {
92 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
93 // Remove the connection from `sessions` and from all 3 room scopes
// (user, post, community). The room maps are only scanned when the id
// was actually a registered session.
94 if self.sessions.remove(&msg.id).is_some() {
95 for sessions in self.user_rooms.values_mut() {
96 sessions.remove(&msg.id);
99 for sessions in self.post_rooms.values_mut() {
100 sessions.remove(&msg.id);
103 for sessions in self.community_rooms.values_mut() {
104 sessions.remove(&msg.id);
110 /// Handler for the `StandardMessage` message.
///
/// The message is handed to `parse_json_message`; the resulting future is
/// moved into the boxed future returned to actix.
111 impl Handler<StandardMessage> for ChatServer {
// The error type is `Infallible`: this handler never reports an error to
// the caller. Failures are logged with `error!` instead.
112 type Result = ResponseFuture<Result<String, std::convert::Infallible>>;
114 fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) -> Self::Result {
// Create the future here, while `self` and `ctx` are available; the
// `async move` block below takes ownership of it.
115 let fut = self.parse_json_message(msg, ctx);
116 Box::pin(async move {
119 // info!("Message Sent: {}", m);
// Log the failure; the `Infallible` error type means it cannot be returned.
123 error!("Error during message handling {}", e);
/// Handler for `SendAllMessage`: forwards the operation, the response and
/// the message's `websocket_id` to `send_all_message`.
131 impl<Response> Handler<SendAllMessage<Response>> for ChatServer
137 fn handle(&mut self, msg: SendAllMessage<Response>, _: &mut Context<Self>) {
139 .send_all_message(&msg.op, &msg.response, msg.websocket_id)
/// Handler for `SendUserRoomMessage`: forwards the operation, the response,
/// the `recipient_id` and the message's `websocket_id` to
/// `send_user_room_message`.
144 impl<Response> Handler<SendUserRoomMessage<Response>> for ChatServer
150 fn handle(&mut self, msg: SendUserRoomMessage<Response>, _: &mut Context<Self>) {
152 .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id)
/// Handler for `SendCommunityRoomMessage`: forwards the operation, the
/// response, the `community_id` and the message's `websocket_id` to
/// `send_community_room_message`.
157 impl<Response> Handler<SendCommunityRoomMessage<Response>> for ChatServer
163 fn handle(&mut self, msg: SendCommunityRoomMessage<Response>, _: &mut Context<Self>) {
165 .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id)
/// Handler for `SendPost`: passes the operation, the post and the
/// message's `websocket_id` to `send_post`.
170 impl Handler<SendPost> for ChatServer {
173 fn handle(&mut self, msg: SendPost, _: &mut Context<Self>) {
// Best effort: `.ok()` discards any error returned by `send_post`.
174 self.send_post(&msg.op, &msg.post, msg.websocket_id).ok();
/// Handler for `SendComment`: passes the operation, the comment and the
/// message's `websocket_id` to `send_comment`.
178 impl Handler<SendComment> for ChatServer {
181 fn handle(&mut self, msg: SendComment, _: &mut Context<Self>) {
183 .send_comment(&msg.op, &msg.comment, msg.websocket_id)
/// Handler for `JoinUserRoom`: calls `join_user_room` with the message's
/// `user_id` and connection `id`.
188 impl Handler<JoinUserRoom> for ChatServer {
191 fn handle(&mut self, msg: JoinUserRoom, _: &mut Context<Self>) {
// Best effort: `.ok()` discards any error returned by `join_user_room`.
192 self.join_user_room(msg.user_id, msg.id).ok();
/// Handler for `JoinCommunityRoom`: calls `join_community_room` with the
/// message's `community_id` and connection `id`.
196 impl Handler<JoinCommunityRoom> for ChatServer {
199 fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context<Self>) {
// Best effort: `.ok()` discards any error returned by `join_community_room`.
200 self.join_community_room(msg.community_id, msg.id).ok();
/// Handler for `JoinPostRoom`: calls `join_post_room` with the message's
/// `post_id` and connection `id`.
204 impl Handler<JoinPostRoom> for ChatServer {
207 fn handle(&mut self, msg: JoinPostRoom, _: &mut Context<Self>) {
// Best effort: `.ok()` discards any error returned by `join_post_room`.
208 self.join_post_room(msg.post_id, msg.id).ok();
/// Handler for `GetUsersOnline`.
///
/// The message itself is unused (`_msg`); the answer is returned as
/// `Self::Result`.
212 impl Handler<GetUsersOnline> for ChatServer {
215 fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context<Self>) -> Self::Result {
/// Handler for `GetPostUsersOnline`: looks up the room for `msg.post_id`
/// in `post_rooms`.
220 impl Handler<GetPostUsersOnline> for ChatServer {
223 fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context<Self>) -> Self::Result {
// A post has an entry in `post_rooms` only if a room exists for it;
// the `None` case is handled separately.
224 if let Some(users) = self.post_rooms.get(&msg.post_id) {
/// Handler for `GetCommunityUsersOnline`: looks up the room for
/// `msg.community_id` in `community_rooms`.
232 impl Handler<GetCommunityUsersOnline> for ChatServer {
235 fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context<Self>) -> Self::Result {
// A community has an entry in `community_rooms` only if a room exists
// for it; the `None` case is handled separately.
236 if let Some(users) = self.community_rooms.get(&msg.community_id) {
/// Envelope built by `to_json_string` and serialized to JSON; generic over
/// the payload type `T`.
245 struct WebsocketResponse<T> {
/// Wraps a response in a `WebsocketResponse` and serializes it to a JSON
/// string.
///
/// # Errors
///
/// A `serde_json` serialization failure is converted into `LemmyError`
/// via `?`.
250 pub(super) fn to_json_string<Response>(
253 ) -> Result<String, LemmyError>
257 let response = WebsocketResponse {
261 Ok(serde_json::to_string(&response)?)
/// Handler for `CaptchaItem`: stores the captcha so a later `CheckCaptcha`
/// message can be checked against it.
264 impl Handler<CaptchaItem> for ChatServer {
267 fn handle(&mut self, msg: CaptchaItem, _: &mut Context<Self>) {
// Append to the list of pending captchas.
268 self.captchas.push(msg);
/// Handler for `CheckCaptcha`.
///
/// Purges expired captchas, tests whether a stored captcha matches both the
/// `uuid` and the `answer` of the message, then removes every captcha with
/// that `uuid`.
272 impl Handler<CheckCaptcha> for ChatServer {
275 fn handle(&mut self, msg: CheckCaptcha, _: &mut Context<Self>) -> Self::Result {
276 // Remove all the ones that are past the expire time
// NOTE(review): `naive_now()` is evaluated once per element inside the
// closure; consider reading the clock once before `retain`.
277 self.captchas.retain(|x| x.expires.gt(&naive_now()));
// A match requires both the uuid and the answer to be equal.
282 .any(|r| r.uuid == msg.uuid && r.answer == msg.answer);
284 // Remove this uuid so it can't be re-checked (Checks only work once)
285 self.captchas.retain(|x| x.uuid != msg.uuid);