observability
Last updated: 2025-04-24 01:05:42.971258
The Observability package provides a comprehensive solution for monitoring and troubleshooting DMS. It includes:
Configurable, structured logging with outputs to the console, files, and optionally Elasticsearch.
Custom event emission using a LibP2P event bus.
Distributed tracing with Elastic APM integration.
Label-based routing for log messages to control log destinations.
Dynamic runtime configuration and a no-op mode for testing.
Logging
Console logging with colorized output for ease of development.
File-based logging using a rolling file writer (via lumberjack).
Optional Elasticsearch logging for centralized log management.
Dynamic log level configuration (DEBUG, INFO, WARN, ERROR, etc.).
Custom Event Emission
Leverages LibP2P's EventBus to emit application events.
Events adhere to the CustomEvent schema for downstream processing.
Distributed Tracing
Integrated with Elastic APM for distributed tracing.
Automatic system metrics collection (CPU, memory, disk, network).
Custom instrumentation via the StartTrace helper.
Label-Based Routing
Attach labels to log messages to determine routing decisions.
Override default Elasticsearch index or skip ES logging based on labels.
Predefined labels include: default, accounting, metric, deployment, allocation, node, and user.
Internal Architecture and Event Bus Explanation:
We Do Actually Have an “Internal Event Bus.”
The “internal event bus” is created via Libp2p’s host.EventBus() inside observability.initEventBus(...). We attach a custom emitter to that bus (customEventEmitter), and the code in newEventEmitterCore(...) in observability.go will emit every log entry as a CustomEvent to that bus.
Why You Don’t See the Word “Collector.”
In an older design we talked about “collectors” as separate pipelines for sending logs to console, file, Elasticsearch, etc. In the current code, that same concept is implemented using multiple Zap cores in a “tee.” Each core (console, file, Elasticsearch, event bus) is effectively a “collector,” but we just don’t label them that way in the code anymore. You can see this in observability.initLogger(...), which calls createConsoleCore(...), createFileCore(...), createElasticsearchCore(...), and newEventEmitterCore(...). Then, all those cores are combined via zapcore.NewTee(cores...).
Where Is the “Internal Event Bus” Actually Implemented?
It is initialized in initEventBus in observability.go. We then wrap it with a custom Zap Core in the newEventEmitterCore function, which creates the eventEmitterCore. Inside that core’s Write(...) method, the log data is packaged into a CustomEvent struct and emitted via customEventEmitter.Emit(customEvent). This is exactly what ships logs onto the Libp2p event bus.
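For orientation, here is a minimal sketch of what such an event-emitting core can look like. It is an illustration, not the actual DMS source: the struct fields, constructor signature, and the simplified With method below are assumptions; the real CustomEvent schema and emitter wiring live in observability.go.

```go
package observability

import (
	"github.com/libp2p/go-libp2p/core/event"
	"go.uber.org/zap/zapcore"
)

// CustomEvent is a simplified stand-in for the real event schema.
type CustomEvent struct {
	Level   string
	Message string
	Fields  map[string]interface{}
}

// eventEmitterCore forwards every accepted log entry onto the libp2p event bus.
type eventEmitterCore struct {
	zapcore.LevelEnabler
	emitter event.Emitter // e.g. obtained via host.EventBus().Emitter(new(CustomEvent))
}

func newEventEmitterCore(enab zapcore.LevelEnabler, em event.Emitter) zapcore.Core {
	return &eventEmitterCore{LevelEnabler: enab, emitter: em}
}

// With is simplified here; the real core would retain the added fields.
func (c *eventEmitterCore) With(_ []zapcore.Field) zapcore.Core { return c }

func (c *eventEmitterCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
	if c.Enabled(ent.Level) {
		return ce.AddCore(ent, c)
	}
	return ce
}

// Write packages the log entry into a CustomEvent and emits it on the bus.
func (c *eventEmitterCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
	enc := zapcore.NewMapObjectEncoder()
	for _, f := range fields {
		f.AddTo(enc)
	}
	return c.emitter.Emit(CustomEvent{
		Level:   ent.Level.String(),
		Message: ent.Message,
		Fields:  enc.Fields,
	})
}

func (c *eventEmitterCore) Sync() error { return nil }
```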
Summary:
Zap + Multiple Cores = The old notion of multiple “collectors” (console, file, Elasticsearch).
Event Emitter Core = The “internal event bus” hook, which you can subscribe to for advanced use cases (like a future reputation system).
Default Label = The fallback if a caller doesn’t specify any label at all.
Everything happens in the observability package, but we rely on the Libp2p host’s event bus. No extra “collector” objects are defined by name—the pattern is just a combination of Zap “tee’d cores” plus the event bus emitter.
Runtime Configuration
Dynamically enable/disable Elasticsearch logging.
Update Elasticsearch endpoint, APM server URL, API key, and flush interval at runtime.
No-op mode to disable all observability features during testing.
Before using any observability features, we initialize the package. This is done in the network package where the LibP2P host is set up.
Initialization snippet from network/libp2p/libp2p.go:
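The original snippet is not reproduced in this export; the following is a hedged sketch of what the call site might look like, assuming a single entry point named observability.Initialize. The real function name, arguments, and error handling may differ.

```go
// Hypothetical call site inside the network package (names are assumptions):
host, err := libp2p.New( /* host options */ )
if err != nil {
	return err
}

// Wires up console/file/Elasticsearch logging, attaches the custom event
// emitter to host.EventBus(), and starts APM tracing when cfg.APM.ServerURL is set.
if err := observability.Initialize(host, cfg); err != nil {
	return fmt.Errorf("initialize observability: %w", err)
}
```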
This setup:
Configures logging (console, file, and optionally Elasticsearch).
Sets up the event emitter on the LibP2P EventBus.
Initializes Elastic APM tracing if cfg.APM.ServerURL is provided.
Dynamic Log Level Adjustment
Log levels can be updated at runtime:
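For example (the helper name SetLogLevel is an assumption; the underlying mechanism is the package's zap.AtomicLevel):

```go
// Hypothetical helper; internally this would call atomicLevel.SetLevel(...).
observability.SetLogLevel("DEBUG")

// Equivalent direct form, given access to the package's atomicLevel:
atomicLevel.SetLevel(zapcore.DebugLevel)
```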
Elastic APM is integrated for distributed tracing. Use the helper function StartTrace to begin new transactions or spans.
Usage examples:
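A hedged usage sketch follows; the argument list and return values of StartTrace are assumptions here (and names like "deploy-handler" and processDeployment are illustrative), so check the function's actual signature before copying this.

```go
// Assumed shape: StartTrace returns an end-able span (or transaction) plus a
// context that carries it; the real signature may differ.
span, ctx := observability.StartTrace(ctx, "deploy-handler")
defer span.End()

// Pass ctx onward so nested calls attach child spans to the same trace.
processDeployment(ctx, req) // illustrative downstream call
```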
If a transaction is already present (e.g., via APM-enabled Gin middleware), StartTrace creates a child span; otherwise, it starts a new transaction.
Logs can be annotated with labels to control routing:
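For example (illustrative only; whether the labels field is a single string or a slice is an assumption here):

```go
logger.Info("invoice recorded",
	zap.String("labels", "accounting"),  // picked up by the label injection core
	zap.String("invoice_id", invoiceID), // illustrative payload field
)
```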
The label injection core processes the labels field:
Logs tagged with specific labels may be routed to an override Elasticsearch index (e.g., "accounting-index").
Alternatively, logs can be skipped for Elasticsearch if configured.
This observability package sets up multiple zap cores (console, file, Elasticsearch, etc.) and combines them into a single logger in the initLogger function. To add a new core, you can follow the same pattern:
Create Your createXCore Function
In your new function, mirror the style of our existing core-creation functions (e.g., createConsoleCore, createFileCore, createElasticsearchCore). Typically, you will:
Configure an encoder (zapcore.EncoderConfig).
Set up a writer (e.g., to a file, stdout, a network sink, or anything else).
Return a new core using zapcore.NewCore(...), along with the atomicLevel for controlling log levels.
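As a sketch of that pattern, a hypothetical core that writes JSON-encoded entries to an arbitrary io.Writer might look like the following; the function name and destination are illustrative, not part of the existing package.

```go
package observability

import (
	"io"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// createJSONWriterCore mirrors the existing create*Core functions: build an
// encoder, wrap a writer, and return a core gated by the shared atomicLevel.
func createJSONWriterCore(w io.Writer, atomicLevel zap.AtomicLevel) (zapcore.Core, error) {
	encCfg := zap.NewProductionEncoderConfig()     // zapcore.EncoderConfig
	encCfg.EncodeTime = zapcore.ISO8601TimeEncoder // readable timestamps
	enc := zapcore.NewJSONEncoder(encCfg)

	ws := zapcore.AddSync(w) // any io.Writer becomes a WriteSyncer
	return zapcore.NewCore(enc, ws, atomicLevel), nil
}
```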
Add the New Core in initLogger
Inside initLogger (where we build the logger), follow the approach used for the other cores:
Call your new createXCore(...) function.
Handle any errors (e.g., if creation fails, log a warning or decide how you want to proceed).
Append the newly created core to the cores slice, which is eventually passed to zapcore.NewTee(cores...).
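For the hypothetical core sketched above, the wiring inside initLogger would follow the same error-handling-then-append pattern. The cores and atomicLevel variables below are the ones already described in this document; the rest is illustrative.

```go
// Inside initLogger, next to the existing core setup:
jsonCore, err := createJSONWriterCore(os.Stdout, atomicLevel)
if err != nil {
	// Non-fatal: keep the other cores and surface a warning in whatever way
	// initLogger reports its own setup problems.
	fmt.Fprintf(os.Stderr, "warning: json writer core disabled: %v\n", err)
} else {
	cores = append(cores, jsonCore)
}

// All cores are eventually combined into one logger:
combined := zapcore.NewTee(cores...)
```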
Leverage Shared Fields and Wrappers
Just like our other cores, you can attach shared fields (e.g., the DID field) by calling .With([]zapcore.Field{ ... }) on the core. Also note that the package wraps the combined Tee core with newLabelInjectionCore (for label-based routing) and includes an event emitter core. Your new core will automatically benefit from these features so long as you add it in initLogger just like the others.
Verify No-Op Mode and ES Disabled States (If Relevant)
If your new core relies on external resources, consider how noOpMode or similar package-level flags (like esDisabled) might affect it. For instance, you may want to skip initialization if the environment or config dictates that your service shouldn’t produce logs to that destination at certain times.
Document Configuration
If your new core requires additional config fields (such as a new URL or API key), add them to config.Observability or wherever your configuration is stored, and update your function accordingly so that everything is initialized properly.
By following these steps and modeling your function after our existing create*Core methods, you can seamlessly add a new core to the combined logger in a way that remains consistent with the rest of the observability package.
Observability supports dynamic reconfiguration at runtime. The following settings can be changed on the fly (see the sketch after this list):
Enable/disable Elasticsearch logging.
Set the Elasticsearch endpoint.
Set the APM server URL.
Update the API key (for Elasticsearch and APM).
Set the flush interval for Elasticsearch logs.
Toggle no-op mode.
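As an illustration only, such calls might look like the following. Every function name below is an assumption, not a confirmed API; check the package's exported setters for the real names.

```go
// Hypothetical setter names, one per item in the list above:
observability.SetElasticsearchEnabled(true)
observability.SetElasticsearchEndpoint("https://es.example.com:9200")
observability.SetAPMServerURL("https://apm.example.com:8200")
observability.SetAPIKey("redacted-api-key")      // shared by Elasticsearch and APM
observability.SetFlushInterval(10 * time.Second) // ES log flush cadence
observability.SetNoOpMode(false)
```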
For unit tests, disable observability side-effects by enabling no-op mode:
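For instance (again using the assumed SetNoOpMode name from the sketch above), in a _test.go file:

```go
func TestMain(m *testing.M) {
	observability.SetNoOpMode(true) // logging, Elasticsearch, and APM become no-ops
	os.Exit(m.Run())
}
```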
This ensures that logging, Elasticsearch, and APM tracing are effectively disabled during tests.
To properly release resources and flush logs/traces, call the package's shutdown function.
This function:
Flushes and closes Elasticsearch log buffers.
Closes the custom event emitter.
Shuts down the Elastic APM tracer if enabled.
Example usage in the main function:
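A sketch, assuming the shutdown entry point is called Shutdown (the actual name may differ):

```go
func main() {
	// ... set up config, the libp2p host, and observability ...
	defer observability.Shutdown() // flush ES buffers, close the emitter, stop the APM tracer

	runDMS() // illustrative main loop
}
```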
Note that this shutdown also happens automatically.
LibP2P: Provides the host.Host for event bus integration.
Zap: The primary structured logging library.
Lumberjack: Manages rolling log files for file-based logging.
Elasticsearch Client: Used to index logs into Elasticsearch.
Elastic APM: Enables distributed tracing and custom metric collection.
The package uses Zap for structured logging. After initialization, you can produce logs as follows:
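For example (how the logger is obtained is an assumption here; zap.L() stands in for whatever accessor the package exposes):

```go
logger := zap.L() // or the package's own logger accessor

logger.Info("deployment started",
	zap.String("labels", "deployment"), // optional label-based routing
	zap.String("node_id", nodeID),      // illustrative field
)
logger.Error("allocation failed", zap.Error(err))
```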