Merge branch 'async'
This commit is contained in:
812
Cargo.lock
generated
812
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -9,8 +9,9 @@ license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
async-std = "1.6"
|
||||
dbus = "0.8"
|
||||
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="1.33.0" }
|
||||
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="1.34.0" }
|
||||
directories = "2.0"
|
||||
rand = "0.7"
|
||||
|
||||
|
@@ -11,10 +11,11 @@ pub use type_text::*;
|
||||
use crate::padfoot::{var_bool, var_str, var_str_vec, var_u32, DbusAction, VarArg};
|
||||
use crate::telepathy;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use deltachat as dc;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{mpsc, Arc, RwLock};
|
||||
use std::sync::{mpsc, Arc};
|
||||
|
||||
type Result<T> = std::result::Result<T, dbus::tree::MethodErr>;
|
||||
|
||||
@@ -35,7 +36,7 @@ pub const HANDLE_TYPE_GROUP: HandleType = 4; // Deprecated
|
||||
pub struct Channel {
|
||||
actq: mpsc::Sender<DbusAction>,
|
||||
chat_id: dc::chat::ChatId,
|
||||
ctx: Arc<RwLock<dc::context::Context>>,
|
||||
ctx: Arc<dc::context::Context>,
|
||||
initiator_handle: u32,
|
||||
path: dbus::Path<'static>,
|
||||
requested: bool,
|
||||
@@ -51,7 +52,7 @@ impl Channel {
|
||||
pub fn new(
|
||||
actq: mpsc::Sender<DbusAction>,
|
||||
chat_id: dc::chat::ChatId,
|
||||
ctx: Arc<RwLock<dc::context::Context>>,
|
||||
ctx: Arc<dc::context::Context>,
|
||||
initiator_handle: u32,
|
||||
path: dbus::Path<'static>,
|
||||
requested: bool,
|
||||
@@ -126,15 +127,15 @@ impl Channel {
|
||||
}
|
||||
|
||||
pub fn target_contact(&self) -> Option<dc::contact::Contact> {
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
|
||||
dc::contact::Contact::get_by_id(&ctx, self.handle()).ok()
|
||||
block_on(dc::contact::Contact::get_by_id(&self.ctx, self.handle())).ok()
|
||||
}
|
||||
|
||||
pub fn initiator_contact(&self) -> Option<dc::contact::Contact> {
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
|
||||
dc::contact::Contact::get_by_id(&ctx, self.initiator_handle).ok() // FIXME: this will be wrong for outbound channels
|
||||
block_on(dc::contact::Contact::get_by_id(
|
||||
&self.ctx,
|
||||
self.initiator_handle,
|
||||
))
|
||||
.ok() // FIXME: this will be wrong for outbound channels
|
||||
}
|
||||
|
||||
pub fn requested(&self) -> bool {
|
||||
@@ -185,10 +186,11 @@ impl Channel {
|
||||
_ => return,
|
||||
};
|
||||
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
if let Err(e) =
|
||||
dc::imex::continue_key_transfer(&ctx, dc::message::MsgId::new(msg_id), &setup_code)
|
||||
{
|
||||
if let Err(e) = block_on(dc::imex::continue_key_transfer(
|
||||
&self.ctx,
|
||||
dc::message::MsgId::new(msg_id),
|
||||
&setup_code,
|
||||
)) {
|
||||
println!("Failed to apply setup code {}: {}", msg_id, e);
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
use crate::padfoot::{convert_msg, DbusAction, VarArg};
|
||||
use crate::telepathy;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::message::SignalArgs;
|
||||
use dbus::tree::MethodErr;
|
||||
use dc::constants::Viewtype;
|
||||
@@ -42,10 +43,10 @@ impl telepathy::ChannelInterfaceMessages for Channel {
|
||||
self.try_process_setupmsg(text);
|
||||
};
|
||||
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
let ctx = &self.ctx;
|
||||
let blobdir = ctx.get_blobdir();
|
||||
|
||||
let msg_id = match dc::chat::send_msg(&ctx, self.chat_id, &mut delta_msg) {
|
||||
let msg_id = match block_on(dc::chat::send_msg(&ctx, self.chat_id, &mut delta_msg)) {
|
||||
Ok(msg_id) => msg_id,
|
||||
Err(e) => {
|
||||
println!(" Failed to send message: {}", e);
|
||||
@@ -111,11 +112,11 @@ impl telepathy::ChannelInterfaceMessages for Channel {
|
||||
println!("Channel::pending_messages()");
|
||||
|
||||
let mut out = Vec::<Vec<HashMap<String, VarArg>>>::new();
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
let ctx = &self.ctx;
|
||||
let blobdir = ctx.get_blobdir();
|
||||
|
||||
for msg_id in dc::chat::get_chat_msgs(&ctx, self.chat_id, 0, None) {
|
||||
if let Ok(msg) = dc::message::Message::load_from_db(&ctx, msg_id) {
|
||||
for msg_id in block_on(dc::chat::get_chat_msgs(ctx, self.chat_id, 0, None)) {
|
||||
if let Ok(msg) = block_on(dc::message::Message::load_from_db(ctx, msg_id)) {
|
||||
match msg.get_state() {
|
||||
MessageState::InFresh | MessageState::InNoticed => {
|
||||
println!(" A message: {:?}", msg);
|
||||
|
@@ -2,6 +2,7 @@ use crate::padfoot::DbusAction;
|
||||
use crate::telepathy;
|
||||
use crate::telepathy::ChannelInterfaceMessages;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::message::SignalArgs;
|
||||
use dbus::tree::MethodErr;
|
||||
use dc::message::MsgId;
|
||||
@@ -31,14 +32,13 @@ impl telepathy::ChannelTypeText for Channel {
|
||||
fn acknowledge_pending_messages(&self, ids: Vec<u32>) -> Result<()> {
|
||||
println!("Channel::acknowledge_pending_messages({:?})", ids);
|
||||
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
let mut msg_ids = Vec::<MsgId>::new();
|
||||
for msg_id in &ids {
|
||||
msg_ids.push(MsgId::new(*msg_id));
|
||||
}
|
||||
|
||||
print!(" Marking messages as seen...");
|
||||
let result = dc::message::markseen_msgs(&ctx, &msg_ids);
|
||||
let result = block_on(dc::message::markseen_msgs(&self.ctx, msg_ids));
|
||||
if result {
|
||||
println!("OK!");
|
||||
|
||||
|
@@ -23,6 +23,7 @@ pub use self::simple_presence::*;
|
||||
use crate::padfoot::{convert_msg, Channel, VarArg};
|
||||
use crate::telepathy;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::blocking::{stdintf::org_freedesktop_dbus::RequestNameReply, LocalConnection};
|
||||
use dbus::channel::{MatchingReceiver, Sender};
|
||||
use dbus::message::SignalArgs;
|
||||
@@ -35,6 +36,7 @@ use deltachat as dc;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{mpsc, Arc, Mutex, RwLock};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
pub const CONN_BUS_NAME: &str = "org.freedesktop.Telepathy.Connection.padfoot.delta";
|
||||
@@ -66,7 +68,7 @@ pub struct Connection {
|
||||
// Owned by the CM. Remove ourselves from this when done
|
||||
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
|
||||
|
||||
ctx: Arc<RwLock<Context>>,
|
||||
ctx: Arc<Context>, // Delta contexts are threadsafe
|
||||
settings: ConnSettings,
|
||||
state: Arc<RwLock<ConnState>>,
|
||||
}
|
||||
@@ -141,9 +143,11 @@ impl Connection {
|
||||
settings: ConnSettings,
|
||||
conns: Arc<Mutex<HashSet<dbus::Path<'static>>>>,
|
||||
) -> Result<(Self, mpsc::Receiver<DbusAction>), MethodErr> {
|
||||
let mut dbfile = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot")
|
||||
.ok_or_else(MethodErr::no_arg)
|
||||
.and_then(|p| Ok(p.data_local_dir().to_path_buf()))?;
|
||||
let proj_dir = directories::ProjectDirs::from("gs", "ur", "telepathy-padfoot")
|
||||
.ok_or_else(MethodErr::no_arg)?;
|
||||
|
||||
let mut dbfile = async_std::path::PathBuf::new();
|
||||
dbfile.push(proj_dir.data_local_dir().to_str().unwrap());
|
||||
|
||||
dbfile.push(settings.id());
|
||||
dbfile.push("db.sqlite3");
|
||||
@@ -151,104 +155,104 @@ impl Connection {
|
||||
let (q_s, q_r) = mpsc::channel::<DbusAction>();
|
||||
let id = settings.id();
|
||||
|
||||
// The closure is shared between several different threads in delta, and
|
||||
// we can't Send *or* clone the mpsc sender across them, so just wrap it
|
||||
// in a mutex for now
|
||||
let queue = Mutex::new(q_s.clone());
|
||||
let f = move |_c: &Context, e: Event| {
|
||||
match e {
|
||||
Event::Info(msg) => println!("Connection<{}>: INFO: {}", id, msg),
|
||||
Event::Warning(msg) => println!("Connection<{}>: WARN : {}", id, msg),
|
||||
Event::Error(msg) | Event::ErrorNetwork(msg) | Event::ErrorSelfNotInGroup(msg) => {
|
||||
println!("Connection<{}>: ERR : {}", id, msg)
|
||||
}
|
||||
Event::ConfigureProgress(progress) => {
|
||||
println!("Connection<{}>: Configuration progress: {}", id, progress)
|
||||
}
|
||||
Event::ImapConnected(msg) | Event::SmtpConnected(msg) => {
|
||||
println!("Connection<{}>: Network: {}", id, msg);
|
||||
}
|
||||
Event::MsgsChanged { chat_id, msg_id } => {
|
||||
println!(
|
||||
"Connection<{}>: Messages changed for {}: {}",
|
||||
id, chat_id, msg_id
|
||||
);
|
||||
queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send(DbusAction::IncomingMessage(chat_id, msg_id))
|
||||
.unwrap();
|
||||
}
|
||||
Event::IncomingMsg { chat_id, msg_id } => {
|
||||
println!(
|
||||
"Connection<{}>: Incoming message for {}: {}",
|
||||
id, chat_id, msg_id
|
||||
);
|
||||
queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send(DbusAction::IncomingMessage(chat_id, msg_id))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/* Unhandled messages:
|
||||
SmtpMessageSent(String),
|
||||
ImapMessageDeleted(String),
|
||||
ImapFolderEmptied(String),
|
||||
NewBlobFile(String),
|
||||
DeletedBlobFile(String),
|
||||
MsgDelivered
|
||||
MsgFailed
|
||||
MsgRead
|
||||
ChatModified(ChatId),
|
||||
ContactsChanged(Option<u32>),
|
||||
LocationChanged(Option<u32>),
|
||||
ImexProgress(usize),
|
||||
ImexFileWritten(PathBuf),
|
||||
SecurejoinInviterProgress
|
||||
SecurejoinJoinerProgress
|
||||
*/
|
||||
Event::ImapMessageMoved(_) | Event::ImapMessageDeleted(_) => {}
|
||||
_ => println!("Connection<{}>: unhandled event received: {:?}", id, e),
|
||||
};
|
||||
};
|
||||
|
||||
let ctx =
|
||||
Context::new(Box::new(f), "telepathy-padfoot".to_string(), dbfile).map_err(|e| {
|
||||
let ctx = Arc::new(
|
||||
block_on(Context::new("telepathy-padfoot".to_string(), dbfile)).map_err(|e| {
|
||||
println!(
|
||||
"Connection<{}>::new(): couldn't get delta context: {}",
|
||||
settings.id(),
|
||||
e
|
||||
);
|
||||
MethodErr::no_arg() // FIXME: better error handling
|
||||
})?;
|
||||
})?,
|
||||
);
|
||||
|
||||
ctx.set_config(Config::Addr, Some(&settings.account))
|
||||
let e_ctx = ctx.clone();
|
||||
let e_queue = q_s.clone();
|
||||
thread::spawn(move || {
|
||||
let emitter = e_ctx.get_event_emitter();
|
||||
while let Some(e) = emitter.recv_sync() {
|
||||
match e {
|
||||
Event::Info(msg) => println!("Connection<{}>: INFO: {}", id, msg),
|
||||
Event::Warning(msg) => println!("Connection<{}>: WARN : {}", id, msg),
|
||||
Event::Error(msg)
|
||||
| Event::ErrorNetwork(msg)
|
||||
| Event::ErrorSelfNotInGroup(msg) => {
|
||||
println!("Connection<{}>: ERR : {}", id, msg)
|
||||
}
|
||||
Event::ConfigureProgress(progress) => {
|
||||
println!("Connection<{}>: Configuration progress: {}", id, progress)
|
||||
}
|
||||
Event::ImapConnected(msg) | Event::SmtpConnected(msg) => {
|
||||
println!("Connection<{}>: Network: {}", id, msg);
|
||||
}
|
||||
Event::MsgsChanged { chat_id, msg_id } => {
|
||||
println!(
|
||||
"Connection<{}>: Messages changed for {}: {}",
|
||||
id, chat_id, msg_id
|
||||
);
|
||||
e_queue
|
||||
.send(DbusAction::IncomingMessage(chat_id, msg_id))
|
||||
.unwrap();
|
||||
}
|
||||
Event::IncomingMsg { chat_id, msg_id } => {
|
||||
println!(
|
||||
"Connection<{}>: Incoming message for {}: {}",
|
||||
id, chat_id, msg_id
|
||||
);
|
||||
e_queue
|
||||
.send(DbusAction::IncomingMessage(chat_id, msg_id))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/* Unhandled messages:
|
||||
SmtpMessageSent(String),
|
||||
ImapMessageDeleted(String),
|
||||
ImapFolderEmptied(String),
|
||||
NewBlobFile(String),
|
||||
DeletedBlobFile(String),
|
||||
MsgDelivered
|
||||
MsgFailed
|
||||
MsgRead
|
||||
ChatModified(ChatId),
|
||||
ContactsChanged(Option<u32>),
|
||||
LocationChanged(Option<u32>),
|
||||
ImexProgress(usize),
|
||||
ImexFileWritten(PathBuf),
|
||||
SecurejoinInviterProgress
|
||||
SecurejoinJoinerProgress
|
||||
*/
|
||||
Event::ImapMessageMoved(_) | Event::ImapMessageDeleted(_) => {}
|
||||
_ => println!("Connection<{}>: unhandled event received: {:?}", id, e),
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
block_on(ctx.set_config(Config::Addr, Some(&settings.account)))
|
||||
.map_err(|_e| MethodErr::no_arg())?;
|
||||
ctx.set_config(Config::MailPw, Some(&settings.password))
|
||||
block_on(ctx.set_config(Config::MailPw, Some(&settings.password)))
|
||||
.map_err(|_e| MethodErr::no_arg())?;
|
||||
|
||||
if settings.bcc_self {
|
||||
ctx.set_config(Config::BccSelf, Some(&"1"))
|
||||
block_on(ctx.set_config(Config::BccSelf, Some(&"1")))
|
||||
.map_err(|_e| MethodErr::no_arg())?;
|
||||
} else {
|
||||
ctx.set_config(Config::BccSelf, Some(&"0"))
|
||||
block_on(ctx.set_config(Config::BccSelf, Some(&"0")))
|
||||
.map_err(|_e| MethodErr::no_arg())?;
|
||||
}
|
||||
|
||||
if !ctx.is_configured() {
|
||||
ctx.configure();
|
||||
if !block_on(ctx.is_configured()) {
|
||||
block_on(ctx.configure()).unwrap();
|
||||
};
|
||||
|
||||
Ok((
|
||||
Connection {
|
||||
conns,
|
||||
settings,
|
||||
actq: q_s,
|
||||
channels: Arc::new(RwLock::new(
|
||||
HashMap::<dbus::Path<'static>, Arc<Channel>>::new(),
|
||||
)),
|
||||
ctx: Arc::new(RwLock::new(ctx)),
|
||||
conns,
|
||||
ctx,
|
||||
settings,
|
||||
state: Arc::new(RwLock::new(ConnState::Initial)),
|
||||
},
|
||||
q_r,
|
||||
@@ -399,8 +403,8 @@ impl Connection {
|
||||
let chan_path = Connection::build_channel_path(path.clone(), chat_id);
|
||||
let c2 = Arc::clone(&chans);
|
||||
let chans = c2.read().unwrap();
|
||||
let u_ctx = ctx.clone();
|
||||
let ctx = u_ctx.read().unwrap();
|
||||
//let u_ctx = ctx.clone();
|
||||
let ctx = ctx.clone();
|
||||
let blobdir = ctx.get_blobdir();
|
||||
|
||||
// Autocreate channel if it doesn't already exist
|
||||
@@ -408,7 +412,7 @@ impl Connection {
|
||||
if !chans.contains_key(&chan_path) {
|
||||
print!("Channel for {} doesn't exist yet, creating it...", chat_id);
|
||||
|
||||
let contacts = dc::chat::get_chat_contacts(&ctx, chat_id);
|
||||
let contacts = block_on(dc::chat::get_chat_contacts(&ctx, chat_id));
|
||||
if contacts.len() > 1 {
|
||||
println!("...{} contacts in chat, ignoring!", contacts.len());
|
||||
continue;
|
||||
@@ -421,7 +425,7 @@ impl Connection {
|
||||
let chan = Channel::new(
|
||||
actq.clone(),
|
||||
chat_id,
|
||||
u_ctx.clone(),
|
||||
ctx.clone(),
|
||||
*handle, // initiator is the remote contact
|
||||
chan_path,
|
||||
false, // FIXME: this needs to handle requested channels
|
||||
@@ -437,7 +441,7 @@ impl Connection {
|
||||
// Since the channel exists, emit new message signals
|
||||
print!("Message {} received for {}...", msg_id, chan_path);
|
||||
|
||||
let msg = match dc::message::Message::load_from_db(&ctx, msg_id) {
|
||||
let msg = match block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
println!("Can't load from database, skipping: {}", e);
|
||||
@@ -474,12 +478,11 @@ impl Connection {
|
||||
|
||||
DbusAction::FreshMessages => {
|
||||
println!("*** FRESH MESSAGES");
|
||||
let ctx_rc = ctx.clone();
|
||||
let ctx = ctx_rc.read().unwrap();
|
||||
let ctx = ctx.clone();
|
||||
|
||||
for msg_id in dc::context::Context::get_fresh_msgs(&ctx) {
|
||||
for msg_id in block_on(dc::context::Context::get_fresh_msgs(&ctx)) {
|
||||
println!(" FRESH MESSAGE: {}", msg_id);
|
||||
match dc::message::Message::load_from_db(&ctx, msg_id) {
|
||||
match block_on(dc::message::Message::load_from_db(&ctx, msg_id)) {
|
||||
Ok(msg) => {
|
||||
actq.send(DbusAction::IncomingMessage(
|
||||
msg.get_chat_id(),
|
||||
|
@@ -1,6 +1,7 @@
|
||||
use crate::telepathy;
|
||||
use crate::telepathy::{ConnectionInterfaceContacts, ConnectionInterfaceRequests}; // Non-deprecated channel methods
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::message::SignalArgs;
|
||||
use dbus::tree::MethodErr;
|
||||
use dc::contact::Contact;
|
||||
@@ -57,64 +58,12 @@ impl telepathy::Connection for Connection {
|
||||
fn connect(&self) -> Result<()> {
|
||||
println!("Connection<{}>::connect()", self.id());
|
||||
|
||||
let inbox_ctx = self.ctx.clone();
|
||||
let state = self.state.clone();
|
||||
let id = self.id();
|
||||
let _inbox_thread = thread::spawn(move || {
|
||||
while *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
|
||||
if *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
|
||||
let io_ctx = self.ctx.clone();
|
||||
let io_id = self.id();
|
||||
let _io_thread = thread::spawn(move || {
|
||||
block_on(io_ctx.start_io());
|
||||
|
||||
if *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("Connection<{}>::connect(): inbox thread exited", id);
|
||||
});
|
||||
|
||||
let smtp_ctx = self.ctx.clone();
|
||||
let state = self.state.clone();
|
||||
let id = self.id();
|
||||
let _smtp_thread = thread::spawn(move || {
|
||||
while *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
|
||||
if *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
println!("Connection<{}>::connect(): smtp thread exited", id);
|
||||
});
|
||||
|
||||
let mvbox_ctx = self.ctx.clone();
|
||||
let state = self.state.clone();
|
||||
let id = self.id();
|
||||
let _mvbox_thread = thread::spawn(move || {
|
||||
while *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
|
||||
if *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
println!("Connection<{}>::connect(): mvbox thread exited", id);
|
||||
});
|
||||
|
||||
let sentbox_ctx = self.ctx.clone();
|
||||
let state = self.state.clone();
|
||||
let id = self.id();
|
||||
let _sentbox_thread = thread::spawn(move || {
|
||||
while *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
|
||||
if *state.read().unwrap() != ConnState::Disconnected {
|
||||
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
println!("Connection<{}>::connect(): sentbox thread exited", id);
|
||||
println!("Connection<{}>::connect(): I/O thread exited", io_id);
|
||||
});
|
||||
|
||||
// Just pretend to be connected all the time for now. Tracking IMAP+SMTP
|
||||
@@ -122,7 +71,7 @@ impl telepathy::Connection for Connection {
|
||||
let state = self.state.clone();
|
||||
let mut w = state.write().unwrap();
|
||||
*w = ConnState::Connected;
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
let ctx = self.ctx.clone();
|
||||
|
||||
// Emit a StatusChanged signal for the benefit of others, but the caller
|
||||
// learns from our RPC response
|
||||
@@ -136,13 +85,13 @@ impl telepathy::Connection for Connection {
|
||||
self.actq.send(DbusAction::FreshMessages).unwrap();
|
||||
|
||||
// If we can, emit signals on connect about the contact list
|
||||
if let Ok(handles) = Contact::get_all(
|
||||
if let Ok(handles) = block_on(Contact::get_all(
|
||||
&ctx,
|
||||
(dc::constants::DC_GCL_ADD_SELF as usize)
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
None::<String>,
|
||||
) {
|
||||
)) {
|
||||
println!(" HANDLES: {:?}", handles);
|
||||
let mut changes = HashMap::<u32, ContactSubscription>::new();
|
||||
for handle in handles {
|
||||
@@ -170,17 +119,12 @@ impl telepathy::Connection for Connection {
|
||||
|
||||
fn disconnect(&self) -> Result<()> {
|
||||
println!("Connection<{}>::disconnect()", self.id());
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
block_on(self.ctx.stop_io());
|
||||
|
||||
let state = self.state.clone();
|
||||
let mut w = state.write().unwrap();
|
||||
*w = ConnState::Disconnected;
|
||||
|
||||
dc::job::interrupt_inbox_idle(&ctx);
|
||||
dc::job::interrupt_smtp_idle(&ctx);
|
||||
dc::job::interrupt_sentbox_idle(&ctx);
|
||||
dc::job::interrupt_mvbox_idle(&ctx);
|
||||
|
||||
// FIXME: we need to signal to the CM that they should remove the
|
||||
// connection from the active list
|
||||
|
||||
@@ -330,7 +274,7 @@ impl telepathy::Connection for Connection {
|
||||
|
||||
match handle_type {
|
||||
crate::padfoot::HANDLE_TYPE_CONTACT => {
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
let ctx = &self.ctx;
|
||||
let mut out = Vec::<u32>::new();
|
||||
|
||||
// Identifiers is a list of email addresses. These can be
|
||||
@@ -340,12 +284,16 @@ impl telepathy::Connection for Connection {
|
||||
// FIXME: will it be faster to get all and filter?
|
||||
|
||||
for addr in identifiers {
|
||||
let id = Contact::lookup_id_by_addr(&ctx, addr, dc::contact::Origin::Unknown);
|
||||
let id = block_on(Contact::lookup_id_by_addr(
|
||||
ctx,
|
||||
addr,
|
||||
dc::contact::Origin::Unknown,
|
||||
));
|
||||
match id {
|
||||
0 => {
|
||||
// No contact exists for this address yet. Try to
|
||||
// add one so we can have an ID.
|
||||
match Contact::create(&ctx, &addr, &addr) {
|
||||
match block_on(Contact::create(ctx, &addr, &addr)) {
|
||||
Ok(new_id) => out.push(new_id),
|
||||
Err(e) => {
|
||||
println!("Failed to add contact {}: {}", addr, e);
|
||||
@@ -384,10 +332,10 @@ impl telepathy::Connection for Connection {
|
||||
fn self_id(&self) -> Result<String> {
|
||||
println!("Connection<{}>::self_id()", self.id());
|
||||
|
||||
let contact = match dc::contact::Contact::get_by_id(
|
||||
&self.ctx.read().unwrap(),
|
||||
let contact = match block_on(dc::contact::Contact::get_by_id(
|
||||
&self.ctx,
|
||||
dc::constants::DC_CONTACT_ID_SELF,
|
||||
) {
|
||||
)) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
println!(" err: {}", e);
|
||||
|
@@ -1,6 +1,7 @@
|
||||
use crate::padfoot::VarArg;
|
||||
use crate::telepathy;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::tree::MethodErr;
|
||||
use dc::constants::DC_GCL_ADD_SELF;
|
||||
use dc::contact::Contact;
|
||||
@@ -31,8 +32,11 @@ impl telepathy::ConnectionInterfaceContactList for Connection {
|
||||
hold
|
||||
);
|
||||
|
||||
let ctx = &self.ctx.read().unwrap();
|
||||
let ids = match Contact::get_all(ctx, DC_GCL_ADD_SELF.try_into().unwrap(), None::<String>) {
|
||||
let ids = match block_on(Contact::get_all(
|
||||
&self.ctx,
|
||||
DC_GCL_ADD_SELF.try_into().unwrap(),
|
||||
None::<String>,
|
||||
)) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
println!(" err: {}", e);
|
||||
@@ -65,11 +69,9 @@ impl telepathy::ConnectionInterfaceContactList for Connection {
|
||||
fn remove_contacts(&self, contacts: Vec<u32>) -> Result<(), MethodErr> {
|
||||
println!("Connection<{}>::remove_contacts({:?})", self.id(), contacts);
|
||||
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
|
||||
for contact_id in contacts {
|
||||
// FIXME: don't ignore errors
|
||||
if let Err(e) = Contact::delete(&ctx, contact_id) {
|
||||
if let Err(e) = block_on(Contact::delete(&self.ctx, contact_id)) {
|
||||
println!(" Deleting contact {} failed: {}", contact_id, e);
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
use crate::padfoot::{var_str, var_u32, VarArg};
|
||||
use crate::telepathy;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::tree::MethodErr;
|
||||
use deltachat::contact::{Contact, Origin};
|
||||
use std::collections::HashMap;
|
||||
@@ -31,7 +32,7 @@ impl telepathy::ConnectionInterfaceContacts for Connection {
|
||||
let mut out = HashMap::<u32, HashMap<String, VarArg>>::new();
|
||||
for id in handles.iter() {
|
||||
// FIXME: work out how to use get_all
|
||||
let contact = match Contact::get_by_id(&self.ctx.read().unwrap(), *id) {
|
||||
let contact = match block_on(Contact::get_by_id(&self.ctx, *id)) {
|
||||
Ok(c) => c,
|
||||
Err(_e) => continue, // Invalid IDs are silently ignored
|
||||
};
|
||||
@@ -77,10 +78,11 @@ impl telepathy::ConnectionInterfaceContacts for Connection {
|
||||
interfaces
|
||||
);
|
||||
|
||||
let id = {
|
||||
let ctx = &self.ctx.read().unwrap();
|
||||
Contact::lookup_id_by_addr(ctx, identifier, Origin::Unknown)
|
||||
};
|
||||
let id = block_on(Contact::lookup_id_by_addr(
|
||||
&self.ctx,
|
||||
identifier,
|
||||
Origin::Unknown,
|
||||
));
|
||||
|
||||
if id == 0 {
|
||||
return Err(MethodErr::no_arg()); // FIXME: should be InvalidHandle
|
||||
|
@@ -1,6 +1,7 @@
|
||||
use crate::padfoot::{get_var_str, get_var_u32, requestables, Channel, DbusAction, VarArg};
|
||||
use crate::telepathy;
|
||||
|
||||
use async_std::task::block_on;
|
||||
use dbus::tree::MethodErr;
|
||||
use dc::contact::Contact;
|
||||
use deltachat as dc;
|
||||
@@ -77,10 +78,11 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
|
||||
return Err(MethodErr::no_arg());
|
||||
};
|
||||
|
||||
let ctx = self.ctx.read().unwrap();
|
||||
|
||||
let target_handle =
|
||||
Contact::lookup_id_by_addr(&ctx, target_id.clone(), dc::contact::Origin::Unknown);
|
||||
let target_handle = block_on(Contact::lookup_id_by_addr(
|
||||
&self.ctx,
|
||||
target_id.clone(),
|
||||
dc::contact::Origin::Unknown,
|
||||
));
|
||||
if target_handle == 0 {
|
||||
println!("Couldn't find target handle for {}", target_id);
|
||||
return Err(MethodErr::no_arg());
|
||||
@@ -96,7 +98,7 @@ impl telepathy::ConnectionInterfaceRequests for Connection {
|
||||
}
|
||||
|
||||
// Now we need to discover or create a chat id for the contact
|
||||
let chat_id = dc::chat::create_by_contact_id(&ctx, target_handle).unwrap();
|
||||
let chat_id = block_on(dc::chat::create_by_contact_id(&self.ctx, target_handle)).unwrap();
|
||||
let channel = Channel::new(
|
||||
self.actq.clone(),
|
||||
chat_id,
|
||||
|
@@ -1,5 +1,6 @@
|
||||
use crate::padfoot::{var_bytearray, var_i64, var_str, var_u32, VarArg};
|
||||
|
||||
use async_std::path::{Path, PathBuf};
|
||||
use dc::constants::Viewtype as Vt;
|
||||
use dc::message::Message;
|
||||
use deltachat as dc;
|
||||
@@ -10,7 +11,7 @@ type Part = HashMap<String, VarArg>;
|
||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
|
||||
|
||||
// Turns a deltachat::message::Message into a Telepathy Message_Part_List
|
||||
pub fn convert_msg(blobdir: &std::path::Path, msg: &Message) -> Result<Vec<Part>> {
|
||||
pub fn convert_msg(blobdir: &Path, msg: &Message) -> Result<Vec<Part>> {
|
||||
if msg.is_setupmessage() {
|
||||
return Ok(convert_setupmessage(msg));
|
||||
}
|
||||
@@ -110,15 +111,13 @@ fn build_vid(_msg: &Message) -> Result<Vec<Part>> {
|
||||
|
||||
// The message contains a file. Detect the content-type and construct a part
|
||||
// containing the data in full.
|
||||
fn build_attachment(blobdir: &std::path::Path, msg: &Message) -> Result<Vec<Part>> {
|
||||
fn build_attachment(blobdir: &Path, msg: &Message) -> Result<Vec<Part>> {
|
||||
let mime = msg
|
||||
.get_filemime()
|
||||
.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||
let filename = msg.get_filename().ok_or("Failed to get filename")?;
|
||||
|
||||
let path: std::path::PathBuf = [blobdir, &std::path::Path::new(&filename)].iter().collect();
|
||||
// let mut path = std::path::PathBuf::from(blobdir);
|
||||
// path.push(filename);
|
||||
let path: PathBuf = [blobdir, &Path::new(&filename)].iter().collect();
|
||||
|
||||
let data =
|
||||
std::fs::read(&path).map_err(|e| format!("Failed to read file {:?}: {}", path, e))?;
|
||||
|
Reference in New Issue
Block a user