bus

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultChannelSize = 100
)

Variables

This section is empty.

Functions

func DeserializePayload

func DeserializePayload[T proto.Message](buf []byte) (T, error)

func SerializePayload

func SerializePayload(m proto.Message) ([]byte, error)

Types

type EmptySubscription

type EmptySubscription[MessageType any] struct{}

func (EmptySubscription[MessageType]) Channel

func (s EmptySubscription[MessageType]) Channel() <-chan MessageType

func (EmptySubscription[MessageType]) Close

func (s EmptySubscription[MessageType]) Close() error

type MessageBus

type MessageBus interface {
	Publish(ctx context.Context, channel string, msg proto.Message) error
	Subscribe(ctx context.Context, channel string, channelSize int) (Reader, error)
	SubscribeQueue(ctx context.Context, channel string, channelSize int) (Reader, error)
}

func NewLocalMessageBus

func NewLocalMessageBus() MessageBus

func NewNatsMessageBus

func NewNatsMessageBus(nc *nats.Conn) MessageBus

func NewRedisMessageBus

func NewRedisMessageBus(rc redis.UniversalClient) MessageBus

func NewTestBus added in v0.3.4

func NewTestBus(bus MessageBus, opts ...TestBusOption) MessageBus

type PublishHandler added in v0.3.4

type PublishHandler func(ctx context.Context, channel string, msg proto.Message) error

type PublishInterceptor added in v0.3.4

type PublishInterceptor func(next PublishHandler) PublishHandler

type ReadHandler added in v0.3.4

type ReadHandler func() ([]byte, bool)

type Reader

type Reader interface {
	Close() error
	// contains filtered or unexported methods
}

type SubscribeInterceptor added in v0.3.4

type SubscribeInterceptor func(ctx context.Context, channel string, next ReadHandler) ReadHandler

type Subscription

type Subscription[MessageType proto.Message] interface {
	Channel() <-chan MessageType
	Close() error
}

func Subscribe

func Subscribe[MessageType proto.Message](
	ctx context.Context,
	bus MessageBus,
	channel string,
	channelSize int,
) (Subscription[MessageType], error)

func SubscribeQueue

func SubscribeQueue[MessageType proto.Message](
	ctx context.Context,
	bus MessageBus,
	channel string,
	channelSize int,
) (Subscription[MessageType], error)

type TestBusOption added in v0.3.4

type TestBusOption func(*TestBusOpts)

type TestBusOpts added in v0.3.4

type TestBusOpts struct {
	PublishInterceptors   []PublishInterceptor
	SubscribeInterceptors []SubscribeInterceptor
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL