Compare commits

..

7 Commits

Author SHA1 Message Date
29ea694b1e Start running delta connections. Lots of problems still 2021-10-31 21:17:42 +00:00
eec8669b88 Get delta contexts chugging 2021-10-31 16:28:46 +00:00
e99d824227 Keep hold of the Accounts struct 2021-10-31 01:31:02 +01:00
f57cdf3cdc Initialize Delta accounts on plugin load
We don't yet configure any of the accounts, but we're set up to manage
several accounts simultaneously in the same plugin instance, using
delta's built-in support for that.
2021-10-31 01:19:36 +01:00
039807bb32 Drop openssl fork, move to rust 2021 edition
The openssl-sys crate now natively supports openssl 3 on the master
branch, although there doesn't seem to be a release containing it yet.
2021-10-30 16:13:38 +01:00
fd321e02d8 Use OpenSSL 3 2021-09-09 18:18:31 +01:00
3142134360 Initial framework based off pidgin-icq 2021-04-15 00:41:54 +01:00
12 changed files with 1321 additions and 621 deletions

1090
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,8 @@
name = "purple-plugin-delta"
version = "0.1.0"
authors = ["Nick Thomas <delta@ur.gs>"]
rust-version = "1.56"
edition = "2021"
[lib]
name = "purple_delta"
@@ -9,15 +11,21 @@ path = "src/lib.rs"
crate-type = ["dylib"]
[dependencies]
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag="v1.51.0" }
lazy_static = "1.4.0"
log = "0.4.8"
purple-rs = { git = "https://github.com/lupine/purple-rs", branch="next" }
deltachat = { git = "https://github.com/deltachat/deltachat-core-rust", tag = "1.61.0" }
lazy_static = "*"
log = "*"
openssl = "*"
os_pipe = "*"
purple-rs = { git = "https://github.com/Flared/purple-rs", branch = "master" }
serde = "*"
# Keep in sync with deltachat-core-rust
## Keep in sync with deltachat-core-rust ##
[dependencies.async-std]
version = "~1.8"
version = "1"
features = ["unstable"]
[profile.release]
lto = true
[patch.crates-io]
openssl-sys = { git = "https://github.com/sfackler/rust-openssl", branch = "master" }

View File

@@ -33,22 +33,15 @@ Starting again from scratch in Rust. So currently, nothing works. TODO list:
- [ ] Send/receive video messages
- [ ] Send/receive arbitrary attachments
## Build
There are some licensing issues at present, so you shouldn't build this plugin.
To get a `target/debug/libpurple_delta.so`, just run `cargo build`.
`deltachat-core-rust` uses a vendored openssl 1, unconditionally links it, and
is MPL-licensed.
`purple-plugin-delta` is GPLv3 without the [OpenSSL exemption](https://people.gnome.org/~markmc/openssl-and-the-gpl.html)
`libpurple` itself is GPLv2 without the OpenSSL exemption.
There's no point to `purple-plugin-delta` adding the OpenSSL exemption because
`libpurple` lacks it, and in any event, it will be unnecessary with the next
major version of OpenSSL. So, time should resolve this for us one way or another.
Since purple-plugin-delta is made to link against libpurple, which is GPLv2
without the "OpenSSL exemption", distributing something that linked against
OpenSSL 1 would be a licensing violation. Instead, we configure the build system
so we statically link against a vendored OpenSSL 3 instead. This has only been
possible since 2021-09-07.
Significant code using the WTFPL includes the [libpurple-rust bindings](https://github.com/sbwtw/libpurple-rust)
and the [pidgin-wechat plugin](https://github.com/sbwtw/pidgin-wechat), which
@@ -57,7 +50,7 @@ against this mess.
## Use
The easiest way to use this is to copy the `libdelta.so` file into
The easiest way to use this is to copy the `libpurple_delta.so` file into
`~/.purple/plugins`. When running pidgin, you'll now have the option to add
a "Delta Chat" account.

View File

@@ -1 +1 @@
1.50.0
1.56.0

View File

@@ -1 +1 @@
pub mod system;

252
src/delta/system.rs Normal file
View File

@@ -0,0 +1,252 @@
// use super::poller;
// use super::protocol;
use crate::logging;
use crate::messages::{
AccountInfo, DeltaSystemHandle, FdSender, GetChatInfoMessage, GetHistoryMessage,
JoinChatMessage, PurpleMessage, SendMsgMessage, SystemMessage,
};
// use crate::{Handle, ChatInfo};
use async_std::channel::{self, Receiver};
use deltachat::accounts::Accounts;
const CHANNEL_CAPACITY: usize = 1024;
pub fn spawn() -> DeltaSystemHandle {
let (input_rx, input_tx) = os_pipe::pipe().unwrap();
let (system_tx, system_rx) = channel::bounded::<SystemMessage>(CHANNEL_CAPACITY);
let (purple_tx, purple_rx) = channel::bounded(CHANNEL_CAPACITY);
let fd_sender = FdSender::new(input_tx, system_tx);
log::debug!("Starting async thread.");
std::thread::spawn(move || run(fd_sender, purple_rx));
DeltaSystemHandle {
input_rx,
rx: system_rx,
tx: purple_tx,
}
}
pub fn run(tx: FdSender<SystemMessage>, rx: Receiver<PurpleMessage>) {
logging::set_thread_logger(logging::RemoteLogger(tx.clone()));
log::info!("Starting Delta system");
log::debug!("Performing delta accounts setup");
let mut config_dir = DeltaSystem::user_dir();
config_dir.push("purple-plugin-delta");
let accounts =
async_std::task::block_on(Accounts::new("purple-plugin-delta".into(), config_dir)).unwrap();
let mut system = DeltaSystem::new(tx, rx, accounts);
async_std::task::block_on(system.run());
}
pub struct DeltaSystem {
tx: FdSender<SystemMessage>,
rx: Receiver<PurpleMessage>,
accounts: Accounts,
}
impl DeltaSystem {
fn new(tx: FdSender<SystemMessage>, rx: Receiver<PurpleMessage>, accounts: Accounts) -> Self {
Self { tx, rx, accounts }
}
fn user_dir() -> async_std::path::PathBuf {
use std::os::unix::ffi::OsStrExt;
// SAFETY: We trust libpurple here
let slice = unsafe { std::ffi::CStr::from_ptr(crate::purple_sys::purple_user_dir()) };
let osstr = std::ffi::OsStr::from_bytes(slice.to_bytes());
let path: &async_std::path::Path = osstr.as_ref();
path.into()
}
async fn run(&mut self) {
log::info!("Looping on messages");
loop {
let purple_message = match self.rx.recv().await {
Ok(r) => r,
Err(error) => {
log::error!("Failed to receive message: {:?}", error);
break;
}
};
log::info!("Message: {:?}", purple_message);
let result = match purple_message {
PurpleMessage::Login(account_info) => self.login(account_info).await,
PurpleMessage::JoinChat(m) => self.join_chat(m).await,
PurpleMessage::SendMsg(m) => self.send_msg(m).await,
PurpleMessage::GetChatInfo(m) => self.get_chat_info(m).await,
PurpleMessage::GetHistory(m) => self.get_history(m).await,
};
if let Err(error) = result {
log::error!("Error handling message: {}", error);
}
logging::flush();
}
}
async fn login(&mut self, account_info: AccountInfo) -> std::result::Result<(), String> {
log::debug!("login");
let email_address = { account_info.protocol_data.email_address.clone() };
let password = { account_info.protocol_data.imap_pass.clone() };
let handle = &account_info.handle;
self.tx
.connection_proxy(&handle)
.set_state(purple::PurpleConnectionState::PURPLE_CONNECTING)
.await;
// TODO: make this properly async
let ctx = match self
.accounts
.get_all()
.await
.into_iter()
.map(|id| async_std::task::block_on(self.accounts.get_account(id)).unwrap())
.find(|ctx| {
async_std::task::block_on(ctx.is_self_addr(&email_address)).unwrap_or(false)
}) {
None => {
let id = self.accounts.add_account().await.unwrap();
self.accounts.get_account(id).await.unwrap()
}
Some(ctx) => ctx,
};
// Now transpose config into ctx. TODO: rest of the fields
ctx.set_config(deltachat::config::Config::Addr, Some(&email_address))
.await
.unwrap();
ctx.set_config(deltachat::config::Config::MailPw, Some(&password))
.await
.unwrap();
// FIXME: handle configuration failure nicely here. Right now we just panic.
ctx.configure().await.unwrap();
ctx.start_io().await;
async_std::task::spawn_local(DeltaSystem::deltachat_events(ctx));
// Hint from deleted code:
// self.tx.account_proxy(&handle).exec(|purple_account| ...);
Ok(())
}
async fn deltachat_events(ctx: deltachat::context::Context) {
// TODO: loop until we're out of events
let emitter = ctx.get_event_emitter();
while let Some(event) = emitter.recv().await {
println!("Received event {:?}", event);
}
// FIXME: this is back to front. We need to stop_io to interrupt the loop
ctx.stop_io().await;
}
async fn get_chat_info(&mut self, message: GetChatInfoMessage) -> Result<(), String> {
log::info!("Get chat info sn: {}", message.message_data.sn);
/*
let session = { message.protocol_data.session.read().await.clone().unwrap() };
let chat_info_response = protocol::get_chat_info_by_sn(&session, &message.message_data.sn)
.await
.map_err(|e| format!("Failed to get chat info: {:?}", e))?;
self.tx
.handle_proxy(&message.handle)
.exec_no_return(move |plugin, protocol_data| {
let chat_info = ChatInfo::from(chat_info_response);
let connection = &mut protocol_data.connection;
plugin.load_chat_info(connection, &chat_info);
})
.await;
*/
Ok(())
}
async fn join_chat(&mut self, message: JoinChatMessage) -> Result<(), String> {
log::info!("Joining stamp: {}", message.message_data.stamp);
/*
let session = { message.protocol_data.session.read().await.clone().unwrap() };
let stamp = message.message_data.stamp;
// Handle shareable URLs: https://icq.im/XXXXXXXXXXXXXX
let stamp = if stamp.contains("icq.im/") {
stamp.rsplit('/').next().unwrap().into()
} else {
stamp
};
protocol::join_chat(&session, &stamp)
.await
.map_err(|e| format!("Failed to join chat: {:?}", e))?;
let chat_info_response = protocol::get_chat_info(&session, &stamp)
.await
.map_err(|e| format!("Failed to get chat info: {:?}", e))?;
self.tx
.handle_proxy(&message.handle)
.exec_no_return(move |plugin, protocol_data| {
let chat_info = ChatInfo::from(chat_info_response);
let partial_info = chat_info.as_partial();
let connection = &mut protocol_data.connection;
plugin.chat_joined(connection, &partial_info);
plugin.conversation_joined(connection, &partial_info);
plugin.load_chat_info(connection, &chat_info);
})
.await;
*/
Ok(())
}
async fn get_history(&mut self, _get_history_message: GetHistoryMessage) -> Result<(), String> {
/*
let session = {
get_history_message
.protocol_data
.session
.read()
.await
.clone()
.unwrap()
};
let sn = &get_history_message.message_data.sn;
let from_msg_id = &get_history_message.message_data.from_msg_id;
let count = get_history_message.message_data.count;
let history = protocol::get_history(&session, sn, from_msg_id, count)
.await
.map_err(|e| format!("Failed to get history: {:?}", e))?;
super::poller::process_hist_dlg_state_messages(
self.tx.clone(),
session,
get_history_message.handle,
sn,
&history.persons,
None,
&history.messages,
)
.await;
*/
Ok(())
}
async fn send_msg(&mut self, message: SendMsgMessage) -> Result<(), String> {
log::info!("send_msg({:?})", message);
/*
let to_sn = &message.message_data.to_sn;
let message_body = &message.message_data.message;
let session = { message.protocol_data.session.read().await.clone().unwrap() };
let _msg_info = protocol::send_im(&session, to_sn, message_body)
.await
.map_err(|e| format!("Failed to send msg: {:?}", e))?;
*/
Ok(())
}
}

View File

@@ -1,12 +1,14 @@
extern crate async_std;
extern crate deltachat;
extern crate lazy_static;
extern crate log;
extern crate openssl;
extern crate purple_rs as purple;
use async_std::sync::Arc; // RwLock
use chat_info::{ChatInfo, PartialChatInfo}; //ChatInfoVersion
use async_std::sync::Arc;
// use chat_info::ChatInfo; //PartialChatInfo, ChatInfoVersion
use lazy_static::lazy_static;
//use messages::{AccountInfo, ICQSystemHandle, PurpleMessage, SystemMessage};
use messages::{AccountInfo, DeltaSystemHandle, PurpleMessage, SystemMessage};
use purple::*;
//use std::cell::RefCell;
use std::ffi::{CStr, CString};
@@ -14,10 +16,10 @@ use std::ffi::{CStr, CString};
//use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
mod chat_info;
mod delta;
pub mod chat_info;
pub mod delta;
pub mod logging;
//mod messages;
pub mod messages;
pub mod status {
use lazy_static::lazy_static;
@@ -74,6 +76,7 @@ pub struct MsgInfo {
#[derive(Debug, Default)]
pub struct AccountData {
email_address: String,
display_name: String,
imap_host: String,
imap_port: String,
@@ -87,7 +90,6 @@ pub struct AccountData {
// Not exposed: server_flags, selfstatus, e2ee_enabled
session_closed: AtomicBool,
// session: RwLock<Option<icq::protocol::SessionInfo>>,
}
impl Drop for AccountData {
@@ -101,9 +103,8 @@ pub type Handle = purple::Handle<AccountDataBox>;
pub type ProtocolData = purple::ProtocolData<AccountDataBox>;
pub struct PurpleDelta {
// system: ICQSystemHandle,
system: DeltaSystemHandle,
connections: purple::Connections<AccountDataBox>,
input_handle: Option<u32>,
imex_command_handle: Option<PurpleCmdId>,
}
@@ -112,15 +113,16 @@ impl purple::PrplPlugin for PurpleDelta {
fn new() -> Self {
logging::init(log::LevelFilter::Debug).expect("Failed to initialize logging");
// let system = icq::system::spawn();
let system = delta::system::spawn();
Self {
// system,
input_handle: None,
system,
imex_command_handle: None,
connections: purple::Connections::new(),
}
}
fn register(&self, context: RegisterContext<Self>) -> RegisterContext<Self> {
println!("OpenSSL version: {}", openssl::version::version());
let info = purple::PrplInfo {
id: "prpl-delta".into(),
name: "Delta Chat".into(),
@@ -137,7 +139,8 @@ impl purple::PrplPlugin for PurpleDelta {
.with_string_option("Display Name".into(), "displayname".into(), "".into())
.with_string_option("IMAP server host".into(), "mail_server".into(), "".into())
.with_string_option("IMAP server port".into(), "mail_port".into(), "".into())
// Username and password are mail_user and mail_pw
.with_string_option("IMAP server username".into(), "mail_user".into(), "".into())
// Password is account password
.with_string_option("SMTP server host".into(), "send_server".into(), "".into())
.with_string_option("SMTP server port".into(), "send_port".into(), "".into())
.with_string_option("SMTP server username".into(), "send_user".into(), "".into())
@@ -161,11 +164,12 @@ impl purple::PrplPlugin for PurpleDelta {
impl purple::LoginHandler for PurpleDelta {
fn login(&mut self, account: &mut Account) {
let email_address = account.get_username().unwrap().into();
let display_name = account.get_string("displayname", "");
let imap_host = account.get_string("mail_server", "");
let imap_port = account.get_string("mail_port", "");
let imap_user = account.get_username().unwrap().into();
let imap_user = account.get_string("mail_user", "");
let imap_pass = account.get_password().unwrap().into();
let smtp_host = account.get_string("send_server", "");
@@ -176,6 +180,7 @@ impl purple::LoginHandler for PurpleDelta {
let bcc_self = account.get_bool("bcc_self", false);
let protocol_data: AccountDataBox = Arc::new(AccountData {
email_address,
display_name,
imap_host,
@@ -191,22 +196,22 @@ impl purple::LoginHandler for PurpleDelta {
bcc_self,
session_closed: AtomicBool::new(false),
// session: RwLock::new(None),
});
// SAFETY:
// Safe as long as we remove the account in "close".
unsafe {
self.connections
.add(account.get_connection().unwrap(), protocol_data.clone())
};
/*
self.system
.tx
.try_send(PurpleMessage::Login(AccountInfo {
handle: Handle::from(&mut *account),
protocol_data,
}))
.unwrap();*/
.unwrap();
}
}
impl purple::CloseHandler for PurpleDelta {
@@ -218,6 +223,7 @@ impl purple::CloseHandler for PurpleDelta {
.data
.session_closed
.store(true, Ordering::Relaxed);
self.connections.remove(*connection);
}
None => {
@@ -247,13 +253,7 @@ impl purple::StatusTypeHandler for PurpleDelta {
impl purple::LoadHandler for PurpleDelta {
fn load(&mut self, _plugin: &purple::Plugin) -> bool {
logging::set_thread_logger(logging::PurpleDebugLogger);
//use std::os::unix::io::AsRawFd;
/*
self.input_handle = Some(self.enable_input(
self.system.input_rx.as_raw_fd(),
purple::PurpleInputCondition::PURPLE_INPUT_READ,
));
*/
self.imex_command_handle =
Some(self.enable_command(commands::IMEX, "w", "imex &lt;code&gt;"));
@@ -429,33 +429,6 @@ impl purple::ChatSendHandler for PurpleDelta {
1
}
}
impl purple::InputHandler for PurpleDelta {
fn input(&mut self, _fd: i32, _cond: purple::PurpleInputCondition) {
log::debug!("Input");
/*
// Consume the byte from the input pipe.
let mut buf = [0; 1];
self.system
.input_rx
.read_exact(&mut buf)
.expect("Failed to read input pipe");
// Consume the actual message.
match self.system.rx.try_recv() {
Ok(message) => self.process_message(message),
Err(async_std::sync::TryRecvError::Empty) => log::error!("Expected message, but empty"),
Err(async_std::sync::TryRecvError::Disconnected) => {
log::error!("System disconnected");
if let Some(input_handle) = self.input_handle {
self.disable_input(input_handle);
}
}
};
*/
}
}
*/
impl purple::CommandHandler for PurpleDelta {
fn command(
@@ -481,7 +454,7 @@ impl purple::CommandHandler for PurpleDelta {
}
impl PurpleDelta {
fn command_imex(&mut self, conversation: &mut Conversation, args: &[&str]) -> PurpleCmdRet {
fn command_imex(&mut self, _conversation: &mut Conversation, args: &[&str]) -> PurpleCmdRet {
log::debug!("command_imex");
if args.len() != 1 {
@@ -551,40 +524,56 @@ impl PurpleDelta {
*/
PurpleCmdRet::PURPLE_CMD_RET_OK
}
/*
fn process_message(&mut self, message: SystemMessage) {
match message {
SystemMessage::ExecAccount { handle, function } => {
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.account))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
}
SystemMessage::ExecConnection { handle, function } => {
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.connection))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
}
SystemMessage::ExecHandle { handle, function } => {
self.connections
.get(handle)
.map(|mut protocol_data| function(self, &mut protocol_data))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
}
SystemMessage::FlushLogs => logging::flush(),
}
}
fn process_message(&mut self, message: SystemMessage) {
log::info!("received system message");
match message {
SystemMessage::ExecAccount {
handle: _,
function: _,
} => {
/*
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.account))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::ExecConnection {
handle: _,
function: _,
} => {
/*
self.connections
.get(handle)
.map(|protocol_data| function(&mut protocol_data.connection))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::ExecHandle {
handle: _,
function: _,
} => {
/*
self.connections
.get(handle)
.map(|mut protocol_data| function(self, &mut protocol_data))
.or_else(|| {
log::warn!("The account connection has been closed");
None
});
*/
}
SystemMessage::FlushLogs => logging::flush(),
}
}
/*
pub fn serv_got_chat_in(&mut self, connection: &mut Connection, msg_info: MsgInfo) {
match purple::Chat::find(&mut connection.get_account(), &msg_info.chat_sn) {
Some(mut chat) => {

View File

@@ -1,6 +1,6 @@
// This is a copy of https://github.com/Flared/purple-icq/blob/master/src/logging.rs
//use crate::messages::{FdSender, SystemMessage};
use crate::messages::{FdSender, SystemMessage};
use crate::purple;
use std::cell::RefCell;
use std::sync::Mutex;
@@ -76,7 +76,6 @@ impl log::Log for PurpleDebugLogger {
}
}
/*
pub struct RemoteLogger(pub FdSender<SystemMessage>);
impl log::Log for RemoteLogger {
@@ -100,7 +99,7 @@ impl log::Log for RemoteLogger {
self.0.clone().try_send(SystemMessage::FlushLogs);
}
}
*/
pub fn init(level: log::LevelFilter) -> Result<(), log::SetLoggerError> {
log::set_logger(&TLS_LOGGER).map(|()| log::set_max_level(level))
}

View File

@@ -0,0 +1,101 @@
use super::{FdSender, SystemMessage};
use crate::Handle;
use async_std::channel;
use purple::{account, Account};
pub struct AccountProxy<'a> {
pub handle: Handle,
pub sender: &'a mut FdSender<SystemMessage>,
}
impl<'a> AccountProxy<'a> {
pub async fn exec<F, T>(&mut self, f: F) -> Option<T>
where
F: FnOnce(&mut Account) -> T,
F: Send + 'static,
T: Send + 'static,
{
let (tx, rx) = channel::bounded(1);
self.exec_no_return(move |account| {
if let Err(error) = tx.try_send(f(account)) {
log::error!("Failed to send result: {:?}", error);
}
})
.await;
rx.recv().await.ok().or_else(|| {
log::error!("Failed to receive result");
None
})
}
pub async fn exec_no_return<F>(&mut self, f: F)
where
F: FnOnce(&mut Account),
F: Send + 'static,
{
self.sender
.send(SystemMessage::ExecAccount {
handle: self.handle.clone(),
function: Box::new(f),
})
.await;
}
#[allow(clippy::too_many_arguments)]
pub async fn request_input(
&mut self,
title: Option<String>,
primary: Option<String>,
secondary: Option<String>,
default_value: Option<String>,
multiline: bool,
masked: bool,
hint: Option<String>,
ok_text: String,
cancel_text: String,
who: Option<String>,
) -> Option<String> {
let (tx, rx) = channel::bounded(1);
self.exec_no_return(move |account| {
account.request_input(
title.as_deref(),
primary.as_deref(),
secondary.as_deref(),
default_value.as_deref(),
multiline,
masked,
hint.as_deref(),
&ok_text,
&cancel_text,
move |input_value| {
if let Err(error) = tx.try_send(input_value.map(|v| v.into_owned())) {
log::error!("Failed to send result: {:?}", error);
}
},
who.as_deref(),
)
})
.await;
rx.recv().await.ok().flatten()
}
pub async fn is_disconnected(&mut self) -> bool {
self.exec(move |account| account.is_disconnected())
.await
.unwrap_or(false)
}
pub async fn set_settings<T: 'static + serde::Serialize + Send>(
&mut self,
settings: T,
) -> account::settings::Result<()> {
self.exec(move |account| account.set_settings(&settings))
.await
.transpose()
.and_then(|option| {
option.ok_or_else(|| {
account::settings::Error::Message("Failed to receive result".into())
})
})
}
}

View File

@@ -0,0 +1,54 @@
use super::{FdSender, SystemMessage};
use crate::Handle;
use async_std::channel;
use purple::{Connection, PurpleConnectionError, PurpleConnectionState};
pub struct ConnectionProxy<'a> {
pub handle: Handle,
pub sender: &'a mut FdSender<SystemMessage>,
}
impl<'a> ConnectionProxy<'a> {
#[allow(dead_code)]
pub async fn exec<F, T>(&mut self, f: F) -> Option<T>
where
F: FnOnce(&mut Connection) -> T,
F: Send + 'static,
T: Send + 'static,
{
let (tx, rx) = channel::bounded(1);
self.exec_no_return(move |connection| {
if let Err(error) = tx.try_send(f(connection)) {
log::error!("Failed to send result: {:?}", error);
}
})
.await;
rx.recv().await.ok().or_else(|| {
log::error!("Failed to receive result");
None
})
}
pub async fn exec_no_return<F>(&mut self, f: F)
where
F: FnOnce(&mut Connection),
F: Send + 'static,
{
self.sender
.send(SystemMessage::ExecConnection {
handle: self.handle.clone(),
function: Box::new(f),
})
.await;
}
pub async fn set_state(&mut self, state: PurpleConnectionState) {
self.exec_no_return(move |connection| connection.set_state(state))
.await
}
pub async fn error_reason(&mut self, reason: PurpleConnectionError, description: String) {
self.exec_no_return(move |connection| connection.error_reason(reason, &description))
.await
}
}

View File

@@ -0,0 +1,43 @@
use super::{FdSender, SystemMessage};
use crate::{Handle, ProtocolData};
use async_std::channel;
pub struct HandleProxy<'a> {
pub handle: Handle,
pub sender: &'a mut FdSender<SystemMessage>,
}
impl<'a> HandleProxy<'a> {
#[allow(dead_code)]
pub async fn exec<F, T>(&mut self, f: F) -> Option<T>
where
F: FnOnce(&mut crate::PurpleDelta, &mut ProtocolData) -> T,
F: Send + 'static,
T: Send + 'static,
{
let (tx, rx) = channel::bounded(1);
self.exec_no_return(move |plugin, protocol_data| {
if let Err(error) = tx.try_send(f(plugin, protocol_data)) {
log::error!("Failed to send result: {:?}", error);
}
})
.await;
rx.recv().await.ok().or_else(|| {
log::error!("Failed to receive result");
None
})
}
pub async fn exec_no_return<F>(&mut self, f: F)
where
F: FnOnce(&mut crate::PurpleDelta, &mut ProtocolData),
F: Send + 'static,
{
self.sender
.send(SystemMessage::ExecHandle {
handle: self.handle.clone(),
function: Box::new(f),
})
.await;
}
}

193
src/messages/mod.rs Normal file
View File

@@ -0,0 +1,193 @@
use self::account_proxy::AccountProxy;
use self::connection_proxy::ConnectionProxy;
use self::handle_proxy::HandleProxy;
use crate::{AccountDataBox, Handle, ProtocolData, PurpleDelta};
use async_std::channel::{Receiver, Sender};
use purple::{Account, Connection};
mod account_proxy;
mod connection_proxy;
mod handle_proxy;
pub struct FdSender<T> {
os_sender: os_pipe::PipeWriter,
channel_sender: Sender<T>,
}
impl<T> FdSender<T> {
pub fn new(os_sender: os_pipe::PipeWriter, channel_sender: Sender<T>) -> Self {
Self {
os_sender,
channel_sender,
}
}
pub async fn send(&mut self, item: T) {
match self.channel_sender.send(item).await {
Ok(()) => {
use std::io::Write;
self.os_sender.write_all(&[0]).unwrap();
}
Err(error) => log::error!("Failed to send message: {}", error),
}
}
pub fn try_send(&mut self, item: T) {
self.channel_sender.try_send(item).unwrap();
use std::io::Write;
self.os_sender.write_all(&[0]).unwrap();
}
}
impl FdSender<SystemMessage> {
pub fn connection_proxy<'a>(&'a mut self, handle: &Handle) -> ConnectionProxy<'a> {
ConnectionProxy {
handle: handle.clone(),
sender: self,
}
}
pub fn account_proxy<'a>(&'a mut self, handle: &Handle) -> AccountProxy<'a> {
AccountProxy {
handle: handle.clone(),
sender: self,
}
}
pub fn handle_proxy<'a>(&'a mut self, handle: &Handle) -> HandleProxy<'a> {
HandleProxy {
handle: handle.clone(),
sender: self,
}
}
}
impl<T> Clone for FdSender<T> {
fn clone(&self) -> Self {
Self {
os_sender: self.os_sender.try_clone().unwrap(),
channel_sender: self.channel_sender.clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct AccountInfo {
pub handle: Handle,
pub protocol_data: AccountDataBox,
}
#[derive(Debug, Clone)]
pub struct PurpleMessageWithHandle<T> {
pub handle: Handle,
pub protocol_data: AccountDataBox,
pub message_data: T,
}
#[derive(Debug, Clone)]
pub struct JoinChatMessageData {
pub stamp: String,
}
#[derive(Debug, Clone)]
pub struct SendMsgMessageData {
pub to_sn: String,
pub message: String,
}
#[derive(Debug, Clone)]
pub struct GetChatInfoMessageData {
pub sn: String,
}
#[derive(Debug, Clone)]
pub struct GetHistoryMessageData {
pub sn: String,
pub from_msg_id: String,
pub count: i32,
}
#[derive(Debug)]
pub enum PurpleMessage {
Login(AccountInfo),
JoinChat(JoinChatMessage),
SendMsg(SendMsgMessage),
GetChatInfo(GetChatInfoMessage),
GetHistory(GetHistoryMessage),
}
pub type JoinChatMessage = PurpleMessageWithHandle<JoinChatMessageData>;
pub type GetHistoryMessage = PurpleMessageWithHandle<GetHistoryMessageData>;
pub type SendMsgMessage = PurpleMessageWithHandle<SendMsgMessageData>;
pub type GetChatInfoMessage = PurpleMessageWithHandle<GetChatInfoMessageData>;
impl PurpleMessage {
pub fn join_chat(handle: Handle, protocol_data: AccountDataBox, stamp: String) -> Self {
Self::JoinChat(JoinChatMessage {
handle,
protocol_data,
message_data: JoinChatMessageData { stamp },
})
}
pub fn fetch_history(
handle: Handle,
protocol_data: AccountDataBox,
sn: String,
from_msg_id: String,
count: i32,
) -> Self {
Self::GetHistory(GetHistoryMessage {
handle,
protocol_data,
message_data: GetHistoryMessageData {
sn,
from_msg_id,
count,
},
})
}
pub fn send_msg(
handle: Handle,
protocol_data: AccountDataBox,
to_sn: String,
message: String,
) -> Self {
Self::SendMsg(SendMsgMessage {
handle,
protocol_data,
message_data: SendMsgMessageData { to_sn, message },
})
}
pub fn get_chat_info(handle: Handle, protocol_data: AccountDataBox, sn: String) -> Self {
Self::GetChatInfo(GetChatInfoMessage {
handle,
protocol_data,
message_data: GetChatInfoMessageData { sn },
})
}
}
pub enum SystemMessage {
ExecAccount {
handle: Handle,
function: Box<dyn FnOnce(&mut Account) + Send + 'static>,
},
ExecConnection {
handle: Handle,
function: Box<dyn FnOnce(&mut Connection) + Send + 'static>,
},
ExecHandle {
handle: Handle,
function: Box<dyn FnOnce(&mut PurpleDelta, &mut ProtocolData) + Send + 'static>,
},
FlushLogs,
}
pub struct DeltaSystemHandle {
pub input_rx: os_pipe::PipeReader,
pub rx: Receiver<SystemMessage>,
pub tx: Sender<PurpleMessage>,
}