add tcp sward

This commit is contained in:
2025-10-03 21:06:46 +09:00
parent d2db279476
commit e5b1300bbd
2 changed files with 96 additions and 0 deletions

View File

@@ -3,6 +3,7 @@ use std::pin::Pin;
pub mod counter; pub mod counter;
pub mod generator; pub mod generator;
pub mod integrated; pub mod integrated;
pub mod tcp;
pub mod udp; pub mod udp;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]

View File

@@ -0,0 +1,95 @@
use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::Mutex};
use tower::Service;
use crate::socket_stream::{
BoxedStreamWorkload, SizedStreamWorkload, StreamSendFuture, StreamSward,
};
pub struct TcpSward {
stream: Arc<Mutex<TcpStream>>,
sent_count: usize,
}
impl TcpSward {
pub fn new(stream: Arc<Mutex<TcpStream>>) -> Self {
Self {
stream,
sent_count: 0,
}
}
}
impl StreamSward for TcpSward {
async fn connect(addr: std::net::SocketAddr) -> Result<Self, std::io::Error> {
let stream = TcpStream::connect(addr).await?;
let stream = Arc::new(Mutex::new(stream));
Ok(Self {
stream,
sent_count: 0,
})
}
fn add_request_count(&mut self) {
self.sent_count += 1;
}
fn send_sized<const N: usize>(
&self,
workload: SizedStreamWorkload<N>,
) -> impl Future<Output = Result<usize, std::io::Error>> + Send + 'static {
let stream = self.stream.clone();
async move {
let mut stream = stream.lock().await;
stream.write_all(&workload.bytes).await?;
Ok(N)
}
}
fn send_boxed(
&self,
workload: BoxedStreamWorkload,
) -> impl Future<Output = Result<usize, std::io::Error>> + Send + 'static {
let stream = self.stream.clone();
async move {
let len = workload.0.len();
let mut stream = stream.lock().await;
stream.write_all(&workload.0).await?;
Ok(len)
}
}
}
impl<const N: usize> Service<SizedStreamWorkload<N>> for TcpSward {
type Response = usize;
type Error = std::io::Error;
type Future = StreamSendFuture;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: SizedStreamWorkload<N>) -> Self::Future {
self.add_request_count();
Box::pin(self.send_sized(req))
}
}
impl Service<BoxedStreamWorkload> for TcpSward {
type Response = usize;
type Error = std::io::Error;
type Future = StreamSendFuture;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BoxedStreamWorkload) -> Self::Future {
self.add_request_count();
Box::pin(self.send_boxed(req))
}
}
impl tower::load::Load for TcpSward {
type Metric = usize;
fn load(&self) -> Self::Metric {
self.sent_count
}
}