ferretro-synced/src/sync/comms.rs

131 lines
4.4 KiB
Rust

use std::path::{Path, PathBuf};
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::thread::JoinHandle;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json;
use websocket::client::ClientBuilder;
use websocket::receiver::Reader;
use websocket::sender::Writer;
use websocket::{Message, OwnedMessage};
const CONNECTION: &'static str = "ws://127.0.0.1:8765";
pub struct Communication<T> {
rx: Receiver<T>,
tx: Sender<OwnedMessage>,
inbound_thread: JoinHandle<Result<(), failure::Error>>,
outbound_thread: JoinHandle<Result<(), failure::Error>>,
}
pub struct CommunicationSettings {
pub connection: String
}
impl<T> Communication<T> where T: std::marker::Send, T: Serialize, T: DeserializeOwned, T: 'static {
pub fn new(settings: CommunicationSettings) -> Self {
let (tx, from_main): (Sender<OwnedMessage>, Receiver<OwnedMessage>) = mpsc::channel();
let (to_main, rx): (Sender<T>, Receiver<T>) = mpsc::channel();
// transmitter for ping and close messages
let tx_1 = tx.clone();
let ws_client = ClientBuilder::new(settings.connection.as_str())
.unwrap()
.add_protocol("rust-websocket")
.connect_insecure()
.unwrap();
let (mut ws_receiver, mut ws_sender) = ws_client.split().unwrap();
let outbound = thread::spawn(move || -> Result<(), failure::Error> {
let mut from_main = from_main;
let mut ws_sender = ws_sender;
let mut flush = || -> Result<(), failure::Error> {
let msg = from_main.recv()?;
println!("sending a message {:?}", &msg);
ws_sender.send_message(&msg)?;
Ok(())
};
loop {
match flush() {
Ok(_) => (),
Err(e) => {
println!("Error in outbound thread: {}", e);
return Err(e)
}
}
}
});
let inbound = thread::spawn(move || -> Result<(), failure::Error> {
let mut to_main = to_main;
let mut ws_receiver = ws_receiver;
let mut tx_1 = tx_1;
let mut flush = || -> Result<(), failure::Error> {
let msg = ws_receiver.recv_message()?;
match msg {
OwnedMessage::Text(text) => match serde_json::from_str(&text) {
Ok(m) => {
println!("receiving a message {}", &text);
to_main.send(m); // todo print an error
Ok(())
}
Err(e) => Err(failure::err_msg(format!("deserialization error: {:?}", e)))
},
OwnedMessage::Binary(_) => Err(failure::err_msg("binary message not implemented")),
OwnedMessage::Close(_) => {
tx_1.send(OwnedMessage::Close(None));
Err(failure::err_msg("Connection closed by request."))
}
OwnedMessage::Ping(data) => {
tx_1.send(OwnedMessage::Pong(data));
// todo print an error
Ok(())
},
OwnedMessage::Pong(_) => {
println!("received pong");
Ok(())
}
}
};
loop {
match flush() {
Ok(_) => (),
Err(e) => {
println!("Error in inbound thread: {}", e);
return Err(e)
}
}
/*
match Communication::<T>::handle_inbound(to_main) {
Ok(_) => (),
Err(e) => return Err(e)
}
*/
}
});
Communication {
tx: tx,
rx: rx,
inbound_thread: inbound,
outbound_thread: outbound,
}
}
pub fn send(&mut self, msg: T) -> Result<(), failure::Error> {
let msg_serialized = serde_json::to_string(&msg)?;
self.tx.send(OwnedMessage::Text(msg_serialized))?;
Ok(())
}
pub fn try_recv(&mut self) -> Result<T, std::sync::mpsc::TryRecvError> {
self.rx.try_recv()
}
}