gochannel

package
v1.1.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0, MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Output channel buffer size.
	OutputChannelBuffer int64

	// If persistent is set to true, when subscriber subscribes to the topic,
	// it will receive all previously produced messages.
	//
	// All messages are persisted to the memory (simple slice),
	// so be aware that with large amount of messages you can go out of the memory.
	Persistent bool

	// When true, Publish will block until subscriber Ack's the message.
	// If there are no subscribers, Publish will not block (also when Persistent is true).
	BlockPublishUntilSubscriberAck bool

	ConsumerGroup string

	AppID string
}

Config holds the GoChannel Pub/Sub's configuration options.

type FanOut

type FanOut struct {
	// contains filtered or unexported fields
}

FanOut is a component that receives messages from the subscriber and passes them to all publishers. In effect, messages are "multiplied".

A typical use case for using FanOut is having one external subscription and multiple workers inside the process.

You need to call AddSubscription method for all topics that you want to listen to. This needs to be done *before* starting the FanOut.

FanOut exposes the standard Subscriber interface.

func NewFanOut

func NewFanOut(
	subscriber message.Subscriber,
	logger watermill.LoggerAdapter,
) (*FanOut, error)

NewFanOut creates a new FanOut.

func (*FanOut) AddSubscription

func (f *FanOut) AddSubscription(topic string)

AddSubscription add an internal subscription for the given topic. You need to call this method with all topics that you want to listen to, before the FanOut is started. AddSubscription is idempotent.

func (*FanOut) Close

func (f *FanOut) Close() error

Close closes the FanOut's internal Pub/Sub.

func (*FanOut) IsClosed

func (f *FanOut) IsClosed() bool

func (*FanOut) Run

func (f *FanOut) Run(ctx context.Context) error

Run runs the FanOut.

func (*FanOut) Running

func (f *FanOut) Running() chan struct{}

Running is closed when FanOut is running.

func (*FanOut) Subscribe

func (f *FanOut) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe starts subscription to the FanOut's internal Pub/Sub.

type GoChannel

type GoChannel struct {
	// contains filtered or unexported fields
}

GoChannel is the simplest Pub/Sub implementation. It is based on Golang's channels which are sent within the process.

GoChannel has no global state, that means that you need to use the same instance for Publishing and Subscribing!

When GoChannel is persistent, messages order is not guaranteed.

func NewGoChannel

func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel

NewGoChannel creates new GoChannel Pub/Sub.

This GoChannel is not persistent. That means if you send a message to a topic to which no subscriber is subscribed, that message will be discarded.

func (*GoChannel) Close

func (g *GoChannel) Close() error

Close closes the GoChannel Pub/Sub.

func (*GoChannel) Publish

func (g *GoChannel) Publish(ctx context.Context, topic string, messages ...*message.Message) error

Publish in GoChannel is NOT blocking until all consumers consume. Messages will be sent in background.

Messages may be persisted or not, depending on persistent attribute.

func (*GoChannel) Subscribe

func (g *GoChannel) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe returns channel to which all published messages are sent. Messages are not persisted. If there are no subscribers and message is produced it will be gone.

There are no consumer groups support etc. Every consumer will receive every produced message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL