diff --git a/Cargo.lock b/Cargo.lock index 54f76ec..bbfbed2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,14 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "ahash" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3" +dependencies = [ + "const-random", +] + [[package]] name = "ahash" version = "0.3.8" @@ -27,6 +36,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" +[[package]] +name = "arrayvec" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9" +dependencies = [ + "nodrop", +] + [[package]] name = "arrayvec" version = "0.5.1" @@ -158,6 +176,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "const-random" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" +dependencies = [ + "const-random-macro", + "proc-macro-hack", +] + +[[package]] +name = "const-random-macro" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" +dependencies = [ + "getrandom", + "proc-macro-hack", +] + [[package]] name = "cpuid-bool" version = "0.1.2" @@ -170,7 +208,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -181,7 +219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -193,13 +231,22 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg 1.0.0", "cfg-if", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "maybe-uninit", "memoffset", "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b" +dependencies = [ + "crossbeam-utils 0.6.6", +] + [[package]] name = "crossbeam-queue" version = "0.2.3" @@ -207,10 +254,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] +[[package]] +name = "crossbeam-utils" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -222,6 +279,17 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "derivative" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c6d883546668a3e2011b6a716a7330b82eabb0151b138217f632c8243e17135" +dependencies = [ + "proc-macro2 0.4.30", + "quote 0.6.13", + "syn 0.15.44", +] + [[package]] name = "digest" version = "0.8.1" @@ -252,13 +320,15 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "crossbeam-channel", "dotenv", "futures", "log", "pretty_env_logger", "serde 1.0.114", "serde_json", - "specs", + "specs 0.16.1", + "specs-hierarchy", "tokio", "uuid", "warp", @@ -437,13 +507,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead" +dependencies = [ + "ahash 0.2.18", + "autocfg 0.1.7", +] + [[package]] name = "hashbrown" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96282e96bfcd3da0d3aa9938bedf1e50df3269b6db08b4876d2da0bb1a0841cf" dependencies = [ - "ahash", + "ahash 0.3.8", "autocfg 1.0.0", ] @@ -768,6 +848,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nodrop" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" + [[package]] name = "nom" version = "5.1.2" @@ -846,9 +932,9 @@ version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.34", ] [[package]] @@ -880,13 +966,28 @@ dependencies = [ "log", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598" + +[[package]] +name = "proc-macro2" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" +dependencies = [ + "unicode-xid 0.1.0", +] + [[package]] name = "proc-macro2" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" dependencies = [ - "unicode-xid", + "unicode-xid 0.2.1", ] [[package]] @@ -895,13 +996,22 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quote" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1" +dependencies = [ + "proc-macro2 0.4.30", +] + [[package]] name = "quote" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" dependencies = [ - "proc-macro2", + "proc-macro2 1.0.18", ] [[package]] @@ -1094,7 +1204,7 @@ checksum = "91739a34c4355b5434ce54c9086c5895604a9c278586d1f1aa95e04f66b525a0" dependencies = [ "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "num_cpus", ] @@ -1186,9 +1296,9 @@ version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.34", ] [[package]] @@ -1239,20 +1349,44 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "shred" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92472b9bafafbcba21935c6444d924e5332742f6778c49504a49a97eaeff6ccc" +dependencies = [ + "arrayvec 0.4.12", + "hashbrown 0.6.3", + "mopa", + "rayon", + "smallvec 0.6.13", +] + [[package]] name = "shred" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5f08237e667ac94ad20f8878b5943d91a93ccb231428446c57c21c57779016d" dependencies = [ - "arrayvec", - "hashbrown", + "arrayvec 0.5.1", + "hashbrown 0.7.2", "mopa", "rayon", - "smallvec", + "smallvec 1.4.0", "tynm", ] +[[package]] +name = "shred-derive" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f37080f2751fbf091dbdebaa95bd6cf9dbf74ad1d50396b1908518a1747fdf" +dependencies = [ + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.34", +] + [[package]] name = "shrev" version = "1.1.1" @@ -1275,6 +1409,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +[[package]] +name = "smallvec" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" +dependencies = [ + "maybe-uninit", +] + [[package]] name = "smallvec" version = "1.4.0" @@ -1293,18 +1436,35 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "specs" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4943fde8c5d3d14c3d19d2a4c7abbd7b626c270a19e6cd35252294a48feb698c" +dependencies = [ + "crossbeam-queue 0.1.2", + "derivative", + "hashbrown 0.6.3", + "hibitset", + "log", + "rayon", + "shred 0.9.4", + "shrev", + "tuple_utils", +] + [[package]] name = "specs" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff28a29366aff703d5da8a7e2c8875dc8453ac1118f842cbc0fa70c7db51240" dependencies = [ - "crossbeam-queue", - "hashbrown", + "crossbeam-queue 0.2.3", + "hashbrown 0.7.2", "hibitset", "log", "rayon", - "shred", + "shred 0.10.2", "shrev", "specs-derive", "tuple_utils", @@ -1316,9 +1476,33 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e23e09360f3d2190fec4222cd9e19d3158d5da948c0d1ea362df617dd103511" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.34", +] + +[[package]] +name = "specs-hierarchy" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3199a5c3147995119adcaba09a4c70f390cd8002013ef35a7494a7fb9255e3" +dependencies = [ + "hibitset", + "shred 0.9.4", + "shred-derive", + "shrev", + "specs 0.15.1", +] + +[[package]] +name = "syn" +version = "0.15.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5" +dependencies = [ + "proc-macro2 0.4.30", + "quote 0.6.13", + "unicode-xid 0.1.0", ] [[package]] @@ -1327,9 +1511,9 @@ version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b" dependencies = [ - "proc-macro2", - "quote", - "unicode-xid", + "proc-macro2 1.0.18", + "quote 1.0.7", + "unicode-xid 0.2.1", ] [[package]] @@ -1410,9 +1594,9 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ - "proc-macro2", - "quote", - "syn", + "proc-macro2 1.0.18", + "quote 1.0.7", + "syn 1.0.34", ] [[package]] @@ -1560,6 +1744,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-xid" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" + [[package]] name = "unicode-xid" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 11bf0ce..f8b35c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,9 @@ dotenv = "0.15.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" specs = { version = "0.16.1", features = ["specs-derive"] } +specs-hierarchy = "0.5.1" uuid = { version = "0.4", features = ["serde", "v4"] } futures = { version = "0.3", default-features = false } log = "0.4" -pretty_env_logger = "0.3" \ No newline at end of file +pretty_env_logger = "0.3" +crossbeam-channel = "0.4.4" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 7f01acb..7ffba7c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,15 @@ extern crate pretty_env_logger; #[macro_use] extern crate log; +use specs::ReadExpect; +use specs::{Read, Entities}; use futures::{FutureExt, StreamExt}; use tokio::sync::{mpsc, RwLock}; use warp::ws::{Message, WebSocket}; use warp::Filter; +extern crate crossbeam_channel; + #[macro_use] extern crate anyhow; @@ -30,11 +34,11 @@ extern crate dotenv; use dotenv::dotenv; -use specs::{World, WorldExt, Builder, Component, VecStorage, Entity}; +use specs::{World, WorldExt, Builder, Component, VecStorage, Entity, System, ReadStorage, WriteStorage, NullStorage}; -/* user state from chat sample, ecs-ify it */ static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); -type Users = Arc>>>>; +/* user state from chat sample, ecs-ify it */ +//type Users = Arc>>>>; /* end chat sample state */ #[derive(Component, Debug)] @@ -55,13 +59,79 @@ struct Contains { children: Vec, } - -pub struct Client { - pub user_id: usize, - pub topics: Vec, - pub sender: Option>>, +#[derive(Component, Debug)] +#[storage(VecStorage)] +struct User { + sender: mpsc::UnboundedSender>, + socket_id: usize, } +#[derive(Component, Default)] +#[storage(NullStorage)] +struct Unsent; + +#[derive(Debug)] +enum NewEntityRequest { + User(User), + Textual(Textual), +} + +struct SendMessages; +impl<'a> System<'a> for SendMessages { + type SystemData = ( + ReadStorage<'a, Textual>, + ReadStorage<'a, Unsent>, + WriteStorage<'a, User>, + ); + + fn run(&mut self, (text, unsent, mut user): Self::SystemData) { + use specs::Join; + + let mut users = (&mut user).join(); + + for (text, _) in (&text, &unsent).join() { // todo: why are the positions flipped??? + + for user in &mut users { + eprintln!("sending {} to user {}", &text.value, &user.socket_id); + user.sender.send(Ok(Message::text(text.value.clone()))).unwrap(); // todo: do something with unhandled result. + } + } + } +} + +struct RequestQueue(crossbeam_channel::Receiver); + +struct ProcessEntityRequests; +impl<'a> System<'a> for ProcessEntityRequests { + type SystemData = ( + ReadExpect<'a, RequestQueue>, + Entities<'a>, + WriteStorage<'a, User>, + WriteStorage<'a, Textual>, + WriteStorage<'a, Unsent>, + ); + + fn run(&mut self, (requests, entities, mut users, mut textual, mut unsent): Self::SystemData) { + eprintln!("entered process request system"); + match requests.0.try_recv() { + Ok(request) => { + eprintln!("the requested entity: {:?}", request); + match request { + NewEntityRequest::User(user_request) => { + let user = entities.create(); + users.insert(user, user_request).unwrap(); + } + NewEntityRequest::Textual(textual_request) => { + let text = entities.create(); + textual.insert(text, textual_request).unwrap(); + unsent.insert(text, Unsent).unwrap(); + } + } + } + Err(_) => {} + } + } +} /* #[get("/")] @@ -129,25 +199,49 @@ fn main() { */ +async fn world_loop(mut world: World){ + use specs::DispatcherBuilder; + let mut dispatcher = DispatcherBuilder::new() + .with(SendMessages, "send_messages", &[]) + .with(ProcessEntityRequests, "process_entity_requests", &[]) + .build(); + + dispatcher.setup(&mut world); + + loop { + dispatcher.dispatch(&mut world); + world.maintain(); + std::thread::sleep(Duration::from_secs(1)); + } +} + #[tokio::main] async fn main() { pretty_env_logger::init(); + // non-tokio queue + let (new_entity_tx, new_entity_rx) = crossbeam_channel::unbounded::(); + + let mut world = World::new(); + world.insert(RequestQueue(new_entity_rx)); + + tokio::spawn(world_loop(world)); + // Keep track of all connected users, key is usize, value // is a websocket sender. - let users = Users::default(); // Turn our "state" into a new Filter... - let users = warp::any().map(move || users.clone()); + // filter for allowing new entities to be created + let new_entity_filter = warp::any().map(move || new_entity_tx.clone()); // GET /chat -> websocket upgrade let chat = warp::path("chat") // The `ws()` filter will prepare Websocket handshake... .and(warp::ws()) - .and(users) - .map(|ws: warp::ws::Ws, users| { + .and(new_entity_filter) + .map(|ws: warp::ws::Ws, new_entity_tx| { // This will call our function if the handshake succeeds. - ws.on_upgrade(move |socket| user_connected(socket, users)) + ws.on_upgrade(move |socket| user_connected(socket, new_entity_tx)) }); // GET / -> index html @@ -158,14 +252,13 @@ async fn main() { warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; } -async fn user_connected(ws: WebSocket, users: Users) { +async fn user_connected(ws: WebSocket, entity_request_tx: crossbeam_channel::Sender) { + let entity_request_tx = entity_request_tx.clone(); + let (user_ws_tx, mut user_ws_rx) = ws.split(); // Use a counter to assign a new unique ID for this user. let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed); - eprintln!("new chat user: {}", my_id); - // Split the socket into a sender and receive of messages. - let (user_ws_tx, mut user_ws_rx) = ws.split(); // Use an unbounded channel to handle buffering and flushing of messages // to the websocket... @@ -176,14 +269,18 @@ async fn user_connected(ws: WebSocket, users: Users) { } })); - // Save the sender in our list of connected users. - users.write().await.insert(my_id, tx); + entity_request_tx.send(NewEntityRequest::User(User{ + sender: tx, + socket_id: my_id, + })); + + eprintln!("new chat user: {}", my_id); // Return a `Future` that is basically a state machine managing // this specific user's connection. // Make an extra clone to give to our disconnection handler... - let users2 = users.clone(); + let entity_request_tx2 = entity_request_tx.clone(); // Every time the user sends a message, broadcast it to // all other users... @@ -195,15 +292,17 @@ async fn user_connected(ws: WebSocket, users: Users) { break; } }; - user_message(my_id, msg, &users).await; + user_message(my_id, msg, entity_request_tx.clone()).await; } // user_ws_rx stream will keep processing as long as the user stays // connected. Once they disconnect, then... - user_disconnected(my_id, &users2).await; + user_disconnected(my_id, entity_request_tx2).await; } -async fn user_message(my_id: usize, msg: Message, users: &Users) { +async fn user_message(my_id: usize, msg: Message, entity_request_tx: crossbeam_channel::Sender) { + use specs::Join; + // Skip any non-Text messages... let msg = if let Ok(s) = msg.to_str() { s @@ -213,6 +312,13 @@ async fn user_message(my_id: usize, msg: Message, users: &Users) { let new_msg = format!(": {}", my_id, msg); + // Add new entity with the message to the world.alloc + + entity_request_tx.send(NewEntityRequest::Textual(Textual{ + value: msg.to_string(), + })); + + /* // New message from this user, send it to everyone else (except same uid)... for (&uid, tx) in users.read().await.iter() { if my_id != uid { @@ -223,13 +329,17 @@ async fn user_message(my_id: usize, msg: Message, users: &Users) { } } } + */ } -async fn user_disconnected(my_id: usize, users: &Users) { +async fn user_disconnected(my_id: usize, entity_request_tx: crossbeam_channel::Sender) { eprintln!("good bye user: {}", my_id); + // todo: really remove them + //world.write().await.en // Stream closed up, so remove from the user list - users.write().await.remove(&my_id); + //world.write().await.delete_entity(me); + //world.write().await.remove(&my_id); } static INDEX_HTML: &str = r#"