Replace a sleep() with an MPSC channel

This commit is contained in:
2020-05-16 18:09:40 +01:00
parent 640948241a
commit e08ec6b476
3 changed files with 48 additions and 25 deletions

View File

@@ -29,7 +29,7 @@ use dc::Event;
use deltachat as dc; use deltachat as dc;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::time::Duration; use std::time::Duration;
pub const CONN_BUS_NAME: &str = "org.freedesktop.Telepathy.Connection.padfoot.delta"; pub const CONN_BUS_NAME: &str = "org.freedesktop.Telepathy.Connection.padfoot.delta";
@@ -144,11 +144,12 @@ impl Connection {
}) })
} }
// This is run inside its own thread // This should be run inside its own thread. It will signal via the channel
// once the main loop is ready
// //
// FIXME: running several +process+ loops sure is convenient, but it also // FIXME: running several +process+ loops sure is convenient, but it also
// seems inefficient... // seems inefficient...
pub fn run(self) { pub fn run(self, signal: mpsc::Sender<Option<MethodErr>>) {
let bus = self.bus(); let bus = self.bus();
let path = self.path(); let path = self.path();
let state = self.state.clone(); let state = self.state.clone();
@@ -207,13 +208,20 @@ impl Connection {
match c.request_name(bus.clone(), false, false, true) { match c.request_name(bus.clone(), false, false, true) {
Ok(RequestNameReply::Exists) => { Ok(RequestNameReply::Exists) => {
println!("Another process is already registered on {}", bus); println!("Another process is already registered on {}", bus);
signal.send(Some(MethodErr::no_arg())).unwrap();
return; return;
} }
Err(e) => { Err(e) => {
println!("Failed to register {}: {}", bus, e); println!("Failed to register {}: {}", bus, e);
signal.send(Some(MethodErr::no_arg())).unwrap();
return; return;
} }
_ => println!("{} listening on {}", c.unique_name(), bus), // All other responses we can get are a success _ => {
// All other responses we can get are a success. We are now on
// the message bus, so the caller can proceed
println!("{} listening on {}", c.unique_name(), bus);
signal.send(None).unwrap();
}
}; };
// Set up delta jobs last in case registering to DBUS fails // Set up delta jobs last in case registering to DBUS fails

View File

@@ -122,11 +122,17 @@ impl telepathy::Connection for Connection {
fn disconnect(&self) -> Result<(), MethodErr> { fn disconnect(&self) -> Result<(), MethodErr> {
println!("Connection<{}>::disconnect()", self.id); println!("Connection<{}>::disconnect()", self.id);
let ctx = self.ctx.read().unwrap();
let state = self.state.clone(); let state = self.state.clone();
let mut w = state.write().unwrap(); let mut w = state.write().unwrap();
*w = ConnState::Disconnected; *w = ConnState::Disconnected;
dc::job::interrupt_inbox_idle(&ctx);
dc::job::interrupt_smtp_idle(&ctx);
dc::job::interrupt_sentbox_idle(&ctx);
dc::job::interrupt_mvbox_idle(&ctx);
// FIXME: we need to signal to the CM that they should remove the // FIXME: we need to signal to the CM that they should remove the
// connection from the active list // connection from the active list

View File

@@ -1,6 +1,8 @@
use crate::telepathy; use crate::telepathy;
use dbus::{arg, message::SignalArgs, tree}; use dbus::arg;
use dbus::message::SignalArgs;
use dbus::tree::MethodErr;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::mpsc; use std::sync::mpsc;
@@ -22,7 +24,7 @@ impl AsRef<dyn telepathy::ConnectionManager + 'static> for std::rc::Rc<Connectio
impl ConnectionManager { impl ConnectionManager {
pub fn new() -> (Self, mpsc::Receiver<dbus::Message>) { pub fn new() -> (Self, mpsc::Receiver<dbus::Message>) {
let (msg_s, msg_r) = std::sync::mpsc::channel::<dbus::Message>(); let (msg_s, msg_r) = mpsc::channel::<dbus::Message>();
( (
ConnectionManager { ConnectionManager {
@@ -36,7 +38,7 @@ impl ConnectionManager {
fn create_connection( fn create_connection(
&self, &self,
params: HashMap<&str, super::Variant>, params: HashMap<&str, super::Variant>,
) -> Result<(String, dbus::Path<'static>), tree::MethodErr> { ) -> Result<(String, dbus::Path<'static>), MethodErr> {
let conn = super::Connection::new(params)?; let conn = super::Connection::new(params)?;
let bus = conn.bus(); let bus = conn.bus();
let path = conn.path(); let path = conn.path();
@@ -46,17 +48,15 @@ impl ConnectionManager {
// If we already have this connection, we can point the user at it. // If we already have this connection, we can point the user at it.
// FIXME: should we update the other params in this case? // FIXME: should we update the other params in this case?
let mut conns = self.conns.lock().expect("Mutex access"); let mut conns = self.conns.lock().expect("Mutex access");
if conns.contains(&path) { if conns.contains(&path) {
return Ok((bus, dbus_conn_path)); return Ok((bus, dbus_conn_path));
} }
conns.insert(path); // It would be nice to have a single main loop, but thread-per-conn is
// is easy enough for me to understand in Rust at the moment.
// FIXME: this thread races with the responses we send. It's possible let (ok_s, ok_r) = mpsc::channel();
// the remotes will try to use the new names before they exist on the std::thread::spawn(move || conn.run(ok_s));
// bus. This shouldn't be a thread anyway - try to reduce us to a single
// thread and a single event loop.
std::thread::spawn(move || conn.run());
// Emit a NewConnection signal for the benefit of others, but the caller // Emit a NewConnection signal for the benefit of others, but the caller
// learns from our RPC response // learns from our RPC response
@@ -66,8 +66,19 @@ impl ConnectionManager {
protocol: super::PROTO_NAME.to_string(), protocol: super::PROTO_NAME.to_string(),
}; };
// FIXME: ...but for now, just sleep for a bit // We must wait for the new name to appear on the bus, otherwise the
std::thread::sleep(std::time::Duration::from_secs(1)); // client can race
if let Ok(opt) = ok_r.recv() {
if let Some(e) = opt {
return Err(e);
}
// Otherwise OK
} else {
return Err(MethodErr::no_arg());
}
conns.insert(path);
let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string()) let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string())
.expect("Object path should meet DBUS requirements"); .expect("Object path should meet DBUS requirements");
@@ -82,16 +93,16 @@ impl ConnectionManager {
} }
impl telepathy::ConnectionManager for ConnectionManager { impl telepathy::ConnectionManager for ConnectionManager {
fn get_parameters(&self, protocol: &str) -> Result<Vec<super::ParamSpec>, tree::MethodErr> { fn get_parameters(&self, protocol: &str) -> Result<Vec<super::ParamSpec>, MethodErr> {
println!("CM::get_parameters({})", protocol); println!("CM::get_parameters({})", protocol);
match protocol { match protocol {
super::PROTO_NAME => Ok(super::parameters()), super::PROTO_NAME => Ok(super::parameters()),
_ => Err(tree::MethodErr::no_arg()), // FIXME: should be NotImplemented? _ => Err(MethodErr::no_arg()), // FIXME: should be NotImplemented?
} }
} }
fn list_protocols(&self) -> Result<Vec<String>, tree::MethodErr> { fn list_protocols(&self) -> Result<Vec<String>, MethodErr> {
println!("CM::list_protocols()"); println!("CM::list_protocols()");
Ok(vec![super::PROTO_NAME.to_string()]) Ok(vec![super::PROTO_NAME.to_string()])
@@ -101,18 +112,16 @@ impl telepathy::ConnectionManager for ConnectionManager {
&self, &self,
protocol: &str, protocol: &str,
params: HashMap<&str, super::Variant>, params: HashMap<&str, super::Variant>,
) -> Result<(String, dbus::Path<'static>), tree::MethodErr> { ) -> Result<(String, dbus::Path<'static>), MethodErr> {
println!("CM::request_connection({}, ...)", protocol); println!("CM::request_connection({}, ...)", protocol);
match protocol { match protocol {
super::PROTO_NAME => self.create_connection(params), super::PROTO_NAME => self.create_connection(params),
_ => Err(tree::MethodErr::no_arg()), // FIXME: should be NotImplemented? _ => Err(MethodErr::no_arg()), // FIXME: should be NotImplemented?
} }
} }
fn protocols( fn protocols(&self) -> Result<HashMap<String, HashMap<String, super::Variant>>, MethodErr> {
&self,
) -> Result<HashMap<String, HashMap<String, super::Variant>>, tree::MethodErr> {
println!("CM::protocols()"); println!("CM::protocols()");
// FIXME: so much duplication. It would be good if we could get the // FIXME: so much duplication. It would be good if we could get the
@@ -161,7 +170,7 @@ impl telepathy::ConnectionManager for ConnectionManager {
Ok(out) Ok(out)
} }
fn interfaces(&self) -> Result<Vec<String>, tree::MethodErr> { fn interfaces(&self) -> Result<Vec<String>, MethodErr> {
println!("CM::interfaces()"); println!("CM::interfaces()");
Ok(vec![]) Ok(vec![])