Our great sponsors
- InfluxDB - Collect and Analyze Billions of Data Points in Real Time
- Onboard AI - Learn any GitHub repo in 59 seconds
- SaaSHub - Software Alternatives and Reviews
-
To showcase how to produce Kafka events, we will leverage Tokio's tracing crate to generate log data.
-
templates
Repository for Dev Container Templates that are managed by Dev Container spec maintainers. See https://github.com/devcontainers/template-starter to create your own! (by devcontainers)
$ cd tracing_publisher $ mkdir .devcontainer $ cat > .devcontainer/devcontainer.json // For format details, see https://aka.ms/devcontainer.json. For config options, see the // README at: https://github.com/devcontainers/templates/tree/main/src/rust { "name": "Rust", "service": "rust-log-processing", "dockerComposeFile": "../docker-compose.yml", "features": { "ghcr.io/devcontainers/features/rust:1": {} }, "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}", "shutdownAction": "stopCompose" }
-
InfluxDB
Collect and Analyze Billions of Data Points in Real Time. Manage all types of time series data in a single, purpose-built database. Run at any scale in any environment in the cloud, on-premises, or at the edge.
-
// Credit: https://github.com/bryanburgers/tracing-blog-post/blob/main/examples/figure_3/custom_layer.rs use std::collections::BTreeMap; use tracing_subscriber::Layer; pub struct CustomLayer; impl Layer for CustomLayer where S: tracing::Subscriber, { fn on_event( &self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>, ) { // Covert the values into a JSON object let mut fields = BTreeMap::new(); let mut visitor = JsonVisitor(&mut fields); event.record(&mut visitor); // Output the event in JSON let output = serde_json::json!({ "target": event.metadata().target(), "name": event.metadata().name(), "level": format!("{:?}", event.metadata().level()), "fields": fields, }); println!("{}", serde_json::to_string_pretty(&output).unwrap()); } } struct JsonVisitor<'a>(&'a mut BTreeMap); impl<'a> tracing::field::Visit for JsonVisitor<'a> { fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { self.0 .insert(field.name().to_string(), serde_json::json!(value)); } fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { self.0 .insert(field.name().to_string(), serde_json::json!(value)); } fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { self.0 .insert(field.name().to_string(), serde_json::json!(value)); } fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { self.0 .insert(field.name().to_string(), serde_json::json!(value)); } fn record_str(&mut self, field: &tracing::field::Field, value: &str) { self.0 .insert(field.name().to_string(), serde_json::json!(value)); } fn record_error( &mut self, field: &tracing::field::Field, value: &(dyn std::error::Error + 'static), ) { self.0.insert( field.name().to_string(), serde_json::json!(value.to_string()), ); } fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { self.0.insert( field.name().to_string(), serde_json::json!(format!("{:?}", value)), ); } }
-
Now we don't see any additional output. To verify it worked, let's use kafkacat to consume the topic's events. (We install kafkacat in the Dev Container. Please run the following command in VSCode's terminal)
-
rust-kafka-producer-partitioner-example
Example Code for "How to Build a Kafka Producer in Rust with Partitioning"
Find the project's source code on GitHub.
-
Onboard AI
Learn any GitHub repo in 59 seconds. Onboard AI learns any GitHub repo in minutes and lets you chat with it to locate functionality, understand different parts, and generate new code. Use it for free at www.getonboard.dev.