WIP: async async async

This commit is contained in:
2020-05-28 02:15:25 +01:00
parent 9e764d72a1
commit 612a3a3805
11 changed files with 373 additions and 599 deletions

812
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,8 +9,9 @@ license = "MIT"
[dependencies]
anyhow = "1.0"
async-std = "1.6"
dbus = "0.8"
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="1.33.0" }
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="1.34.0" }
directories = "2.0"
rand = "0.7"

View File

@@ -11,6 +11,7 @@ pub use type_text::*;
use crate::padfoot::{var_bool, var_str, var_str_vec, var_u32, DbusAction, VarArg};
use crate::telepathy;
use async_std::task::block_on;
use deltachat as dc;
use std::collections::HashMap;
@@ -128,13 +129,13 @@ impl Channel {
pub fn target_contact(&self) -> Option<dc::contact::Contact> {
let ctx = self.ctx.read().unwrap();
dc::contact::Contact::get_by_id(&ctx, self.handle()).ok()
block_on(dc::contact::Contact::get_by_id(&ctx, self.handle())).ok()
}
pub fn initiator_contact(&self) -> Option<dc::contact::Contact> {
let ctx = self.ctx.read().unwrap();
dc::contact::Contact::get_by_id(&ctx, self.initiator_handle).ok() // FIXME: this will be wrong for outbound channels
block_on(dc::contact::Contact::get_by_id(&ctx, self.initiator_handle)).ok() // FIXME: this will be wrong for outbound channels
}
pub fn requested(&self) -> bool {
@@ -187,7 +188,7 @@ impl Channel {
let ctx = self.ctx.read().unwrap();
if let Err(e) =
dc::imex::continue_key_transfer(&ctx, dc::message::MsgId::new(msg_id), &setup_code)
block_on(dc::imex::continue_key_transfer(&ctx, dc::message::MsgId::new(msg_id), &setup_code))
{
println!("Failed to apply setup code {}: {}", msg_id, e);
}

View File

@@ -1,6 +1,7 @@
use crate::padfoot::{convert_msg, DbusAction, VarArg};
use crate::telepathy;
use async_std::task::block_on;
use dbus::message::SignalArgs;
use dbus::tree::MethodErr;
use dc::constants::Viewtype;
@@ -45,7 +46,7 @@ impl telepathy::ChannelInterfaceMessages for Channel {
let ctx = self.ctx.read().unwrap();
let blobdir = ctx.get_blobdir();
let msg_id = match dc::chat::send_msg(&ctx, self.chat_id, &mut delta_msg) {
let msg_id = match block_on(dc::chat::send_msg(&ctx, self.chat_id, &mut delta_msg)) {
Ok(msg_id) => msg_id,
Err(e) => {
println!(" Failed to send message: {}", e);
@@ -114,8 +115,8 @@ impl telepathy::ChannelInterfaceMessages for Channel {
let ctx = self.ctx.read().unwrap();
let blobdir = ctx.get_blobdir();
for msg_id in dc::chat::get_chat_msgs(&ctx, self.chat_id, 0, None) {
if let Ok(msg) = dc::message::Message::load_from_db(&ctx, msg_id) {
for msg_id in block_on(dc::chat::get_chat_msgs(&ctx, self.chat_id, 0, None)) {
if let Ok(msg) = block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
match msg.get_state() {
MessageState::InFresh | MessageState::InNoticed => {
println!(" A message: {:?}", msg);

View File

@@ -2,6 +2,7 @@ use crate::padfoot::DbusAction;
use crate::telepathy;
use crate::telepathy::ChannelInterfaceMessages;
use async_std::task::block_on;
use dbus::message::SignalArgs;
use dbus::tree::MethodErr;
use dc::message::MsgId;
@@ -38,7 +39,7 @@ impl telepathy::ChannelTypeText for Channel {
}
print!(" Marking messages as seen...");
let result = dc::message::markseen_msgs(&ctx, &msg_ids);
let result = block_on(dc::message::markseen_msgs(&ctx, msg_ids));
if result {
println!("OK!");

View File

@@ -23,6 +23,7 @@ pub use self::simple_presence::*;
use crate::padfoot::{convert_msg, Channel, VarArg};
use crate::telepathy;
use async_std::task::block_on;
use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection};
use dbus::channel::{MatchingReceiver, Sender};
use dbus::message::SignalArgs;
@@ -141,9 +142,11 @@ impl Connection {
settings: ConnSettings,
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
) -> Result<(Self, mpsc::Receiver<DbusAction>), MethodErr> {
let mut dbfile = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot")
.ok_or_else(MethodErr::no_arg)
.and_then(|p| Ok(p.data_local_dir().to_path_buf()))?;
let proj_dir = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot")
.ok_or_else(MethodErr::no_arg)?;
let mut dbfile = async_std::path::PathBuf::new();
dbfile.push(proj_dir.data_local_dir().to_str().unwrap());
dbfile.push(settings.id());
dbfile.push("db.sqlite3");
@@ -214,7 +217,7 @@ impl Connection {
};
let ctx =
Context::new(Box::new(f), "telepathy-padfoot".to_string(), dbfile).map_err(|e| {
block_on(Context::new("telepathy-padfoot".to_string(), dbfile)).map_err(|e| {
println!(
"Connection<{}>::new(): couldn't get delta context: {}",
settings.id(),
@@ -223,21 +226,21 @@ impl Connection {
MethodErr::no_arg() // FIXME: better error handling
})?;
ctx.set_config(Config::Addr, Some(&settings.account))
block_on(ctx.set_config(Config::Addr, Some(&settings.account)))
.map_err(|_e| MethodErr::no_arg())?;
ctx.set_config(Config::MailPw, Some(&settings.password))
block_on(ctx.set_config(Config::MailPw, Some(&settings.password)))
.map_err(|_e| MethodErr::no_arg())?;
if settings.bcc_self {
ctx.set_config(Config::BccSelf, Some(&"1"))
block_on(ctx.set_config(Config::BccSelf, Some(&"1")))
.map_err(|_e| MethodErr::no_arg())?;
} else {
ctx.set_config(Config::BccSelf, Some(&"0"))
block_on(ctx.set_config(Config::BccSelf, Some(&"0")))
.map_err(|_e| MethodErr::no_arg())?;
}
if !ctx.is_configured() {
ctx.configure();
if !block_on(ctx.is_configured()) {
block_on(ctx.configure()).unwrap();
};
Ok((
@@ -275,7 +278,7 @@ impl Connection {
let ctx = self.ctx.clone();
let state = self.state.clone();
let tree = self.build_tree();
// Setup DBus connection
let mut c = match LocalConnection::new_session() {
Ok(c) => c,
@@ -321,6 +324,8 @@ impl Connection {
// Set up delta jobs last in case registering to DBUS fails
// "Borrowed" from https://github.com/deltachat/deltachat-core-rust/blob/master/examples/simple.rs
while *state.read().unwrap() != ConnState::Disconnected {
// FIXME: IO junk here?
if let Err(e) = c.process(Duration::from_millis(100)) {
println!("Error processing: {}", e);
@@ -408,7 +413,7 @@ impl Connection {
if !chans.contains_key(&chan_path) {
print!("Channel for {} doesn't exist yet, creating it...", chat_id);
let contacts = dc::chat::get_chat_contacts(&ctx, chat_id);
let contacts = block_on(dc::chat::get_chat_contacts(&ctx, chat_id));
if contacts.len() > 1 {
println!("...{} contacts in chat, ignoring!", contacts.len());
continue;
@@ -437,7 +442,7 @@ impl Connection {
// Since the channel exists, emit new message signals
print!("Message {} received for {}...", msg_id, chan_path);
let msg = match dc::message::Message::load_from_db(&ctx, msg_id) {
let msg = match block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
Ok(m) => m,
Err(e) => {
println!("Can't load from database, skipping: {}", e);
@@ -477,9 +482,9 @@ impl Connection {
let ctx_rc = ctx.clone();
let ctx = ctx_rc.read().unwrap();
for msg_id in dc::context::Context::get_fresh_msgs(&ctx) {
for msg_id in block_on(dc::context::Context::get_fresh_msgs(&ctx)) {
println!(" FRESH MESSAGE: {}", msg_id);
match dc::message::Message::load_from_db(&ctx, msg_id) {
match block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
Ok(msg) => {
actq.send(DbusAction::IncomingMessage(
msg.get_chat_id(),

View File

@@ -1,6 +1,7 @@
use crate::telepathy;
use crate::telepathy::{ConnectionInterfaceContacts, ConnectionInterfaceRequests}; // Non-deprecated channel methods
use async_std::task::block_on;
use dbus::message::SignalArgs;
use dbus::tree::MethodErr;
use dc::contact::Contact;
@@ -8,7 +9,6 @@ use deltachat as dc;
use std::collections::HashMap;
use std::convert::TryInto;
use std::thread;
use super::{Connection, DbusAction};
@@ -57,66 +57,6 @@ impl telepathy::Connection for Connection {
fn connect(&self) -> Result<()> {
println!("Connection<{}>::connect()", self.id());
let inbox_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _inbox_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
}
println!("Connection<{}>::connect(): inbox thread exited", id);
});
let smtp_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _smtp_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
println!("Connection<{}>::connect(): smtp thread exited", id);
});
let mvbox_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _mvbox_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
println!("Connection<{}>::connect(): mvbox thread exited", id);
});
let sentbox_ctx = self.ctx.clone();
let state = self.state.clone();
let id = self.id();
let _sentbox_thread = thread::spawn(move || {
while *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if *state.read().unwrap() != ConnState::Disconnected {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
println!("Connection<{}>::connect(): sentbox thread exited", id);
});
// Just pretend to be connected all the time for now. Tracking IMAP+SMTP
// state is a pain
let state = self.state.clone();
@@ -136,13 +76,13 @@ impl telepathy::Connection for Connection {
self.actq.send(DbusAction::FreshMessages).unwrap();
// If we can, emit signals on connect about the contact list
if let Ok(handles) = Contact::get_all(
if let Ok(handles) = block_on(Contact::get_all(
&ctx,
(dc::constants::DC_GCL_ADD_SELF as usize)
.try_into()
.unwrap(),
None::<String>,
) {
)) {
println!(" HANDLES: {:?}", handles);
let mut changes = HashMap::<u32, ContactSubscription>::new();
for handle in handles {
@@ -172,15 +112,12 @@ impl telepathy::Connection for Connection {
println!("Connection<{}>::disconnect()", self.id());
let ctx = self.ctx.read().unwrap();
block_on(ctx.stop_io());
let state = self.state.clone();
let mut w = state.write().unwrap();
*w = ConnState::Disconnected;
dc::job::interrupt_inbox_idle(&ctx);
dc::job::interrupt_smtp_idle(&ctx);
dc::job::interrupt_sentbox_idle(&ctx);
dc::job::interrupt_mvbox_idle(&ctx);
// FIXME: we need to signal to the CM that they should remove the
// connection from the active list
@@ -340,12 +277,12 @@ impl telepathy::Connection for Connection {
// FIXME: will it be faster to get all and filter?
for addr in identifiers {
let id = Contact::lookup_id_by_addr(&ctx, addr, dc::contact::Origin::Unknown);
let id = block_on(Contact::lookup_id_by_addr(&ctx, addr, dc::contact::Origin::Unknown));
match id {
0 => {
// No contact exists for this address yet. Try to
// add one so we can have an ID.
match Contact::create(&ctx, &addr, &addr) {
match block_on(Contact::create(&ctx, &addr, &addr)) {
Ok(new_id) => out.push(new_id),
Err(e) => {
println!("Failed to add contact {}: {}", addr, e);
@@ -384,10 +321,10 @@ impl telepathy::Connection for Connection {
fn self_id(&self) -> Result<String> {
println!("Connection<{}>::self_id()", self.id());
let contact = match dc::contact::Contact::get_by_id(
let contact = match block_on(dc::contact::Contact::get_by_id(
&self.ctx.read().unwrap(),
dc::constants::DC_CONTACT_ID_SELF,
) {
)) {
Ok(c) => c,
Err(e) => {
println!(" err: {}", e);

View File

@@ -1,6 +1,7 @@
use crate::padfoot::VarArg;
use crate::telepathy;
use async_std::task::block_on;
use dbus::tree::MethodErr;
use dc::constants::DC_GCL_ADD_SELF;
use dc::contact::Contact;
@@ -32,7 +33,7 @@ impl telepathy::ConnectionInterfaceContactList for Connection {
);
let ctx = &self.ctx.read().unwrap();
let ids = match Contact::get_all(ctx, DC_GCL_ADD_SELF.try_into().unwrap(), None::<String>) {
let ids = match block_on(Contact::get_all(ctx, DC_GCL_ADD_SELF.try_into().unwrap(), None::<String>)) {
Ok(c) => c,
Err(e) => {
println!(" err: {}", e);
@@ -69,7 +70,7 @@ impl telepathy::ConnectionInterfaceContactList for Connection {
for contact_id in contacts {
// FIXME: don't ignore errors
if let Err(e) = Contact::delete(&ctx, contact_id) {
if let Err(e) = block_on(Contact::delete(&ctx, contact_id)) {
println!(" Deleting contact {} failed: {}", contact_id, e);
}
}

View File

@@ -1,6 +1,7 @@
use crate::padfoot::{var_str, var_u32, VarArg};
use crate::telepathy;
use async_std::task::block_on;
use dbus::tree::MethodErr;
use deltachat::contact::{Contact, Origin};
use std::collections::HashMap;
@@ -31,7 +32,7 @@ impl telepathy::ConnectionInterfaceContacts for Connection {
let mut out = HashMap::<u32, HashMap<String, VarArg>>::new();
for id in handles.iter() {
// FIXME: work out how to use get_all
let contact = match Contact::get_by_id(&self.ctx.read().unwrap(), *id) {
let contact = match block_on(Contact::get_by_id(&self.ctx.read().unwrap(), *id)) {
Ok(c) => c,
Err(_e) => continue, // Invalid IDs are silently ignored
};
@@ -79,7 +80,7 @@ impl telepathy::ConnectionInterfaceContacts for Connection {
let id = {
let ctx = &self.ctx.read().unwrap();
Contact::lookup_id_by_addr(ctx, identifier, Origin::Unknown)
block_on(Contact::lookup_id_by_addr(ctx, identifier, Origin::Unknown))
};
if id == 0 {

View File

@@ -1,6 +1,7 @@
use crate::padfoot::{get_var_str, get_var_u32, requestables, Channel, DbusAction, VarArg};
use crate::telepathy;
use async_std::task::block_on;
use dbus::tree::MethodErr;
use dc::contact::Contact;
use deltachat as dc;
@@ -80,7 +81,7 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
let ctx = self.ctx.read().unwrap();
let target_handle =
Contact::lookup_id_by_addr(&ctx, target_id.clone(), dc::contact::Origin::Unknown);
block_on(Contact::lookup_id_by_addr(&ctx, target_id.clone(), dc::contact::Origin::Unknown));
if target_handle == 0 {
println!("Couldn't find target handle for {}", target_id);
return Err(MethodErr::no_arg());
@@ -96,7 +97,7 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
}
// Now we need to discover or create a chat id for the contact
let chat_id = dc::chat::create_by_contact_id(&ctx, target_handle).unwrap();
let chat_id = block_on(dc::chat::create_by_contact_id(&ctx, target_handle)).unwrap();
let channel = Channel::new(
self.actq.clone(),
chat_id,

View File

@@ -1,5 +1,6 @@
use crate::padfoot::{var_bytearray, var_i64, var_str, var_u32, VarArg};
use async_std::path::{Path, PathBuf};
use dc::constants::Viewtype as Vt;
use dc::message::Message;
use deltachat as dc;
@@ -10,7 +11,7 @@ type Part = HashMap<String, VarArg>;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
// Turns a deltachat::message::Message into a Telepathy Message_Part_List
pub fn convert_msg(blobdir: &std::path::Path, msg: &Message) -> Result<Vec<Part>> {
pub fn convert_msg(blobdir: &Path, msg: &Message) -> Result<Vec<Part>> {
if msg.is_setupmessage() {
return Ok(convert_setupmessage(msg));
}
@@ -110,15 +111,13 @@ fn build_vid(_msg: &Message) -> Result<Vec<Part>> {
// The message contains a file. Detect the content-type and construct a part
// containing the data in full.
fn build_attachment(blobdir: &std::path::Path, msg: &Message) -> Result<Vec<Part>> {
fn build_attachment(blobdir: &Path, msg: &Message) -> Result<Vec<Part>> {
let mime = msg
.get_filemime()
.unwrap_or_else(|| "application/octet-stream".to_string());
let filename = msg.get_filename().ok_or("Failed to get filename")?;
let path: std::path::PathBuf = [blobdir, &std::path::Path::new(&filename)].iter().collect();
// let mut path = std::path::PathBuf::from(blobdir);
// path.push(filename);
let path: PathBuf = [blobdir, &Path::new(&filename)].iter().collect();
let data =
std::fs::read(&path).map_err(|e| format!("Failed to read file {:?}: {}", path, e))?;