]> Untitled Git - lemmy.git/blob - server/src/main.rs
rewrite
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::db::establish_connection;
10 use lemmy_server::nodeinfo;
11 use lemmy_server::websocket::server::*;
12 use std::env;
13 use std::time::{Duration, Instant};
14 use actix_web::http::header::ContentType;
15
16 embed_migrations!();
17
18 /// How often heartbeat pings are sent
19 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
20 /// How long before lack of client response causes a timeout
21 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
22
23 /// Entry point for our route
24 fn chat_route(
25   req: HttpRequest,
26   stream: web::Payload,
27   chat_server: web::Data<Addr<ChatServer>>,
28 ) -> Result<HttpResponse, Error> {
29   ws::start(
30     WSSession {
31       cs_addr: chat_server.get_ref().to_owned(),
32       id: 0,
33       hb: Instant::now(),
34       ip: req
35         .connection_info()
36         .remote()
37         .unwrap_or("127.0.0.1:12345")
38         .split(":")
39         .next()
40         .unwrap_or("127.0.0.1")
41         .to_string(),
42     },
43     &req,
44     stream,
45   )
46 }
47
48 struct WSSession {
49   cs_addr: Addr<ChatServer>,
50   /// unique session id
51   id: usize,
52   ip: String,
53   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
54   /// otherwise we drop connection.
55   hb: Instant,
56 }
57
58 impl Actor for WSSession {
59   type Context = ws::WebsocketContext<Self>;
60
61   /// Method is called on actor start.
62   /// We register ws session with ChatServer
63   fn started(&mut self, ctx: &mut Self::Context) {
64     // we'll start heartbeat process on session start.
65     self.hb(ctx);
66
67     // register self in chat server. `AsyncContext::wait` register
68     // future within context, but context waits until this future resolves
69     // before processing any other events.
70     // across all routes within application
71     let addr = ctx.address();
72     self
73       .cs_addr
74       .send(Connect {
75         addr: addr.recipient(),
76         ip: self.ip.to_owned(),
77       })
78       .into_actor(self)
79       .then(|res, act, ctx| {
80         match res {
81           Ok(res) => act.id = res,
82           // something is wrong with chat server
83           _ => ctx.stop(),
84         }
85         fut::ok(())
86       })
87       .wait(ctx);
88   }
89
90   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
91     // notify chat server
92     self.cs_addr.do_send(Disconnect {
93       id: self.id,
94       ip: self.ip.to_owned(),
95     });
96     Running::Stop
97   }
98 }
99
100 /// Handle messages from chat server, we simply send it to peer websocket
101 /// These are room messages, IE sent to others in the room
102 impl Handler<WSMessage> for WSSession {
103   type Result = ();
104
105   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
106     // println!("id: {} msg: {}", self.id, msg.0);
107     ctx.text(msg.0);
108   }
109 }
110
111 /// WebSocket message handler
112 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
113   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
114     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
115     match msg {
116       ws::Message::Ping(msg) => {
117         self.hb = Instant::now();
118         ctx.pong(&msg);
119       }
120       ws::Message::Pong(_) => {
121         self.hb = Instant::now();
122       }
123       ws::Message::Text(text) => {
124         let m = text.trim().to_owned();
125         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
126
127         self
128           .cs_addr
129           .send(StandardMessage {
130             id: self.id,
131             msg: m,
132           })
133           .into_actor(self)
134           .then(|res, _, ctx| {
135             match res {
136               Ok(res) => ctx.text(res),
137               Err(e) => {
138                 eprintln!("{}", &e);
139               }
140             }
141             fut::ok(())
142           })
143           .wait(ctx);
144       }
145       ws::Message::Binary(_bin) => println!("Unexpected binary"),
146       ws::Message::Close(_) => {
147         ctx.stop();
148       }
149       _ => {}
150     }
151   }
152 }
153
154 impl WSSession {
155   /// helper method that sends ping to client every second.
156   ///
157   /// also this method checks heartbeats from client
158   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
159     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
160       // check client heartbeats
161       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
162         // heartbeat timed out
163         println!("Websocket Client heartbeat failed, disconnecting!");
164
165         // notify chat server
166         act.cs_addr.do_send(Disconnect {
167           id: act.id,
168           ip: act.ip.to_owned(),
169         });
170
171         // stop actor
172         ctx.stop();
173
174         // don't try to send a ping
175         return;
176       }
177
178       ctx.ping("");
179     });
180   }
181 }
182
183 fn main() {
184   let _ = env_logger::init();
185   let sys = actix::System::new("lemmy");
186
187   // Run the migrations from code
188   let conn = establish_connection();
189   embedded_migrations::run(&conn).unwrap();
190
191   // Start chat server actor in separate thread
192   let server = ChatServer::default().start();
193   // Create Http server with websocket support
194
195   HttpServer::new(move || {
196     App::new()
197       .data(server.clone())
198       .service(web::resource("/api/v1/ws").to(chat_route))
199       //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
200       .service(web::resource("/").to(index))
201       // static resources
202       .service(actix_files::Files::new("/static", front_end_dir()))
203       .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info))
204       .route("/.well-known/nodeinfo", web::get().to(nodeinfo::node_info_well_known))
205   })
206   .bind("0.0.0.0:8536")
207   .unwrap()
208   .start();
209
210   println!("Started http server: 0.0.0.0:8536");
211   let _ = sys.run();
212 }
213
214 fn index() -> Result<NamedFile, actix_web::error::Error> {
215   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
216 }
217
218 fn front_end_dir() -> String {
219   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
220 }