From 782662b82f89f7f53dc756dea410baf1f801a757 Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Sun, 17 May 2020 22:05:24 +0100 Subject: [PATCH] Flesh out channels some more. They can now be closed. As part of this, move Connection's queue to mpsc and have all channels for a connection share that connection's main loop. --- src/padfoot/channel.rs | 68 ++++++++++- src/padfoot/channel/channel.rs | 59 +++++++--- src/padfoot/connection.rs | 161 +++++++++++++++++++-------- src/padfoot/connection/connection.rs | 29 +++-- src/padfoot/connection/requests.rs | 2 +- src/padfoot/connection_manager.rs | 21 ++-- 6 files changed, 249 insertions(+), 91 deletions(-) diff --git a/src/padfoot/channel.rs b/src/padfoot/channel.rs index 56c5ce9..a90bc78 100644 --- a/src/padfoot/channel.rs +++ b/src/padfoot/channel.rs @@ -8,13 +8,33 @@ pub use messages::*; mod type_text; pub use type_text::*; +use crate::padfoot::DbusAction; use crate::telepathy; -use std::sync::Arc; + +use deltachat as dc; +use std::sync::{mpsc, Arc, RwLock}; + +type Result = std::result::Result; + +pub type HandleType = u32; + +#[allow(dead_code)] +pub const HANDLE_TYPE_NONE: HandleType = 0; +pub const HANDLE_TYPE_CONTACT: HandleType = 1; +#[allow(dead_code)] +pub const HANDLE_TYPE_ROOM: HandleType = 2; +#[allow(dead_code)] +pub const HANDLE_TYPE_LIST: HandleType = 3; // Deprecated +#[allow(dead_code)] +pub const HANDLE_TYPE_GROUP: HandleType = 4; // Deprecated // FIXME: I'm assuming that all channels will be of type text and 1-1 for now. #[derive(Debug)] pub struct Channel { - pub path: dbus::Path<'static>, + ctx: Arc>, + actq: mpsc::Sender, + path: dbus::Path<'static>, + target_handle: u32, // Who we're talking to } // "This SHOULD NOT include the channel type and channel interface itself" @@ -22,9 +42,49 @@ pub fn channel_interfaces() -> Vec { vec!["org.freedesktop.Telepathy.Channel.Interface.Messages".to_string()] } -type Result = std::result::Result; - impl Channel { + pub fn new( + ctx: Arc>, + path: dbus::Path<'static>, + target_handle: u32, + actq: mpsc::Sender, + ) -> Self { + Channel { + ctx, + path, + target_handle, + actq, + } + } + + pub fn path(&self) -> dbus::Path<'static> { + self.path.clone() + } + + pub fn chan_type(&self) -> String { + "org.freedesktop.Telepathy.Channel.Type.Text".to_string() // FIXME: this shouldn't be hardcoded + } + + pub fn handle_type(&self) -> HandleType { + HANDLE_TYPE_CONTACT // FIXME: this shouldn't be hardcoded + } + + pub fn handle(&self) -> u32 { + self.target_handle + } + + fn target_contact(&self) -> Option { + let ctx = self.ctx.read().unwrap(); + + dc::contact::Contact::get_by_id(&ctx, self.handle()).ok() + } + + fn initiator_contact(&self) -> Option { + let ctx = self.ctx.read().unwrap(); + + dc::contact::Contact::get_by_id(&ctx, self.handle()).ok() // FIXME: this will be wrong for outbound channels + } + pub fn build_object_path( channel: Arc, ) -> dbus::tree::ObjectPath { diff --git a/src/padfoot/channel/channel.rs b/src/padfoot/channel/channel.rs index 02d491a..28167ee 100644 --- a/src/padfoot/channel/channel.rs +++ b/src/padfoot/channel/channel.rs @@ -1,3 +1,4 @@ +use crate::padfoot::DbusAction; use crate::telepathy; use dbus::tree::MethodErr; @@ -13,27 +14,37 @@ impl AsRef for std::sync::Arc { impl telepathy::Channel for Channel { fn close(&self) -> Result<()> { println!("Channel::close()"); + + self.actq + .send(DbusAction::CloseChannel(self.path())) + .unwrap(); + Err(MethodErr::no_arg()) } + // Deprecated fn get_channel_type(&self) -> Result { self.channel_type() } - fn get_handle(&self) -> Result<(u32, u32)> { - println!("Channel::get_handle()"); - Err(MethodErr::no_arg()) - } - - fn get_interfaces(&self) -> Result> { - println!("Channel::get_interfaces()"); - Err(MethodErr::no_arg()) - } - fn channel_type(&self) -> Result { println!("Channel::channel_type()"); - Ok("org.freedesktop.Telepathy.Channel.Text".to_string()) + Ok(self.chan_type()) + } + + // Deprecated + fn get_handle(&self) -> Result<(u32, u32)> { + println!("Channel::get_handle()"); + + Ok((self.handle_type(), self.handle())) + } + + // Deprecated + fn get_interfaces(&self) -> Result> { + println!("Channel::get_interfaces()"); + + self.interfaces() } fn interfaces(&self) -> Result> { @@ -43,31 +54,45 @@ impl telepathy::Channel for Channel { fn target_handle(&self) -> Result { println!("Channel::target_handle()"); - Err(MethodErr::no_arg()) + + Ok(self.handle()) } fn target_id(&self) -> Result { println!("Channel::target_id()"); - Err(MethodErr::no_arg()) + + if let Some(contact) = self.target_contact() { + Ok(contact.get_addr().to_string()) + } else { + Err(MethodErr::no_arg()) + } } fn target_handle_type(&self) -> Result { println!("Channel::target_handle_type()"); - Err(MethodErr::no_arg()) + + Ok(self.handle_type()) } fn requested(&self) -> Result { println!("Channel::requested()"); - Err(MethodErr::no_arg()) + + Ok(false) // FIXME: channels initiated by ourselves *will* be requested } fn initiator_handle(&self) -> Result { println!("Channel::initiator_handle()"); - Err(MethodErr::no_arg()) + + self.target_handle() // FIXME: Not the case for channels initiated by ourselves } fn initiator_id(&self) -> Result { println!("Channel::initiator_id()"); - Err(MethodErr::no_arg()) + + if let Some(contact) = self.initiator_contact() { + Ok(contact.get_addr().to_string()) + } else { + Err(MethodErr::no_arg()) + } } } diff --git a/src/padfoot/connection.rs b/src/padfoot/connection.rs index b3753c1..8babf36 100644 --- a/src/padfoot/connection.rs +++ b/src/padfoot/connection.rs @@ -25,6 +25,7 @@ use crate::telepathy; use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection}; use dbus::channel::{MatchingReceiver, Sender}; +use dbus::message::SignalArgs; use dbus::tree::MethodErr; use dc::config::Config; @@ -32,7 +33,7 @@ use dc::context::Context; use dc::Event; use deltachat as dc; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::time::Duration; @@ -42,26 +43,28 @@ pub const CONN_OBJECT_PATH: &str = "/org/freedesktop/Telepathy/Connection/padfoo // Only the main loop has access to the DBUS connection. Interacting with DBUS // outside of method return values requires one of these to be added to actq #[derive(Debug)] -enum DbusAction { +pub enum DbusAction { Signal(dbus::Message), // Generic signal to send NewChannel(Channel), // Add this channel CloseChannel(dbus::Path<'static>), // Close this channel IncomingMessage(dc::chat::ChatId, dc::message::MsgId), // Look at this \o/ + FreshMessages, // Hint that some messages need looking at } #[derive(Debug)] // A connection uses delta database IDs as handles, and email addresses as IDs pub struct Connection { // Used for sending out messages - actq: Arc>>, + actq: mpsc::Sender, + // actq: Arc>>, // Channels we own channels: Arc, Arc>>>, // Owned by the CM. Remove ourselves from this when done - conns: Arc>>, + conns: Arc>>>, ctx: Arc>, settings: ConnSettings, @@ -112,16 +115,16 @@ impl ConnSettings { CONN_BUS_NAME.to_owned() + "." + &self.id } - pub fn path(&self) -> String { - CONN_OBJECT_PATH.to_owned() + "/" + &self.id + pub fn path(&self) -> dbus::Path<'static> { + dbus::Path::new(format!("{}/{}", CONN_OBJECT_PATH, &self.id)).expect("Valid path") } } impl Connection { pub fn new( settings: ConnSettings, - conns: Arc>>, - ) -> Result { + conns: Arc>>>, + ) -> Result<(Self, mpsc::Receiver), MethodErr> { let mut dbfile = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot") .ok_or_else(MethodErr::no_arg) .and_then(|p| Ok(p.data_local_dir().to_path_buf()))?; @@ -129,12 +132,13 @@ impl Connection { dbfile.push(settings.id()); dbfile.push("db.sqlite3"); - // FIXME: how to give it access to the connection (initialized later)? - let actq = Arc::new(Mutex::new(VecDeque::::new())); - + let (q_s, q_r) = mpsc::channel::(); let id = settings.id(); - // Use this if we need to send messages in response to DC events: - let queue = actq.clone(); + + // The closure is shared between several different threads in delta, and + // we can't Send *or* clone the mpsc sender across them, so just wrap it + // in a mutex for now + let queue = Mutex::new(q_s.clone()); let f = move |_c: &Context, e: Event| { match e { Event::Info(msg) => println!("Connection<{}>: INFO: {}", id, msg), @@ -153,20 +157,22 @@ impl Connection { "Connection<{}>: Messages changed for {}: {}", id, chat_id, msg_id ); - let q = queue.clone(); - q.lock() + queue + .lock() .unwrap() - .push_back(DbusAction::IncomingMessage(chat_id, msg_id)); + .send(DbusAction::IncomingMessage(chat_id, msg_id)) + .unwrap(); } Event::IncomingMsg { chat_id, msg_id } => { println!( "Connection<{}>: Incoming message for {}: {}", id, chat_id, msg_id ); - let q = queue.clone(); - q.lock() + queue + .lock() .unwrap() - .push_back(DbusAction::IncomingMessage(chat_id, msg_id)); + .send(DbusAction::IncomingMessage(chat_id, msg_id)) + .unwrap(); } /* Unhandled messages: @@ -212,16 +218,19 @@ impl Connection { ctx.configure(); }; - Ok(Connection { - conns, - settings, - actq, - channels: Arc::new(RwLock::new( - HashMap::, Arc>::new(), - )), - ctx: Arc::new(RwLock::new(ctx)), - state: Arc::new(RwLock::new(ConnState::Initial)), - }) + Ok(( + Connection { + conns, + settings, + actq: q_s, + channels: Arc::new(RwLock::new( + HashMap::, Arc>::new(), + )), + ctx: Arc::new(RwLock::new(ctx)), + state: Arc::new(RwLock::new(ConnState::Initial)), + }, + q_r, + )) } // This should be run inside its own thread. It will signal via the channel @@ -229,7 +238,11 @@ impl Connection { // // FIXME: running several +process+ loops sure is convenient, but it also // seems inefficient... - pub fn run(self, signal: mpsc::Sender>) { + pub fn run( + self, + done_signal: mpsc::Sender>, + queue_receiver: mpsc::Receiver, + ) { let id = self.id(); let bus = self.bus(); let path = self.path(); @@ -237,6 +250,7 @@ impl Connection { let conns = self.conns.clone(); let chans = self.channels.clone(); let actq = self.actq.clone(); + let ctx = self.ctx.clone(); let state = self.state.clone(); let tree = self.build_tree(); @@ -266,19 +280,19 @@ impl Connection { match c.request_name(bus.clone(), false, false, true) { Ok(RequestNameReply::Exists) => { println!("Another process is already registered on {}", bus); - signal.send(Some(MethodErr::no_arg())).unwrap(); + done_signal.send(Some(MethodErr::no_arg())).unwrap(); return; } Err(e) => { println!("Failed to register {}: {}", bus, e); - signal.send(Some(MethodErr::no_arg())).unwrap(); + done_signal.send(Some(MethodErr::no_arg())).unwrap(); return; } _ => { // All other responses we can get are a success. We are now on // the message bus, so the caller can proceed println!("{} listening on {}", c.unique_name(), bus); - signal.send(None).unwrap(); + done_signal.send(None).unwrap(); } }; @@ -292,15 +306,10 @@ impl Connection { }; // Spend a bit of time sending any outgoing messages - signals, mostly - while let Some(act) = { - let mut q = actq.lock().unwrap(); - let r = q.pop_front(); - drop(q); // Only hold the mutex for the pop operation - r - } { + while let Some(act) = queue_receiver.try_recv().ok() { match act { DbusAction::Signal(msg) => { - print!("Connection<{}>: Sending message...", id); + print!("*** Connection<{}>: Sending message...", id); match c.send(msg) { Err(e) => println!("error! {:?}", e), // FIXME: handle error better? @@ -308,7 +317,10 @@ impl Connection { } } DbusAction::NewChannel(channel) => { - let chan_path = channel.path.clone(); + let chan_type = channel.chan_type(); + let handle_type = channel.handle_type(); + let handle = channel.handle(); + let chan_path = channel.path().clone(); let rc_channel = Arc::new(channel); println!("*** Creating channel {}", chan_path); @@ -322,15 +334,34 @@ impl Connection { let op = Channel::build_object_path(rc_channel); t2.lock().unwrap().insert(op); - // TODO: emit signals + + let requests_sig = telepathy::ConnectionInterfaceRequestsNewChannels { + channels: vec![(chan_path.clone(), HashMap::new())], + }; + + let legacy_sig = telepathy::ConnectionNewChannel { + object_path: chan_path.clone(), + channel_type: chan_type, + handle_type, // contact. FIXME: support other channel types + handle, // id of other contact + // TODO: initiator needs to be tracked + suppress_handler: false, // We'll need to start passing this + }; + + actq.send(DbusAction::Signal(requests_sig.to_emit_message(&path))) + .unwrap(); + actq.send(DbusAction::Signal(legacy_sig.to_emit_message(&path))) + .unwrap(); } DbusAction::CloseChannel(path) => { + println!("*** Closing channel {}", path.clone()); let _chan = Arc::clone(&chans).write().unwrap().remove(&path); let t2 = tree.clone(); t2.lock().unwrap().remove(&path); // TODO: emit signals } DbusAction::IncomingMessage(chat_id, msg_id) => { + println!("*** Incoming message: {} {}", chat_id, msg_id); // TODO: check if we have a channel for the chat let chan_path = dbus::strings::Path::new(format!( "{}/{}", @@ -349,12 +380,44 @@ impl Connection { msg_id, chan_path ); } else { - println!("Channel for {} doesn't exist yet, re-enqueuing", chat_id); - let chan = Channel { path: chan_path }; - let mut q = actq.lock().unwrap(); - q.push_back(DbusAction::NewChannel(chan)); - q.push_back(act); - drop(q); + print!("Channel for {} doesn't exist yet, creating it...", chat_id); + + let contacts = + dc::chat::get_chat_contacts(&ctx.clone().read().unwrap(), chat_id); + if contacts.is_empty() { + println!("empty chat! ignoring"); + continue; + } + if contacts.len() > 1 { + println!("...{} contacts in chat, ignoring!", contacts.len()) + } + + let handle = contacts.first().unwrap(); + let chan = Channel::new(ctx.clone(), chan_path, *handle, actq.clone()); + actq.send(DbusAction::NewChannel(chan)).unwrap(); + actq.send(act).unwrap(); + + println!("OK"); + } + } + + DbusAction::FreshMessages => { + println!("*** FRESH MESSAGES"); + let ctx_rc = ctx.clone(); + let ctx = ctx_rc.read().unwrap(); + + for msg_id in dc::context::Context::get_fresh_msgs(&ctx) { + println!(" FRESH MESSAGE: {}", msg_id); + match dc::message::Message::load_from_db(&ctx, msg_id) { + Ok(msg) => { + actq.send(DbusAction::IncomingMessage( + msg.get_chat_id(), + msg_id, + )) + .unwrap(); + } + Err(e) => println!("Couldn't load fresh message {}: {}", msg_id, e), + } } } } @@ -375,7 +438,7 @@ impl Connection { self.settings.bus() } - pub fn path(&self) -> String { + pub fn path(&self) -> dbus::Path<'static> { self.settings.path() } diff --git a/src/padfoot/connection/connection.rs b/src/padfoot/connection/connection.rs index 8a33e8d..fab8357 100644 --- a/src/padfoot/connection/connection.rs +++ b/src/padfoot/connection/connection.rs @@ -1,6 +1,5 @@ use crate::telepathy; -use crate::telepathy::ConnectionInterfaceRequests; // Non-deprecated channel methods - +use crate::telepathy::{ConnectionInterfaceContacts, ConnectionInterfaceRequests}; // Non-deprecated channel methods use dbus::message::SignalArgs; use dbus::tree::MethodErr; use deltachat as dc; @@ -120,13 +119,10 @@ impl telepathy::Connection for Connection { reason: 1, // Requested }; - let dbus_conn_path = dbus::strings::Path::new(self.path()) - .expect("Object path should meet DBUS requirements"); - self.actq - .lock() - .unwrap() - .push_back(DbusAction::Signal(sig.to_emit_message(&dbus_conn_path))); + .send(DbusAction::Signal(sig.to_emit_message(&self.path()))) + .unwrap(); + self.actq.send(DbusAction::FreshMessages).unwrap(); Ok(()) } @@ -213,7 +209,22 @@ impl telepathy::Connection for Connection { handle_type, handles ); - Err(MethodErr::no_arg()) // FIXME: should be NotImplemented? + match handle_type { + crate::padfoot::HANDLE_TYPE_CONTACT => { + let mut out = Vec::::new(); + for (_handle, attrs) in self.get_contact_attributes(handles, vec![], true)? { + if let Some(contact_id) = + attrs.get("org.freedesktop.Telepathy.Connection/contact-id") + { + out.push(contact_id.0.as_str().unwrap().to_string()); + } else { + return Err(MethodErr::no_arg()); // FIXME: should be InvalidHandle + } + } + Ok(out) + } + _ => Err(MethodErr::no_arg()), // FIXME: should be NotImplemented? + } } // Deprecated in favour of Requests.Channels diff --git a/src/padfoot/connection/requests.rs b/src/padfoot/connection/requests.rs index 7f488d1..139b632 100644 --- a/src/padfoot/connection/requests.rs +++ b/src/padfoot/connection/requests.rs @@ -45,7 +45,7 @@ impl telepathy::ConnectionInterfaceRequests for Connection { let mut out = Vec::::new(); for channel in self.channels.read().unwrap().values() { out.push(( - channel.path.clone(), + channel.path(), HashMap::::new(), // FIXME: work out what props should be shown )); } diff --git a/src/padfoot/connection_manager.rs b/src/padfoot/connection_manager.rs index a7ce23f..76cfcd7 100644 --- a/src/padfoot/connection_manager.rs +++ b/src/padfoot/connection_manager.rs @@ -14,7 +14,7 @@ pub const CM_OBJECT_PATH: &str = "/org/freedesktop/Telepathy/ConnectionManager/p #[derive(Debug)] pub struct ConnectionManager { - conns: Arc>>, + conns: Arc>>>, sender: mpsc::Sender, } @@ -30,7 +30,7 @@ impl ConnectionManager { ( ConnectionManager { - conns: Arc::new(Mutex::new(HashSet::::new())), + conns: Arc::new(Mutex::new(HashSet::>::new())), sender: msg_s, }, msg_r, @@ -46,26 +46,25 @@ impl ConnectionManager { let mut conns = self.conns.lock().expect("Mutex access"); - let dbus_conn_path = dbus::strings::Path::new(path.to_owned()) - .expect("Object path should meet DBUS requirements"); - // We can't call connect() multiple times on the connection yet if conns.contains(&path) { return Err(MethodErr::no_arg()); } - let conn = Connection::new(settings, self.conns.clone())?; - // It would be nice to have a single main loop, but thread-per-conn is // is easy enough for me to understand in Rust at the moment. + let conns_clone = self.conns.clone(); let (ok_s, ok_r) = mpsc::channel(); - std::thread::spawn(move || conn.run(ok_s)); + std::thread::spawn(move || { + let (conn, receiver) = Connection::new(settings, conns_clone).unwrap(); + conn.run(ok_s, receiver); + }); // Emit a NewConnection signal for the benefit of others, but the caller // learns from our RPC response let sig = telepathy::ConnectionManagerNewConnection { bus_name: bus.to_owned(), - object_path: dbus_conn_path.clone(), + object_path: path.clone(), protocol: super::PROTO_NAME.to_string(), }; @@ -81,7 +80,7 @@ impl ConnectionManager { return Err(MethodErr::no_arg()); } - conns.insert(path); + conns.insert(path.clone()); let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string()) .expect("Object path should meet DBUS requirements"); @@ -91,7 +90,7 @@ impl ConnectionManager { .expect("send signal"); // The bus name *must* be org.freedesktop.Telepathy.Connection.padfoot.delta. - Ok((bus, dbus_conn_path)) + Ok((bus, path)) } }