streams

package
v0.0.0-...-de4cc66 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

streams package contains a RedisClient which leverages go-redis to interact with a Redis server.

Index

Constants

View Source
const (
	// GoModMessagingNamespace go-mod-messaging(gmm) namespace used for grouping and isolating topics(streams). This is
	// prepended to topics when either subscribing or publishing to a stream.
	GoModMessagingNamespace = "gmm"

	// GoModMessagingNamespaceFormat formatted string which allows for consistent construction of entities within Redis
	// that require a namespace.
	GoModMessagingNamespaceFormat = GoModMessagingNamespace + ":%s"
)
View Source
const (
	// Special identifier used within Redis to signal that a subscriber(consumer) is only interested in the most recent
	// messages after the client has connected. Redis provides other functionality to read all the data from a stream
	// even if has been read previously, which is what we want to avoid for functional consistency with the other
	// implementations of MessageClient.
	LatestStreamMessage = "$"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client MessageClient implementation which provides functionality for sending and receiving messages using RedisStreams.

func NewClient

func NewClient(messageBusConfig types.MessageBusConfig) (Client, error)

NewClient creates a new Client based on the provided configuration.

func NewClientWithCreator

func NewClientWithCreator(
	messageBusConfig types.MessageBusConfig,
	creator RedisClientCreator,
	pairCreator pkg.X509KeyPairCreator,
	keyLoader pkg.X509KeyLoader) (Client, error)

NewClientWithCreator creates a new Client based on the provided configuration while allowing more control on the creation of the underlying entities such as certs, keys, and Redis clients

func (Client) Connect

func (c Client) Connect() error

Connect noop as preemptive connections are not needed.

func (Client) Disconnect

func (c Client) Disconnect() error

Disconnect closes connections to the Redis server.

func (Client) Publish

func (c Client) Publish(message types.MessageEnvelope, topic string) error

Publish sends the provided message to appropriate Redis stream.

func (Client) Subscribe

func (c Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error

Subscribe creates background processes which reads messages from the appropriate Redis stream and sends to the provided channels

type DisconnectErr

type DisconnectErr struct {
	// contains filtered or unexported fields
}

DisconnectErr represents errors which occur when attempting to disconnect from a Redis server.

func NewDisconnectErr

func NewDisconnectErr(disconnectErrors []string) DisconnectErr

NewDisconnectErr created a new DisconnectErr

func (DisconnectErr) Error

func (d DisconnectErr) Error() string

Error constructs an appropriate error message based on the error descriptions provided.

type OptionalClientConfiguration

type OptionalClientConfiguration struct {
	Password string
}

OptionalClientConfiguration contains additional configuration properties which can be provided via the MessageBus.Optional's field.

func NewClientConfiguration

func NewClientConfiguration(config types.MessageBusConfig) (OptionalClientConfiguration, error)

NewClientConfiguration creates a OptionalClientConfiguration based on the configuration properties provided.

type RedisClient

type RedisClient interface {
	// AddToStream sends a message to a stream which acts as a topic.
	AddToStream(stream string, values map[string]interface{}) error
	// ReadFromStream blocking operation which retrieves the next message(s) from a stream which acts as a topic.
	ReadFromStream(stream string) ([]types.MessageEnvelope, error)
	// Close cleans up any entities which need to be deconstructed.
	Close() error
}

RedisClient provides functionality needed to read and send messages to/from Redis' RedisStreams functionality.

The main reason for this interface is to abstract out the underlying client from Client so that it can be mocked and allow for easy unit testing. Since 'go-redis' does not leverage interfaces and has complicated entities it can become complex to test the operations without requiring a running Redis server.

func NewGoRedisClientWrapper

func NewGoRedisClientWrapper(redisServerURL string, password string, tlsConfig *tls.Config) (RedisClient, error)

NewGoRedisClientWrapper creates a RedisClient implementation which uses a 'go-redis' Client to achieve the necessary functionality.

type RedisClientCreator

type RedisClientCreator func(redisServerURL string, password string, tlsConfig *tls.Config) (RedisClient, error)

RedisClientCreator type alias for functions which create RedisClient implementation.

This is mostly used for testing purposes so that we can easily inject mocks.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL