diff --git a/src/main.rs b/src/main.rs index e5b03c8..05be978 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,11 +4,12 @@ mod telepathy; use anyhow::{anyhow, Result}; use dbus::{ blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection}, + channel::Sender, tree::Factory, }; use padfoot::{ - Connection, ConnectionManager, Protocol, CM_BUS_NAME, CM_CONN_BUS_NAME, CM_OBJECT_PATH, - CONN_BUS_NAME, PROTO_BUS_NAME, PROTO_OBJECT_PATH, + ConnectionManager, Protocol, CM_BUS_NAME, CM_CONN_BUS_NAME, CM_OBJECT_PATH, CONN_BUS_NAME, + PROTO_BUS_NAME, PROTO_OBJECT_PATH, }; use std::sync::Arc; use std::time::Duration; @@ -30,7 +31,9 @@ impl dbus::tree::DataType for TData { } fn run() -> Result<()> { - let cm = ConnectionManager::new(); + let (msg_s, msg_r) = std::sync::mpsc::channel::(); + let cm = ConnectionManager::new(Some(msg_s)); + let proto = Protocol {}; let data = Arc::new(TData { cm: cm, p: proto }); @@ -79,7 +82,16 @@ fn run() -> Result<()> { } loop { - c.process(Duration::from_secs(1))?; + c.process(Duration::from_millis(100))?; + + // Spend a bit of time sending any outgoing messages - signals, mostly + for msg in msg_r.try_iter().take(10) { + print!("Sending message..."); + match c.send(msg) { + Err(e) => println!("error!"), + _ => println!("OK!"), + } + } } } diff --git a/src/padfoot/connection.rs b/src/padfoot/connection.rs index ec01aa2..261a68d 100644 --- a/src/padfoot/connection.rs +++ b/src/padfoot/connection.rs @@ -1,29 +1,140 @@ -use rand::Rng; - +use crate::telepathy; +use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection}; +use dbus::tree; use std::collections::HashMap; +use std::time::Duration; pub const CONN_BUS_NAME: &'static str = "org.freedesktop.Telepathy.Connection.padfoot.delta"; -pub const CONN_OBJECT_PATH: &'static str = "org/freedesktop/Telepathy/Connection/padfoot/delta"; +pub const CONN_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/Connection/padfoot/delta"; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Connection { - path: String, + id: String, account: String, password: String, } +fn escape_one(b: u8) -> String { + format!("_{:0<2x}", b) +} + +// Some non-empty sequence of ASCII letters, digits and underscores +fn escape(s: String) -> String { + // Special-case the empty string + if s.len() == 0 { + return "_".to_string(); + } + + let bytes = s.into_bytes(); + let mut iter = bytes.iter(); + let mut out = String::new(); + + // Only alphanumeric in the first byte + let x = *iter.next().expect("Already checked len > 0"); + let first = match x { + b'a'..=b'z' | b'A'..=b'Z' => unsafe { String::from_utf8_unchecked(vec![x]) }, + _ => escape_one(x), + }; + + out.push_str(&first); + + for x in iter { + let next = match x { + b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' => unsafe { + String::from_utf8_unchecked(vec![*x]) + }, + _ => escape_one(*x), + }; + + out.push_str(&next); + } + + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_escape() { + assert_eq!(escape("".to_string()), "_"); + assert_eq!(escape("foo".to_string()), "foo"); + assert_eq!(escape("foo@bar".to_string()), "foo_40bar"); + assert_eq!(escape("foo_bar".to_string()), "foo_5fbar"); + assert_eq!(escape("foo__@__bar".to_string()), "foo_5f_5f_40_5f_5fbar"); + assert_eq!(escape("1foo".to_string()), "_31foo"); + } +} + +#[derive(Debug, Default)] +struct CData; + +impl dbus::tree::DataType for CData { + type Interface = Connection; + type Tree = (); + type Property = (); + type ObjectPath = (); + + type Method = (); + type Signal = (); +} + impl Connection { + // This is run inside its own thread + // + // FIXME: running several +process+ loops sure is convenient, but it also + // seems inefficient... + pub fn run(self) { + let bus = self.bus(); + let path = self.path(); + let f = tree::Factory::new_fn::(); + let iface = telepathy::connection_server(&f, self, |m| m.iface.get_data()); + + let mut tree = f.tree(()); + tree = tree.add(f.object_path(path, ()).introspectable().add(iface)); + + tree = tree.add(f.object_path("/", ()).introspectable()); + + // Setup DBus connection + let mut c = match LocalConnection::new_session() { + Ok(c) => c, + Err(e) => { + println!("Failed to establish DBUS session for {}: {}", bus, e); + return; // Leave early + } + }; + + tree.start_receive(&c); + + match c.request_name(bus.clone(), false, false, true) { + Ok(RequestNameReply::Exists) => { + println!("Another process is already registered on {}", bus); + return; + } + Err(e) => { + println!("Failed to register {}: {}", bus, e); + return; + } + _ => println!("{} listening on {}", c.unique_name(), bus), // All other responses we can get are a success + }; + + loop { + match c.process(Duration::from_millis(100)) { + Err(e) => { + println!("Error processing: {}", e); + return; + } + _ => {} + } + + // TODO: notice when the conn wants to exit + } + } + pub fn new(params: HashMap<&str, super::Variant>) -> Result { let err = Err(dbus::tree::MethodErr::no_arg()); - // Generate a unique identifier for this connection - let id = rand::thread_rng() - .sample_iter(&rand::distributions::Alphanumeric) - .take(16) - .collect::(); - - let path = super::CONN_OBJECT_PATH.to_owned() + "/" + &id; - let acct = match params.get("account") { Some(variant) => match variant.0.as_str() { Some(str) => str.to_string(), @@ -32,6 +143,8 @@ impl Connection { None => return err, }; + let id = escape(acct.to_owned()); + let password = match params.get("password") { Some(variant) => match variant.0.as_str() { Some(str) => str.to_string(), @@ -41,13 +154,111 @@ impl Connection { }; Ok(Connection { - path: path, + id: id, account: acct, password: password, }) } + pub fn bus(&self) -> String { + CONN_BUS_NAME.to_owned() + "." + &self.id + } + pub fn path(&self) -> String { - return self.path.to_owned(); + CONN_OBJECT_PATH.to_owned() + "/" + &self.id + } +} + +impl telepathy::Connection for Connection { + fn connect(&self) -> Result<(), tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn disconnect(&self) -> Result<(), tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn get_interfaces(&self) -> Result, tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn get_protocol(&self) -> Result { + Ok(super::PROTO_NAME.to_string()) + } + + fn get_self_handle(&self) -> Result { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn get_status(&self) -> Result { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn hold_handles(&self, handle_type: u32, handles: Vec) -> Result<(), tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn inspect_handles( + &self, + handle_type: u32, + handles: Vec, + ) -> Result, tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn list_channels( + &self, + ) -> Result, String, u32, u32)>, tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn release_handles(&self, handle_type: u32, handles: Vec) -> Result<(), tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn request_channel( + &self, + type_: &str, + handle_type: u32, + handle: u32, + suppress_handler: bool, + ) -> Result, tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn request_handles( + &self, + handle_type: u32, + identifiers: Vec<&str>, + ) -> Result, tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn add_client_interest(&self, tokens: Vec<&str>) -> Result<(), tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn remove_client_interest(&self, tokens: Vec<&str>) -> Result<(), tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn interfaces(&self) -> Result, tree::MethodErr> { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn self_handle(&self) -> Result { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn self_id(&self) -> Result { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn status(&self) -> Result { + Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + } + + fn has_immortal_handles(&self) -> Result { + Ok(true) } } diff --git a/src/padfoot/connection_manager.rs b/src/padfoot/connection_manager.rs index e698efc..e0edf65 100644 --- a/src/padfoot/connection_manager.rs +++ b/src/padfoot/connection_manager.rs @@ -1,8 +1,8 @@ use crate::telepathy; -use dbus::{arg, tree}; -use std::collections::HashMap; -use std::sync::Mutex; +use dbus::{arg, message::SignalArgs, tree}; +use std::collections::{HashMap, HashSet}; +use std::sync::mpsc; pub const CM_BUS_NAME: &'static str = "org.freedesktop.Telepathy.ConnectionManager.padfoot"; pub const CM_CONN_BUS_NAME: &'static str = "org.freedesktop.Telepathy.Connection.padfoot"; @@ -10,43 +10,74 @@ pub const CM_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/ConnectionM #[derive(Debug)] pub struct ConnectionManager { - conns: Mutex>, // FIXME: remove mutex if we can + conns: std::sync::Mutex>, + pub sender: Option>, } impl Default for ConnectionManager { fn default() -> Self { ConnectionManager { - conns: Mutex::new(HashMap::new()), + conns: std::sync::Mutex::new(HashSet::::new()), + sender: None, } } } impl ConnectionManager { - pub fn new() -> Self { - Default::default() + pub fn new(sender: Option>) -> Self { + let mut cm: ConnectionManager = Default::default(); + + cm.sender = sender; + + cm } - fn new_connection( + fn create_connection( &self, params: HashMap<&str, super::Variant>, ) -> Result<(String, dbus::Path<'static>), tree::MethodErr> { let conn = super::Connection::new(params)?; + let bus = conn.bus(); let path = conn.path(); + let dbus_conn_path = dbus::strings::Path::new(path.to_owned()) + .expect("Object path should meet DBUS requirements"); - let mut conns = self.conns.lock().expect("mutex should be lockable"); + // If we already have this connection, we can point the user at it. + // FIXME: should we update the other params in this case? + let mut conns = self.conns.lock().expect("Mutex access"); + if conns.contains(&path) { + return Ok((bus, dbus_conn_path)); + } - conns.insert(path.to_owned(), conn); + conns.insert(path.clone()); - // TODO: register new object path + bus name - // Will this be easier if I give each one its own dbus connection? + // FIXME: this thread races with the responses we send. It's possible + // the remotes will try to use the new names before they exist on the + // bus. This shouldn't be a thread anyway - try to reduce us to a single + // thread and a single event loop. + std::thread::spawn(move || conn.run()); - // FIXME: does the bus name matter? Is it used for client recovery? - // Maybe it should be org.freedesktop.Telepathy.Connection.padfoot.delta.? - Ok(( - CM_CONN_BUS_NAME.to_string(), - dbus::strings::Path::new(path).expect("Object path should meet DBUS requirements"), - )) - // Err(tree::MethodErr::no_arg()) + // Emit a NewConnection signal for the benefit of others, but the caller + // learns immediately + match &self.sender { + Some(s) => { + let sig = telepathy::ConnectionManagerNewConnection { + bus_name: bus.to_owned(), + object_path: dbus_conn_path.clone(), + protocol: super::PROTO_NAME.to_string(), + }; + + let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string()) + .expect("Object path should meet DBUS requirements"); + + s.send(sig.to_emit_message(&dbus_cm_path)) + .expect("send signal"); + } + _ => {} + }; + + // The bus name *must* be org.freedesktop.Telepathy.Connection.padfoot.delta. + Ok((bus, dbus_conn_path)) } } @@ -74,7 +105,7 @@ impl telepathy::ConnectionManager for ConnectionManager { println!("CM::request_connection({}, {:?})", protocol, params); match protocol { - super::PROTO_NAME => self.new_connection(params), + super::PROTO_NAME => self.create_connection(params), _ => Err(tree::MethodErr::no_arg()), // FIXME: should be NotImplemented? } }