semiworking thing where connecting / sending text results in entities being created

This commit is contained in:
Vivian Lim 2020-09-14 01:06:24 -07:00
parent f19d0f852b
commit 41312d57f0
3 changed files with 358 additions and 56 deletions

250
Cargo.lock generated
View File

@ -1,5 +1,14 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "ahash"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3"
dependencies = [
"const-random",
]
[[package]]
name = "ahash"
version = "0.3.8"
@ -27,6 +36,15 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "arrayvec"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9"
dependencies = [
"nodrop",
]
[[package]]
name = "arrayvec"
version = "0.5.1"
@ -158,6 +176,26 @@ dependencies = [
"bitflags",
]
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom",
"proc-macro-hack",
]
[[package]]
name = "cpuid-bool"
version = "0.1.2"
@ -170,7 +208,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
dependencies = [
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
@ -181,7 +219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
@ -193,13 +231,22 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
dependencies = [
"autocfg 1.0.0",
"cfg-if",
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"lazy_static",
"maybe-uninit",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
dependencies = [
"crossbeam-utils 0.6.6",
]
[[package]]
name = "crossbeam-queue"
version = "0.2.3"
@ -207,10 +254,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
dependencies = [
"cfg-if",
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"maybe-uninit",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
@ -222,6 +279,17 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "derivative"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c6d883546668a3e2011b6a716a7330b82eabb0151b138217f632c8243e17135"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"syn 0.15.44",
]
[[package]]
name = "digest"
version = "0.8.1"
@ -252,13 +320,15 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"crossbeam-channel",
"dotenv",
"futures",
"log",
"pretty_env_logger",
"serde 1.0.114",
"serde_json",
"specs",
"specs 0.16.1",
"specs-hierarchy",
"tokio",
"uuid",
"warp",
@ -437,13 +507,23 @@ dependencies = [
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
dependencies = [
"ahash 0.2.18",
"autocfg 0.1.7",
]
[[package]]
name = "hashbrown"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96282e96bfcd3da0d3aa9938bedf1e50df3269b6db08b4876d2da0bb1a0841cf"
dependencies = [
"ahash",
"ahash 0.3.8",
"autocfg 1.0.0",
]
@ -768,6 +848,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "nodrop"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nom"
version = "5.1.2"
@ -846,9 +932,9 @@ version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c0e815c3ee9a031fdf5af21c10aa17c573c9c6a566328d99e3936c34e36461f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.34",
]
[[package]]
@ -880,13 +966,28 @@ dependencies = [
"log",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99c605b9a0adc77b7211c6b1f722dcb613d68d66859a44f3d485a6da332b0598"
[[package]]
name = "proc-macro2"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759"
dependencies = [
"unicode-xid 0.1.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa"
dependencies = [
"unicode-xid",
"unicode-xid 0.2.1",
]
[[package]]
@ -895,13 +996,22 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1"
dependencies = [
"proc-macro2 0.4.30",
]
[[package]]
name = "quote"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
dependencies = [
"proc-macro2",
"proc-macro2 1.0.18",
]
[[package]]
@ -1094,7 +1204,7 @@ checksum = "91739a34c4355b5434ce54c9086c5895604a9c278586d1f1aa95e04f66b525a0"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"crossbeam-utils 0.7.2",
"lazy_static",
"num_cpus",
]
@ -1186,9 +1296,9 @@ version = "1.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.34",
]
[[package]]
@ -1239,20 +1349,44 @@ dependencies = [
"opaque-debug 0.3.0",
]
[[package]]
name = "shred"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92472b9bafafbcba21935c6444d924e5332742f6778c49504a49a97eaeff6ccc"
dependencies = [
"arrayvec 0.4.12",
"hashbrown 0.6.3",
"mopa",
"rayon",
"smallvec 0.6.13",
]
[[package]]
name = "shred"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f08237e667ac94ad20f8878b5943d91a93ccb231428446c57c21c57779016d"
dependencies = [
"arrayvec",
"hashbrown",
"arrayvec 0.5.1",
"hashbrown 0.7.2",
"mopa",
"rayon",
"smallvec",
"smallvec 1.4.0",
"tynm",
]
[[package]]
name = "shred-derive"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1f37080f2751fbf091dbdebaa95bd6cf9dbf74ad1d50396b1908518a1747fdf"
dependencies = [
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.34",
]
[[package]]
name = "shrev"
version = "1.1.1"
@ -1275,6 +1409,15 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6"
dependencies = [
"maybe-uninit",
]
[[package]]
name = "smallvec"
version = "1.4.0"
@ -1293,18 +1436,35 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "specs"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4943fde8c5d3d14c3d19d2a4c7abbd7b626c270a19e6cd35252294a48feb698c"
dependencies = [
"crossbeam-queue 0.1.2",
"derivative",
"hashbrown 0.6.3",
"hibitset",
"log",
"rayon",
"shred 0.9.4",
"shrev",
"tuple_utils",
]
[[package]]
name = "specs"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff28a29366aff703d5da8a7e2c8875dc8453ac1118f842cbc0fa70c7db51240"
dependencies = [
"crossbeam-queue",
"hashbrown",
"crossbeam-queue 0.2.3",
"hashbrown 0.7.2",
"hibitset",
"log",
"rayon",
"shred",
"shred 0.10.2",
"shrev",
"specs-derive",
"tuple_utils",
@ -1316,9 +1476,33 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e23e09360f3d2190fec4222cd9e19d3158d5da948c0d1ea362df617dd103511"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.34",
]
[[package]]
name = "specs-hierarchy"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3199a5c3147995119adcaba09a4c70f390cd8002013ef35a7494a7fb9255e3"
dependencies = [
"hibitset",
"shred 0.9.4",
"shred-derive",
"shrev",
"specs 0.15.1",
]
[[package]]
name = "syn"
version = "0.15.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"unicode-xid 0.1.0",
]
[[package]]
@ -1327,9 +1511,9 @@ version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"proc-macro2 1.0.18",
"quote 1.0.7",
"unicode-xid 0.2.1",
]
[[package]]
@ -1410,9 +1594,9 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2",
"quote",
"syn",
"proc-macro2 1.0.18",
"quote 1.0.7",
"syn 1.0.34",
]
[[package]]
@ -1560,6 +1744,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-xid"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
[[package]]
name = "unicode-xid"
version = "0.2.1"

View File

@ -15,7 +15,9 @@ dotenv = "0.15.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
specs = { version = "0.16.1", features = ["specs-derive"] }
specs-hierarchy = "0.5.1"
uuid = { version = "0.4", features = ["serde", "v4"] }
futures = { version = "0.3", default-features = false }
log = "0.4"
pretty_env_logger = "0.3"
pretty_env_logger = "0.3"
crossbeam-channel = "0.4.4"

View File

@ -3,11 +3,15 @@
extern crate pretty_env_logger;
#[macro_use] extern crate log;
use specs::ReadExpect;
use specs::{Read, Entities};
use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use warp::ws::{Message, WebSocket};
use warp::Filter;
extern crate crossbeam_channel;
#[macro_use]
extern crate anyhow;
@ -30,11 +34,11 @@ extern crate dotenv;
use dotenv::dotenv;
use specs::{World, WorldExt, Builder, Component, VecStorage, Entity};
use specs::{World, WorldExt, Builder, Component, VecStorage, Entity, System, ReadStorage, WriteStorage, NullStorage};
/* user state from chat sample, ecs-ify it */
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, warp::Error>>>>>;
/* user state from chat sample, ecs-ify it */
//type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, warp::Error>>>>>;
/* end chat sample state */
#[derive(Component, Debug)]
@ -55,13 +59,79 @@ struct Contains {
children: Vec<Entity>,
}
pub struct Client {
pub user_id: usize,
pub topics: Vec<String>,
pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
#[derive(Component, Debug)]
#[storage(VecStorage)]
struct User {
sender: mpsc::UnboundedSender<Result<Message, warp::Error>>,
socket_id: usize,
}
#[derive(Component, Default)]
#[storage(NullStorage)]
struct Unsent;
#[derive(Debug)]
enum NewEntityRequest {
User(User),
Textual(Textual),
}
struct SendMessages;
impl<'a> System<'a> for SendMessages {
type SystemData = (
ReadStorage<'a, Textual>,
ReadStorage<'a, Unsent>,
WriteStorage<'a, User>,
);
fn run(&mut self, (text, unsent, mut user): Self::SystemData) {
use specs::Join;
let mut users = (&mut user).join();
for (text, _) in (&text, &unsent).join() { // todo: why are the positions flipped???
for user in &mut users {
eprintln!("sending {} to user {}", &text.value, &user.socket_id);
user.sender.send(Ok(Message::text(text.value.clone()))).unwrap(); // todo: do something with unhandled result.
}
}
}
}
struct RequestQueue(crossbeam_channel::Receiver<NewEntityRequest>);
struct ProcessEntityRequests;
impl<'a> System<'a> for ProcessEntityRequests {
type SystemData = (
ReadExpect<'a, RequestQueue>,
Entities<'a>,
WriteStorage<'a, User>,
WriteStorage<'a, Textual>,
WriteStorage<'a, Unsent>,
);
fn run(&mut self, (requests, entities, mut users, mut textual, mut unsent): Self::SystemData) {
eprintln!("entered process request system");
match requests.0.try_recv() {
Ok(request) => {
eprintln!("the requested entity: {:?}", request);
match request {
NewEntityRequest::User(user_request) => {
let user = entities.create();
users.insert(user, user_request).unwrap();
}
NewEntityRequest::Textual(textual_request) => {
let text = entities.create();
textual.insert(text, textual_request).unwrap();
unsent.insert(text, Unsent).unwrap();
}
}
}
Err(_) => {}
}
}
}
/*
#[get("/")]
@ -129,25 +199,49 @@ fn main() {
*/
async fn world_loop(mut world: World){
use specs::DispatcherBuilder;
let mut dispatcher = DispatcherBuilder::new()
.with(SendMessages, "send_messages", &[])
.with(ProcessEntityRequests, "process_entity_requests", &[])
.build();
dispatcher.setup(&mut world);
loop {
dispatcher.dispatch(&mut world);
world.maintain();
std::thread::sleep(Duration::from_secs(1));
}
}
#[tokio::main]
async fn main() {
pretty_env_logger::init();
// non-tokio queue
let (new_entity_tx, new_entity_rx) = crossbeam_channel::unbounded::<NewEntityRequest>();
let mut world = World::new();
world.insert(RequestQueue(new_entity_rx));
tokio::spawn(world_loop(world));
// Keep track of all connected users, key is usize, value
// is a websocket sender.
let users = Users::default();
// Turn our "state" into a new Filter...
let users = warp::any().map(move || users.clone());
// filter for allowing new entities to be created
let new_entity_filter = warp::any().map(move || new_entity_tx.clone());
// GET /chat -> websocket upgrade
let chat = warp::path("chat")
// The `ws()` filter will prepare Websocket handshake...
.and(warp::ws())
.and(users)
.map(|ws: warp::ws::Ws, users| {
.and(new_entity_filter)
.map(|ws: warp::ws::Ws, new_entity_tx| {
// This will call our function if the handshake succeeds.
ws.on_upgrade(move |socket| user_connected(socket, users))
ws.on_upgrade(move |socket| user_connected(socket, new_entity_tx))
});
// GET / -> index html
@ -158,14 +252,13 @@ async fn main() {
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
async fn user_connected(ws: WebSocket, users: Users) {
async fn user_connected(ws: WebSocket, entity_request_tx: crossbeam_channel::Sender<NewEntityRequest>) {
let entity_request_tx = entity_request_tx.clone();
let (user_ws_tx, mut user_ws_rx) = ws.split();
// Use a counter to assign a new unique ID for this user.
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
eprintln!("new chat user: {}", my_id);
// Split the socket into a sender and receive of messages.
let (user_ws_tx, mut user_ws_rx) = ws.split();
// Use an unbounded channel to handle buffering and flushing of messages
// to the websocket...
@ -176,14 +269,18 @@ async fn user_connected(ws: WebSocket, users: Users) {
}
}));
// Save the sender in our list of connected users.
users.write().await.insert(my_id, tx);
entity_request_tx.send(NewEntityRequest::User(User{
sender: tx,
socket_id: my_id,
}));
eprintln!("new chat user: {}", my_id);
// Return a `Future` that is basically a state machine managing
// this specific user's connection.
// Make an extra clone to give to our disconnection handler...
let users2 = users.clone();
let entity_request_tx2 = entity_request_tx.clone();
// Every time the user sends a message, broadcast it to
// all other users...
@ -195,15 +292,17 @@ async fn user_connected(ws: WebSocket, users: Users) {
break;
}
};
user_message(my_id, msg, &users).await;
user_message(my_id, msg, entity_request_tx.clone()).await;
}
// user_ws_rx stream will keep processing as long as the user stays
// connected. Once they disconnect, then...
user_disconnected(my_id, &users2).await;
user_disconnected(my_id, entity_request_tx2).await;
}
async fn user_message(my_id: usize, msg: Message, users: &Users) {
async fn user_message(my_id: usize, msg: Message, entity_request_tx: crossbeam_channel::Sender<NewEntityRequest>) {
use specs::Join;
// Skip any non-Text messages...
let msg = if let Ok(s) = msg.to_str() {
s
@ -213,6 +312,13 @@ async fn user_message(my_id: usize, msg: Message, users: &Users) {
let new_msg = format!("<User#{}>: {}", my_id, msg);
// Add new entity with the message to the world.alloc
entity_request_tx.send(NewEntityRequest::Textual(Textual{
value: msg.to_string(),
}));
/*
// New message from this user, send it to everyone else (except same uid)...
for (&uid, tx) in users.read().await.iter() {
if my_id != uid {
@ -223,13 +329,17 @@ async fn user_message(my_id: usize, msg: Message, users: &Users) {
}
}
}
*/
}
async fn user_disconnected(my_id: usize, users: &Users) {
async fn user_disconnected(my_id: usize, entity_request_tx: crossbeam_channel::Sender<NewEntityRequest>) {
eprintln!("good bye user: {}", my_id);
// todo: really remove them
//world.write().await.en
// Stream closed up, so remove from the user list
users.write().await.remove(&my_id);
//world.write().await.delete_entity(me);
//world.write().await.remove(&my_id);
}
static INDEX_HTML: &str = r#"<!DOCTYPE html>