pubsubx

package
v0.0.43 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const ConfigSchemaID = "clinia://pubsub-config"

Variables

View Source
var ConfigSchema string

Functions

func AddConfigSchema

func AddConfigSchema(c interface {
	AddResource(url string, r io.Reader) error
}) error

AddConfigSchema adds the tracing schema to the compiler. The interface is specified instead of `jsonschema.Compiler` to allow the use of any jsonschema library fork or version.

func NewLogrusLogger added in v0.0.12

func NewLogrusLogger(log *logrus.Logger) watermill.LoggerAdapter

NewLogrusLogger returns a LogrusLoggerAdapter that sends all logs to the passed logrus instance.

func NewOTELSaramaTracer added in v0.0.31

func NewOTELSaramaTracer(option ...otelsarama.Option) kafka.SaramaTracer

func SetupInMemoryPubSub added in v0.0.12

func SetupInMemoryPubSub(l *logrusx.Logger, c *Config) (*memoryPubSub, error)

Types

type BatchConsumerOptions added in v0.0.39

type BatchConsumerOptions struct {
	// MaxBatchSize max amount of elements the batch will contain.
	// Default value is 100 if nothing is specified.
	MaxBatchSize int16
	// MaxWaitTime max time that it will be waited until MaxBatchSize elements are received.
	// Default value is 100ms if nothing is specified.
	MaxWaitTime time.Duration
}

type Config

type Config struct {
	Scope     string          `json:"scope"`
	Provider  string          `json:"provider"`
	Providers ProvidersConfig `json:"providers"`
}

type ConsumerModel added in v0.0.39

type ConsumerModel string
var (

	// Default is a model when only one message is sent to the customer and customer needs to ACK the message
	// to receive the next.
	ConsumerModelDefault ConsumerModel = ""
	// Batch works by sending multiple messages in a batch
	// You can ack all of them at once, or one by one.
	ConsumerModelBatch ConsumerModel = "batch"
)

type InMemoryConfig

type InMemoryConfig struct{}

type KafkaConfig

type KafkaConfig struct {
	Brokers []string `json:"brokers"`
}

type LogrusLoggerAdapter added in v0.0.12

type LogrusLoggerAdapter struct {
	// contains filtered or unexported fields
}

LogrusLoggerAdapter is a logrus adapter for watermill.

func (*LogrusLoggerAdapter) Debug added in v0.0.12

func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields)

Debug logs on level debug with optional fields.

func (*LogrusLoggerAdapter) Error added in v0.0.12

func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields)

Error logs on level error with err as field and optional fields.

func (*LogrusLoggerAdapter) Info added in v0.0.12

func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields)

Info logs on level info with optional fields.

func (*LogrusLoggerAdapter) Trace added in v0.0.12

func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields)

Trace logs on level trace with optional fields.

func (*LogrusLoggerAdapter) With added in v0.0.12

With returns a new LogrusLoggerAdapter that includes fields to be re-used between logging statements.

type OTELSaramaTracer added in v0.0.31

type OTELSaramaTracer struct {
	// contains filtered or unexported fields
}

func (OTELSaramaTracer) WrapConsumer added in v0.0.31

func (t OTELSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer

func (OTELSaramaTracer) WrapConsumerGroupHandler added in v0.0.31

func (OTELSaramaTracer) WrapPartitionConsumer added in v0.0.31

func (OTELSaramaTracer) WrapSyncProducer added in v0.0.31

func (t OTELSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer

type ProvidersConfig

type ProvidersConfig struct {
	InMemory InMemoryConfig `json:"inmemory"`
	Kafka    KafkaConfig    `json:"kafka"`
}

type PubSub added in v0.0.12

type PubSub interface {
	Publisher() Publisher
	Subscriber(group string, opts ...SubscriberOption) (Subscriber, error)
	// CLoses all publishers and subscribers.
	Close() error
}

func New added in v0.0.12

func New(l *logrusx.Logger, c *Config, opts ...PubSubOption) (PubSub, error)

type PubSubOption added in v0.0.31

type PubSubOption func(*pubSubOptions)

func WithPropagator added in v0.0.33

func WithPropagator(propagator propagation.TextMapPropagator) PubSubOption

func WithTracerProvider added in v0.0.31

func WithTracerProvider(provider trace.TracerProvider) PubSubOption

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, no tracer is configured

type Publisher added in v0.0.12

type Publisher interface {
	// Publish publishes a message to the topic.
	Publish(ctx context.Context, topic string, messages ...*message.Message) error
	// Close closes the publisher.
	Close() error
}

publisher is the interface that wraps the Publish method.

type Subscriber added in v0.0.12

type Subscriber interface {
	// Subscribe subscribes to the topic.
	Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
	// Close closes the subscriber.
	Close() error
}

type SubscriberOption added in v0.0.39

type SubscriberOption func(*subscriberOptions)

func WithBatchConsumerModel added in v0.0.39

func WithBatchConsumerModel(batchOptions *BatchConsumerOptions) SubscriberOption

func WithDefaultConsumerModel added in v0.0.39

func WithDefaultConsumerModel() SubscriberOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL