Documentation ¶
Index ¶
- Constants
- Variables
- func AddStream(name string, subjects ...string) broker.Option
- func NewPublisher(j *JetStream, topic string, opts ...broker.PublishOption) *publisher
- func NewSubscriber(j *JetStream, topic string, opts ...broker.SubscribeOption) *subscriber
- func WithDurableName(name string) broker.SubscribeOption
- func WithNatsPubOpts(opts ...nats.PubOpt) broker.PublishOption
- func WithNatsSubOpts(opts ...nats.SubOpt) broker.SubscribeOption
- func WithQueueName(name string) broker.SubscribeOption
- func WithQueueSubscription() broker.SubscribeOption
- type Config
- type Initializer
- type JetStream
- func (j *JetStream) HasInitializer() bool
- func (j *JetStream) Initializer() component.Initializer
- func (j *JetStream) Logger() logger.Logger
- func (j *JetStream) Publish(ctx context.Context, topic string, message interface{}, ...) error
- func (j *JetStream) String() string
- func (j *JetStream) Subscribe(ctx context.Context, topic string, handler broker.Handler, ...) error
- func (j *JetStream) Unsubscribe(topic string) error
Constants ¶
const ( DEFAULT = iota SYNC PULL QUEUE )
Variables ¶
var ( // ErrInvalidMessageHandler returned when the message handler doesn't implement the underlying interface ErrInvalidMessageHandler = errors.New("invalid message handler provided") // ErrSubscriptionFailed returned when subscription fails ErrSubscriptionFailed = errors.New("nats subscription failed") )
var ( // ErrReqDurableName returned when durable name is a required field ErrReqDurableName = errors.New("durable name is required - use withDurableName(name) subopt") // ErrReqQueueName returned when queue name is a required field ErrReqQueueName = errors.New("queue name is required - use withQueueName(name) subopt") )
Functions ¶
func NewPublisher ¶
func NewPublisher(j *JetStream, topic string, opts ...broker.PublishOption) *publisher
NewPublisher returns a new publisher instance for jetstream publisher
func NewSubscriber ¶
func NewSubscriber(j *JetStream, topic string, opts ...broker.SubscribeOption) *subscriber
NewSubscriber returns a new subscriber instance for jetstream subscription
func WithDurableName ¶
func WithDurableName(name string) broker.SubscribeOption
WithDurableName - used to provied a durable name for sync and pull subscription
func WithNatsPubOpts ¶
func WithNatsPubOpts(opts ...nats.PubOpt) broker.PublishOption
WithNatsPubOpts defines a additional jetstream publish options
func WithNatsSubOpts ¶
func WithNatsSubOpts(opts ...nats.SubOpt) broker.SubscribeOption
WithNatsSubOpts defines a additional jetstream subscribe options
func WithQueueName ¶
func WithQueueName(name string) broker.SubscribeOption
WithQueueName - used to provied a queue name for queue subscriptions
func WithQueueSubscription ¶
func WithQueueSubscription() broker.SubscribeOption
WithQueueSubscription - used to create a queue subscriber
Types ¶
type Config ¶
type Config struct { Host string `env:"NATS_HOST,default=127.0.0.1"` Port string `env:"NATS_PORT,default=4222"` }
Config holds database configuration
func NewConfig ¶
func NewConfig() *Config
NewConfig returns the parsed config for jetstream from env
func (*Config) UnmarshalEnv ¶
UnmarshalEnv env.EnvSet to Config
type Initializer ¶
type Initializer struct {
// contains filtered or unexported fields
}
func NewInitializer ¶
func NewInitializer(j *JetStream) *Initializer
NewInitializer returns a new JetStream Initialiazer
func (*Initializer) AddDependency ¶
func (i *Initializer) AddDependency(dep interface{}) error
AddDependency adds necessary service components as dependencies
func (*Initializer) CanRun ¶
func (i *Initializer) CanRun() bool
CanRun returns true if the component has anything to Run
func (*Initializer) CanStop ¶
func (i *Initializer) CanStop() bool
CanRun returns true if the component has anything to Run
func (*Initializer) Dependencies ¶
func (i *Initializer) Dependencies() []string
Dependencies returns the string names of service components that are required as dependencies for this component
type JetStream ¶
type JetStream struct { Subscriptions map[string]*nats.Subscription *Config // contains filtered or unexported fields }
Nsq holds our broker instance
func NewJetStream ¶
NewJetStream returns a new instance of nats jetstream
func (*JetStream) HasInitializer ¶
func (*JetStream) Initializer ¶
func (j *JetStream) Initializer() component.Initializer
func (*JetStream) Publish ¶
func (j *JetStream) Publish(ctx context.Context, topic string, message interface{}, opts ...broker.PublishOption) error
Publish publishes the topic message
func (*JetStream) Subscribe ¶
func (j *JetStream) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) error
Subscribe subcribes for the given topic.
func (*JetStream) Unsubscribe ¶
Ubsubscribe method is not applicable