pubsubx

package
v0.0.37 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const ConfigSchemaID = "clinia://pubsub-config"

Variables

View Source
var ConfigSchema string

Functions

func AddConfigSchema

func AddConfigSchema(c interface {
	AddResource(url string, r io.Reader) error
}) error

AddConfigSchema adds the tracing schema to the compiler. The interface is specified instead of `jsonschema.Compiler` to allow the use of any jsonschema library fork or version.

func NewLogrusLogger added in v0.0.12

func NewLogrusLogger(log *logrus.Logger) watermill.LoggerAdapter

NewLogrusLogger returns a LogrusLoggerAdapter that sends all logs to the passed logrus instance.

func NewOTELSaramaTracer added in v0.0.31

func NewOTELSaramaTracer(option ...otelsarama.Option) kafka.SaramaTracer

func SetupInMemoryPubSub added in v0.0.12

func SetupInMemoryPubSub(l *logrusx.Logger, c *Config) (*memoryPubSub, error)

Types

type Config

type Config struct {
	Provider  string          `json:"provider"`
	Providers ProvidersConfig `json:"providers"`
}

type InMemoryConfig

type InMemoryConfig struct{}

type KafkaConfig

type KafkaConfig struct {
	Brokers []string `json:"brokers"`
}

type LogrusLoggerAdapter added in v0.0.12

type LogrusLoggerAdapter struct {
	// contains filtered or unexported fields
}

LogrusLoggerAdapter is a logrus adapter for watermill.

func (*LogrusLoggerAdapter) Debug added in v0.0.12

func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields)

Debug logs on level debug with optional fields.

func (*LogrusLoggerAdapter) Error added in v0.0.12

func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields)

Error logs on level error with err as field and optional fields.

func (*LogrusLoggerAdapter) Info added in v0.0.12

func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields)

Info logs on level info with optional fields.

func (*LogrusLoggerAdapter) Trace added in v0.0.12

func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields)

Trace logs on level trace with optional fields.

func (*LogrusLoggerAdapter) With added in v0.0.12

With returns a new LogrusLoggerAdapter that includes fields to be re-used between logging statements.

type OTELSaramaTracer added in v0.0.31

type OTELSaramaTracer struct {
	// contains filtered or unexported fields
}

func (OTELSaramaTracer) WrapConsumer added in v0.0.31

func (t OTELSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer

func (OTELSaramaTracer) WrapConsumerGroupHandler added in v0.0.31

func (OTELSaramaTracer) WrapPartitionConsumer added in v0.0.31

func (OTELSaramaTracer) WrapSyncProducer added in v0.0.31

func (t OTELSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer

type ProvidersConfig

type ProvidersConfig struct {
	InMemory InMemoryConfig `json:"inmemory"`
	Kafka    KafkaConfig    `json:"kafka"`
}

type PubSub added in v0.0.12

type PubSub interface {
	Publisher() Publisher
	Subscriber(group string) (Subscriber, error)
	// CLoses all publishers and subscribers.
	Close() error
}

func New added in v0.0.12

func New(l *logrusx.Logger, c *Config, opts ...PubSubOption) (PubSub, error)

type PubSubOption added in v0.0.31

type PubSubOption func(*pubSubOptions)

func WithPropagator added in v0.0.33

func WithPropagator(propagator propagation.TextMapPropagator) PubSubOption

func WithTracerProvider added in v0.0.31

func WithTracerProvider(provider trace.TracerProvider) PubSubOption

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, no tracer is configured

type Publisher added in v0.0.12

type Publisher interface {
	// Publish publishes a message to the topic.
	Publish(ctx context.Context, topic string, messages ...*message.Message) error
	// Close closes the publisher.
	Close() error
}

publisher is the interface that wraps the Publish method.

func SetupInMemoryPublisher added in v0.0.12

func SetupInMemoryPublisher(l *logrusx.Logger, c *Config) (Publisher, error)

func SetupKafkaPublisher added in v0.0.12

func SetupKafkaPublisher(l *logrusx.Logger, c *Config, opts *pubSubOptions) (Publisher, error)

TODO: add publisher configs

type Subscriber added in v0.0.12

type Subscriber interface {
	// Subscribe subscribes to the topic.
	Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
	// Close closes the subscriber.
	Close() error
}

func SetupInMemorySubscriber added in v0.0.12

func SetupInMemorySubscriber(l *logrusx.Logger, c *Config) (Subscriber, error)

func SetupKafkaSubscriber added in v0.0.12

func SetupKafkaSubscriber(l *logrusx.Logger, c *Config, opts *pubSubOptions, group string) (Subscriber, error)

TODO: add subscriber configs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL