bolt

package
v1.1.1
Published: Aug 28, 2024 License: MIT Imports: 9 Imported by: 1

Documentation

Overview

Package bolt implements a Pub/Sub for the Watermill project backed by the Bolt database.

Apart from the subscriber, two publishers are available: TxPublisher, which uses a provided transaction, and Publisher, which creates its own transaction.

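import (
        "github.com/ThreeDotsLabs/watermill-bolt/pkg/bolt"
        "go.etcd.io/bbolt"
)

// db is an open *bbolt.DB, for example one returned by bbolt.Open.
commonConfig := bolt.CommonConfig{
        // Topics and subscriptions are stored under the "watermill" bucket.
        Bucket: []bolt.BucketName{
                bolt.BucketName("watermill"),
        },
}

publisher, err := bolt.NewPublisher(db, bolt.PublisherConfig{
        Common: commonConfig,
})
if err != nil {
        // handle the error
}

subscriber, err := bolt.NewSubscriber(db, bolt.SubscriberConfig{
        Common: commonConfig,
})
if err != nil {
        // handle the error
}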

Types

type BucketName

type BucketName []byte

BucketName must have a positive length.

type CommonConfig

type CommonConfig struct {
	// Bucket specifies the parent bucket in which topics and subscriptions
	// will be stored. The first element is the name of the parent bucket,
	// second is the name of its first child etc. It has to have at least
	// one element.
	Bucket []BucketName

	// Defaults to JSONMarshaler.
	Marshaler Marshaler

	// Defaults to watermill.NopLogger.
	Logger watermill.LoggerAdapter
}

CommonConfig defines configuration needed by both the subscriber and the publisher.
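
To illustrate how the Bucket slice maps onto nested buckets, the sketch below turns a slash-separated path into one name per nesting level and rejects empty names. BucketName is a local stand-in that mirrors the documented type so the example compiles on its own; parseBucketPath and the slash-separated convention are hypothetical and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// BucketName is a local stand-in for bolt.BucketName.
type BucketName []byte

// parseBucketPath is a hypothetical helper: "app/events" becomes
// [app events], meaning parent bucket "app" with child bucket "events".
func parseBucketPath(path string) ([]BucketName, error) {
	if path == "" {
		return nil, errors.New("bucket path needs at least one element")
	}
	var names []BucketName
	for _, part := range strings.Split(path, "/") {
		if part == "" {
			return nil, errors.New("bucket names must have a positive length")
		}
		names = append(names, BucketName(part))
	}
	return names, nil
}

func main() {
	names, err := parseBucketPath("app/events/watermill")
	if err != nil {
		panic(err)
	}
	for depth, name := range names {
		fmt.Printf("level %d: %s\n", depth, name)
	}
}
```

With the real package, the resulting names would be converted to bolt.BucketName values and assigned to CommonConfig.Bucket.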

type GenerateSubscriptionNameFn

type GenerateSubscriptionNameFn func(topic string) string

type JSONMarshaler

type JSONMarshaler struct {
}

JSONMarshaler marshals the messages as JSON. This is the default marshaler.

func (JSONMarshaler) Marshal

func (m JSONMarshaler) Marshal(msg PersistedMessage) ([]byte, error)

func (JSONMarshaler) Unmarshal

func (m JSONMarshaler) Unmarshal(b []byte) (PersistedMessage, error)

type Marshaler

type Marshaler interface {
	Marshal(msg PersistedMessage) ([]byte, error)
	Unmarshal(b []byte) (PersistedMessage, error)
}

Marshaler is responsible for marshaling and unmarshaling messages for storage. Implementations need to marshal and unmarshal all fields of the persisted message.

type PersistedMessage

type PersistedMessage struct {
	UUID     string
	Metadata message.Metadata
	Payload  message.Payload
	Created  time.Time
}

PersistedMessage is marshaled and unmarshaled for storage in the database.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes messages, creating a new transaction each time messages are published. If you already have a running transaction, use TxPublisher. Publisher has to be initialized with NewPublisher.

func NewPublisher

func NewPublisher(db *bbolt.DB, config PublisherConfig) (Publisher, error)

NewPublisher creates an initialized publisher.

func (Publisher) Close

func (p Publisher) Close() error

Close does not have to be called and is here just to satisfy the publisher interface.

func (Publisher) Publish

func (p Publisher) Publish(topic string, messages ...*message.Message) error

Publish publishes the given messages on the topic. Calling this function with no messages or an empty topic returns an error.

type PublisherConfig

type PublisherConfig struct {
	Common CommonConfig
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber receives messages sent by the publishers.

The current implementation always tries to retrieve all messages at the same time, which can cause problems when a single subscription holds a very large number of unprocessed messages.

Subscriber has to be initialized by using NewSubscriber.

func NewSubscriber

func NewSubscriber(db *bbolt.DB, config SubscriberConfig) (*Subscriber, error)

NewSubscriber creates an initialized subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes all channels returned by Subscribe and shuts down the subscriber. Close blocks until all returned channels are successfully closed. Close can be called multiple times, but subsequent calls have no effect.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe returns a channel on which you can receive messages sent on the specified topic. Calling this function with an empty topic returns an error. Further messages become available on the channel only after the last message retrieved from it has been acked or nacked. The returned channel is closed when the subscriber is closed or when the context is canceled. Subscribe should not be called after the subscriber has been closed; such calls return an already closed channel and no error.

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

SubscribeInitialize satisfies one of Watermill's interfaces. It is not necessary to call it manually. The same initialization performed by this function is performed by Subscribe.

type SubscriberConfig

type SubscriberConfig struct {
	Common CommonConfig

	// GenerateSubscriptionName is used to create a unique identifier for
	// subscriptions created by the subscriber. The names created by
	// multiple subscribers have to be unique within a single topic. Once
	// you set this function for a particular subscriber you should not
	// change it in the future to avoid accidentally abandoning your old
	// subscriptions.
	//
	// If only one subscriber is used to listen to various topics using
	// your database then it is perfectly fine to use the default value or
	// write a function that returns a constant string, for example the name
	// of your application.
	//
	// Defaults to topic + "_sub".
	GenerateSubscriptionName GenerateSubscriptionNameFn
}
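
The two naming strategies described in the comment above can be sketched as plain functions. Any func(topic string) string is assignable to GenerateSubscriptionName, so the example needs no imports from this package. The helper names perSubscriber and constantName, and the sample values, are hypothetical.

```go
package main

import "fmt"

// perSubscriber returns a generator that combines the topic with a fixed
// subscriber identifier, keeping names distinct between subscribers that
// listen on the same topic.
func perSubscriber(id string) func(topic string) string {
	return func(topic string) string {
		return topic + "_" + id
	}
}

// constantName returns a generator that ignores the topic, which the
// documentation describes as fine when only one subscriber uses the database.
func constantName(app string) func(topic string) string {
	return func(string) string {
		return app
	}
}

func main() {
	billing := perSubscriber("billing")
	shipping := perSubscriber("shipping")
	fmt.Println(billing("orders"))
	fmt.Println(shipping("orders"))
	fmt.Println(constantName("my-app")("orders"))
}
```

A generator like these would be passed as SubscriberConfig{GenerateSubscriptionName: perSubscriber("billing")}. As the field comment warns, changing it later can abandon existing subscriptions.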

type TxPublisher

type TxPublisher struct {
	// contains filtered or unexported fields
}

TxPublisher uses the provided transaction to publish messages. It can only be used during the lifetime of that transaction. If you don't have a running transaction but want to publish messages, use Publisher. TxPublisher has to be initialized with NewTxPublisher.

func NewTxPublisher

func NewTxPublisher(tx *bbolt.Tx, config PublisherConfig) (TxPublisher, error)

NewTxPublisher returns an initialized publisher.

func (TxPublisher) Close

func (p TxPublisher) Close() error

Close does not have to be called and is here just to satisfy the publisher interface.

func (TxPublisher) Publish

func (p TxPublisher) Publish(topic string, messages ...*message.Message) error

Publish publishes the given messages on the topic. Calling this function with no messages or an empty topic returns an error.
