message

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const APIv1 string = "v1"

APIv1 is the string that represents API version 1

View Source
const CloudEventsSpec = "1.0"

CloudEventsSpec is the string that represents The Cloud Events Spec used for API version 2

Variables

SHA512 references the sha512 hash function.

Functions

func ConsumeMessages

func ConsumeMessages(ctx context.Context, topics []string, group sarama.ConsumerGroup, cons Consumer, wg *sync.WaitGroup)

ConsumeMessages given a ConsumerGroup and a Consumer, process each message in the consumer group using the consumer provided. You will need to close the consumer group when you are done processing.

func CreateConsumerGroup

func CreateConsumerGroup(kafkaVersion, groupID string, kafkaPeers []string) (sarama.ConsumerGroup, error)

CreateConsumerGroup returns a new ConsumerGroup, ready to consume things. Calls through to CreateSecureConsumerGroup without any security pararmeters enabled.

func CreateConsumerGroupEnv

func CreateConsumerGroupEnv(kafkaVersion, groupID string, tls bool, kafkaPeers []string) (sarama.ConsumerGroup, error)

CreateConsumerGroupEnv returns a new ConsumerGroup with Kafka security options enabled or disabled depending on configured envirionment variables. If SASL_USERNAME and SASL_PASSWORD are set, a consumer with SASL enabled will be returned, else a normal consumer group will be returned. If SASL is enabled SASL_MECHANISM must be set to SCRAM or PLAIN Supported SASL_MECHANISMS: SCRAM, PLAIN Future SASL_MECHANISMS: OAUTH2

func CreateSecureConsumerGroup

func CreateSecureConsumerGroup(kafkaVersion, groupID, saslUser, saslPass string, tls bool, kafkaPeers []string) (sarama.ConsumerGroup, error)

CreateSecureConsumerGroup creates a ConsumerGroup with Kafka security options enabled. Providing saslUser and saslPassword allows the consumer to authenticate with a SASL enabled kafka cluster. If left empty, a normal consumer will be created. The previous two options imply TLS. TLS can be set separately in cases where TLS is required but auth is not.

func NewConfig

func NewConfig(version string) (*sarama.Config, error)

NewConfig creates a new sarama.Config object with TLS disabled.

func NewConfigEnv

func NewConfigEnv(version string) (*sarama.Config, error)

NewConfigEnv returns a new Sarama config with Kafka security options enabled or disabled depending on configured envirionment variables. If SASL_USERNAME and SASL_PASSWORD are set, a config with SASL and TLS enabled will be returned, else a config with the former disabled will be returned. If SASL is enabled SASL_MECHANISM must be set to SCRAM or PLAIN Supported SASL_MECHANISMS: SCRAM, PLAIN Future SASL_MECHANISMS: OAUTH2

func NewPlainConfig

func NewPlainConfig(user, password, version string) (*sarama.Config, error)

NewPlainConfig creates a new SASL PLAINTEXT enabled sarama config for communicating over TLS

func NewSCRAMConfig

func NewSCRAMConfig(user, password, version string) (*sarama.Config, error)

NewSCRAMConfig creates a new SASL SCRAM enabled sarama config for communicating over TLS

Types

type Consumer

type Consumer struct {
	Ready  chan bool
	Worker ConsumerWorker
}

Consumer implements the sarama ConsumerGroupHandler interface.

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Each message received is handled by the Worker function on the consumer. Messages that process with an error are marked as read and will not be reprocessed.

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type ConsumerController

type ConsumerController struct {
	// contains filtered or unexported fields
}

ConsumerController is an abstraction around consumer groups to make them easier to use. The methods used to build this object are exported in the event that this object is insufficient for the task.

func NewConsumerController

func NewConsumerController(kafkaVersion, groupID string, kafkaPeers, topics []string, worker ConsumerWorker) (*ConsumerController, error)

NewConsumerController creates a new ConsumerController

func NewConsumerControllerEnv

func NewConsumerControllerEnv(kafkaVersion, groupID string, tls bool, kafkaPeers, topics []string, worker ConsumerWorker) (*ConsumerController, error)

NewConsumerControllerEnv creates a new ConsumerController with security flags invoked based on SASL_USERNAME and SASL_PASSWORD envirionment variables. If SASL_USERNAME and SASL_PASSWORD are set, a ConsumerController with SASL enabled will be returned.

func NewSecureConsumerController

func NewSecureConsumerController(kafkaVersion, groupID, saslUser, saslPass string, tls bool, kafkaPeers, topics []string, worker ConsumerWorker) (*ConsumerController, error)

NewSecureConsumerController creates a new ConsumerController with security flags invoked.

func (*ConsumerController) BeginConsuming

func (c *ConsumerController) BeginConsuming(ctx context.Context, wg *sync.WaitGroup)

BeginConsuming starts consuming messages off of the kafka bus inside of a goroutine.

func (*ConsumerController) Close

func (c *ConsumerController) Close() error

Close closes the ConsumerGroup

type ConsumerWorker

type ConsumerWorker func(*sarama.ConsumerMessage) error

ConsumerWorker represents a function that operates on sarama.ConsumerMessage structs. In practice, this means that this function should be executed on each messages received from the bus. Refer to ConsumeClaim as an example.

type Data

type Data struct {
	Events              []storage.Event              `json:"events"`
	EventReceivers      []storage.EventReceiver      `json:"event_receivers"`
	EventReceiverGroups []storage.EventReceiverGroup `json:"event_receiver_groups"`
}

Data contains the data that created the event

type Mechanism

type Mechanism int

Mechanism represents the SASL Authentication mechanism being used

const (
	NONE Mechanism = iota
	PLAIN
	SCRAM
	OAUTH2 // OAUTH2 is not currently supported, but will be
)

type Message

type Message struct {
	Success     bool   `json:"success"`     // Extension to Cloud Events Spec
	ID          string `json:"id"`          // Cloud Events Spec 1.0.1
	Specversion string `json:"specversion"` // Cloud Events Spec 1.0.1
	Type        string `json:"type"`        // Cloud Events Spec 1.0.1
	Source      string `json:"source"`      // Cloud Events Spec 1.0.1
	APIVersion  string `json:"api_version"` // Extension to Cloud Events Spec
	Name        string `json:"name"`        // Extension to Cloud Events Spec
	Version     string `json:"version"`     // Extension to Cloud Events Spec
	Release     string `json:"release"`     // Extension to Cloud Events Spec
	PlatformID  string `json:"platform_id"` // Extension to Cloud Events Spec
	Package     string `json:"package"`     // Extension to Cloud Events Spec
	Data        Data   `json:"data"`        // Cloud Events Spec 1.0.1
}

Message is the struct for kafka message events it contains information from the receipt that created the event Adheres to https://github.com/cloudevents/spec 1.0.1

func DecodeFromJSON

func DecodeFromJSON(reader io.Reader) (*Message, error)

DecodeFromJSON returns an Event from JSON

func New

func New() *Message

New returns a Message

func NewEvent

func NewEvent(e storage.Event) Message

NewEvent returns a Message

func NewEventReceiver

func NewEventReceiver(e storage.EventReceiver) Message

NewEventReceiver returns a Message

func NewEventReceiverGroupComplete

func NewEventReceiverGroupComplete(e storage.Event, erg storage.EventReceiverGroup) Message

NewEventReceiverGroupComplete returns a message

func NewEventReceiverGroupCreated

func NewEventReceiverGroupCreated(e storage.EventReceiverGroup) Message

NewEventReceiverGroupCreated returns a Message

func NewEventReceiverGroupModified

func NewEventReceiverGroupModified(e storage.EventReceiverGroup) Message

NewEventReceiverGroupModified returns a Message

func (*Message) ToJSON

func (m *Message) ToJSON() (string, error)

ToJSON converts a Events struct to JSON

func (*Message) ToYAML

func (m *Message) ToYAML() (string, error)

ToYAML converts a Events struct to YAML

type MsgInfo

type MsgInfo interface {
	Length() int
	Encode() ([]byte, error)
}

MsgInfo is a base type for encoding messages

type Producer

type Producer interface {
	Async(string, interface{})
	Send(string, interface{}) error
	ConsumeSuccesses()
	ConsumeErrors()
	Close() error
}

Producer defines an interface for producing events

func NewProducer

func NewProducer(brokers []string, config *sarama.Config) (Producer, error)

NewProducer creates a producer instance

type SASLAuthentication

type SASLAuthentication struct {
	Mechanism Mechanism
	Username  string
	Password  string
}

func GetSASLAuthentication

func GetSASLAuthentication() *SASLAuthentication

GetSASLCredentials returns SASLCredentials struct

func (*SASLAuthentication) SASLEnabled

func (s *SASLAuthentication) SASLEnabled() bool

SASLEnabled returns true if SASL_USERNAME, SASL_PASSWORD, and a SASL_MECHANISM are set

type SCRAMClient

type SCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

SCRAMClient client for doing SCRAM authentication through sarama. This implementation was taken from the sarama examples here: https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go

func (*SCRAMClient) Begin

func (s *SCRAMClient) Begin(user, pass, authzID string) error

Begin begins SCRAM authentication

func (*SCRAMClient) Done

func (s *SCRAMClient) Done() bool

Done closes conversation

func (*SCRAMClient) Step

func (s *SCRAMClient) Step(challenge string) (string, error)

Step handles scram auth

type TopicProducer

type TopicProducer interface {
	Async(data any)
	Send(data any) error
}

func NewTopicProducer

func NewTopicProducer(p Producer, topic string) TopicProducer

NewTopicProducer wraps the given producer into an interface for sending messages without knowledge of message-bus details, such as topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL