]> Untitled Git - lemmy.git/blob - src/websocket/handlers.rs
Fix nginx config for local federation setup (#104)
[lemmy.git] / src / websocket / handlers.rs
1 use crate::{
2   api::Perform,
3   websocket::chat_server::{ChatServer, SessionInfo},
4   LemmyContext,
5 };
6 use actix::{Actor, Context, Handler, ResponseFuture};
7 use actix_web::web;
8 use lemmy_db::naive_now;
9 use lemmy_rate_limit::RateLimit;
10 use lemmy_structs::websocket::*;
11 use lemmy_utils::{ConnectionId, IPAddr, LemmyError};
12 use log::{error, info};
13 use rand::Rng;
14 use serde::{Deserialize, Serialize};
15
16 pub(super) struct Args<'a> {
17   pub(super) context: LemmyContext,
18   pub(super) rate_limiter: RateLimit,
19   pub(super) id: ConnectionId,
20   pub(super) ip: IPAddr,
21   pub(super) op: UserOperation,
22   pub(super) data: &'a str,
23 }
24
25 pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result<String, LemmyError>
26 where
27   for<'de> Data: Deserialize<'de> + 'a,
28   Data: Perform,
29 {
30   let Args {
31     context,
32     rate_limiter,
33     id,
34     ip,
35     op,
36     data,
37   } = args;
38
39   let data = data.to_string();
40   let op2 = op.clone();
41
42   let fut = async move {
43     let parsed_data: Data = serde_json::from_str(&data)?;
44     let res = parsed_data
45       .perform(&web::Data::new(context), Some(id))
46       .await?;
47     to_json_string(&op, &res)
48   };
49
50   match op2 {
51     UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
52     UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
53     UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
54     _ => rate_limiter.message().wrap(ip, fut).await,
55   }
56 }
57
58 /// Make actor from `ChatServer`
59 impl Actor for ChatServer {
60   /// We are going to use simple Context, we just need ability to communicate
61   /// with other actors.
62   type Context = Context<Self>;
63 }
64
65 /// Handler for Connect message.
66 ///
67 /// Register new session and assign unique id to this session
68 impl Handler<Connect> for ChatServer {
69   type Result = usize;
70
71   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
72     // register session with random id
73     let id = self.rng.gen::<usize>();
74     info!("{} joined", &msg.ip);
75
76     self.sessions.insert(
77       id,
78       SessionInfo {
79         addr: msg.addr,
80         ip: msg.ip,
81       },
82     );
83
84     id
85   }
86 }
87
88 /// Handler for Disconnect message.
89 impl Handler<Disconnect> for ChatServer {
90   type Result = ();
91
92   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
93     // Remove connections from sessions and all 3 scopes
94     if self.sessions.remove(&msg.id).is_some() {
95       for sessions in self.user_rooms.values_mut() {
96         sessions.remove(&msg.id);
97       }
98
99       for sessions in self.post_rooms.values_mut() {
100         sessions.remove(&msg.id);
101       }
102
103       for sessions in self.community_rooms.values_mut() {
104         sessions.remove(&msg.id);
105       }
106     }
107   }
108 }
109
110 /// Handler for Message message.
111 impl Handler<StandardMessage> for ChatServer {
112   type Result = ResponseFuture<Result<String, std::convert::Infallible>>;
113
114   fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) -> Self::Result {
115     let fut = self.parse_json_message(msg, ctx);
116     Box::pin(async move {
117       match fut.await {
118         Ok(m) => {
119           // info!("Message Sent: {}", m);
120           Ok(m)
121         }
122         Err(e) => {
123           error!("Error during message handling {}", e);
124           Ok(e.to_string())
125         }
126       }
127     })
128   }
129 }
130
131 impl<Response> Handler<SendAllMessage<Response>> for ChatServer
132 where
133   Response: Serialize,
134 {
135   type Result = ();
136
137   fn handle(&mut self, msg: SendAllMessage<Response>, _: &mut Context<Self>) {
138     self
139       .send_all_message(&msg.op, &msg.response, msg.websocket_id)
140       .ok();
141   }
142 }
143
144 impl<Response> Handler<SendUserRoomMessage<Response>> for ChatServer
145 where
146   Response: Serialize,
147 {
148   type Result = ();
149
150   fn handle(&mut self, msg: SendUserRoomMessage<Response>, _: &mut Context<Self>) {
151     self
152       .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id)
153       .ok();
154   }
155 }
156
157 impl<Response> Handler<SendCommunityRoomMessage<Response>> for ChatServer
158 where
159   Response: Serialize,
160 {
161   type Result = ();
162
163   fn handle(&mut self, msg: SendCommunityRoomMessage<Response>, _: &mut Context<Self>) {
164     self
165       .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id)
166       .ok();
167   }
168 }
169
170 impl Handler<SendPost> for ChatServer {
171   type Result = ();
172
173   fn handle(&mut self, msg: SendPost, _: &mut Context<Self>) {
174     self.send_post(&msg.op, &msg.post, msg.websocket_id).ok();
175   }
176 }
177
178 impl Handler<SendComment> for ChatServer {
179   type Result = ();
180
181   fn handle(&mut self, msg: SendComment, _: &mut Context<Self>) {
182     self
183       .send_comment(&msg.op, &msg.comment, msg.websocket_id)
184       .ok();
185   }
186 }
187
188 impl Handler<JoinUserRoom> for ChatServer {
189   type Result = ();
190
191   fn handle(&mut self, msg: JoinUserRoom, _: &mut Context<Self>) {
192     self.join_user_room(msg.user_id, msg.id).ok();
193   }
194 }
195
196 impl Handler<JoinCommunityRoom> for ChatServer {
197   type Result = ();
198
199   fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context<Self>) {
200     self.join_community_room(msg.community_id, msg.id).ok();
201   }
202 }
203
204 impl Handler<JoinPostRoom> for ChatServer {
205   type Result = ();
206
207   fn handle(&mut self, msg: JoinPostRoom, _: &mut Context<Self>) {
208     self.join_post_room(msg.post_id, msg.id).ok();
209   }
210 }
211
212 impl Handler<GetUsersOnline> for ChatServer {
213   type Result = usize;
214
215   fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context<Self>) -> Self::Result {
216     self.sessions.len()
217   }
218 }
219
220 impl Handler<GetPostUsersOnline> for ChatServer {
221   type Result = usize;
222
223   fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context<Self>) -> Self::Result {
224     if let Some(users) = self.post_rooms.get(&msg.post_id) {
225       users.len()
226     } else {
227       0
228     }
229   }
230 }
231
232 impl Handler<GetCommunityUsersOnline> for ChatServer {
233   type Result = usize;
234
235   fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context<Self>) -> Self::Result {
236     if let Some(users) = self.community_rooms.get(&msg.community_id) {
237       users.len()
238     } else {
239       0
240     }
241   }
242 }
243
244 #[derive(Serialize)]
245 struct WebsocketResponse<T> {
246   op: String,
247   data: T,
248 }
249
250 pub(super) fn to_json_string<Response>(
251   op: &UserOperation,
252   data: &Response,
253 ) -> Result<String, LemmyError>
254 where
255   Response: Serialize,
256 {
257   let response = WebsocketResponse {
258     op: op.to_string(),
259     data,
260   };
261   Ok(serde_json::to_string(&response)?)
262 }
263
264 impl Handler<CaptchaItem> for ChatServer {
265   type Result = ();
266
267   fn handle(&mut self, msg: CaptchaItem, _: &mut Context<Self>) {
268     self.captchas.push(msg);
269   }
270 }
271
272 impl Handler<CheckCaptcha> for ChatServer {
273   type Result = bool;
274
275   fn handle(&mut self, msg: CheckCaptcha, _: &mut Context<Self>) -> Self::Result {
276     // Remove all the ones that are past the expire time
277     self.captchas.retain(|x| x.expires.gt(&naive_now()));
278
279     let check = self
280       .captchas
281       .iter()
282       .any(|r| r.uuid == msg.uuid && r.answer == msg.answer);
283
284     // Remove this uuid so it can't be re-checked (Checks only work once)
285     self.captchas.retain(|x| x.uuid != msg.uuid);
286
287     check
288   }
289 }