Documentation ¶
Index ¶
- Constants
- Variables
- func AddConfigSchema(c interface{ ... }) error
- type Config
- type Errors
- type Handler
- type Handlers
- type InMemoryConfig
- type KafkaConfig
- type PoisonQueueConfig
- type ProvidersConfig
- type PubSub
- type PubSubAdminClient
- type PubSubOption
- func WithMaxMessageByte(max int32) PubSubOption
- func WithMeterProvider(provider metric.MeterProvider) PubSubOption
- func WithPropagator(propagator propagation.TextMapPropagator) PubSubOption
- func WithRetentionMs(retentionMs int32) PubSubOption
- func WithTracerProvider(provider trace.TracerProvider) PubSubOption
- type PubSubOptions
- type Publisher
- type Subscriber
- type SubscriberError
- type SubscriberOption
- func WithAsyncExecution() SubscriberOption
- func WithDialTimeout(timeout time.Duration) SubscriberOption
- func WithMaxBatchSize(maxBatchSize int) SubscriberOption
- func WithMaxParalleAsyncExecution(max int16) SubscriberOption
- func WithMaxTopicRetryCount(maxTopicRetryCount int) SubscriberOption
- func WithRebalanceTimeout(timeout time.Duration) SubscriberOption
- type SubscriberOptions
Constants ¶
const ConfigSchemaID = "clinia://pubsub-config"
Variables ¶
var ConfigSchema string
Functions ¶
Types ¶
type Config ¶
type Config struct {
	PoisonQueue      PoisonQueueConfig `json:"poisonQueue"`
	Scope            string            `json:"scope"`
	Provider         string            `json:"provider"`
	Providers        ProvidersConfig   `json:"providers"`
	TopicRetry       bool              `json:"topicRetry"`
	EnableAutoCommit bool              `json:"enableAutoCommit"`
}
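As an illustration of how the JSON tags map onto a configuration document, here is a minimal sketch that decodes a sample payload. The types are local stand-ins copying the documented field names and tags, not imports of the real package. PoisonQueueConfig is left out because its fields are not documented here. The helper `parseConfig` and all sample values (scope name, provider value, broker address) are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins that mirror the documented field names and JSON tags.
type InMemoryConfig struct{}

type KafkaConfig struct {
	Brokers []string `json:"brokers"`
}

type ProvidersConfig struct {
	InMemory InMemoryConfig `json:"inmemory"`
	Kafka    KafkaConfig    `json:"kafka"`
}

// Config omits PoisonQueue: its fields are not shown in this document.
type Config struct {
	Scope            string          `json:"scope"`
	Provider         string          `json:"provider"`
	Providers        ProvidersConfig `json:"providers"`
	TopicRetry       bool            `json:"topicRetry"`
	EnableAutoCommit bool            `json:"enableAutoCommit"`
}

// sample holds illustrative values only; the accepted provider names
// are an assumption.
const sample = `{
  "scope": "orders",
  "provider": "kafka",
  "providers": {
    "inmemory": {},
    "kafka": {"brokers": ["localhost:9092"]}
  },
  "topicRetry": true,
  "enableAutoCommit": false
}`

// parseConfig (hypothetical helper) decodes a JSON document into Config.
func parseConfig(raw []byte) (Config, error) {
	var cfg Config
	err := json.Unmarshal(raw, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseConfig([]byte(sample))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.Provider, cfg.Providers.Kafka.Brokers, cfg.TopicRetry)
}
```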
type Handler ¶ added in v0.0.67
Handler represents a function that handles messages received by a subscriber. It takes a context.Context and a slice of *messagex.Message as input parameters. The function should return a slice of errors, representing per-message failures, and an error, representing the processing failure in general.
For the general error, the handler may return pubsubx.AbortSubscribeError() to abort the subscription right away. For any other general error, the handler is retried up to a certain number of times, after which the subscription is aborted. It is therefore the handler author's responsibility to return a general error only when the failure concerns the batch processing itself and not a specific message.
type InMemoryConfig ¶
type InMemoryConfig struct{}
type KafkaConfig ¶
type KafkaConfig struct {
Brokers []string `json:"brokers"`
}
type PoisonQueueConfig ¶ added in v0.0.84
func (PoisonQueueConfig) IsEnabled ¶ added in v0.0.84
func (pqc PoisonQueueConfig) IsEnabled() bool
type ProvidersConfig ¶
type ProvidersConfig struct {
	InMemory InMemoryConfig `json:"inmemory"`
	Kafka    KafkaConfig    `json:"kafka"`
}
type PubSub ¶ added in v0.0.12
type PubSub interface {
	// Publisher returns a Publisher instance for publishing messages.
	Publisher() Publisher
	// Subscriber returns a Subscriber instance for subscribing to messages.
	// It takes a group name, a list of topics, and optional SubscriberOptions.
	// It returns a Subscriber and an error if any.
	// A subscriber should define ALL the topics it wants to subscribe to.
	Subscriber(group string, topics []messagex.Topic, opts ...SubscriberOption) (Subscriber, error)
	// AdminClient returns a PubSubAdminClient instance for managing topics and configurations.
	// A new client is returned on each call. The caller is responsible for closing the client.
	AdminClient() (PubSubAdminClient, error)
	// Close closes all publishers and subscribers associated with the PubSub instance.
	// It returns an error if any.
	Close() error
}
PubSub represents a generic interface for a publish-subscribe system.
type PubSubAdminClient ¶ added in v0.0.68
type PubSubAdminClient interface {
	// CreateTopic creates a topic with the given configuration.
	// Default configuration entries are applied, but they can be overridden (see `pubsub.NewCreateTopicConfigEntries()`).
	CreateTopic(ctx context.Context, partitions int32, replicationFactor int16, topic string, configs ...map[string]*string) (kadm.CreateTopicResponse, error)
	// CreateTopics creates topics with the given configuration.
	// Default configuration entries are applied, but they can be overridden (see `pubsub.NewCreateTopicConfigEntries()`).
	CreateTopics(ctx context.Context, partitions int32, replicationFactor int16, topics []string, configs ...map[string]*string) (kadm.CreateTopicResponses, error)
	// DeleteTopic deletes a topic.
	DeleteTopic(ctx context.Context, topic string) (kadm.DeleteTopicResponse, error)
	// DeleteTopicsWithRetryTopics deletes the topics with their related retry topics.
	DeleteTopicsWithRetryTopics(ctx context.Context, topics ...string) (kadm.DeleteTopicResponses, error)
	// DeleteTopicWithRetryTopics deletes a topic with its related retry topics.
	DeleteTopicWithRetryTopics(ctx context.Context, topic string) (kadm.DeleteTopicResponses, error)
	// DeleteGroup deletes a group and related resources.
	DeleteGroup(ctx context.Context, group messagex.ConsumerGroup) (kadm.DeleteGroupResponse, error)
	// DeleteGroups deletes groups and related resources.
	DeleteGroups(ctx context.Context, groups ...messagex.ConsumerGroup) (kadm.DeleteGroupResponses, error)
	// HealthCheck checks the health of the underlying pubsub.
	// It returns an error if the pubsub is unhealthy or cannot be reached.
	HealthCheck(ctx context.Context) error
	// ListTopics returns the details of the given topics.
	// If no topics are provided, it returns the details of all topics.
	ListTopics(ctx context.Context, topics ...string) (kadm.TopicDetails, error)
	Close()
}
type PubSubOption ¶ added in v0.0.31
type PubSubOption func(*PubSubOptions)
func WithMaxMessageByte ¶ added in v0.0.78
func WithMaxMessageByte(max int32) PubSubOption
WithMaxMessageByte specifies the max message size in bytes. If none is specified, the default value is 1 MB.
func WithMeterProvider ¶ added in v0.0.67
func WithMeterProvider(provider metric.MeterProvider) PubSubOption
func WithPropagator ¶ added in v0.0.33
func WithPropagator(propagator propagation.TextMapPropagator) PubSubOption
func WithRetentionMs ¶ added in v0.0.78
func WithRetentionMs(retentionMs int32) PubSubOption
WithRetentionMs specifies the retention time in milliseconds.
func WithTracerProvider ¶ added in v0.0.31
func WithTracerProvider(provider trace.TracerProvider) PubSubOption
WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, no tracer is configured.
type PubSubOptions ¶ added in v0.0.67
type PubSubOptions struct {
	TracerProvider trace.TracerProvider
	Propagator     propagation.TextMapPropagator
	MeterProvider  metric.MeterProvider
	MaxMessageByte *int32
	RetentionMs    *int32
}
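PubSubOption and PubSubOptions follow the functional options pattern, which the sketch below illustrates. The types are trimmed local copies with only two of the documented fields. The option bodies and the `applyOptions` helper are assumptions about how such options are typically written, not the package's actual implementation.

```go
package main

import "fmt"

// PubSubOptions is a trimmed local copy; the pointer fields let a
// caller distinguish "not set" (nil) from an explicit value.
type PubSubOptions struct {
	MaxMessageByte *int32
	RetentionMs    *int32
}

// PubSubOption mutates a PubSubOptions value.
type PubSubOption func(*PubSubOptions)

// WithMaxMessageByte mirrors the documented signature; the body is assumed.
func WithMaxMessageByte(max int32) PubSubOption {
	return func(o *PubSubOptions) { o.MaxMessageByte = &max }
}

// WithRetentionMs mirrors the documented signature; the body is assumed.
func WithRetentionMs(retentionMs int32) PubSubOption {
	return func(o *PubSubOptions) { o.RetentionMs = &retentionMs }
}

// applyOptions (hypothetical helper) folds options into a fresh struct.
func applyOptions(opts ...PubSubOption) *PubSubOptions {
	o := &PubSubOptions{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := applyOptions(WithMaxMessageByte(2 << 20))
	fmt.Println(*o.MaxMessageByte, o.RetentionMs == nil)
}
```

Under this reading, an unset pointer field is where a default such as the documented 1 MB message size would apply.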
type Publisher ¶ added in v0.0.12
type Publisher interface {
	// PublishSync publishes messages synchronously to the specified topic.
	// It returns an error if the operation fails.
	PublishSync(ctx context.Context, topic messagex.Topic, messages ...*messagex.Message) (Errors, error)
	// PublishAsync publishes messages asynchronously to the specified topic.
	// It returns an error if the operation fails.
	//
	// WARNING: The context should stay alive until the messages are published.
	// When using a fire-and-forget approach in a request-response scenario, a new context.Background() should be preferred since
	// the request context might be canceled before the messages are actually published.
	PublishAsync(ctx context.Context, topic messagex.Topic, messages ...*messagex.Message) error
	// Close closes the publisher.
	// Once a publisher is closed, it cannot be used to publish messages anymore.
	//
	// WARNING: Since the PubSub.Publisher() method always returns the same instance of the publisher,
	// closing the publisher will force the user to create a new PubSub instance to get a new publisher.
	// This should only be done when the application is shutting down or no longer needs to publish messages.
	Close() error
}
Publisher is an interface for publishing messages to a topic.
type Subscriber ¶ added in v0.0.12
type Subscriber interface {
	// Subscribe subscribes to all topics that are configured in the subscriber.
	// It takes a context and a map of topic handlers as input.
	// - If there are topics missing handlers, it will return an error immediately.
	Subscribe(ctx context.Context, topicHandlers Handlers) error
	// Health returns the health status of the subscriber.
	// It should return an error if the subscriber is unhealthy, nil otherwise (healthy).
	Health() error
	// Close closes the subscriber.
	Close() error
}
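The rule that Subscribe fails immediately when a configured topic has no handler can be sketched as a simple pre-flight check. The definitions below are assumptions for illustration: `Topic` stands in for messagex.Topic, `Handler` is a placeholder with a simplified signature, and `Handlers` is assumed to be a map keyed by topic, since this document only describes it as "a map of topic handlers". `missingHandlers` and `validate` are hypothetical helpers.

```go
package main

import "fmt"

// Topic is a local stand-in for messagex.Topic.
type Topic string

// Handler is a placeholder; the real handler takes a context and a batch.
type Handler func()

// Handlers is assumed to map each topic to its handler.
type Handlers map[Topic]Handler

// missingHandlers returns the configured topics that have no usable
// handler, in the order the topics were configured.
func missingHandlers(topics []Topic, handlers Handlers) []Topic {
	var missing []Topic
	for _, t := range topics {
		if h, ok := handlers[t]; !ok || h == nil {
			missing = append(missing, t)
		}
	}
	return missing
}

// validate returns an error when at least one topic lacks a handler.
func validate(topics []Topic, handlers Handlers) error {
	if m := missingHandlers(topics, handlers); len(m) > 0 {
		return fmt.Errorf("missing handlers for topics: %v", m)
	}
	return nil
}

func main() {
	topics := []Topic{"orders", "payments"}
	handlers := Handlers{"orders": func() {}}
	fmt.Println(validate(topics, handlers))
}
```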
type SubscriberError ¶ added in v0.0.69
func AbortSubscribeError ¶ added in v0.0.69
func AbortSubscribeError() *SubscriberError
func (*SubscriberError) Error ¶ added in v0.0.69
func (e *SubscriberError) Error() string
type SubscriberOption ¶ added in v0.0.39
type SubscriberOption func(*SubscriberOptions)
func WithAsyncExecution ¶ added in v0.0.97
func WithAsyncExecution() SubscriberOption
func WithDialTimeout ¶ added in v0.0.99
func WithDialTimeout(timeout time.Duration) SubscriberOption
func WithMaxBatchSize ¶ added in v0.0.67
func WithMaxBatchSize(maxBatchSize int) SubscriberOption
func WithMaxParalleAsyncExecution ¶ added in v0.0.97
func WithMaxParalleAsyncExecution(max int16) SubscriberOption
func WithMaxTopicRetryCount ¶ added in v0.0.80
func WithMaxTopicRetryCount(maxTopicRetryCount int) SubscriberOption
func WithRebalanceTimeout ¶ added in v0.0.99
func WithRebalanceTimeout(timeout time.Duration) SubscriberOption
type SubscriberOptions ¶ added in v0.0.67
type SubscriberOptions struct {
	// MaxBatchSize is the maximum number of elements a batch will contain.
	// Default value is 100 if nothing is specified.
	MaxBatchSize uint16
	// MaxTopicRetryCount indicates how many times a message may be pushed to
	// the retry topic before a retryable error is considered non-retryable.
	MaxTopicRetryCount uint16
	// EnableAsyncExecution allows the handling of each topic to run in parallel.
	EnableAsyncExecution bool
	// MaxParallelAsyncExecution defines the maximum number of topic handlers
	// that can run in parallel during record processing.
	MaxParallelAsyncExecution int16
	// Timeouts
	DialTimeout time.Duration
	// Defaults to 60s. This should be big enough to allow enough time to process a batch.
	RebalanceTimeout time.Duration
}
func NewDefaultSubscriberOptions ¶ added in v0.0.67
func NewDefaultSubscriberOptions() *SubscriberOptions