From 29ea694b1e1b498c4ee388cda022f961cd010dd8 Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Sun, 31 Oct 2021 21:17:42 +0000 Subject: [PATCH] Start running delta connections. Lots of problems still --- src/delta/system.rs | 168 ++++++++++--------------------- src/lib.rs | 90 +++++++++-------- src/messages/account_proxy.rs | 1 - src/messages/connection_proxy.rs | 1 - src/messages/handle_proxy.rs | 1 - src/messages/mod.rs | 1 - 6 files changed, 104 insertions(+), 158 deletions(-) diff --git a/src/delta/system.rs b/src/delta/system.rs index f2a1df7..ac9047d 100644 --- a/src/delta/system.rs +++ b/src/delta/system.rs @@ -2,8 +2,8 @@ // use super::protocol; use crate::logging; use crate::messages::{ - AccountInfo, FdSender, GetChatInfoMessage, GetHistoryMessage, DeltaSystemHandle, JoinChatMessage, - PurpleMessage, SendMsgMessage, SystemMessage, + AccountInfo, DeltaSystemHandle, FdSender, GetChatInfoMessage, GetHistoryMessage, + JoinChatMessage, PurpleMessage, SendMsgMessage, SystemMessage, }; // use crate::{Handle, ChatInfo}; use async_std::channel::{self, Receiver}; @@ -36,8 +36,8 @@ pub fn run(tx: FdSender, rx: Receiver) { let mut config_dir = DeltaSystem::user_dir(); config_dir.push("purple-plugin-delta"); - let accounts = - async_std::task::block_on(Accounts::new("purple-plugin-delta".into(), config_dir)).unwrap(); + let accounts = + async_std::task::block_on(Accounts::new("purple-plugin-delta".into(), config_dir)).unwrap(); let mut system = DeltaSystem::new(tx, rx, accounts); async_std::task::block_on(system.run()); @@ -55,14 +55,14 @@ impl DeltaSystem { } fn user_dir() -> async_std::path::PathBuf { - use std::os::unix::ffi::OsStrExt; + use std::os::unix::ffi::OsStrExt; - // SAFETY: We trust libpurple here - let slice = unsafe { std::ffi::CStr::from_ptr(crate::purple_sys::purple_user_dir()) }; - let osstr = std::ffi::OsStr::from_bytes(slice.to_bytes()); - let path: &async_std::path::Path = osstr.as_ref(); + // SAFETY: We trust libpurple here + let slice = unsafe { std::ffi::CStr::from_ptr(crate::purple_sys::purple_user_dir()) }; + let osstr = std::ffi::OsStr::from_bytes(slice.to_bytes()); + let path: &async_std::path::Path = osstr.as_ref(); - path.into() + path.into() } async fn run(&mut self) { @@ -97,113 +97,56 @@ impl DeltaSystem { let password = { account_info.protocol_data.imap_pass.clone() }; let handle = &account_info.handle; + self.tx + .connection_proxy(&handle) + .set_state(purple::PurpleConnectionState::PURPLE_CONNECTING) + .await; + // TODO: make this properly async - let ctx = match self.accounts.get_all().await.into_iter() - .map( |id| async_std::task::block_on(self.accounts.get_account(id)).unwrap() ) - .find( |ctx| async_std::task::block_on(ctx.is_self_addr(&email_address)).unwrap() ) { + let ctx = match self + .accounts + .get_all() + .await + .into_iter() + .map(|id| async_std::task::block_on(self.accounts.get_account(id)).unwrap()) + .find(|ctx| { + async_std::task::block_on(ctx.is_self_addr(&email_address)).unwrap_or(false) + }) { None => { let id = self.accounts.add_account().await.unwrap(); self.accounts.get_account(id).await.unwrap() - }, - Some(ctx) => ctx - }; - - // Now transpose config into ctx. TODO: rest of the fields - ctx.set_config(deltachat::config::Config::Addr, Some(&email_address)).await.unwrap(); - ctx.set_config(deltachat::config::Config::MailPw, Some(&password)).await.unwrap(); - - ctx.configure().await.unwrap(); - ctx.start_io(); // TODO: what to do with this future? - // TODO: set off event emitter - - /* - let mut registered_account_info = { - self.tx - .account_proxy(&handle) - .exec(|purple_account| { - let token = - account.get_string(protocol::RegistrationData::TOKEN_SETTING_KEY, ""); - if token.is_empty() { - None - } else { - Some(protocol::RegistrationData { - token, - session_id: account - .get_string(protocol::RegistrationData::SESSION_ID_SETTING_KEY, ""), - session_key: account.get_string( - protocol::RegistrationData::SESSION_KEY_SETTING_KEY, - "", - ), - host_time: account - .get_int(protocol::RegistrationData::HOST_TIME_SETTING_KEY, 0) - as u32, - }) - } - }) - .await - .ok_or_else(|| "Failed to read settings".to_string())? - }; - if registered_account_info.is_none() { - let info = protocol::register(&phone_number, || { - log::debug!("read_code"); - self.read_code(&account_info.handle) - }) - .await - .map_err(|e| format!("Failed to register account: {:?}", e))?; - - self.tx - .account_proxy(&handle) - .set_settings(info.clone()) - .await - .map_err(|e| format!("Failed to write settings: {:?}", e))?; - - registered_account_info = Some(info); - } - - log::debug!("Registered account info: {:?}", registered_account_info); - if registered_account_info.is_none() { - self.tx - .connection_proxy(&handle) - .error_reason( - purple::PurpleConnectionError::PURPLE_CONNECTION_ERROR_AUTHENTICATION_FAILED, - "Failed to register account".into(), - ) - .await; - return Err("Failed to register account".into()); - } - - if let Some(registered_account_info) = registered_account_info { - self.tx - .connection_proxy(&handle) - .set_state(purple::PurpleConnectionState::PURPLE_CONNECTING) - .await; - - let session_info = protocol::start_session(®istered_account_info).await; - log::debug!("Session info: {:?}", session_info); - match session_info { - Ok(session) => { - self.tx - .connection_proxy(&handle) - .set_state(purple::PurpleConnectionState::PURPLE_CONNECTED) - .await; - (*account_info.protocol_data.session.write().await) = Some(session); - async_std::task::spawn_local(poller::fetch_events_loop( - self.tx.clone(), - account_info.clone(), - )); - } - Err(error) => { - let error_message = format!("Failed to start session: {:?}", error); - self.tx - .connection_proxy(&handle) - .error_reason(purple::PurpleConnectionError::PURPLE_CONNECTION_ERROR_AUTHENTICATION_FAILED, - error_message.clone()).await; - return Err(error_message); - } } + Some(ctx) => ctx, + }; + + // Now transpose config into ctx. TODO: rest of the fields + ctx.set_config(deltachat::config::Config::Addr, Some(&email_address)) + .await + .unwrap(); + ctx.set_config(deltachat::config::Config::MailPw, Some(&password)) + .await + .unwrap(); + + // FIXME: handle configuration failure nicely here. Right now we just panic. + ctx.configure().await.unwrap(); + ctx.start_io().await; + + async_std::task::spawn_local(DeltaSystem::deltachat_events(ctx)); + // Hint from deleted code: + // self.tx.account_proxy(&handle).exec(|purple_account| ...); + + Ok(()) + } + + async fn deltachat_events(ctx: deltachat::context::Context) { + // TODO: loop until we're out of events + let emitter = ctx.get_event_emitter(); + while let Some(event) = emitter.recv().await { + println!("Received event {:?}", event); } - */ - Err("TODO".into()) + + // FIXME: this is back to front. We need to stop_io to interrupt the loop + ctx.stop_io().await; } async fn get_chat_info(&mut self, message: GetChatInfoMessage) -> Result<(), String> { @@ -260,7 +203,7 @@ impl DeltaSystem { Ok(()) } - async fn get_history(&mut self, get_history_message: GetHistoryMessage) -> Result<(), String> { + async fn get_history(&mut self, _get_history_message: GetHistoryMessage) -> Result<(), String> { /* let session = { get_history_message @@ -307,4 +250,3 @@ impl DeltaSystem { Ok(()) } } - diff --git a/src/lib.rs b/src/lib.rs index 506048a..4e9d3f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,10 +2,10 @@ extern crate async_std; extern crate deltachat; extern crate lazy_static; extern crate log; -extern crate purple_rs as purple; extern crate openssl; +extern crate purple_rs as purple; -use async_std::sync::Arc; // RwLock +use async_std::sync::Arc; // use chat_info::ChatInfo; //PartialChatInfo, ChatInfoVersion use lazy_static::lazy_static; use messages::{AccountInfo, DeltaSystemHandle, PurpleMessage, SystemMessage}; @@ -90,7 +90,6 @@ pub struct AccountData { // Not exposed: server_flags, selfstatus, e2ee_enabled session_closed: AtomicBool, - // session: RwLock>, } impl Drop for AccountData { @@ -197,7 +196,6 @@ impl purple::LoginHandler for PurpleDelta { bcc_self, session_closed: AtomicBool::new(false), - // session: RwLock::new(None), }); // SAFETY: @@ -225,6 +223,7 @@ impl purple::CloseHandler for PurpleDelta { .data .session_closed .store(true, Ordering::Relaxed); + self.connections.remove(*connection); } None => { @@ -526,45 +525,54 @@ impl PurpleDelta { PurpleCmdRet::PURPLE_CMD_RET_OK } - fn process_message(&mut self, message: SystemMessage) { - log::info!("received system message"); - match message { - SystemMessage::ExecAccount { handle, function } => { -/* - self.connections - .get(handle) - .map(|protocol_data| function(&mut protocol_data.account)) - .or_else(|| { - log::warn!("The account connection has been closed"); - None - }); - */ - } - SystemMessage::ExecConnection { handle, function } => { -/* - self.connections - .get(handle) - .map(|protocol_data| function(&mut protocol_data.connection)) - .or_else(|| { - log::warn!("The account connection has been closed"); - None - }); -*/ - } - SystemMessage::ExecHandle { handle, function } => { -/* - self.connections - .get(handle) - .map(|mut protocol_data| function(self, &mut protocol_data)) - .or_else(|| { - log::warn!("The account connection has been closed"); - None - }); -*/ - } - SystemMessage::FlushLogs => logging::flush(), + fn process_message(&mut self, message: SystemMessage) { + log::info!("received system message"); + match message { + SystemMessage::ExecAccount { + handle: _, + function: _, + } => { + /* + self.connections + .get(handle) + .map(|protocol_data| function(&mut protocol_data.account)) + .or_else(|| { + log::warn!("The account connection has been closed"); + None + }); + */ } + SystemMessage::ExecConnection { + handle: _, + function: _, + } => { + /* + self.connections + .get(handle) + .map(|protocol_data| function(&mut protocol_data.connection)) + .or_else(|| { + log::warn!("The account connection has been closed"); + None + }); + */ + } + SystemMessage::ExecHandle { + handle: _, + function: _, + } => { + /* + self.connections + .get(handle) + .map(|mut protocol_data| function(self, &mut protocol_data)) + .or_else(|| { + log::warn!("The account connection has been closed"); + None + }); + */ + } + SystemMessage::FlushLogs => logging::flush(), } + } /* pub fn serv_got_chat_in(&mut self, connection: &mut Connection, msg_info: MsgInfo) { match purple::Chat::find(&mut connection.get_account(), &msg_info.chat_sn) { diff --git a/src/messages/account_proxy.rs b/src/messages/account_proxy.rs index 27ce5f2..cc6f8e9 100644 --- a/src/messages/account_proxy.rs +++ b/src/messages/account_proxy.rs @@ -99,4 +99,3 @@ impl<'a> AccountProxy<'a> { }) } } - diff --git a/src/messages/connection_proxy.rs b/src/messages/connection_proxy.rs index 8958695..ce24221 100644 --- a/src/messages/connection_proxy.rs +++ b/src/messages/connection_proxy.rs @@ -52,4 +52,3 @@ impl<'a> ConnectionProxy<'a> { .await } } - diff --git a/src/messages/handle_proxy.rs b/src/messages/handle_proxy.rs index 0cff0c7..a6045e6 100644 --- a/src/messages/handle_proxy.rs +++ b/src/messages/handle_proxy.rs @@ -41,4 +41,3 @@ impl<'a> HandleProxy<'a> { .await; } } - diff --git a/src/messages/mod.rs b/src/messages/mod.rs index 1436e70..6d029ff 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -191,4 +191,3 @@ pub struct DeltaSystemHandle { pub rx: Receiver, pub tx: Sender, } -