use std::{error, fmt, path::PathBuf}; use bytes::{Buf, BufMut}; use metrics_tracing_context::{MetricsLayer, TracingContextLayer}; use metrics_util::{debugging::DebuggingRecorder, layers::Layer}; use tracing::Span; use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; use vector_buffers::{ BufferType, EventCount, encoding::FixedEncodable, topology::{ builder::TopologyBuilder, channel::{BufferReceiver, BufferSender}, }, }; use vector_common::{ byte_size_of::ByteSizeOf, finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable}, }; #[derive(Clone, Debug)] pub struct Message { id: u64, // Purpose of `_heap_allocated` is to simulate memory pressure in the buffer benchmarks when the // max_size option is selected. _heap_allocated: Box<[u64; N]>, _padding: [u64; N], } impl Message { fn new(id: u64) -> Self { Message { id, _heap_allocated: Box::new([0; N]), _padding: [0; N], } } } impl AddBatchNotifier for Message { fn add_batch_notifier(&mut self, batch: BatchNotifier) { drop(batch); // Incorrect but fast } } impl ByteSizeOf for Message { fn allocated_bytes(&self) -> usize { N * std::mem::size_of::() } } impl EventCount for Message { fn event_count(&self) -> usize { 1 } } impl Finalizable for Message { fn take_finalizers(&mut self) -> EventFinalizers { Default::default() // This benchmark doesn't need finalization } } #[derive(Debug)] pub struct EncodeError; impl fmt::Display for EncodeError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{self:?}") } } impl error::Error for EncodeError {} #[derive(Debug)] pub struct DecodeError; impl fmt::Display for DecodeError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{self:?}") } } impl error::Error for DecodeError {} impl FixedEncodable for Message { type EncodeError = EncodeError; type DecodeError = DecodeError; fn encode(self, buffer: &mut B) -> Result<(), Self::EncodeError> where B: BufMut, Self: Sized, { buffer.put_u64(self.id); for _ in 0..(N * 2) { // this covers self._padding and self.heap_allocated buffer.put_u64(0); } Ok(()) } fn decode(mut buffer: B) -> Result where B: Buf, Self: Sized, { let id = buffer.get_u64(); for _ in 0..(N * 2) { // this covers self._padding and self.heap_allocated _ = buffer.get_u64(); } Ok(Message::new(id)) } } #[allow(dead_code)] #[allow(clippy::type_complexity)] pub async fn setup( variant: BufferType, total_events: usize, data_dir: Option, id: String, ) -> ( BufferSender>, BufferReceiver>, Vec>, ) { let mut messages: Vec> = Vec::with_capacity(total_events); for i in 0..total_events { messages.push(Message::new(i as u64)); } let mut builder = TopologyBuilder::default(); variant .add_to_builder(&mut builder, data_dir, id) .expect("should not fail to add variant to builder"); let (tx, rx) = builder .build(String::from("benches"), Span::none()) .await .expect("should not fail to build topology"); (tx, rx, messages) } pub fn init_instrumentation() { let subscriber = tracing_subscriber::Registry::default().with(MetricsLayer::new()); if tracing::subscriber::set_global_default(subscriber).is_ok() { let recorder = TracingContextLayer::all().layer(DebuggingRecorder::new()); metrics::set_global_recorder(recorder).unwrap(); } } // // Measurements // // The nature of our buffer is such that the underlying representation is hidden // behind an abstract interface. As a happy consequence of this our benchmark // measurements are common. "Write Then Read" writes all messages into the // buffer and then reads them out. "Write And Read" writes a message and then // reads it from the buffer. // pub async fn wtr_measurement( mut sender: BufferSender>, mut receiver: BufferReceiver>, messages: Vec>, ) { for msg in messages.into_iter() { sender.send(msg, None).await.unwrap(); } drop(sender); while receiver.next().await.is_some() {} } pub async fn war_measurement( mut sender: BufferSender>, mut receiver: BufferReceiver>, messages: Vec>, ) { for msg in messages.into_iter() { sender.send(msg, None).await.unwrap(); _ = receiver.next().await.unwrap(); } }