pubsubx

package
v0.0.103 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const ConfigSchemaID = "clinia://pubsub-config"

Variables

View Source
var ConfigSchema string

Functions

func AddConfigSchema

func AddConfigSchema(c interface {
	AddResource(url string, r io.Reader) error
},
) error

AddConfigSchema adds the tracing schema to the compiler. The interface is specified instead of `jsonschema.Compiler` to allow the use of any jsonschema library fork or version.

Types

type Config

type Config struct {
	PoisonQueue      PoisonQueueConfig `json:"poisonQueue"`
	Scope            string            `json:"scope"`
	Provider         string            `json:"provider"`
	Providers        ProvidersConfig   `json:"providers"`
	TopicRetry       bool              `json:"topicRetry"`
	EnableAutoCommit bool              `json:"enableAutoCommit"`
}

type Errors added in v0.0.67

type Errors []error

func (Errors) FirstNonNil added in v0.0.67

func (e Errors) FirstNonNil() error

func (Errors) Join added in v0.0.67

func (e Errors) Join(errs ...Errors) error

type Handler added in v0.0.67

type Handler func(ctx context.Context, msgs []*messagex.Message) ([]error, error)

Handler represents a function that handles messages received by a subscriber. It takes a context.Context and a slice of *messagex.Message as input parameters. The function should return a slice of errors, representing per-message failures, and an error, representing the processing failure in general.

For the singular error, the caller may return a pubsubx.AbortSubscribeError() to abort the subscription right away. For any other errors happening, the handler will be retried up to a certain number of times. After that, the subscription will be aborted. As such, it is the caller's responsibility to return an error that is related to the batch processing itself and not to a specific message.

type Handlers added in v0.0.67

type Handlers map[messagex.Topic]Handler

type InMemoryConfig

type InMemoryConfig struct{}

type KafkaConfig

type KafkaConfig struct {
	Brokers []string `json:"brokers"`
}

type PoisonQueueConfig added in v0.0.84

type PoisonQueueConfig struct {
	Enabled   bool   `json:"enabled"`
	TopicName string `json:"topicName"`
}

func (PoisonQueueConfig) IsEnabled added in v0.0.84

func (pqc PoisonQueueConfig) IsEnabled() bool

type ProvidersConfig

type ProvidersConfig struct {
	InMemory InMemoryConfig `json:"inmemory"`
	Kafka    KafkaConfig    `json:"kafka"`
}

type PubSub added in v0.0.12

type PubSub interface {
	// Publisher returns a Publisher instance for publishing messages.
	Publisher() Publisher

	// Subscriber returns a Subscriber instance for subscribing to messages.
	// It takes a group name, a list of topics, and optional SubscriberOptions.
	// It returns a Subscriber and an error if any.
	// A subscriber should define ALL the topics it wants to subscribe to.
	Subscriber(group string, topics []messagex.Topic, opts ...SubscriberOption) (Subscriber, error)

	// PubSubAdminClient returns a PubSubAdminClient instance for managing topics and configurations.
	// A new client is always returned on each call. Caller is responsible for closing the client.
	AdminClient() (PubSubAdminClient, error)

	// Close closes all publishers and subscribers associated with the PubSub instance.
	// It returns an error if any.
	Close() error
}

PubSub represents a generic interface for a publish-subscribe system.

type PubSubAdminClient added in v0.0.68

type PubSubAdminClient interface {
	// CreateTopic creates a topic with the given configuration.
	// The default configuration entries are set by default, but they can be overridden (see `pubsub.NewCreateTopicConfigEntries()`).
	CreateTopic(ctx context.Context, partitions int32, replicationFactor int16, topic string, configs ...map[string]*string) (kadm.CreateTopicResponse, error)
	// CreateTopics creates a topics with the given configuration.
	// The default configuration entries are set by default, but they can be overridden (see `pubsub.NewCreateTopicConfigEntries()`).
	CreateTopics(ctx context.Context, partitions int32, replicationFactor int16, topics []string, configs ...map[string]*string) (kadm.CreateTopicResponses, error)
	// DeleteTopic deletes a topic.
	DeleteTopic(ctx context.Context, topic string) (kadm.DeleteTopicResponse, error)
	// DeleteTopicsWithRetryTopics deletes the topics with their related retry topics.
	DeleteTopicsWithRetryTopics(ctx context.Context, topics ...string) (kadm.DeleteTopicResponses, error)
	// DeleteTopicWithRetryTopics deletes a topic with it's related retry topics.
	DeleteTopicWithRetryTopics(ctx context.Context, topic string) (kadm.DeleteTopicResponses, error)
	// DeleteGroup deletes a group and related resources.
	DeleteGroup(ctx context.Context, group messagex.ConsumerGroup) (kadm.DeleteGroupResponse, error)
	// DeleteGroups deletes groups and related resources.
	DeleteGroups(ctx context.Context, groups ...messagex.ConsumerGroup) (kadm.DeleteGroupResponses, error)
	// HealthCheck checks the health of the underlying pubsub. It returns an error if the pubsub is unhealthy or we cannot connect to it.
	HealthCheck(ctx context.Context) error

	// ListTopics returns the details of the given topics.
	// If no topics are provided, it returns the details of all topics.
	ListTopics(ctx context.Context, topics ...string) (kadm.TopicDetails, error)

	Close()
}

type PubSubOption added in v0.0.31

type PubSubOption func(*PubSubOptions)

func WithMaxMessageByte added in v0.0.78

func WithMaxMessageByte(max int32) PubSubOption

WithMaxMessageByte specifies the max message size in bytes. If none is specified, the default value is 1 MB.

func WithMeterProvider added in v0.0.67

func WithMeterProvider(provider metric.MeterProvider) PubSubOption

func WithPropagator added in v0.0.33

func WithPropagator(propagator propagation.TextMapPropagator) PubSubOption

func WithRetentionMs added in v0.0.78

func WithRetentionMs(retentionMs int32) PubSubOption

WithRetentionMs specifies the retention time in milliseconds.

func WithTracerProvider added in v0.0.31

func WithTracerProvider(provider trace.TracerProvider) PubSubOption

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, no tracer is configured

type PubSubOptions added in v0.0.67

type PubSubOptions struct {
	TracerProvider trace.TracerProvider
	Propagator     propagation.TextMapPropagator
	MeterProvider  metric.MeterProvider
	MaxMessageByte *int32
	RetentionMs    *int32
}

type Publisher added in v0.0.12

type Publisher interface {
	// PublishSync publishes messages synchronously to the specified topic.
	// It returns an error if the operation fails.
	PublishSync(ctx context.Context, topic messagex.Topic, messages ...*messagex.Message) (Errors, error)

	// PublishAsync publishes messages asynchronously to the specified topic.
	// It returns an error if the operation fails.
	//
	// WARNING: The context should stay alive until the messages are published.
	// When using a fire-n-forget approach in a request-response scenario, a new Context.Background() should be preferred since
	// the request context might be canceled before the messages are actually published.
	PublishAsync(ctx context.Context, topic messagex.Topic, messages ...*messagex.Message) error

	// Close closes the publisher.
	// Once a publisher is closed, it cannot be used to publish messages anymore.
	//
	// WARNING: Since the PubSub.Publisher() method always returns the same instance of the publisher,
	// closing the publisher will force the user to create a new PubSub instance to get a new publisher.
	// This should really be done only when the application is shutting down, or you no longer have any needs to publish messages.
	Close() error
}

Publisher is an interface for publishing messages to a topic.

type Subscriber added in v0.0.12

type Subscriber interface {
	// Subscribe subscribes to all topics that are configured in the subscriber.
	// It takes a context and a map of topic handlers as input.
	// - If there are topics missing handlers, it will return an error immediately.
	Subscribe(ctx context.Context, topicHandlers Handlers) error
	// Health returns the health status of the subscriber.
	// It should return an error if the subscriber is unhealthy, nil otherwise (healthy).
	Health() error
	// Close closes the subscriber.
	Close() error
}

type SubscriberError added in v0.0.69

type SubscriberError struct {
	Message   string
	Retryable bool
}

func AbortSubscribeError added in v0.0.69

func AbortSubscribeError() *SubscriberError

func (*SubscriberError) Error added in v0.0.69

func (e *SubscriberError) Error() string

type SubscriberOption added in v0.0.39

type SubscriberOption func(*SubscriberOptions)

func WithAsyncExecution added in v0.0.97

func WithAsyncExecution() SubscriberOption

func WithDialTimeout added in v0.0.99

func WithDialTimeout(timeout time.Duration) SubscriberOption

func WithMaxBatchSize added in v0.0.67

func WithMaxBatchSize(maxBatchSize int) SubscriberOption

func WithMaxParalleAsyncExecution added in v0.0.97

func WithMaxParalleAsyncExecution(max int16) SubscriberOption

func WithMaxTopicRetryCount added in v0.0.80

func WithMaxTopicRetryCount(maxTopicRetryCount int) SubscriberOption

func WithRebalanceTimeout added in v0.0.99

func WithRebalanceTimeout(timeout time.Duration) SubscriberOption

type SubscriberOptions added in v0.0.67

type SubscriberOptions struct {
	// MaxBatchSize max amount of elements the batch will contain.
	// Default value is 100 if nothing is specified.
	MaxBatchSize uint16
	// MaxTopicRetryCount indicate how many time we allow to push to
	// the retry topic before considering a retryable error non retryable
	MaxTopicRetryCount uint16
	// Allows the handler to run each topic handling in parallel
	EnableAsyncExecution bool
	// Define the number of maximum topic handler that can run in parallel
	// on record processing
	MaxParallelAsyncExecution int16
	// Timeouts
	DialTimeout time.Duration
	// Defaults to 60s. This should be big enough to allow enough time to process a batch.
	RebalanceTimeout time.Duration
}

func NewDefaultSubscriberOptions added in v0.0.67

func NewDefaultSubscriberOptions() *SubscriberOptions

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL