Documentation ¶
Index ¶
- func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string
- func DefaultWriterClientConfig() writerClientConfig
- func GetLogger() otellog.Logger
- func GetMeter() otelmetric.Meter
- func GetTracer() oteltrace.Tracer
- func NewMetadataValidator() (*validator.Validate, error)
- func OtelAttr(key string, value any) otellog.KeyValue
- func SetClient(client *Client)
- func SetGlobalOtelProviders()
- type Attributes
- type Client
- type Config
- type Emitter
- type Message
- type Metadata
- type RetryConfig
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildAuthHeaders ¶ added in v0.4.0
func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string
BuildAuthHeaders creates the auth header value to be included on requests. The current format for the header is:
<version>:<public_key_hex>:<signature_hex>
where the byte value of <public_key_hex> is what's being signed
func DefaultWriterClientConfig ¶
func DefaultWriterClientConfig() writerClientConfig
func GetMeter ¶
func GetMeter() otelmetric.Meter
func NewMetadataValidator ¶ added in v0.4.0
func NewMetadataValidator() (*validator.Validate, error)
func SetGlobalOtelProviders ¶
func SetGlobalOtelProviders()
Sets the global OTel logger, tracer, and meter providers from the Client, making them accessible from anywhere in the code via the global otel getters. Any package that relies on go.opentelemetry.io will be able to pick up the configured global providers, e.g. [otelgrpc](https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc#example-NewServerHandler)
Types ¶
type Attributes ¶
type Client ¶
type Client struct { Config Config // Logger Logger otellog.Logger // Tracer Tracer oteltrace.Tracer // Meter Meter otelmetric.Meter // Message Emitter Emitter Emitter // Providers LoggerProvider otellog.LoggerProvider TracerProvider oteltrace.TracerProvider MeterProvider otelmetric.MeterProvider MessageLoggerProvider otellog.LoggerProvider // OnClose OnClose func() error }
func GetClient ¶
func GetClient() *Client
Returns the global Beholder Client. It is thread-safe and can be used concurrently.
func NewClient ¶
NewClient creates a new Client with initialized OpenTelemetry components. To handle OpenTelemetry errors, use otel.SetErrorHandler (https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler).
Example ¶
package main import ( "context" "fmt" "log" "go.opentelemetry.io/otel" "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb" ) func main() { config := beholder.DefaultConfig() // Initialize beholder otel client which sets up OTel components client, err := beholder.NewClient(config) if err != nil { log.Fatalf("Error creating Beholder client: %v", err) } // Handle OTel errors otel.SetErrorHandler(otelErrPrinter) // Set global client so it will be accessible from anywhere through beholder functions beholder.SetClient(client) // Define a custom protobuf payload to emit payload := &pb.TestCustomMessage{ BoolVal: true, IntVal: 42, FloatVal: 3.14, StringVal: "Hello, World!", } payloadBytes, err := proto.Marshal(payload) if err != nil { log.Fatalf("Failed to marshal protobuf") } // Emit the custom message anywhere from application logic fmt.Println("Emit custom messages") for range 10 { err := beholder.GetEmitter().Emit(context.Background(), payloadBytes, "beholder_data_schema", "/custom-message/versions/1", // required "beholder_domain", "ExampleDomain", // required "beholder_entity", "ExampleEntity", // required "beholder_data_type", "custom_message", "foo", "bar", ) if err != nil { log.Printf("Error emitting message: %v", err) } } } var otelErrPrinter = otel.ErrorHandlerFunc(func(err error) { log.Printf("otel error: %v", err) })
Output: Emit custom messages
func NewNoopClient ¶
func NewNoopClient() *Client
Default client to fall back to when it is not initialized properly.
Example ¶
package main import ( "context" "fmt" "log" "github.com/smartcontractkit/chainlink-common/pkg/beholder" ) func main() { fmt.Println("Beholder is not initialized. Fall back to Noop OTel Client") fmt.Println("Emitting custom message via noop otel client") err := beholder.GetEmitter().Emit(context.Background(), []byte("test message"), "beholder_data_schema", "/custom-message/versions/1", // required "beholder_domain", "ExampleDomain", // required "beholder_entity", "ExampleEntity", // required ) if err != nil { log.Printf("Error emitting message: %v", err) } }
Output: Beholder is not initialized. Fall back to Noop OTel Client Emitting custom message via noop otel client
func NewStdoutClient ¶
NewStdoutClient creates a new Client with exporters which send telemetry data to standard output. Used for testing and debugging.
func NewWriterClient ¶
NewWriterClient creates a new Client with otel exporters which send telemetry data to custom io.Writer
func (Client) ForName ¶ added in v0.4.0
ForName returns a new Client with the same configuration but with a different name. For global package-scoped telemetry, use the package name. For injected component-scoped telemetry, use a fully qualified name that uniquely identifies this instance.
func (Client) ForPackage ¶
Returns a new Client with the same configuration but with a different package name. Deprecated: Use ForName.
type Config ¶
type Config struct { InsecureConnection bool CACertFile string OtelExporterGRPCEndpoint string OtelExporterHTTPEndpoint string // OTel Resource ResourceAttributes []otelattr.KeyValue // Message Emitter EmitterExportTimeout time.Duration EmitterExportInterval time.Duration EmitterExportMaxBatchSize int EmitterMaxQueueSize int EmitterBatchProcessor bool // Enabled by default. Disable only for testing. // OTel Trace TraceSampleRatio float64 TraceBatchTimeout time.Duration TraceSpanExporter sdktrace.SpanExporter // optional additional exporter TraceRetryConfig *RetryConfig // OTel Metric MetricReaderInterval time.Duration MetricRetryConfig *RetryConfig MetricViews []sdkmetric.View // OTel Log LogExportTimeout time.Duration LogExportInterval time.Duration LogExportMaxBatchSize int LogMaxQueueSize int LogBatchProcessor bool // Enabled by default. Disable only for testing. // Retry config for shared log exporter, used by Emitter and Logger LogRetryConfig *RetryConfig // Auth AuthPublicKeyHex string AuthHeaders map[string]string }
Example ¶
package main import ( "fmt" "time" otelattr "go.opentelemetry.io/otel/attribute" "github.com/smartcontractkit/chainlink-common/pkg/beholder" ) const ( packageName = "beholder" ) func main() { config := beholder.Config{ InsecureConnection: true, CACertFile: "", OtelExporterGRPCEndpoint: "localhost:4317", OtelExporterHTTPEndpoint: "localhost:4318", // Resource ResourceAttributes: []otelattr.KeyValue{ otelattr.String("package_name", packageName), otelattr.String("sender", "beholderclient"), }, // Message Emitter EmitterExportTimeout: 1 * time.Second, EmitterExportMaxBatchSize: 512, EmitterExportInterval: 1 * time.Second, EmitterMaxQueueSize: 2048, EmitterBatchProcessor: true, // OTel message log exporter retry config LogRetryConfig: nil, // Trace TraceSampleRatio: 1, TraceBatchTimeout: 1 * time.Second, // OTel trace exporter retry config TraceRetryConfig: nil, // Metric MetricReaderInterval: 1 * time.Second, // OTel metric exporter retry config MetricRetryConfig: nil, // Log LogExportTimeout: 1 * time.Second, LogExportMaxBatchSize: 512, LogExportInterval: 1 * time.Second, LogMaxQueueSize: 2048, LogBatchProcessor: true, } fmt.Printf("%+v\n", config) config.LogRetryConfig = &beholder.RetryConfig{ InitialInterval: 5 * time.Second, MaxInterval: 30 * time.Second, MaxElapsedTime: 1 * time.Minute, // Set to zero to disable retry } fmt.Printf("%+v\n", *config.LogRetryConfig) }
Output: {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]} {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
func DefaultConfig ¶
func DefaultConfig() Config
func TestDefaultConfig ¶
func TestDefaultConfig() Config
func TestDefaultConfigHTTPClient ¶ added in v0.4.0
func TestDefaultConfigHTTPClient() Config
type Emitter ¶
type Emitter interface { // Sends message with bytes and attributes to OTel Collector Emit(ctx context.Context, body []byte, attrKVs ...any) error }
func GetEmitter ¶
func GetEmitter() Emitter
type Message ¶
type Message struct { Attrs Attributes Body []byte }
Example ¶
package main import ( "fmt" "github.com/smartcontractkit/chainlink-common/pkg/beholder" ) func main() { // Create message with body and attributes m1 := beholder.NewMessage([]byte{1}, beholder.Attributes{"key_string": "value"}) fmt.Println("#1", m1) // Create attributes additionalAttributes := beholder.Attributes{ "key_string": "new value", "key_int32": int32(1), } // Add attributes to message m1.AddAttributes(additionalAttributes) fmt.Println("#2", m1) // Create empty message struct m2 := beholder.Message{} fmt.Println("#3", m2) // Add attributes to message m2.AddAttributes(beholder.Attributes{"key_int": 1}) fmt.Println("#4", m2) // Update attribute key_int m2.AddAttributes(beholder.Attributes{"key_int": 2}) fmt.Println("#5", m2) // Set message body m2.Body = []byte("0123") fmt.Println("#6", m2) // Reset attributes m2.Attrs = beholder.Attributes{} fmt.Println("#7", m2) // Reset body m2.Body = nil fmt.Println("#8", m2) // Shallow copy of message m3 := beholder.NewMessage(m1.Body, m1.Attrs) fmt.Println("#9", m3) m1.Body[0] = byte(2) // Will mutate m3 fmt.Println("#10", m3) // Deep copy m4 := m1.Copy() fmt.Println("#11", m4) m1.Body[0] = byte(3) // Should not mutate m4 fmt.Println("#12", m4) // Create message with mixed attributes: kv pairs and maps m5 := beholder.NewMessage([]byte{1}, // Add attributes from the map map[string]any{ "key1": "value1", }, // Add attributes from KV pair "key2", "value2", // Add attributes from Attributes map beholder.Attributes{"key3": "value3"}, // Add attributes from KV pair "key4", "value4", // Modify key1 "key1", "value5", // Modify key2 map[string]any{ "key2": "value6", }, ) fmt.Println("#13", m5) // Create message with no attributes m6 := beholder.NewMessage([]byte{1}, beholder.Attributes{}) // Add attributes using AddAttributes m6.AddAttributes( "key1", "value1", "key2", "value2", ) fmt.Println("#14", m6) }
Output: #1 Message{Attrs: map[key_string:value], Body: [1]} #2 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} #3 Message{Attrs: map[], Body: []} #4 Message{Attrs: map[key_int:1], Body: []} #5 Message{Attrs: map[key_int:2], Body: []} #6 Message{Attrs: map[key_int:2], Body: [48 49 50 51]} #7 Message{Attrs: map[], Body: [48 49 50 51]} #8 Message{Attrs: map[], Body: []} #9 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]} #10 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #11 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #12 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]} #13 Message{Attrs: map[key1:value5 key2:value6 key3:value3 key4:value4], Body: [1]} #14 Message{Attrs: map[key1:value1 key2:value2], Body: [1]}
func NewMessage ¶
func (*Message) AddAttributes ¶
func (*Message) OtelRecord ¶
type Metadata ¶
type Metadata struct { // REQUIRED FIELDS // Schema Registry URI to fetch schema BeholderDomain string `validate:"required,domain_entity"` BeholderEntity string `validate:"required,domain_entity"` BeholderDataSchema string `validate:"required,uri"` // OPTIONAL FIELDS // The version of the CL node. NodeVersion string // mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures. NodeCsaKey string // Signature from CSA private key. NodeCsaSignature string DonID string // The RDD network name the CL node is operating with. NetworkName []string WorkflowID string WorkflowName string WorkflowOwnerAddress string // Hash of the workflow spec. WorkflowSpecID string // The unique execution of a workflow. WorkflowExecutionID string // The address for the contract. CapabilityContractAddress string CapabilityID string CapabilityVersion string CapabilityName string NetworkChainID string }
Example ¶
package main import ( "fmt" "github.com/smartcontractkit/chainlink-common/pkg/beholder" ) func testMetadata() beholder.Metadata { return beholder.Metadata{ NodeVersion: "v1.0.0", NodeCsaKey: "test_key", NodeCsaSignature: "test_signature", DonID: "test_don_id", NetworkName: []string{"test_network"}, WorkflowID: "test_workflow_id", WorkflowName: "test_workflow_name", WorkflowOwnerAddress: "test_owner_address", WorkflowSpecID: "test_spec_id", WorkflowExecutionID: "test_execution_id", BeholderDomain: "TestDomain", BeholderEntity: "TestEntity", BeholderDataSchema: "/schemas/ids/test_schema", CapabilityContractAddress: "test_contract_address", CapabilityID: "test_capability_id", CapabilityVersion: "test_capability_version", CapabilityName: "test_capability_name", NetworkChainID: "test_chain_id", } } func main() { m := testMetadata() fmt.Printf("%#v\n", m) fmt.Println(m.Attributes()) }
Output: beholder.Metadata{BeholderDomain:"TestDomain", BeholderEntity:"TestEntity", BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id"} map[beholder_data_schema:/schemas/ids/test_schema beholder_domain:TestDomain beholder_entity:TestEntity capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id]
func NewMetadata ¶
func NewMetadata(attrs Attributes) *Metadata
func (Metadata) Attributes ¶
func (m Metadata) Attributes() Attributes
func (*Metadata) FromAttributes ¶
func (m *Metadata) FromAttributes(attrs Attributes) *Metadata
Sets metadata fields from attributes
func (*Metadata) Validate ¶
Example ¶
package main import ( "fmt" "github.com/smartcontractkit/chainlink-common/pkg/beholder" ) func main() { validate, err := beholder.NewMetadataValidator() if err != nil { fmt.Println(err) } metadata := beholder.Metadata{ BeholderDomain: "TestDomain", BeholderEntity: "TestEntity", } if err := validate.Struct(metadata); err != nil { fmt.Println(err) } metadata.BeholderDataSchema = "example.proto" if err := validate.Struct(metadata); err != nil { fmt.Println(err) } metadata.BeholderDataSchema = "/schemas/ids/test_schema" if err := validate.Struct(metadata); err != nil { fmt.Println(err) } else { fmt.Println("Metadata is valid") } }
Output: Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag Metadata is valid
type RetryConfig ¶ added in v0.4.0
type RetryConfig struct { // InitialInterval the time to wait after the first failure before // retrying. InitialInterval time.Duration // MaxInterval is the upper bound on backoff interval. Once this value is // reached the delay between consecutive retries will always be // `MaxInterval`. MaxInterval time.Duration // MaxElapsedTime is the maximum amount of time (including retries) spent // trying to send a request/batch. Once this value is reached, the data // is discarded. // Set to zero to disable retry MaxElapsedTime time.Duration }
func (*RetryConfig) Copy ¶ added in v0.4.0
func (c *RetryConfig) Copy() *RetryConfig
func (*RetryConfig) Enabled ¶ added in v0.4.0
func (c *RetryConfig) Enabled() bool
Calculate if retry is enabled
func (*RetryConfig) GetInitialInterval ¶ added in v0.4.0
func (c *RetryConfig) GetInitialInterval() time.Duration
Implement getters for fields to avoid nil pointer dereference in case the config is not set
func (*RetryConfig) GetMaxElapsedTime ¶ added in v0.4.0
func (c *RetryConfig) GetMaxElapsedTime() time.Duration
func (*RetryConfig) GetMaxInterval ¶ added in v0.4.0
func (c *RetryConfig) GetMaxInterval() time.Duration