diff --git a/examples/login.rs b/examples/login.rs index c146d3d57..a076cc846 100644 --- a/examples/login.rs +++ b/examples/login.rs @@ -1,3 +1,5 @@ +#![feature(async_closure)] + use std::{env, process::exit}; use matrix_nio::{ @@ -5,6 +7,7 @@ use matrix_nio::{ events::{ collections::all::RoomEvent, room::message::{MessageEvent, MessageEventContent, TextMessageEventContent}, + EventType, }, AsyncClient, AsyncClientConfig, SyncSettings, }; @@ -19,13 +22,19 @@ async fn login( .disable_ssl_verification(); let mut client = AsyncClient::new_with_config(&homeserver_url, None, client_config).unwrap(); - client.login(username, password, None).await?; - let response = client.sync(SyncSettings::new()).await?; + let callback = |event| { + if let RoomEvent::RoomMessage(MessageEvent { + content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), + sender, + .. + }) = event + { + println!("{}: {}", sender, msg_body); + } + }; - for (room_id, room) in response.rooms.join { - println!("Room {}", room_id); - - for event in room.timeline.events { + client.add_event_future(EventType::RoomMessage, |event| { + Box::pin(async { if let RoomEvent::RoomMessage(MessageEvent { content: MessageEventContent::Text(TextMessageEventContent { body: msg_body, .. }), sender, @@ -34,8 +43,11 @@ async fn login( { println!("{}: {}", sender, msg_body); } - } - } + }) + }); + + client.login(username, password, None).await?; + let response = client.sync(SyncSettings::new()).await?; Ok(()) } diff --git a/src/async_client.rs b/src/async_client.rs new file mode 100644 index 000000000..a15a8fa59 --- /dev/null +++ b/src/async_client.rs @@ -0,0 +1,295 @@ +use std::collections::HashMap; +use std::convert::{TryFrom, TryInto}; +use std::future::Future; +use std::pin::Pin; + +use http::Method as HttpMethod; +use http::Response as HttpResponse; +use js_int::UInt; +use reqwest; +use url::Url; + +use ruma_api::Endpoint; +use ruma_events::collections::all::RoomEvent; +use ruma_events::Event; +pub use ruma_events::EventType; + +use crate::api; +use crate::base_client::Client as BaseClient; +use crate::error::{Error, InnerError}; +use crate::session::Session; + +pub struct AsyncClient { + /// The URL of the homeserver to connect to. + homeserver: Url, + /// The underlying HTTP client. + http_client: reqwest::Client, + /// User session data. + base_client: BaseClient, + /// Event callbacks + event_callbacks: HashMap>, + event_futures: + HashMap Pin>>>>, +} + +#[derive(Default, Debug)] +pub struct AsyncClientConfig { + proxy: Option, + use_sys_proxy: bool, + user_agent: Option, + disable_ssl_verification: bool, +} + +impl AsyncClientConfig { + pub fn new() -> Self { + Default::default() + } + + pub fn proxy(mut self, proxy: &str) -> Result { + if self.use_sys_proxy { + return Err(Error(InnerError::ConfigurationError( + "Using the system proxy has been previously configured.".to_string(), + ))); + } + self.proxy = Some(reqwest::Proxy::all(proxy)?); + Ok(self) + } + + pub fn use_sys_proxy(mut self) -> Result { + if self.proxy.is_some() { + return Err(Error(InnerError::ConfigurationError( + "A proxy has already been configured.".to_string(), + ))); + } + self.use_sys_proxy = true; + Ok(self) + } + + pub fn disable_ssl_verification(mut self) -> Self { + self.disable_ssl_verification = true; + self + } +} + +#[derive(Debug, Default)] +pub struct SyncSettings { + pub(crate) timeout: Option, + pub(crate) token: Option, + pub(crate) full_state: Option, +} + +impl SyncSettings { + pub fn new() -> Self { + Default::default() + } + + pub fn token>(mut self, token: S) -> Self { + self.token = Some(token.into()); + self + } + + pub fn timeout>(mut self, timeout: T) -> Result + where + js_int::TryFromIntError: + std::convert::From<>::Error>, + { + self.timeout = Some(timeout.try_into()?); + Ok(self) + } + + pub fn full_state(mut self, full_state: bool) -> Self { + self.full_state = Some(full_state); + self + } +} + +use api::r0::session::login; +use api::r0::sync::sync_events; + +impl AsyncClient { + /// Creates a new client for making HTTP requests to the given homeserver. + pub fn new(homeserver_url: &str, session: Option) -> Result { + let homeserver = Url::parse(homeserver_url)?; + let http_client = reqwest::Client::new(); + + Ok(Self { + homeserver, + http_client, + base_client: BaseClient::new(session), + event_callbacks: HashMap::new(), + event_futures: HashMap::new(), + }) + } + + pub fn new_with_config( + homeserver_url: &str, + session: Option, + config: AsyncClientConfig, + ) -> Result { + let homeserver = Url::parse(homeserver_url)?; + let http_client = reqwest::Client::builder(); + + let http_client = if config.disable_ssl_verification { + http_client.danger_accept_invalid_certs(true) + } else { + http_client + }; + + let http_client = match config.proxy { + Some(p) => http_client.proxy(p), + None => http_client, + }; + + let http_client = if config.use_sys_proxy { + http_client.use_sys_proxy() + } else { + http_client + }; + + let mut headers = reqwest::header::HeaderMap::new(); + + headers.insert( + reqwest::header::USER_AGENT, + reqwest::header::HeaderValue::from_static("ruma"), + ); + + let http_client = http_client.default_headers(headers).build().unwrap(); + + Ok(Self { + homeserver, + http_client, + base_client: BaseClient::new(session), + event_callbacks: HashMap::new(), + event_futures: HashMap::new(), + }) + } + + pub fn add_event_callback( + &mut self, + event_type: EventType, + callback: impl FnMut(RoomEvent) + 'static, + ) { + self.event_callbacks.insert(event_type, Box::new(callback)); + } + + pub fn add_event_future( + &mut self, + event_type: EventType, + callback: impl FnMut(RoomEvent) -> Pin>> + 'static, + ) { + self.event_futures.insert(event_type, Box::new(callback)); + } + + pub async fn login>( + &mut self, + user: S, + password: S, + device_id: Option, + ) -> Result { + let request = login::Request { + address: None, + login_type: login::LoginType::Password, + medium: None, + device_id: device_id.map(|d| d.into()), + password: password.into(), + user: user.into(), + }; + + let response = self.send(request).await.unwrap(); + self.base_client.receive_login_response(&response); + + Ok(response) + } + + pub async fn sync( + &mut self, + sync_settings: SyncSettings, + ) -> Result { + let request = sync_events::Request { + filter: None, + since: sync_settings.token, + full_state: sync_settings.full_state, + set_presence: None, + timeout: sync_settings.timeout, + }; + + let response = self.send(request).await.unwrap(); + + for (_, room) in &response.rooms.join { + for event in &room.timeline.events { + let event_type = match &event { + RoomEvent::CallAnswer(e) => e.event_type(), + RoomEvent::CallCandidates(e) => e.event_type(), + RoomEvent::CallHangup(e) => e.event_type(), + RoomEvent::CallInvite(e) => e.event_type(), + RoomEvent::RoomAliases(e) => e.event_type(), + RoomEvent::RoomAvatar(e) => e.event_type(), + RoomEvent::RoomCanonicalAlias(e) => e.event_type(), + RoomEvent::RoomCreate(e) => e.event_type(), + RoomEvent::RoomGuestAccess(e) => e.event_type(), + RoomEvent::RoomHistoryVisibility(e) => e.event_type(), + RoomEvent::RoomJoinRules(e) => e.event_type(), + RoomEvent::RoomMember(e) => e.event_type(), + RoomEvent::RoomMessage(e) => e.event_type(), + RoomEvent::RoomName(e) => e.event_type(), + RoomEvent::RoomPinnedEvents(e) => e.event_type(), + RoomEvent::RoomPowerLevels(e) => e.event_type(), + RoomEvent::RoomRedaction(e) => e.event_type(), + RoomEvent::RoomThirdPartyInvite(e) => e.event_type(), + RoomEvent::RoomTopic(e) => e.event_type(), + RoomEvent::CustomRoom(e) => e.event_type(), + RoomEvent::CustomState(e) => e.event_type(), + }; + + if self.event_callbacks.contains_key(&event_type) { + let cb = self.event_callbacks.get_mut(&event_type).unwrap(); + cb(event.clone()); + } + + if self.event_futures.contains_key(&event_type) { + let cb = self.event_futures.get_mut(&event_type).unwrap(); + let future = Pin::from(cb(event.clone())); + future.await; + } + } + } + + Ok(response) + } + + async fn send(&self, request: Request) -> Result { + let request: http::Request> = request.try_into()?; + let url = request.uri(); + let url = self.homeserver.join(url.path()).unwrap(); + + let request_builder = match Request::METADATA.method { + HttpMethod::GET => self.http_client.get(url), + HttpMethod::POST => { + let body = request.body().clone(); + self.http_client.post(url).body(body) + } + HttpMethod::PUT => unimplemented!(), + HttpMethod::DELETE => unimplemented!(), + _ => panic!("Unsuported method"), + }; + + let request_builder = if Request::METADATA.requires_authentication { + if let Some(ref session) = self.base_client.session { + request_builder.bearer_auth(&session.access_token) + } else { + return Err(Error(InnerError::AuthenticationRequired)); + } + } else { + request_builder + }; + + let response = request_builder.send().await?; + + let status = response.status(); + let body = response.bytes().await?.as_ref().to_owned(); + let response = HttpResponse::builder().status(status).body(body).unwrap(); + let response = Request::Response::try_from(response)?; + + Ok(response) + } +} diff --git a/src/base_client.rs b/src/base_client.rs new file mode 100644 index 000000000..cd9ffc26a --- /dev/null +++ b/src/base_client.rs @@ -0,0 +1,197 @@ +use std::collections::HashMap; + +use crate::api::r0 as api; +use crate::events::collections::all::RoomEvent; +use crate::events::room::member::{MemberEvent, MembershipState}; +use crate::session::Session; + +pub type Token = String; +pub type RoomId = String; +pub type UserId = String; + +#[derive(Debug)] +/// A Matrix room member. +pub struct RoomMember { + /// The unique mxid of the user. + user_id: UserId, + /// The human readable name of the user. + display_name: Option, + /// The matrix url of the users avatar. + avatar_url: Option, + /// The users power level. + power_level: u8, +} + +#[derive(Debug)] +/// A Matrix rooom. +pub struct Room { + /// The unique id of the room. + room_id: RoomId, + /// The mxid of our own user. + own_user_id: UserId, + /// The mxid of the room creator. + creator: Option, + /// The map of room members. + members: HashMap, + /// A list of users that are currently typing. + typing_users: Vec, + /// A flag indicating if the room is encrypted. + encrypted: bool, +} + +impl Room { + /// Create a new room. + /// # Arguments + /// + /// * `room_id` - The unique id of the room. + /// * `own_user_id` - The mxid of our own user. + pub fn new(room_id: &str, own_user_id: &UserId) -> Self { + Room { + room_id: room_id.to_string(), + own_user_id: own_user_id.clone(), + creator: None, + members: HashMap::new(), + typing_users: Vec::new(), + encrypted: false, + } + } + + fn add_member(&mut self, event: &MemberEvent) -> bool { + if self.members.contains_key(&event.state_key) { + return false; + } + + let member = RoomMember { + user_id: event.state_key.clone(), + display_name: event.content.displayname.clone(), + avatar_url: event.content.avatar_url.clone(), + power_level: 0, + }; + + self.members.insert(event.state_key.clone(), member); + + true + } + + fn update_joined_member(&mut self, event: &MemberEvent) -> bool { + if let Some(member) = self.members.get_mut(&event.state_key) { + member.display_name = event.content.displayname.clone(); + member.avatar_url = event.content.avatar_url.clone(); + } + + false + } + + fn handle_join(&mut self, event: &MemberEvent) -> bool { + match &event.prev_content { + Some(c) => match c.membership { + MembershipState::Join => self.update_joined_member(event), + MembershipState::Invite => self.add_member(event), + MembershipState::Leave => self.add_member(event), + _ => false, + }, + None => self.add_member(event), + } + } + + fn handle_leave(&mut self, event: &MemberEvent) -> bool { + false + } + + fn handle_membership(&mut self, event: &MemberEvent) -> bool { + match event.content.membership { + MembershipState::Join => self.handle_join(event), + MembershipState::Leave => self.handle_leave(event), + MembershipState::Ban => self.handle_leave(event), + MembershipState::Invite => false, + MembershipState::Knock => false, + } + } + + /// Receive an event for this room and update the room state. + /// # Arguments + /// + /// `event` - The event of the room. + /// + /// Returns true if the joined member list changed, false otherwise. + pub fn receive_event(&mut self, event: &RoomEvent) -> bool { + match event { + RoomEvent::RoomMember(m) => self.handle_membership(m), + _ => false, + } + } +} + +#[derive(Debug)] +/// A no IO Client implementation. +/// +/// This Client is a state machine that receives responses and events and +/// accordingly updates it's state. +pub struct Client { + /// The current client session containing our user id, device id and access + /// token. + pub session: Option, + /// The current sync token that should be used for the next sync call. + pub sync_token: Option, + /// A map of the rooms our user is joined in. + pub joined_rooms: HashMap, +} + +impl Client { + /// Create a new client. + /// # Arguments + /// + /// `session` - An optional session if the user already has one from a + /// previous login call. + pub fn new(session: Option) -> Self { + Client { + session, + sync_token: None, + joined_rooms: HashMap::new(), + } + } + + /// Is the client logged in. + pub fn logged_in(&self) -> bool { + self.session.is_some() + } + + /// Receive a login response and update the session of the client. + /// # Arguments + /// + /// `response` - A successful login response that contains our access token + /// and device id. + pub fn receive_login_response(&mut self, response: &api::session::login::Response) { + let session = Session { + access_token: response.access_token.clone(), + device_id: response.device_id.clone(), + user_id: response.user_id.clone(), + }; + self.session = Some(session); + } + + /// Receive a room event for a joined room and update the client state. + /// # Arguments + /// + /// `room_id` - The unique id of the room the event belongs to. + /// `event` - The event that should be handled by the client. + /// + /// Returns true if the membership list of the room changed, false + /// otherwise. + pub fn receive_joined_room_event(&mut self, room_id: &RoomId, event: &RoomEvent) -> bool { + let room = self + .joined_rooms + .entry(room_id.to_string()) + .or_insert(Room::new( + room_id, + &self + .session + .as_ref() + .expect("Receiving events while not being logged in") + .user_id + .to_string(), + )); + + room.receive_event(event) + } +} diff --git a/src/lib.rs b/src/lib.rs index ef6dafae4..2febee9ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,233 +1,16 @@ -//! Crate `ruma_client` is a [Matrix](https://matrix.org/) client library. +//! Crate `nio-client` is a [Matrix](https://matrix.org/) client library. //! -use std::convert::{TryFrom, TryInto}; - -use http::Method as HttpMethod; -use http::Response as HttpResponse; -use js_int::UInt; -use reqwest; -use ruma_api::Endpoint; -use url::Url; - -use crate::error::InnerError; +#![warn(missing_docs)] pub use crate::{error::Error, session::Session}; pub use ruma_client_api as api; pub use ruma_events as events; //pub mod api; +mod async_client; +mod base_client; mod error; mod session; -#[derive(Debug)] -pub struct AsyncClient { - /// The URL of the homeserver to connect to. - homeserver: Url, - /// The underlying HTTP client. - client: reqwest::Client, - /// User session data. - session: Option, -} - -#[derive(Default, Debug)] -pub struct AsyncClientConfig { - proxy: Option, - use_sys_proxy: bool, - disable_ssl_verification: bool, -} - -impl AsyncClientConfig { - pub fn new() -> Self { - Default::default() - } - - pub fn proxy(mut self, proxy: &str) -> Result { - if self.use_sys_proxy { - return Err(Error(InnerError::ConfigurationError( - "Using the system proxy has been previously configured.".to_string(), - ))); - } - self.proxy = Some(reqwest::Proxy::all(proxy)?); - Ok(self) - } - - pub fn use_sys_proxy(mut self) -> Result { - if self.proxy.is_some() { - return Err(Error(InnerError::ConfigurationError( - "A proxy has already been configured.".to_string(), - ))); - } - self.use_sys_proxy = true; - Ok(self) - } - - pub fn disable_ssl_verification(mut self) -> Self { - self.disable_ssl_verification = true; - self - } -} - -#[derive(Debug, Default)] -pub struct SyncSettings { - pub(crate) timeout: Option, - pub(crate) token: Option, - pub(crate) full_state: Option, -} - -impl SyncSettings { - pub fn new() -> Self { - Default::default() - } - - pub fn token>(mut self, token: S) -> Self { - self.token = Some(token.into()); - self - } - - pub fn timeout>(mut self, timeout: T) -> Result - where - js_int::TryFromIntError: - std::convert::From<>::Error>, - { - self.timeout = Some(timeout.try_into()?); - Ok(self) - } - - pub fn full_state(mut self, full_state: bool) -> Self { - self.full_state = Some(full_state); - self - } -} - -use api::r0::session::login; -use api::r0::sync::sync_events; - -impl AsyncClient { - /// Creates a new client for making HTTP requests to the given homeserver. - pub fn new(homeserver_url: &str, session: Option) -> Result { - let homeserver = Url::parse(homeserver_url)?; - let client = reqwest::Client::new(); - - Ok(Self { - homeserver, - client, - session, - }) - } - - pub fn new_with_config( - homeserver_url: &str, - session: Option, - config: AsyncClientConfig, - ) -> Result { - let homeserver = Url::parse(homeserver_url)?; - let client = reqwest::Client::builder(); - - let client = if config.disable_ssl_verification { - client.danger_accept_invalid_certs(true) - } else { - client - }; - - let client = match config.proxy { - Some(p) => client.proxy(p), - None => client, - }; - - let client = if config.use_sys_proxy { - client.use_sys_proxy() - } else { - client - }; - - let mut headers = reqwest::header::HeaderMap::new(); - - headers.insert(reqwest::header::USER_AGENT, reqwest::header::HeaderValue::from_static("ruma")); - - let client = client.default_headers(headers).build().unwrap(); - - Ok(Self { - homeserver, - client, - session, - }) - } - - pub async fn login>( - &mut self, - user: S, - password: S, - device_id: Option, - ) -> Result { - let request = login::Request { - address: None, - login_type: login::LoginType::Password, - medium: None, - device_id: device_id.map(|d| d.into()), - password: password.into(), - user: user.into(), - }; - - let response = self.send(request).await.unwrap(); - - let session = Session { - access_token: response.access_token.clone(), - device_id: response.device_id.clone(), - user_id: response.user_id.clone(), - }; - - self.session = Some(session.clone()); - - Ok(response) - } - - pub async fn sync(&self, sync_settings: SyncSettings) -> Result { - let request = sync_events::Request { - filter: None, - since: sync_settings.token, - full_state: sync_settings.full_state, - set_presence: None, - timeout: sync_settings.timeout, - }; - - let response = self.send(request).await.unwrap(); - - Ok(response) - } - - async fn send(&self, request: Request) -> Result { - let request: http::Request> = request.try_into()?; - let url = request.uri(); - let url = self.homeserver.join(url.path()).unwrap(); - - let request_builder = match Request::METADATA.method { - HttpMethod::GET => self.client.get(url), - HttpMethod::POST => { - let body = request.body().clone(); - self.client.post(url).body(body) - } - HttpMethod::PUT => unimplemented!(), - HttpMethod::DELETE => unimplemented!(), - _ => panic!("Unsuported method"), - }; - - let request_builder = if Request::METADATA.requires_authentication { - if let Some(ref session) = self.session { - request_builder.bearer_auth(&session.access_token) - } else { - return Err(Error(InnerError::AuthenticationRequired)); - } - } else { - request_builder - }; - - let response = request_builder.send().await?; - - let status = response.status(); - let body = response.bytes().await?.as_ref().to_owned(); - let response = HttpResponse::builder().status(status).body(body).unwrap(); - let response = Request::Response::try_from(response)?; - - Ok(response) - } -} +pub use async_client::{AsyncClient, AsyncClientConfig, SyncSettings}; +pub use base_client::Client; diff --git a/src/session.rs b/src/session.rs index 331baeb7c..510ae7637 100644 --- a/src/session.rs +++ b/src/session.rs @@ -12,33 +12,3 @@ pub struct Session { /// The ID of the client device pub device_id: String, } - -impl Session { - /// Create a new user session from an access token and a user ID. - #[deprecated] - pub fn new(access_token: String, user_id: UserId, device_id: String) -> Self { - Self { - access_token, - user_id, - device_id, - } - } - - /// Get the access token associated with this session. - #[deprecated] - pub fn access_token(&self) -> &str { - &self.access_token - } - - /// Get the ID of the user the session belongs to. - #[deprecated] - pub fn user_id(&self) -> &UserId { - &self.user_id - } - - /// Get ID of the device the session belongs to. - #[deprecated] - pub fn device_id(&self) -> &str { - &self.device_id - } -}