Track connection state and implement more properties

Empathy still doesn't show us as connected. I think we need to add the
Requests and Contacts interfaces (which we're advertising as a minimal
requirement, but not implementing) before it will.
This commit is contained in:
2020-05-12 23:52:52 +01:00
parent ef43c81955
commit b6fbcfeeb8

View File

@@ -1,5 +1,7 @@
use crate::telepathy;
use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection};
use dbus::channel::Sender;
use dbus::message::SignalArgs;
use dbus::tree;
use dc::config::Config;
@@ -7,22 +9,30 @@ use dc::context::Context;
use dc::Event;
use deltachat as dc;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Duration;
pub const CONN_BUS_NAME: &'static str = "org.freedesktop.Telepathy.Connection.padfoot.delta";
pub const CONN_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/Connection/padfoot/delta";
#[derive(Debug, PartialEq, Eq)]
enum ConnState {
Initial,
Connected,
Disconnected,
}
#[derive(Debug)]
pub struct Connection {
id: String,
ctx: Arc<RwLock<Context>>,
// set to false when disconnect() is called. Note that it is true even
// before connect() is called
disconnected: Arc<RwLock<bool>>,
state: Arc<RwLock<ConnState>>,
// Used for sending out messages
msgq: Arc<Mutex<VecDeque<dbus::Message>>>,
}
impl Connection {
@@ -55,7 +65,10 @@ impl Connection {
dbfile.push("db.sqlite3");
// FIXME: how to give it access to the connection (initialized later)?
let msgq = Arc::new(Mutex::new(VecDeque::<dbus::Message>::new()));
let id2 = id.clone();
let msgq2 = msgq.clone();
let f = move |_c: &Context, e: Event| {
match e {
Event::Info(msg) => println!("Connection<{}>: INFO: {}", id2, msg),
@@ -63,6 +76,32 @@ impl Connection {
Event::Error(msg) | Event::ErrorNetwork(msg) | Event::ErrorSelfNotInGroup(msg) => {
println!("Connection<{}>: ERR : {}", id2, msg)
}
Event::ConfigureProgress(progress) => {
println!("Connection<{}>: Configuration progress: {}", id2, progress)
}
Event::ImapConnected(msg) | Event::SmtpConnected(msg) => {
println!("Connection<{}>: Network: {}", id2, msg);
}
/* Unhandled messages:
SmtpMessageSent(String),
ImapMessageDeleted(String),
ImapMessageMoved(String),
ImapFolderEmptied(String),
NewBlobFile(String),
DeletedBlobFile(String),
MsgsChanged
IncomingMsg
MsgDelivered
MsgFailed
MsgRead
ChatModified(ChatId),
ContactsChanged(Option<u32>),
LocationChanged(Option<u32>),
ImexProgress(usize),
ImexFileWritten(PathBuf),
SecurejoinInviterProgress
SecurejoinJoinerProgress
*/
_ => println!("Connection<{}>: unhandled event received: {:?}", id2, e),
};
};
@@ -87,7 +126,8 @@ impl Connection {
Ok(Connection {
id: id,
ctx: Arc::new(RwLock::new(ctx)),
disconnected: Arc::new(RwLock::new(false)),
state: Arc::new(RwLock::new(ConnState::Initial)),
msgq: msgq.clone(),
})
}
@@ -98,7 +138,9 @@ impl Connection {
pub fn run(self) {
let bus = self.bus();
let path = self.path();
let disconnected = self.disconnected.clone();
let state = self.state.clone();
let msgq = self.msgq.clone();
let id = self.id.clone();
let c_rc = std::rc::Rc::new(self);
let f = tree::Factory::new_fn::<()>();
@@ -134,7 +176,7 @@ impl Connection {
// Set up delta jobs last in case registering to DBUS fails
// "Borrowed" from https://github.com/deltachat/deltachat-core-rust/blob/master/examples/simple.rs
while !*disconnected.read().unwrap() {
while *state.read().unwrap() != ConnState::Disconnected {
match c.process(Duration::from_millis(100)) {
Err(e) => {
println!("Error processing: {}", e);
@@ -142,6 +184,20 @@ impl Connection {
}
_ => {}
}
// Spend a bit of time sending any outgoing messages - signals, mostly
loop {
let msg = match msgq.lock().unwrap().pop_front() {
Some(msg) => msg,
None => break,
};
print!("Connection<{}>: Sending message...", id);
match c.send(msg) {
Err(e) => println!("error! {:?}", e),
_ => println!("OK!"),
}
}
}
// TODO: clean up, emit disconnected signal. Join on threads started in
@@ -169,14 +225,14 @@ impl telepathy::Connection for Connection {
println!("Connection<{}>::connect()", self.id);
let inbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let state = self.state.clone();
let _inbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
@@ -184,64 +240,120 @@ impl telepathy::Connection for Connection {
});
let smtp_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let state = self.state.clone();
let _smtp_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
});
let mvbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let state = self.state.clone();
let _mvbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
});
let sentbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let state = self.state.clone();
let _sentbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
});
// Just pretend to be connected all the time for now. Tracking IMAP+SMTP
// state is a pain
let state = self.state.clone();
let mut w = state.write().unwrap();
*w = ConnState::Connected;
// Emit a NewConnection signal for the benefit of others, but the caller
// learns from our RPC response
let sig = telepathy::ConnectionStatusChanged {
status: 0, // Connected
reason: 1, // Requested
};
let dbus_conn_path = dbus::strings::Path::new(CONN_OBJECT_PATH.to_string())
.expect("Object path should meet DBUS requirements");
self.msgq
.clone()
.lock()
.unwrap()
.push_back(sig.to_emit_message(&dbus_conn_path));
Ok(())
}
fn disconnect(&self) -> Result<(), tree::MethodErr> {
println!("Connection<{}>::disconnect()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
let state = self.state.clone();
let mut w = state.write().unwrap();
*w = ConnState::Disconnected;
// FIXME: we need to signal to the CM that they should remove the
// connection from the active list
Ok(())
}
fn interfaces(&self) -> Result<Vec<String>, tree::MethodErr> {
println!("Connection<{}>::interfaces()", self.id);
self.get_interfaces()
}
fn get_interfaces(&self) -> Result<Vec<String>, tree::MethodErr> {
println!("Connection<{}>::get_interfaces()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
Ok(vec![
"org.freedesktop.Telepathy.Connection.Interface.Contacts".to_string(),
"org.freedesktop.Telepathy.Connection.Interface.Requests".to_string(),
])
}
fn get_protocol(&self) -> Result<String, tree::MethodErr> {
println!("Connection<{}>::get_protocol()", self.id);
Ok(super::PROTO_NAME.to_string())
}
fn self_handle(&self) -> Result<u32, tree::MethodErr> {
println!("Connection<{}>::self_handle()", self.id);
self.get_self_handle()
}
fn get_self_handle(&self) -> Result<u32, tree::MethodErr> {
println!("Connection<{}>::get_self_handle()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
Ok(deltachat::constants::DC_CONTACT_ID_SELF)
}
fn status(&self) -> Result<u32, tree::MethodErr> {
println!("Connection<{}>::status()", self.id);
self.get_status()
}
fn get_status(&self) -> Result<u32, tree::MethodErr> {
println!("Connection<{}>::get_status()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
match *self.state.clone().read().unwrap() {
ConnState::Initial | ConnState::Disconnected => Ok(2),
ConnState::Connected => Ok(0),
}
}
fn hold_handles(&self, handle_type: u32, handles: Vec<u32>) -> Result<(), tree::MethodErr> {
@@ -318,28 +430,15 @@ impl telepathy::Connection for Connection {
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
}
fn interfaces(&self) -> Result<Vec<String>, tree::MethodErr> {
println!("Connection<{}>::interfaces()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
}
fn self_handle(&self) -> Result<u32, tree::MethodErr> {
println!("Connection<{}>::self_handle()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
}
fn self_id(&self) -> Result<String, tree::MethodErr> {
println!("Connection<{}>::self_id()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
}
fn status(&self) -> Result<u32, tree::MethodErr> {
println!("Connection<{}>::status()", self.id);
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
Ok("Yourself".to_string()) // FIXME: this could be passed through config
}
fn has_immortal_handles(&self) -> Result<bool, tree::MethodErr> {
println!("Connection<{}>::has_immortal_handles()", self.id);
Ok(true)
}
}