Don't start delta until the connect signal is received

We also introduce a sleep() to "fix" a race condition where empathy
tries to contact the connection bus name before the thread has gotten
around to starting it. That really needs fixing better, but for now
I'm focused on basic functionality.
This commit is contained in:
2020-05-12 22:28:36 +01:00
parent ef1342f372
commit ef43c81955
2 changed files with 66 additions and 57 deletions

View File

@@ -19,6 +19,10 @@ pub const CONN_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/Connectio
pub struct Connection {
id: String,
ctx: Arc<RwLock<Context>>,
// set to false when disconnect() is called. Note that it is true even
// before connect() is called
disconnected: Arc<RwLock<bool>>,
}
impl Connection {
@@ -83,6 +87,7 @@ impl Connection {
Ok(Connection {
id: id,
ctx: Arc::new(RwLock::new(ctx)),
disconnected: Arc::new(RwLock::new(false)),
})
}
@@ -93,7 +98,7 @@ impl Connection {
pub fn run(self) {
let bus = self.bus();
let path = self.path();
let ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let c_rc = std::rc::Rc::new(self);
let f = tree::Factory::new_fn::<()>();
@@ -129,58 +134,7 @@ impl Connection {
// Set up delta jobs last in case registering to DBUS fails
// "Borrowed" from https://github.com/deltachat/deltachat-core-rust/blob/master/examples/simple.rs
let running = Arc::new(RwLock::new(true));
let inbox_ctx = ctx.clone();
let r1 = running.clone();
let _t1 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
}
});
let smtp_ctx = ctx.clone();
let r1 = running.clone();
let _t2 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
});
let mvbox_ctx = ctx.clone();
let r1 = running.clone();
let _t3 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
});
let sentbox_ctx = ctx.clone();
let r1 = running.clone();
let _t4 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
});
let r1 = running.clone();
while *r1.read().unwrap() {
while !*disconnected.read().unwrap() {
match c.process(Duration::from_millis(100)) {
Err(e) => {
println!("Error processing: {}", e);
@@ -188,9 +142,10 @@ impl Connection {
}
_ => {}
}
// TODO: notice when the conn wants to exit
}
// TODO: clean up, emit disconnected signal. Join on threads started in
// connect() ?
}
pub fn bus(&self) -> String {
@@ -209,8 +164,59 @@ impl AsRef<dyn telepathy::Connection + 'static> for std::rc::Rc<Connection> {
}
impl telepathy::Connection for Connection {
// In connect(), we start the threads that drive the deltachat context
fn connect(&self) -> Result<(), tree::MethodErr> {
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
println!("Connection<{}>::connect()", self.id);
let inbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _inbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
}
});
let smtp_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _smtp_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
});
let mvbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _mvbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
});
let sentbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _sentbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
});
Ok(())
}
fn disconnect(&self) -> Result<(), tree::MethodErr> {

View File

@@ -59,13 +59,16 @@ impl ConnectionManager {
std::thread::spawn(move || conn.run());
// Emit a NewConnection signal for the benefit of others, but the caller
// learns immediately
// learns from our RPC response
let sig = telepathy::ConnectionManagerNewConnection {
bus_name: bus.to_owned(),
object_path: dbus_conn_path.clone(),
protocol: super::PROTO_NAME.to_string(),
};
// FIXME: ...but for now, just sleep for a bit
std::thread::sleep(std::time::Duration::from_secs(1));
let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string())
.expect("Object path should meet DBUS requirements");