diff --git a/ubw-sward/src/lib.rs b/ubw-sward/src/lib.rs index 4aef5f5..81b6060 100644 --- a/ubw-sward/src/lib.rs +++ b/ubw-sward/src/lib.rs @@ -3,6 +3,5 @@ #![deny(clippy::expect_used)] pub mod http; -pub mod tcp; -pub mod udp; +pub mod socket_stream; pub mod utils; diff --git a/ubw-sward/src/udp/counter.rs b/ubw-sward/src/socket_stream/counter.rs similarity index 73% rename from ubw-sward/src/udp/counter.rs rename to ubw-sward/src/socket_stream/counter.rs index 1678fed..dbf734a 100644 --- a/ubw-sward/src/udp/counter.rs +++ b/ubw-sward/src/socket_stream/counter.rs @@ -1,25 +1,25 @@ use std::sync::atomic::AtomicU64; #[derive(Default, Clone, Copy)] -pub struct UdpFloodCounter { +pub struct SocketStreamCounter { pub sent_bytes: u64, pub sent_packets: u64, pub error: u64, } #[derive(Default)] -pub struct AtomicUdpFloodCounter { +pub struct AtomicSocketStreamCounter { pub sent_bytes: AtomicU64, pub sent_packets: AtomicU64, pub error: AtomicU64, } -impl AtomicUdpFloodCounter { - pub fn read(&self) -> UdpFloodCounter { - UdpFloodCounter { +impl AtomicSocketStreamCounter { + pub fn read(&self) -> SocketStreamCounter { + SocketStreamCounter { sent_bytes: self.sent_bytes.load(std::sync::atomic::Ordering::Relaxed), sent_packets: self.sent_packets.load(std::sync::atomic::Ordering::Relaxed), error: self.error.load(std::sync::atomic::Ordering::Relaxed), } } -} +} \ No newline at end of file diff --git a/ubw-sward/src/udp/generator.rs b/ubw-sward/src/socket_stream/generator.rs similarity index 50% rename from ubw-sward/src/udp/generator.rs rename to ubw-sward/src/socket_stream/generator.rs index ef77089..192b11f 100644 --- a/ubw-sward/src/udp/generator.rs +++ b/ubw-sward/src/socket_stream/generator.rs @@ -1,22 +1,23 @@ -use crate::udp::{BoxedUdpRequest, SizedUdpRequest}; use rand::RngCore; -pub struct RandomUdpWorkloadGenerator { +use crate::socket_stream::{BoxedStreamWorkload, SizedStreamWorkload}; + +pub struct RandomWorkloadGenerator { rng: Rng, } -impl RandomUdpWorkloadGenerator { +impl RandomWorkloadGenerator { pub fn new(rng: Rng) -> Self { Self { rng } } - pub fn generate_sized(&mut self) -> SizedUdpRequest { + pub fn generate_sized(&mut self) -> SizedStreamWorkload { let mut buf = [0u8; SIZE]; self.rng.fill_bytes(&mut buf); - SizedUdpRequest { bytes: buf } + SizedStreamWorkload { bytes: buf } } - pub fn generate_boxed(&mut self, size: usize) -> BoxedUdpRequest { + pub fn generate_boxed(&mut self, size: usize) -> BoxedStreamWorkload { let mut buf = vec![0u8; size].into_boxed_slice(); self.rng.fill_bytes(&mut buf); - BoxedUdpRequest(buf) + BoxedStreamWorkload(buf) } } diff --git a/ubw-sward/src/udp/integrated.rs b/ubw-sward/src/socket_stream/integrated.rs similarity index 51% rename from ubw-sward/src/udp/integrated.rs rename to ubw-sward/src/socket_stream/integrated.rs index 2033b1c..9c7cae3 100644 --- a/ubw-sward/src/udp/integrated.rs +++ b/ubw-sward/src/socket_stream/integrated.rs @@ -1,32 +1,28 @@ -use crate::udp::counter::AtomicUdpFloodCounter; -use crate::udp::generator::RandomUdpWorkloadGenerator; -use crate::udp::random_flood::UdpSward; -use crate::udp::{BoxedUdpRequest, SizedUdpRequest}; use rand::Rng; use std::net::SocketAddr; use std::sync::{Arc, atomic}; -use tokio::net::UdpSocket; -use tower::{Service, ServiceExt}; use wyrand::WyRand; -pub struct IntegratedUdpSward { - udp_sward: UdpSward, - random_udp_workload_generator: RandomUdpWorkloadGenerator, - counter: Arc, +use crate::socket_stream::StreamSward; +use crate::socket_stream::counter::AtomicSocketStreamCounter; +use crate::socket_stream::generator::RandomWorkloadGenerator; + +pub struct IntegratedSocketStreamSward { + sward: Swd, + random_workload_generator: RandomWorkloadGenerator, + counter: Arc, } -impl IntegratedUdpSward { +impl IntegratedSocketStreamSward { pub async fn new(target: SocketAddr) -> Result { - let random_core = WyRand::new(rand::rng().random()); - let generator = RandomUdpWorkloadGenerator::new(random_core); - let udp_socket = UdpSocket::bind("0.0.0.0:0").await?; - udp_socket.connect(target).await?; - let udp_socket = Arc::new(udp_socket); - let sward = UdpSward::new(udp_socket); + let sward = Swd::connect(target).await?; + let random_workload_generator = + RandomWorkloadGenerator::new(WyRand::new(rand::rng().random())); + let counter = Arc::new(AtomicSocketStreamCounter::default()); Ok(Self { - random_udp_workload_generator: generator, - udp_sward: sward, - counter: Arc::new(AtomicUdpFloodCounter::default()), + sward, + random_workload_generator, + counter, }) } pub async fn oneshot_array(&mut self) -> Result { @@ -36,11 +32,8 @@ impl IntegratedUdpSward { self.counter .sent_packets .fetch_add(1, atomic::Ordering::Relaxed); - let content = self.random_udp_workload_generator.generate_sized::(); - >>::ready(&mut self.udp_sward) - .await? - .call(content) - .await + let content = self.random_workload_generator.generate_sized::(); + self.sward.send_sized(content).await } pub async fn oneshot_dynamic(&mut self, size: usize) -> Result { self.counter @@ -49,11 +42,8 @@ impl IntegratedUdpSward { self.counter .sent_packets .fetch_add(1, atomic::Ordering::Relaxed); - let content = self.random_udp_workload_generator.generate_boxed(size); - >::ready(&mut self.udp_sward) - .await? - .call(content) - .await + let content = self.random_workload_generator.generate_boxed(size); + self.sward.send_boxed(content).await } pub async fn run_with_signal( mut self, @@ -73,7 +63,7 @@ impl IntegratedUdpSward { } } } - pub fn get_counter(&self) -> Arc { + pub fn get_counter(&self) -> Arc { self.counter.clone() } } diff --git a/ubw-sward/src/socket_stream/mod.rs b/ubw-sward/src/socket_stream/mod.rs new file mode 100644 index 0000000..962e4af --- /dev/null +++ b/ubw-sward/src/socket_stream/mod.rs @@ -0,0 +1,33 @@ +use std::pin::Pin; + +pub mod counter; +pub mod generator; +pub mod integrated; +pub mod udp; + +#[derive(Clone, Copy)] +pub struct SizedStreamWorkload { + pub bytes: [u8; N], +} + +#[derive(Clone)] +pub struct BoxedStreamWorkload(pub Box<[u8]>); + +pub type StreamSendFuture = Pin> + Send>>; + +pub trait StreamSward { + fn connect( + addr: std::net::SocketAddr, + ) -> impl Future> + Send + where + Self: Sized; + fn add_request_count(&mut self); + fn send_sized( + &self, + workload: SizedStreamWorkload, + ) -> impl Future> + Send + 'static; + fn send_boxed( + &self, + workload: BoxedStreamWorkload, + ) -> impl Future> + Send + 'static; +} diff --git a/ubw-sward/src/socket_stream/udp.rs b/ubw-sward/src/socket_stream/udp.rs new file mode 100644 index 0000000..3bc9cb4 --- /dev/null +++ b/ubw-sward/src/socket_stream/udp.rs @@ -0,0 +1,86 @@ +use std::{ + sync::Arc, + task::{Context, Poll}, +}; +use tokio::net::UdpSocket; +use tower::Service; + +use crate::socket_stream::{ + BoxedStreamWorkload, SizedStreamWorkload, StreamSendFuture, StreamSward, +}; + +pub struct UdpSward { + socket: Arc, + sent_count: usize, +} + +impl UdpSward { + pub fn new(socket: Arc) -> Self { + Self { + socket, + sent_count: 0, + } + } +} + +impl StreamSward for UdpSward { + async fn connect(addr: std::net::SocketAddr) -> Result { + let socket = UdpSocket::bind("0.0.0.0:0").await?; + socket.connect(addr).await?; + let socket = Arc::new(socket); + Ok(Self { + socket, + sent_count: 0, + }) + } + fn add_request_count(&mut self) { + self.sent_count += 1; + } + fn send_sized( + &self, + workload: SizedStreamWorkload, + ) -> impl Future> + Send + 'static { + let socket = self.socket.clone(); + async move { socket.send(&workload.bytes).await } + } + fn send_boxed( + &self, + workload: BoxedStreamWorkload, + ) -> impl Future> + Send + 'static { + let socket = self.socket.clone(); + async move { socket.send(&workload.0).await } + } +} + +impl Service> for UdpSward { + type Response = usize; + type Error = std::io::Error; + type Future = StreamSendFuture; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: SizedStreamWorkload) -> Self::Future { + self.add_request_count(); + Box::pin(self.send_sized(req)) + } +} + +impl Service for UdpSward { + type Response = usize; + type Error = std::io::Error; + type Future = StreamSendFuture; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: BoxedStreamWorkload) -> Self::Future { + self.add_request_count(); + Box::pin(self.send_boxed(req)) + } +} + +impl tower::load::Load for UdpSward { + type Metric = usize; + fn load(&self) -> Self::Metric { + self.sent_count + } +} diff --git a/ubw-sward/src/tcp/ack.rs b/ubw-sward/src/tcp/ack.rs deleted file mode 100644 index 8b13789..0000000 --- a/ubw-sward/src/tcp/ack.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/ubw-sward/src/tcp/mod.rs b/ubw-sward/src/tcp/mod.rs deleted file mode 100644 index 5903612..0000000 --- a/ubw-sward/src/tcp/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod ack; -pub mod syn; diff --git a/ubw-sward/src/tcp/syn.rs b/ubw-sward/src/tcp/syn.rs deleted file mode 100644 index 8b13789..0000000 --- a/ubw-sward/src/tcp/syn.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/ubw-sward/src/udp/mod.rs b/ubw-sward/src/udp/mod.rs deleted file mode 100644 index b278e26..0000000 --- a/ubw-sward/src/udp/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod counter; -pub mod generator; -pub mod integrated; -pub mod random_flood; - -pub use generator::RandomUdpWorkloadGenerator; -pub use integrated::IntegratedUdpSward; -pub use random_flood::{BoxedUdpRequest, SizedUdpRequest, UdpSward}; diff --git a/ubw-sward/src/udp/random_flood.rs b/ubw-sward/src/udp/random_flood.rs deleted file mode 100644 index 370dfbd..0000000 --- a/ubw-sward/src/udp/random_flood.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use tokio::net::UdpSocket; -use tower::Service; - -#[derive(Clone)] -pub struct UdpSward { - socket: Arc, - sent_count: usize, -} - -impl UdpSward { - pub fn new(socket: Arc) -> Self { - Self { - socket, - sent_count: 0, - } - } -} - -#[derive(Clone, Copy)] -pub struct SizedUdpRequest { - pub bytes: [u8; N], -} - -impl Service> for UdpSward { - type Response = usize; - type Error = std::io::Error; - type Future = Pin> + Send>>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: SizedUdpRequest) -> Self::Future { - self.sent_count += 1; - let socket = self.socket.clone(); - Box::pin(async move { socket.send(&req.bytes).await }) - } -} - -#[derive(Clone)] -pub struct BoxedUdpRequest(pub Box<[u8]>); - -impl Service for UdpSward { - type Response = usize; - type Error = std::io::Error; - type Future = Pin> + Send>>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: BoxedUdpRequest) -> Self::Future { - self.sent_count += 1; - let socket = self.socket.clone(); - Box::pin(async move { socket.send(&req.0).await }) - } -} - -impl tower::load::Load for UdpSward { - type Metric = usize; - fn load(&self) -> Self::Metric { - self.sent_count - } -}