Start running delta connections. Lots of problems still

This commit is contained in:
2021-10-31 21:17:42 +00:00
parent eec8669b88
commit 29ea694b1e
6 changed files with 104 additions and 158 deletions

View File

@@ -2,8 +2,8 @@
// use super::protocol;
use crate::logging;
use crate::messages::{
AccountInfo, FdSender, GetChatInfoMessage, GetHistoryMessage, DeltaSystemHandle, JoinChatMessage,
PurpleMessage, SendMsgMessage, SystemMessage,
AccountInfo, DeltaSystemHandle, FdSender, GetChatInfoMessage, GetHistoryMessage,
JoinChatMessage, PurpleMessage, SendMsgMessage, SystemMessage,
};
// use crate::{Handle, ChatInfo};
use async_std::channel::{self, Receiver};
@@ -36,8 +36,8 @@ pub fn run(tx: FdSender<SystemMessage>, rx: Receiver<PurpleMessage>) {
let mut config_dir = DeltaSystem::user_dir();
config_dir.push("purple-plugin-delta");
let accounts =
async_std::task::block_on(Accounts::new("purple-plugin-delta".into(), config_dir)).unwrap();
let accounts =
async_std::task::block_on(Accounts::new("purple-plugin-delta".into(), config_dir)).unwrap();
let mut system = DeltaSystem::new(tx, rx, accounts);
async_std::task::block_on(system.run());
@@ -55,14 +55,14 @@ impl DeltaSystem {
}
fn user_dir() -> async_std::path::PathBuf {
use std::os::unix::ffi::OsStrExt;
use std::os::unix::ffi::OsStrExt;
// SAFETY: We trust libpurple here
let slice = unsafe { std::ffi::CStr::from_ptr(crate::purple_sys::purple_user_dir()) };
let osstr = std::ffi::OsStr::from_bytes(slice.to_bytes());
let path: &async_std::path::Path = osstr.as_ref();
// SAFETY: We trust libpurple here
let slice = unsafe { std::ffi::CStr::from_ptr(crate::purple_sys::purple_user_dir()) };
let osstr = std::ffi::OsStr::from_bytes(slice.to_bytes());
let path: &async_std::path::Path = osstr.as_ref();
path.into()
path.into()
}
async fn run(&mut self) {
@@ -97,113 +97,56 @@ impl DeltaSystem {
let password = { account_info.protocol_data.imap_pass.clone() };
let handle = &account_info.handle;
self.tx
.connection_proxy(&handle)
.set_state(purple::PurpleConnectionState::PURPLE_CONNECTING)
.await;
// TODO: make this properly async
let ctx = match self.accounts.get_all().await.into_iter()
.map( |id| async_std::task::block_on(self.accounts.get_account(id)).unwrap() )
.find( |ctx| async_std::task::block_on(ctx.is_self_addr(&email_address)).unwrap() ) {
let ctx = match self
.accounts
.get_all()
.await
.into_iter()
.map(|id| async_std::task::block_on(self.accounts.get_account(id)).unwrap())
.find(|ctx| {
async_std::task::block_on(ctx.is_self_addr(&email_address)).unwrap_or(false)
}) {
None => {
let id = self.accounts.add_account().await.unwrap();
self.accounts.get_account(id).await.unwrap()
},
Some(ctx) => ctx
};
// Now transpose config into ctx. TODO: rest of the fields
ctx.set_config(deltachat::config::Config::Addr, Some(&email_address)).await.unwrap();
ctx.set_config(deltachat::config::Config::MailPw, Some(&password)).await.unwrap();
ctx.configure().await.unwrap();
ctx.start_io(); // TODO: what to do with this future?
// TODO: set off event emitter
/*
let mut registered_account_info = {
self.tx
.account_proxy(&handle)
.exec(|purple_account| {
let token =
account.get_string(protocol::RegistrationData::TOKEN_SETTING_KEY, "");
if token.is_empty() {
None
} else {
Some(protocol::RegistrationData {
token,
session_id: account
.get_string(protocol::RegistrationData::SESSION_ID_SETTING_KEY, ""),
session_key: account.get_string(
protocol::RegistrationData::SESSION_KEY_SETTING_KEY,
"",
),
host_time: account
.get_int(protocol::RegistrationData::HOST_TIME_SETTING_KEY, 0)
as u32,
})
}
})
.await
.ok_or_else(|| "Failed to read settings".to_string())?
};
if registered_account_info.is_none() {
let info = protocol::register(&phone_number, || {
log::debug!("read_code");
self.read_code(&account_info.handle)
})
.await
.map_err(|e| format!("Failed to register account: {:?}", e))?;
self.tx
.account_proxy(&handle)
.set_settings(info.clone())
.await
.map_err(|e| format!("Failed to write settings: {:?}", e))?;
registered_account_info = Some(info);
}
log::debug!("Registered account info: {:?}", registered_account_info);
if registered_account_info.is_none() {
self.tx
.connection_proxy(&handle)
.error_reason(
purple::PurpleConnectionError::PURPLE_CONNECTION_ERROR_AUTHENTICATION_FAILED,
"Failed to register account".into(),
)
.await;
return Err("Failed to register account".into());
}
if let Some(registered_account_info) = registered_account_info {
self.tx
.connection_proxy(&handle)
.set_state(purple::PurpleConnectionState::PURPLE_CONNECTING)
.await;
let session_info = protocol::start_session(&registered_account_info).await;
log::debug!("Session info: {:?}", session_info);
match session_info {
Ok(session) => {
self.tx
.connection_proxy(&handle)
.set_state(purple::PurpleConnectionState::PURPLE_CONNECTED)
.await;
(*account_info.protocol_data.session.write().await) = Some(session);
async_std::task::spawn_local(poller::fetch_events_loop(
self.tx.clone(),
account_info.clone(),
));
}
Err(error) => {
let error_message = format!("Failed to start session: {:?}", error);
self.tx
.connection_proxy(&handle)
.error_reason(purple::PurpleConnectionError::PURPLE_CONNECTION_ERROR_AUTHENTICATION_FAILED,
error_message.clone()).await;
return Err(error_message);
}
}
Some(ctx) => ctx,
};
// Now transpose config into ctx. TODO: rest of the fields
ctx.set_config(deltachat::config::Config::Addr, Some(&email_address))
.await
.unwrap();
ctx.set_config(deltachat::config::Config::MailPw, Some(&password))
.await
.unwrap();
// FIXME: handle configuration failure nicely here. Right now we just panic.
ctx.configure().await.unwrap();
ctx.start_io().await;
async_std::task::spawn_local(DeltaSystem::deltachat_events(ctx));
// Hint from deleted code:
// self.tx.account_proxy(&handle).exec(|purple_account| ...);
Ok(())
}
async fn deltachat_events(ctx: deltachat::context::Context) {
// TODO: loop until we're out of events
let emitter = ctx.get_event_emitter();
while let Some(event) = emitter.recv().await {
println!("Received event {:?}", event);
}
*/
Err("TODO".into())
// FIXME: this is back to front. We need to stop_io to interrupt the loop
ctx.stop_io().await;
}
async fn get_chat_info(&mut self, message: GetChatInfoMessage) -> Result<(), String> {
@@ -260,7 +203,7 @@ impl DeltaSystem {
Ok(())
}
async fn get_history(&mut self, get_history_message: GetHistoryMessage) -> Result<(), String> {
async fn get_history(&mut self, _get_history_message: GetHistoryMessage) -> Result<(), String> {
/*
let session = {
get_history_message
@@ -307,4 +250,3 @@ impl DeltaSystem {
Ok(())
}
}

View File

@@ -2,10 +2,10 @@ extern crate async_std;
extern crate deltachat;
extern crate lazy_static;
extern crate log;
extern crate purple_rs as purple;
extern crate openssl;
extern crate purple_rs as purple;
use async_std::sync::Arc; // RwLock
use async_std::sync::Arc;
// use chat_info::ChatInfo; //PartialChatInfo, ChatInfoVersion
use lazy_static::lazy_static;
use messages::{AccountInfo, DeltaSystemHandle, PurpleMessage, SystemMessage};
@@ -90,7 +90,6 @@ pub struct AccountData {
// Not exposed: server_flags, selfstatus, e2ee_enabled
session_closed: AtomicBool,
// session: RwLock<Option<delta::protocol::SessionInfo>>,
}
impl Drop for AccountData {
@@ -197,7 +196,6 @@ impl purple::LoginHandler for PurpleDelta {
bcc_self,
session_closed: AtomicBool::new(false),
// session: RwLock::new(None),
});
// SAFETY:
@@ -225,6 +223,7 @@ impl purple::CloseHandler for PurpleDelta {
.data
.session_closed
.store(true, Ordering::Relaxed);
self.connections.remove(*connection);
}
None => {
@@ -526,45 +525,54 @@ impl PurpleDelta {
PurpleCmdRet::PURPLE_CMD_RET_OK
}
fn process_message(&mut self, message: SystemMessage) {
log::info!("received system message");
match message {
SystemMessage::ExecAccount { handle, function } => {
/*
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.account))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::ExecConnection { handle, function } => {
/*
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.connection))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::ExecHandle { handle, function } => {
/*
self.connections
.get(handle)
.map(|mut protocol_data| function(self, &mut protocol_data))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::FlushLogs => logging::flush(),
fn process_message(&mut self, message: SystemMessage) {
log::info!("received system message");
match message {
SystemMessage::ExecAccount {
handle: _,
function: _,
} => {
/*
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.account))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::ExecConnection {
handle: _,
function: _,
} => {
/*
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.connection))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::ExecHandle {
handle: _,
function: _,
} => {
/*
self.connections
.get(handle)
.map(|mut protocol_data| function(self, &mut protocol_data))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::FlushLogs => logging::flush(),
}
}
/*
pub fn serv_got_chat_in(&mut self, connection: &mut Connection, msg_info: MsgInfo) {
match purple::Chat::find(&mut connection.get_account(), &msg_info.chat_sn) {

View File

@@ -99,4 +99,3 @@ impl<'a> AccountProxy<'a> {
})
}
}

View File

@@ -52,4 +52,3 @@ impl<'a> ConnectionProxy<'a> {
.await
}
}

View File

@@ -41,4 +41,3 @@ impl<'a> HandleProxy<'a> {
.await;
}
}

View File

@@ -191,4 +191,3 @@ pub struct DeltaSystemHandle {
pub rx: Receiver<SystemMessage>,
pub tx: Sender<PurpleMessage>,
}