Documentation
¶
Index ¶
- Variables
- func MatchSubject(subject, pattern string) bool
- func ValidatePattern(pattern string) error
- type Bus
- type DeliveryPolicy
- type ErrorCallbackFunc
- type HandlerFunc
- type InboundMessage
- type OutboundMessage
- type SubscribeOption
- func WithDeliveryPolicy(policy DeliveryPolicy) SubscribeOption
- func WithDeserializer(deserializer serialization.Serializer) SubscribeOption
- func WithDurable() SubscribeOption
- func WithFilterSubject(subject string) SubscribeOption
- func WithGuaranteeOrder() SubscribeOption
- func WithMaxDeliveryTries(maxTries int) SubscribeOption
- type SubscriberInfo
- type Subscription
- func (s *Subscription) AddHandler(pattern string, handlerFunc HandlerFunc) error
- func (s *Subscription) IsRunning() bool
- func (s *Subscription) OnError(errorFunc ErrorCallbackFunc)
- func (s *Subscription) RemoveHandler(pattern string)
- func (s *Subscription) Start(ctx context.Context)
- func (s *Subscription) Stop()
- type SubscriptionOptions
- type UnsubscribeFn
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrHandlerRegistrationFailed = errors.New("failed to register handler") ErrMessageNotRoutable = errors.New("message is not routable") ErrMessageHandlerFailed = errors.New("message could not be handled") )
View Source
var DefaultSubscriptionOptions = SubscriptionOptions{ FilterSubjects: []string{}, GuaranteeOrder: false, MaxDeliveryTries: 10, }
View Source
var (
ErrInvalidSubjectPattern = errors.New("invalid subject pattern")
)
Functions ¶
func MatchSubject ¶
MatchSubject checks whether a given subject matches a given pattern. The pattern can be expressed in the same syntax as in NATS. For more information about valid patterns check: https://docs.nats.io/nats-concepts/subjects#wildcards
func ValidatePattern ¶
Types ¶
type Bus ¶
type Bus interface { // Publish sends a new message to the bus. It must be added in order to ensure consistency Publish(ctx context.Context, message *OutboundMessage) error // Subscribe retrieves messages from the bus in an ordered matter. Subscribe(ctx context.Context, subscriberName string, stream string, opts ...SubscribeOption) (*Subscription, error) // Migrate ensures that dependencies (streams, topic, consumers, etc.) are up to date and ready to be used Migrate(ctx context.Context) error // SubscriberInfo retrieves information about a subscriber in a stream SubscriberInfo(ctx context.Context, stream string, subscriberName string) (SubscriberInfo, error) }
type ErrorCallbackFunc ¶
type ErrorCallbackFunc func(err error)
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, message InboundMessage) error
type InboundMessage ¶
type InboundMessage struct { MessageCtx context.Context Id string Subject string Data []byte Ack func() error Nak func(retryAfter time.Duration) error // contains filtered or unexported fields }
func (*InboundMessage) Unmarshal ¶
func (im *InboundMessage) Unmarshal(dst interface{}) error
type OutboundMessage ¶
type SubscribeOption ¶
type SubscribeOption func(*SubscriptionOptions)
func WithDeliveryPolicy ¶
func WithDeliveryPolicy(policy DeliveryPolicy) SubscribeOption
func WithDeserializer ¶
func WithDeserializer(deserializer serialization.Serializer) SubscribeOption
func WithDurable ¶
func WithDurable() SubscribeOption
func WithFilterSubject ¶
func WithFilterSubject(subject string) SubscribeOption
func WithGuaranteeOrder ¶
func WithGuaranteeOrder() SubscribeOption
func WithMaxDeliveryTries ¶
func WithMaxDeliveryTries(maxTries int) SubscribeOption
type SubscriberInfo ¶ added in v0.14.0
type SubscriberInfo interface {
HasPendingMessages() bool
}
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
func NewSubscription ¶
func NewSubscription(inboundMessages chan InboundMessage, deserializer serialization.Serializer, unsubscribe UnsubscribeFn) *Subscription
func (*Subscription) AddHandler ¶
func (s *Subscription) AddHandler(pattern string, handlerFunc HandlerFunc) error
func (*Subscription) IsRunning ¶
func (s *Subscription) IsRunning() bool
func (*Subscription) OnError ¶
func (s *Subscription) OnError(errorFunc ErrorCallbackFunc)
func (*Subscription) RemoveHandler ¶
func (s *Subscription) RemoveHandler(pattern string)
func (*Subscription) Start ¶
func (s *Subscription) Start(ctx context.Context)
func (*Subscription) Stop ¶
func (s *Subscription) Stop()
type SubscriptionOptions ¶
type SubscriptionOptions struct { FilterSubjects []string GuaranteeOrder bool MaxDeliveryTries int DeliveryPolicy DeliveryPolicy Durable bool Deserializer serialization.Serializer }
type UnsubscribeFn ¶
type UnsubscribeFn func()
Click to show internal directories.
Click to hide internal directories.