The OTAP Dataflow Engine is a set of core Rust crates that combine to provide an OpenTelemetry pipeline framework, designed for use as an embedded software component for collecting OpenTelemetry data.
Note: These Rust libraries are the main deliverable of Phase 2 of the
OTel-Arrow project, as defined in the project phases.
The df_engine main program, built through cargo from
src/main.rs, is provided as a means to test and validate
OTAP pipelines built using the dataflow engine.
The controller is the local control plane for pipeline groups. It allocates CPU cores, spawns one worker thread per core, and owns lifecycle, coordination, and runtime observability. Each pipeline runs a single-threaded engine instance per assigned core; hot data paths stay within that thread, while cross-thread coordination is handled through control messages and internal telemetry.
The admin HTTP server and observed-state store are driven by the controller for runtime visibility and control. For details, see the controller and engine crate READMEs.
The OTAP Dataflow engine consists of a number of major pieces. Here are the key aspects of its design:
- OTel-Arrow first. The Apache Arrow record batch is the underlying data type used to represent "OTAP records", at the lowest level in our pipeline. The OTAP records format consists of a varying number of Arrow record batches per unit of data, representing hierarchical OpenTelemetry data in a column-oriented form, under a "star schema".
- Zero-copy to and from OTLP bytes. OTLP bytes are the standard for representing OpenTelemetry data on the wire. Through a custom implementation of Google Protocol buffers, we convert OTAP records directly to and from OTLP bytes without constructing intermediate data objects.
- Thread-per-core approach. Our design supports single-threaded,
shared-nothing pipelines as our first priority. We make use of a
local async runtime, freeing the pipeline from synchronization
instructions. Multi-threaded components are possible using shared
adapters, but we choose single-threaded local components when possible.
The basic unit of data in an OTAP Dataflow pipeline is the OTAP
pipeline data object, otap_df_otap::pdata::OtapPdata. In the
hierarchy of types a pipeline component interacts with, the
otap_df_otap::pdata crate is a focal point. The OtapPdata data
type is a struct consisting of "context" and "payload", where context
is used for routing "Ack" and "Nack" responses, and payload has two
equivalent, signal-specific representations:
- OTLP bytes (Logs, Traces, Metrics): A signal-specific enum of
Vec<u8> corresponding with one of the export requests (e.g.,
ExportLogsServiceRequest).
- OTAP records (Logs, Traces, Metrics): A signal-specific array of
Arrow RecordBatch objects defining aspects of the OpenTelemetry data
model, where unused columns are omitted. For example, the Logs form of
OTAP records consists of four record batches, corresponding with Logs,
Log Attributes, Scope Attributes, and Resource Attributes.
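The two-representation shape described above can be sketched as follows. This is a hypothetical, heavily simplified illustration; the real types live in otap_df_otap::pdata and differ in detail, and the RecordBatch stand-in here replaces Arrow's actual record batch type.

```rust
struct RecordBatch; // stand-in for an Arrow record batch

// Two equivalent payload representations (Logs shown; Metrics and
// Traces have analogous forms).
enum Payload {
    // OTLP bytes: a serialized export request, e.g. ExportLogsServiceRequest.
    OtlpLogsBytes(Vec<u8>),
    // OTAP records: up to four batches (logs, log attributes, scope
    // attributes, resource attributes); unused batches are omitted.
    OtapLogsRecords([Option<RecordBatch>; 4]),
}

struct Context; // simplified: routes Ack/Nack responses

// "Context" plus "payload", as described in the text.
struct OtapPdata {
    context: Context,
    payload: Payload,
}

// A payload holds exactly one representation at a time.
fn is_otlp_bytes(p: &OtapPdata) -> bool {
    matches!(p.payload, Payload::OtlpLogsBytes(_))
}

fn main() {
    let p = OtapPdata {
        context: Context,
        payload: Payload::OtlpLogsBytes(vec![]),
    };
    assert!(is_otlp_bytes(&p));
}
```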
Refer to the OTAP basics documentation.
The OTAP data model contains diagrams of the many N-to-1 relationships expressed within an OTAP request.
The otap_df_engine crate is located in crates/engine; here we
find the engine's overall architecture expressed:
- Local (unsynchronized) and shared (Sync + Send) code paths
- Queue-oriented message passing
- Separate control and data planes
- Effect handlers for interacting with the pipeline
The engine's main entry point,
otap_df_engine::PipelineFactory<PData>, supports building a
single-threaded pipeline for a generic type PData. Generally, users do
not construct these directly, as they are managed by a controller instance.
Here are the key files to know about:
crates/engine/lib.rs: Effect handler extensions, pipeline factory
|-- attributes.rs: Host, process/container IDs
|-- context.rs: CPU, NUMA, thread context
|-- control.rs: NodeControlMsg, AckMsg, NackMsg
|-- effect_handler.rs: Component interfaces (network, clock, ack/nack)
|-- error.rs: Structured errors
|-- exporter.rs: Pipeline component (output only)
|-- message.rs: The data and control plane messages
|-- node.rs: The basic NodeId type
|-- pipeline_ctrl.rs: Timer state, channel to all nodes
|-- processor.rs: Pipeline component (input/output)
|-- receiver.rs: Pipeline component (input only)
|-- runtime_pipeline.rs: Builds the graph of component channels
See crate README.
The OTAP pipeline data type is defined here; therefore, all of our
built-in components are provided here as well. The main entry point
into this crate is the otap_df_otap::pdata::OtapPdata type with its
two alternate representations, OTAP records and OTLP bytes, specific
to signal type.
The PData type also facilitates datatype-aware aspects of interacting
with the pipeline engine, including ProducerEffectHandlerExtension,
for receivers and processors to subscribe to the NodeControlMsg::Ack
and NodeControlMsg::Nack messages, and
ConsumerEffectHandlerExtension for processors and exporters to
notify the next recipient in the chain of subscribers.
Here are the key files to know that support the components in this crate:
crates/otap/lib.rs: OTAP Dataflow pipeline factory
|-- compression.rs: OTLP and OTAP compression settings
|-- encoder.rs: Computes OTAP from OTLP view representations
|-- metrics.rs: Metrics logic shared by several components
|-- pdata.rs: The OtapPdata type, effect handler extensions
|-- otap_grpc/: OTLP and OTAP shared gRPC support
|-- fixtures.rs: Test support
|-- otap_mock.rs: Test support
|-- testing/: Test support
All gRPC services are implemented using Tonic.
The major OTAP Dataflow components related to OTAP/OTLP pipeline transport are
listed next. Their concrete core-node implementations now live in
otap-df-core-nodes.
This component supports efficient low-level manipulation of OTAP records. For example, this component supports O(1) column renaming for OpenTelemetry data.
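The O(1) rename property can be illustrated generically: because column data is reference-counted and shared, a rename rewrites only the column name, never the data buffer. This is a sketch of the technique using a plain Rc in place of Arrow arrays, not the component's actual API.

```rust
use std::rc::Rc;

#[derive(Clone)]
struct Column {
    name: String,
    data: Rc<Vec<i64>>, // stand-in for an Arrow array buffer
}

// Rename without touching the data: O(1) regardless of column length.
fn rename(col: &Column, new_name: &str) -> Column {
    Column {
        name: new_name.to_string(),
        data: Rc::clone(&col.data),
    }
}

fn main() {
    let col = Column { name: "old_name".into(), data: Rc::new(vec![1, 2, 3]) };
    let renamed = rename(&col, "new_name");
    assert_eq!(renamed.name, "new_name");
    // Both columns share the same underlying buffer: zero copies.
    assert!(Rc::ptr_eq(&col.data, &renamed.data));
}
```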
A simple component that prints information about the data passing through, with configurable level of detail.
A simple component that returns a constant error message. All requests fail.
A simple component that returns success. All requests succeed.
A simple component to produce synthetic data from semantic convention registries.
A batching processor that works directly with OTAP records. This is
based on lower-level support in the otel_arrow_rust
crate.
The OTAP streaming gRPC exporter. This corresponds with the
otelarrowexporter Collector-Contrib exporter component
(i.e., this project's Phase 1 deliverable), based on Arrow IPC
streams over gRPC streams.
The OTAP streaming gRPC receiver. This corresponds with the
otelarrowreceiver Collector-Contrib receiver component,
this project's Phase 1 deliverable based on Arrow IPC streams over
gRPC streams.
The OTLP unary gRPC exporter. This corresponds with the otlp
Collector exporter component; it exports standard OTLP bytes.
The OTLP unary gRPC receiver. This corresponds with the otlp
Collector receiver component; it receives standard OTLP bytes using
a Tonic server.
The parquet exporter records the OTel-Arrow representation using Parquet. While there is a direct translation between the OTel-Arrow representation and Parquet, it requires changing several data types to be compatible with Parquet. This component uses 32-bit identifiers, as opposed to 16-bit identifiers used in OTel-Arrow batches, making large batches of telemetry available for external engines to process.
A simple component that collects and prints statistics about the number of requests and items it sees, used for monitoring our benchmarks.
The retry processor supports a configurable number of retries and exponential back-off.
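The back-off policy can be sketched as follows. Field names here are illustrative, not the processor's actual configuration keys.

```rust
use std::time::Duration;

// Minimal sketch of exponential back-off with a retry cap.
struct RetryPolicy {
    max_retries: u32,
    initial_backoff: Duration,
    multiplier: f64,
    max_backoff: Duration,
}

impl RetryPolicy {
    /// Back-off to wait before the given retry attempt (0-based),
    /// or None once retries are exhausted.
    fn backoff_for(&self, attempt: u32) -> Option<Duration> {
        if attempt >= self.max_retries {
            return None;
        }
        // Each attempt multiplies the previous delay, capped at max_backoff.
        let factor = self.multiplier.powi(attempt as i32);
        Some(self.initial_backoff.mul_f64(factor).min(self.max_backoff))
    }
}

fn main() {
    let p = RetryPolicy {
        max_retries: 3,
        initial_backoff: Duration::from_millis(100),
        multiplier: 2.0,
        max_backoff: Duration::from_secs(1),
    };
    assert_eq!(p.backoff_for(0), Some(Duration::from_millis(100)));
    assert_eq!(p.backoff_for(1), Some(Duration::from_millis(200)));
    assert_eq!(p.backoff_for(2), Some(Duration::from_millis(400)));
    assert_eq!(p.backoff_for(3), None); // retries exhausted
}
```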
Supports routing OTAP data by signal type, enabling signal-specific route destinations.
The Syslog/CEF receiver is considered a core component used to establish the performance of the OTAP Dataflow system.
- Exporters: console_exporter, error_exporter, noop_exporter,
otap_exporter, otlp_grpc_exporter, otlp_http_exporter,
parquet_exporter, perf_exporter, topic_exporter
- Processors: attributes_processor, batch_processor, content_router,
debug_processor, delay_processor, durable_buffer_processor,
fanout_processor, filter_processor, retry_processor,
signal_type_router, transform_processor
- Receivers: fake_data_generator, internal_telemetry_receiver,
otap_receiver, otlp_receiver, syslog_cef_receiver, topic_receiver
The otap-df-contrib-nodes crate contains optional, feature-gated
exporters and processors that are registered into the OTAP pipeline
factory maps when enabled.
Contrib feature model:
- Aggregate exporter feature: contrib-exporters enables all contrib exporters
- Aggregate processor feature: contrib-processors enables all contrib processors
- Individual feature flags can still be enabled independently for smaller builds.
The otap_df_controller crate is located in crates/controller and is
the main entry point to construct an OTAP Dataflow pipeline instance. The
controller type, otap_df_controller::Controller<PData>, manages building,
running, and supervising one or more pipelines.
This component is responsible for assigning OTAP dataflow pipelines to
individually numbered CPU cores. The
Controller::run_forever() method is called to execute the pipeline.
Like the engine, the pipeline datatype PData is opaque to this crate.
Here, the configuration model for the OTAP Dataflow engine defines the structs and conventions used to configure and observe the pipeline, the engine, and the pipeline components.
A number of example configurations are listed in
./configs. These are deserialized into the
otap_df_config::engine::OtelDataflowSpec structs, defined in this crate.
Defines the low-level queues used in the OTAP dataflow pipeline,
otap_df_channel::mpsc and otap_df_channel::mpmc.
Defines a standard SendError<T> used to return failures throughout
the codebase to enable recovering from try_send().
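The recover-on-full pattern can be illustrated with std's bounded channel, whose TrySendError similarly hands the rejected value back to the caller. This is a generic sketch of the technique, not the otap_df_channel API.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    let (tx, rx) = sync_channel::<u64>(1); // capacity-1 bounded queue
    tx.try_send(1).unwrap();

    // Channel is full: the failed send returns the value to the caller,
    // who can retry or buffer it instead of losing data.
    match tx.try_send(2) {
        Err(TrySendError::Full(v)) => {
            assert_eq!(v, 2); // recovered, not lost
            let first = rx.recv().unwrap(); // drain one slot
            assert_eq!(first, 1);
            tx.try_send(v).unwrap(); // retry now succeeds
        }
        _ => unreachable!("expected a full channel"),
    }
    assert_eq!(rx.recv().unwrap(), 2);
}
```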
Defines an administrative portal for operators, an HTTP service capable of displaying the current pipeline state, pipeline configuration, debugging logs, and Prometheus metrics. This supports primitive controls such as the ability to shut down the pipeline.
Low-level library supporting the state transition diagram, enabling
observability for the state of the Controller.
The OTAP Dataflow system is built using a bespoke telemetry system, as we needed to ensure NUMA-awareness from the start. Moreover, this project is taking up a charter to investigate an OTel-Arrow first approach to telemetry, hence we are working with the experimental telemetry SDK here.
The views sub-module contains zero-copy machinery for:
- interpreting OTLP bytes using views to build OTAP records
- interpreting OTAP records using views to encode OTLP bytes
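The underlying technique can be illustrated generically: a view borrows the serialized bytes and interprets fields in place, so no intermediate objects are allocated. This sketch uses a hypothetical length-prefixed framing, not the views API itself.

```rust
// A view over a serialized field: borrows the bytes, owns nothing.
struct FieldView<'a> {
    bytes: &'a [u8],
}

impl<'a> FieldView<'a> {
    // Interpret the borrowed bytes as UTF-8 without copying them.
    fn as_str(&self) -> Option<&'a str> {
        std::str::from_utf8(self.bytes).ok()
    }
}

fn main() {
    // A length-prefixed string field (hypothetical wire framing).
    let wire = b"\x05hello";
    let len = wire[0] as usize;
    let view = FieldView { bytes: &wire[1..1 + len] };
    // The view points into `wire`; nothing was allocated or copied.
    assert_eq!(view.as_str(), Some("hello"));
}
```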
The engine crates are designed as reusable libraries. A custom binary can
link the same controller, factory, and node crates and register its own
components via linkme distributed slices -- exactly how src/main.rs works.
The otap_df_controller::startup module provides three helpers that every
embedding binary typically needs:
- validate_engine_components -- Checks that every node URN in a config maps to a registered component and runs per-component config validation.
- apply_cli_overrides -- Merges core-allocation and HTTP-admin bind overrides into an OtelDataflowSpec.
- system_info -- Returns a formatted string with CPU/memory info and all registered component URNs, useful for --help banners or diagnostics.
A minimal custom binary looks like this:
```rust
use otap_df_config::engine::OtelDataflowSpec;
use otap_df_controller::{Controller, startup};

// Side-effect imports to register components via linkme.
use otap_df_core_nodes as _;
// Bring your own contrib/custom nodes as needed.

// Reference the pipeline factory (or define your own).
use otap_df_otap::OTAP_PIPELINE_FACTORY;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    otap_df_otap::crypto::install_crypto_provider()?;

    let mut cfg = OtelDataflowSpec::from_file("pipeline.yaml")?;

    // Apply any CLI overrides (core count, admin bind address, etc.)
    startup::apply_cli_overrides(&mut cfg, Some(4), None, None);

    // Validate that every node URN in the config is registered.
    startup::validate_engine_components(&cfg, &OTAP_PIPELINE_FACTORY)?;

    // Print diagnostics.
    // Pass "system" here for the minimal example; in practice, align this
    // string with your binary's allocator feature (e.g. "jemalloc", "mimalloc").
    println!("{}", startup::system_info(&OTAP_PIPELINE_FACTORY, "system"));

    // Run the engine.
    let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
    controller.run_forever(cfg)?;
    Ok(())
}
```

This pattern is analogous to the builder approach used by projects like
bindplane-otel-collector in the Go ecosystem. The default src/main.rs
is itself a thin wrapper over these library calls.
For a complete runnable example, see
examples/custom_collector.rs.
Requirements:
- Rust >= 1.86.0
- Cargo
Clone & Build:
```shell
git clone https://github.com/open-telemetry/otel-arrow
cd otel-arrow/rust/otap-dataflow
cargo build --workspace
```

Run Tests:

```shell
cargo test --workspace
```

Run Examples:

```shell
cargo run --example <example_name>
```

With Docker:

```shell
docker build --build-context otel-arrow=../../ -f Dockerfile -t df_engine .
```

- Contribution Guidelines
- Internal Telemetry Guidelines
- Code of Conduct (TBD)
Before submitting a PR, please run the following commands:
```shell
# Prepare and check the entire project before submitting a PR or a commit
cargo xtask check
```

Licensed under the Apache License, Version 2.0.
CNCF Slack channel: #otel-arrow
See our detailed Roadmap for upcoming features and improvements.