]> Untitled Git - lemmy.git/blob - server/src/main.rs
Merge branch 'upgrade-actor-web-1.0.x' of https://github.com/Jonnnh/lemmy into Jonnnh...
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::db::establish_connection;
10 use lemmy_server::websocket::server::*;
11 use std::env;
12 use std::time::{Duration, Instant};
13
14 embed_migrations!();
15
16 /// How often heartbeat pings are sent
17 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
18 /// How long before lack of client response causes a timeout
19 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
20
21 /// Entry point for our route
22 fn chat_route(
23     req: HttpRequest,
24     stream: web::Payload,
25     chat_server: web::Data<Addr<ChatServer>>,
26 ) -> Result<HttpResponse, Error> {
27     ws::start(
28         WSSession {
29             cs_addr: chat_server.get_ref().to_owned(),
30             id: 0,
31             hb: Instant::now(),
32             ip: req
33                 .connection_info()
34                 .remote()
35                 .unwrap_or("127.0.0.1:12345")
36                 .split(":")
37                 .next()
38                 .unwrap_or("127.0.0.1")
39                 .to_string(),
40         },
41         &req,
42         stream,
43     )
44 }
45
46 struct WSSession {
47     cs_addr: Addr<ChatServer>,
48     /// unique session id
49     id: usize,
50     ip: String,
51     /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
52     /// otherwise we drop connection.
53     hb: Instant,
54 }
55
56 impl Actor for WSSession {
57     type Context = ws::WebsocketContext<Self>;
58
59     /// Method is called on actor start.
60     /// We register ws session with ChatServer
61     fn started(&mut self, ctx: &mut Self::Context) {
62         // we'll start heartbeat process on session start.
63         self.hb(ctx);
64
65         // register self in chat server. `AsyncContext::wait` register
66         // future within context, but context waits until this future resolves
67         // before processing any other events.
68         // across all routes within application
69         let addr = ctx.address();
70         self.cs_addr
71             .send(Connect {
72                 addr: addr.recipient(),
73                 ip: self.ip.to_owned(),
74             })
75             .into_actor(self)
76             .then(|res, act, ctx| {
77                 match res {
78                     Ok(res) => act.id = res,
79                     // something is wrong with chat server
80                     _ => ctx.stop(),
81                 }
82                 fut::ok(())
83             })
84             .wait(ctx);
85     }
86
87     fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
88         // notify chat server
89         self.cs_addr.do_send(Disconnect {
90             id: self.id,
91             ip: self.ip.to_owned(),
92         });
93         Running::Stop
94     }
95 }
96
97 /// Handle messages from chat server, we simply send it to peer websocket
98 /// These are room messages, IE sent to others in the room
99 impl Handler<WSMessage> for WSSession {
100     type Result = ();
101
102     fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
103         // println!("id: {} msg: {}", self.id, msg.0);
104         ctx.text(msg.0);
105     }
106 }
107
108 /// WebSocket message handler
109 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
110     fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
111         // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
112         match msg {
113             ws::Message::Ping(msg) => {
114                 self.hb = Instant::now();
115                 ctx.pong(&msg);
116             }
117             ws::Message::Pong(_) => {
118                 self.hb = Instant::now();
119             }
120             ws::Message::Text(text) => {
121                 let m = text.trim().to_owned();
122                 println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
123
124                 self.cs_addr
125                     .send(StandardMessage {
126                         id: self.id,
127                         msg: m,
128                     })
129                     .into_actor(self)
130                     .then(|res, _, ctx| {
131                         match res {
132                             Ok(res) => ctx.text(res),
133                             Err(e) => {
134                                 eprintln!("{}", &e);
135                             }
136                         }
137                         fut::ok(())
138                     })
139                     .wait(ctx);
140             }
141             ws::Message::Binary(_bin) => println!("Unexpected binary"),
142             ws::Message::Close(_) => {
143                 ctx.stop();
144             }
145             _ => {}
146         }
147     }
148 }
149
150 impl WSSession {
151     /// helper method that sends ping to client every second.
152     ///
153     /// also this method checks heartbeats from client
154     fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
155         ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
156             // check client heartbeats
157             if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
158                 // heartbeat timed out
159                 println!("Websocket Client heartbeat failed, disconnecting!");
160
161                 // notify chat server
162                 act.cs_addr.do_send(Disconnect {
163                     id: act.id,
164                     ip: act.ip.to_owned(),
165                 });
166
167                 // stop actor
168                 ctx.stop();
169
170                 // don't try to send a ping
171                 return;
172             }
173
174             ctx.ping("");
175         });
176     }
177 }
178
179 fn main() {
180     let _ = env_logger::init();
181     let sys = actix::System::new("lemmy");
182
183     // Run the migrations from code
184     let conn = establish_connection();
185     embedded_migrations::run(&conn).unwrap();
186
187     // Start chat server actor in separate thread
188     let server = ChatServer::default().start();
189     // Create Http server with websocket support
190     HttpServer::new(move || {
191         App::new()
192             .data(server.clone())
193             .service(web::resource("/api/v1/ws").to(chat_route))
194             //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
195             .service(web::resource("/").to(index))
196             // static resources
197             .service(actix_files::Files::new("/static", front_end_dir()))
198     })
199     .bind("0.0.0.0:8536")
200     .unwrap()
201     .start();
202
203     println!("Started http server: 0.0.0.0:8536");
204     let _ = sys.run();
205 }
206
207 fn index() -> Result<NamedFile, actix_web::error::Error> {
208     Ok(NamedFile::open(front_end_dir() + "/index.html")?)
209 }
210
211 fn front_end_dir() -> String {
212     env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
213 }