refactor project to support tcp

This commit is contained in:
2025-10-03 21:03:18 +09:00
parent 602411dbdb
commit d2db279476
11 changed files with 155 additions and 123 deletions

View File

@@ -3,6 +3,5 @@
#![deny(clippy::expect_used)] #![deny(clippy::expect_used)]
pub mod http; pub mod http;
pub mod tcp; pub mod socket_stream;
pub mod udp;
pub mod utils; pub mod utils;

View File

@@ -1,25 +1,25 @@
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
#[derive(Default, Clone, Copy)] #[derive(Default, Clone, Copy)]
pub struct UdpFloodCounter { pub struct SocketStreamCounter {
pub sent_bytes: u64, pub sent_bytes: u64,
pub sent_packets: u64, pub sent_packets: u64,
pub error: u64, pub error: u64,
} }
#[derive(Default)] #[derive(Default)]
pub struct AtomicUdpFloodCounter { pub struct AtomicSocketStreamCounter {
pub sent_bytes: AtomicU64, pub sent_bytes: AtomicU64,
pub sent_packets: AtomicU64, pub sent_packets: AtomicU64,
pub error: AtomicU64, pub error: AtomicU64,
} }
impl AtomicUdpFloodCounter { impl AtomicSocketStreamCounter {
pub fn read(&self) -> UdpFloodCounter { pub fn read(&self) -> SocketStreamCounter {
UdpFloodCounter { SocketStreamCounter {
sent_bytes: self.sent_bytes.load(std::sync::atomic::Ordering::Relaxed), sent_bytes: self.sent_bytes.load(std::sync::atomic::Ordering::Relaxed),
sent_packets: self.sent_packets.load(std::sync::atomic::Ordering::Relaxed), sent_packets: self.sent_packets.load(std::sync::atomic::Ordering::Relaxed),
error: self.error.load(std::sync::atomic::Ordering::Relaxed), error: self.error.load(std::sync::atomic::Ordering::Relaxed),
} }
} }
} }

View File

@@ -1,22 +1,23 @@
use crate::udp::{BoxedUdpRequest, SizedUdpRequest};
use rand::RngCore; use rand::RngCore;
pub struct RandomUdpWorkloadGenerator<Rng: RngCore> { use crate::socket_stream::{BoxedStreamWorkload, SizedStreamWorkload};
pub struct RandomWorkloadGenerator<Rng: RngCore> {
rng: Rng, rng: Rng,
} }
impl<Rng: RngCore> RandomUdpWorkloadGenerator<Rng> { impl<Rng: RngCore> RandomWorkloadGenerator<Rng> {
pub fn new(rng: Rng) -> Self { pub fn new(rng: Rng) -> Self {
Self { rng } Self { rng }
} }
pub fn generate_sized<const SIZE: usize>(&mut self) -> SizedUdpRequest<SIZE> { pub fn generate_sized<const SIZE: usize>(&mut self) -> SizedStreamWorkload<SIZE> {
let mut buf = [0u8; SIZE]; let mut buf = [0u8; SIZE];
self.rng.fill_bytes(&mut buf); self.rng.fill_bytes(&mut buf);
SizedUdpRequest { bytes: buf } SizedStreamWorkload { bytes: buf }
} }
pub fn generate_boxed(&mut self, size: usize) -> BoxedUdpRequest { pub fn generate_boxed(&mut self, size: usize) -> BoxedStreamWorkload {
let mut buf = vec![0u8; size].into_boxed_slice(); let mut buf = vec![0u8; size].into_boxed_slice();
self.rng.fill_bytes(&mut buf); self.rng.fill_bytes(&mut buf);
BoxedUdpRequest(buf) BoxedStreamWorkload(buf)
} }
} }

View File

@@ -1,32 +1,28 @@
use crate::udp::counter::AtomicUdpFloodCounter;
use crate::udp::generator::RandomUdpWorkloadGenerator;
use crate::udp::random_flood::UdpSward;
use crate::udp::{BoxedUdpRequest, SizedUdpRequest};
use rand::Rng; use rand::Rng;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, atomic}; use std::sync::{Arc, atomic};
use tokio::net::UdpSocket;
use tower::{Service, ServiceExt};
use wyrand::WyRand; use wyrand::WyRand;
pub struct IntegratedUdpSward { use crate::socket_stream::StreamSward;
udp_sward: UdpSward, use crate::socket_stream::counter::AtomicSocketStreamCounter;
random_udp_workload_generator: RandomUdpWorkloadGenerator<WyRand>, use crate::socket_stream::generator::RandomWorkloadGenerator;
counter: Arc<AtomicUdpFloodCounter>,
pub struct IntegratedSocketStreamSward<Swd: StreamSward> {
sward: Swd,
random_workload_generator: RandomWorkloadGenerator<WyRand>,
counter: Arc<AtomicSocketStreamCounter>,
} }
impl IntegratedUdpSward { impl<Swd: StreamSward> IntegratedSocketStreamSward<Swd> {
pub async fn new(target: SocketAddr) -> Result<Self, std::io::Error> { pub async fn new(target: SocketAddr) -> Result<Self, std::io::Error> {
let random_core = WyRand::new(rand::rng().random()); let sward = Swd::connect(target).await?;
let generator = RandomUdpWorkloadGenerator::new(random_core); let random_workload_generator =
let udp_socket = UdpSocket::bind("0.0.0.0:0").await?; RandomWorkloadGenerator::new(WyRand::new(rand::rng().random()));
udp_socket.connect(target).await?; let counter = Arc::new(AtomicSocketStreamCounter::default());
let udp_socket = Arc::new(udp_socket);
let sward = UdpSward::new(udp_socket);
Ok(Self { Ok(Self {
random_udp_workload_generator: generator, sward,
udp_sward: sward, random_workload_generator,
counter: Arc::new(AtomicUdpFloodCounter::default()), counter,
}) })
} }
pub async fn oneshot_array<const N: usize>(&mut self) -> Result<usize, std::io::Error> { pub async fn oneshot_array<const N: usize>(&mut self) -> Result<usize, std::io::Error> {
@@ -36,11 +32,8 @@ impl IntegratedUdpSward {
self.counter self.counter
.sent_packets .sent_packets
.fetch_add(1, atomic::Ordering::Relaxed); .fetch_add(1, atomic::Ordering::Relaxed);
let content = self.random_udp_workload_generator.generate_sized::<N>(); let content = self.random_workload_generator.generate_sized::<N>();
<UdpSward as ServiceExt<SizedUdpRequest<N>>>::ready(&mut self.udp_sward) self.sward.send_sized(content).await
.await?
.call(content)
.await
} }
pub async fn oneshot_dynamic(&mut self, size: usize) -> Result<usize, std::io::Error> { pub async fn oneshot_dynamic(&mut self, size: usize) -> Result<usize, std::io::Error> {
self.counter self.counter
@@ -49,11 +42,8 @@ impl IntegratedUdpSward {
self.counter self.counter
.sent_packets .sent_packets
.fetch_add(1, atomic::Ordering::Relaxed); .fetch_add(1, atomic::Ordering::Relaxed);
let content = self.random_udp_workload_generator.generate_boxed(size); let content = self.random_workload_generator.generate_boxed(size);
<UdpSward as ServiceExt<BoxedUdpRequest>>::ready(&mut self.udp_sward) self.sward.send_boxed(content).await
.await?
.call(content)
.await
} }
pub async fn run_with_signal<const SIZE: usize>( pub async fn run_with_signal<const SIZE: usize>(
mut self, mut self,
@@ -73,7 +63,7 @@ impl IntegratedUdpSward {
} }
} }
} }
pub fn get_counter(&self) -> Arc<AtomicUdpFloodCounter> { pub fn get_counter(&self) -> Arc<AtomicSocketStreamCounter> {
self.counter.clone() self.counter.clone()
} }
} }

View File

@@ -0,0 +1,33 @@
use std::pin::Pin;
pub mod counter;
pub mod generator;
pub mod integrated;
pub mod udp;
#[derive(Clone, Copy)]
pub struct SizedStreamWorkload<const N: usize> {
pub bytes: [u8; N],
}
#[derive(Clone)]
pub struct BoxedStreamWorkload(pub Box<[u8]>);
pub type StreamSendFuture = Pin<Box<dyn Future<Output = Result<usize, std::io::Error>> + Send>>;
pub trait StreamSward {
fn connect(
addr: std::net::SocketAddr,
) -> impl Future<Output = Result<Self, std::io::Error>> + Send
where
Self: Sized;
fn add_request_count(&mut self);
fn send_sized<const N: usize>(
&self,
workload: SizedStreamWorkload<N>,
) -> impl Future<Output = Result<usize, std::io::Error>> + Send + 'static;
fn send_boxed(
&self,
workload: BoxedStreamWorkload,
) -> impl Future<Output = Result<usize, std::io::Error>> + Send + 'static;
}

View File

@@ -0,0 +1,86 @@
use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::net::UdpSocket;
use tower::Service;
use crate::socket_stream::{
BoxedStreamWorkload, SizedStreamWorkload, StreamSendFuture, StreamSward,
};
pub struct UdpSward {
socket: Arc<UdpSocket>,
sent_count: usize,
}
impl UdpSward {
pub fn new(socket: Arc<UdpSocket>) -> Self {
Self {
socket,
sent_count: 0,
}
}
}
impl StreamSward for UdpSward {
async fn connect(addr: std::net::SocketAddr) -> Result<Self, std::io::Error> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket.connect(addr).await?;
let socket = Arc::new(socket);
Ok(Self {
socket,
sent_count: 0,
})
}
fn add_request_count(&mut self) {
self.sent_count += 1;
}
fn send_sized<const N: usize>(
&self,
workload: SizedStreamWorkload<N>,
) -> impl Future<Output = Result<usize, std::io::Error>> + Send + 'static {
let socket = self.socket.clone();
async move { socket.send(&workload.bytes).await }
}
fn send_boxed(
&self,
workload: BoxedStreamWorkload,
) -> impl Future<Output = Result<usize, std::io::Error>> + Send + 'static {
let socket = self.socket.clone();
async move { socket.send(&workload.0).await }
}
}
impl<const N: usize> Service<SizedStreamWorkload<N>> for UdpSward {
type Response = usize;
type Error = std::io::Error;
type Future = StreamSendFuture;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: SizedStreamWorkload<N>) -> Self::Future {
self.add_request_count();
Box::pin(self.send_sized(req))
}
}
impl Service<BoxedStreamWorkload> for UdpSward {
type Response = usize;
type Error = std::io::Error;
type Future = StreamSendFuture;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BoxedStreamWorkload) -> Self::Future {
self.add_request_count();
Box::pin(self.send_boxed(req))
}
}
impl tower::load::Load for UdpSward {
type Metric = usize;
fn load(&self) -> Self::Metric {
self.sent_count
}
}

View File

@@ -1 +0,0 @@

View File

@@ -1,2 +0,0 @@
pub mod ack;
pub mod syn;

View File

@@ -1 +0,0 @@

View File

@@ -1,8 +0,0 @@
mod counter;
pub mod generator;
pub mod integrated;
pub mod random_flood;
pub use generator::RandomUdpWorkloadGenerator;
pub use integrated::IntegratedUdpSward;
pub use random_flood::{BoxedUdpRequest, SizedUdpRequest, UdpSward};

View File

@@ -1,65 +0,0 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::net::UdpSocket;
use tower::Service;
#[derive(Clone)]
pub struct UdpSward {
socket: Arc<UdpSocket>,
sent_count: usize,
}
impl UdpSward {
pub fn new(socket: Arc<UdpSocket>) -> Self {
Self {
socket,
sent_count: 0,
}
}
}
#[derive(Clone, Copy)]
pub struct SizedUdpRequest<const N: usize> {
pub bytes: [u8; N],
}
impl<const N: usize> Service<SizedUdpRequest<N>> for UdpSward {
type Response = usize;
type Error = std::io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: SizedUdpRequest<N>) -> Self::Future {
self.sent_count += 1;
let socket = self.socket.clone();
Box::pin(async move { socket.send(&req.bytes).await })
}
}
#[derive(Clone)]
pub struct BoxedUdpRequest(pub Box<[u8]>);
impl Service<BoxedUdpRequest> for UdpSward {
type Response = usize;
type Error = std::io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BoxedUdpRequest) -> Self::Future {
self.sent_count += 1;
let socket = self.socket.clone();
Box::pin(async move { socket.send(&req.0).await })
}
}
impl tower::load::Load for UdpSward {
type Metric = usize;
fn load(&self) -> Self::Metric {
self.sent_count
}
}