]> Untitled Git - lemmy.git/blob - server/src/main.rs
Merge branch 'Jonnnh-upgrade-actor-web-1.0.x' into dev
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::db::establish_connection;
10 use lemmy_server::websocket::server::*;
11 use std::env;
12 use std::time::{Duration, Instant};
13
14 embed_migrations!();
15
16 /// How often heartbeat pings are sent
17 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
18 /// How long before lack of client response causes a timeout
19 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
20
21 /// Entry point for our route
22 fn chat_route(
23   req: HttpRequest,
24   stream: web::Payload,
25   chat_server: web::Data<Addr<ChatServer>>,
26   ) -> Result<HttpResponse, Error> {
27   ws::start(
28     WSSession {
29       cs_addr: chat_server.get_ref().to_owned(),
30       id: 0,
31       hb: Instant::now(),
32       ip: req
33         .connection_info()
34         .remote()
35         .unwrap_or("127.0.0.1:12345")
36         .split(":")
37         .next()
38         .unwrap_or("127.0.0.1")
39         .to_string(),
40     },
41     &req,
42     stream,
43     )
44 }
45
46 struct WSSession {
47   cs_addr: Addr<ChatServer>,
48   /// unique session id
49   id: usize,
50   ip: String,
51   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
52   /// otherwise we drop connection.
53   hb: Instant,
54 }
55
56 impl Actor for WSSession {
57   type Context = ws::WebsocketContext<Self>;
58
59   /// Method is called on actor start.
60   /// We register ws session with ChatServer
61   fn started(&mut self, ctx: &mut Self::Context) {
62     // we'll start heartbeat process on session start.
63     self.hb(ctx);
64
65     // register self in chat server. `AsyncContext::wait` register
66     // future within context, but context waits until this future resolves
67     // before processing any other events.
68     // across all routes within application
69     let addr = ctx.address();
70     self.cs_addr
71       .send(Connect {
72         addr: addr.recipient(),
73         ip: self.ip.to_owned(),
74       })
75     .into_actor(self)
76       .then(|res, act, ctx| {
77         match res {
78           Ok(res) => act.id = res,
79           // something is wrong with chat server
80           _ => ctx.stop(),
81         }
82         fut::ok(())
83       })
84     .wait(ctx);
85   }
86
87   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
88     // notify chat server
89     self.cs_addr.do_send(Disconnect {
90       id: self.id,
91       ip: self.ip.to_owned(),
92     });
93     Running::Stop
94   }
95 }
96
97 /// Handle messages from chat server, we simply send it to peer websocket
98 /// These are room messages, IE sent to others in the room
99 impl Handler<WSMessage> for WSSession {
100   type Result = ();
101
102   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
103     // println!("id: {} msg: {}", self.id, msg.0);
104     ctx.text(msg.0);
105   }
106 }
107
108 /// WebSocket message handler
109 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
110   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
111     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
112     match msg {
113       ws::Message::Ping(msg) => {
114         self.hb = Instant::now();
115         ctx.pong(&msg);
116       }
117       ws::Message::Pong(_) => {
118         self.hb = Instant::now();
119       }
120       ws::Message::Text(text) => {
121         let m = text.trim().to_owned();
122         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
123
124         self.cs_addr
125           .send(StandardMessage {
126             id: self.id,
127             msg: m,
128           })
129         .into_actor(self)
130           .then(|res, _, ctx| {
131             match res {
132               Ok(res) => ctx.text(res),
133               Err(e) => {
134                 eprintln!("{}", &e);
135               }
136             }
137             fut::ok(())
138           })
139         .wait(ctx);
140       }
141       ws::Message::Binary(_bin) => println!("Unexpected binary"),
142       ws::Message::Close(_) => {
143         ctx.stop();
144       }
145       _ => {}
146     }
147   }
148 }
149
150 impl WSSession {
151   /// helper method that sends ping to client every second.
152   ///
153   /// also this method checks heartbeats from client
154   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
155     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
156       // check client heartbeats
157       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
158         // heartbeat timed out
159         println!("Websocket Client heartbeat failed, disconnecting!");
160
161         // notify chat server
162         act.cs_addr.do_send(Disconnect {
163           id: act.id,
164           ip: act.ip.to_owned(),
165         });
166
167         // stop actor
168         ctx.stop();
169
170         // don't try to send a ping
171         return;
172       }
173
174       ctx.ping("");
175     });
176   }
177 }
178
179 fn main() {
180   let _ = env_logger::init();
181   let sys = actix::System::new("lemmy");
182
183   // Run the migrations from code
184   let conn = establish_connection();
185   embedded_migrations::run(&conn).unwrap();
186
187   // Start chat server actor in separate thread
188   let server = ChatServer::default().start();
189   // Create Http server with websocket support
190   HttpServer::new(move || {
191     App::new()
192       .data(server.clone())
193       .service(web::resource("/api/v1/ws").to(chat_route))
194       //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
195       .service(web::resource("/").to(index))
196       // static resources
197       .service(actix_files::Files::new("/static", front_end_dir()))
198   })
199   .bind("0.0.0.0:8536")
200     .unwrap()
201     .start();
202
203   println!("Started http server: 0.0.0.0:8536");
204   let _ = sys.run();
205 }
206
207 fn index() -> Result<NamedFile, actix_web::error::Error> {
208   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
209 }
210
211 fn front_end_dir() -> String {
212   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
213 }