Documentation ¶
Index ¶
- Constants
- Variables
- func AddConfigSchema(c interface{ ... }) error
- func NewLogrusLogger(log *logrus.Logger) watermill.LoggerAdapter
- func SetupInMemoryPubSub(l *logrusx.Logger, c *Config) (*memoryPubSub, error)
- type BatchConsumerOptions
- type Config
- type ConsumerModel
- type InMemoryConfig
- type KafkaConfig
- type LogrusLoggerAdapter
- func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields)
- func (l *LogrusLoggerAdapter) With(fields watermill.LogFields) watermill.LoggerAdapter
- type ProvidersConfig
- type PubSub
- type PubSubOption
- type Publisher
- type Subscriber
- type SubscriberOption
Constants ¶
const ConfigSchemaID = "clinia://pubsub-config"
Variables ¶
var ConfigSchema string
Functions ¶
func AddConfigSchema ¶
AddConfigSchema adds the pubsub config schema to the compiler. The interface is specified instead of `jsonschema.Compiler` to allow the use of any jsonschema library fork or version.
func NewLogrusLogger ¶ added in v0.0.12
func NewLogrusLogger(log *logrus.Logger) watermill.LoggerAdapter
NewLogrusLogger returns a LogrusLoggerAdapter that sends all logs to the passed logrus instance.
Types ¶
type BatchConsumerOptions ¶ added in v0.0.39
type BatchConsumerOptions struct { // MaxBatchSize is the maximum number of elements a batch will contain. // Default value is 100 if nothing is specified. MaxBatchSize int16 // MaxWaitTime is the maximum time to wait for MaxBatchSize elements to be received. // Default value is 100ms if nothing is specified. MaxWaitTime time.Duration }
type Config ¶
type Config struct { Scope string `json:"scope"` Provider string `json:"provider"` Providers ProvidersConfig `json:"providers"` }
type ConsumerModel ¶ added in v0.0.39
type ConsumerModel string
var ( // Default is a model where only one message is sent to the consumer at a time, and the consumer needs to ACK the message // to receive the next. ConsumerModelDefault ConsumerModel = "" // Batch works by sending multiple messages in a batch. // You can ack all of them at once, or one by one. ConsumerModelBatch ConsumerModel = "batch" )
type InMemoryConfig ¶
type InMemoryConfig struct{}
type KafkaConfig ¶
type KafkaConfig struct {
Brokers []string `json:"brokers"`
}
type LogrusLoggerAdapter ¶ added in v0.0.12
type LogrusLoggerAdapter struct {
// contains filtered or unexported fields
}
LogrusLoggerAdapter is a logrus adapter for watermill.
func (*LogrusLoggerAdapter) Debug ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Debug(msg string, fields watermill.LogFields)
Debug logs on level debug with optional fields.
func (*LogrusLoggerAdapter) Error ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Error(msg string, err error, fields watermill.LogFields)
Error logs on level error with err as field and optional fields.
func (*LogrusLoggerAdapter) Info ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Info(msg string, fields watermill.LogFields)
Info logs on level info with optional fields.
func (*LogrusLoggerAdapter) Trace ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) Trace(msg string, fields watermill.LogFields)
Trace logs on level trace with optional fields.
func (*LogrusLoggerAdapter) With ¶ added in v0.0.12
func (l *LogrusLoggerAdapter) With(fields watermill.LogFields) watermill.LoggerAdapter
With returns a new LogrusLoggerAdapter that includes fields to be re-used between logging statements.
type ProvidersConfig ¶
type ProvidersConfig struct { InMemory InMemoryConfig `json:"inmemory"` Kafka KafkaConfig `json:"kafka"` }
type PubSub ¶ added in v0.0.12
type PubSub interface { Publisher() Publisher Subscriber(group string, opts ...SubscriberOption) (Subscriber, error) // Closes all publishers and subscribers. Close() error }
type PubSubOption ¶ added in v0.0.31
type PubSubOption func(*pubSubOptions)
func WithPropagator ¶ added in v0.0.33
func WithPropagator(propagator propagation.TextMapPropagator) PubSubOption
func WithSaramaPublisherConfig ¶ added in v0.0.60
func WithSaramaPublisherConfig(config *sarama.Config) PubSubOption
WithSaramaPublisherConfig specifies the sarama config to use for the publisher. The publisher config should be built starting from the default config provided by kafkax.DefaultSaramaSyncPublisherConfig.
func WithSaramaSubscriberConfig ¶ added in v0.0.60
func WithSaramaSubscriberConfig(config *sarama.Config) PubSubOption
WithSaramaSubscriberConfig specifies the sarama config to use for the subscriber. The subscriber config should be built starting from the default config provided by kafkax.DefaultSaramaSubscriberConfig.
func WithTracerProvider ¶ added in v0.0.31
func WithTracerProvider(provider trace.TracerProvider) PubSubOption
WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, no tracer is configured.
type Publisher ¶ added in v0.0.12
type Publisher interface { // Publish publishes a message to the topic. Publish(ctx context.Context, topic string, messages ...*message.Message) error BulkPublish(ctx context.Context, topic string, messages ...*message.Message) error // Close closes the publisher. Close() error }
Publisher is the interface that wraps the Publish, BulkPublish, and Close methods.
type Subscriber ¶ added in v0.0.12
type SubscriberOption ¶ added in v0.0.39
type SubscriberOption func(*subscriberOptions)
func WithBatchConsumerModel ¶ added in v0.0.39
func WithBatchConsumerModel(batchOptions *BatchConsumerOptions) SubscriberOption
func WithDefaultConsumerModel ¶ added in v0.0.39
func WithDefaultConsumerModel() SubscriberOption