From f57cdf3cdc8c10799696cc7d551fbf5222c5c900 Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Sun, 31 Oct 2021 01:19:36 +0100 Subject: [PATCH] Initialize Delta accounts on plugin load We don't yet configure any of the accounts, but we're set up to manage several accounts simultaneously in the same plugin instance, using delta's built-in support for that. --- Cargo.lock | 12 ++ Cargo.toml | 5 +- src/delta/mod.rs | 2 +- src/delta/system.rs | 286 +++++++++++++++++++++++++++++++ src/lib.rs | 75 +++----- src/logging.rs | 5 +- src/messages/account_proxy.rs | 102 +++++++++++ src/messages/connection_proxy.rs | 55 ++++++ src/messages/handle_proxy.rs | 44 +++++ src/messages/mod.rs | 194 +++++++++++++++++++++ 10 files changed, 722 insertions(+), 58 deletions(-) create mode 100644 src/delta/system.rs create mode 100644 src/messages/account_proxy.rs create mode 100644 src/messages/connection_proxy.rs create mode 100644 src/messages/handle_proxy.rs create mode 100644 src/messages/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 67aba39..cf55c4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2309,6 +2309,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_pipe" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb233f06c2307e1f5ce2ecad9f8121cffbbee2c95428f44ea85222e460d0d213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "ouroboros" version = "0.9.5" @@ -2606,7 +2616,9 @@ dependencies = [ "lazy_static", "log", "openssl", + "os_pipe", "purple-rs", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b2af3de..cd8a385 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,14 +11,15 @@ path = "src/lib.rs" crate-type = ["dylib"] [dependencies] -openssl = "*" deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag = "1.61.0" } lazy_static = "*" log = "*" +openssl = "*" +os_pipe = "*" purple-rs = { git = "https://github.com/Flared/purple-rs", branch = "master" } +serde = "*" ## Keep in sync with deltachat-core-rust ## - [dependencies.async-std] version = "1" features = ["unstable"] diff --git a/src/delta/mod.rs b/src/delta/mod.rs index 8b13789..ac77f63 100644 --- a/src/delta/mod.rs +++ b/src/delta/mod.rs @@ -1 +1 @@ - +pub mod system; diff --git a/src/delta/system.rs b/src/delta/system.rs new file mode 100644 index 0000000..464f0e9 --- /dev/null +++ b/src/delta/system.rs @@ -0,0 +1,286 @@ +// use super::poller; +// use super::protocol; +use crate::logging; +use crate::messages::{ + AccountInfo, FdSender, GetChatInfoMessage, GetHistoryMessage, DeltaSystemHandle, JoinChatMessage, + PurpleMessage, SendMsgMessage, SystemMessage, +}; +// use crate::{Handle, ChatInfo}; +use async_std::channel::{self, Receiver}; + +use deltachat::accounts::Accounts; + +const CHANNEL_CAPACITY: usize = 1024; + +pub fn spawn() -> DeltaSystemHandle { + let (input_rx, input_tx) = os_pipe::pipe().unwrap(); + let (system_tx, system_rx) = channel::bounded::(CHANNEL_CAPACITY); + let (purple_tx, purple_rx) = channel::bounded(CHANNEL_CAPACITY); + + let fd_sender = FdSender::new(input_tx, system_tx); + + log::debug!("Starting async thread."); + std::thread::spawn(move || run(fd_sender, purple_rx)); + + DeltaSystemHandle { + input_rx, + rx: system_rx, + tx: purple_tx, + } +} + +pub fn run(tx: FdSender, rx: Receiver) { + logging::set_thread_logger(logging::RemoteLogger(tx.clone())); + log::info!("Starting Delta system"); + let mut system = DeltaSystem::new(tx, rx); + async_std::task::block_on(system.run()); +} + +pub struct DeltaSystem { + tx: FdSender, + rx: Receiver, +} + +impl DeltaSystem { + fn new(tx: FdSender, rx: Receiver) -> Self { + Self { tx, rx } + } + + fn user_dir() -> async_std::path::PathBuf { + use std::os::unix::ffi::OsStrExt; + + // SAFETY: We trust libpurple here + let slice = unsafe { std::ffi::CStr::from_ptr(crate::purple_sys::purple_user_dir()) }; + let osstr = std::ffi::OsStr::from_bytes(slice.to_bytes()); + let path: &async_std::path::Path = osstr.as_ref(); + + path.into() + } + + async fn run(&mut self) { + log::debug!("Performing delta accounts setup"); + let mut config_dir = DeltaSystem::user_dir(); + config_dir.push("purple-plugin-delta"); + + Accounts::new("purple-plugin-delta".into(), config_dir).await.unwrap(); + + log::info!("Looping on messages"); + loop { + let purple_message = match self.rx.recv().await { + Ok(r) => r, + Err(error) => { + log::error!("Failed to receive message: {:?}", error); + break; + } + }; + log::info!("Message: {:?}", purple_message); + let result = match purple_message { + PurpleMessage::Login(account_info) => self.login(account_info).await, + PurpleMessage::JoinChat(m) => self.join_chat(m).await, + PurpleMessage::SendMsg(m) => self.send_msg(m).await, + PurpleMessage::GetChatInfo(m) => self.get_chat_info(m).await, + PurpleMessage::GetHistory(m) => self.get_history(m).await, + }; + if let Err(error) = result { + log::error!("Error handling message: {}", error); + } + logging::flush(); + } + } + + async fn login(&mut self, account_info: AccountInfo) -> std::result::Result<(), String> { + log::debug!("login"); + /* + let phone_number = { account_info.protocol_data.phone_number.clone() }; + let handle = &account_info.handle; + let mut registered_account_info = { + self.tx + .account_proxy(&handle) + .exec(|account| { + let token = + account.get_string(protocol::RegistrationData::TOKEN_SETTING_KEY, ""); + if token.is_empty() { + None + } else { + Some(protocol::RegistrationData { + token, + session_id: account + .get_string(protocol::RegistrationData::SESSION_ID_SETTING_KEY, ""), + session_key: account.get_string( + protocol::RegistrationData::SESSION_KEY_SETTING_KEY, + "", + ), + host_time: account + .get_int(protocol::RegistrationData::HOST_TIME_SETTING_KEY, 0) + as u32, + }) + } + }) + .await + .ok_or_else(|| "Failed to read settings".to_string())? + }; + if registered_account_info.is_none() { + let info = protocol::register(&phone_number, || { + log::debug!("read_code"); + self.read_code(&account_info.handle) + }) + .await + .map_err(|e| format!("Failed to register account: {:?}", e))?; + + self.tx + .account_proxy(&handle) + .set_settings(info.clone()) + .await + .map_err(|e| format!("Failed to write settings: {:?}", e))?; + + registered_account_info = Some(info); + } + + log::debug!("Registered account info: {:?}", registered_account_info); + if registered_account_info.is_none() { + self.tx + .connection_proxy(&handle) + .error_reason( + purple::PurpleConnectionError::PURPLE_CONNECTION_ERROR_AUTHENTICATION_FAILED, + "Failed to register account".into(), + ) + .await; + return Err("Failed to register account".into()); + } + + if let Some(registered_account_info) = registered_account_info { + self.tx + .connection_proxy(&handle) + .set_state(purple::PurpleConnectionState::PURPLE_CONNECTING) + .await; + + let session_info = protocol::start_session(®istered_account_info).await; + log::debug!("Session info: {:?}", session_info); + match session_info { + Ok(session) => { + self.tx + .connection_proxy(&handle) + .set_state(purple::PurpleConnectionState::PURPLE_CONNECTED) + .await; + (*account_info.protocol_data.session.write().await) = Some(session); + async_std::task::spawn_local(poller::fetch_events_loop( + self.tx.clone(), + account_info.clone(), + )); + } + Err(error) => { + let error_message = format!("Failed to start session: {:?}", error); + self.tx + .connection_proxy(&handle) + .error_reason(purple::PurpleConnectionError::PURPLE_CONNECTION_ERROR_AUTHENTICATION_FAILED, + error_message.clone()).await; + return Err(error_message); + } + } + } + */ + Err("TODO".into()) + } + + async fn get_chat_info(&mut self, message: GetChatInfoMessage) -> Result<(), String> { + log::info!("Get chat info sn: {}", message.message_data.sn); + /* + let session = { message.protocol_data.session.read().await.clone().unwrap() }; + let chat_info_response = protocol::get_chat_info_by_sn(&session, &message.message_data.sn) + .await + .map_err(|e| format!("Failed to get chat info: {:?}", e))?; + + self.tx + .handle_proxy(&message.handle) + .exec_no_return(move |plugin, protocol_data| { + let chat_info = ChatInfo::from(chat_info_response); + let connection = &mut protocol_data.connection; + plugin.load_chat_info(connection, &chat_info); + }) + .await; + */ + Ok(()) + } + + async fn join_chat(&mut self, message: JoinChatMessage) -> Result<(), String> { + log::info!("Joining stamp: {}", message.message_data.stamp); + /* + let session = { message.protocol_data.session.read().await.clone().unwrap() }; + let stamp = message.message_data.stamp; + // Handle shareable URLs: https://icq.im/XXXXXXXXXXXXXX + let stamp = if stamp.contains("icq.im/") { + stamp.rsplit('/').next().unwrap().into() + } else { + stamp + }; + + protocol::join_chat(&session, &stamp) + .await + .map_err(|e| format!("Failed to join chat: {:?}", e))?; + let chat_info_response = protocol::get_chat_info(&session, &stamp) + .await + .map_err(|e| format!("Failed to get chat info: {:?}", e))?; + + self.tx + .handle_proxy(&message.handle) + .exec_no_return(move |plugin, protocol_data| { + let chat_info = ChatInfo::from(chat_info_response); + let partial_info = chat_info.as_partial(); + let connection = &mut protocol_data.connection; + plugin.chat_joined(connection, &partial_info); + plugin.conversation_joined(connection, &partial_info); + plugin.load_chat_info(connection, &chat_info); + }) + .await; + */ + Ok(()) + } + + async fn get_history(&mut self, get_history_message: GetHistoryMessage) -> Result<(), String> { + /* + let session = { + get_history_message + .protocol_data + .session + .read() + .await + .clone() + .unwrap() + }; + let sn = &get_history_message.message_data.sn; + let from_msg_id = &get_history_message.message_data.from_msg_id; + let count = get_history_message.message_data.count; + + let history = protocol::get_history(&session, sn, from_msg_id, count) + .await + .map_err(|e| format!("Failed to get history: {:?}", e))?; + + super::poller::process_hist_dlg_state_messages( + self.tx.clone(), + session, + get_history_message.handle, + sn, + &history.persons, + None, + &history.messages, + ) + .await; + */ + + Ok(()) + } + + async fn send_msg(&mut self, message: SendMsgMessage) -> Result<(), String> { + log::info!("send_msg({:?})", message); + /* + let to_sn = &message.message_data.to_sn; + let message_body = &message.message_data.message; + let session = { message.protocol_data.session.read().await.clone().unwrap() }; + let _msg_info = protocol::send_im(&session, to_sn, message_body) + .await + .map_err(|e| format!("Failed to send msg: {:?}", e))?; + */ + Ok(()) + } +} + diff --git a/src/lib.rs b/src/lib.rs index ff0a597..7dca262 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,9 +6,9 @@ extern crate purple_rs as purple; extern crate openssl; use async_std::sync::Arc; // RwLock -use chat_info::{ChatInfo, PartialChatInfo}; //ChatInfoVersion +// use chat_info::ChatInfo; //PartialChatInfo, ChatInfoVersion use lazy_static::lazy_static; -//use messages::{AccountInfo, ICQSystemHandle, PurpleMessage, SystemMessage}; +use messages::{AccountInfo, DeltaSystemHandle, PurpleMessage, SystemMessage}; use purple::*; //use std::cell::RefCell; use std::ffi::{CStr, CString}; @@ -16,12 +16,10 @@ use std::ffi::{CStr, CString}; //use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; -use deltachat::accounts::Accounts; - -mod chat_info; -mod delta; +pub mod chat_info; +pub mod delta; pub mod logging; -//mod messages; +pub mod messages; pub mod status { use lazy_static::lazy_static; @@ -91,7 +89,7 @@ pub struct AccountData { // Not exposed: server_flags, selfstatus, e2ee_enabled session_closed: AtomicBool, - // session: RwLock>, + // session: RwLock>, } impl Drop for AccountData { @@ -105,9 +103,8 @@ pub type Handle = purple::Handle; pub type ProtocolData = purple::ProtocolData; pub struct PurpleDelta { - // system: ICQSystemHandle, + system: DeltaSystemHandle, connections: purple::Connections, - input_handle: Option, imex_command_handle: Option, } @@ -116,10 +113,9 @@ impl purple::PrplPlugin for PurpleDelta { fn new() -> Self { logging::init(log::LevelFilter::Debug).expect("Failed to initialize logging"); - // let system = icq::system::spawn(); + let system = delta::system::spawn(); Self { - // system, - input_handle: None, + system, imex_command_handle: None, connections: purple::Connections::new(), } @@ -200,19 +196,20 @@ impl purple::LoginHandler for PurpleDelta { // session: RwLock::new(None), }); + // SAFETY: // Safe as long as we remove the account in "close". unsafe { self.connections .add(account.get_connection().unwrap(), protocol_data.clone()) }; - /* + self.system .tx .try_send(PurpleMessage::Login(AccountInfo { handle: Handle::from(&mut *account), protocol_data, })) - .unwrap();*/ + .unwrap(); } } impl purple::CloseHandler for PurpleDelta { @@ -253,13 +250,7 @@ impl purple::StatusTypeHandler for PurpleDelta { impl purple::LoadHandler for PurpleDelta { fn load(&mut self, _plugin: &purple::Plugin) -> bool { logging::set_thread_logger(logging::PurpleDebugLogger); - //use std::os::unix::io::AsRawFd; - /* - self.input_handle = Some(self.enable_input( - self.system.input_rx.as_raw_fd(), - purple::PurpleInputCondition::PURPLE_INPUT_READ, - )); - */ + self.imex_command_handle = Some(self.enable_command(commands::IMEX, "w", "imex <code>")); @@ -435,33 +426,6 @@ impl purple::ChatSendHandler for PurpleDelta { 1 } } - -impl purple::InputHandler for PurpleDelta { - fn input(&mut self, _fd: i32, _cond: purple::PurpleInputCondition) { - log::debug!("Input"); - /* - // Consume the byte from the input pipe. - let mut buf = [0; 1]; - - self.system - .input_rx - .read_exact(&mut buf) - .expect("Failed to read input pipe"); - - // Consume the actual message. - match self.system.rx.try_recv() { - Ok(message) => self.process_message(message), - Err(async_std::sync::TryRecvError::Empty) => log::error!("Expected message, but empty"), - Err(async_std::sync::TryRecvError::Disconnected) => { - log::error!("System disconnected"); - if let Some(input_handle) = self.input_handle { - self.disable_input(input_handle); - } - } - }; - */ - } -} */ impl purple::CommandHandler for PurpleDelta { fn command( @@ -487,7 +451,7 @@ impl purple::CommandHandler for PurpleDelta { } impl PurpleDelta { - fn command_imex(&mut self, conversation: &mut Conversation, args: &[&str]) -> PurpleCmdRet { + fn command_imex(&mut self, _conversation: &mut Conversation, args: &[&str]) -> PurpleCmdRet { log::debug!("command_imex"); if args.len() != 1 { @@ -557,10 +521,12 @@ impl PurpleDelta { */ PurpleCmdRet::PURPLE_CMD_RET_OK } - /* + fn process_message(&mut self, message: SystemMessage) { + log::info!("received system message"); match message { SystemMessage::ExecAccount { handle, function } => { +/* self.connections .get(handle) .map(|protocol_data| function(&mut protocol_data.account)) @@ -568,8 +534,10 @@ impl PurpleDelta { log::warn!("The account connection has been closed"); None }); + */ } SystemMessage::ExecConnection { handle, function } => { +/* self.connections .get(handle) .map(|protocol_data| function(&mut protocol_data.connection)) @@ -577,8 +545,10 @@ impl PurpleDelta { log::warn!("The account connection has been closed"); None }); +*/ } SystemMessage::ExecHandle { handle, function } => { +/* self.connections .get(handle) .map(|mut protocol_data| function(self, &mut protocol_data)) @@ -586,11 +556,12 @@ impl PurpleDelta { log::warn!("The account connection has been closed"); None }); +*/ } SystemMessage::FlushLogs => logging::flush(), } } - + /* pub fn serv_got_chat_in(&mut self, connection: &mut Connection, msg_info: MsgInfo) { match purple::Chat::find(&mut connection.get_account(), &msg_info.chat_sn) { Some(mut chat) => { diff --git a/src/logging.rs b/src/logging.rs index 76ce460..b6db08b 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,6 +1,6 @@ // This is a copy of https://github.com/Flared/purple-icq/blob/master/src/logging.rs -//use crate::messages::{FdSender, SystemMessage}; +use crate::messages::{FdSender, SystemMessage}; use crate::purple; use std::cell::RefCell; use std::sync::Mutex; @@ -76,7 +76,6 @@ impl log::Log for PurpleDebugLogger { } } -/* pub struct RemoteLogger(pub FdSender); impl log::Log for RemoteLogger { @@ -100,7 +99,7 @@ impl log::Log for RemoteLogger { self.0.clone().try_send(SystemMessage::FlushLogs); } } -*/ + pub fn init(level: log::LevelFilter) -> Result<(), log::SetLoggerError> { log::set_logger(&TLS_LOGGER).map(|()| log::set_max_level(level)) } diff --git a/src/messages/account_proxy.rs b/src/messages/account_proxy.rs new file mode 100644 index 0000000..27ce5f2 --- /dev/null +++ b/src/messages/account_proxy.rs @@ -0,0 +1,102 @@ +use super::{FdSender, SystemMessage}; +use crate::Handle; +use async_std::channel; +use purple::{account, Account}; + +pub struct AccountProxy<'a> { + pub handle: Handle, + pub sender: &'a mut FdSender, +} +impl<'a> AccountProxy<'a> { + pub async fn exec(&mut self, f: F) -> Option + where + F: FnOnce(&mut Account) -> T, + F: Send + 'static, + T: Send + 'static, + { + let (tx, rx) = channel::bounded(1); + self.exec_no_return(move |account| { + if let Err(error) = tx.try_send(f(account)) { + log::error!("Failed to send result: {:?}", error); + } + }) + .await; + rx.recv().await.ok().or_else(|| { + log::error!("Failed to receive result"); + None + }) + } + + pub async fn exec_no_return(&mut self, f: F) + where + F: FnOnce(&mut Account), + F: Send + 'static, + { + self.sender + .send(SystemMessage::ExecAccount { + handle: self.handle.clone(), + function: Box::new(f), + }) + .await; + } + + #[allow(clippy::too_many_arguments)] + pub async fn request_input( + &mut self, + title: Option, + primary: Option, + secondary: Option, + default_value: Option, + multiline: bool, + masked: bool, + hint: Option, + ok_text: String, + cancel_text: String, + who: Option, + ) -> Option { + let (tx, rx) = channel::bounded(1); + self.exec_no_return(move |account| { + account.request_input( + title.as_deref(), + primary.as_deref(), + secondary.as_deref(), + default_value.as_deref(), + multiline, + masked, + hint.as_deref(), + &ok_text, + &cancel_text, + move |input_value| { + if let Err(error) = tx.try_send(input_value.map(|v| v.into_owned())) { + log::error!("Failed to send result: {:?}", error); + } + }, + who.as_deref(), + ) + }) + .await; + + rx.recv().await.ok().flatten() + } + + pub async fn is_disconnected(&mut self) -> bool { + self.exec(move |account| account.is_disconnected()) + .await + .unwrap_or(false) + } + + pub async fn set_settings( + &mut self, + settings: T, + ) -> account::settings::Result<()> { + self.exec(move |account| account.set_settings(&settings)) + .await + .transpose() + .and_then(|option| { + option.ok_or_else(|| { + account::settings::Error::Message("Failed to receive result".into()) + }) + }) + } +} + diff --git a/src/messages/connection_proxy.rs b/src/messages/connection_proxy.rs new file mode 100644 index 0000000..8958695 --- /dev/null +++ b/src/messages/connection_proxy.rs @@ -0,0 +1,55 @@ +use super::{FdSender, SystemMessage}; +use crate::Handle; +use async_std::channel; +use purple::{Connection, PurpleConnectionError, PurpleConnectionState}; + +pub struct ConnectionProxy<'a> { + pub handle: Handle, + pub sender: &'a mut FdSender, +} + +impl<'a> ConnectionProxy<'a> { + #[allow(dead_code)] + pub async fn exec(&mut self, f: F) -> Option + where + F: FnOnce(&mut Connection) -> T, + F: Send + 'static, + T: Send + 'static, + { + let (tx, rx) = channel::bounded(1); + self.exec_no_return(move |connection| { + if let Err(error) = tx.try_send(f(connection)) { + log::error!("Failed to send result: {:?}", error); + } + }) + .await; + rx.recv().await.ok().or_else(|| { + log::error!("Failed to receive result"); + None + }) + } + + pub async fn exec_no_return(&mut self, f: F) + where + F: FnOnce(&mut Connection), + F: Send + 'static, + { + self.sender + .send(SystemMessage::ExecConnection { + handle: self.handle.clone(), + function: Box::new(f), + }) + .await; + } + + pub async fn set_state(&mut self, state: PurpleConnectionState) { + self.exec_no_return(move |connection| connection.set_state(state)) + .await + } + + pub async fn error_reason(&mut self, reason: PurpleConnectionError, description: String) { + self.exec_no_return(move |connection| connection.error_reason(reason, &description)) + .await + } +} + diff --git a/src/messages/handle_proxy.rs b/src/messages/handle_proxy.rs new file mode 100644 index 0000000..0cff0c7 --- /dev/null +++ b/src/messages/handle_proxy.rs @@ -0,0 +1,44 @@ +use super::{FdSender, SystemMessage}; +use crate::{Handle, ProtocolData}; +use async_std::channel; + +pub struct HandleProxy<'a> { + pub handle: Handle, + pub sender: &'a mut FdSender, +} + +impl<'a> HandleProxy<'a> { + #[allow(dead_code)] + pub async fn exec(&mut self, f: F) -> Option + where + F: FnOnce(&mut crate::PurpleDelta, &mut ProtocolData) -> T, + F: Send + 'static, + T: Send + 'static, + { + let (tx, rx) = channel::bounded(1); + self.exec_no_return(move |plugin, protocol_data| { + if let Err(error) = tx.try_send(f(plugin, protocol_data)) { + log::error!("Failed to send result: {:?}", error); + } + }) + .await; + rx.recv().await.ok().or_else(|| { + log::error!("Failed to receive result"); + None + }) + } + + pub async fn exec_no_return(&mut self, f: F) + where + F: FnOnce(&mut crate::PurpleDelta, &mut ProtocolData), + F: Send + 'static, + { + self.sender + .send(SystemMessage::ExecHandle { + handle: self.handle.clone(), + function: Box::new(f), + }) + .await; + } +} + diff --git a/src/messages/mod.rs b/src/messages/mod.rs new file mode 100644 index 0000000..1436e70 --- /dev/null +++ b/src/messages/mod.rs @@ -0,0 +1,194 @@ +use self::account_proxy::AccountProxy; +use self::connection_proxy::ConnectionProxy; +use self::handle_proxy::HandleProxy; +use crate::{AccountDataBox, Handle, ProtocolData, PurpleDelta}; +use async_std::channel::{Receiver, Sender}; +use purple::{Account, Connection}; + +mod account_proxy; +mod connection_proxy; +mod handle_proxy; + +pub struct FdSender { + os_sender: os_pipe::PipeWriter, + channel_sender: Sender, +} + +impl FdSender { + pub fn new(os_sender: os_pipe::PipeWriter, channel_sender: Sender) -> Self { + Self { + os_sender, + channel_sender, + } + } + + pub async fn send(&mut self, item: T) { + match self.channel_sender.send(item).await { + Ok(()) => { + use std::io::Write; + self.os_sender.write_all(&[0]).unwrap(); + } + Err(error) => log::error!("Failed to send message: {}", error), + } + } + + pub fn try_send(&mut self, item: T) { + self.channel_sender.try_send(item).unwrap(); + use std::io::Write; + self.os_sender.write_all(&[0]).unwrap(); + } +} + +impl FdSender { + pub fn connection_proxy<'a>(&'a mut self, handle: &Handle) -> ConnectionProxy<'a> { + ConnectionProxy { + handle: handle.clone(), + sender: self, + } + } + + pub fn account_proxy<'a>(&'a mut self, handle: &Handle) -> AccountProxy<'a> { + AccountProxy { + handle: handle.clone(), + sender: self, + } + } + + pub fn handle_proxy<'a>(&'a mut self, handle: &Handle) -> HandleProxy<'a> { + HandleProxy { + handle: handle.clone(), + sender: self, + } + } +} + +impl Clone for FdSender { + fn clone(&self) -> Self { + Self { + os_sender: self.os_sender.try_clone().unwrap(), + channel_sender: self.channel_sender.clone(), + } + } +} + +#[derive(Debug, Clone)] +pub struct AccountInfo { + pub handle: Handle, + pub protocol_data: AccountDataBox, +} + +#[derive(Debug, Clone)] +pub struct PurpleMessageWithHandle { + pub handle: Handle, + pub protocol_data: AccountDataBox, + pub message_data: T, +} + +#[derive(Debug, Clone)] +pub struct JoinChatMessageData { + pub stamp: String, +} + +#[derive(Debug, Clone)] +pub struct SendMsgMessageData { + pub to_sn: String, + pub message: String, +} + +#[derive(Debug, Clone)] +pub struct GetChatInfoMessageData { + pub sn: String, +} + +#[derive(Debug, Clone)] +pub struct GetHistoryMessageData { + pub sn: String, + pub from_msg_id: String, + pub count: i32, +} + +#[derive(Debug)] +pub enum PurpleMessage { + Login(AccountInfo), + JoinChat(JoinChatMessage), + SendMsg(SendMsgMessage), + GetChatInfo(GetChatInfoMessage), + GetHistory(GetHistoryMessage), +} + +pub type JoinChatMessage = PurpleMessageWithHandle; +pub type GetHistoryMessage = PurpleMessageWithHandle; +pub type SendMsgMessage = PurpleMessageWithHandle; +pub type GetChatInfoMessage = PurpleMessageWithHandle; + +impl PurpleMessage { + pub fn join_chat(handle: Handle, protocol_data: AccountDataBox, stamp: String) -> Self { + Self::JoinChat(JoinChatMessage { + handle, + protocol_data, + message_data: JoinChatMessageData { stamp }, + }) + } + + pub fn fetch_history( + handle: Handle, + protocol_data: AccountDataBox, + sn: String, + from_msg_id: String, + count: i32, + ) -> Self { + Self::GetHistory(GetHistoryMessage { + handle, + protocol_data, + message_data: GetHistoryMessageData { + sn, + from_msg_id, + count, + }, + }) + } + + pub fn send_msg( + handle: Handle, + protocol_data: AccountDataBox, + to_sn: String, + message: String, + ) -> Self { + Self::SendMsg(SendMsgMessage { + handle, + protocol_data, + message_data: SendMsgMessageData { to_sn, message }, + }) + } + + pub fn get_chat_info(handle: Handle, protocol_data: AccountDataBox, sn: String) -> Self { + Self::GetChatInfo(GetChatInfoMessage { + handle, + protocol_data, + message_data: GetChatInfoMessageData { sn }, + }) + } +} + +pub enum SystemMessage { + ExecAccount { + handle: Handle, + function: Box, + }, + ExecConnection { + handle: Handle, + function: Box, + }, + ExecHandle { + handle: Handle, + function: Box, + }, + FlushLogs, +} + +pub struct DeltaSystemHandle { + pub input_rx: os_pipe::PipeReader, + pub rx: Receiver, + pub tx: Sender, +} +