]> Untitled Git - lemmy.git/blob - server/src/main.rs
Implement nodeinfo support (fixes #144)
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web::web::Json;
9 use actix_web_actors::ws;
10 use lemmy_server::db::establish_connection;
11 use lemmy_server::websocket::server::*;
12 use std::env;
13 use std::time::{Duration, Instant};
14 use serde::Serialize;
15
16 embed_migrations!();
17
18 /// How often heartbeat pings are sent
19 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
20 /// How long before lack of client response causes a timeout
21 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
22
23 /// Entry point for our route
24 fn chat_route(
25   req: HttpRequest,
26   stream: web::Payload,
27   chat_server: web::Data<Addr<ChatServer>>,
28 ) -> Result<HttpResponse, Error> {
29   ws::start(
30     WSSession {
31       cs_addr: chat_server.get_ref().to_owned(),
32       id: 0,
33       hb: Instant::now(),
34       ip: req
35         .connection_info()
36         .remote()
37         .unwrap_or("127.0.0.1:12345")
38         .split(":")
39         .next()
40         .unwrap_or("127.0.0.1")
41         .to_string(),
42     },
43     &req,
44     stream,
45   )
46 }
47
48 struct WSSession {
49   cs_addr: Addr<ChatServer>,
50   /// unique session id
51   id: usize,
52   ip: String,
53   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
54   /// otherwise we drop connection.
55   hb: Instant,
56 }
57
58 impl Actor for WSSession {
59   type Context = ws::WebsocketContext<Self>;
60
61   /// Method is called on actor start.
62   /// We register ws session with ChatServer
63   fn started(&mut self, ctx: &mut Self::Context) {
64     // we'll start heartbeat process on session start.
65     self.hb(ctx);
66
67     // register self in chat server. `AsyncContext::wait` register
68     // future within context, but context waits until this future resolves
69     // before processing any other events.
70     // across all routes within application
71     let addr = ctx.address();
72     self
73       .cs_addr
74       .send(Connect {
75         addr: addr.recipient(),
76         ip: self.ip.to_owned(),
77       })
78       .into_actor(self)
79       .then(|res, act, ctx| {
80         match res {
81           Ok(res) => act.id = res,
82           // something is wrong with chat server
83           _ => ctx.stop(),
84         }
85         fut::ok(())
86       })
87       .wait(ctx);
88   }
89
90   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
91     // notify chat server
92     self.cs_addr.do_send(Disconnect {
93       id: self.id,
94       ip: self.ip.to_owned(),
95     });
96     Running::Stop
97   }
98 }
99
100 /// Handle messages from chat server, we simply send it to peer websocket
101 /// These are room messages, IE sent to others in the room
102 impl Handler<WSMessage> for WSSession {
103   type Result = ();
104
105   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
106     // println!("id: {} msg: {}", self.id, msg.0);
107     ctx.text(msg.0);
108   }
109 }
110
111 /// WebSocket message handler
112 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
113   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
114     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
115     match msg {
116       ws::Message::Ping(msg) => {
117         self.hb = Instant::now();
118         ctx.pong(&msg);
119       }
120       ws::Message::Pong(_) => {
121         self.hb = Instant::now();
122       }
123       ws::Message::Text(text) => {
124         let m = text.trim().to_owned();
125         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
126
127         self
128           .cs_addr
129           .send(StandardMessage {
130             id: self.id,
131             msg: m,
132           })
133           .into_actor(self)
134           .then(|res, _, ctx| {
135             match res {
136               Ok(res) => ctx.text(res),
137               Err(e) => {
138                 eprintln!("{}", &e);
139               }
140             }
141             fut::ok(())
142           })
143           .wait(ctx);
144       }
145       ws::Message::Binary(_bin) => println!("Unexpected binary"),
146       ws::Message::Close(_) => {
147         ctx.stop();
148       }
149       _ => {}
150     }
151   }
152 }
153
154 impl WSSession {
155   /// helper method that sends ping to client every second.
156   ///
157   /// also this method checks heartbeats from client
158   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
159     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
160       // check client heartbeats
161       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
162         // heartbeat timed out
163         println!("Websocket Client heartbeat failed, disconnecting!");
164
165         // notify chat server
166         act.cs_addr.do_send(Disconnect {
167           id: act.id,
168           ip: act.ip.to_owned(),
169         });
170
171         // stop actor
172         ctx.stop();
173
174         // don't try to send a ping
175         return;
176       }
177
178       ctx.ping("");
179     });
180   }
181 }
182
183 fn main() {
184   let _ = env_logger::init();
185   let sys = actix::System::new("lemmy");
186
187   // Run the migrations from code
188   let conn = establish_connection();
189   embedded_migrations::run(&conn).unwrap();
190
191   // Start chat server actor in separate thread
192   let server = ChatServer::default().start();
193   // Create Http server with websocket support
194   HttpServer::new(move || {
195     App::new()
196       .data(server.clone())
197       .service(web::resource("/api/v1/ws").to(chat_route))
198       //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
199       .service(web::resource("/").to(index))
200       // static resources
201       .service(actix_files::Files::new("/static", front_end_dir()))
202       .route("/nodeinfo/2.0.json", web::get().to(node_info))
203   })
204   .bind("0.0.0.0:8536")
205   .unwrap()
206   .start();
207
208   println!("Started http server: 0.0.0.0:8536");
209   let _ = sys.run();
210 }
211
212 #[derive(Serialize)]
213 struct Software {
214   name: String,
215   version: String,
216 }
217
218 #[derive(Serialize)]
219 struct Usage {
220   users: Users,
221   localPosts: i32,
222   localComments: i32,
223 }
224
225 #[derive(Serialize)]
226 struct Users {
227   total: i32,
228 }
229
230 #[derive(Serialize)]
231 struct NodeInfo {
232   version: String,
233   software: Software,
234   protocols: [String; 0],
235   usage: Usage,
236   openRegistrations: bool,
237 }
238
239 fn node_info() -> Result<Json<NodeInfo>> {
240   // TODO: get info from database
241   // TODO: need to get lemmy version from somewhere else
242   let conn = establish_connection();
243   let userCount = User_::count(conn)
244   let json = Json(NodeInfo {
245     version: "2.0".to_string(),
246     software: Software {
247       name: "lemmy".to_string(),
248       version: "0.1".to_string()
249     },
250     protocols: [], // TODO: activitypub once that is implemented
251     usage: Usage {
252       users: Users {
253         total: 123,
254       },
255       localPosts: 123,
256       localComments: 123,
257     },
258     openRegistrations: true });
259   return Ok(json);
260 }
261
262 fn index() -> Result<NamedFile, actix_web::error::Error> {
263   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
264 }
265
266 fn front_end_dir() -> String {
267   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
268 }