From: Dessalines Date: Mon, 25 Mar 2019 03:51:27 +0000 (-0700) Subject: Refactoring websocket code, adding create community X-Git-Url: http://these/git/%7BsupportLemmyUrl%7D?a=commitdiff_plain;h=e1cb805cfc719d6266ec50e5f1ef3ac1edf74656;p=lemmy.git Refactoring websocket code, adding create community - Adding create community --- diff --git a/server/src/actions/user.rs b/server/src/actions/user.rs index 6016580d..5832bc2c 100644 --- a/server/src/actions/user.rs +++ b/server/src/actions/user.rs @@ -4,7 +4,7 @@ use diesel::result::Error; use schema::user_::dsl::*; use serde::{Serialize, Deserialize}; use {Crud,is_email_regex}; -use jsonwebtoken::{encode, decode, Header, Validation}; +use jsonwebtoken::{encode, decode, Header, Validation, TokenData}; use bcrypt::{DEFAULT_COST, hash}; #[derive(Queryable, Identifiable, PartialEq, Debug)] @@ -60,9 +60,20 @@ impl Crud for User_ { } #[derive(Debug, Serialize, Deserialize)] -struct Claims { - id: i32, - username: String +pub struct Claims { + pub id: i32, + pub username: String, + pub iss: String, +} + +impl Claims { + pub fn decode(jwt: &str) -> Result, jsonwebtoken::errors::Error> { + let v = Validation { + validate_exp: false, + ..Validation::default() + }; + decode::(&jwt, "secret".as_ref(), &v) + } } type Jwt = String; @@ -70,7 +81,8 @@ impl User_ { pub fn jwt(&self) -> Jwt { let my_claims = Claims { id: self.id, - username: self.name.to_owned() + username: self.name.to_owned(), + iss: "rrf".to_string() // TODO this should come from config file }; encode(&Header::default(), &my_claims, "secret".as_ref()).unwrap() } @@ -86,12 +98,13 @@ impl User_ { } pub fn find_by_jwt(conn: &PgConnection, jwt: &str) -> Result { - let token = decode::(&jwt, "secret".as_ref(), &Validation::default()) - .expect("Couldn't decode jwt"); - Self::read(&conn, token.claims.id) + let claims: Claims = Claims::decode(&jwt).expect("Invalid token").claims; + Self::read(&conn, claims.id) } + } + #[cfg(test)] mod tests { use establish_connection; diff --git a/server/src/bin/main.rs b/server/src/bin/main.rs index bd4c2d21..1d674cbc 100644 --- a/server/src/bin/main.rs +++ b/server/src/bin/main.rs @@ -4,7 +4,6 @@ use std::time::{Instant, Duration}; use server::actix::*; use server::actix_web::server::HttpServer; use server::actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; -use std::str::FromStr; use server::websocket_server::server::*; @@ -82,15 +81,16 @@ impl Handler for WSSession { type Result = (); fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) { + println!("id: {} msg: {}", self.id, msg.0); ctx.text(msg.0); + ctx.text("NO"); } } -use server::serde_json::Value; /// WebSocket message handler impl StreamHandler for WSSession { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - println!("WEBSOCKET MESSAGE: {:?}", msg); + println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id); match msg { ws::Message::Ping(msg) => { self.hb = Instant::now(); @@ -100,86 +100,29 @@ impl StreamHandler for WSSession { self.hb = Instant::now(); } ws::Message::Text(text) => { - let m = text.trim(); - let json: Value = serde_json::from_str(m).unwrap(); - - // Get the OP command, and its data - let op: &str = &json["op"].as_str().unwrap(); - let data: &Value = &json["data"]; - - let user_operation: UserOperation = UserOperation::from_str(op).unwrap(); - - match user_operation { - UserOperation::Login => { - let login: Login = serde_json::from_str(&data.to_string()).unwrap(); - ctx.state() - .addr - .send(login) - .into_actor(self) - .then(|res, _, ctx| { - match res { - Ok(response) => match response { - Ok(t) => ctx.text(serde_json::to_string(&t).unwrap()), - Err(e) => { - let error_message_str: String = serde_json::to_string(&e).unwrap(); - eprintln!("{}", &error_message_str); - ctx.text(&error_message_str); - } - }, - _ => println!("Something is wrong"), - } - fut::ok(()) - }) - .wait(ctx) - }, - UserOperation::Register => { - let register: Register = serde_json::from_str(&data.to_string()).unwrap(); - ctx.state() - .addr - .send(register) - .into_actor(self) - .then(|res, _, ctx| { - match res { - Ok(response) => match response { - Ok(t) => ctx.text(serde_json::to_string(&t).unwrap()), - Err(e) => { - let error_message_str: String = serde_json::to_string(&e).unwrap(); - eprintln!("{}", &error_message_str); - ctx.text(&error_message_str); - } - }, - _ => println!("Something is wrong"), - } - fut::ok(()) - }) - .wait(ctx) - }, - UserOperation::CreateCommunity => { - use server::actions::community::CommunityForm; - let auth: &str = &json["auth"].as_str().unwrap(); - let community_form: CommunityForm = serde_json::from_str(&data.to_string()).unwrap(); - ctx.state() - .addr - .send(community_form) - .into_actor(self) - .then(|res, _, ctx| { - match res { - Ok(response) => match response { - Ok(t) => ctx.text(serde_json::to_string(&t).unwrap()), - Err(e) => { - let error_message_str: String = serde_json::to_string(&e).unwrap(); - eprintln!("{}", &error_message_str); - ctx.text(&error_message_str); - } - }, - _ => println!("Something is wrong"), - } - fut::ok(()) - }) - .wait(ctx) - }, - _ => ctx.text(format!("!!! unknown command: {:?}", m)), - } + let m = text.trim().to_owned(); + + ctx.state() + .addr + .send(StandardMessage { + id: self.id, + msg: m + }) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(res) => ctx.text(res), + Err(e) => { + eprintln!("{}", &e); + // ctx.text(e); + } + } + // Ok(res) => ctx.text(res), + // // something is wrong with chat server + // _ => ctx.stop(), + fut::ok(()) + }) + .wait(ctx); // we check for /sss type of messages // if m.starts_with('/') { diff --git a/server/src/websocket_server/server.rs b/server/src/websocket_server/server.rs index 857db306..760bd78c 100644 --- a/server/src/websocket_server/server.rs +++ b/server/src/websocket_server/server.rs @@ -6,10 +6,13 @@ use actix::prelude::*; use rand::{rngs::ThreadRng, Rng}; use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; +use serde_json::{Result, Value}; use bcrypt::{verify}; +use std::str::FromStr; -use {Crud,establish_connection}; +use {Crud, Joinable, establish_connection}; use actions::community::*; +use actions::user::*; #[derive(EnumString,ToString,Debug)] pub enum UserOperation { @@ -58,6 +61,24 @@ pub struct ClientMessage { pub room: String, } +#[derive(Serialize, Deserialize)] +pub struct StandardMessage { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, +} + +impl actix::Message for StandardMessage { + type Result = String; +} + +#[derive(Serialize, Deserialize)] +pub struct StandardResponse { + op: String, + response: T +} + /// List of available rooms pub struct ListRooms; @@ -80,10 +101,6 @@ pub struct Login { pub password: String } -impl actix::Message for Login { - type Result = Result; -} - #[derive(Serialize, Deserialize)] pub struct Register { username: String, @@ -98,23 +115,15 @@ pub struct LoginResponse { jwt: String } -impl actix::Message for Register { - type Result = Result; +#[derive(Serialize, Deserialize)] +pub struct CreateCommunity { + name: String, } -// #[derive(Serialize, Deserialize)] -// pub struct CreateCommunity { -// name: String -// } - #[derive(Serialize, Deserialize)] pub struct CreateCommunityResponse { op: String, - community: Community -} - -impl actix::Message for CommunityForm { - type Result = Result; + data: Community } /// `ChatServer` manages chat rooms and responsible for coordinating chat @@ -152,6 +161,16 @@ impl ChatServer { } } } + + /// Send message only to self + fn send(&self, message: &str, id: &usize) { + // println!("{:?}", self.sessions); + if let Some(addr) = self.sessions.get(id) { + println!("msg: {}", message); + // println!("{:?}", addr.connected()); + let _ = addr.do_send(WSMessage(message.to_owned())); + } + } } /// Make actor from `ChatServer` @@ -219,121 +238,176 @@ impl Handler for ChatServer { } } -/// Handler for `ListRooms` message. -impl Handler for ChatServer { - type Result = MessageResult; +/// Handler for Message message. +impl Handler for ChatServer { + type Result = MessageResult; + + fn handle(&mut self, msg: StandardMessage, _: &mut Context) -> Self::Result { + + let json: Value = serde_json::from_str(&msg.msg) + .expect("Couldn't parse message"); + + let data: &Value = &json["data"]; + let op = &json["op"].as_str().unwrap(); + let auth = &json["auth"].as_str(); + let user_operation: UserOperation = UserOperation::from_str(&op).unwrap(); + + let res: String = match user_operation { + UserOperation::Login => { + let login: Login = serde_json::from_str(&data.to_string()).unwrap(); + login.perform() + }, + UserOperation::Register => { + let register: Register = serde_json::from_str(&data.to_string()).unwrap(); + register.perform() + }, + UserOperation::CreateCommunity => { + let create_community: CreateCommunity = serde_json::from_str(&data.to_string()).unwrap(); + match auth { + Some(auth) => { + create_community.perform(auth) + }, + None => serde_json::to_string( + &ErrorMessage { + op: UserOperation::CreateCommunity.to_string(), + error: "Not logged in.".to_string() + } + ) + .unwrap() + } + }, + _ => { + let e = ErrorMessage { + op: "Unknown".to_string(), + error: "Unknown User Operation".to_string() + }; + serde_json::to_string(&e).unwrap() + } + // _ => "no".to_string() + }; - fn handle(&mut self, _: ListRooms, _: &mut Context) -> Self::Result { - let mut rooms = Vec::new(); - for key in self.rooms.keys() { - rooms.push(key.to_owned()) - } - MessageResult(rooms) + // let data: &Value = &json["data"]; + // let res = StandardResponse {op: "nope".to_string(), response: "hi".to_string()}; + // let out = serde_json::to_string(&res).unwrap(); + MessageResult(res) } } -/// Join room, send disconnect message to old room -/// send join message to new room -impl Handler for ChatServer { - type Result = (); +// /// Handler for `ListRooms` message. +// impl Handler for ChatServer { +// type Result = MessageResult; - fn handle(&mut self, msg: Join, _: &mut Context) { - let Join { id, name } = msg; - let mut rooms = Vec::new(); +// fn handle(&mut self, _: ListRooms, _: &mut Context) -> Self::Result { +// let mut rooms = Vec::new(); - // remove session from all rooms - for (n, sessions) in &mut self.rooms { - if sessions.remove(&id) { - rooms.push(n.to_owned()); - } - } - // send message to other users - for room in rooms { - self.send_room_message(&room, "Someone disconnected", 0); - } +// for key in self.rooms.keys() { +// rooms.push(key.to_owned()) +// } - if self.rooms.get_mut(&name).is_none() { - self.rooms.insert(name.clone(), HashSet::new()); - } - self.send_room_message(&name, "Someone connected", id); - self.rooms.get_mut(&name).unwrap().insert(id); - } +// MessageResult(rooms) +// } +// } + +// /// Join room, send disconnect message to old room +// /// send join message to new room +// impl Handler for ChatServer { +// type Result = (); + +// fn handle(&mut self, msg: Join, _: &mut Context) { +// let Join { id, name } = msg; +// let mut rooms = Vec::new(); + +// // remove session from all rooms +// for (n, sessions) in &mut self.rooms { +// if sessions.remove(&id) { +// rooms.push(n.to_owned()); +// } +// } +// // send message to other users +// for room in rooms { +// self.send_room_message(&room, "Someone disconnected", 0); +// } + +// if self.rooms.get_mut(&name).is_none() { +// self.rooms.insert(name.clone(), HashSet::new()); +// } +// self.send_room_message(&name, "Someone connected", id); +// self.rooms.get_mut(&name).unwrap().insert(id); +// } +// } + +pub trait Perform { + fn perform(&self) -> String; } -impl Handler for ChatServer { +pub trait PerformAuth { + fn perform(&self, auth: &str) -> String; +} - type Result = MessageResult; - fn handle(&mut self, msg: Login, _: &mut Context) -> Self::Result { +impl Perform for Login { + fn perform(&self) -> String { - use actions::user::*; let conn = establish_connection(); // Fetch that username / email - let user: User_ = match User_::find_by_email_or_username(&conn, &msg.username_or_email) { + let user: User_ = match User_::find_by_email_or_username(&conn, &self.username_or_email) { Ok(user) => user, - Err(e) => return MessageResult( - Err( - ErrorMessage { - op: UserOperation::Login.to_string(), - error: "Couldn't find that username or email".to_string() - } - ) + Err(e) => return serde_json::to_string( + &ErrorMessage { + op: UserOperation::Login.to_string(), + error: "Couldn't find that username or email".to_string() + } ) + .unwrap() }; // Verify the password - let valid: bool = verify(&msg.password, &user.password_encrypted).unwrap_or(false); + let valid: bool = verify(&self.password, &user.password_encrypted).unwrap_or(false); if !valid { - return MessageResult( - Err( - ErrorMessage { - op: UserOperation::Login.to_string(), - error: "Password incorrect".to_string() - } - ) + return serde_json::to_string( + &ErrorMessage { + op: UserOperation::Login.to_string(), + error: "Password incorrect".to_string() + } ) + .unwrap() } // Return the jwt - MessageResult( - Ok( - LoginResponse { - op: UserOperation::Login.to_string(), - jwt: user.jwt() - } - ) + serde_json::to_string( + &LoginResponse { + op: UserOperation::Login.to_string(), + jwt: user.jwt() + } ) + .unwrap() } } -impl Handler for ChatServer { - - type Result = MessageResult; - fn handle(&mut self, msg: Register, _: &mut Context) -> Self::Result { +impl Perform for Register { + fn perform(&self) -> String { - use actions::user::*; let conn = establish_connection(); // Make sure passwords match - if msg.password != msg.password_verify { - return MessageResult( - Err( - ErrorMessage { - op: UserOperation::Register.to_string(), - error: "Passwords do not match.".to_string() - } - ) - ); + if &self.password != &self.password_verify { + return serde_json::to_string( + &ErrorMessage { + op: UserOperation::Register.to_string(), + error: "Passwords do not match.".to_string() + } + ) + .unwrap(); } // Register the new user let user_form = UserForm { - name: msg.username, - email: msg.email, - password_encrypted: msg.password, + name: self.username.to_owned(), + email: self.email.to_owned(), + password_encrypted: self.password.to_owned(), preferred_username: None, updated: None }; @@ -341,55 +415,232 @@ impl Handler for ChatServer { // Create the user let inserted_user = match User_::create(&conn, &user_form) { Ok(user) => user, - Err(e) => return MessageResult( - Err( - ErrorMessage { + Err(e) => { + return serde_json::to_string( + &ErrorMessage { op: UserOperation::Register.to_string(), error: "User already exists.".to_string() // overwrite the diesel error } ) - ) + .unwrap() + } }; // Return the jwt - MessageResult( - Ok( - LoginResponse { - op: UserOperation::Register.to_string(), - jwt: inserted_user.jwt() - } - ) + serde_json::to_string( + &LoginResponse { + op: UserOperation::Register.to_string(), + jwt: inserted_user.jwt() + } ) + .unwrap() } } +impl PerformAuth for CreateCommunity { + fn perform(&self, auth: &str) -> String { + + let conn = establish_connection(); + + let claims = match Claims::decode(&auth) { + Ok(claims) => claims.claims, + Err(e) => { + return serde_json::to_string( + &ErrorMessage { + op: UserOperation::CreateCommunity.to_string(), + error: "Community user already exists.".to_string() // overwrite the diesel error + } + ) + .unwrap(); + } + }; -impl Handler for ChatServer { + let user_id = claims.id; + let iss = claims.iss; - type Result = MessageResult; + // Register the new user + let community_form = CommunityForm { + name: self.name.to_owned(), + updated: None + }; - fn handle(&mut self, form: CommunityForm, _: &mut Context) -> Self::Result { - let conn = establish_connection(); - let community = match Community::create(&conn, &form) { + let inserted_community = match Community::create(&conn, &community_form) { Ok(community) => community, - Err(e) => return MessageResult( - Err( - ErrorMessage { + Err(e) => { + return serde_json::to_string( + &ErrorMessage { op: UserOperation::CreateCommunity.to_string(), error: "Community already exists.".to_string() // overwrite the diesel error } ) - ) + .unwrap() + } }; - - MessageResult( - Ok( - CreateCommunityResponse { - op: UserOperation::CreateCommunity.to_string(), - community: community - } - ) + + let community_user_form = CommunityUserForm { + community_id: inserted_community.id, + fedi_user_id: format!("{}/{}", iss, user_id) + }; + + let inserted_community_user = match CommunityUser::join(&conn, &community_user_form) { + Ok(user) => user, + Err(e) => { + return serde_json::to_string( + &ErrorMessage { + op: UserOperation::CreateCommunity.to_string(), + error: "Community user already exists.".to_string() // overwrite the diesel error + } + ) + .unwrap() + } + }; + + + // Return the jwt + serde_json::to_string( + &CreateCommunityResponse { + op: UserOperation::CreateCommunity.to_string(), + data: inserted_community + } ) + .unwrap() + } } +// impl Handler for ChatServer { + +// type Result = MessageResult; +// fn handle(&mut self, msg: Login, _: &mut Context) -> Self::Result { + +// let conn = establish_connection(); + +// // Fetch that username / email +// let user: User_ = match User_::find_by_email_or_username(&conn, &msg.username_or_email) { +// Ok(user) => user, +// Err(e) => return MessageResult( +// Err( +// ErrorMessage { +// op: UserOperation::Login.to_string(), +// error: "Couldn't find that username or email".to_string() +// } +// ) +// ) +// }; + +// // Verify the password +// let valid: bool = verify(&msg.password, &user.password_encrypted).unwrap_or(false); +// if !valid { +// return MessageResult( +// Err( +// ErrorMessage { +// op: UserOperation::Login.to_string(), +// error: "Password incorrect".to_string() +// } +// ) +// ) +// } + +// // Return the jwt +// MessageResult( +// Ok( +// LoginResponse { +// op: UserOperation::Login.to_string(), +// jwt: user.jwt() +// } +// ) +// ) +// } +// } + +// impl Handler for ChatServer { + +// type Result = MessageResult; +// fn handle(&mut self, msg: Register, _: &mut Context) -> Self::Result { + +// let conn = establish_connection(); + +// // Make sure passwords match +// if msg.password != msg.password_verify { +// return MessageResult( +// Err( +// ErrorMessage { +// op: UserOperation::Register.to_string(), +// error: "Passwords do not match.".to_string() +// } +// ) +// ); +// } + +// // Register the new user +// let user_form = UserForm { +// name: msg.username, +// email: msg.email, +// password_encrypted: msg.password, +// preferred_username: None, +// updated: None +// }; + +// // Create the user +// let inserted_user = match User_::create(&conn, &user_form) { +// Ok(user) => user, +// Err(e) => return MessageResult( +// Err( +// ErrorMessage { +// op: UserOperation::Register.to_string(), +// error: "User already exists.".to_string() // overwrite the diesel error +// } +// ) +// ) +// }; + +// // Return the jwt +// MessageResult( +// Ok( +// LoginResponse { +// op: UserOperation::Register.to_string(), +// jwt: inserted_user.jwt() +// } +// ) +// ) + +// } +// } + + +// impl Handler for ChatServer { + +// type Result = MessageResult; + +// fn handle(&mut self, msg: CreateCommunity, _: &mut Context) -> Self::Result { +// let conn = establish_connection(); + +// let user_id = Claims::decode(&msg.auth).id; + +// let community_form = CommunityForm { +// name: msg.name, +// updated: None +// }; + +// let community = match Community::create(&conn, &community_form) { +// Ok(community) => community, +// Err(e) => return MessageResult( +// Err( +// ErrorMessage { +// op: UserOperation::CreateCommunity.to_string(), +// error: "Community already exists.".to_string() // overwrite the diesel error +// } +// ) +// ) +// }; + +// MessageResult( +// Ok( +// CreateCommunityResponse { +// op: UserOperation::CreateCommunity.to_string(), +// community: community +// } +// ) +// ) +// } +// } diff --git a/ui/src/components/create-community.tsx b/ui/src/components/create-community.tsx index dbacd18d..159147b6 100644 --- a/ui/src/components/create-community.tsx +++ b/ui/src/components/create-community.tsx @@ -5,6 +5,8 @@ import { CommunityForm, UserOperation } from '../interfaces'; import { WebSocketService, UserService } from '../services'; import { msgOp } from '../utils'; +import { Community } from '../interfaces'; + interface State { communityForm: CommunityForm; } @@ -28,6 +30,7 @@ export class CreateCommunity extends Component { .subscribe( (msg) => this.parseMessage(msg), (err) => console.error(err), + () => console.log("complete") ); } @@ -80,10 +83,14 @@ export class CreateCommunity extends Component { parseMessage(msg: any) { let op: UserOperation = msgOp(msg); + console.log(msg); if (msg.error) { alert(msg.error); return; } else { + if (op == UserOperation.CreateCommunity) { + let community: Community = msg.data; + } } } diff --git a/ui/src/components/login.tsx b/ui/src/components/login.tsx index 372b1557..60ee9e01 100644 --- a/ui/src/components/login.tsx +++ b/ui/src/components/login.tsx @@ -35,6 +35,7 @@ export class Login extends Component { .subscribe( (msg) => this.parseMessage(msg), (err) => console.error(err), + () => console.log("complete") ); } diff --git a/ui/src/interfaces.ts b/ui/src/interfaces.ts index da5c415b..e620aa4e 100644 --- a/ui/src/interfaces.ts +++ b/ui/src/interfaces.ts @@ -3,10 +3,17 @@ export enum UserOperation { } export interface User { - id: number + id: number; username: string; } +export interface Community { + id: number; + name: string; + published: Date; + updated?: Date; +} + export interface LoginForm { username_or_email: string; password: string; @@ -21,7 +28,6 @@ export interface RegisterForm { export interface CommunityForm { name: string; - updated?: number } export interface PostForm { diff --git a/ui/src/services/UserService.ts b/ui/src/services/UserService.ts index af0d1d15..d90fbde5 100644 --- a/ui/src/services/UserService.ts +++ b/ui/src/services/UserService.ts @@ -19,9 +19,9 @@ export class UserService { } public login(jwt: string) { + this.setUser(jwt); Cookies.set("jwt", jwt); console.log("jwt cookie set"); - this.setUser(jwt); } public logout() { @@ -42,7 +42,6 @@ export class UserService { private setUser(jwt: string) { this.user = jwt_decode(jwt); this.sub.next(this.user); - console.log(this.user.username); } public static get Instance(){