initial commit

This commit is contained in:
Viv Lim 2022-07-12 01:16:24 -07:00
commit 88d941d212
5 changed files with 1564 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
.vscode/*

1220
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

17
Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "gifboy-loadtester"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
prometheus_exporter = "0.8"
crossterm = "0.23"
tui = "0.18.0"
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
reqwest = { version = "0.11", features = ["stream"] }
futures-util = {version = "0.3"}
num_cpus = "1.13.1"
bytesize = "1.1.0"

195
src/main.rs Normal file
View File

@ -0,0 +1,195 @@
use std::{io, sync::{Arc, atomic::{AtomicU32, Ordering, AtomicBool, AtomicUsize}}, time::Duration};
use futures_util::{StreamExt, future};
use tui::{backend::CrosstermBackend, Terminal};
use tokio::{sync::{mpsc, Mutex}, task::JoinHandle};
use tokio::join;
mod ui;
use tui::{
backend::Backend,
layout::{Constraint, Direction, Layout},
widgets::{Block, Borders},
Frame,
};
#[derive(Clone)]
pub struct State {
// should contain only arcs or things that we don't expect to change. the ui clones it
num_requests: Arc<AtomicUsize>,
num_tasks: Arc<AtomicUsize>,
num_connections_open: Arc<AtomicU32>,
num_tasks_errored: Arc<AtomicU32>,
num_bytes_downloaded: Arc<AtomicU32>,
shutting_down: Arc<AtomicBool>,
url: String,
log_send: mpsc::Sender<String>,
}
fn main() {
tokio::runtime::Builder::new_current_thread()
.worker_threads(num_cpus::get_physical() - 1) // leave a core free
.enable_all()
.build()
.unwrap()
.block_on(async_main())
}
async fn async_main() {
// Some simple CLI args requirements...
let url = match std::env::args().nth(1) {
Some(url) => url,
None => {
println!("No CLI URL provided, exiting.");
return;
}
};
let (log_send, log_recv) = mpsc::channel::<String>(32);
let state = State {
num_requests: Default::default(),
num_tasks: Default::default(),
num_connections_open: Default::default(),
num_tasks_errored: Default::default(),
num_bytes_downloaded: Default::default(),
shutting_down: Default::default(),
url,
log_send
};
let tui_handle = tokio::task::spawn(ui::tui(state.clone(), log_recv));
let download_task_manager = tokio::task::spawn(download_task_manager(state.clone()));
download_task_manager.await.unwrap();
// if something happened here then we are shutting down.
state.shutting_down.store(true, Ordering::Relaxed);
tui_handle.await.unwrap();
}
async fn download_task_manager(state: State) -> anyhow::Result<()> {
let download_tasks = Arc::new(tokio::sync::RwLock::new(vec![]));
// Two tasks - one adds new download tasks if there aren't enough, one clears out finished ones.
let create_download_tasks = download_tasks.clone();
let create_state = state.clone();
let create = tokio::spawn(async {
let download_tasks = create_download_tasks;
let state = create_state;
while !state.shutting_down.load(Ordering::Relaxed) {
let num_download_tasks = {
download_tasks.read().await.len()
};
if num_download_tasks < state.num_requests.load(Ordering::Relaxed) {
// take the write lock then iterate until we have enough
let mut download_tasks = download_tasks.write().await;
while download_tasks.len() < state.num_requests.load(Ordering::Relaxed) {
let new_id = download_tasks.len();
download_tasks.push(tokio::task::spawn(download_task(state.clone(), new_id)));
state.num_tasks.store(download_tasks.len(), Ordering::Release);
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let destroy_download_tasks = download_tasks.clone();
let destroy_state = state.clone();
let destroy = tokio::spawn(async {
let download_tasks = destroy_download_tasks;
let state = destroy_state;
let log_send = state.log_send.clone();
while !state.shutting_down.load(Ordering::Relaxed) {
{
// take a write lock to await them all
let mut download_tasks = download_tasks.write().await;
// create a timeout task for how long we want to spend clearing out threads, total.
let mut timeout_task_vec = vec![tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok::<TaskKind, anyhow::Error>(TaskKind::TimeoutTask)
})];
loop {
let download_tasks_and_timeout: Vec<&mut JoinHandle<anyhow::Result<TaskKind>>> = download_tasks.iter_mut().chain(timeout_task_vec.iter_mut()).collect();
match future::select_all(download_tasks_and_timeout).await {
(Ok(Ok(TaskKind::DownloadTask)), index, remaining) => {
// no problem just clean up the task
download_tasks.remove(index);
},
(Ok(Err(e_inner)), index, remaining) => {
// clean up the task and log its error
download_tasks.remove(index);
log_send.send(format!("Unhandled error in a download task (index {}): {}", index, e_inner)).await.unwrap();
state.num_tasks_errored.fetch_add(1, Ordering::SeqCst);
},
(Err(e), index, remaining) => {
log_send.send(format!("Error waiting for any download tasks to finish: {} (index {}?)", e, index)).await.unwrap();
},
(Ok(Ok(TaskKind::TimeoutTask)), _, _) => {
// time's up, give up the write lock.
break;
}
}
state.num_tasks.store(download_tasks.len(), Ordering::Release);
}
}
// Do this outside of the section we hold the write lock
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let (create_res, destroy_res) = join!(create, destroy);
create_res?;
destroy_res?;
//future::select_all(iter)
//state.num_tasks.store(download_tasks.len(), Ordering::Relaxed);
Ok::<(), anyhow::Error>(())
}
async fn download_task(state: State, load_id: usize) -> anyhow::Result<TaskKind> {
let log_send = state.log_send.clone();
match reqwest::get(&state.url).await {
Ok(response) => {
state.num_connections_open.fetch_add(1, Ordering::SeqCst);
let mut stream = response.bytes_stream();
while load_id < state.num_requests.load(Ordering::Relaxed) && !state.shutting_down.load(Ordering::Relaxed) {
match stream.next().await {
Some(Ok(bytes)) => {
state.num_bytes_downloaded.fetch_add(bytes.len().try_into()?, Ordering::Relaxed);
},
Some(Err(e)) => {
log_send.send(format!("Task id {} errored while reading: {}", load_id, e)).await?;
// do this second just in case the await fails. if the await fails or we otherwise exit, we'll add this *outside*
state.num_tasks_errored.fetch_add(1, Ordering::SeqCst);
break;
},
None => {
state.num_tasks_errored.fetch_add(1, Ordering::SeqCst);
log_send.send(format!("Task id {} ran out of data.", load_id)).await?;
break;
}
}
}
state.num_connections_open.fetch_sub(1, Ordering::SeqCst);
log_send.send(format!("Task id {} exiting normally.", load_id)).await?;
},
Err(e) => {
state.num_tasks_errored.fetch_add(1, Ordering::SeqCst);
log_send.send(format!("Task id {} couldn't connect: {}", load_id, e)).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
},
}
Ok::<TaskKind, anyhow::Error>(TaskKind::DownloadTask)
}
enum TaskKind {
DownloadTask,
TimeoutTask
}

130
src/ui.rs Normal file
View File

@ -0,0 +1,130 @@
use bytesize::ByteSize;
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, poll},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use tokio::sync::mpsc;
use std::{error::Error, io::{self, Write}, sync::atomic::Ordering, time::Duration, collections::VecDeque};
use tui::{
backend::{Backend, CrosstermBackend},
layout::{Constraint, Direction, Layout, Corner},
style::{Color, Modifier, Style},
text::{Span, Spans, Text},
widgets::{Block, Borders, List, ListItem, Paragraph},
Frame, Terminal, terminal,
};
//use unicode_width::UnicodeWidthStr;
use crate::State;
struct UiState {
state: State,
log_messages: VecDeque<String>,
}
pub async fn tui(state: State, log_recv: mpsc::Receiver<String>) -> anyhow::Result<()> {
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let ui_state = UiState {
state: state,
log_messages: Default::default()
};
let res = tokio::task::spawn_blocking(move || run_app_outer(terminal, ui_state, log_recv)).await?;
if let Err(err) = res {
println!("{:?}", err)
}
Ok::<(), anyhow::Error>(())
}
fn run_app_outer<B: Backend + Write>(mut terminal: Terminal<B>, mut ui_state: UiState, mut log_recv: mpsc::Receiver<String>) -> anyhow::Result<()> {
let result = run_app(&mut terminal, ui_state, log_recv);
// restore terminal
disable_raw_mode().unwrap();
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
).unwrap();
terminal.show_cursor().unwrap();
result
}
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut ui_state: UiState, mut log_recv: mpsc::Receiver<String>) -> anyhow::Result<()> {
while !ui_state.state.shutting_down.load(Ordering::Relaxed) {
while let Ok(msg) = log_recv.try_recv() {
ui_state.log_messages.push_front(msg);
}
terminal.draw(|f| ui(f, &ui_state))?;
if poll(Duration::from_millis(50))? {
if let Event::Key(key) = event::read()? {
match key.code {
KeyCode::Char('q') => {
ui_state.state.shutting_down.store(true, Ordering::Relaxed);
},
KeyCode::Up => {
ui_state.state.num_requests.fetch_add(1, Ordering::Relaxed);
},
KeyCode::Down => {
let current = ui_state.state.num_requests.load(Ordering::Relaxed);
if current > 0 {
ui_state.state.num_requests.store(current - 1, Ordering::Relaxed);
}
},
KeyCode::PageUp => {
ui_state.state.num_requests.fetch_add(10, Ordering::Relaxed);
},
KeyCode::PageDown => {
let current = ui_state.state.num_requests.load(Ordering::Relaxed);
if current > 10 {
ui_state.state.num_requests.store(current - 10, Ordering::Relaxed);
}
},
_ => ()
}
}
}
}
Ok(())
}
fn ui<B: Backend>(f: &mut Frame<B>, ui_state: &UiState) {
let layout = Layout::default()
.direction(Direction::Vertical)
.margin(2)
.constraints(
[
Constraint::Length(2),
Constraint::Min(10),
]
.as_ref(),
)
.split(f.size());
let mut text = Text::from(Spans::from(
format!("target # requests: {} | {} tasks have {} open conns | errors: {} | received: {} (q: quit, up/down: add/remove requests, pageup/down: more)",
ui_state.state.num_requests.load(Ordering::Relaxed),
ui_state.state.num_tasks.load(Ordering::Relaxed),
ui_state.state.num_connections_open.load(Ordering::Relaxed),
ui_state.state.num_tasks_errored.load(Ordering::Relaxed),
ByteSize::b(ui_state.state.num_bytes_downloaded.load(Ordering::Relaxed).into()),
)));
f.render_widget(Paragraph::new(text), layout[0]);
let log_list_items: Vec<ListItem> = ui_state.log_messages.iter().map(|msg| ListItem::new(Spans::from(msg.as_ref()))).collect();
let log_list = List::new(log_list_items)
.block(Block::default().borders(Borders::ALL).title("Log"))
.start_corner(Corner::BottomLeft);
f.render_widget(log_list, layout[1]);
}