pulsar

v1.0.4
Published: Nov 7, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package pulsar is a fork of github.com/AlexCuse/watermill-pulsar@f5e8591.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides the Pulsar implementation of Watermill publish operations.

func NewPublisher

func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

NewPublisher creates a new Publisher.

func NewPublisherWithPulsarClient

func NewPublisherWithPulsarClient(config PublisherConfig, logger watermill.LoggerAdapter,
	conn pulsar.Client) (*Publisher, error)

NewPublisherWithPulsarClient creates a new Publisher with the provided Pulsar client.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher and the underlying connection.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, messages ...*message.Message) (err error)

Publish publishes messages to Pulsar.

Publish does not return until an ack has been received from Pulsar. If delivery of any message fails, publishing stops and the error is returned.
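The stop-on-first-failure behavior described above can be sketched with the standard library alone. This is an illustration, not this package's implementation: `sendFunc`, `publishAll`, `failOnSecond`, and `demoPublish` are hypothetical names, and `sendFunc` stands in for one acknowledged send to the broker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// sendFunc is a hypothetical stand-in for one acknowledged send to the broker.
// It is not part of this package.
type sendFunc func(ctx context.Context, topic string, payload []byte) error

// publishAll sends payloads in order and stops at the first failure.
// It returns how many payloads were sent before the error occurred.
func publishAll(ctx context.Context, send sendFunc, topic string, payloads ...[]byte) (int, error) {
	sent := 0
	for _, p := range payloads {
		if err := send(ctx, topic, p); err != nil {
			return sent, fmt.Errorf("publish to %q: %w", topic, err)
		}
		sent++
	}
	return sent, nil
}

// failOnSecond simulates a broker that rejects the second message it receives.
func failOnSecond() sendFunc {
	calls := 0
	return func(ctx context.Context, topic string, payload []byte) error {
		calls++
		if calls == 2 {
			return errors.New("simulated delivery failure")
		}
		return nil
	}
}

// demoPublish publishes three payloads through the failing stand-in.
func demoPublish() (int, error) {
	return publishAll(context.Background(), failOnSecond(), "orders",
		[]byte("a"), []byte("b"), []byte("c"))
}

func main() {
	sent, err := demoPublish()
	fmt.Println(sent, err)
}
```

In this model the first payload has already been sent when the error is returned, so a caller that retries the whole batch may publish it twice. Whether that applies to a given deployment depends on the broker and producer settings.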

type PublisherConfig

type PublisherConfig struct {
	// URL is the Pulsar URL.
	URL string

	Authentication pulsar.Authentication

	AppID string
}

PublisherConfig is the configuration used to create a Publisher.
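Because URL is a plain string, it may be worth checking it before it is used in a config. The sketch below is one way to do that with the standard library. `checkPulsarURL` is a hypothetical helper, not part of this package, and it assumes the broker is reached over the binary protocol schemes `pulsar://` or `pulsar+ssl://`; loosen the check if your setup uses a different scheme.

```go
package main

import (
	"fmt"
	"net/url"
)

// checkPulsarURL is a hypothetical helper (not part of this package).
// It accepts only the pulsar:// and pulsar+ssl:// schemes, which is an
// assumption about how the broker is reached.
func checkPulsarURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse %q: %w", raw, err)
	}
	if u.Scheme != "pulsar" && u.Scheme != "pulsar+ssl" {
		return fmt.Errorf("unexpected scheme %q in %q", u.Scheme, raw)
	}
	if u.Host == "" {
		return fmt.Errorf("missing host in %q", raw)
	}
	return nil
}

func main() {
	for _, raw := range []string{
		"pulsar://localhost:6650",
		"pulsar+ssl://broker.example:6651",
		"localhost:6650",
	} {
		fmt.Println(raw, checkPulsarURL(raw))
	}
}
```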

type Subscriber

type Subscriber struct {
	SubscribersCount int
	// contains filtered or unexported fields
}

Subscriber provides the Pulsar implementation of Watermill subscribe operations.

func NewSubscriber

func NewSubscriber(config *SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

NewSubscriber creates a new Subscriber.

func NewSubscriberWithPulsarClient

func NewSubscriberWithPulsarClient(conn pulsar.Client, config *SubscriberConfig, logger watermill.LoggerAdapter) (
	*Subscriber, error)

NewSubscriberWithPulsarClient creates a new Subscriber with the provided pulsar client.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the subscriber and the underlying connection. It will attempt to wait for in-flight messages to complete.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe subscribes to messages from Pulsar.
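Subscribe returns a receive-only channel, so the caller typically drains it in a loop and acknowledges each message. The sketch below shows that loop shape using only the standard library. `msg` is a hypothetical stand-in for `*message.Message`, reduced to a payload plus `Ack` and `Nack`, and `consume` and `demoConsume` are illustrative names. It assumes the channel is closed when the subscription ends, which is not stated on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// msg is a hypothetical stand-in for *message.Message, reduced to what
// the loop needs. It is not the Watermill type.
type msg struct {
	Payload       []byte
	acked, nacked bool
}

func (m *msg) Ack()  { m.acked = true }
func (m *msg) Nack() { m.nacked = true }

// consume reads from in until the channel is closed or ctx is cancelled.
// Messages are acked when handle succeeds and nacked when it fails.
func consume(ctx context.Context, in <-chan *msg, handle func([]byte) error) (acked, nacked int) {
	for {
		select {
		case <-ctx.Done():
			return acked, nacked
		case m, ok := <-in:
			if !ok {
				return acked, nacked
			}
			if err := handle(m.Payload); err != nil {
				m.Nack()
				nacked++
				continue
			}
			m.Ack()
			acked++
		}
	}
}

// demoConsume feeds three messages through consume; the handler rejects "bad".
func demoConsume() (acked, nacked int) {
	in := make(chan *msg, 3)
	for _, p := range []string{"ok", "bad", "ok"} {
		in <- &msg{Payload: []byte(p)}
	}
	close(in)
	return consume(context.Background(), in, func(p []byte) error {
		if string(p) == "bad" {
			return errors.New("cannot process payload")
		}
		return nil
	})
}

func main() {
	acked, nacked := demoConsume()
	fmt.Println(acked, nacked)
}
```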

type SubscriberConfig

type SubscriberConfig struct {
	// URL is the URL to the broker
	URL string

	// QueueGroup is the name of the queue group.
	//
	// All subscriptions with the same queue name (regardless of the connection they originate from)
	// will form a queue group. Each message will be delivered to only one subscriber per queue group,
	// using queuing semantics.
	//
	// It is recommended to set it with DurableName.
	// For non durable queue subscribers, when the last member leaves the group,
	// that group is removed. A durable queue group (DurableName) allows you to have all members leave
	// but still maintain state. When a member re-joins, it starts at the last position in that group.
	//
	// When QueueGroup is empty, subscribe without QueueGroup will be used.
	QueueGroup string

	Persistent bool

	Authentication pulsar.Authentication
}

SubscriberConfig is the configuration used to create a Subscriber.
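The queuing semantics described in the QueueGroup comment (each message goes to only one subscriber in the group) can be modeled with plain Go channels. The sketch below is a model of the delivery rule only, with no broker involved. `deliverToGroup` is a hypothetical function, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// deliverToGroup models one queue group: `members` workers share a single
// channel, so each message is received by exactly one of them.
// It returns how many times each message was delivered.
func deliverToGroup(members int, messages []string) map[string]int {
	queue := make(chan string)
	deliveries := make(map[string]int)

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	for i := 0; i < members; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				mu.Lock()
				deliveries[m]++
				mu.Unlock()
			}
		}()
	}

	for _, m := range messages {
		queue <- m
	}
	close(queue)
	wg.Wait()
	return deliveries
}

func main() {
	fmt.Println(deliverToGroup(3, []string{"m1", "m2", "m3", "m4"}))
}
```

Which worker receives a given message is not deterministic. The only property the model guarantees is that every message is delivered exactly once across the group.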
