jetstream

package
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DEFAULT = iota
	SYNC
	PULL
	QUEUE
)

Variables

View Source
var (
	// ErrInvalidMessageHandler returned when the message handler doesn't implement the underlying interface
	ErrInvalidMessageHandler = errors.New("invalid message handler provided")
	// ErrSubscriptionFailed returned when subscription fails
	ErrSubscriptionFailed = errors.New("nats subscription failed")
)
View Source
var (
	// ErrReqDurableName returned when durable name is a required field
	ErrReqDurableName = errors.New("durable name is required - use withDurableName(name) subopt")
	// ErrReqQueueName returned when queue name is a required field
	ErrReqQueueName = errors.New("queue name is required - use withQueueName(name) subopt")
)

Functions

func AddStream

func AddStream(name string, subjects ...string) broker.Option

AddStream defines the stream in which to publish the message

func NewPublisher

func NewPublisher(j *JetStream, topic string, opts ...broker.PublishOption) *publisher

NewPublisher returns a new publisher instance for jetstream publisher

func NewSubscriber

func NewSubscriber(j *JetStream, topic string, opts ...broker.SubscribeOption) *subscriber

NewSubscriber returns a new subscriber instance for jetstream subscription

func WithDurableName

func WithDurableName(name string) broker.SubscribeOption

WithDurableName - used to provide a durable name for sync and pull subscriptions

func WithNatsPubOpts

func WithNatsPubOpts(opts ...nats.PubOpt) broker.PublishOption

WithNatsPubOpts defines additional jetstream publish options

func WithNatsSubOpts

func WithNatsSubOpts(opts ...nats.SubOpt) broker.SubscribeOption

WithNatsSubOpts defines additional jetstream subscribe options

func WithQueueName

func WithQueueName(name string) broker.SubscribeOption

WithQueueName - used to provide a queue name for queue subscriptions

func WithQueueSubscription

func WithQueueSubscription() broker.SubscribeOption

WithQueueSubscription - used to create a queue subscriber

Types

type Config

type Config struct {
	Host string `env:"NATS_HOST,default=127.0.0.1"`
	Port string `env:"NATS_PORT,default=4222"`
}

Config holds the NATS connection configuration (host and port), read from the NATS_HOST and NATS_PORT environment variables

func NewConfig

func NewConfig() *Config

NewConfig returns the parsed config for jetstream from env

func (*Config) Address

func (c *Config) Address() string

Address returns the formatted address for the producer

func (*Config) UnmarshalEnv

func (c *Config) UnmarshalEnv(es env.EnvSet) error

UnmarshalEnv unmarshals the given env.EnvSet into Config

type Initializer

type Initializer struct {
	// contains filtered or unexported fields
}

func NewInitializer

func NewInitializer(j *JetStream) *Initializer

NewInitializer returns a new JetStream Initializer

func (*Initializer) AddDependency

func (i *Initializer) AddDependency(dep interface{}) error

AddDependency adds necessary service components as dependencies

func (*Initializer) CanRun

func (i *Initializer) CanRun() bool

CanRun returns true if the component has anything to Run

func (*Initializer) CanStop

func (i *Initializer) CanStop() bool

CanStop returns true if the component has anything to Stop

func (*Initializer) Dependencies

func (i *Initializer) Dependencies() []string

Dependencies returns the string names of service components that are required as dependencies for this component

func (*Initializer) Run

func (i *Initializer) Run(ctx context.Context) error

Run starts the service component

func (*Initializer) Stop

func (i *Initializer) Stop(ctx context.Context) error

Stop - stops the running service component

type JetStream

type JetStream struct {
	Subscriptions map[string]*nats.Subscription
	*Config
	// contains filtered or unexported fields
}

JetStream holds our broker instance

func NewJetStream

func NewJetStream(opts ...broker.Option) *JetStream

NewJetStream returns a new instance of nats jetstream

func (*JetStream) HasInitializer

func (j *JetStream) HasInitializer() bool

func (*JetStream) Initializer

func (j *JetStream) Initializer() component.Initializer

func (*JetStream) Logger

func (j *JetStream) Logger() logger.Logger

Logger returns the initialized logger instance

func (*JetStream) Publish

func (j *JetStream) Publish(ctx context.Context, topic string, message interface{}, opts ...broker.PublishOption) error

Publish publishes the topic message

func (*JetStream) String

func (j *JetStream) String() string

func (*JetStream) Subscribe

func (j *JetStream) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) error

Subscribe subscribes to the given topic.

func (*JetStream) Unsubscribe

func (j *JetStream) Unsubscribe(topic string) error

Unsubscribe method is not applicable

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL