Flesh out channels some more. They can now be closed.

As part of this, move Connection's queue to mpsc and have all channels
for a connection share that connection's main loop.
This commit is contained in:
2020-05-17 22:05:24 +01:00
parent 1eefce4f1c
commit 782662b82f
6 changed files with 249 additions and 91 deletions

View File

@@ -8,13 +8,33 @@ pub use messages::*;
mod type_text;
pub use type_text::*;
use crate::padfoot::DbusAction;
use crate::telepathy;
use std::sync::Arc;
use deltachat as dc;
use std::sync::{mpsc, Arc, RwLock};
type Result<T> = std::result::Result<T, dbus::tree::MethodErr>;
pub type HandleType = u32;
#[allow(dead_code)]
pub const HANDLE_TYPE_NONE: HandleType = 0;
pub const HANDLE_TYPE_CONTACT: HandleType = 1;
#[allow(dead_code)]
pub const HANDLE_TYPE_ROOM: HandleType = 2;
#[allow(dead_code)]
pub const HANDLE_TYPE_LIST: HandleType = 3; // Deprecated
#[allow(dead_code)]
pub const HANDLE_TYPE_GROUP: HandleType = 4; // Deprecated
// FIXME: I'm assuming that all channels will be of type text and 1-1 for now.
#[derive(Debug)]
pub struct Channel {
pub path: dbus::Path<'static>,
ctx: Arc<RwLock<dc::context::Context>>,
actq: mpsc::Sender<DbusAction>,
path: dbus::Path<'static>,
target_handle: u32, // Who we're talking to
}
// "This SHOULD NOT include the channel type and channel interface itself"
@@ -22,9 +42,49 @@ pub fn channel_interfaces() -> Vec<String> {
vec!["org.freedesktop.Telepathy.Channel.Interface.Messages".to_string()]
}
type Result<T> = std::result::Result<T, dbus::tree::MethodErr>;
impl Channel {
pub fn new(
ctx: Arc<RwLock<dc::context::Context>>,
path: dbus::Path<'static>,
target_handle: u32,
actq: mpsc::Sender<DbusAction>,
) -> Self {
Channel {
ctx,
path,
target_handle,
actq,
}
}
pub fn path(&self) -> dbus::Path<'static> {
self.path.clone()
}
pub fn chan_type(&self) -> String {
"org.freedesktop.Telepathy.Channel.Type.Text".to_string() // FIXME: this shouldn't be hardcoded
}
pub fn handle_type(&self) -> HandleType {
HANDLE_TYPE_CONTACT // FIXME: this shouldn't be hardcoded
}
pub fn handle(&self) -> u32 {
self.target_handle
}
fn target_contact(&self) -> Option<dc::contact::Contact> {
let ctx = self.ctx.read().unwrap();
dc::contact::Contact::get_by_id(&ctx, self.handle()).ok()
}
fn initiator_contact(&self) -> Option<dc::contact::Contact> {
let ctx = self.ctx.read().unwrap();
dc::contact::Contact::get_by_id(&ctx, self.handle()).ok() // FIXME: this will be wrong for outbound channels
}
pub fn build_object_path(
channel: Arc<Channel>,
) -> dbus::tree::ObjectPath<dbus::tree::MTFn, ()> {

View File

@@ -1,3 +1,4 @@
use crate::padfoot::DbusAction;
use crate::telepathy;
use dbus::tree::MethodErr;
@@ -13,27 +14,37 @@ impl AsRef<dyn telepathy::Channel + 'static> for std::sync::Arc<Channel> {
impl telepathy::Channel for Channel {
fn close(&self) -> Result<()> {
println!("Channel::close()");
self.actq
.send(DbusAction::CloseChannel(self.path()))
.unwrap();
Err(MethodErr::no_arg())
}
// Deprecated
fn get_channel_type(&self) -> Result<String> {
self.channel_type()
}
fn get_handle(&self) -> Result<(u32, u32)> {
println!("Channel::get_handle()");
Err(MethodErr::no_arg())
}
fn get_interfaces(&self) -> Result<Vec<String>> {
println!("Channel::get_interfaces()");
Err(MethodErr::no_arg())
}
fn channel_type(&self) -> Result<String> {
println!("Channel::channel_type()");
Ok("org.freedesktop.Telepathy.Channel.Text".to_string())
Ok(self.chan_type())
}
// Deprecated
fn get_handle(&self) -> Result<(u32, u32)> {
println!("Channel::get_handle()");
Ok((self.handle_type(), self.handle()))
}
// Deprecated
fn get_interfaces(&self) -> Result<Vec<String>> {
println!("Channel::get_interfaces()");
self.interfaces()
}
fn interfaces(&self) -> Result<Vec<String>> {
@@ -43,31 +54,45 @@ impl telepathy::Channel for Channel {
fn target_handle(&self) -> Result<u32> {
println!("Channel::target_handle()");
Err(MethodErr::no_arg())
Ok(self.handle())
}
fn target_id(&self) -> Result<String> {
println!("Channel::target_id()");
Err(MethodErr::no_arg())
if let Some(contact) = self.target_contact() {
Ok(contact.get_addr().to_string())
} else {
Err(MethodErr::no_arg())
}
}
fn target_handle_type(&self) -> Result<u32> {
println!("Channel::target_handle_type()");
Err(MethodErr::no_arg())
Ok(self.handle_type())
}
fn requested(&self) -> Result<bool> {
println!("Channel::requested()");
Err(MethodErr::no_arg())
Ok(false) // FIXME: channels initiated by ourselves *will* be requested
}
fn initiator_handle(&self) -> Result<u32> {
println!("Channel::initiator_handle()");
Err(MethodErr::no_arg())
self.target_handle() // FIXME: Not the case for channels initiated by ourselves
}
fn initiator_id(&self) -> Result<String> {
println!("Channel::initiator_id()");
Err(MethodErr::no_arg())
if let Some(contact) = self.initiator_contact() {
Ok(contact.get_addr().to_string())
} else {
Err(MethodErr::no_arg())
}
}
}

View File

@@ -25,6 +25,7 @@ use crate::telepathy;
use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection};
use dbus::channel::{MatchingReceiver, Sender};
use dbus::message::SignalArgs;
use dbus::tree::MethodErr;
use dc::config::Config;
@@ -32,7 +33,7 @@ use dc::context::Context;
use dc::Event;
use deltachat as dc;
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::time::Duration;
@@ -42,26 +43,28 @@ pub const CONN_OBJECT_PATH: &str = "/org/freedesktop/Telepathy/Connection/padfoo
// Only the main loop has access to the DBUS connection. Interacting with DBUS
// outside of method return values requires one of these to be added to actq
#[derive(Debug)]
enum DbusAction {
pub enum DbusAction {
Signal(dbus::Message), // Generic signal to send
NewChannel(Channel), // Add this channel
CloseChannel(dbus::Path<'static>), // Close this channel
IncomingMessage(dc::chat::ChatId, dc::message::MsgId), // Look at this \o/
FreshMessages, // Hint that some messages need looking at
}
#[derive(Debug)]
// A connection uses delta database IDs as handles, and email addresses as IDs
pub struct Connection {
// Used for sending out messages
actq: Arc<Mutex<VecDeque<DbusAction>>>,
actq: mpsc::Sender<DbusAction>,
// actq: Arc<Mutex<VecDeque<DbusAction>>>,
// Channels we own
channels: Arc<RwLock<HashMap<dbus::Path<'static>, Arc<Channel>>>>,
// Owned by the CM. Remove ourselves from this when done
conns: Arc<Mutex<HashSet<String>>>,
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
ctx: Arc<RwLock<Context>>,
settings: ConnSettings,
@@ -112,16 +115,16 @@ impl ConnSettings {
CONN_BUS_NAME.to_owned() + "." + &self.id
}
pub fn path(&self) -> String {
CONN_OBJECT_PATH.to_owned() + "/" + &self.id
pub fn path(&self) -> dbus::Path<'static> {
dbus::Path::new(format!("{}/{}", CONN_OBJECT_PATH, &self.id)).expect("Valid path")
}
}
impl Connection {
pub fn new(
settings: ConnSettings,
conns: Arc<Mutex<HashSet<String>>>,
) -> Result<Self, MethodErr> {
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
) -> Result<(Self, mpsc::Receiver<DbusAction>), MethodErr> {
let mut dbfile = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot")
.ok_or_else(MethodErr::no_arg)
.and_then(|p| Ok(p.data_local_dir().to_path_buf()))?;
@@ -129,12 +132,13 @@ impl Connection {
dbfile.push(settings.id());
dbfile.push("db.sqlite3");
// FIXME: how to give it access to the connection (initialized later)?
let actq = Arc::new(Mutex::new(VecDeque::<DbusAction>::new()));
let (q_s, q_r) = mpsc::channel::<DbusAction>();
let id = settings.id();
// Use this if we need to send messages in response to DC events:
let queue = actq.clone();
// The closure is shared between several different threads in delta, and
// we can't Send *or* clone the mpsc sender across them, so just wrap it
// in a mutex for now
let queue = Mutex::new(q_s.clone());
let f = move |_c: &Context, e: Event| {
match e {
Event::Info(msg) => println!("Connection<{}>: INFO: {}", id, msg),
@@ -153,20 +157,22 @@ impl Connection {
"Connection<{}>: Messages changed for {}: {}",
id, chat_id, msg_id
);
let q = queue.clone();
q.lock()
queue
.lock()
.unwrap()
.push_back(DbusAction::IncomingMessage(chat_id, msg_id));
.send(DbusAction::IncomingMessage(chat_id, msg_id))
.unwrap();
}
Event::IncomingMsg { chat_id, msg_id } => {
println!(
"Connection<{}>: Incoming message for {}: {}",
id, chat_id, msg_id
);
let q = queue.clone();
q.lock()
queue
.lock()
.unwrap()
.push_back(DbusAction::IncomingMessage(chat_id, msg_id));
.send(DbusAction::IncomingMessage(chat_id, msg_id))
.unwrap();
}
/* Unhandled messages:
@@ -212,16 +218,19 @@ impl Connection {
ctx.configure();
};
Ok(Connection {
conns,
settings,
actq,
channels: Arc::new(RwLock::new(
HashMap::<dbus::Path<'static>, Arc<Channel>>::new(),
)),
ctx: Arc::new(RwLock::new(ctx)),
state: Arc::new(RwLock::new(ConnState::Initial)),
})
Ok((
Connection {
conns,
settings,
actq: q_s,
channels: Arc::new(RwLock::new(
HashMap::<dbus::Path<'static>, Arc<Channel>>::new(),
)),
ctx: Arc::new(RwLock::new(ctx)),
state: Arc::new(RwLock::new(ConnState::Initial)),
},
q_r,
))
}
// This should be run inside its own thread. It will signal via the channel
@@ -229,7 +238,11 @@ impl Connection {
//
// FIXME: running several +process+ loops sure is convenient, but it also
// seems inefficient...
pub fn run(self, signal: mpsc::Sender<Option<MethodErr>>) {
pub fn run(
self,
done_signal: mpsc::Sender<Option<MethodErr>>,
queue_receiver: mpsc::Receiver<DbusAction>,
) {
let id = self.id();
let bus = self.bus();
let path = self.path();
@@ -237,6 +250,7 @@ impl Connection {
let conns = self.conns.clone();
let chans = self.channels.clone();
let actq = self.actq.clone();
let ctx = self.ctx.clone();
let state = self.state.clone();
let tree = self.build_tree();
@@ -266,19 +280,19 @@ impl Connection {
match c.request_name(bus.clone(), false, false, true) {
Ok(RequestNameReply::Exists) => {
println!("Another process is already registered on {}", bus);
signal.send(Some(MethodErr::no_arg())).unwrap();
done_signal.send(Some(MethodErr::no_arg())).unwrap();
return;
}
Err(e) => {
println!("Failed to register {}: {}", bus, e);
signal.send(Some(MethodErr::no_arg())).unwrap();
done_signal.send(Some(MethodErr::no_arg())).unwrap();
return;
}
_ => {
// All other responses we can get are a success. We are now on
// the message bus, so the caller can proceed
println!("{} listening on {}", c.unique_name(), bus);
signal.send(None).unwrap();
done_signal.send(None).unwrap();
}
};
@@ -292,15 +306,10 @@ impl Connection {
};
// Spend a bit of time sending any outgoing messages - signals, mostly
while let Some(act) = {
let mut q = actq.lock().unwrap();
let r = q.pop_front();
drop(q); // Only hold the mutex for the pop operation
r
} {
while let Some(act) = queue_receiver.try_recv().ok() {
match act {
DbusAction::Signal(msg) => {
print!("Connection<{}>: Sending message...", id);
print!("*** Connection<{}>: Sending message...", id);
match c.send(msg) {
Err(e) => println!("error! {:?}", e), // FIXME: handle error better?
@@ -308,7 +317,10 @@ impl Connection {
}
}
DbusAction::NewChannel(channel) => {
let chan_path = channel.path.clone();
let chan_type = channel.chan_type();
let handle_type = channel.handle_type();
let handle = channel.handle();
let chan_path = channel.path().clone();
let rc_channel = Arc::new(channel);
println!("*** Creating channel {}", chan_path);
@@ -322,15 +334,34 @@ impl Connection {
let op = Channel::build_object_path(rc_channel);
t2.lock().unwrap().insert(op);
// TODO: emit signals
let requests_sig = telepathy::ConnectionInterfaceRequestsNewChannels {
channels: vec![(chan_path.clone(), HashMap::new())],
};
let legacy_sig = telepathy::ConnectionNewChannel {
object_path: chan_path.clone(),
channel_type: chan_type,
handle_type, // contact. FIXME: support other channel types
handle, // id of other contact
// TODO: initiator needs to be tracked
suppress_handler: false, // We'll need to start passing this
};
actq.send(DbusAction::Signal(requests_sig.to_emit_message(&path)))
.unwrap();
actq.send(DbusAction::Signal(legacy_sig.to_emit_message(&path)))
.unwrap();
}
DbusAction::CloseChannel(path) => {
println!("*** Closing channel {}", path.clone());
let _chan = Arc::clone(&chans).write().unwrap().remove(&path);
let t2 = tree.clone();
t2.lock().unwrap().remove(&path);
// TODO: emit signals
}
DbusAction::IncomingMessage(chat_id, msg_id) => {
println!("*** Incoming message: {} {}", chat_id, msg_id);
// TODO: check if we have a channel for the chat
let chan_path = dbus::strings::Path::new(format!(
"{}/{}",
@@ -349,12 +380,44 @@ impl Connection {
msg_id, chan_path
);
} else {
println!("Channel for {} doesn't exist yet, re-enqueuing", chat_id);
let chan = Channel { path: chan_path };
let mut q = actq.lock().unwrap();
q.push_back(DbusAction::NewChannel(chan));
q.push_back(act);
drop(q);
print!("Channel for {} doesn't exist yet, creating it...", chat_id);
let contacts =
dc::chat::get_chat_contacts(&ctx.clone().read().unwrap(), chat_id);
if contacts.is_empty() {
println!("empty chat! ignoring");
continue;
}
if contacts.len() > 1 {
println!("...{} contacts in chat, ignoring!", contacts.len())
}
let handle = contacts.first().unwrap();
let chan = Channel::new(ctx.clone(), chan_path, *handle, actq.clone());
actq.send(DbusAction::NewChannel(chan)).unwrap();
actq.send(act).unwrap();
println!("OK");
}
}
DbusAction::FreshMessages => {
println!("*** FRESH MESSAGES");
let ctx_rc = ctx.clone();
let ctx = ctx_rc.read().unwrap();
for msg_id in dc::context::Context::get_fresh_msgs(&ctx) {
println!(" FRESH MESSAGE: {}", msg_id);
match dc::message::Message::load_from_db(&ctx, msg_id) {
Ok(msg) => {
actq.send(DbusAction::IncomingMessage(
msg.get_chat_id(),
msg_id,
))
.unwrap();
}
Err(e) => println!("Couldn't load fresh message {}: {}", msg_id, e),
}
}
}
}
@@ -375,7 +438,7 @@ impl Connection {
self.settings.bus()
}
pub fn path(&self) -> String {
pub fn path(&self) -> dbus::Path<'static> {
self.settings.path()
}

View File

@@ -1,6 +1,5 @@
use crate::telepathy;
use crate::telepathy::ConnectionInterfaceRequests; // Non-deprecated channel methods
use crate::telepathy::{ConnectionInterfaceContacts, ConnectionInterfaceRequests}; // Non-deprecated channel methods
use dbus::message::SignalArgs;
use dbus::tree::MethodErr;
use deltachat as dc;
@@ -120,13 +119,10 @@ impl telepathy::Connection for Connection {
reason: 1, // Requested
};
let dbus_conn_path = dbus::strings::Path::new(self.path())
.expect("Object path should meet DBUS requirements");
self.actq
.lock()
.unwrap()
.push_back(DbusAction::Signal(sig.to_emit_message(&dbus_conn_path)));
.send(DbusAction::Signal(sig.to_emit_message(&self.path())))
.unwrap();
self.actq.send(DbusAction::FreshMessages).unwrap();
Ok(())
}
@@ -213,7 +209,22 @@ impl telepathy::Connection for Connection {
handle_type,
handles
);
Err(MethodErr::no_arg()) // FIXME: should be NotImplemented?
match handle_type {
crate::padfoot::HANDLE_TYPE_CONTACT => {
let mut out = Vec::<String>::new();
for (_handle, attrs) in self.get_contact_attributes(handles, vec![], true)? {
if let Some(contact_id) =
attrs.get("org.freedesktop.Telepathy.Connection/contact-id")
{
out.push(contact_id.0.as_str().unwrap().to_string());
} else {
return Err(MethodErr::no_arg()); // FIXME: should be InvalidHandle
}
}
Ok(out)
}
_ => Err(MethodErr::no_arg()), // FIXME: should be NotImplemented?
}
}
// Deprecated in favour of Requests.Channels

View File

@@ -45,7 +45,7 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
let mut out = Vec::<ChannelSpec>::new();
for channel in self.channels.read().unwrap().values() {
out.push((
channel.path.clone(),
channel.path(),
HashMap::<String, VarArg>::new(), // FIXME: work out what props should be shown
));
}

View File

@@ -14,7 +14,7 @@ pub const CM_OBJECT_PATH: &str = "/org/freedesktop/Telepathy/ConnectionManager/p
#[derive(Debug)]
pub struct ConnectionManager {
conns: Arc<Mutex<HashSet<String>>>,
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
sender: mpsc::Sender<dbus::Message>,
}
@@ -30,7 +30,7 @@ impl ConnectionManager {
(
ConnectionManager {
conns: Arc::new(Mutex::new(HashSet::<String>::new())),
conns: Arc::new(Mutex::new(HashSet::<dbus::Path<'static>>::new())),
sender: msg_s,
},
msg_r,
@@ -46,26 +46,25 @@ impl ConnectionManager {
let mut conns = self.conns.lock().expect("Mutex access");
let dbus_conn_path = dbus::strings::Path::new(path.to_owned())
.expect("Object path should meet DBUS requirements");
// We can't call connect() multiple times on the connection yet
if conns.contains(&path) {
return Err(MethodErr::no_arg());
}
let conn = Connection::new(settings, self.conns.clone())?;
// It would be nice to have a single main loop, but thread-per-conn is
// is easy enough for me to understand in Rust at the moment.
let conns_clone = self.conns.clone();
let (ok_s, ok_r) = mpsc::channel();
std::thread::spawn(move || conn.run(ok_s));
std::thread::spawn(move || {
let (conn, receiver) = Connection::new(settings, conns_clone).unwrap();
conn.run(ok_s, receiver);
});
// Emit a NewConnection signal for the benefit of others, but the caller
// learns from our RPC response
let sig = telepathy::ConnectionManagerNewConnection {
bus_name: bus.to_owned(),
object_path: dbus_conn_path.clone(),
object_path: path.clone(),
protocol: super::PROTO_NAME.to_string(),
};
@@ -81,7 +80,7 @@ impl ConnectionManager {
return Err(MethodErr::no_arg());
}
conns.insert(path);
conns.insert(path.clone());
let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string())
.expect("Object path should meet DBUS requirements");
@@ -91,7 +90,7 @@ impl ConnectionManager {
.expect("send signal");
// The bus name *must* be org.freedesktop.Telepathy.Connection.padfoot.delta.<name>
Ok((bus, dbus_conn_path))
Ok((bus, path))
}
}