]> Untitled Git - lemmy.git/blob - server/src/main.rs
upgrade
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use std::time::{Instant, Duration};
6 use std::env;
7 use actix_web::*;
8 use actix::prelude::*;
9 use actix_files::NamedFile;
10 use actix_web_actors::ws;
11 use lemmy_server::websocket::server::*;
12 use lemmy_server::db::establish_connection;
13
14 embed_migrations!();
15
16 /// How often heartbeat pings are sent
17 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
18 /// How long before lack of client response causes a timeout
19 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
20
21
22 /// Entry point for our route
23 fn chat_route(req: HttpRequest, stream: web::Payload, chat_server: web::Data<Addr<ChatServer>>) -> Result<HttpResponse, Error> {
24     ws::start(
25         WSSession {
26             cs_addr: chat_server.get_ref().to_owned(),
27             id: 0,
28             hb: Instant::now(),
29             ip: req.connection_info()
30                 .remote()
31                 .unwrap_or("127.0.0.1:12345")
32                 .split(":")
33                 .next()
34                 .unwrap_or("127.0.0.1")
35                 .to_string(),
36         },
37         &req,
38         stream,
39     )
40 }
41
42 struct WSSession {
43     cs_addr: Addr<ChatServer>,
44     /// unique session id
45     id: usize,
46     ip: String,
47     /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
48     /// otherwise we drop connection.
49     hb: Instant,
50 }
51
52 impl Actor for WSSession {
53     type Context = ws::WebsocketContext<Self>;
54
55     /// Method is called on actor start.
56     /// We register ws session with ChatServer
57     fn started(&mut self, ctx: &mut Self::Context) {
58         // we'll start heartbeat process on session start.
59         self.hb(ctx);
60
61         // register self in chat server. `AsyncContext::wait` register
62         // future within context, but context waits until this future resolves
63         // before processing any other events.
64         // across all routes within application
65         let addr = ctx.address();
66         self.cs_addr
67             .send(Connect {
68                 addr: addr.recipient(),
69                 ip: self.ip.to_owned(),
70             })
71             .into_actor(self)
72             .then(|res, act, ctx| {
73                 match res {
74                     Ok(res) => act.id = res,
75                     // something is wrong with chat server
76                     _ => ctx.stop(),
77                 }
78                 fut::ok(())
79             })
80             .wait(ctx);
81     }
82
83     fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
84         // notify chat server
85         self.cs_addr.do_send(Disconnect {
86             id: self.id,
87             ip: self.ip.to_owned(),
88         });
89         Running::Stop
90     }
91 }
92
93 /// Handle messages from chat server, we simply send it to peer websocket
94 /// These are room messages, IE sent to others in the room
95 impl Handler<WSMessage> for WSSession {
96     type Result = ();
97
98     fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
99         // println!("id: {} msg: {}", self.id, msg.0);
100         ctx.text(msg.0);
101     }
102 }
103
104 /// WebSocket message handler
105 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
106     fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
107         // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
108         match msg {
109             ws::Message::Ping(msg) => {
110                 self.hb = Instant::now();
111                 ctx.pong(&msg);
112             }
113             ws::Message::Pong(_) => {
114                 self.hb = Instant::now();
115             }
116             ws::Message::Text(text) => {
117                 let m = text.trim().to_owned();
118                 println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
119
120                 self.cs_addr
121                     .send(StandardMessage {
122                         id: self.id,
123                         msg: m,
124                     })
125                     .into_actor(self)
126                     .then(|res, _, ctx| {
127                         match res {
128                             Ok(res) => ctx.text(res),
129                             Err(e) => {
130                                 eprintln!("{}", &e);
131                             }
132                         }
133                         fut::ok(())
134                     })
135                     .wait(ctx);
136             }
137             ws::Message::Binary(_bin) => println!("Unexpected binary"),
138             ws::Message::Close(_) => {
139                 ctx.stop();
140             }
141             _ => {}
142         }
143     }
144 }
145
146 impl WSSession {
147     /// helper method that sends ping to client every second.
148     ///
149     /// also this method checks heartbeats from client
150     fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
151         ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
152             // check client heartbeats
153             if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
154                 // heartbeat timed out
155                 println!("Websocket Client heartbeat failed, disconnecting!");
156
157                 // notify chat server
158                 act.cs_addr
159                     .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
160
161                 // stop actor
162                 ctx.stop();
163
164                 // don't try to send a ping
165                 return;
166             }
167
168             ctx.ping("");
169         });
170     }
171 }
172
173 fn main() {
174     let _ = env_logger::init();
175     let sys = actix::System::new("lemmy");
176
177     // Run the migrations from code
178     let conn = establish_connection();
179     embedded_migrations::run(&conn).unwrap();
180
181     // Start chat server actor in separate thread
182     let server = ChatServer::default().start();
183     // Create Http server with websocket support
184     HttpServer::new(move || {
185
186         App::new()
187             .data(server.clone())
188             .service(web::resource("/api/v1/ws").to(chat_route))
189 //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
190             .service(web::resource("/").to(index))
191             // static resources
192             .service(actix_files::Files::new("/static", front_end_dir()))
193     }).bind("0.0.0.0:8536")
194         .unwrap()
195         .start();
196
197     println!("Started http server: 0.0.0.0:8536");
198     let _ = sys.run();
199 }
200
201 fn index() -> Result<NamedFile, actix_web::error::Error> {
202     Ok(NamedFile::open(front_end_dir() + "/index.html")?)
203 }
204
205 fn front_end_dir() -> String {
206     env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
207 }