gomb

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

GOMB - Go Message Broker

A general purpose package for working with message brokers, such as Kafka, Pulsar and Redis Streams, loosely inspired by Golang's database/sql.

⚠ This repo is under construction.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrConnection = errors.New("connection error")

ErrConnection is returned when there is a connection error.

View Source
var ErrInvalidBroker = errors.New("invalid broker")

ErrInvalidBroker is returned when the broker is invalid.

View Source
var ErrInvalidBrokerOption = errors.New("invalid broker option")

ErrInvalidBrokerOption is returned when the broker option is invalid.

View Source
var ErrInvalidBrokerOptionValue = errors.New("invalid broker option value")

ErrInvalidBrokerOptionValue is returned when the broker option value is invalid.

View Source
var ErrInvalidBrokerOptions = errors.New("invalid broker options")

ErrInvalidBrokerOptions is returned when the broker options are invalid.

View Source
var ErrMissingBroker = errors.New("missing broker")

ErrMissingBroker is returned when the broker is missing.

View Source
var ErrTooManyAuths = errors.New("too many auths are set - none or one is required")

ErrTooManyAuths is returned when there are too many auths.

Functions

This section is empty.

Types

type AuthOptions

type AuthOptions struct {
	Basic *BasicAuth

	SASL *SASL

	TLS *TLS

	OAuth2 *OAuth2
}

AuthOptions represents the options for authentication. This supports SASL/PLAIN, SASL/SCRAM, and password authentication. Either None or One should be set.

func (*AuthOptions) Check

func (a *AuthOptions) Check() error

Check returns an error if the authentication options are invalid.

type BasicAuth

type BasicAuth struct {
	// The username to use for authentication.
	// Ignored for redis.
	Username string

	// The password to use for authentication.
	// Used for redis.
	Password string
}

BasicAuth represents the basic authentication options.

type Broker

type Broker interface {
	// Consume consumes a message from the given topics.
	// This sends messages to the returned channel.
	// Args:
	// - ctx: The context to use for the operation.
	// - cg: The consumer group to consume from.
	// - topics: The topics to consume from.
	NewConsumer(opts ConsumerOptions) (Consumer, error)

	// Publish publishes the given message to given topics.
	Publish(ctx context.Context, msg BrokerMessage, topics ...string) error

	// AckBrokerMessage acknowledges the given message.
	AckMessage(ctx context.Context, msg BrokerMessage) error

	// Close closes the message broker and all its consumers.
	Close() error

	// Connect connects the message broker.
	Connect(ctx context.Context) error

	// Options returns the broker options.
	Options() *BrokerOptions
}

Broker is the interface that all message brokers must implement.

func NewBroker

func NewBroker(opts BrokerOptions) (Broker, error)

NewBroker creates a new MessageBroker.

type BrokerMessage

type BrokerMessage interface {
	// Ack acknowledges the message. This should be called after the
	// message has been successfully processed.
	// Or your could use the Ack() the message and Republish() the message
	Ack(ctx context.Context) error

	// Republish republishes the message. This should be called if the
	// message has not been successfully processed.
	// This will put the message back on the queue to be processed again as a new message.
	Republish(ctx context.Context) error

	// Publish publishes the message to the given topics.
	Publish(ctx context.Context, topics ...string) error

	// Body returns the message body.
	Body() []byte

	// Headers returns the message headers.
	// These are:
	// - Kafka: Headers
	// - Pulsar: Properties
	// - Redis Streams: Fields
	Headers() map[string]string

	// ID returns the message ID.
	// This is:
	// - Kafka: Offset
	// - Pulsar: BrokerMessageID
	// - Redis Streams: ID
	ID() string

	// Key returns the message key.
	// This is:
	// - Kafka: Key
	// - Pulsar: Key
	// - Redis Streams: No equivalent (returns empty string)
	Key() string

	// Timestamp returns the message timestamp.
	// This is:
	// - Kafka: Timestamp
	// - Pulsar: PublishTime
	// - Redis Streams: No equivalent (returns 0)
	Timestamp() time.Time

	// Topic returns the message topic.
	// This is:
	// - Kafka: Topic
	// - Pulsar: Topic
	// - Redis Streams: Channel
	Topic() string

	// Group returns the message group.
	// This is:
	// - Kafka: No equivalent (returns empty string)
	// - Pulsar: No equivalent (returns empty string)
	// - Redis Streams: Group
	Group() string

	// ConsumerGroup returns the message consumer group.
	// This is:
	// - Kafka: ConsumerGroup
	// - Pulsar: ConsumerGroup
	// - Redis Streams: ConsumerGroup
	ConsumerGroup() string

	// Unmarshal unmarshals the message body into the given value.
	Unmarshal(v interface{}) error

	// Marshal marshals the given value into the message body.
	Marshal(v interface{}) error
}

BrokerMessage is the interface that all messages must implement. This represents a message that is consumed or published by a Broker.

type BrokerOptions

type BrokerOptions struct {
	// The type of message broker to use.
	// This is required.
	BrokerType BrokerType

	// The address of the message broker(s).
	// This is required.
	// If multiple it should be comma separated.
	Address string

	// Authentication
	// This is optional.
	// If not set, no authentication will be used.
	Auth *AuthOptions

	// OperationTimeout
	// This is optional, and defaults to 5 seconds.
	// Applies to Pulsar only.
	OperationTimeout time.Duration

	// ConnectionTimeout
	// This is optional, and defaults to 5 seconds.
	// Applies to Pulsar only.
	ConnectionTimeout time.Duration

	// Custom Marshaller
	// This is optional, and defaults to json.Marshal.
	// Override this if you want to use a custom marshaller for your messages.
	Marshal func(v interface{}) ([]byte, error)

	// Custom Unmarshaller
	// This is optional, and defaults to json.Unmarshal.
	// Override this if you want to use a custom unmarshaller for your messages.
	Unmarshal func(data []byte, v interface{}) error
}

BrokerOptions represents the options for a message broker.

func NewBrokerOptions

func NewBrokerOptions(brokerType BrokerType, address string) BrokerOptions

NewBrokerOptions returns a new BrokerOptions.

func (*BrokerOptions) SetMarshal

func (o *BrokerOptions) SetMarshal(f func(v interface{}) ([]byte, error))

SetMarshal sets the Marshal function.

func (*BrokerOptions) SetUnmarshal

func (o *BrokerOptions) SetUnmarshal(f func(data []byte, v interface{}) error)

SetUnmarshal sets the Unmarshal function.

type BrokerType

type BrokerType string
const (
	BrokerTypeKafka        BrokerType = "kafka"
	BrokerTypePulsar       BrokerType = "pulsar"
	BrokerTypeRedisStreams BrokerType = "redisstreams"
)

type Consumer

type Consumer interface {
	// Consume consumes a message from the given topics.
	// This sends messages to the returned channel.
	// Args:
	// - ctx: The context to use for the operation.
	// - cg: The consumer group to consume from.
	// - topics: The topics to consume from.
	Consume(ctx context.Context) (<-chan BrokerMessage, error)

	// Pause pauses the consumer.
	Pause()

	// Resume resumes the consumer.
	Resume()

	// Close closes the consumer.
	Close() error

	// Connect connects the consumer.
	Connect(ctx context.Context) error

	// Topics returns the topics that the consumer is consuming from.
	Topics() []string

	// ConsumerGroup returns the consumer group that the consumer is consuming from.
	ConsumerGroup() string
}

Consumer is the interface that all message consumers must implement.

type ConsumerOptions

type ConsumerOptions struct {
	// The consumer group to consume from.
	// This is required.
	ConsumerGroup string

	// The topics to consume from.
	// This is required.
	Topics []string
}

ConsumerOptions represents the options for a message consumer.

func NewConsumerOptions

func NewConsumerOptions(consumerGroup string, topics []string) ConsumerOptions

NewConsumerOptions returns a new ConsumerOptions.

type ErrAuth

type ErrAuth struct {
	Msg string
}

ErrAuth is an auth setting related error.

func (*ErrAuth) Error

func (e *ErrAuth) Error() string

Error returns the error message.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is the struct that all brokers handle. It implements the Message interface.

func NewMessage

func NewMessage(topic string, key string, body []byte, headers map[string]string, id string, timestamp time.Time, broker Broker) *Message

NewMessage creates a new message.

func (*Message) Ack

func (m *Message) Ack(ctx context.Context) error

Ack acknowledges the message, using the broker's Ack function.

func (*Message) Body

func (m *Message) Body() []byte

Body returns the message body.

func (*Message) ConsumerGroup

func (m *Message) ConsumerGroup() string

ConsumerGroup returns the message consumer group.

func (*Message) Group

func (m *Message) Group() string

Group returns the message group.

func (*Message) Headers

func (m *Message) Headers() map[string]string

Headers returns the message headers.

func (*Message) ID

func (m *Message) ID() string

ID returns the message ID.

func (*Message) Key

func (m *Message) Key() string

Key returns the message key.

func (*Message) Marshal

func (m *Message) Marshal(v interface{}) error

Marshal marshals the given value into the message body, using the broker's Marshal function.

func (*Message) Publish

func (m *Message) Publish(ctx context.Context, topics ...string) error

Publish this message to the given topics.

func (*Message) Republish

func (m *Message) Republish(ctx context.Context) error

Republish republishes the message, using the broker's Republish function.

func (*Message) SetBody

func (m *Message) SetBody(body []byte)

SetBody sets the message body.

func (*Message) SetBroker

func (m *Message) SetBroker(broker Broker)

SetBroker sets the message broker.

func (*Message) SetConsumerGroup

func (m *Message) SetConsumerGroup(consumerGroup string)

SetConsumerGroup sets the message consumer group.

func (*Message) SetGroup

func (m *Message) SetGroup(group string)

SetGroup sets the message group.

func (*Message) SetHeaders

func (m *Message) SetHeaders(headers map[string]string)

SetHeaders sets the message headers.

func (*Message) SetID

func (m *Message) SetID(id string)

SetID sets the message ID.

func (*Message) SetKey

func (m *Message) SetKey(key string)

SetKey sets the message key.

func (*Message) SetTimestamp

func (m *Message) SetTimestamp(timestamp time.Time)

SetTimestamp sets the message timestamp.

func (*Message) SetTopic

func (m *Message) SetTopic(topic string)

SetTopic sets the message topic.

func (*Message) Timestamp

func (m *Message) Timestamp() time.Time

Timestamp returns the message timestamp.

func (*Message) Topic

func (m *Message) Topic() string

Topic returns the message topic.

func (*Message) Unmarshal

func (m *Message) Unmarshal(v interface{}) error

Unmarshal unmarshals the message body into the given value, using the broker's Unmarshal function.

type OAuth2

type OAuth2 struct {
	// The type of OAuth2 authentication.
	Type string

	// The issuer URL.
	IssuerURL string

	// The audience.
	Audience string

	// The path to the private key file.
	PrivateKey string

	// The client ID.
	ClientID string
}

OAuth2 represents the OAuth2 options. For use in Pulsar. Read more here: https://pulsar.apache.org/docs/2.11.x/client-libraries-go/#tls-encryption-and-authentication

func (*OAuth2) Check

func (o *OAuth2) Check() error

Check returns an error if the OAuth2 options are invalid.

func (*OAuth2) String

func (o *OAuth2) String() string

func (*OAuth2) ToMap

func (o *OAuth2) ToMap() map[string]string

ToMap converts the OAuth2 options to a map

type SASL

type SASL struct {
	// The mechanism to use for authentication.
	Mechanism SASLMechanism

	// The service name to use for authentication.
	ServiceName string

	// The username to use for authentication.
	Username string

	// The password to use for authentication.
	Password string
}

SASL represents the SASL authentication options. For use in Kafka.

func (*SASL) Check

func (s *SASL) Check() error

Check returns an error if the SASL options are invalid.

type SASLMechanism

type SASLMechanism string
const (
	SASLSHA256 SASLMechanism = "SASL-SHA-256"
	SASLScram  SASLMechanism = "SASL-SCRAM"
)

type TLS

type TLS struct {
	// The path to the certificate file.
	CertFile string

	// The path to the key file.
	KeyFile string
}

TLS represents the TLS options. For use in Pulsar.

func (*TLS) Check

func (t *TLS) Check() error

Check returns an error if the TLS options are invalid.

func (*TLS) GetCertFileName

func (t *TLS) GetCertFileName() string

getCertFileName returns the certificate file name without the path.

func (*TLS) GetKeyFileName

func (t *TLS) GetKeyFileName() string

getKeyFileName returns the key file name without the path.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL