+++ /dev/null
-use activitypub_federation::config::Data as ContextData;
-use actix::{
- fut,
- Actor,
- ActorContext,
- ActorFutureExt,
- AsyncContext,
- ContextFutureSpawner,
- Handler,
- Running,
- StreamHandler,
- WrapFuture,
-};
-use actix_web::{web, Error, HttpRequest, HttpResponse};
-use actix_web_actors::ws;
-use lemmy_api::Perform;
-use lemmy_api_common::{
- comment::{
- CreateComment,
- CreateCommentLike,
- CreateCommentReport,
- DeleteComment,
- DistinguishComment,
- EditComment,
- GetComment,
- GetComments,
- ListCommentReports,
- RemoveComment,
- ResolveCommentReport,
- SaveComment,
- },
- community::{
- AddModToCommunity,
- BanFromCommunity,
- BlockCommunity,
- CreateCommunity,
- DeleteCommunity,
- EditCommunity,
- FollowCommunity,
- GetCommunity,
- ListCommunities,
- RemoveCommunity,
- TransferCommunity,
- },
- context::LemmyContext,
- custom_emoji::{CreateCustomEmoji, DeleteCustomEmoji, EditCustomEmoji},
- person::{
- AddAdmin,
- BanPerson,
- BlockPerson,
- ChangePassword,
- DeleteAccount,
- GetBannedPersons,
- GetCaptcha,
- GetPersonDetails,
- GetPersonMentions,
- GetReplies,
- GetReportCount,
- GetUnreadCount,
- Login,
- MarkAllAsRead,
- MarkCommentReplyAsRead,
- MarkPersonMentionAsRead,
- PasswordChangeAfterReset,
- PasswordReset,
- Register,
- SaveUserSettings,
- VerifyEmail,
- },
- post::{
- CreatePost,
- CreatePostLike,
- CreatePostReport,
- DeletePost,
- EditPost,
- FeaturePost,
- GetPost,
- GetPosts,
- GetSiteMetadata,
- ListPostReports,
- LockPost,
- MarkPostAsRead,
- RemovePost,
- ResolvePostReport,
- SavePost,
- },
- private_message::{
- CreatePrivateMessage,
- CreatePrivateMessageReport,
- DeletePrivateMessage,
- EditPrivateMessage,
- GetPrivateMessages,
- ListPrivateMessageReports,
- MarkPrivateMessageAsRead,
- ResolvePrivateMessageReport,
- },
- site::{
- ApproveRegistrationApplication,
- CreateSite,
- EditSite,
- GetFederatedInstances,
- GetModlog,
- GetSite,
- GetUnreadRegistrationApplicationCount,
- LeaveAdmin,
- ListRegistrationApplications,
- PurgeComment,
- PurgeCommunity,
- PurgePerson,
- PurgePost,
- ResolveObject,
- Search,
- },
- websocket::{
- handlers::{
- connect::{Connect, Disconnect},
- WsMessage,
- },
- serialize_websocket_message,
- structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
- UserOperation,
- UserOperationApub,
- UserOperationCrud,
- },
-};
-use lemmy_api_crud::PerformCrud;
-use lemmy_apub::{api::PerformApub, SendActivity};
-use lemmy_utils::{error::LemmyError, rate_limit::RateLimitCell, ConnectionId, IpAddr};
-use serde::Deserialize;
-use serde_json::Value;
-use std::{
- ops::Deref,
- result,
- str::FromStr,
- time::{Duration, Instant},
-};
-use tracing::{debug, error};
-
-/// How often heartbeat pings are sent
-const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
-
-/// How long before lack of client response causes a timeout
-const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
-
-pub struct WsChatSession {
- /// unique session id
- pub id: ConnectionId,
-
- pub ip: IpAddr,
-
- /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
- /// otherwise we drop connection.
- pub hb: Instant,
-
- /// The context data
- apub_data: ContextData<LemmyContext>,
-}
-
-pub async fn websocket(
- req: HttpRequest,
- body: web::Payload,
- rate_limiter: web::Data<RateLimitCell>,
- apub_data: ContextData<LemmyContext>,
-) -> Result<HttpResponse, Error> {
- let client_ip = IpAddr(
- req
- .connection_info()
- .realip_remote_addr()
- .unwrap_or("blank_ip")
- .to_string(),
- );
-
- let check = rate_limiter.message().check(client_ip.clone());
- if !check {
- debug!(
- "Websocket join with IP: {} has been rate limited.",
- &client_ip
- );
- return Ok(HttpResponse::TooManyRequests().finish());
- }
-
- ws::start(
- WsChatSession {
- id: 0,
- ip: client_ip,
- hb: Instant::now(),
- apub_data,
- },
- &req,
- body,
- )
-}
-
-/// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL).
-///
-/// also this method checks heartbeats from client
-fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
- ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
- // check client heartbeats
- if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
- // heartbeat timed out
-
- // notify chat server
- act
- .apub_data
- .chat_server()
- .do_send(Disconnect { id: act.id });
-
- // stop actor
- ctx.stop();
-
- // don't try to send a ping
- return;
- }
-
- ctx.ping(b"");
- });
-}
-
-impl Actor for WsChatSession {
- type Context = ws::WebsocketContext<Self>;
-
- /// Method is called on actor start.
- /// We register ws session with ChatServer
- fn started(&mut self, ctx: &mut Self::Context) {
- // we'll start heartbeat process on session start.
- hb(ctx);
-
- // register self in chat server. `AsyncContext::wait` register
- // future within context, but context waits until this future resolves
- // before processing any other events.
- // HttpContext::state() is instance of WsChatSessionState, state is shared
- // across all routes within application
- let addr = ctx.address();
- self
- .apub_data
- .chat_server()
- .send(Connect {
- addr: addr.recipient(),
- })
- .into_actor(self)
- .then(|res, act, ctx| {
- match res {
- Ok(res) => act.id = res,
- // something is wrong with chat server
- _ => ctx.stop(),
- }
- fut::ready(())
- })
- .wait(ctx);
- }
- fn stopping(&mut self, _: &mut Self::Context) -> Running {
- // notify chat server
- self
- .apub_data
- .chat_server()
- .do_send(Disconnect { id: self.id });
- Running::Stop
- }
-}
-
-/// Handle messages from chat server, we simply send it to peer websocket
-impl Handler<WsMessage> for WsChatSession {
- type Result = ();
-
- fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
- ctx.text(msg.0);
- }
-}
-
-/// WebSocket message handler
-impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
- fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
- let msg = match msg {
- Err(_) => {
- ctx.stop();
- return;
- }
- Ok(msg) => msg,
- };
-
- match msg {
- ws::Message::Ping(msg) => {
- self.hb = Instant::now();
- ctx.pong(&msg);
- }
- ws::Message::Pong(_) => {
- self.hb = Instant::now();
- }
- ws::Message::Text(text) => {
- let ip_clone = self.ip.clone();
- let id_clone = self.id.to_owned();
- let context_clone = self.apub_data.reset_request_count();
-
- let fut = Box::pin(async move {
- let msg = text.trim().to_string();
- parse_json_message(msg, ip_clone, id_clone, context_clone).await
- });
- fut
- .into_actor(self)
- .then(|res, _, ctx| {
- match res {
- Ok(res) => ctx.text(res),
- Err(e) => error!("{}", &e),
- }
- actix::fut::ready(())
- })
- .spawn(ctx);
- }
- ws::Message::Binary(_) => println!("Unexpected binary"),
- ws::Message::Close(reason) => {
- ctx.close(reason);
- ctx.stop();
- }
- ws::Message::Continuation(_) => {
- ctx.stop();
- }
- ws::Message::Nop => (),
- }
- }
-}
-
-/// Entry point for our websocket route
-async fn parse_json_message(
- msg: String,
- ip: IpAddr,
- connection_id: ConnectionId,
- context: ContextData<LemmyContext>,
-) -> Result<String, LemmyError> {
- let rate_limiter = context.settings_updated_channel();
- let json: Value = serde_json::from_str(&msg)?;
- let data = json
- .get("data")
- .cloned()
- .ok_or_else(|| LemmyError::from_message("missing data"))?;
-
- let missing_op_err = || LemmyError::from_message("missing op");
-
- let op = json
- .get("op")
- .ok_or_else(missing_op_err)?
- .as_str()
- .ok_or_else(missing_op_err)?;
-
- // check if api call passes the rate limit, and generate future for later execution
- if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
- let passed = match user_operation_crud {
- UserOperationCrud::Register => rate_limiter.register().check(ip),
- UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
- UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
- UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
- _ => rate_limiter.message().check(ip),
- };
- check_rate_limit_passed(passed)?;
- match_websocket_operation_crud(context, connection_id, user_operation_crud, data).await
- } else if let Ok(user_operation) = UserOperation::from_str(op) {
- let passed = match user_operation {
- UserOperation::GetCaptcha => rate_limiter.post().check(ip),
- _ => rate_limiter.message().check(ip),
- };
- check_rate_limit_passed(passed)?;
- match_websocket_operation(context, connection_id, user_operation, data).await
- } else {
- let user_operation = UserOperationApub::from_str(op)?;
- let passed = match user_operation {
- UserOperationApub::Search => rate_limiter.search().check(ip),
- _ => rate_limiter.message().check(ip),
- };
- check_rate_limit_passed(passed)?;
- match_websocket_operation_apub(context, connection_id, user_operation, data).await
- }
-}
-
-fn check_rate_limit_passed(passed: bool) -> Result<(), LemmyError> {
- if passed {
- Ok(())
- } else {
- // if rate limit was hit, respond with message
- Err(LemmyError::from_message("rate_limit_error"))
- }
-}
-
-pub async fn match_websocket_operation_crud(
- context: ContextData<LemmyContext>,
- id: ConnectionId,
- op: UserOperationCrud,
- data: Value,
-) -> result::Result<String, LemmyError> {
- match op {
- // User ops
- UserOperationCrud::Register => {
- do_websocket_operation_crud::<Register>(context, id, op, data).await
- }
- UserOperationCrud::DeleteAccount => {
- do_websocket_operation_crud::<DeleteAccount>(context, id, op, data).await
- }
-
- // Private Message ops
- UserOperationCrud::CreatePrivateMessage => {
- do_websocket_operation_crud::<CreatePrivateMessage>(context, id, op, data).await
- }
- UserOperationCrud::EditPrivateMessage => {
- do_websocket_operation_crud::<EditPrivateMessage>(context, id, op, data).await
- }
- UserOperationCrud::DeletePrivateMessage => {
- do_websocket_operation_crud::<DeletePrivateMessage>(context, id, op, data).await
- }
- UserOperationCrud::GetPrivateMessages => {
- do_websocket_operation_crud::<GetPrivateMessages>(context, id, op, data).await
- }
-
- // Site ops
- UserOperationCrud::CreateSite => {
- do_websocket_operation_crud::<CreateSite>(context, id, op, data).await
- }
- UserOperationCrud::EditSite => {
- do_websocket_operation_crud::<EditSite>(context, id, op, data).await
- }
- UserOperationCrud::GetSite => {
- do_websocket_operation_crud::<GetSite>(context, id, op, data).await
- }
-
- // Community ops
- UserOperationCrud::ListCommunities => {
- do_websocket_operation_crud::<ListCommunities>(context, id, op, data).await
- }
- UserOperationCrud::CreateCommunity => {
- do_websocket_operation_crud::<CreateCommunity>(context, id, op, data).await
- }
- UserOperationCrud::EditCommunity => {
- do_websocket_operation_crud::<EditCommunity>(context, id, op, data).await
- }
- UserOperationCrud::DeleteCommunity => {
- do_websocket_operation_crud::<DeleteCommunity>(context, id, op, data).await
- }
- UserOperationCrud::RemoveCommunity => {
- do_websocket_operation_crud::<RemoveCommunity>(context, id, op, data).await
- }
-
- // Post ops
- UserOperationCrud::CreatePost => {
- do_websocket_operation_crud::<CreatePost>(context, id, op, data).await
- }
- UserOperationCrud::GetPost => {
- do_websocket_operation_crud::<GetPost>(context, id, op, data).await
- }
- UserOperationCrud::EditPost => {
- do_websocket_operation_crud::<EditPost>(context, id, op, data).await
- }
- UserOperationCrud::DeletePost => {
- do_websocket_operation_crud::<DeletePost>(context, id, op, data).await
- }
- UserOperationCrud::RemovePost => {
- do_websocket_operation_crud::<RemovePost>(context, id, op, data).await
- }
-
- // Comment ops
- UserOperationCrud::CreateComment => {
- do_websocket_operation_crud::<CreateComment>(context, id, op, data).await
- }
- UserOperationCrud::EditComment => {
- do_websocket_operation_crud::<EditComment>(context, id, op, data).await
- }
- UserOperationCrud::DeleteComment => {
- do_websocket_operation_crud::<DeleteComment>(context, id, op, data).await
- }
- UserOperationCrud::RemoveComment => {
- do_websocket_operation_crud::<RemoveComment>(context, id, op, data).await
- }
- UserOperationCrud::GetComment => {
- do_websocket_operation_crud::<GetComment>(context, id, op, data).await
- }
- // Emojis
- UserOperationCrud::CreateCustomEmoji => {
- do_websocket_operation_crud::<CreateCustomEmoji>(context, id, op, data).await
- }
- UserOperationCrud::EditCustomEmoji => {
- do_websocket_operation_crud::<EditCustomEmoji>(context, id, op, data).await
- }
- UserOperationCrud::DeleteCustomEmoji => {
- do_websocket_operation_crud::<DeleteCustomEmoji>(context, id, op, data).await
- }
- }
-}
-
-async fn do_websocket_operation_crud<'a, 'b, Data>(
- context: ContextData<LemmyContext>,
- id: ConnectionId,
- op: UserOperationCrud,
- data: Value,
-) -> result::Result<String, LemmyError>
-where
- Data: PerformCrud + SendActivity<Response = <Data as PerformCrud>::Response> + Send,
- for<'de> Data: Deserialize<'de>,
-{
- let parsed_data: Data = serde_json::from_value(data)?;
- let res = parsed_data
- .perform(&web::Data::new(context.deref().clone()), Some(id))
- .await?;
- SendActivity::send_activity(&parsed_data, &res, &context).await?;
- serialize_websocket_message(&op, &res)
-}
-
-pub async fn match_websocket_operation_apub(
- context: ContextData<LemmyContext>,
- id: ConnectionId,
- op: UserOperationApub,
- data: Value,
-) -> result::Result<String, LemmyError> {
- match op {
- UserOperationApub::GetPersonDetails => {
- do_websocket_operation_apub::<GetPersonDetails>(context, id, op, data).await
- }
- UserOperationApub::GetCommunity => {
- do_websocket_operation_apub::<GetCommunity>(context, id, op, data).await
- }
- UserOperationApub::GetComments => {
- do_websocket_operation_apub::<GetComments>(context, id, op, data).await
- }
- UserOperationApub::GetPosts => {
- do_websocket_operation_apub::<GetPosts>(context, id, op, data).await
- }
- UserOperationApub::ResolveObject => {
- do_websocket_operation_apub::<ResolveObject>(context, id, op, data).await
- }
- UserOperationApub::Search => do_websocket_operation_apub::<Search>(context, id, op, data).await,
- }
-}
-
-async fn do_websocket_operation_apub<'a, 'b, Data>(
- context: ContextData<LemmyContext>,
- id: ConnectionId,
- op: UserOperationApub,
- data: Value,
-) -> result::Result<String, LemmyError>
-where
- Data: PerformApub + SendActivity<Response = <Data as PerformApub>::Response> + Send,
- for<'de> Data: Deserialize<'de>,
-{
- let parsed_data: Data = serde_json::from_value(data)?;
- let res = parsed_data.perform(&context, Some(id)).await?;
- SendActivity::send_activity(&parsed_data, &res, &context).await?;
- serialize_websocket_message(&op, &res)
-}
-
-pub async fn match_websocket_operation(
- context: ContextData<LemmyContext>,
- id: ConnectionId,
- op: UserOperation,
- data: Value,
-) -> result::Result<String, LemmyError> {
- match op {
- // User ops
- UserOperation::Login => do_websocket_operation::<Login>(context, id, op, data).await,
- UserOperation::GetCaptcha => do_websocket_operation::<GetCaptcha>(context, id, op, data).await,
- UserOperation::GetReplies => do_websocket_operation::<GetReplies>(context, id, op, data).await,
- UserOperation::AddAdmin => do_websocket_operation::<AddAdmin>(context, id, op, data).await,
- UserOperation::GetUnreadRegistrationApplicationCount => {
- do_websocket_operation::<GetUnreadRegistrationApplicationCount>(context, id, op, data).await
- }
- UserOperation::ListRegistrationApplications => {
- do_websocket_operation::<ListRegistrationApplications>(context, id, op, data).await
- }
- UserOperation::ApproveRegistrationApplication => {
- do_websocket_operation::<ApproveRegistrationApplication>(context, id, op, data).await
- }
- UserOperation::BanPerson => do_websocket_operation::<BanPerson>(context, id, op, data).await,
- UserOperation::GetBannedPersons => {
- do_websocket_operation::<GetBannedPersons>(context, id, op, data).await
- }
- UserOperation::BlockPerson => {
- do_websocket_operation::<BlockPerson>(context, id, op, data).await
- }
- UserOperation::GetPersonMentions => {
- do_websocket_operation::<GetPersonMentions>(context, id, op, data).await
- }
- UserOperation::MarkPersonMentionAsRead => {
- do_websocket_operation::<MarkPersonMentionAsRead>(context, id, op, data).await
- }
- UserOperation::MarkCommentReplyAsRead => {
- do_websocket_operation::<MarkCommentReplyAsRead>(context, id, op, data).await
- }
- UserOperation::MarkAllAsRead => {
- do_websocket_operation::<MarkAllAsRead>(context, id, op, data).await
- }
- UserOperation::PasswordReset => {
- do_websocket_operation::<PasswordReset>(context, id, op, data).await
- }
- UserOperation::PasswordChange => {
- do_websocket_operation::<PasswordChangeAfterReset>(context, id, op, data).await
- }
- UserOperation::UserJoin => do_websocket_operation::<UserJoin>(context, id, op, data).await,
- UserOperation::PostJoin => do_websocket_operation::<PostJoin>(context, id, op, data).await,
- UserOperation::CommunityJoin => {
- do_websocket_operation::<CommunityJoin>(context, id, op, data).await
- }
- UserOperation::ModJoin => do_websocket_operation::<ModJoin>(context, id, op, data).await,
- UserOperation::SaveUserSettings => {
- do_websocket_operation::<SaveUserSettings>(context, id, op, data).await
- }
- UserOperation::ChangePassword => {
- do_websocket_operation::<ChangePassword>(context, id, op, data).await
- }
- UserOperation::GetReportCount => {
- do_websocket_operation::<GetReportCount>(context, id, op, data).await
- }
- UserOperation::GetUnreadCount => {
- do_websocket_operation::<GetUnreadCount>(context, id, op, data).await
- }
- UserOperation::VerifyEmail => {
- do_websocket_operation::<VerifyEmail>(context, id, op, data).await
- }
-
- // Private Message ops
- UserOperation::MarkPrivateMessageAsRead => {
- do_websocket_operation::<MarkPrivateMessageAsRead>(context, id, op, data).await
- }
- UserOperation::CreatePrivateMessageReport => {
- do_websocket_operation::<CreatePrivateMessageReport>(context, id, op, data).await
- }
- UserOperation::ResolvePrivateMessageReport => {
- do_websocket_operation::<ResolvePrivateMessageReport>(context, id, op, data).await
- }
- UserOperation::ListPrivateMessageReports => {
- do_websocket_operation::<ListPrivateMessageReports>(context, id, op, data).await
- }
-
- // Site ops
- UserOperation::GetModlog => do_websocket_operation::<GetModlog>(context, id, op, data).await,
- UserOperation::PurgePerson => {
- do_websocket_operation::<PurgePerson>(context, id, op, data).await
- }
- UserOperation::PurgeCommunity => {
- do_websocket_operation::<PurgeCommunity>(context, id, op, data).await
- }
- UserOperation::PurgePost => do_websocket_operation::<PurgePost>(context, id, op, data).await,
- UserOperation::PurgeComment => {
- do_websocket_operation::<PurgeComment>(context, id, op, data).await
- }
- UserOperation::TransferCommunity => {
- do_websocket_operation::<TransferCommunity>(context, id, op, data).await
- }
- UserOperation::LeaveAdmin => do_websocket_operation::<LeaveAdmin>(context, id, op, data).await,
- UserOperation::GetFederatedInstances => {
- do_websocket_operation::<GetFederatedInstances>(context, id, op, data).await
- }
-
- // Community ops
- UserOperation::FollowCommunity => {
- do_websocket_operation::<FollowCommunity>(context, id, op, data).await
- }
- UserOperation::BlockCommunity => {
- do_websocket_operation::<BlockCommunity>(context, id, op, data).await
- }
- UserOperation::BanFromCommunity => {
- do_websocket_operation::<BanFromCommunity>(context, id, op, data).await
- }
- UserOperation::AddModToCommunity => {
- do_websocket_operation::<AddModToCommunity>(context, id, op, data).await
- }
-
- // Post ops
- UserOperation::LockPost => do_websocket_operation::<LockPost>(context, id, op, data).await,
- UserOperation::FeaturePost => {
- do_websocket_operation::<FeaturePost>(context, id, op, data).await
- }
- UserOperation::CreatePostLike => {
- do_websocket_operation::<CreatePostLike>(context, id, op, data).await
- }
- UserOperation::MarkPostAsRead => {
- do_websocket_operation::<MarkPostAsRead>(context, id, op, data).await
- }
- UserOperation::SavePost => do_websocket_operation::<SavePost>(context, id, op, data).await,
- UserOperation::CreatePostReport => {
- do_websocket_operation::<CreatePostReport>(context, id, op, data).await
- }
- UserOperation::ListPostReports => {
- do_websocket_operation::<ListPostReports>(context, id, op, data).await
- }
- UserOperation::ResolvePostReport => {
- do_websocket_operation::<ResolvePostReport>(context, id, op, data).await
- }
- UserOperation::GetSiteMetadata => {
- do_websocket_operation::<GetSiteMetadata>(context, id, op, data).await
- }
-
- // Comment ops
- UserOperation::SaveComment => {
- do_websocket_operation::<SaveComment>(context, id, op, data).await
- }
- UserOperation::CreateCommentLike => {
- do_websocket_operation::<CreateCommentLike>(context, id, op, data).await
- }
- UserOperation::DistinguishComment => {
- do_websocket_operation::<DistinguishComment>(context, id, op, data).await
- }
- UserOperation::CreateCommentReport => {
- do_websocket_operation::<CreateCommentReport>(context, id, op, data).await
- }
- UserOperation::ListCommentReports => {
- do_websocket_operation::<ListCommentReports>(context, id, op, data).await
- }
- UserOperation::ResolveCommentReport => {
- do_websocket_operation::<ResolveCommentReport>(context, id, op, data).await
- }
- }
-}
-
-async fn do_websocket_operation<'a, 'b, Data>(
- context: ContextData<LemmyContext>,
- id: ConnectionId,
- op: UserOperation,
- data: Value,
-) -> result::Result<String, LemmyError>
-where
- Data: Perform + SendActivity<Response = <Data as Perform>::Response> + Send,
- for<'de> Data: Deserialize<'de>,
-{
- let parsed_data: Data = serde_json::from_value(data)?;
- let res = parsed_data
- .perform(&web::Data::new(context.deref().clone()), Some(id))
- .await?;
- SendActivity::send_activity(&parsed_data, &res, &context).await?;
- serialize_websocket_message(&op, &res)
-}