draw-game/src/main.rs

385 lines
10 KiB
Rust

#![feature(proc_macro_hygiene, decl_macro)]
extern crate pretty_env_logger;
#[macro_use] extern crate log;
use specs::ReadExpect;
use specs::{Read, Entities};
use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use warp::ws::{Message, WebSocket};
use warp::Filter;
extern crate crossbeam_channel;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate serde;
extern crate chrono;
use chrono::prelude::*;
use std::{
collections::BTreeMap,
fmt::Display,
sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}},
time::{Duration, SystemTime},
collections::HashMap};
#[macro_use]
extern crate dotenv;
use dotenv::dotenv;
use specs::{World, WorldExt, Builder, Component, VecStorage, Entity, System, ReadStorage, WriteStorage, NullStorage};
/// Counter used to hand out an id to each websocket connection. It starts at 1
/// and is advanced with `fetch_add` in `user_connected`.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
/* user state from chat sample, ecs-ify it */
//type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, warp::Error>>>>>;
/* end chat sample state */
/// Component carrying a `name` string for an entity.
///
/// No system in this part of the file reads or writes it.
#[derive(Component, Debug)]
#[storage(VecStorage)]
struct Name {
// The name text itself.
name: String,
}
/// Component holding a piece of text.
///
/// `ProcessEntityRequests` attaches it to a new entity for every
/// `NewEntityRequest::Textual`, and `SendMessages` reads `value` when
/// forwarding text to users.
#[derive(Component, Debug)]
#[storage(VecStorage)]
struct Textual {
// The text payload that gets forwarded to users.
value: String,
}
/// Component listing other entities as `children` of the entity it is attached to.
///
/// No system in this part of the file reads or writes it.
#[derive(Component, Debug)]
#[storage(VecStorage)]
struct Contains {
// Entities considered children of the owner of this component.
children: Vec<Entity>,
}
/// Component representing one connected websocket client.
///
/// Built in `user_connected` and attached to a new entity by
/// `ProcessEntityRequests`.
#[derive(Component, Debug)]
#[storage(VecStorage)]
struct User {
// Sending half of the per-connection channel; `user_connected` forwards the
// receiving half into the client's websocket.
sender: mpsc::UnboundedSender<Result<Message, warp::Error>>,
// Id taken from `NEXT_USER_ID` when the connection was accepted.
socket_id: usize,
}
/// Zero-sized marker component (stored in `NullStorage`).
///
/// `ProcessEntityRequests` adds it next to every new `Textual`, and
/// `SendMessages` only considers `Textual` entities that carry it.
#[derive(Component, Default)]
#[storage(NullStorage)]
struct Unsent;
/// Request to create a new entity in the world.
///
/// Websocket tasks push these onto a crossbeam channel and
/// `ProcessEntityRequests` turns them into entities.
#[derive(Debug)]
enum NewEntityRequest {
// Create an entity with the given `User` component.
User(User),
// Create an entity with the given `Textual` component (plus an `Unsent` marker).
Textual(Textual),
}
struct SendMessages;
impl<'a> System<'a> for SendMessages {
type SystemData = (
ReadStorage<'a, Textual>,
ReadStorage<'a, Unsent>,
WriteStorage<'a, User>,
);
fn run(&mut self, (text, unsent, mut user): Self::SystemData) {
use specs::Join;
let mut users = (&mut user).join();
for (text, _) in (&text, &unsent).join() { // todo: why are the positions flipped???
for user in &mut users {
eprintln!("sending {} to user {}", &text.value, &user.socket_id);
user.sender.send(Ok(Message::text(text.value.clone()))).unwrap(); // todo: do something with unhandled result.
}
}
}
}
/// World resource wrapping the receiving end of the entity-request channel.
/// `main` inserts it into the `World`; `ProcessEntityRequests` reads from it.
struct RequestQueue(crossbeam_channel::Receiver<NewEntityRequest>);
/// System that turns queued [`NewEntityRequest`]s into entities.
///
/// Every request currently waiting in the [`RequestQueue`] is handled in a
/// single run, so the queue does not back up when more than one request
/// arrives between two dispatches.
struct ProcessEntityRequests;

impl<'a> System<'a> for ProcessEntityRequests {
    type SystemData = (
        ReadExpect<'a, RequestQueue>,
        Entities<'a>,
        WriteStorage<'a, User>,
        WriteStorage<'a, Textual>,
        WriteStorage<'a, Unsent>,
    );

    /// Drains the request queue without blocking and creates one entity per request.
    ///
    /// * `User` requests become an entity with the `User` component.
    /// * `Textual` requests become an entity with the `Textual` component and
    ///   an `Unsent` marker.
    fn run(
        &mut self,
        (requests, entities, mut users, mut textual, mut unsent): Self::SystemData,
    ) {
        eprintln!("entered process request system");

        // `try_iter` yields the messages that are already queued and stops as
        // soon as the channel is empty or disconnected; it never blocks.
        for request in requests.0.try_iter() {
            eprintln!("the requested entity: {:?}", request);
            match request {
                NewEntityRequest::User(user_request) => {
                    let user = entities.create();
                    users
                        .insert(user, user_request)
                        .expect("an entity that was just created is alive");
                }
                NewEntityRequest::Textual(textual_request) => {
                    let text = entities.create();
                    textual
                        .insert(text, textual_request)
                        .expect("an entity that was just created is alive");
                    unsent
                        .insert(text, Unsent)
                        .expect("an entity that was just created is alive");
                }
            }
        }
    }
}
/*
#[get("/")]
fn index() -> Result<String, anyhow::Error> {
Ok(format!(
"Devices:\n{}\n\nwhile true; do curl https://pc.vvn.space/log/name; sleep 5; done",
"aa"
))
}
*/
/*
#[get("/log/<device_name>/<value>")]
fn log_new(device_name: &RawStr, value: &RawStr) -> Result<String, anyhow::Error> {
let new_event = NewEvent {
devicename: device_name,
value: value.parse()?,
timestamp: &Local::now().to_rfc3339(),
};
Ok(String::from("Logged"))
}
#[get("/log/<device_name>")]
fn log_get(device_name: &RawStr) -> Result<String, anyhow::Error> {
Ok("what")
}
#[get("/devices")]
fn devices_get() -> Result<String, anyhow::Error> {
Ok("get")
}
#[get("/device/<name>")]
fn device_get(name: &RawStr) -> Result<String, anyhow::Error> {
Ok("get")
}
*/
/*
#[catch(404)]
fn not_found() -> Option<NamedFile> {
NamedFile::open("frontend/build/index.html").ok()
}
*/
/*
fn main() {
let mut world = World::new();
world.register::<Name>();
world.register::<Textual>();
world.register::<Contains>();
*/
// todo: serve static files equiv to
/*
.mount(
"/",
rocket_contrib::serve::StaticFiles::from("frontend/build"),
)
//.mount("/api", routes![log_new, log_get, devices_get, device_get])
.register(catchers![not_found])
}
*/
/// Runs the ECS forever: builds a dispatcher with `SendMessages` and
/// `ProcessEntityRequests`, then dispatches and maintains `world` once per tick.
///
/// This function never returns.
async fn world_loop(mut world: World) {
    use specs::DispatcherBuilder;

    // Pause between two consecutive dispatches.
    let tick = Duration::from_secs(1);

    let builder = DispatcherBuilder::new()
        .with(SendMessages, "send_messages", &[])
        .with(ProcessEntityRequests, "process_entity_requests", &[]);
    let mut systems = builder.build();
    systems.setup(&mut world);

    loop {
        systems.dispatch(&mut world);
        world.maintain();
        // NOTE(review): this is a blocking sleep inside an async fn, so the
        // executor thread running this task is presumably occupied for the
        // whole tick — confirm whether a dedicated thread would be a better fit.
        std::thread::sleep(tick);
    }
}
/// Entry point: starts the ECS world loop and serves the chat page and the
/// websocket endpoint on 127.0.0.1:3030.
#[tokio::main]
async fn main() {
    pretty_env_logger::init();

    // Plain (non-tokio) channel that carries entity requests from the
    // websocket tasks into the world.
    let (new_entity_tx, new_entity_rx) = crossbeam_channel::unbounded::<NewEntityRequest>();

    // The world owns the receiving end as a resource and runs in its own task.
    let mut world = World::new();
    world.insert(RequestQueue(new_entity_rx));
    tokio::spawn(world_loop(world));

    // Filter that gives every request its own clone of the request sender.
    let with_entity_tx = warp::any().map(move || new_entity_tx.clone());

    // GET /chat -> websocket upgrade; `user_connected` runs once the
    // handshake has succeeded.
    let chat_route = warp::path("chat")
        .and(warp::ws())
        .and(with_entity_tx)
        .map(|upgrade: warp::ws::Ws, request_tx| {
            upgrade.on_upgrade(move |socket| user_connected(socket, request_tx))
        });

    // GET / -> the embedded chat page.
    let index_route = warp::path::end().map(|| warp::reply::html(INDEX_HTML));

    warp::serve(index_route.or(chat_route))
        .run(([127, 0, 0, 1], 3030))
        .await;
}
/// Handles one websocket connection from upgrade until it closes.
///
/// * `ws` - the upgraded websocket.
/// * `entity_request_tx` - channel used to ask the world to create entities.
///
/// The connection is registered with the world as a `User`, then every incoming
/// frame is handed to `user_message` until the stream ends or errors, after
/// which `user_disconnected` is called. If the registration request cannot be
/// queued the function logs the failure and returns, which closes the
/// connection.
async fn user_connected(
    ws: WebSocket,
    entity_request_tx: crossbeam_channel::Sender<NewEntityRequest>,
) {
    // Split the socket into a sender and a receiver of messages.
    let (user_ws_tx, mut user_ws_rx) = ws.split();

    // Use a counter to assign a new id to this user.
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);

    // Use an unbounded channel to buffer outgoing messages; a spawned task
    // forwards everything received on it into the websocket.
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::task::spawn(rx.forward(user_ws_tx).map(|result| {
        if let Err(e) = result {
            eprintln!("websocket send error: {}", e);
        }
    }));

    // Hand the sending half to the world. A failed send means the receiving
    // end of the request channel has been dropped, so this user could never be
    // delivered anything; give up on the connection instead of carrying on.
    let registration = NewEntityRequest::User(User {
        sender: tx,
        socket_id: my_id,
    });
    if entity_request_tx.send(registration).is_err() {
        eprintln!(
            "could not register user {}: entity request queue is closed",
            my_id
        );
        return;
    }
    eprintln!("new chat user: {}", my_id);

    // Every time the user sends a message, pass it on to the world.
    while let Some(result) = user_ws_rx.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("websocket error(uid={}): {}", my_id, e);
                break;
            }
        };
        // `user_message` takes the sender by value, hence one clone per message.
        user_message(my_id, msg, entity_request_tx.clone()).await;
    }

    // The stream above only ends once the user has disconnected (or errored).
    user_disconnected(my_id, entity_request_tx).await;
}
/// Queues a text frame received from user `my_id` as a new `Textual` entity.
///
/// * `my_id` - id of the sending connection; only used for logging.
/// * `msg` - the websocket frame; anything that is not text is ignored.
/// * `entity_request_tx` - channel used to ask the world to create entities.
///
/// The text is queued exactly as received; no sender prefix is added. If the
/// request cannot be queued the message is dropped and the failure is logged.
async fn user_message(
    my_id: usize,
    msg: Message,
    entity_request_tx: crossbeam_channel::Sender<NewEntityRequest>,
) {
    // Skip any non-text messages.
    let text = match msg.to_str() {
        Ok(text) => text,
        Err(_) => return,
    };

    // Ask the world to create an entity carrying the message text.
    let request = NewEntityRequest::Textual(Textual {
        value: text.to_string(),
    });
    if entity_request_tx.send(request).is_err() {
        eprintln!(
            "dropping message from user {}: entity request queue is closed",
            my_id
        );
    }
}
/// Called by `user_connected` once a user's websocket stream has ended.
///
/// Currently this only logs the departure: `entity_request_tx` is unused and
/// nothing is removed from the world here.
async fn user_disconnected(my_id: usize, entity_request_tx: crossbeam_channel::Sender<NewEntityRequest>) {
eprintln!("good bye user: {}", my_id);
// TODO: actually remove the user from the world once the stream has closed.
// Earlier sketches of that removal, kept for reference:
//world.write().await.en
//world.write().await.delete_entity(me);
//world.write().await.remove(&my_id);
}
/// HTML page served for `GET /` by `main`.
///
/// The embedded script opens a websocket to `/chat` on the same host, appends
/// every received message to the `#chat` element, and sends the contents of
/// the text input when the Send button is clicked (also echoing it locally as
/// `<You>: ...`).
///
/// NOTE(review): the script uses `send` without declaring it; it presumably
/// relies on the browser exposing the element with id `send` as a global —
/// confirm before restructuring the markup.
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
<head>
<title>Warp Chat</title>
</head>
<body>
<h1>Warp chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
const chat = document.getElementById('chat');
const text = document.getElementById('text');
const uri = 'ws://' + location.host + '/chat';
const ws = new WebSocket(uri);
function message(data) {
const line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
message(msg.data);
};
ws.onclose = function() {
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
};
send.onclick = function() {
const msg = text.value;
ws.send(msg);
text.value = '';
message('<You>: ' + msg);
};
</script>
</body>
</html>
"#;