pulsarexporter

v0.110.0
Published: Sep 24, 2024 License: Apache-2.0 Imports: 21 Imported by: 3

README

Pulsar Exporter

Status
Stability: alpha (traces, metrics, logs)
Distributions: contrib
Code Owners: @dmitryax, @dao-jun

The Pulsar exporter exports logs, metrics, and traces to Pulsar. It uses a synchronous producer that blocks and is able to batch messages.

Get Started

The following settings can be optionally configured:

  • endpoint (default = pulsar://localhost:6650): The URL of the Pulsar cluster.
  • topic (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the Pulsar topic to export to.
  • encoding (default = otlp_proto): The encoding of the data sent to Pulsar. Available encodings:
    • otlp_proto: payload is Protobuf serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics or ExportLogsServiceRequest for logs.
    • otlp_json: ** EXPERIMENTAL ** payload is JSON serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics or ExportLogsServiceRequest for logs.
    • The following encodings are valid only for traces.
      • jaeger_proto: the payload is serialized to a single Jaeger proto Span, and keyed by TraceID.
      • jaeger_json: the payload is serialized to a single Jaeger JSON Span using jsonpb, and keyed by TraceID.
  • auth
    • tls
      • cert_file:
      • key_file:
    • token
      • token
    • oauth2
      • issuer_url:
      • client_id:
      • audience:
    • athenz
      • provider_domain:
      • tenant_domain:
      • tenant_service:
      • private_key:
      • key_id:
      • principal_header:
      • zts_url:
  • producer
    • max_reconnect_broker: specifies the maximum number of retries for reconnectToBroker. (default: unlimited)
    • hashing_scheme: determines the partition to which a particular message is published. Can be set to java_string_hash (default) or murmur3_32hash.
    • compression_level: one of 'default' (default), 'faster', or 'better'.
    • compression_type: one of 'none' (default), 'lz4', 'zlib', or 'zstd'.
    • max_pending_messages: specifies the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
    • batch_builder_type: one of 'default' (default) or 'key_based'.
    • partitions_auto_discovery_interval: the time interval at which the background process discovers new partitions.
    • batching_max_publish_delay: specifies the time period within which sent messages are batched. (default: 10ms)
    • batching_max_messages: specifies the maximum number of messages permitted in a batch. (default: 1000)
    • batching_max_size: specifies the maximum number of bytes permitted in a batch. (default: 128 KB)
    • disable_block_if_queue_full: controls whether Send and SendAsync block if producer's message queue is full. Defaults to false.
    • disable_batching: controls whether automatic batching of messages is enabled for the producer. Defaults to false.
  • tls_trust_certs_file_path: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to true.
  • tls_allow_insecure_connection: configures whether the Pulsar client accepts an untrusted TLS certificate from the broker (default: false)
  • timeout: timeout for sending a Pulsar message (default: 5s)
  • operation_timeout: timeout for producer-create, subscribe, and unsubscribe operations (default: 30 seconds)
  • connection_timeout: timeout for the establishment of a TCP connection (default: 5 seconds)
  • map_connections_per_broker: maximum number of connections to a single broker that will be kept in the pool. (default: 1 connection)
  • retry_on_failure
    • enabled (default = true)
    • initial_interval (default = 5s): Time to wait after the first failure before retrying; ignored if enabled is false
    • max_interval (default = 30s): Upper bound on the backoff interval; ignored if enabled is false
    • max_elapsed_time (default = 120s): Maximum amount of time spent trying to send a batch; ignored if enabled is false
  • sending_queue
    • enabled (default = true)
    • num_consumers (default = 10): Number of consumers that dequeue batches; ignored if enabled is false
    • queue_size (default = 1000): Maximum number of batches kept in memory before dropping data; ignored if enabled is false. Calculate this as num_seconds * requests_per_second, where:
      • num_seconds is the number of seconds to buffer in case of a backend outage
      • requests_per_second is the average number of requests per second.
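
The queue_size formula above can be sketched in Go as follows. This is only an illustration of the arithmetic: the function name queueSize and the choice to round up are assumptions, not part of the exporter.

```go
package main

import (
	"fmt"
	"math"
)

// queueSize estimates sending_queue.queue_size as
// num_seconds * requests_per_second. Rounding up is an assumption made
// here so that a fractional request rate never under-sizes the queue.
func queueSize(numSeconds int, requestsPerSecond float64) int {
	return int(math.Ceil(float64(numSeconds) * requestsPerSecond))
}

func main() {
	// Buffer 60 seconds of a backend outage at an average of 50 requests/s.
	fmt.Println(queueSize(60, 50)) // prints 3000
}
```

With these example numbers the default queue_size of 1000 would cover about 20 seconds of outage.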

Example configuration:

exporters:
  pulsar:
    endpoint: pulsar://localhost:6650
    topic: otlp-spans
    encoding: otlp_proto
    auth:
      tls:
        cert_file: cert.pem
        key_file: key.pem
    timeout: 10s
    tls_allow_insecure_connection: false
    tls_trust_certs_file_path: ca.pem

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFactory

func NewFactory(options ...FactoryOption) exporter.Factory

NewFactory creates Pulsar exporter factory.

Types

type Athenz

type Athenz struct {
	ProviderDomain  string              `mapstructure:"provider_domain"`
	TenantDomain    string              `mapstructure:"tenant_domain"`
	TenantService   string              `mapstructure:"tenant_service"`
	PrivateKey      configopaque.String `mapstructure:"private_key"`
	KeyID           string              `mapstructure:"key_id"`
	PrincipalHeader string              `mapstructure:"principal_header"`
	ZtsURL          string              `mapstructure:"zts_url"`
}

type Authentication

type Authentication struct {
	TLS    *TLS    `mapstructure:"tls"`
	Token  *Token  `mapstructure:"token"`
	Athenz *Athenz `mapstructure:"athenz"`
	OAuth2 *OAuth2 `mapstructure:"oauth2"`
}

type BatchBuilderType added in v0.73.0

type BatchBuilderType string
const (
	DefaultBatchBuilder  BatchBuilderType = "default"
	KeyBasedBatchBuilder BatchBuilderType = "key_based"
)

func (*BatchBuilderType) ToPulsar added in v0.73.0

func (*BatchBuilderType) UnmarshalText added in v0.73.0

func (c *BatchBuilderType) UnmarshalText(text []byte) error

type CompressionLevel added in v0.73.0

type CompressionLevel string
const (
	Default CompressionLevel = "default"
	Faster  CompressionLevel = "faster"
	Better  CompressionLevel = "better"
)

func (*CompressionLevel) ToPulsar added in v0.73.0

func (*CompressionLevel) UnmarshalText added in v0.73.0

func (c *CompressionLevel) UnmarshalText(text []byte) error

type CompressionType added in v0.73.0

type CompressionType string
const (
	None CompressionType = "none"
	LZ4  CompressionType = "lz4"
	ZLib CompressionType = "zlib"
	ZStd CompressionType = "zstd"
)

func (*CompressionType) ToPulsar added in v0.73.0

func (c *CompressionType) ToPulsar() pulsar.CompressionType

func (*CompressionType) UnmarshalText added in v0.73.0

func (c *CompressionType) UnmarshalText(text []byte) error
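
The string-typed options in this package (CompressionType, CompressionLevel, HashingScheme, BatchBuilderType) each expose UnmarshalText, which matches the standard encoding.TextUnmarshaler interface. A minimal sketch of how such a method might validate its input is shown below. The lowercase type compressionType is a hypothetical stand-in, and the case-sensitive matching and error text are assumptions, since the real method body is not shown on this page.

```go
package main

import (
	"encoding"
	"fmt"
)

// compressionType is a hypothetical stand-in for the package's CompressionType.
type compressionType string

const (
	none compressionType = "none"
	lz4  compressionType = "lz4"
	zlib compressionType = "zlib"
	zstd compressionType = "zstd"
)

// Compile-time check that the pointer type satisfies encoding.TextUnmarshaler.
var _ encoding.TextUnmarshaler = (*compressionType)(nil)

// UnmarshalText accepts only the documented values and leaves the receiver
// untouched when the input is not recognized (assumed behavior).
func (c *compressionType) UnmarshalText(text []byte) error {
	switch v := compressionType(text); v {
	case none, lz4, zlib, zstd:
		*c = v
		return nil
	default:
		return fmt.Errorf("unsupported compression type %q", text)
	}
}

func main() {
	var c compressionType
	if err := c.UnmarshalText([]byte("zstd")); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println(c) // prints zstd

	// An undocumented value is rejected.
	fmt.Println(c.UnmarshalText([]byte("gzip")) != nil) // prints true
}
```

Validating in UnmarshalText means an invalid value can be reported while the configuration is being decoded, before any producer is created.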

type Config

type Config struct {
	TimeoutSettings           exporterhelper.TimeoutConfig `mapstructure:",squash"`
	QueueSettings             exporterhelper.QueueConfig   `mapstructure:"sending_queue"`
	configretry.BackOffConfig `mapstructure:"retry_on_failure"`

	// Endpoint of pulsar broker (default "pulsar://localhost:6650")
	Endpoint string `mapstructure:"endpoint"`
	// The name of the pulsar topic to export to (default otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs)
	Topic string `mapstructure:"topic"`
	// Encoding of messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`
	// Producer configuration of the Pulsar producer
	Producer Producer `mapstructure:"producer"`
	// Set the path to the trusted TLS certificate file
	TLSTrustCertsFilePath string `mapstructure:"tls_trust_certs_file_path"`
	// Configure whether the Pulsar client accepts an untrusted TLS certificate from the broker (default: false)
	TLSAllowInsecureConnection bool           `mapstructure:"tls_allow_insecure_connection"`
	Authentication             Authentication `mapstructure:"auth"`
	OperationTimeout           time.Duration  `mapstructure:"operation_timeout"`
	ConnectionTimeout          time.Duration  `mapstructure:"connection_timeout"`
	MaxConnectionsPerBroker    int            `mapstructure:"map_connections_per_broker"`
}

Config defines configuration for Pulsar exporter.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the exporter configuration is valid

type FactoryOption

type FactoryOption func(factory *pulsarExporterFactory)

FactoryOption applies changes to pulsarExporterFactory.

type HashingScheme added in v0.73.0

type HashingScheme string
const (
	JavaStringHash HashingScheme = "java_string_hash"
	Murmur3_32Hash HashingScheme = "murmur3_32hash"
)

func (*HashingScheme) ToPulsar added in v0.73.0

func (c *HashingScheme) ToPulsar() pulsar.HashingScheme

func (*HashingScheme) UnmarshalText added in v0.73.0

func (c *HashingScheme) UnmarshalText(text []byte) error
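
The java_string_hash scheme is named after Java's String.hashCode, which computes h = 31*h + c over the characters of the key. The sketch below reproduces that hash for ASCII keys and maps it to a partition with a plain modulo. Both function names are hypothetical, the modulo step is an assumption about how a hash could select a partition, and for non-ASCII keys a byte-wise loop would differ from Java, which hashes UTF-16 code units.

```go
package main

import "fmt"

// javaStringHash mirrors Java's String.hashCode for ASCII keys, using
// unsigned 32-bit arithmetic so overflow wraps the same way bit for bit.
func javaStringHash(key string) uint32 {
	var h uint32
	for i := 0; i < len(key); i++ {
		h = 31*h + uint32(key[i])
	}
	return h
}

// partitionFor picks a partition by taking the hash modulo the partition
// count (an assumed mapping). It returns 0 when there are no partitions
// to avoid dividing by zero.
func partitionFor(key string, numPartitions uint32) uint32 {
	if numPartitions == 0 {
		return 0
	}
	return javaStringHash(key) % numPartitions
}

func main() {
	fmt.Println(javaStringHash("abc"))  // prints 96354, same as "abc".hashCode() in Java
	fmt.Println(partitionFor("abc", 4)) // prints 2
}
```

Because the hash depends only on the key, messages with the same key (for example the TraceID used by the Jaeger encodings) would land on the same partition under this mapping.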

type LogsMarshaler

type LogsMarshaler interface {
	// Marshal serializes logs into Pulsar ProducerMessages
	Marshal(logs plog.Logs, topic string) ([]*pulsar.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

LogsMarshaler marshals logs into Message array

type MetricsMarshaler

type MetricsMarshaler interface {
	// Marshal serializes metrics into Pulsar ProducerMessages
	Marshal(metrics pmetric.Metrics, topic string) ([]*pulsar.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

MetricsMarshaler marshals metrics into Message array

type OAuth2

type OAuth2 struct {
	IssuerURL string `mapstructure:"issuer_url"`
	ClientID  string `mapstructure:"client_id"`
	Audience  string `mapstructure:"audience"`
}

type Producer added in v0.73.0

type Producer struct {
	MaxReconnectToBroker            *uint            `mapstructure:"max_reconnect_broker"`
	HashingScheme                   HashingScheme    `mapstructure:"hashing_scheme"`
	CompressionLevel                CompressionLevel `mapstructure:"compression_level"`
	CompressionType                 CompressionType  `mapstructure:"compression_type"`
	MaxPendingMessages              int              `mapstructure:"max_pending_messages"`
	BatcherBuilderType              BatchBuilderType `mapstructure:"batch_builder_type"`
	PartitionsAutoDiscoveryInterval time.Duration    `mapstructure:"partitions_auto_discovery_interval"`
	BatchingMaxPublishDelay         time.Duration    `mapstructure:"batching_max_publish_delay"`
	BatchingMaxMessages             uint             `mapstructure:"batching_max_messages"`
	BatchingMaxSize                 uint             `mapstructure:"batching_max_size"`
	DisableBlockIfQueueFull         bool             `mapstructure:"disable_block_if_queue_full"`
	DisableBatching                 bool             `mapstructure:"disable_batching"`
}

Producer defines configuration for producer

type PulsarLogsProducer

type PulsarLogsProducer struct {
	// contains filtered or unexported fields
}

func (*PulsarLogsProducer) Close

type PulsarMetricsProducer

type PulsarMetricsProducer struct {
	// contains filtered or unexported fields
}

func (*PulsarMetricsProducer) Close

type PulsarTracesProducer

type PulsarTracesProducer struct {
	// contains filtered or unexported fields
}

func (*PulsarTracesProducer) Close

type TLS

type TLS struct {
	CertFile string `mapstructure:"cert_file"`
	KeyFile  string `mapstructure:"key_file"`
}

type Token

type Token struct {
	Token configopaque.String `mapstructure:"token"`
}

type TracesMarshaler

type TracesMarshaler interface {
	// Marshal serializes spans into Pulsar ProducerMessages
	Marshal(traces ptrace.Traces, topic string) ([]*pulsar.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

TracesMarshaler marshals traces into Message array.

Directories

Path Synopsis
internal
