beholder

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: MIT Imports: 38 Imported by: 7

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildAuthHeaders added in v0.4.0

func BuildAuthHeaders(privKey ed25519.PrivateKey) map[string]string

BuildAuthHeaders creates the auth header value to be included on requests. The current format for the header is:

<version>:<public_key_hex>:<signature_hex>

where the byte value of <public_key_hex> is what's being signed

func DefaultWriterClientConfig

func DefaultWriterClientConfig() writerClientConfig

func GetLogger

func GetLogger() otellog.Logger

func GetMeter

func GetMeter() otelmetric.Meter

func GetTracer

func GetTracer() oteltrace.Tracer

func NewMetadataValidator added in v0.4.0

func NewMetadataValidator() (*validator.Validate, error)

func OtelAttr

func OtelAttr(key string, value any) otellog.KeyValue

func SetClient

func SetClient(client *Client)

SetClient sets the global Beholder Client

func SetGlobalOtelProviders

func SetGlobalOtelProviders()

SetGlobalOtelProviders sets the global OTel logger, tracer, and meter providers from the Client, making them accessible from anywhere in the code via the global otel getters. Any package that relies on go.opentelemetry.io will be able to pick up the configured global providers, e.g. [otelgrpc](https://pkg.go.dev/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc#example-NewServerHandler).

Types

type Attributes

type Attributes = map[string]any

type Client

// Client groups the Beholder configuration with the OTel logger, tracer, meter
// and message emitter, plus the providers associated with them.
type Client struct {
	// Config is the Config value carried by this client.
	Config Config
	// Logger is the OTel logger.
	Logger otellog.Logger
	// Tracer is the OTel tracer.
	Tracer oteltrace.Tracer
	// Meter is the OTel meter.
	Meter otelmetric.Meter
	// Emitter is the message emitter (see the Emitter interface).
	Emitter Emitter

	// Providers for the components above.
	// NOTE(review): MessageLoggerProvider presumably backs Emitter — confirm.
	LoggerProvider        otellog.LoggerProvider
	TracerProvider        oteltrace.TracerProvider
	MeterProvider         otelmetric.MeterProvider
	MessageLoggerProvider otellog.LoggerProvider

	// OnClose is a hook that returns an error.
	// NOTE(review): presumably invoked by Close — confirm against the implementation.
	OnClose func() error
}

func GetClient

func GetClient() *Client

GetClient returns the global Beholder Client. It is thread-safe and can be used concurrently.

func NewClient

func NewClient(cfg Config) (*Client, error)

NewClient creates a new Client with initialized OpenTelemetry components. To handle OpenTelemetry errors, use otel.SetErrorHandler (https://pkg.go.dev/go.opentelemetry.io/otel#SetErrorHandler).

Example
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel"
	"google.golang.org/protobuf/proto"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
	"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
)

// main creates a Beholder client from the default config, registers it as the
// global client, and emits the same protobuf payload ten times through the
// global emitter.
func main() {
	config := beholder.DefaultConfig()

	// Initialize beholder otel client which sets up OTel components
	client, err := beholder.NewClient(config)
	if err != nil {
		log.Fatalf("Error creating Beholder client: %v", err)
	}
	// Handle OTel errors
	otel.SetErrorHandler(otelErrPrinter)
	// Set global client so it will be accessible from anywhere through beholder functions
	beholder.SetClient(client)

	// Define a custom protobuf payload to emit
	payload := &pb.TestCustomMessage{
		BoolVal:   true,
		IntVal:    42,
		FloatVal:  3.14,
		StringVal: "Hello, World!",
	}
	payloadBytes, err := proto.Marshal(payload)
	if err != nil {
		// Include the underlying error so the failure cause is not lost.
		log.Fatalf("Failed to marshal protobuf: %v", err)
	}

	// Emit the custom message anywhere from application logic
	fmt.Println("Emit custom messages")
	for range 10 {
		// Attributes are passed as alternating key/value pairs.
		err := beholder.GetEmitter().Emit(context.Background(), payloadBytes,
			"beholder_data_schema", "/custom-message/versions/1", // required
			"beholder_domain", "ExampleDomain", // required
			"beholder_entity", "ExampleEntity", // required
			"beholder_data_type", "custom_message",
			"foo", "bar",
		)
		if err != nil {
			// Emission failures are logged, not fatal, so the loop keeps going.
			log.Printf("Error emitting message: %v", err)
		}
	}
}

// otelErrPrinter is an OTel error handler that writes every reported error to
// the standard logger.
var otelErrPrinter = otel.ErrorHandlerFunc(func(otelErr error) {
	log.Printf("otel error: %v", otelErr)
})
Output:

Emit custom messages

func NewNoopClient

func NewNoopClient() *Client

NewNoopClient returns the default client to fall back to when Beholder is not initialized properly.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

// main emits a single message through the global emitter without ever
// creating or registering a client.
func main() {
	banner := []string{
		"Beholder is not initialized. Fall back to Noop OTel Client",
		"Emitting custom message via noop otel client",
	}
	for _, line := range banner {
		fmt.Println(line)
	}

	emitter := beholder.GetEmitter()
	// Alternating key/value pairs; all three keys below are required.
	attrKVs := []any{
		"beholder_data_schema", "/custom-message/versions/1",
		"beholder_domain", "ExampleDomain",
		"beholder_entity", "ExampleEntity",
	}
	if err := emitter.Emit(context.Background(), []byte("test message"), attrKVs...); err != nil {
		log.Printf("Error emitting message: %v", err)
	}
}
Output:

Beholder is not initialized. Fall back to Noop OTel Client
Emitting custom message via noop otel client

func NewStdoutClient

func NewStdoutClient() (*Client, error)

NewStdoutClient creates a new Client with exporters which send telemetry data to standard output. It is used for testing and debugging.

func NewWriterClient

func NewWriterClient(w io.Writer) (*Client, error)

NewWriterClient creates a new Client with otel exporters which send telemetry data to custom io.Writer

func (Client) Close

func (c Client) Close() (err error)

Closes all providers, flushes all data and stops all background processes

func (Client) ForName added in v0.4.0

func (c Client) ForName(name string) Client

ForName returns a new Client with the same configuration but with a different name. For global package-scoped telemetry, use the package name. For injected component-scoped telemetry, use a fully qualified name that uniquely identifies this instance.

func (Client) ForPackage

func (c Client) ForPackage(name string) Client

ForPackage returns a new Client with the same configuration but with a different package name. Deprecated: Use ForName.

type Config

// Config holds the settings for a Beholder Client: exporter connection, OTel
// resource attributes, message emitter, trace, metric and log tuning, and auth.
type Config struct {
	// Exporter connection settings.
	// NOTE(review): CACertFile is presumably a path to a CA certificate used
	// for secure connections — confirm against the implementation.
	InsecureConnection       bool
	CACertFile               string
	OtelExporterGRPCEndpoint string
	OtelExporterHTTPEndpoint string

	// OTel Resource: attributes attached to the OTel resource.
	ResourceAttributes []otelattr.KeyValue
	// Message Emitter: export timing, batching and queue sizing.
	EmitterExportTimeout      time.Duration
	EmitterExportInterval     time.Duration
	EmitterExportMaxBatchSize int
	EmitterMaxQueueSize       int
	EmitterBatchProcessor     bool // Enabled by default. Disable only for testing.

	// OTel Trace: sampling, batching, extra exporter and retry policy.
	TraceSampleRatio  float64
	TraceBatchTimeout time.Duration
	TraceSpanExporter sdktrace.SpanExporter // optional additional exporter
	TraceRetryConfig  *RetryConfig

	// OTel Metric: reader interval, retry policy and metric views.
	MetricReaderInterval time.Duration
	MetricRetryConfig    *RetryConfig
	MetricViews          []sdkmetric.View

	// OTel Log: export timing, batching and queue sizing.
	LogExportTimeout      time.Duration
	LogExportInterval     time.Duration
	LogExportMaxBatchSize int
	LogMaxQueueSize       int
	LogBatchProcessor     bool // Enabled by default. Disable only for testing.
	// Retry config for the shared log exporter, used by both Emitter and Logger.
	LogRetryConfig *RetryConfig

	// Auth.
	// NOTE(review): AuthHeaders presumably carries the map produced by
	// BuildAuthHeaders — confirm.
	AuthPublicKeyHex string
	AuthHeaders      map[string]string
}
Example
package main

import (
	"fmt"
	"time"

	otelattr "go.opentelemetry.io/otel/attribute"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

const (
	// packageName is the value reported as the "package_name" resource
	// attribute in this example.
	packageName = "beholder"
)

func main() {
	config := beholder.Config{
		InsecureConnection:       true,
		CACertFile:               "",
		OtelExporterGRPCEndpoint: "localhost:4317",
		OtelExporterHTTPEndpoint: "localhost:4318",
		// Resource
		ResourceAttributes: []otelattr.KeyValue{
			otelattr.String("package_name", packageName),
			otelattr.String("sender", "beholderclient"),
		},
		// Message Emitter
		EmitterExportTimeout:      1 * time.Second,
		EmitterExportMaxBatchSize: 512,
		EmitterExportInterval:     1 * time.Second,
		EmitterMaxQueueSize:       2048,
		EmitterBatchProcessor:     true,
		// OTel message log exporter retry config
		LogRetryConfig: nil,
		// Trace
		TraceSampleRatio:  1,
		TraceBatchTimeout: 1 * time.Second,
		// OTel trace exporter retry config
		TraceRetryConfig: nil,
		// Metric
		MetricReaderInterval: 1 * time.Second,
		// OTel metric exporter retry config
		MetricRetryConfig: nil,
		// Log
		LogExportTimeout:      1 * time.Second,
		LogExportMaxBatchSize: 512,
		LogExportInterval:     1 * time.Second,
		LogMaxQueueSize:       2048,
		LogBatchProcessor:     true,
	}
	fmt.Printf("%+v\n", config)
	config.LogRetryConfig = &beholder.RetryConfig{
		InitialInterval: 5 * time.Second,
		MaxInterval:     30 * time.Second,
		MaxElapsedTime:  1 * time.Minute, // Set to zero to disable retry
	}
	fmt.Printf("%+v\n", *config.LogRetryConfig)
}
Output:

{InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]}
{InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}

func DefaultConfig

func DefaultConfig() Config

func TestDefaultConfig

func TestDefaultConfig() Config

func TestDefaultConfigHTTPClient added in v0.4.0

func TestDefaultConfigHTTPClient() Config

type Emitter

// Emitter is the interface for sending custom messages.
type Emitter interface {
	// Emit sends a message with the given body bytes and attributes to the
	// OTel Collector. The package examples pass attrKVs as alternating
	// key/value pairs.
	Emit(ctx context.Context, body []byte, attrKVs ...any) error
}

func GetEmitter

func GetEmitter() Emitter

type Message

// Message pairs a byte payload with a set of attributes.
type Message struct {
	// Attrs holds the message attributes, keyed by attribute name.
	Attrs Attributes
	// Body is the raw message payload.
	Body []byte
}
Example
package main

import (
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

// main walks through creating, mutating and copying beholder.Message values,
// printing the message after each step.
func main() {
	// Create message with body and attributes
	m1 := beholder.NewMessage([]byte{1}, beholder.Attributes{"key_string": "value"})
	fmt.Println("#1", m1)
	// Create attributes
	additionalAttributes := beholder.Attributes{
		"key_string": "new value",
		"key_int32":  int32(1),
	}
	// Add attributes to message
	m1.AddAttributes(additionalAttributes)
	fmt.Println("#2", m1)
	// Create empty message struct
	m2 := beholder.Message{}
	fmt.Println("#3", m2)
	// Add attributes to message
	m2.AddAttributes(beholder.Attributes{"key_int": 1})
	fmt.Println("#4", m2)
	// Update attribute key_int
	m2.AddAttributes(beholder.Attributes{"key_int": 2})
	fmt.Println("#5", m2)
	// Set message body
	m2.Body = []byte("0123")
	fmt.Println("#6", m2)
	// Reset attributes
	m2.Attrs = beholder.Attributes{}
	fmt.Println("#7", m2)
	// Reset body
	m2.Body = nil
	fmt.Println("#8", m2)
	// Shallow copy of message: m3 shares m1's body slice
	m3 := beholder.NewMessage(m1.Body, m1.Attrs)
	fmt.Println("#9", m3)
	m1.Body[0] = byte(2) // Will mutate m3
	fmt.Println("#10", m3)
	// Deep copy
	m4 := m1.Copy()
	fmt.Println("#11", m4)
	m1.Body[0] = byte(3) // Should not mutate m4
	fmt.Println("#12", m4)
	// Create message with mixed attributes: kv pairs and maps.
	// Later entries override earlier ones for the same key.
	m5 := beholder.NewMessage([]byte{1},
		// Add attributes from the map
		map[string]any{
			"key1": "value1",
		},
		// Add attributes from KV pair
		"key2", "value2",
		// Add attributes from Attributes map
		beholder.Attributes{"key3": "value3"},
		// Add attributes from KV pair
		"key4", "value4",
		// Modify key1
		"key1", "value5",
		// Modify key2
		map[string]any{
			"key2": "value6",
		},
	)
	fmt.Println("#13", m5)
	// Create message with no attributes
	m6 := beholder.NewMessage([]byte{1}, beholder.Attributes{})
	// Add attributes using AddAttributes
	m6.AddAttributes(
		"key1", "value1",
		"key2", "value2",
	)
	fmt.Println("#14", m6)
}
Output:

#1 Message{Attrs: map[key_string:value], Body: [1]}
#2 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]}
#3 Message{Attrs: map[], Body: []}
#4 Message{Attrs: map[key_int:1], Body: []}
#5 Message{Attrs: map[key_int:2], Body: []}
#6 Message{Attrs: map[key_int:2], Body: [48 49 50 51]}
#7 Message{Attrs: map[], Body: [48 49 50 51]}
#8 Message{Attrs: map[], Body: []}
#9 Message{Attrs: map[key_int32:1 key_string:new value], Body: [1]}
#10 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]}
#11 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]}
#12 Message{Attrs: map[key_int32:1 key_string:new value], Body: [2]}
#13 Message{Attrs: map[key1:value5 key2:value6 key3:value3 key4:value4], Body: [1]}
#14 Message{Attrs: map[key1:value1 key2:value2], Body: [1]}

func NewMessage

func NewMessage(body []byte, attrKVs ...any) Message

func (*Message) AddAttributes

func (e *Message) AddAttributes(attrKVs ...any)

func (*Message) Copy

func (e *Message) Copy() Message

func (*Message) OtelRecord

func (e *Message) OtelRecord() otellog.Record

func (Message) String

func (e Message) String() string

func (Message) Validate

func (e Message) Validate() error

type Metadata

// Metadata is the set of well-known message attributes. The three Beholder*
// fields are required and checked through their `validate` struct tags; all
// other fields are optional.
type Metadata struct {
	// REQUIRED FIELDS
	// BeholderDataSchema is validated as a URI: the Schema Registry URI to fetch the schema.
	BeholderDomain     string `validate:"required,domain_entity"`
	BeholderEntity     string `validate:"required,domain_entity"`
	BeholderDataSchema string `validate:"required,uri"`

	// OPTIONAL FIELDS
	// The version of the CL node.
	NodeVersion string
	// mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures.
	NodeCsaKey string
	// Signature from the CSA private key.
	NodeCsaSignature string
	DonID            string
	// The RDD network name(s) the CL node is operating with.
	NetworkName          []string
	WorkflowID           string
	WorkflowName         string
	WorkflowOwnerAddress string
	// Hash of the workflow spec.
	WorkflowSpecID string
	// Identifier of a unique execution of a workflow.
	WorkflowExecutionID string
	// The address of the capability contract.
	CapabilityContractAddress string
	CapabilityID              string
	CapabilityVersion         string
	CapabilityName            string
	NetworkChainID            string
}
Example
package main

import (
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

// testMetadata returns a Metadata value with every field populated with a
// fixed test value. Fields are listed in struct declaration order.
func testMetadata() beholder.Metadata {
	md := beholder.Metadata{
		BeholderDomain:            "TestDomain",
		BeholderEntity:            "TestEntity",
		BeholderDataSchema:        "/schemas/ids/test_schema",
		NodeVersion:               "v1.0.0",
		NodeCsaKey:                "test_key",
		NodeCsaSignature:          "test_signature",
		DonID:                     "test_don_id",
		NetworkName:               []string{"test_network"},
		WorkflowID:                "test_workflow_id",
		WorkflowName:              "test_workflow_name",
		WorkflowOwnerAddress:      "test_owner_address",
		WorkflowSpecID:            "test_spec_id",
		WorkflowExecutionID:       "test_execution_id",
		CapabilityContractAddress: "test_contract_address",
		CapabilityID:              "test_capability_id",
		CapabilityVersion:         "test_capability_version",
		CapabilityName:            "test_capability_name",
		NetworkChainID:            "test_chain_id",
	}
	return md
}
// main prints the test metadata in Go syntax, followed by its attribute map.
func main() {
	md := testMetadata()
	fmt.Printf("%#v\n", md)

	attrs := md.Attributes()
	fmt.Println(attrs)
}
Output:

beholder.Metadata{BeholderDomain:"TestDomain", BeholderEntity:"TestEntity", BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id"}
map[beholder_data_schema:/schemas/ids/test_schema beholder_domain:TestDomain beholder_entity:TestEntity capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id]

func NewMetadata

func NewMetadata(attrs Attributes) *Metadata

func (Metadata) Attributes

func (m Metadata) Attributes() Attributes

func (*Metadata) FromAttributes

func (m *Metadata) FromAttributes(attrs Attributes) *Metadata

Sets metadata fields from attributes

func (*Metadata) Validate

func (m *Metadata) Validate() error
Example
package main

import (
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)

// main validates a Metadata value three times: with the data schema missing,
// with a value that is not a URI, and finally with a valid schema URI.
func main() {
	validate, err := beholder.NewMetadataValidator()
	if err != nil {
		fmt.Println(err)
		// Without a validator there is nothing left to demonstrate, and
		// calling validate.Struct on an unusable validator would fail.
		return
	}

	metadata := beholder.Metadata{
		BeholderDomain: "TestDomain",
		BeholderEntity: "TestEntity",
	}
	// BeholderDataSchema is unset: fails the 'required' tag.
	if err := validate.Struct(metadata); err != nil {
		fmt.Println(err)
	}
	// Not a URI: fails the 'uri' tag.
	metadata.BeholderDataSchema = "example.proto"
	if err := validate.Struct(metadata); err != nil {
		fmt.Println(err)
	}
	// A valid schema URI: validation passes.
	metadata.BeholderDataSchema = "/schemas/ids/test_schema"
	if err := validate.Struct(metadata); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("Metadata is valid")
	}
}
Output:

Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag
Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag
Metadata is valid

type RetryConfig added in v0.4.0

// RetryConfig describes the retry backoff applied when an exporter fails to
// send a request/batch.
type RetryConfig struct {
	// InitialInterval is the time to wait after the first failure before
	// retrying.
	InitialInterval time.Duration
	// MaxInterval is the upper bound on the backoff interval. Once this value
	// is reached, the delay between consecutive retries will always be
	// `MaxInterval`.
	MaxInterval time.Duration
	// MaxElapsedTime is the maximum amount of time (including retries) spent
	// trying to send a request/batch. Once this value is reached, the data
	// is discarded.
	// Set to zero to disable retry.
	MaxElapsedTime time.Duration
}

func (*RetryConfig) Copy added in v0.4.0

func (c *RetryConfig) Copy() *RetryConfig

func (*RetryConfig) Enabled added in v0.4.0

func (c *RetryConfig) Enabled() bool

Calculate if retry is enabled

func (*RetryConfig) GetInitialInterval added in v0.4.0

func (c *RetryConfig) GetInitialInterval() time.Duration

Implement getters for fields to avoid nil pointer dereference in case the config is not set

func (*RetryConfig) GetMaxElapsedTime added in v0.4.0

func (c *RetryConfig) GetMaxElapsedTime() time.Duration

func (*RetryConfig) GetMaxInterval added in v0.4.0

func (c *RetryConfig) GetMaxInterval() time.Duration

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL