Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type IJetstream ¶
type IJetstream interface { ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) ConsumerInfo(stream, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) }
type NatsProcessor ¶
type NatsProcessor struct {
// contains filtered or unexported fields
}
NatsProcessor gets news from NATS and processes them using the provided function
func NewNatsProcessor ¶
func NewNatsProcessor(js IJetstream, bucket string, queueName string, retrier retrier.Retrier, processFunc func(*nwelastic.News) error) (NatsProcessor, error)
NewNatsProcessor creates a new listener
func (*NatsProcessor) Listen ¶
func (l *NatsProcessor) Listen(ctx context.Context, onMsgProcessed ...func(*nats.Msg)) error
func (*NatsProcessor) Unsubscribe ¶
func (l *NatsProcessor) Unsubscribe() error
Click to show internal directories.
Click to hide internal directories.