From b6fbcfeeb8346743945153a226f70ae0f9a64f5d Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Tue, 12 May 2020 23:52:52 +0100 Subject: [PATCH] Track connection state and implement more properties Empathy still doesn't show us as connected. I think we need to add the Requests and Contacts interfaces (which we're advertising as a minimal requirement, but not implementing) before it will. --- src/padfoot/connection.rs | 181 +++++++++++++++++++++++++++++--------- 1 file changed, 140 insertions(+), 41 deletions(-) diff --git a/src/padfoot/connection.rs b/src/padfoot/connection.rs index 6dea258..639eba1 100644 --- a/src/padfoot/connection.rs +++ b/src/padfoot/connection.rs @@ -1,5 +1,7 @@ use crate::telepathy; use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection}; +use dbus::channel::Sender; +use dbus::message::SignalArgs; use dbus::tree; use dc::config::Config; @@ -7,22 +9,30 @@ use dc::context::Context; use dc::Event; use deltachat as dc; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time::Duration; pub const CONN_BUS_NAME: &'static str = "org.freedesktop.Telepathy.Connection.padfoot.delta"; pub const CONN_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/Connection/padfoot/delta"; +#[derive(Debug, PartialEq, Eq)] +enum ConnState { + Initial, + Connected, + Disconnected, +} + #[derive(Debug)] pub struct Connection { id: String, ctx: Arc>, - // set to false when disconnect() is called. Note that it is true even - // before connect() is called - disconnected: Arc>, + state: Arc>, + + // Used for sending out messages + msgq: Arc>>, } impl Connection { @@ -55,7 +65,10 @@ impl Connection { dbfile.push("db.sqlite3"); // FIXME: how to give it access to the connection (initialized later)? + let msgq = Arc::new(Mutex::new(VecDeque::::new())); + let id2 = id.clone(); + let msgq2 = msgq.clone(); let f = move |_c: &Context, e: Event| { match e { Event::Info(msg) => println!("Connection<{}>: INFO: {}", id2, msg), @@ -63,6 +76,32 @@ impl Connection { Event::Error(msg) | Event::ErrorNetwork(msg) | Event::ErrorSelfNotInGroup(msg) => { println!("Connection<{}>: ERR : {}", id2, msg) } + Event::ConfigureProgress(progress) => { + println!("Connection<{}>: Configuration progress: {}", id2, progress) + } + Event::ImapConnected(msg) | Event::SmtpConnected(msg) => { + println!("Connection<{}>: Network: {}", id2, msg); + } + /* Unhandled messages: + SmtpMessageSent(String), + ImapMessageDeleted(String), + ImapMessageMoved(String), + ImapFolderEmptied(String), + NewBlobFile(String), + DeletedBlobFile(String), + MsgsChanged + IncomingMsg + MsgDelivered + MsgFailed + MsgRead + ChatModified(ChatId), + ContactsChanged(Option), + LocationChanged(Option), + ImexProgress(usize), + ImexFileWritten(PathBuf), + SecurejoinInviterProgress + SecurejoinJoinerProgress + */ _ => println!("Connection<{}>: unhandled event received: {:?}", id2, e), }; }; @@ -87,7 +126,8 @@ impl Connection { Ok(Connection { id: id, ctx: Arc::new(RwLock::new(ctx)), - disconnected: Arc::new(RwLock::new(false)), + state: Arc::new(RwLock::new(ConnState::Initial)), + msgq: msgq.clone(), }) } @@ -98,7 +138,9 @@ impl Connection { pub fn run(self) { let bus = self.bus(); let path = self.path(); - let disconnected = self.disconnected.clone(); + let state = self.state.clone(); + let msgq = self.msgq.clone(); + let id = self.id.clone(); let c_rc = std::rc::Rc::new(self); let f = tree::Factory::new_fn::<()>(); @@ -134,7 +176,7 @@ impl Connection { // Set up delta jobs last in case registering to DBUS fails // "Borrowed" from https://github.com/deltachat/deltachat-core-rust/blob/master/examples/simple.rs - while !*disconnected.read().unwrap() { + while *state.read().unwrap() != ConnState::Disconnected { match c.process(Duration::from_millis(100)) { Err(e) => { println!("Error processing: {}", e); @@ -142,6 +184,20 @@ impl Connection { } _ => {} } + + // Spend a bit of time sending any outgoing messages - signals, mostly + loop { + let msg = match msgq.lock().unwrap().pop_front() { + Some(msg) => msg, + None => break, + }; + + print!("Connection<{}>: Sending message...", id); + match c.send(msg) { + Err(e) => println!("error! {:?}", e), + _ => println!("OK!"), + } + } } // TODO: clean up, emit disconnected signal. Join on threads started in @@ -169,14 +225,14 @@ impl telepathy::Connection for Connection { println!("Connection<{}>::connect()", self.id); let inbox_ctx = self.ctx.clone(); - let disconnected = self.disconnected.clone(); + let state = self.state.clone(); let _inbox_thread = thread::spawn(move || { - while !*disconnected.read().unwrap() { + while *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap()); - if !*disconnected.read().unwrap() { + if *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap()); - if !*disconnected.read().unwrap() { + if *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap()); } } @@ -184,64 +240,120 @@ impl telepathy::Connection for Connection { }); let smtp_ctx = self.ctx.clone(); - let disconnected = self.disconnected.clone(); + let state = self.state.clone(); let _smtp_thread = thread::spawn(move || { - while !*disconnected.read().unwrap() { + while *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap()); - if !*disconnected.read().unwrap() { + if *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap()); } } }); let mvbox_ctx = self.ctx.clone(); - let disconnected = self.disconnected.clone(); + let state = self.state.clone(); let _mvbox_thread = thread::spawn(move || { - while !*disconnected.read().unwrap() { + while *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap()); - if !*disconnected.read().unwrap() { + if *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap()); } } }); let sentbox_ctx = self.ctx.clone(); - let disconnected = self.disconnected.clone(); + let state = self.state.clone(); let _sentbox_thread = thread::spawn(move || { - while !*disconnected.read().unwrap() { + while *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap()); - if !*disconnected.read().unwrap() { + if *state.read().unwrap() != ConnState::Disconnected { dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap()); } } }); + // Just pretend to be connected all the time for now. Tracking IMAP+SMTP + // state is a pain + let state = self.state.clone(); + let mut w = state.write().unwrap(); + *w = ConnState::Connected; + + // Emit a NewConnection signal for the benefit of others, but the caller + // learns from our RPC response + let sig = telepathy::ConnectionStatusChanged { + status: 0, // Connected + reason: 1, // Requested + }; + + let dbus_conn_path = dbus::strings::Path::new(CONN_OBJECT_PATH.to_string()) + .expect("Object path should meet DBUS requirements"); + + self.msgq + .clone() + .lock() + .unwrap() + .push_back(sig.to_emit_message(&dbus_conn_path)); + Ok(()) } fn disconnect(&self) -> Result<(), tree::MethodErr> { println!("Connection<{}>::disconnect()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + + let state = self.state.clone(); + let mut w = state.write().unwrap(); + *w = ConnState::Disconnected; + + // FIXME: we need to signal to the CM that they should remove the + // connection from the active list + + Ok(()) + } + + fn interfaces(&self) -> Result, tree::MethodErr> { + println!("Connection<{}>::interfaces()", self.id); + + self.get_interfaces() } fn get_interfaces(&self) -> Result, tree::MethodErr> { println!("Connection<{}>::get_interfaces()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + + Ok(vec![ + "org.freedesktop.Telepathy.Connection.Interface.Contacts".to_string(), + "org.freedesktop.Telepathy.Connection.Interface.Requests".to_string(), + ]) } fn get_protocol(&self) -> Result { println!("Connection<{}>::get_protocol()", self.id); + Ok(super::PROTO_NAME.to_string()) } + fn self_handle(&self) -> Result { + println!("Connection<{}>::self_handle()", self.id); + + self.get_self_handle() + } + fn get_self_handle(&self) -> Result { println!("Connection<{}>::get_self_handle()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + + Ok(deltachat::constants::DC_CONTACT_ID_SELF) + } + + fn status(&self) -> Result { + println!("Connection<{}>::status()", self.id); + + self.get_status() } fn get_status(&self) -> Result { - println!("Connection<{}>::get_status()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + match *self.state.clone().read().unwrap() { + ConnState::Initial | ConnState::Disconnected => Ok(2), + ConnState::Connected => Ok(0), + } } fn hold_handles(&self, handle_type: u32, handles: Vec) -> Result<(), tree::MethodErr> { @@ -318,28 +430,15 @@ impl telepathy::Connection for Connection { Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? } - fn interfaces(&self) -> Result, tree::MethodErr> { - println!("Connection<{}>::interfaces()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? - } - - fn self_handle(&self) -> Result { - println!("Connection<{}>::self_handle()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? - } - fn self_id(&self) -> Result { println!("Connection<{}>::self_id()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? - } - fn status(&self) -> Result { - println!("Connection<{}>::status()", self.id); - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + Ok("Yourself".to_string()) // FIXME: this could be passed through config } fn has_immortal_handles(&self) -> Result { println!("Connection<{}>::has_immortal_handles()", self.id); + Ok(true) } }