Upgrade to Delta v1.33.0

Despite the incremental version number, this is a backward-incompatible
change, switching code to async. It does allow us to reduce the number
of threads significantly, though.
This commit is contained in:
2020-05-28 02:15:25 +01:00
parent 9e764d72a1
commit bd76603c54
11 changed files with 479 additions and 693 deletions

812
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,8 +9,9 @@ license = "MIT"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
async-std = "1.6"
dbus = "0.8" dbus = "0.8"
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="1.33.0" } deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="1.34.0" }
directories = "2.0" directories = "2.0"
rand = "0.7" rand = "0.7"

View File

@@ -11,10 +11,11 @@ pub use type_text::*;
use crate::padfoot::{var_bool, var_str, var_str_vec, var_u32, DbusAction, VarArg}; use crate::padfoot::{var_bool, var_str, var_str_vec, var_u32, DbusAction, VarArg};
use crate::telepathy; use crate::telepathy;
use async_std::task::block_on;
use deltachat as dc; use deltachat as dc;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{mpsc, Arc, RwLock}; use std::sync::{mpsc, Arc};
type Result<T> = std::result::Result<T, dbus::tree::MethodErr>; type Result<T> = std::result::Result<T, dbus::tree::MethodErr>;
@@ -35,7 +36,7 @@ pub const HANDLE_TYPE_GROUP: HandleType = 4; // Deprecated
pub struct Channel { pub struct Channel {
actq: mpsc::Sender<DbusAction>, actq: mpsc::Sender<DbusAction>,
chat_id: dc::chat::ChatId, chat_id: dc::chat::ChatId,
ctx: Arc<RwLock<dc::context::Context>>, ctx: Arc<dc::context::Context>,
initiator_handle: u32, initiator_handle: u32,
path: dbus::Path<'static>, path: dbus::Path<'static>,
requested: bool, requested: bool,
@@ -51,7 +52,7 @@ impl Channel {
pub fn new( pub fn new(
actq: mpsc::Sender<DbusAction>, actq: mpsc::Sender<DbusAction>,
chat_id: dc::chat::ChatId, chat_id: dc::chat::ChatId,
ctx: Arc<RwLock<dc::context::Context>>, ctx: Arc<dc::context::Context>,
initiator_handle: u32, initiator_handle: u32,
path: dbus::Path<'static>, path: dbus::Path<'static>,
requested: bool, requested: bool,
@@ -126,15 +127,15 @@ impl Channel {
} }
pub fn target_contact(&self) -> Option<dc::contact::Contact> { pub fn target_contact(&self) -> Option<dc::contact::Contact> {
let ctx = self.ctx.read().unwrap(); block_on(dc::contact::Contact::get_by_id(&self.ctx, self.handle())).ok()
dc::contact::Contact::get_by_id(&ctx, self.handle()).ok()
} }
pub fn initiator_contact(&self) -> Option<dc::contact::Contact> { pub fn initiator_contact(&self) -> Option<dc::contact::Contact> {
let ctx = self.ctx.read().unwrap(); block_on(dc::contact::Contact::get_by_id(
&self.ctx,
dc::contact::Contact::get_by_id(&ctx, self.initiator_handle).ok() // FIXME: this will be wrong for outbound channels self.initiator_handle,
))
.ok() // FIXME: this will be wrong for outbound channels
} }
pub fn requested(&self) -> bool { pub fn requested(&self) -> bool {
@@ -185,10 +186,11 @@ impl Channel {
_ => return, _ => return,
}; };
let ctx = self.ctx.read().unwrap(); if let Err(e) = block_on(dc::imex::continue_key_transfer(
if let Err(e) = &self.ctx,
dc::imex::continue_key_transfer(&ctx, dc::message::MsgId::new(msg_id), &setup_code) dc::message::MsgId::new(msg_id),
{ &setup_code,
)) {
println!("Failed to apply setup code {}: {}", msg_id, e); println!("Failed to apply setup code {}: {}", msg_id, e);
} }
} }

View File

@@ -1,6 +1,7 @@
use crate::padfoot::{convert_msg, DbusAction, VarArg}; use crate::padfoot::{convert_msg, DbusAction, VarArg};
use crate::telepathy; use crate::telepathy;
use async_std::task::block_on;
use dbus::message::SignalArgs; use dbus::message::SignalArgs;
use dbus::tree::MethodErr; use dbus::tree::MethodErr;
use dc::constants::Viewtype; use dc::constants::Viewtype;
@@ -42,10 +43,10 @@ impl telepathy::ChannelInterfaceMessages for Channel {
self.try_process_setupmsg(text); self.try_process_setupmsg(text);
}; };
let ctx = self.ctx.read().unwrap(); let ctx = &self.ctx;
let blobdir = ctx.get_blobdir(); let blobdir = ctx.get_blobdir();
let msg_id = match dc::chat::send_msg(&ctx, self.chat_id, &mut delta_msg) { let msg_id = match block_on(dc::chat::send_msg(&ctx, self.chat_id, &mut delta_msg)) {
Ok(msg_id) => msg_id, Ok(msg_id) => msg_id,
Err(e) => { Err(e) => {
println!(" Failed to send message: {}", e); println!(" Failed to send message: {}", e);
@@ -111,11 +112,11 @@ impl telepathy::ChannelInterfaceMessages for Channel {
println!("Channel::pending_messages()"); println!("Channel::pending_messages()");
let mut out = Vec::<Vec<HashMap<String, VarArg>>>::new(); let mut out = Vec::<Vec<HashMap<String, VarArg>>>::new();
let ctx = self.ctx.read().unwrap(); let ctx = &self.ctx;
let blobdir = ctx.get_blobdir(); let blobdir = ctx.get_blobdir();
for msg_id in dc::chat::get_chat_msgs(&ctx, self.chat_id, 0, None) { for msg_id in block_on(dc::chat::get_chat_msgs(ctx, self.chat_id, 0, None)) {
if let Ok(msg) = dc::message::Message::load_from_db(&ctx, msg_id) { if let Ok(msg) = block_on(dc::message::Message::load_from_db(ctx, msg_id)) {
match msg.get_state() { match msg.get_state() {
MessageState::InFresh | MessageState::InNoticed => { MessageState::InFresh | MessageState::InNoticed => {
println!(" A message: {:?}", msg); println!(" A message: {:?}", msg);

View File

@@ -2,6 +2,7 @@ use crate::padfoot::DbusAction;
use crate::telepathy; use crate::telepathy;
use crate::telepathy::ChannelInterfaceMessages; use crate::telepathy::ChannelInterfaceMessages;
use async_std::task::block_on;
use dbus::message::SignalArgs; use dbus::message::SignalArgs;
use dbus::tree::MethodErr; use dbus::tree::MethodErr;
use dc::message::MsgId; use dc::message::MsgId;
@@ -31,14 +32,13 @@ impl telepathy::ChannelTypeText for Channel {
fn acknowledge_pending_messages(&self, ids: Vec<u32>) -> Result<()> { fn acknowledge_pending_messages(&self, ids: Vec<u32>) -> Result<()> {
println!("Channel::acknowledge_pending_messages({:?})", ids); println!("Channel::acknowledge_pending_messages({:?})", ids);
let ctx = self.ctx.read().unwrap();
let mut msg_ids = Vec::<MsgId>::new(); let mut msg_ids = Vec::<MsgId>::new();
for msg_id in &ids { for msg_id in &ids {
msg_ids.push(MsgId::new(*msg_id)); msg_ids.push(MsgId::new(*msg_id));
} }
print!(" Marking messages as seen..."); print!(" Marking messages as seen...");
let result = dc::message::markseen_msgs(&ctx, &msg_ids); let result = block_on(dc::message::markseen_msgs(&self.ctx, msg_ids));
if result { if result {
println!("OK!"); println!("OK!");

View File

@@ -23,6 +23,7 @@ pub use self::simple_presence::*;
use crate::padfoot::{convert_msg, Channel, VarArg}; use crate::padfoot::{convert_msg, Channel, VarArg};
use crate::telepathy; use crate::telepathy;
use async_std::task::block_on;
use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection}; use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection};
use dbus::channel::{MatchingReceiver, Sender}; use dbus::channel::{MatchingReceiver, Sender};
use dbus::message::SignalArgs; use dbus::message::SignalArgs;
@@ -35,6 +36,7 @@ use deltachat as dc;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, Mutex, RwLock}; use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::thread;
use std::time::Duration; use std::time::Duration;
pub const CONN_BUS_NAME: &str = "org.freedesktop.Telepathy.Connection.padfoot.delta"; pub const CONN_BUS_NAME: &str = "org.freedesktop.Telepathy.Connection.padfoot.delta";
@@ -66,7 +68,7 @@ pub struct Connection {
// Owned by the CM. Remove ourselves from this when done // Owned by the CM. Remove ourselves from this when done
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>, conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
ctx: Arc<RwLock<Context>>, ctx: Arc<Context>, // Delta contexts are threadsafe
settings: ConnSettings, settings: ConnSettings,
state: Arc<RwLock<ConnState>>, state: Arc<RwLock<ConnState>>,
} }
@@ -141,9 +143,11 @@ impl Connection {
settings: ConnSettings, settings: ConnSettings,
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>, conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
) -> Result<(Self, mpsc::Receiver<DbusAction>), MethodErr> { ) -> Result<(Self, mpsc::Receiver<DbusAction>), MethodErr> {
let mut dbfile = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot") let proj_dir = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot")
.ok_or_else(MethodErr::no_arg) .ok_or_else(MethodErr::no_arg)?;
.and_then(|p| Ok(p.data_local_dir().to_path_buf()))?;
let mut dbfile = async_std::path::PathBuf::new();
dbfile.push(proj_dir.data_local_dir().to_str().unwrap());
dbfile.push(settings.id()); dbfile.push(settings.id());
dbfile.push("db.sqlite3"); dbfile.push("db.sqlite3");
@@ -151,104 +155,104 @@ impl Connection {
let (q_s, q_r) = mpsc::channel::<DbusAction>(); let (q_s, q_r) = mpsc::channel::<DbusAction>();
let id = settings.id(); let id = settings.id();
// The closure is shared between several different threads in delta, and let ctx = Arc::new(
// we can't Send *or* clone the mpsc sender across them, so just wrap it block_on(Context::new("telepathy-padfoot".to_string(), dbfile)).map_err(|e| {
// in a mutex for now
let queue = Mutex::new(q_s.clone());
let f = move |_c: &Context, e: Event| {
match e {
Event::Info(msg) => println!("Connection<{}>: INFO: {}", id, msg),
Event::Warning(msg) => println!("Connection<{}>: WARN : {}", id, msg),
Event::Error(msg) | Event::ErrorNetwork(msg) | Event::ErrorSelfNotInGroup(msg) => {
println!("Connection<{}>: ERR : {}", id, msg)
}
Event::ConfigureProgress(progress) => {
println!("Connection<{}>: Configuration progress: {}", id, progress)
}
Event::ImapConnected(msg) | Event::SmtpConnected(msg) => {
println!("Connection<{}>: Network: {}", id, msg);
}
Event::MsgsChanged { chat_id, msg_id } => {
println!(
"Connection<{}>: Messages changed for {}: {}",
id, chat_id, msg_id
);
queue
.lock()
.unwrap()
.send(DbusAction::IncomingMessage(chat_id, msg_id))
.unwrap();
}
Event::IncomingMsg { chat_id, msg_id } => {
println!(
"Connection<{}>: Incoming message for {}: {}",
id, chat_id, msg_id
);
queue
.lock()
.unwrap()
.send(DbusAction::IncomingMessage(chat_id, msg_id))
.unwrap();
}
/* Unhandled messages:
SmtpMessageSent(String),
ImapMessageDeleted(String),
ImapFolderEmptied(String),
NewBlobFile(String),
DeletedBlobFile(String),
MsgDelivered
MsgFailed
MsgRead
ChatModified(ChatId),
ContactsChanged(Option<u32>),
LocationChanged(Option<u32>),
ImexProgress(usize),
ImexFileWritten(PathBuf),
SecurejoinInviterProgress
SecurejoinJoinerProgress
*/
Event::ImapMessageMoved(_) | Event::ImapMessageDeleted(_) => {}
_ => println!("Connection<{}>: unhandled event received: {:?}", id, e),
};
};
let ctx =
Context::new(Box::new(f), "telepathy-padfoot".to_string(), dbfile).map_err(|e| {
println!( println!(
"Connection<{}>::new(): couldn't get delta context: {}", "Connection<{}>::new(): couldn't get delta context: {}",
settings.id(), settings.id(),
e e
); );
MethodErr::no_arg() // FIXME: better error handling MethodErr::no_arg() // FIXME: better error handling
})?; })?,
);
ctx.set_config(Config::Addr, Some(&settings.account)) let e_ctx = ctx.clone();
let e_queue = q_s.clone();
thread::spawn(move || {
let emitter = e_ctx.get_event_emitter();
while let Some(e) = emitter.recv_sync() {
match e {
Event::Info(msg) => println!("Connection<{}>: INFO: {}", id, msg),
Event::Warning(msg) => println!("Connection<{}>: WARN : {}", id, msg),
Event::Error(msg)
| Event::ErrorNetwork(msg)
| Event::ErrorSelfNotInGroup(msg) => {
println!("Connection<{}>: ERR : {}", id, msg)
}
Event::ConfigureProgress(progress) => {
println!("Connection<{}>: Configuration progress: {}", id, progress)
}
Event::ImapConnected(msg) | Event::SmtpConnected(msg) => {
println!("Connection<{}>: Network: {}", id, msg);
}
Event::MsgsChanged { chat_id, msg_id } => {
println!(
"Connection<{}>: Messages changed for {}: {}",
id, chat_id, msg_id
);
e_queue
.send(DbusAction::IncomingMessage(chat_id, msg_id))
.unwrap();
}
Event::IncomingMsg { chat_id, msg_id } => {
println!(
"Connection<{}>: Incoming message for {}: {}",
id, chat_id, msg_id
);
e_queue
.send(DbusAction::IncomingMessage(chat_id, msg_id))
.unwrap();
}
/* Unhandled messages:
SmtpMessageSent(String),
ImapMessageDeleted(String),
ImapFolderEmptied(String),
NewBlobFile(String),
DeletedBlobFile(String),
MsgDelivered
MsgFailed
MsgRead
ChatModified(ChatId),
ContactsChanged(Option<u32>),
LocationChanged(Option<u32>),
ImexProgress(usize),
ImexFileWritten(PathBuf),
SecurejoinInviterProgress
SecurejoinJoinerProgress
*/
Event::ImapMessageMoved(_) | Event::ImapMessageDeleted(_) => {}
_ => println!("Connection<{}>: unhandled event received: {:?}", id, e),
};
}
});
block_on(ctx.set_config(Config::Addr, Some(&settings.account)))
.map_err(|_e| MethodErr::no_arg())?; .map_err(|_e| MethodErr::no_arg())?;
ctx.set_config(Config::MailPw, Some(&settings.password)) block_on(ctx.set_config(Config::MailPw, Some(&settings.password)))
.map_err(|_e| MethodErr::no_arg())?; .map_err(|_e| MethodErr::no_arg())?;
if settings.bcc_self { if settings.bcc_self {
ctx.set_config(Config::BccSelf, Some(&"1")) block_on(ctx.set_config(Config::BccSelf, Some(&"1")))
.map_err(|_e| MethodErr::no_arg())?; .map_err(|_e| MethodErr::no_arg())?;
} else { } else {
ctx.set_config(Config::BccSelf, Some(&"0")) block_on(ctx.set_config(Config::BccSelf, Some(&"0")))
.map_err(|_e| MethodErr::no_arg())?; .map_err(|_e| MethodErr::no_arg())?;
} }
if !ctx.is_configured() { if !block_on(ctx.is_configured()) {
ctx.configure(); block_on(ctx.configure()).unwrap();
}; };
Ok(( Ok((
Connection { Connection {
conns,
settings,
actq: q_s, actq: q_s,
channels: Arc::new(RwLock::new( channels: Arc::new(RwLock::new(
HashMap::<dbus::Path<'static>, Arc<Channel>>::new(), HashMap::<dbus::Path<'static>, Arc<Channel>>::new(),
)), )),
ctx: Arc::new(RwLock::new(ctx)), conns,
ctx,
settings,
state: Arc::new(RwLock::new(ConnState::Initial)), state: Arc::new(RwLock::new(ConnState::Initial)),
}, },
q_r, q_r,
@@ -399,8 +403,8 @@ impl Connection {
let chan_path = Connection::build_channel_path(path.clone(), chat_id); let chan_path = Connection::build_channel_path(path.clone(), chat_id);
let c2 = Arc::clone(&chans); let c2 = Arc::clone(&chans);
let chans = c2.read().unwrap(); let chans = c2.read().unwrap();
let u_ctx = ctx.clone(); //let u_ctx = ctx.clone();
let ctx = u_ctx.read().unwrap(); let ctx = ctx.clone();
let blobdir = ctx.get_blobdir(); let blobdir = ctx.get_blobdir();
// Autocreate channel if it doesn't already exist // Autocreate channel if it doesn't already exist
@@ -408,7 +412,7 @@ impl Connection {
if !chans.contains_key(&chan_path) { if !chans.contains_key(&chan_path) {
print!("Channel for {} doesn't exist yet, creating it...", chat_id); print!("Channel for {} doesn't exist yet, creating it...", chat_id);
let contacts = dc::chat::get_chat_contacts(&ctx, chat_id); let contacts = block_on(dc::chat::get_chat_contacts(&ctx, chat_id));
if contacts.len() > 1 { if contacts.len() > 1 {
println!("...{} contacts in chat, ignoring!", contacts.len()); println!("...{} contacts in chat, ignoring!", contacts.len());
continue; continue;
@@ -421,7 +425,7 @@ impl Connection {
let chan = Channel::new( let chan = Channel::new(
actq.clone(), actq.clone(),
chat_id, chat_id,
u_ctx.clone(), ctx.clone(),
*handle, // initiator is the remote contact *handle, // initiator is the remote contact
chan_path, chan_path,
false, // FIXME: this needs to handle requested channels false, // FIXME: this needs to handle requested channels
@@ -437,7 +441,7 @@ impl Connection {
// Since the channel exists, emit new message signals // Since the channel exists, emit new message signals
print!("Message {} received for {}...", msg_id, chan_path); print!("Message {} received for {}...", msg_id, chan_path);
let msg = match dc::message::Message::load_from_db(&ctx, msg_id) { let msg = match block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
Ok(m) => m, Ok(m) => m,
Err(e) => { Err(e) => {
println!("Can't load from database, skipping: {}", e); println!("Can't load from database, skipping: {}", e);
@@ -474,12 +478,11 @@ impl Connection {
DbusAction::FreshMessages => { DbusAction::FreshMessages => {
println!("*** FRESH MESSAGES"); println!("*** FRESH MESSAGES");
let ctx_rc = ctx.clone(); let ctx = ctx.clone();
let ctx = ctx_rc.read().unwrap();
for msg_id in dc::context::Context::get_fresh_msgs(&ctx) { for msg_id in block_on(dc::context::Context::get_fresh_msgs(&ctx)) {
println!(" FRESH MESSAGE: {}", msg_id); println!(" FRESH MESSAGE: {}", msg_id);
match dc::message::Message::load_from_db(&ctx, msg_id) { match block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
Ok(msg) => { Ok(msg) => {
actq.send(DbusAction::IncomingMessage( actq.send(DbusAction::IncomingMessage(
msg.get_chat_id(), msg.get_chat_id(),

View File

@@ -1,6 +1,7 @@
use crate::telepathy; use crate::telepathy;
use crate::telepathy::{ConnectionInterfaceContacts, ConnectionInterfaceRequests}; // Non-deprecated channel methods use crate::telepathy::{ConnectionInterfaceContacts, ConnectionInterfaceRequests}; // Non-deprecated channel methods
use async_std::task::block_on;
use dbus::message::SignalArgs; use dbus::message::SignalArgs;
use dbus::tree::MethodErr; use dbus::tree::MethodErr;
use dc::contact::Contact; use dc::contact::Contact;
@@ -57,64 +58,12 @@ impl telepathy::Connection for Connection {
fn connect(&self) -> Result<()> { fn connect(&self) -> Result<()> {
println!("Connection<{}>::connect()", self.id()); println!("Connection<{}>::connect()", self.id());
let inbox_ctx = self.ctx.clone(); let io_ctx = self.ctx.clone();
let state = self.state.clone(); let io_id = self.id();
let id = self.id(); let _io_thread = thread::spawn(move || {
let _inbox_thread = thread::spawn(move || { block_on(io_ctx.start_io());
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected { println!("Connection<{}>::connect(): I/O thread exited", io_id);
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
}
println!("Connection<{}>::connect(): inbox thread exited", id);
});
let smtp_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _smtp_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
println!("Connection<{}>::connect(): smtp thread exited", id);
});
let mvbox_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _mvbox_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
println!("Connection<{}>::connect(): mvbox thread exited", id);
});
let sentbox_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _sentbox_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
println!("Connection<{}>::connect(): sentbox thread exited", id);
}); });
// Just pretend to be connected all the time for now. Tracking IMAP+SMTP // Just pretend to be connected all the time for now. Tracking IMAP+SMTP
@@ -122,7 +71,7 @@ impl telepathy::Connection for Connection {
let state = self.state.clone(); let state = self.state.clone();
let mut w = state.write().unwrap(); let mut w = state.write().unwrap();
*w = ConnState::Connected; *w = ConnState::Connected;
let ctx = self.ctx.read().unwrap(); let ctx = self.ctx.clone();
// Emit a StatusChanged signal for the benefit of others, but the caller // Emit a StatusChanged signal for the benefit of others, but the caller
// learns from our RPC response // learns from our RPC response
@@ -136,13 +85,13 @@ impl telepathy::Connection for Connection {
self.actq.send(DbusAction::FreshMessages).unwrap(); self.actq.send(DbusAction::FreshMessages).unwrap();
// If we can, emit signals on connect about the contact list // If we can, emit signals on connect about the contact list
if let Ok(handles) = Contact::get_all( if let Ok(handles) = block_on(Contact::get_all(
&ctx, &ctx,
(dc::constants::DC_GCL_ADD_SELF as usize) (dc::constants::DC_GCL_ADD_SELF as usize)
.try_into() .try_into()
.unwrap(), .unwrap(),
None::<String>, None::<String>,
) { )) {
println!(" HANDLES: {:?}", handles); println!(" HANDLES: {:?}", handles);
let mut changes = HashMap::<u32, ContactSubscription>::new(); let mut changes = HashMap::<u32, ContactSubscription>::new();
for handle in handles { for handle in handles {
@@ -170,17 +119,12 @@ impl telepathy::Connection for Connection {
fn disconnect(&self) -> Result<()> { fn disconnect(&self) -> Result<()> {
println!("Connection<{}>::disconnect()", self.id()); println!("Connection<{}>::disconnect()", self.id());
let ctx = self.ctx.read().unwrap(); block_on(self.ctx.stop_io());
let state = self.state.clone(); let state = self.state.clone();
let mut w = state.write().unwrap(); let mut w = state.write().unwrap();
*w = ConnState::Disconnected; *w = ConnState::Disconnected;
dc::job::interrupt_inbox_idle(&ctx);
dc::job::interrupt_smtp_idle(&ctx);
dc::job::interrupt_sentbox_idle(&ctx);
dc::job::interrupt_mvbox_idle(&ctx);
// FIXME: we need to signal to the CM that they should remove the // FIXME: we need to signal to the CM that they should remove the
// connection from the active list // connection from the active list
@@ -330,7 +274,7 @@ impl telepathy::Connection for Connection {
match handle_type { match handle_type {
crate::padfoot::HANDLE_TYPE_CONTACT => { crate::padfoot::HANDLE_TYPE_CONTACT => {
let ctx = self.ctx.read().unwrap(); let ctx = &self.ctx;
let mut out = Vec::<u32>::new(); let mut out = Vec::<u32>::new();
// Identifiers is a list of email addresses. These can be // Identifiers is a list of email addresses. These can be
@@ -340,12 +284,16 @@ impl telepathy::Connection for Connection {
// FIXME: will it be faster to get all and filter? // FIXME: will it be faster to get all and filter?
for addr in identifiers { for addr in identifiers {
let id = Contact::lookup_id_by_addr(&ctx, addr, dc::contact::Origin::Unknown); let id = block_on(Contact::lookup_id_by_addr(
ctx,
addr,
dc::contact::Origin::Unknown,
));
match id { match id {
0 => { 0 => {
// No contact exists for this address yet. Try to // No contact exists for this address yet. Try to
// add one so we can have an ID. // add one so we can have an ID.
match Contact::create(&ctx, &addr, &addr) { match block_on(Contact::create(ctx, &addr, &addr)) {
Ok(new_id) => out.push(new_id), Ok(new_id) => out.push(new_id),
Err(e) => { Err(e) => {
println!("Failed to add contact {}: {}", addr, e); println!("Failed to add contact {}: {}", addr, e);
@@ -384,10 +332,10 @@ impl telepathy::Connection for Connection {
fn self_id(&self) -> Result<String> { fn self_id(&self) -> Result<String> {
println!("Connection<{}>::self_id()", self.id()); println!("Connection<{}>::self_id()", self.id());
let contact = match dc::contact::Contact::get_by_id( let contact = match block_on(dc::contact::Contact::get_by_id(
&self.ctx.read().unwrap(), &self.ctx,
dc::constants::DC_CONTACT_ID_SELF, dc::constants::DC_CONTACT_ID_SELF,
) { )) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
println!(" err: {}", e); println!(" err: {}", e);

View File

@@ -1,6 +1,7 @@
use crate::padfoot::VarArg; use crate::padfoot::VarArg;
use crate::telepathy; use crate::telepathy;
use async_std::task::block_on;
use dbus::tree::MethodErr; use dbus::tree::MethodErr;
use dc::constants::DC_GCL_ADD_SELF; use dc::constants::DC_GCL_ADD_SELF;
use dc::contact::Contact; use dc::contact::Contact;
@@ -31,8 +32,11 @@ impl telepathy::ConnectionInterfaceContactList for Connection {
hold hold
); );
let ctx = &self.ctx.read().unwrap(); let ids = match block_on(Contact::get_all(
let ids = match Contact::get_all(ctx, DC_GCL_ADD_SELF.try_into().unwrap(), None::<String>) { &self.ctx,
DC_GCL_ADD_SELF.try_into().unwrap(),
None::<String>,
)) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
println!(" err: {}", e); println!(" err: {}", e);
@@ -65,11 +69,9 @@ impl telepathy::ConnectionInterfaceContactList for Connection {
fn remove_contacts(&self, contacts: Vec<u32>) -> Result<(), MethodErr> { fn remove_contacts(&self, contacts: Vec<u32>) -> Result<(), MethodErr> {
println!("Connection<{}>::remove_contacts({:?})", self.id(), contacts); println!("Connection<{}>::remove_contacts({:?})", self.id(), contacts);
let ctx = self.ctx.read().unwrap();
for contact_id in contacts { for contact_id in contacts {
// FIXME: don't ignore errors // FIXME: don't ignore errors
if let Err(e) = Contact::delete(&ctx, contact_id) { if let Err(e) = block_on(Contact::delete(&self.ctx, contact_id)) {
println!(" Deleting contact {} failed: {}", contact_id, e); println!(" Deleting contact {} failed: {}", contact_id, e);
} }
} }

View File

@@ -1,6 +1,7 @@
use crate::padfoot::{var_str, var_u32, VarArg}; use crate::padfoot::{var_str, var_u32, VarArg};
use crate::telepathy; use crate::telepathy;
use async_std::task::block_on;
use dbus::tree::MethodErr; use dbus::tree::MethodErr;
use deltachat::contact::{Contact, Origin}; use deltachat::contact::{Contact, Origin};
use std::collections::HashMap; use std::collections::HashMap;
@@ -31,7 +32,7 @@ impl telepathy::ConnectionInterfaceContacts for Connection {
let mut out = HashMap::<u32, HashMap<String, VarArg>>::new(); let mut out = HashMap::<u32, HashMap<String, VarArg>>::new();
for id in handles.iter() { for id in handles.iter() {
// FIXME: work out how to use get_all // FIXME: work out how to use get_all
let contact = match Contact::get_by_id(&self.ctx.read().unwrap(), *id) { let contact = match block_on(Contact::get_by_id(&self.ctx, *id)) {
Ok(c) => c, Ok(c) => c,
Err(_e) => continue, // Invalid IDs are silently ignored Err(_e) => continue, // Invalid IDs are silently ignored
}; };
@@ -77,10 +78,11 @@ impl telepathy::ConnectionInterfaceContacts for Connection {
interfaces interfaces
); );
let id = { let id = block_on(Contact::lookup_id_by_addr(
let ctx = &self.ctx.read().unwrap(); &self.ctx,
Contact::lookup_id_by_addr(ctx, identifier, Origin::Unknown) identifier,
}; Origin::Unknown,
));
if id == 0 { if id == 0 {
return Err(MethodErr::no_arg()); // FIXME: should be InvalidHandle return Err(MethodErr::no_arg()); // FIXME: should be InvalidHandle

View File

@@ -1,6 +1,7 @@
use crate::padfoot::{get_var_str, get_var_u32, requestables, Channel, DbusAction, VarArg}; use crate::padfoot::{get_var_str, get_var_u32, requestables, Channel, DbusAction, VarArg};
use crate::telepathy; use crate::telepathy;
use async_std::task::block_on;
use dbus::tree::MethodErr; use dbus::tree::MethodErr;
use dc::contact::Contact; use dc::contact::Contact;
use deltachat as dc; use deltachat as dc;
@@ -77,10 +78,11 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
return Err(MethodErr::no_arg()); return Err(MethodErr::no_arg());
}; };
let ctx = self.ctx.read().unwrap(); let target_handle = block_on(Contact::lookup_id_by_addr(
&self.ctx,
let target_handle = target_id.clone(),
Contact::lookup_id_by_addr(&ctx, target_id.clone(), dc::contact::Origin::Unknown); dc::contact::Origin::Unknown,
));
if target_handle == 0 { if target_handle == 0 {
println!("Couldn't find target handle for {}", target_id); println!("Couldn't find target handle for {}", target_id);
return Err(MethodErr::no_arg()); return Err(MethodErr::no_arg());
@@ -96,7 +98,7 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
} }
// Now we need to discover or create a chat id for the contact // Now we need to discover or create a chat id for the contact
let chat_id = dc::chat::create_by_contact_id(&ctx, target_handle).unwrap(); let chat_id = block_on(dc::chat::create_by_contact_id(&self.ctx, target_handle)).unwrap();
let channel = Channel::new( let channel = Channel::new(
self.actq.clone(), self.actq.clone(),
chat_id, chat_id,

View File

@@ -1,5 +1,6 @@
use crate::padfoot::{var_bytearray, var_i64, var_str, var_u32, VarArg}; use crate::padfoot::{var_bytearray, var_i64, var_str, var_u32, VarArg};
use async_std::path::{Path, PathBuf};
use dc::constants::Viewtype as Vt; use dc::constants::Viewtype as Vt;
use dc::message::Message; use dc::message::Message;
use deltachat as dc; use deltachat as dc;
@@ -10,7 +11,7 @@ type Part = HashMap<String, VarArg>;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
// Turns a deltachat::message::Message into a Telepathy Message_Part_List // Turns a deltachat::message::Message into a Telepathy Message_Part_List
pub fn convert_msg(blobdir: &std::path::Path, msg: &Message) -> Result<Vec<Part>> { pub fn convert_msg(blobdir: &Path, msg: &Message) -> Result<Vec<Part>> {
if msg.is_setupmessage() { if msg.is_setupmessage() {
return Ok(convert_setupmessage(msg)); return Ok(convert_setupmessage(msg));
} }
@@ -110,15 +111,13 @@ fn build_vid(_msg: &Message) -> Result<Vec<Part>> {
// The message contains a file. Detect the content-type and construct a part // The message contains a file. Detect the content-type and construct a part
// containing the data in full. // containing the data in full.
fn build_attachment(blobdir: &std::path::Path, msg: &Message) -> Result<Vec<Part>> { fn build_attachment(blobdir: &Path, msg: &Message) -> Result<Vec<Part>> {
let mime = msg let mime = msg
.get_filemime() .get_filemime()
.unwrap_or_else(|| "application/octet-stream".to_string()); .unwrap_or_else(|| "application/octet-stream".to_string());
let filename = msg.get_filename().ok_or("Failed to get filename")?; let filename = msg.get_filename().ok_or("Failed to get filename")?;
let path: std::path::PathBuf = [blobdir, &std::path::Path::new(&filename)].iter().collect(); let path: PathBuf = [blobdir, &Path::new(&filename)].iter().collect();
// let mut path = std::path::PathBuf::from(blobdir);
// path.push(filename);
let data = let data =
std::fs::read(&path).map_err(|e| format!("Failed to read file {:?}: {}", path, e))?; std::fs::read(&path).map_err(|e| format!("Failed to read file {:?}: {}", path, e))?;