rip out rocket and badly fuse warp websocket chat sample
This commit is contained in:
parent
7c089fe532
commit
f19d0f852b
File diff suppressed because it is too large
Load Diff
11
Cargo.toml
11
Cargo.toml
|
@ -7,10 +7,15 @@ edition = "2018"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
rocket = "0.4.5"
|
warp = "0.2"
|
||||||
rocket_contrib = { version = "0.4.5", default-features = false, features = ["json", "diesel_sqlite_pool", "serve"] }
|
tokio = { version = "0.2", features = ["full"] }
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
specs = { version = "0.16.1", features = ["specs-derive"] }
|
||||||
|
uuid = { version = "0.4", features = ["serde", "v4"] }
|
||||||
|
futures = { version = "0.3", default-features = false }
|
||||||
|
log = "0.4"
|
||||||
|
pretty_env_logger = "0.3"
|
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
https://blog.logrocket.com/how-to-build-a-websocket-server-with-rust/
|
||||||
|
https://github.com/seanmonstar/warp/blob/master/examples/websockets_chat.rs
|
214
src/main.rs
214
src/main.rs
|
@ -1,10 +1,13 @@
|
||||||
#![feature(proc_macro_hygiene, decl_macro)]
|
#![feature(proc_macro_hygiene, decl_macro)]
|
||||||
|
|
||||||
#[macro_use]
|
extern crate pretty_env_logger;
|
||||||
extern crate rocket;
|
#[macro_use] extern crate log;
|
||||||
|
|
||||||
|
use futures::{FutureExt, StreamExt};
|
||||||
|
use tokio::sync::{mpsc, RwLock};
|
||||||
|
use warp::ws::{Message, WebSocket};
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
extern crate rocket_contrib;
|
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate anyhow;
|
extern crate anyhow;
|
||||||
|
@ -15,23 +18,51 @@ extern crate serde;
|
||||||
extern crate chrono;
|
extern crate chrono;
|
||||||
|
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use rocket::{
|
|
||||||
http::RawStr,
|
|
||||||
response::{NamedFile, Redirect},
|
|
||||||
State,
|
|
||||||
};
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
fmt::Display,
|
fmt::Display,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}},
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, SystemTime},
|
||||||
};
|
collections::HashMap};
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate dotenv;
|
extern crate dotenv;
|
||||||
|
|
||||||
use dotenv::dotenv;
|
use dotenv::dotenv;
|
||||||
|
|
||||||
|
use specs::{World, WorldExt, Builder, Component, VecStorage, Entity};
|
||||||
|
|
||||||
|
/* user state from chat sample, ecs-ify it */
|
||||||
|
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
|
||||||
|
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, warp::Error>>>>>;
|
||||||
|
/* end chat sample state */
|
||||||
|
|
||||||
|
#[derive(Component, Debug)]
|
||||||
|
#[storage(VecStorage)]
|
||||||
|
struct Name {
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Component, Debug)]
|
||||||
|
#[storage(VecStorage)]
|
||||||
|
struct Textual {
|
||||||
|
value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Component, Debug)]
|
||||||
|
#[storage(VecStorage)]
|
||||||
|
struct Contains {
|
||||||
|
children: Vec<Entity>,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub struct Client {
|
||||||
|
pub user_id: usize,
|
||||||
|
pub topics: Vec<String>,
|
||||||
|
pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
#[get("/")]
|
#[get("/")]
|
||||||
fn index() -> Result<String, anyhow::Error> {
|
fn index() -> Result<String, anyhow::Error> {
|
||||||
|
@ -72,20 +103,173 @@ fn device_get(name: &RawStr) -> Result<String, anyhow::Error> {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
#[catch(404)]
|
#[catch(404)]
|
||||||
fn not_found() -> Option<NamedFile> {
|
fn not_found() -> Option<NamedFile> {
|
||||||
NamedFile::open("frontend/build/index.html").ok()
|
NamedFile::open("frontend/build/index.html").ok()
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
fn main() {
|
fn main() {
|
||||||
rocket::ignite()
|
let mut world = World::new();
|
||||||
|
world.register::<Name>();
|
||||||
|
world.register::<Textual>();
|
||||||
|
world.register::<Contains>();
|
||||||
|
*/
|
||||||
|
// todo: serve static files equiv to
|
||||||
|
/*
|
||||||
.mount(
|
.mount(
|
||||||
"/",
|
"/",
|
||||||
rocket_contrib::serve::StaticFiles::from("frontend/build"),
|
rocket_contrib::serve::StaticFiles::from("frontend/build"),
|
||||||
)
|
)
|
||||||
//.mount("/api", routes![log_new, log_get, devices_get, device_get])
|
//.mount("/api", routes![log_new, log_get, devices_get, device_get])
|
||||||
.register(catchers![not_found])
|
.register(catchers![not_found])
|
||||||
.launch();
|
|
||||||
|
|
||||||
println!("after");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
pretty_env_logger::init();
|
||||||
|
|
||||||
|
// Keep track of all connected users, key is usize, value
|
||||||
|
// is a websocket sender.
|
||||||
|
let users = Users::default();
|
||||||
|
// Turn our "state" into a new Filter...
|
||||||
|
let users = warp::any().map(move || users.clone());
|
||||||
|
|
||||||
|
// GET /chat -> websocket upgrade
|
||||||
|
let chat = warp::path("chat")
|
||||||
|
// The `ws()` filter will prepare Websocket handshake...
|
||||||
|
.and(warp::ws())
|
||||||
|
.and(users)
|
||||||
|
.map(|ws: warp::ws::Ws, users| {
|
||||||
|
// This will call our function if the handshake succeeds.
|
||||||
|
ws.on_upgrade(move |socket| user_connected(socket, users))
|
||||||
|
});
|
||||||
|
|
||||||
|
// GET / -> index html
|
||||||
|
let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
|
||||||
|
|
||||||
|
let routes = index.or(chat);
|
||||||
|
|
||||||
|
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn user_connected(ws: WebSocket, users: Users) {
|
||||||
|
// Use a counter to assign a new unique ID for this user.
|
||||||
|
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
eprintln!("new chat user: {}", my_id);
|
||||||
|
|
||||||
|
// Split the socket into a sender and receive of messages.
|
||||||
|
let (user_ws_tx, mut user_ws_rx) = ws.split();
|
||||||
|
|
||||||
|
// Use an unbounded channel to handle buffering and flushing of messages
|
||||||
|
// to the websocket...
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
tokio::task::spawn(rx.forward(user_ws_tx).map(|result| {
|
||||||
|
if let Err(e) = result {
|
||||||
|
eprintln!("websocket send error: {}", e);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Save the sender in our list of connected users.
|
||||||
|
users.write().await.insert(my_id, tx);
|
||||||
|
|
||||||
|
// Return a `Future` that is basically a state machine managing
|
||||||
|
// this specific user's connection.
|
||||||
|
|
||||||
|
// Make an extra clone to give to our disconnection handler...
|
||||||
|
let users2 = users.clone();
|
||||||
|
|
||||||
|
// Every time the user sends a message, broadcast it to
|
||||||
|
// all other users...
|
||||||
|
while let Some(result) = user_ws_rx.next().await {
|
||||||
|
let msg = match result {
|
||||||
|
Ok(msg) => msg,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("websocket error(uid={}): {}", my_id, e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
user_message(my_id, msg, &users).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// user_ws_rx stream will keep processing as long as the user stays
|
||||||
|
// connected. Once they disconnect, then...
|
||||||
|
user_disconnected(my_id, &users2).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn user_message(my_id: usize, msg: Message, users: &Users) {
|
||||||
|
// Skip any non-Text messages...
|
||||||
|
let msg = if let Ok(s) = msg.to_str() {
|
||||||
|
s
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_msg = format!("<User#{}>: {}", my_id, msg);
|
||||||
|
|
||||||
|
// New message from this user, send it to everyone else (except same uid)...
|
||||||
|
for (&uid, tx) in users.read().await.iter() {
|
||||||
|
if my_id != uid {
|
||||||
|
if let Err(_disconnected) = tx.send(Ok(Message::text(new_msg.clone()))) {
|
||||||
|
// The tx is disconnected, our `user_disconnected` code
|
||||||
|
// should be happening in another task, nothing more to
|
||||||
|
// do here.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn user_disconnected(my_id: usize, users: &Users) {
|
||||||
|
eprintln!("good bye user: {}", my_id);
|
||||||
|
|
||||||
|
// Stream closed up, so remove from the user list
|
||||||
|
users.write().await.remove(&my_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
static INDEX_HTML: &str = r#"<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<title>Warp Chat</title>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>Warp chat</h1>
|
||||||
|
<div id="chat">
|
||||||
|
<p><em>Connecting...</em></p>
|
||||||
|
</div>
|
||||||
|
<input type="text" id="text" />
|
||||||
|
<button type="button" id="send">Send</button>
|
||||||
|
<script type="text/javascript">
|
||||||
|
const chat = document.getElementById('chat');
|
||||||
|
const text = document.getElementById('text');
|
||||||
|
const uri = 'ws://' + location.host + '/chat';
|
||||||
|
const ws = new WebSocket(uri);
|
||||||
|
function message(data) {
|
||||||
|
const line = document.createElement('p');
|
||||||
|
line.innerText = data;
|
||||||
|
chat.appendChild(line);
|
||||||
|
}
|
||||||
|
ws.onopen = function() {
|
||||||
|
chat.innerHTML = '<p><em>Connected!</em></p>';
|
||||||
|
};
|
||||||
|
ws.onmessage = function(msg) {
|
||||||
|
message(msg.data);
|
||||||
|
};
|
||||||
|
ws.onclose = function() {
|
||||||
|
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
|
||||||
|
};
|
||||||
|
send.onclick = function() {
|
||||||
|
const msg = text.value;
|
||||||
|
ws.send(msg);
|
||||||
|
text.value = '';
|
||||||
|
message('<You>: ' + msg);
|
||||||
|
};
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"#;
|
Loading…
Reference in New Issue