Files
ubw-network/ubw-sward/src/udp/integrated.rs

80 lines
2.9 KiB
Rust

use crate::udp::counter::AtomicUdpFloodCounter;
use crate::udp::generator::RandomUdpWorkloadGenerator;
use crate::udp::random_flood::UdpSward;
use crate::udp::{BoxedUdpRequest, SizedUdpRequest};
use rand::Rng;
use std::net::SocketAddr;
use std::sync::{Arc, atomic};
use tokio::net::UdpSocket;
use tower::{Service, ServiceExt};
use wyrand::WyRand;
pub struct IntegratedUdpSward {
udp_sward: UdpSward,
random_udp_workload_generator: RandomUdpWorkloadGenerator<WyRand>,
counter: Arc<AtomicUdpFloodCounter>,
}
impl IntegratedUdpSward {
pub async fn new(target: SocketAddr) -> Result<Self, std::io::Error> {
let random_core = WyRand::new(rand::rng().random());
let generator = RandomUdpWorkloadGenerator::new(random_core);
let udp_socket = UdpSocket::bind("0.0.0.0:0").await?;
udp_socket.connect(target).await?;
let udp_socket = Arc::new(udp_socket);
let sward = UdpSward::new(udp_socket);
Ok(Self {
random_udp_workload_generator: generator,
udp_sward: sward,
counter: Arc::new(AtomicUdpFloodCounter::default()),
})
}
pub async fn oneshot_array<const N: usize>(&mut self) -> Result<usize, std::io::Error> {
self.counter
.sent_bytes
.fetch_add(N as u64, atomic::Ordering::Relaxed);
self.counter
.sent_packets
.fetch_add(1, atomic::Ordering::Relaxed);
let content = self.random_udp_workload_generator.generate_sized::<N>();
<UdpSward as ServiceExt<SizedUdpRequest<N>>>::ready(&mut self.udp_sward)
.await?
.call(content)
.await
}
pub async fn oneshot_dynamic(&mut self, size: usize) -> Result<usize, std::io::Error> {
self.counter
.sent_bytes
.fetch_add(size as u64, atomic::Ordering::Relaxed);
self.counter
.sent_packets
.fetch_add(1, atomic::Ordering::Relaxed);
let content = self.random_udp_workload_generator.generate_boxed(size);
<UdpSward as ServiceExt<BoxedUdpRequest>>::ready(&mut self.udp_sward)
.await?
.call(content)
.await
}
pub async fn run_with_signal<const SIZE: usize>(
mut self,
mut signal: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), tokio::sync::oneshot::error::RecvError> {
loop {
tokio::select! {
receive_result = &mut signal => {
receive_result?;
return Ok(())
},
oneshot_result = self.oneshot_array::<SIZE>() => {
if oneshot_result.is_err() {
self.counter.error.fetch_add(1, atomic::Ordering::Relaxed);
}
}
}
}
}
pub fn get_counter(&self) -> Arc<AtomicUdpFloodCounter> {
self.counter.clone()
}
}