diff --git a/src/padfoot/connection.rs b/src/padfoot/connection.rs index d9ab7c4..6dea258 100644 --- a/src/padfoot/connection.rs +++ b/src/padfoot/connection.rs @@ -19,6 +19,10 @@ pub const CONN_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/Connectio pub struct Connection { id: String, ctx: Arc>, + + // set to false when disconnect() is called. Note that it is true even + // before connect() is called + disconnected: Arc>, } impl Connection { @@ -83,6 +87,7 @@ impl Connection { Ok(Connection { id: id, ctx: Arc::new(RwLock::new(ctx)), + disconnected: Arc::new(RwLock::new(false)), }) } @@ -93,7 +98,7 @@ impl Connection { pub fn run(self) { let bus = self.bus(); let path = self.path(); - let ctx = self.ctx.clone(); + let disconnected = self.disconnected.clone(); let c_rc = std::rc::Rc::new(self); let f = tree::Factory::new_fn::<()>(); @@ -129,58 +134,7 @@ impl Connection { // Set up delta jobs last in case registering to DBUS fails // "Borrowed" from https://github.com/deltachat/deltachat-core-rust/blob/master/examples/simple.rs - let running = Arc::new(RwLock::new(true)); - - let inbox_ctx = ctx.clone(); - let r1 = running.clone(); - let _t1 = thread::spawn(move || { - while *r1.read().unwrap() { - dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap()); - if *r1.read().unwrap() { - dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap()); - - if *r1.read().unwrap() { - dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap()); - } - } - } - }); - - let smtp_ctx = ctx.clone(); - let r1 = running.clone(); - let _t2 = thread::spawn(move || { - while *r1.read().unwrap() { - dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap()); - if *r1.read().unwrap() { - dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap()); - } - } - }); - - let mvbox_ctx = ctx.clone(); - let r1 = running.clone(); - let _t3 = thread::spawn(move || { - while *r1.read().unwrap() { - dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap()); - if *r1.read().unwrap() { - dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap()); - } - } - }); - - let sentbox_ctx = ctx.clone(); - let r1 = running.clone(); - let _t4 = thread::spawn(move || { - while *r1.read().unwrap() { - dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap()); - if *r1.read().unwrap() { - dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap()); - } - } - }); - - let r1 = running.clone(); - while *r1.read().unwrap() { + while !*disconnected.read().unwrap() { match c.process(Duration::from_millis(100)) { Err(e) => { println!("Error processing: {}", e); @@ -188,9 +142,10 @@ impl Connection { } _ => {} } - - // TODO: notice when the conn wants to exit } + + // TODO: clean up, emit disconnected signal. Join on threads started in + // connect() ? } pub fn bus(&self) -> String { @@ -209,8 +164,59 @@ impl AsRef for std::rc::Rc { } impl telepathy::Connection for Connection { + // In connect(), we start the threads that drive the deltachat context fn connect(&self) -> Result<(), tree::MethodErr> { - Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented? + println!("Connection<{}>::connect()", self.id); + + let inbox_ctx = self.ctx.clone(); + let disconnected = self.disconnected.clone(); + let _inbox_thread = thread::spawn(move || { + while !*disconnected.read().unwrap() { + dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap()); + if !*disconnected.read().unwrap() { + dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap()); + + if !*disconnected.read().unwrap() { + dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap()); + } + } + } + }); + + let smtp_ctx = self.ctx.clone(); + let disconnected = self.disconnected.clone(); + let _smtp_thread = thread::spawn(move || { + while !*disconnected.read().unwrap() { + dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap()); + if !*disconnected.read().unwrap() { + dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap()); + } + } + }); + + let mvbox_ctx = self.ctx.clone(); + let disconnected = self.disconnected.clone(); + let _mvbox_thread = thread::spawn(move || { + while !*disconnected.read().unwrap() { + dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap()); + if !*disconnected.read().unwrap() { + dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap()); + } + } + }); + + let sentbox_ctx = self.ctx.clone(); + let disconnected = self.disconnected.clone(); + let _sentbox_thread = thread::spawn(move || { + while !*disconnected.read().unwrap() { + dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap()); + if !*disconnected.read().unwrap() { + dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap()); + } + } + }); + + Ok(()) } fn disconnect(&self) -> Result<(), tree::MethodErr> { diff --git a/src/padfoot/connection_manager.rs b/src/padfoot/connection_manager.rs index 3876ec2..616f441 100644 --- a/src/padfoot/connection_manager.rs +++ b/src/padfoot/connection_manager.rs @@ -59,13 +59,16 @@ impl ConnectionManager { std::thread::spawn(move || conn.run()); // Emit a NewConnection signal for the benefit of others, but the caller - // learns immediately + // learns from our RPC response let sig = telepathy::ConnectionManagerNewConnection { bus_name: bus.to_owned(), object_path: dbus_conn_path.clone(), protocol: super::PROTO_NAME.to_string(), }; + // FIXME: ...but for now, just sleep for a bit + std::thread::sleep(std::time::Duration::from_secs(1)); + let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string()) .expect("Object path should meet DBUS requirements");