Documentation ¶
Index ¶
- Variables
- type AuthOptions
- type BasicAuth
- type Broker
- type BrokerMessage
- type BrokerOptions
- type BrokerType
- type Consumer
- type ConsumerOptions
- type ErrAuth
- type Message
- func (m *Message) Ack(ctx context.Context) error
- func (m *Message) Body() []byte
- func (m *Message) ConsumerGroup() string
- func (m *Message) Group() string
- func (m *Message) Headers() map[string]string
- func (m *Message) ID() string
- func (m *Message) Key() string
- func (m *Message) Marshal(v interface{}) error
- func (m *Message) Publish(ctx context.Context, topics ...string) error
- func (m *Message) Republish(ctx context.Context) error
- func (m *Message) SetBody(body []byte)
- func (m *Message) SetBroker(broker Broker)
- func (m *Message) SetConsumerGroup(consumerGroup string)
- func (m *Message) SetGroup(group string)
- func (m *Message) SetHeaders(headers map[string]string)
- func (m *Message) SetID(id string)
- func (m *Message) SetKey(key string)
- func (m *Message) SetTimestamp(timestamp time.Time)
- func (m *Message) SetTopic(topic string)
- func (m *Message) Timestamp() time.Time
- func (m *Message) Topic() string
- func (m *Message) Unmarshal(v interface{}) error
- type OAuth2
- type SASL
- type SASLMechanism
- type TLS
Constants ¶
This section is empty.
Variables ¶
var ErrConnection = errors.New("connection error")
ErrConnection is returned when there is a connection error.
var ErrInvalidBroker = errors.New("invalid broker")
ErrInvalidBroker is returned when the broker is invalid.
var ErrInvalidBrokerOption = errors.New("invalid broker option")
ErrInvalidBrokerOption is returned when the broker option is invalid.
var ErrInvalidBrokerOptionValue = errors.New("invalid broker option value")
ErrInvalidBrokerOptionValue is returned when the broker option value is invalid.
var ErrInvalidBrokerOptions = errors.New("invalid broker options")
ErrInvalidBrokerOptions is returned when the broker options are invalid.
var ErrMissingBroker = errors.New("missing broker")
ErrMissingBroker is returned when the broker is missing.
var ErrTooManyAuths = errors.New("too many auths are set - none or one is required")
ErrTooManyAuths is returned when there are too many auths.
Functions ¶
This section is empty.
Types ¶
type AuthOptions ¶
AuthOptions represents the options for authentication. This supports SASL/PLAIN, SASL/SCRAM, and password authentication. Either None or One should be set.
func (*AuthOptions) Check ¶
func (a *AuthOptions) Check() error
Check returns an error if the authentication options are invalid.
type BasicAuth ¶
type BasicAuth struct { // The username to use for authentication. // Ignored for redis. Username string // The password to use for authentication. // Used for redis. Password string }
BasicAuth represents the basic authentication options.
type Broker ¶
type Broker interface { // Consume consumes a message from the given topics. // This sends messages to the returned channel. // Args: // - ctx: The context to use for the operation. // - cg: The consumer group to consume from. // - topics: The topics to consume from. NewConsumer(opts ConsumerOptions) (Consumer, error) // Publish publishes the given message to given topics. Publish(ctx context.Context, msg BrokerMessage, topics ...string) error // AckBrokerMessage acknowledges the given message. AckMessage(ctx context.Context, msg BrokerMessage) error // Close closes the message broker and all its consumers. Close() error // Connect connects the message broker. Connect(ctx context.Context) error // Options returns the broker options. Options() *BrokerOptions }
Broker is the interface that all message brokers must implement.
func NewBroker ¶
func NewBroker(opts BrokerOptions) (Broker, error)
NewBroker creates a new MessageBroker.
type BrokerMessage ¶
type BrokerMessage interface { // Ack acknowledges the message. This should be called after the // message has been successfully processed. // Or your could use the Ack() the message and Republish() the message Ack(ctx context.Context) error // Republish republishes the message. This should be called if the // message has not been successfully processed. // This will put the message back on the queue to be processed again as a new message. Republish(ctx context.Context) error // Publish publishes the message to the given topics. Publish(ctx context.Context, topics ...string) error // Body returns the message body. Body() []byte // Headers returns the message headers. // These are: // - Kafka: Headers // - Pulsar: Properties // - Redis Streams: Fields Headers() map[string]string // ID returns the message ID. // This is: // - Kafka: Offset // - Pulsar: BrokerMessageID // - Redis Streams: ID ID() string // Key returns the message key. // This is: // - Kafka: Key // - Pulsar: Key // - Redis Streams: No equivalent (returns empty string) Key() string // Timestamp returns the message timestamp. // This is: // - Kafka: Timestamp // - Pulsar: PublishTime // - Redis Streams: No equivalent (returns 0) Timestamp() time.Time // Topic returns the message topic. // This is: // - Kafka: Topic // - Pulsar: Topic // - Redis Streams: Channel Topic() string // Group returns the message group. // This is: // - Kafka: No equivalent (returns empty string) // - Pulsar: No equivalent (returns empty string) // - Redis Streams: Group Group() string // ConsumerGroup returns the message consumer group. // This is: // - Kafka: ConsumerGroup // - Pulsar: ConsumerGroup // - Redis Streams: ConsumerGroup ConsumerGroup() string // Unmarshal unmarshals the message body into the given value. Unmarshal(v interface{}) error // Marshal marshals the given value into the message body. Marshal(v interface{}) error }
BrokerMessage is the interface that all messages must implement. This represents a message that is consumed or published by a Broker.
type BrokerOptions ¶
type BrokerOptions struct { // The type of message broker to use. // This is required. BrokerType BrokerType // The address of the message broker(s). // This is required. // If multiple it should be comma separated. Address string // Authentication // This is optional. // If not set, no authentication will be used. Auth *AuthOptions // OperationTimeout // This is optional, and defaults to 5 seconds. // Applies to Pulsar only. OperationTimeout time.Duration // ConnectionTimeout // This is optional, and defaults to 5 seconds. // Applies to Pulsar only. ConnectionTimeout time.Duration // Custom Marshaller // This is optional, and defaults to json.Marshal. // Override this if you want to use a custom marshaller for your messages. Marshal func(v interface{}) ([]byte, error) // Custom Unmarshaller // This is optional, and defaults to json.Unmarshal. // Override this if you want to use a custom unmarshaller for your messages. Unmarshal func(data []byte, v interface{}) error }
BrokerOptions represents the options for a message broker.
func NewBrokerOptions ¶
func NewBrokerOptions(brokerType BrokerType, address string) BrokerOptions
NewBrokerOptions returns a new BrokerOptions.
func (*BrokerOptions) SetMarshal ¶
func (o *BrokerOptions) SetMarshal(f func(v interface{}) ([]byte, error))
SetMarshal sets the Marshal function.
func (*BrokerOptions) SetUnmarshal ¶
func (o *BrokerOptions) SetUnmarshal(f func(data []byte, v interface{}) error)
SetUnmarshal sets the Unmarshal function.
type BrokerType ¶
type BrokerType string
const ( BrokerTypeKafka BrokerType = "kafka" BrokerTypePulsar BrokerType = "pulsar" BrokerTypeRedisStreams BrokerType = "redisstreams" )
type Consumer ¶
type Consumer interface { // Consume consumes a message from the given topics. // This sends messages to the returned channel. // Args: // - ctx: The context to use for the operation. // - cg: The consumer group to consume from. // - topics: The topics to consume from. Consume(ctx context.Context) (<-chan BrokerMessage, error) // Pause pauses the consumer. Pause() // Resume resumes the consumer. Resume() // Close closes the consumer. Close() error // Connect connects the consumer. Connect(ctx context.Context) error // Topics returns the topics that the consumer is consuming from. Topics() []string // ConsumerGroup returns the consumer group that the consumer is consuming from. ConsumerGroup() string }
Consumer is the interface that all message consumers must implement.
type ConsumerOptions ¶
type ConsumerOptions struct { // The consumer group to consume from. // This is required. ConsumerGroup string // The topics to consume from. // This is required. Topics []string }
ConsumerOptions represents the options for a message consumer.
func NewConsumerOptions ¶
func NewConsumerOptions(consumerGroup string, topics []string) ConsumerOptions
NewConsumerOptions returns a new ConsumerOptions.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is the struct that all brokers handle. It implements the Message interface.
func NewMessage ¶
func NewMessage(topic string, key string, body []byte, headers map[string]string, id string, timestamp time.Time, broker Broker) *Message
NewMessage creates a new message.
func (*Message) ConsumerGroup ¶
ConsumerGroup returns the message consumer group.
func (*Message) Marshal ¶
Marshal marshals the given value into the message body, using the broker's Marshal function.
func (*Message) Republish ¶
Republish republishes the message, using the broker's Republish function.
func (*Message) SetConsumerGroup ¶
SetConsumerGroup sets the message consumer group.
func (*Message) SetHeaders ¶
SetHeaders sets the message headers.
func (*Message) SetTimestamp ¶
SetTimestamp sets the message timestamp.
type OAuth2 ¶
type OAuth2 struct { // The type of OAuth2 authentication. Type string // The issuer URL. IssuerURL string // The audience. Audience string // The path to the private key file. PrivateKey string // The client ID. ClientID string }
OAuth2 represents the OAuth2 options. For use in Pulsar. Read more here: https://pulsar.apache.org/docs/2.11.x/client-libraries-go/#tls-encryption-and-authentication
type SASL ¶
type SASL struct { // The mechanism to use for authentication. Mechanism SASLMechanism // The service name to use for authentication. ServiceName string // The username to use for authentication. Username string // The password to use for authentication. Password string }
SASL represents the SASL authentication options. For use in Kafka.
type SASLMechanism ¶
type SASLMechanism string
const ( SASLSHA256 SASLMechanism = "SASL-SHA-256" SASLScram SASLMechanism = "SASL-SCRAM" )
type TLS ¶
type TLS struct { // The path to the certificate file. CertFile string // The path to the key file. KeyFile string }
TLS represents the TLS options. For use in Pulsar.
func (*TLS) GetCertFileName ¶
getCertFileName returns the certificate file name without the path.
func (*TLS) GetKeyFileName ¶
getKeyFileName returns the key file name without the path.