]> Untitled Git - lemmy.git/blob - server/src/main.rs
Running cargo fmt on server code.
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::db::establish_connection;
10 use lemmy_server::websocket::server::*;
11 use std::env;
12 use std::time::{Duration, Instant};
13
14 embed_migrations!();
15
16 /// How often heartbeat pings are sent
17 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
18 /// How long before lack of client response causes a timeout
19 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
20
21 /// Entry point for our route
22 fn chat_route(
23   req: HttpRequest,
24   stream: web::Payload,
25   chat_server: web::Data<Addr<ChatServer>>,
26 ) -> Result<HttpResponse, Error> {
27   ws::start(
28     WSSession {
29       cs_addr: chat_server.get_ref().to_owned(),
30       id: 0,
31       hb: Instant::now(),
32       ip: req
33         .connection_info()
34         .remote()
35         .unwrap_or("127.0.0.1:12345")
36         .split(":")
37         .next()
38         .unwrap_or("127.0.0.1")
39         .to_string(),
40     },
41     &req,
42     stream,
43   )
44 }
45
46 struct WSSession {
47   cs_addr: Addr<ChatServer>,
48   /// unique session id
49   id: usize,
50   ip: String,
51   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
52   /// otherwise we drop connection.
53   hb: Instant,
54 }
55
56 impl Actor for WSSession {
57   type Context = ws::WebsocketContext<Self>;
58
59   /// Method is called on actor start.
60   /// We register ws session with ChatServer
61   fn started(&mut self, ctx: &mut Self::Context) {
62     // we'll start heartbeat process on session start.
63     self.hb(ctx);
64
65     // register self in chat server. `AsyncContext::wait` register
66     // future within context, but context waits until this future resolves
67     // before processing any other events.
68     // across all routes within application
69     let addr = ctx.address();
70     self
71       .cs_addr
72       .send(Connect {
73         addr: addr.recipient(),
74         ip: self.ip.to_owned(),
75       })
76       .into_actor(self)
77       .then(|res, act, ctx| {
78         match res {
79           Ok(res) => act.id = res,
80           // something is wrong with chat server
81           _ => ctx.stop(),
82         }
83         fut::ok(())
84       })
85       .wait(ctx);
86   }
87
88   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
89     // notify chat server
90     self.cs_addr.do_send(Disconnect {
91       id: self.id,
92       ip: self.ip.to_owned(),
93     });
94     Running::Stop
95   }
96 }
97
98 /// Handle messages from chat server, we simply send it to peer websocket
99 /// These are room messages, IE sent to others in the room
100 impl Handler<WSMessage> for WSSession {
101   type Result = ();
102
103   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
104     // println!("id: {} msg: {}", self.id, msg.0);
105     ctx.text(msg.0);
106   }
107 }
108
109 /// WebSocket message handler
110 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
111   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
112     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
113     match msg {
114       ws::Message::Ping(msg) => {
115         self.hb = Instant::now();
116         ctx.pong(&msg);
117       }
118       ws::Message::Pong(_) => {
119         self.hb = Instant::now();
120       }
121       ws::Message::Text(text) => {
122         let m = text.trim().to_owned();
123         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
124
125         self
126           .cs_addr
127           .send(StandardMessage {
128             id: self.id,
129             msg: m,
130           })
131           .into_actor(self)
132           .then(|res, _, ctx| {
133             match res {
134               Ok(res) => ctx.text(res),
135               Err(e) => {
136                 eprintln!("{}", &e);
137               }
138             }
139             fut::ok(())
140           })
141           .wait(ctx);
142       }
143       ws::Message::Binary(_bin) => println!("Unexpected binary"),
144       ws::Message::Close(_) => {
145         ctx.stop();
146       }
147       _ => {}
148     }
149   }
150 }
151
152 impl WSSession {
153   /// helper method that sends ping to client every second.
154   ///
155   /// also this method checks heartbeats from client
156   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
157     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
158       // check client heartbeats
159       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
160         // heartbeat timed out
161         println!("Websocket Client heartbeat failed, disconnecting!");
162
163         // notify chat server
164         act.cs_addr.do_send(Disconnect {
165           id: act.id,
166           ip: act.ip.to_owned(),
167         });
168
169         // stop actor
170         ctx.stop();
171
172         // don't try to send a ping
173         return;
174       }
175
176       ctx.ping("");
177     });
178   }
179 }
180
181 fn main() {
182   let _ = env_logger::init();
183   let sys = actix::System::new("lemmy");
184
185   // Run the migrations from code
186   let conn = establish_connection();
187   embedded_migrations::run(&conn).unwrap();
188
189   // Start chat server actor in separate thread
190   let server = ChatServer::default().start();
191   // Create Http server with websocket support
192   HttpServer::new(move || {
193     App::new()
194       .data(server.clone())
195       .service(web::resource("/api/v1/ws").to(chat_route))
196       //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
197       .service(web::resource("/").to(index))
198       // static resources
199       .service(actix_files::Files::new("/static", front_end_dir()))
200   })
201   .bind("0.0.0.0:8536")
202   .unwrap()
203   .start();
204
205   println!("Started http server: 0.0.0.0:8536");
206   let _ = sys.run();
207 }
208
209 fn index() -> Result<NamedFile, actix_web::error::Error> {
210   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
211 }
212
213 fn front_end_dir() -> String {
214   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
215 }