Documentation ¶
Overview ¶
Package bolt implements a Pub/Sub for the Watermill project which uses the Bolt database.
Apart from the subscriber there are two publishers available, one which uses a provided transaction and one that creates its own transaction.
import( "github.com/ThreeDotsLabs/watermill-bolt/pkg/bolt" "go.etcd.io/bbolt" ) commonConfig := bolt.CommonConfig{ Bucket: []bolt.BucketName{ bolt.BucketName("watermill"), }, } publisher, err := bolt.NewPublisher(db, bolt.PublisherConfig{ Common: commonConfig, }) subscriber, err := bolt.NewSubscriber(db, bolt.SubscriberConfig{ Common: commonConfig, })
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CommonConfig ¶
type CommonConfig struct { // Bucket specifies the parent bucket in which topics and subscriptions // will be stored. The first element is the name of the parent bucket, // second is the name of its first child etc. It has to have at least // one element. Bucket []BucketName // Defaults to JSONMarshaler. Marshaler Marshaler // Defaults to watermill.NopLogger. Logger watermill.LoggerAdapter }
CommonConfig defines configuration needed by both the subscriber and the publisher.
type JSONMarshaler ¶
type JSONMarshaler struct { }
JSONMarshaler marshals the messages as JSON. This is the default marshaler.
func (JSONMarshaler) Marshal ¶
func (m JSONMarshaler) Marshal(msg PersistedMessage) ([]byte, error)
func (JSONMarshaler) Unmarshal ¶
func (m JSONMarshaler) Unmarshal(b []byte) (PersistedMessage, error)
type Marshaler ¶
type Marshaler interface { Marshal(msg PersistedMessage) ([]byte, error) Unmarshal(b []byte) (PersistedMessage, error) }
Marshaler is responsible for marshalling and unmarshaling messages for storage. Implementations need to marshal and unmarshal all fields of the persisted message.
type PersistedMessage ¶
type PersistedMessage struct { UUID string Metadata message.Metadata Payload message.Payload Created time.Time }
PersistedMessage is marshalled and unmarshalled for storage in the database.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher publishes messages creating a new transaction every time messages are being published. If you already have a running transaction use TxPublisher. Publisher has to be initialized by using NewPublisher.
func NewPublisher ¶
func NewPublisher(db *bbolt.DB, config PublisherConfig) (Publisher, error)
NewPublisher creates an initialized publisher.
type PublisherConfig ¶
type PublisherConfig struct {
Common CommonConfig
}
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber receives messages sent by the publishers.
The current implementation always tries to retrieve all messages at the same time which can lead to problems with a very high number of unprocessed messages in a single subscription.
Subscriber has to be initialized by using NewSubscriber.
func NewSubscriber ¶
func NewSubscriber(db *bbolt.DB, config SubscriberConfig) (*Subscriber, error)
NewSubscriber creates an initialized subscriber.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes all channels returned by subscribe and shuts down the subscriber. Close blocks until all returned channels are successfully closed. Close can be called multiple times but subsequent calls have no effect.
func (*Subscriber) Subscribe ¶
Subscribe returns a channel on which you can receive messages send on the specified topic. Calling this function with a zero value of topic returns an error. Returned channel is closed after closing the subscriber. Subsequent calls to subscribe after the subscriber has been closed should not be performed and will return a closed channel but will not return an error. Further messages will be available on the channel only after the last message retrieved from the channel has been acked or nacked. If the context is closed then the channel will be closed.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) error
SubscribeInitialize satisfies one of Watermill's interfaces. It is not necessary to manually call it. The same initialization performed by this function is performed by subscribe.
type SubscriberConfig ¶
type SubscriberConfig struct { Common CommonConfig // GenerateSubscriptionName is used to create a unique identifier for // subscriptions created by the subscriber. The names created by // multiple subscribers have to be unique within a single topic. Once // you set this function for a particular subscriber you should not // change it in the future to avoid accidentally abandoning your old // subscriptions. // // If only one subscriber is used to listen to various topics using // your database then it is perfectly fine to use the default value or // write a function that returns a contant string, for example the name // of your application. // // Defaults to topic + "_sub". GenerateSubscriptionName GenerateSubscriptionNameFn }
type TxPublisher ¶
type TxPublisher struct {
// contains filtered or unexported fields
}
TxPublisher uses the provided transaction to publish messages. It can only be used during the lifetime of that transaction. If you don't have a running transaction but want to publish messages use Publisher. TxPublisher has to be initialized by using NewTxPublisher.
func NewTxPublisher ¶
func NewTxPublisher(tx *bbolt.Tx, config PublisherConfig) (TxPublisher, error)
NewTxPublisher returns an initialized publisher.
func (TxPublisher) Close ¶
func (p TxPublisher) Close() error
Close does not have to be called and is here just to satisfy the publisher interface.