Compare commits

...

2 Commits

Author SHA1 Message Date
ef43c81955 Don't start delta until the connect signal is received
We also introduce a sleep() to "fix" a race condition where empathy
tries to contact the connection bus name before the thread has gotten
around to starting it. That really needs fixing better, but for now
I'm focused on basic functionality.
2020-05-12 22:28:36 +01:00
ef1342f372 Move account name escaping code to the end 2020-05-12 22:12:08 +01:00
2 changed files with 118 additions and 109 deletions

View File

@@ -19,59 +19,10 @@ pub const CONN_OBJECT_PATH: &'static str = "/org/freedesktop/Telepathy/Connectio
pub struct Connection {
id: String,
ctx: Arc<RwLock<Context>>,
}
fn escape_one(b: u8) -> String {
format!("_{:0<2x}", b)
}
// Some non-empty sequence of ASCII letters, digits and underscores
fn escape(s: String) -> String {
// Special-case the empty string
if s.len() == 0 {
return "_".to_string();
}
let bytes = s.into_bytes();
let mut iter = bytes.iter();
let mut out = String::new();
// Only alphanumeric in the first byte
let x = *iter.next().expect("Already checked len > 0");
let first = match x {
b'a'..=b'z' | b'A'..=b'Z' => unsafe { String::from_utf8_unchecked(vec![x]) },
_ => escape_one(x),
};
out.push_str(&first);
for x in iter {
let next = match x {
b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' => unsafe {
String::from_utf8_unchecked(vec![*x])
},
_ => escape_one(*x),
};
out.push_str(&next);
}
out
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_escape() {
assert_eq!(escape("".to_string()), "_");
assert_eq!(escape("foo".to_string()), "foo");
assert_eq!(escape("foo@bar".to_string()), "foo_40bar");
assert_eq!(escape("foo_bar".to_string()), "foo_5fbar");
assert_eq!(escape("foo__@__bar".to_string()), "foo_5f_5f_40_5f_5fbar");
assert_eq!(escape("1foo".to_string()), "_31foo");
}
// set to false when disconnect() is called. Note that it is true even
// before connect() is called
disconnected: Arc<RwLock<bool>>,
}
impl Connection {
@@ -136,6 +87,7 @@ impl Connection {
Ok(Connection {
id: id,
ctx: Arc::new(RwLock::new(ctx)),
disconnected: Arc::new(RwLock::new(false)),
})
}
@@ -146,7 +98,7 @@ impl Connection {
pub fn run(self) {
let bus = self.bus();
let path = self.path();
let ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let c_rc = std::rc::Rc::new(self);
let f = tree::Factory::new_fn::<()>();
@@ -182,58 +134,7 @@ impl Connection {
// Set up delta jobs last in case registering to DBUS fails
// "Borrowed" from https://github.com/deltachat/deltachat-core-rust/blob/master/examples/simple.rs
let running = Arc::new(RwLock::new(true));
let inbox_ctx = ctx.clone();
let r1 = running.clone();
let _t1 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
}
});
let smtp_ctx = ctx.clone();
let r1 = running.clone();
let _t2 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
});
let mvbox_ctx = ctx.clone();
let r1 = running.clone();
let _t3 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
});
let sentbox_ctx = ctx.clone();
let r1 = running.clone();
let _t4 = thread::spawn(move || {
while *r1.read().unwrap() {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if *r1.read().unwrap() {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
});
let r1 = running.clone();
while *r1.read().unwrap() {
while !*disconnected.read().unwrap() {
match c.process(Duration::from_millis(100)) {
Err(e) => {
println!("Error processing: {}", e);
@@ -241,9 +142,10 @@ impl Connection {
}
_ => {}
}
// TODO: notice when the conn wants to exit
}
// TODO: clean up, emit disconnected signal. Join on threads started in
// connect() ?
}
pub fn bus(&self) -> String {
@@ -262,8 +164,59 @@ impl AsRef<dyn telepathy::Connection + 'static> for std::rc::Rc<Connection> {
}
impl telepathy::Connection for Connection {
// In connect(), we start the threads that drive the deltachat context
fn connect(&self) -> Result<(), tree::MethodErr> {
Err(tree::MethodErr::no_arg()) // FIXME: should be NotImplemented?
println!("Connection<{}>::connect()", self.id);
let inbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _inbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_inbox_jobs(&inbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_inbox_fetch(&inbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_inbox_idle(&inbox_ctx.read().unwrap());
}
}
}
});
let smtp_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _smtp_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_smtp_jobs(&smtp_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_smtp_idle(&smtp_ctx.read().unwrap());
}
}
});
let mvbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _mvbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_mvbox_fetch(&mvbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_mvbox_idle(&mvbox_ctx.read().unwrap());
}
}
});
let sentbox_ctx = self.ctx.clone();
let disconnected = self.disconnected.clone();
let _sentbox_thread = thread::spawn(move || {
while !*disconnected.read().unwrap() {
dc::job::perform_sentbox_fetch(&sentbox_ctx.read().unwrap());
if !*disconnected.read().unwrap() {
dc::job::perform_sentbox_idle(&sentbox_ctx.read().unwrap());
}
}
});
Ok(())
}
fn disconnect(&self) -> Result<(), tree::MethodErr> {
@@ -390,3 +343,56 @@ impl telepathy::Connection for Connection {
Ok(true)
}
}
fn escape_one(b: u8) -> String {
format!("_{:0<2x}", b)
}
// Some non-empty sequence of ASCII letters, digits and underscores
fn escape(s: String) -> String {
// Special-case the empty string
if s.len() == 0 {
return "_".to_string();
}
let bytes = s.into_bytes();
let mut iter = bytes.iter();
let mut out = String::new();
// Only alphanumeric in the first byte
let x = *iter.next().expect("Already checked len > 0");
let first = match x {
b'a'..=b'z' | b'A'..=b'Z' => unsafe { String::from_utf8_unchecked(vec![x]) },
_ => escape_one(x),
};
out.push_str(&first);
for x in iter {
let next = match x {
b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' => unsafe {
String::from_utf8_unchecked(vec![*x])
},
_ => escape_one(*x),
};
out.push_str(&next);
}
out
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_escape() {
assert_eq!(escape("".to_string()), "_");
assert_eq!(escape("foo".to_string()), "foo");
assert_eq!(escape("foo@bar".to_string()), "foo_40bar");
assert_eq!(escape("foo_bar".to_string()), "foo_5fbar");
assert_eq!(escape("foo__@__bar".to_string()), "foo_5f_5f_40_5f_5fbar");
assert_eq!(escape("1foo".to_string()), "_31foo");
}
}

View File

@@ -59,13 +59,16 @@ impl ConnectionManager {
std::thread::spawn(move || conn.run());
// Emit a NewConnection signal for the benefit of others, but the caller
// learns immediately
// learns from our RPC response
let sig = telepathy::ConnectionManagerNewConnection {
bus_name: bus.to_owned(),
object_path: dbus_conn_path.clone(),
protocol: super::PROTO_NAME.to_string(),
};
// FIXME: ...but for now, just sleep for a bit
std::thread::sleep(std::time::Duration::from_secs(1));
let dbus_cm_path = dbus::strings::Path::new(CM_OBJECT_PATH.to_string())
.expect("Object path should meet DBUS requirements");