OTAP Dataflow Engine



Overview

The OTAP Dataflow Engine is a set of core Rust crates that combine to provide an OpenTelemetry pipeline framework for collecting OpenTelemetry data, designed for use as an embedded software component.

Note

These Rust libraries are the main deliverable of Phase 2 of the OTel-Arrow project, as defined in the project phases. The df_engine main program built through cargo in src/main.rs is provided as a means to test and validate OTAP pipelines built using the dataflow engine.

Architecture

OTAP Dataflow Engine architecture

The controller is the local control plane for pipeline groups. It allocates CPU cores, spawns one worker thread per core, and owns lifecycle, coordination, and runtime observability. Each pipeline runs a single-threaded engine instance per assigned core; hot data paths stay within that thread, while cross-thread coordination is handled through control messages and internal telemetry.

The admin HTTP server and observed-state store are driven by the controller for runtime visibility and control. For details, see the controller and engine crate READMEs.

Features

The OTAP Dataflow engine consists of a number of major pieces. Here are the key aspects of its design:

  • OTel-Arrow first. The Apache Arrow record batch is the underlying data type used to represent "OTAP records", at the lowest level in our pipeline. The OTAP records format consists of a varying number of Arrow record batches per unit of data, representing hierarchical OpenTelemetry data in a column-oriented form, under a "star schema".
  • Zero-copy to and from OTLP bytes. OTLP bytes are the standard for representing OpenTelemetry data on the wire. Through a custom implementation of Google Protocol buffers, we convert OTAP records directly to and from OTLP bytes without constructing intermediate data objects.
  • Thread-per-core approach. Our design prioritizes single-threaded, shared-nothing pipelines. We use a local async runtime, which frees the pipeline from synchronization overhead. Multi-threaded components are possible using shared adapters, but we choose single-threaded local components when possible.
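The thread-per-core, shared-nothing pattern can be sketched as follows. This is an illustrative std-only sketch, not the engine's actual API: each worker thread owns its state privately, and the only cross-thread interaction is via control-message channels.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical control message; the real engine defines richer variants.
enum ControlMsg {
    Process(u64),
    Shutdown,
}

// Spawn one worker per "core"; each worker owns its state privately,
// so hot-path processing never needs synchronization.
fn run_workers(cores: usize, work: Vec<u64>) -> u64 {
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..cores {
        let (tx, rx) = mpsc::channel::<ControlMsg>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            let mut local_sum = 0u64; // thread-local state, never shared
            for msg in rx {
                match msg {
                    ControlMsg::Process(v) => local_sum += v,
                    ControlMsg::Shutdown => break,
                }
            }
            local_sum
        }));
    }
    // Controller side: distribute work round-robin, then shut down.
    for (i, v) in work.into_iter().enumerate() {
        senders[i % cores].send(ControlMsg::Process(v)).unwrap();
    }
    for tx in senders {
        tx.send(ControlMsg::Shutdown).unwrap();
    }
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let total = run_workers(4, (1..=10).collect());
    assert_eq!(total, 55);
    println!("total = {total}");
}
```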

The basic unit of data in an OTAP Dataflow pipeline is the OTAP pipeline data object, otap_df_otap::pdata::OtapPdata. In the hierarchy of types a pipeline component interacts with, the otap_df_otap::pdata module is a focal point. The OtapPdata type is a struct consisting of a "context" and a "payload", where the context is used for routing "Ack" and "Nack" responses, and the payload has two equivalent, signal-specific representations:

  • OTLP bytes (Logs, Traces, Metrics): A signal-specific enum of Vec<u8> corresponding with one of the export requests (e.g., ExportLogsServiceRequest).
  • OTAP records (Logs, Traces, Metrics): A signal-specific array of Arrow RecordBatch objects defining aspects of the OpenTelemetry data model, where unused columns are omitted. For example, the Logs form of OTAP records consists of four record batches, corresponding with Logs, Log Attributes, Scope Attributes, and Resource Attributes.
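The two-representation payload described above can be sketched as a Rust enum. This is a hypothetical, simplified sketch; the real type lives in otap_df_otap::pdata and is considerably richer, with RecordBatchStub standing in for Arrow's RecordBatch.

```rust
// Stand-in for an Arrow RecordBatch; only a row count for illustration.
struct RecordBatchStub {
    rows: usize,
}

// Hypothetical sketch of the two payload representations for Logs.
enum LogsPayload {
    // OTLP bytes: a serialized ExportLogsServiceRequest.
    OtlpBytes(Vec<u8>),
    // OTAP records: four related record batches under a "star schema".
    OtapRecords {
        logs: RecordBatchStub,
        log_attrs: RecordBatchStub,
        scope_attrs: RecordBatchStub,
        resource_attrs: RecordBatchStub,
    },
}

// The row count is directly visible in the columnar form, but requires
// decoding in the serialized form.
fn log_row_count(p: &LogsPayload) -> Option<usize> {
    match p {
        LogsPayload::OtlpBytes(_) => None, // unknown until decoded
        LogsPayload::OtapRecords { logs, .. } => Some(logs.rows),
    }
}

fn main() {
    let batch = |rows| RecordBatchStub { rows };
    let records = LogsPayload::OtapRecords {
        logs: batch(10),
        log_attrs: batch(25),
        scope_attrs: batch(3),
        resource_attrs: batch(2),
    };
    assert_eq!(log_row_count(&records), Some(10));
    assert_eq!(log_row_count(&LogsPayload::OtlpBytes(vec![0u8; 16])), None);
}
```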

Refer to the OTAP basics documentation.

The OTAP data model contains diagrams of the many N-to-1 relationships expressed within an OTAP request.

Major components

Controller and Engine

See the controller and engine crate READMEs:

The otap_df_engine crate is located in crates/engine; here we find the engine's overall architecture expressed:

  • Local (unsynchronized) and shared (Sync + Send) code paths
  • Queue-oriented message passing
  • Separate control and data plane
  • Effect handlers for interacting with the pipeline.
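The effect-handler idea above can be sketched with a trait: components never touch channels or clocks directly, they ask a handler to perform effects, which keeps component logic trivially testable. This is an illustrative sketch, not the engine's actual trait.

```rust
// Hypothetical effect-handler trait; the real interfaces in
// effect_handler.rs cover networking, clocks, and ack/nack as well.
trait EffectHandler {
    fn send(&mut self, msg: String);
    fn now_millis(&self) -> u64;
}

// A test handler records effects instead of performing them.
struct TestHandler {
    sent: Vec<String>,
    clock: u64,
}

impl EffectHandler for TestHandler {
    fn send(&mut self, msg: String) {
        self.sent.push(msg);
    }
    fn now_millis(&self) -> u64 {
        self.clock
    }
}

// A processor written against the trait never sees a real channel or clock.
fn stamp_and_forward(h: &mut impl EffectHandler, body: &str) {
    let stamped = format!("{} @{}", body, h.now_millis());
    h.send(stamped);
}

fn main() {
    let mut h = TestHandler { sent: Vec::new(), clock: 42 };
    stamp_and_forward(&mut h, "hello");
    assert_eq!(h.sent, vec!["hello @42".to_string()]);
}
```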

The engine's main entry point, otap_df_engine::PipelineFactory<PData>, supports building a single-threaded pipeline for a generic type PData. Generally, users do not construct these directly; they are managed by a controller instance. Here are the key files to know about:

crates/engine/lib.rs:    Effect handler extensions, pipeline factory
|-- attributes.rs:       Host, process/container IDs
|-- context.rs:          CPU, NUMA, thread context
|-- control.rs:          NodeControlMsg, AckMsg, NackMsg
|-- effect_handler.rs:   Component interfaces (network, clock, ack/nack)
|-- error.rs:            Structured errors
|-- exporter.rs:         Pipeline component (output only)
|-- message.rs:          The data and control plane messages
|-- node.rs:             The basic NodeId type
|-- pipeline_ctrl.rs:    Timer state, channel to all nodes
|-- processor.rs:        Pipeline component (input/output)
|-- receiver.rs:         Pipeline component (input only)
|-- runtime_pipeline.rs: Builds the graph of component channels

OTAP: OTel-Arrow Protocol pipeline data

See crate README. The OTAP pipeline data type is defined here, and therefore all of our built-in components are provided here as well. The main entry point into this crate is the otap_df_otap::pdata::OtapPdata type with its two alternate representations, OTAP records and OTLP bytes, each specific to a signal type.

The PData type also supports datatype-aware interaction with the pipeline engine: ProducerEffectHandlerExtension lets receivers and processors subscribe to the NodeControlMsg::Ack and NodeControlMsg::Nack messages, and ConsumerEffectHandlerExtension lets processors and exporters notify the next recipient in the chain of subscribers.
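The ack/nack routing role of the context can be sketched as a small router that remembers which subscriber is waiting for each request's outcome. This is a hypothetical illustration of the pattern, not the actual extension traits.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Outcome {
    Ack,
    Nack(String),
}

// Hypothetical ack router: the pdata context carries a request id, and
// the router delivers the eventual outcome to whoever subscribed for it.
struct AckRouter {
    pending: HashMap<u64, &'static str>,
}

impl AckRouter {
    // A producer (receiver/processor) subscribes to a request's outcome.
    fn subscribe(&mut self, request_id: u64, subscriber: &'static str) {
        self.pending.insert(request_id, subscriber);
    }
    // A consumer (processor/exporter) notifies the subscriber, at most once.
    fn notify(&mut self, request_id: u64, outcome: Outcome) -> Option<(&'static str, Outcome)> {
        self.pending.remove(&request_id).map(|s| (s, outcome))
    }
}

fn main() {
    let mut router = AckRouter { pending: HashMap::new() };
    router.subscribe(7, "otlp_receiver");
    assert_eq!(
        router.notify(7, Outcome::Ack),
        Some(("otlp_receiver", Outcome::Ack))
    );
    assert_eq!(router.notify(7, Outcome::Ack), None); // already delivered
}
```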

Here are the key files to know that support the components in this crate:

crates/otap/lib.rs:      OTAP Dataflow pipeline factory
|-- compression.rs:      OTLP and OTAP compression settings
|-- encoder.rs:          Computes OTAP from OTLP view representations
|-- metrics.rs:          Metrics logic shared by several components
|-- pdata.rs:            The OtapPdata type, effect handler extensions
|-- otap_grpc/:          OTLP and OTAP shared gRPC support
|-- fixtures.rs:         Test support
|-- otap_mock.rs:        Test support
|-- testing/:            Test support

All gRPC services are implemented using Tonic.

The major OTAP Dataflow components related to OTAP/OTLP pipeline transport are listed next. Their concrete core-node implementations now live in otap-df-core-nodes.

Attributes processor

This component supports efficient low-level manipulation of OTAP records. For example, this component supports O(1) column renaming for OpenTelemetry data.
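Renaming a column in a columnar batch is cheap because only the schema entry changes, never the column data. A minimal sketch of this idea (with a stand-in Column type, not the real Arrow schema machinery):

```rust
// Stand-in for a named Arrow column; the data vector plays the role of
// a column's value buffer.
struct Column {
    name: String,
    data: Vec<i64>,
}

// Rename touches only the schema-level name; the cost is independent of
// the number of rows, which is what makes it O(1) with respect to data.
fn rename_column(cols: &mut [Column], from: &str, to: &str) -> bool {
    for c in cols.iter_mut() {
        if c.name == from {
            c.name = to.to_string(); // data untouched
            return true;
        }
    }
    false
}

fn main() {
    let mut cols = vec![Column {
        name: "old.key".to_string(),
        data: vec![1, 2, 3],
    }];
    assert!(rename_column(&mut cols, "old.key", "new.key"));
    assert_eq!(cols[0].name, "new.key");
    assert_eq!(cols[0].data, vec![1, 2, 3]); // rows never copied
}
```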

Debug processor

A simple component that prints information about the data passing through, with configurable level of detail.

Error exporter

A simple component that returns a constant error message. All requests fail.

Noop exporter

A simple component that returns success. All requests succeed.

Fake Data Generator

A simple component to produce synthetic data from semantic convention registries.

Batch processor

A batching processor that works directly with OTAP records. This is based on lower-level support in the otel_arrow_rust crate.

OTAP exporter

The OTAP streaming gRPC exporter. This corresponds with the otelarrowexporter Collector-Contrib exporter component (i.e., this project's Phase 1 deliverable), based on Arrow IPC streams over gRPC streams.

OTAP receiver

The OTAP streaming gRPC receiver. This corresponds with the otelarrowreceiver Collector-Contrib receiver component (i.e., this project's Phase 1 deliverable), based on Arrow IPC streams over gRPC streams.

OTLP exporter

The OTLP unary gRPC exporter. This corresponds with the otlp Collector exporter component; it exports standard OTLP bytes.

OTLP receiver

The OTLP unary gRPC receiver. This corresponds with the otlp Collector receiver component; it receives standard OTLP bytes using a Tonic server.

Parquet exporter

The Parquet exporter records the OTel-Arrow representation using Parquet. While the translation between the OTel-Arrow representation and Parquet is direct, it requires changing several data types for Parquet compatibility. This component uses 32-bit identifiers, as opposed to the 16-bit identifiers used in OTel-Arrow batches, making large batches of telemetry available for external engines to process.

Performance exporter

A simple component that collects and prints statistics about the number of requests and items it sees, used for monitoring our benchmarks.

Retry processor

The retry processor supports a configurable number of retries and exponential back-off.
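The back-off schedule for such a processor can be computed as a simple geometric series. This sketch uses illustrative parameter names (initial_ms, multiplier, max_retries); the real processor reads its parameters from configuration.

```rust
use std::time::Duration;

// Exponential back-off: delay doubles (or multiplies) on each retry.
// Parameters here are hypothetical; the real retry processor is configured
// via the engine's config model.
fn backoff_delays(initial_ms: u64, multiplier: u64, max_retries: u32) -> Vec<Duration> {
    (0..max_retries)
        .map(|i| Duration::from_millis(initial_ms * multiplier.pow(i)))
        .collect()
}

fn main() {
    let delays = backoff_delays(100, 2, 4);
    assert_eq!(
        delays,
        vec![
            Duration::from_millis(100),
            Duration::from_millis(200),
            Duration::from_millis(400),
            Duration::from_millis(800),
        ]
    );
}
```

A production implementation would typically also cap the delay and add jitter to avoid synchronized retries; this sketch shows only the exponential core.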

Signal type router

Supports routing OTAP data by signal type, enabling signal-specific route destinations.
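Signal-type routing amounts to a total match over the three signal kinds. A minimal sketch, with hypothetical destination names (the real router's destinations come from pipeline configuration):

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum SignalType {
    Logs,
    Metrics,
    Traces,
}

// Hypothetical route table: each signal type maps to a named destination
// node; a match guarantees every signal type has a route.
fn route(signal: SignalType) -> &'static str {
    match signal {
        SignalType::Logs => "logs_exporter",
        SignalType::Metrics => "metrics_exporter",
        SignalType::Traces => "traces_exporter",
    }
}

fn main() {
    assert_eq!(route(SignalType::Logs), "logs_exporter");
    assert_eq!(route(SignalType::Metrics), "metrics_exporter");
    assert_eq!(route(SignalType::Traces), "traces_exporter");
}
```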

Syslog/CEF receiver

The Syslog/CEF receiver is considered a core component used to establish the performance of the OTAP Dataflow system.

Core Nodes

See crate README.

  • Exporters: console_exporter, error_exporter, noop_exporter, otap_exporter, otlp_grpc_exporter, otlp_http_exporter, parquet_exporter, perf_exporter, topic_exporter
  • Processors: attributes_processor, batch_processor, content_router, debug_processor, delay_processor, durable_buffer_processor, fanout_processor, filter_processor, retry_processor, signal_type_router, transform_processor
  • Receivers: fake_data_generator, internal_telemetry_receiver, otap_receiver, otlp_receiver, syslog_cef_receiver, topic_receiver

Contrib Nodes

See crate README.

The otap-df-contrib-nodes crate contains optional, feature-gated exporters and processors that are registered into the OTAP pipeline factory maps when enabled.

Contrib feature model:

  • Aggregate exporter feature:
    • contrib-exporters enables all contrib exporters
  • Aggregate processor feature:
    • contrib-processors enables all contrib processors
  • Individual feature flags can still be enabled independently for smaller builds.
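A downstream crate might enable contrib nodes like this (a sketch; the aggregate feature names come from the text above, but check the crate's Cargo.toml for the real individual flag names and version):

```toml
[dependencies]
# Enable every contrib exporter via the aggregate feature.
otap-df-contrib-nodes = { path = "../otap-df-contrib-nodes", features = ["contrib-exporters"] }
```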

Controller

See crate README.

The otap_df_controller crate is located in crates/controller and is the main entry point to construct an OTAP Dataflow pipeline instance. The controller type, otap_df_controller::Controller<PData>, manages building, running, and supervising one or more pipelines.

This component is responsible for assigning OTAP dataflow pipelines to individually numbered CPU cores. The Controller::run_forever() method is called to execute the pipeline. Like the engine, the pipeline datatype PData is opaque to this crate.
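The core-assignment idea can be sketched as a round-robin mapping from pipelines to numbered cores. This is a hypothetical illustration; the real controller also accounts for NUMA topology, lifecycle, and supervision.

```rust
// Assign each pipeline to a numbered core, round-robin. Illustrative only:
// the real Controller owns richer placement and lifecycle logic.
fn assign_cores(pipelines: &[&str], cores: &[usize]) -> Vec<(String, usize)> {
    pipelines
        .iter()
        .enumerate()
        .map(|(i, p)| (p.to_string(), cores[i % cores.len()]))
        .collect()
}

fn main() {
    let assigned = assign_cores(&["logs", "metrics", "traces"], &[0, 1]);
    assert_eq!(
        assigned,
        vec![
            ("logs".to_string(), 0),
            ("metrics".to_string(), 1),
            ("traces".to_string(), 0), // wraps around
        ]
    );
}
```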

Config

See crate README.

The configuration model for the OTAP Dataflow engine defines the structs and conventions used to configure and observe the pipeline, the engine, and the pipeline components.

A number of example configurations are listed in ./configs. These are deserialized into the otap_df_config::engine::OtelDataflowSpec struct, defined in this crate.

Channel

See crate README.

Defines the low-level queues used in the OTAP dataflow pipeline, otap_df_channel::mpsc and otap_df_channel::mpmc.

Defines a standard SendError<T> used to return failures throughout the codebase to enable recovering from try_send().
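The value of a recoverable try_send() can be shown with the standard library's bounded channel, whose TrySendError::Full hands the value back on failure, analogous to the SendError<T> described above. A minimal sketch:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// When a bounded queue is full, try_send returns ownership of the value,
// so the caller can apply back-pressure and retry instead of losing data.
fn send_with_recovery() -> u32 {
    let (tx, rx) = sync_channel::<u32>(1); // capacity-1 queue
    tx.try_send(1).unwrap();
    match tx.try_send(2) {
        Err(TrySendError::Full(v)) => {
            rx.recv().unwrap(); // drain one slot to make room
            tx.try_send(v).unwrap(); // retry with the recovered value
        }
        _ => unreachable!("capacity-1 queue should be full"),
    }
    rx.recv().unwrap()
}

fn main() {
    assert_eq!(send_with_recovery(), 2); // nothing was dropped
}
```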

Admin

See crate README.

Defines an administrative portal for operators, an HTTP service capable of displaying the current pipeline state, pipeline configuration, debugging logs, and Prometheus metrics. This supports primitive controls such as the ability to shut down the pipeline.

State

See crate README.

Low-level library supporting the state transition diagram, enabling observability for the state of the Controller.
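A state transition diagram of this kind reduces to a table of permitted edges. The states below are hypothetical; the real crate defines the Controller's actual states and the observability hooks around them.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum State {
    Stopped,
    Starting,
    Running,
    Stopping,
}

// Hypothetical transition table: only these edges are legal, so any other
// requested transition can be rejected and surfaced via observability.
fn can_transition(from: State, to: State) -> bool {
    use State::*;
    matches!(
        (from, to),
        (Stopped, Starting) | (Starting, Running) | (Running, Stopping) | (Stopping, Stopped)
    )
}

fn main() {
    assert!(can_transition(State::Stopped, State::Starting));
    assert!(can_transition(State::Running, State::Stopping));
    assert!(!can_transition(State::Stopped, State::Running)); // must start first
}
```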

Telemetry

See crate README.

The OTAP Dataflow system is built using a bespoke telemetry system, as we needed to ensure NUMA-awareness from the start. Moreover, this project is taking up a charter to investigate an OTel-Arrow first approach to telemetry, hence we are working with the experimental telemetry SDK here.

PData

See crate README.

The views sub-module contains zero-copy machinery for:

  • interpreting OTLP bytes using views to build OTAP records
  • interpreting OTAP records using views to encode OTLP bytes
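The "view" idea of the bullets above can be sketched in miniature: interpret serialized bytes in place, yielding borrowed slices rather than copying into owned structures. This toy length-prefixed format is illustrative only; the real views understand the full OTLP protobuf schema.

```rust
// A zero-copy iterator over a toy length-prefixed byte format:
// [len, bytes..., len, bytes..., ...]. Each item is a borrowed slice
// into the original buffer; no bytes are copied.
struct LenPrefixedView<'a> {
    buf: &'a [u8],
}

impl<'a> Iterator for LenPrefixedView<'a> {
    type Item = &'a [u8];
    fn next(&mut self) -> Option<&'a [u8]> {
        let (&len, rest) = self.buf.split_first()?;
        if rest.len() < len as usize {
            return None; // malformed input: stop rather than panic
        }
        let (item, tail) = rest.split_at(len as usize);
        self.buf = tail;
        Some(item) // borrowed, zero-copy
    }
}

fn main() {
    let bytes = [2, b'h', b'i', 1, b'!'];
    let items: Vec<&[u8]> = LenPrefixedView { buf: &bytes }.collect();
    assert_eq!(items, vec![b"hi".as_slice(), b"!".as_slice()]);
}
```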

Embedding in Custom Distributions

The engine crates are designed as reusable libraries. A custom binary can link the same controller, factory, and node crates and register its own components via linkme distributed slices -- exactly how src/main.rs works.

The otap_df_controller::startup module provides three helpers that every embedding binary typically needs:

  • validate_engine_components -- Checks that every node URN in a config maps to a registered component and runs per-component config validation.
  • apply_cli_overrides -- Merges core-allocation and HTTP-admin bind overrides into an OtelDataflowSpec.
  • system_info -- Returns a formatted string with CPU/memory info and all registered component URNs, useful for --help banners or diagnostics.

A minimal custom binary looks like this:

use otap_df_config::engine::OtelDataflowSpec;
use otap_df_controller::{Controller, startup};

// Side-effect imports to register components via linkme.
use otap_df_core_nodes as _;
// Bring your own contrib/custom nodes as needed.

// Reference the pipeline factory (or define your own).
use otap_df_otap::OTAP_PIPELINE_FACTORY;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    otap_df_otap::crypto::install_crypto_provider()?;

    let mut cfg = OtelDataflowSpec::from_file("pipeline.yaml")?;

    // Apply any CLI overrides (core count, admin bind address, etc.)
    startup::apply_cli_overrides(&mut cfg, Some(4), None, None);

    // Validate that every node URN in the config is registered.
    startup::validate_engine_components(&cfg, &OTAP_PIPELINE_FACTORY)?;

    // Print diagnostics.
    // Pass "system" here for the minimal example; in practice, align this
    // string with your binary's allocator feature (e.g. "jemalloc", "mimalloc").
    println!("{}", startup::system_info(&OTAP_PIPELINE_FACTORY, "system"));

    // Run the engine.
    let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
    controller.run_forever(cfg)?;
    Ok(())
}

This pattern is analogous to the builder approach used by projects like bindplane-otel-collector in the Go ecosystem. The default src/main.rs is itself a thin wrapper over these library calls.

For a complete runnable example, see examples/custom_collector.rs.

Development Setup

Requirements:

  • Rust >= 1.86.0
  • Cargo

Clone & Build:

git clone https://github.com/open-telemetry/otel-arrow
cd otel-arrow/rust/otap-dataflow
cargo build --workspace

Run Tests:

cargo test --workspace

Run Examples:

cargo run --example <example_name>

With Docker:

docker build --build-context otel-arrow=../../ -f Dockerfile -t df_engine .

Contributing

Before submitting a PR, please run the following commands:

# Prepare and check the entire project before submitting a PR or a commit
cargo xtask check

📝 License

Licensed under the Apache License, Version 2.0.

📞 Support & Community

CNCF Slack channel: #otel-arrow

🌟 Roadmap

See our detailed Roadmap for upcoming features and improvements.

✅ Changelog