wip websocket comms

This commit is contained in:
Vivian Lim 2020-01-02 03:17:39 -08:00
parent 32c4aa99cf
commit 96e65048c8
9 changed files with 952 additions and 48 deletions

14
.vscode/launch.json vendored
View File

@ -18,6 +18,20 @@
}],
"externalConsole": false,
},
{
"name": "sync: pokeblue 2",
"type": "cppvsdbg",
"request": "launch",
"program": "target/debug/ferretro-synced.exe",
"args": ["--core", "./data/gambatte_libretro.dll", "--rom", "./data/pokeblue.gb", "--state", "./data/pkblue-route1.sav"],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [{
"name": "RUST_BACKTRACE",
"value": "1"
}],
"externalConsole": false,
},
{
"name": "sync: sonic2",
"type": "cppvsdbg",

782
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -20,5 +20,6 @@ structopt = "^0.3"
serde = { version = "1.0.104", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0.44"
websocket = "0.24.0"
[dev-dependencies]

View File

@ -1,37 +0,0 @@
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use serde::{Serialize, Deserialize};
pub struct Communication<T> {
tx: Sender<T>,
rx: Receiver<T>,
}
impl<T> Communication<T> {
pub fn new() -> Box<Self> {
let (to_thread, from_main): (Sender<T>, Receiver<T>) = mpsc::channel();
let (to_main, from_thread): (Sender<T>, Receiver<T>) = mpsc::channel();
// let child = thread::spawn(move || {
// let client = WebsocketClient::<T> {
// thread_tx: to_main,
// thread_rx: from_main,
// };
// // todo: loop
// println!("listener thread finished");
// });
Box::new(Communication {
tx: to_thread,
rx: from_thread,
})
}
}
struct WebsocketClient<T> {
thread_tx: Sender<T>,
thread_rx: Receiver<T>,
}

View File

@ -146,6 +146,7 @@ impl MyEmulator {
pub fn run(&mut self) {
self.audio_device.resume();
self.synced_pokemonrb.start_comms();
let mut event_pump = self.sdl_context.event_pump().unwrap();
'running: loop {
let frame_begin = Instant::now();
@ -173,8 +174,10 @@ impl MyEmulator {
spf = 1.0 / 60.0;
}
self.synced_pokemonrb.handle_inbound_msgs();
self.synced_pokemonrb.update_from_mem();
println!("{}", self.synced_pokemonrb);
//println!("{}", self.synced_pokemonrb);
Duration::from_secs_f64(spf)
.checked_sub(frame_begin.elapsed())

View File

@ -9,8 +9,6 @@ use std::path::{Path, PathBuf};
mod emulator;
use emulator::MyEmulator;
mod comms;
use comms::Communication;
pub fn main() -> failure::Fallible<()> {
let opt: Opt = Opt::from_args();

109
src/sync/comms.rs Normal file
View File

@ -0,0 +1,109 @@
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::thread::JoinHandle;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json;
use websocket::client::ClientBuilder;
use websocket::receiver::Reader;
use websocket::sender::Writer;
use websocket::{Message, OwnedMessage};
const CONNECTION: &'static str = "ws://127.0.0.1:8765";
pub struct Communication<T> {
pub tx: Sender<T>,
pub rx: Receiver<T>,
inbound_thread: JoinHandle<Result<(), failure::Error>>,
outbound_thread: JoinHandle<Result<(), failure::Error>>,
}
impl<T> Communication<T> where T: std::marker::Send, T: Serialize, T: DeserializeOwned, T: 'static {
pub fn new() -> Self {
let (to_thread, from_main): (Sender<T>, Receiver<T>) = mpsc::channel();
let (to_main, from_thread): (Sender<T>, Receiver<T>) = mpsc::channel();
let ws_client = ClientBuilder::new(CONNECTION)
.unwrap()
.add_protocol("rust-websocket")
.connect_insecure()
.unwrap();
let (mut ws_receiver, mut ws_sender) = ws_client.split().unwrap();
let outbound = thread::spawn(move || -> Result<(), failure::Error> {
let mut from_main = from_main;
let mut ws_sender = ws_sender;
let mut flush = || -> Result<(), failure::Error> {
let msg = from_main.recv()?;
let msg_serialized = serde_json::to_string(&msg)?;
println!("sending a message {}", &msg_serialized);
ws_sender.send_message(&Message::text(msg_serialized))?;
Ok(())
};
loop {
match flush() {
Ok(_) => (),
Err(e) => {
println!("Error in outbound thread: {}", e);
return Err(e)
}
}
}
});
let inbound = thread::spawn(move || -> Result<(), failure::Error> {
let mut to_main = to_main;
let mut ws_receiver = ws_receiver;
let mut flush = || -> Result<(), failure::Error> {
let msg = ws_receiver.recv_message()?;
match msg {
OwnedMessage::Text(text) => match serde_json::from_str(&text) {
Ok(m) => {
println!("receiving a message {}", &text);
to_main.send(m);
Ok(())
}
Err(e) => Err(failure::err_msg(format!("deserialization error: {:?}", e)))
},
OwnedMessage::Binary(_) => Err(failure::err_msg("binary message not implemented")),
OwnedMessage::Close(_) => Err(failure::err_msg("close requested")),
OwnedMessage::Ping(_) => Err(failure::err_msg("ping message not implemented")),
OwnedMessage::Pong(_) => Err(failure::err_msg("pong message not implemented")),
}
};
loop {
match flush() {
Ok(_) => (),
Err(e) => {
println!("Error in inbound thread: {}", e);
return Err(e)
}
}
/*
match Communication::<T>::handle_inbound(to_main) {
Ok(_) => (),
Err(e) => return Err(e)
}
*/
}
});
Communication {
tx: to_thread,
rx: from_thread,
inbound_thread: inbound,
outbound_thread: outbound,
}
}
}
struct WebsocketClient<T> {
thread_tx: Sender<T>,
thread_rx: Receiver<T>,
}

View File

@ -1,2 +1,3 @@
pub mod pokemon_rb;
pub mod memory_rw;
pub mod memory_rw;
pub mod comms;

View File

@ -4,10 +4,12 @@ use crate::sync::memory_rw::ReadWriteMemoryMap;
use serde::{Serialize, Deserialize};
use ferretro::retro::wrapper::LibretroWrapper; // want LibretroWrapper.api : LibretroApi.get_memory()
use crate::num::ToPrimitive;
use crate::sync::comms::Communication;
pub struct SyncedPokemonRedBlue {
raw: Raw<'static>,
battle_context: BattleContext,
comms: Option<Communication<Message>>
}
@ -70,19 +72,45 @@ impl SyncedPokemonRedBlue {
// _ => ()
// }
self.battle_context = battle_context;
let message: Message = (&self.raw).into();
match serde_json::to_string(&message) {
Ok(serialized) => {
println!("serialized data: {}", serialized)
}
Err(_) => ()
}
match &mut self.comms {
Some(comms) => {
comms.tx.send(message);
},
None => ()
}
}
pub fn handle_inbound_msgs(&mut self) -> Result<(), failure::Error>{
let in_msg = self.comms.as_ref().ok_or(failure::err_msg("comms doesn't exist, can't handle inbound msgs"))?.rx.try_recv()?;
self.handle_message(in_msg)
}
fn handle_message(&mut self, msg: Message) -> Result<(), failure::Error> {
match msg.active_pkmn {
Some(src) => {
let target_slice = self.raw.active_pokemon_raw.slice.as_mut().ok_or(failure::err_msg("can't write message back to None memory slice"))?;
if src.len() != target_slice.len() {
return Err(failure::err_msg("message size and slice size differ."));
}
let mut index: usize = 0;
for value in src.into_iter(){
target_slice[index] = value;
index += 1;
}
Ok(())
}
None => Ok(())
}
}
// pub fn get_message<'a>(&self) -> Message {
// Message {
// active_pkmn: Box::from(&self.raw.active_pokemon_raw.clone())
@ -136,12 +164,17 @@ impl SyncedPokemonRedBlue {
SyncedPokemonRedBlue {
raw: Raw::create(map),
battle_context: BattleContext::default(),
comms: None,
}
}
pub fn libretro_set_memory_maps(&mut self, libretro_memory_map: ferretro::retro::ffi::MemoryMap) {
self.raw.libretro_set_memory_maps(libretro_memory_map)
}
pub fn start_comms(&mut self) {
self.comms = Some(Communication::new());
}
}
impl Raw<'_> {