Documentation
¶
Index ¶
- Constants
- Variables
- func AddConfigSchema(c interface{ ... }) error
- func NewLogrusLogger(log *logrus.Logger) watermill.LoggerAdapter
- func NewOTELSaramaTracer(option ...otelsarama.Option) kafka.SaramaTracer
- func SetupInMemoryPubSub(l *logrusx.Logger, c *Config) (*memoryPubSub, error)
- type Config
- type InMemoryConfig
- type KafkaConfig
- type LogrusLoggerAdapter
- func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) With(fields watermill.LogFields) watermill.LoggerAdapter
- type OTELSaramaTracer
- func (t OTELSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer
- func (t OTELSaramaTracer) WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler
- func (t OTELSaramaTracer) WrapPartitionConsumer(pc sarama.PartitionConsumer) sarama.PartitionConsumer
- func (t OTELSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer
- type ProvidersConfig
- type PubSub
- type PubSubOption
- type Publisher
- type Subscriber
Constants ¶
const ConfigSchemaID = "clinia://pubsub-config"
Variables ¶
var ConfigSchema string
Functions ¶
func AddConfigSchema ¶
AddConfigSchema adds the pubsub config schema to the compiler. The interface is specified instead of `jsonschema.Compiler` to allow the use of any jsonschema library fork or version.
func NewLogrusLogger ¶ added in v0.0.12
func NewLogrusLogger(log *logrus.Logger) watermill.LoggerAdapter
NewLogrusLogger returns a LogrusLoggerAdapter that sends all logs to the passed logrus instance.
func NewOTELSaramaTracer ¶ added in v0.0.31
func NewOTELSaramaTracer(option ...otelsarama.Option) kafka.SaramaTracer
Types ¶
type Config ¶
type Config struct { Provider string `json:"provider"` Providers ProvidersConfig `json:"providers"` }
type InMemoryConfig ¶
type InMemoryConfig struct{}
type KafkaConfig ¶
type KafkaConfig struct {
Brokers []string `json:"brokers"`
}
type LogrusLoggerAdapter ¶ added in v0.0.12
type LogrusLoggerAdapter struct {
// contains filtered or unexported fields
}
LogrusLoggerAdapter is a logrus adapter for watermill.
func (*LogrusLoggerAdapter) Debug ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields)
Debug logs on level debug with optional fields.
func (*LogrusLoggerAdapter) Error ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields)
Error logs on level error with err as field and optional fields.
func (*LogrusLoggerAdapter) Info ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields)
Info logs on level info with optional fields.
func (*LogrusLoggerAdapter) Trace ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields)
Trace logs on level trace with optional fields.
func (*LogrusLoggerAdapter) With ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) With(fields watermill.LogFields) watermill.LoggerAdapter
With returns a new LogrusLoggerAdapter that includes fields to be re-used between logging statements.
type OTELSaramaTracer ¶ added in v0.0.31
type OTELSaramaTracer struct {
// contains filtered or unexported fields
}
func (OTELSaramaTracer) WrapConsumer ¶ added in v0.0.31
func (t OTELSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer
func (OTELSaramaTracer) WrapConsumerGroupHandler ¶ added in v0.0.31
func (t OTELSaramaTracer) WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler
func (OTELSaramaTracer) WrapPartitionConsumer ¶ added in v0.0.31
func (t OTELSaramaTracer) WrapPartitionConsumer(pc sarama.PartitionConsumer) sarama.PartitionConsumer
func (OTELSaramaTracer) WrapSyncProducer ¶ added in v0.0.31
func (t OTELSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer
type ProvidersConfig ¶
type ProvidersConfig struct { InMemory InMemoryConfig `json:"inmemory"` Kafka KafkaConfig `json:"kafka"` }
type PubSub ¶ added in v0.0.12
type PubSub interface { Publisher() Publisher Subscriber(group string) (Subscriber, error) // Closes all publishers and subscribers. Close() error }
type PubSubOption ¶ added in v0.0.31
type PubSubOption func(*pubSubOptions)
func WithTracerProvider ¶ added in v0.0.31
func WithTracerProvider(provider trace.TracerProvider) PubSubOption
WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, no tracer is configured.
type Publisher ¶ added in v0.0.12
type Publisher interface { // Publish publishes a message to the topic. Publish(topic string, messages ...*message.Message) error // Close closes the publisher. Close() error }
Publisher is the interface that wraps the Publish and Close methods.
func SetupInMemoryPublisher ¶ added in v0.0.12
type Subscriber ¶ added in v0.0.12
type Subscriber interface { // Subscribe subscribes to the topic. Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) // Close closes the subscriber. Close() error }
func SetupInMemorySubscriber ¶ added in v0.0.12
func SetupInMemorySubscriber(l *logrusx.Logger, c *Config) (Subscriber, error)
func SetupKafkaSubscriber ¶ added in v0.0.12
func SetupKafkaSubscriber(l *logrusx.Logger, c *Config, opts *pubSubOptions, group string) (Subscriber, error)
TODO: add subscriber configs