Documentation ¶
Overview ¶
Package redis is a fork of github.com/ThreeDotsLabs/watermill-redisstream@v1.2.2.
Index ¶
Constants ¶
View Source
const (
	// NoSleep can be set to SubscriberConfig.NackResendSleep
	NoSleep time.Duration = -1

	DefaultBlockTime = time.Millisecond * 100
	DefaultClaimInterval = time.Second * 5
	DefaultClaimBatchSize = int64(100)
	DefaultMaxIdleTime = time.Second * 60
	DefaultCheckConsumersInterval = time.Second * 300
	DefaultConsumerTimeout = time.Second * 600
)
View Source
const UUIDHeaderKey = "_watermill_message_uuid"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultMarshallerUnmarshaller ¶
type DefaultMarshallerUnmarshaller struct {
AppID string
}
type Marshaller ¶
type MarshallerUnmarshaller ¶
type MarshallerUnmarshaller interface {
	Marshaller
	Unmarshaller
}
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisher creates a new redis stream Publisher.
type PublisherConfig ¶
type PublisherConfig struct {
	Client redis.UniversalClient
	Marshaller Marshaller
	Maxlens map[string]int64
	DisableRedisConnClose bool
}
func (*PublisherConfig) Validate ¶
func (c *PublisherConfig) Validate() error
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriber creates a new redis stream Subscriber.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
type SubscriberConfig ¶
type SubscriberConfig struct {
	Client redis.UniversalClient
	Unmarshaller Unmarshaller

	// Redis stream consumer ID, paired with ConsumerGroup.
	Consumer string
	// When empty, fan-out mode will be used.
	ConsumerGroup string

	// How long to wait after a Nack before the message is redelivered.
	NackResendSleep time.Duration

	// How long to block while waiting for the next Redis stream message.
	BlockTime time.Duration

	// Interval between attempts to claim idle pending messages.
	ClaimInterval time.Duration

	// Maximum number of pending messages claimed in each claim interval.
	ClaimBatchSize int64

	// How long a pending message must be idle before it is treated as claimable.
	MaxIdleTime time.Duration

	// Interval between consumer status checks.
	CheckConsumersInterval time.Duration

	// After this timeout an idle consumer with no pending messages will be removed from the consumer group.
	ConsumerTimeout time.Duration

	// Start consumption from the specified message ID.
	// When using "0", the consumer group will consume from the very first message.
	// When using "$", the consumer group will consume from the latest message.
	OldestId string

	// If this is set, it will be called to decide whether a pending message that
	// has been idle for more than MaxIdleTime should actually be claimed.
	// If this is not set, then all pending messages that have been idle for more than MaxIdleTime will be claimed.
	// This can be useful e.g. for tasks where the processing time can be very variable -
	// so we can't just use a short MaxIdleTime; but at the same time dead
	// consumers should be spotted quickly - so we can't just use a long MaxIdleTime either.
	// In such cases, if we have another way for checking consumers' health, then we can
	// leverage that in this callback.
	ShouldClaimPendingMessage func(redis.XPendingExt) bool

	DisableRedisConnClose bool
}
func (*SubscriberConfig) Validate ¶
func (sc *SubscriberConfig) Validate() error
Click to show internal directories.
Click to hide internal directories.