Documentation ¶
Index ¶
- Constants
- Variables
- func ConsumeMessages(ctx context.Context, topics []string, group sarama.ConsumerGroup, ...)
- func CreateConsumerGroup(kafkaVersion, groupID string, kafkaPeers []string) (sarama.ConsumerGroup, error)
- func CreateConsumerGroupEnv(kafkaVersion, groupID string, tls bool, kafkaPeers []string) (sarama.ConsumerGroup, error)
- func CreateSecureConsumerGroup(kafkaVersion, groupID, saslUser, saslPass string, tls bool, ...) (sarama.ConsumerGroup, error)
- func NewConfig(version string) (*sarama.Config, error)
- func NewConfigEnv(version string) (*sarama.Config, error)
- func NewPlainConfig(user, password, version string) (*sarama.Config, error)
- func NewSCRAMConfig(user, password, version string) (*sarama.Config, error)
- type Consumer
- type ConsumerController
- func NewConsumerController(kafkaVersion, groupID string, kafkaPeers, topics []string, ...) (*ConsumerController, error)
- func NewConsumerControllerEnv(kafkaVersion, groupID string, tls bool, kafkaPeers, topics []string, ...) (*ConsumerController, error)
- func NewSecureConsumerController(kafkaVersion, groupID, saslUser, saslPass string, tls bool, ...) (*ConsumerController, error)
- type ConsumerWorker
- type Data
- type Mechanism
- type Message
- func DecodeFromJSON(reader io.Reader) (*Message, error)
- func New() *Message
- func NewEvent(e storage.Event) Message
- func NewEventReceiver(e storage.EventReceiver) Message
- func NewEventReceiverGroupComplete(e storage.Event, erg storage.EventReceiverGroup) Message
- func NewEventReceiverGroupCreated(e storage.EventReceiverGroup) Message
- func NewEventReceiverGroupModified(e storage.EventReceiverGroup) Message
- type MsgInfo
- type Producer
- type SASLAuthentication
- type SCRAMClient
- type TopicProducer
Constants ¶
const APIv1 string = "v1"
APIv1 is the string that represents API version 1
const CloudEventsSpec = "1.0"
CloudEventsSpec is the string that represents The Cloud Events Spec used for API version 2
Variables ¶
var SHA512 scram.HashGeneratorFcn = sha512.New
SHA512 references the sha512 hash function.
Functions ¶
func ConsumeMessages ¶
func ConsumeMessages(ctx context.Context, topics []string, group sarama.ConsumerGroup, cons Consumer, wg *sync.WaitGroup)
ConsumeMessages given a ConsumerGroup and a Consumer, process each message in the consumer group using the consumer provided. You will need to close the consumer group when you are done processing.
func CreateConsumerGroup ¶
func CreateConsumerGroup(kafkaVersion, groupID string, kafkaPeers []string) (sarama.ConsumerGroup, error)
CreateConsumerGroup returns a new ConsumerGroup, ready to consume things. Calls through to CreateSecureConsumerGroup without any security pararmeters enabled.
func CreateConsumerGroupEnv ¶
func CreateConsumerGroupEnv(kafkaVersion, groupID string, tls bool, kafkaPeers []string) (sarama.ConsumerGroup, error)
CreateConsumerGroupEnv returns a new ConsumerGroup with Kafka security options enabled or disabled depending on configured envirionment variables. If SASL_USERNAME and SASL_PASSWORD are set, a consumer with SASL enabled will be returned, else a normal consumer group will be returned. If SASL is enabled SASL_MECHANISM must be set to SCRAM or PLAIN Supported SASL_MECHANISMS: SCRAM, PLAIN Future SASL_MECHANISMS: OAUTH2
func CreateSecureConsumerGroup ¶
func CreateSecureConsumerGroup(kafkaVersion, groupID, saslUser, saslPass string, tls bool, kafkaPeers []string) (sarama.ConsumerGroup, error)
CreateSecureConsumerGroup creates a ConsumerGroup with Kafka security options enabled. Providing saslUser and saslPassword allows the consumer to authenticate with a SASL enabled kafka cluster. If left empty, a normal consumer will be created. The previous two options imply TLS. TLS can be set separately in cases where TLS is required but auth is not.
func NewConfigEnv ¶
NewConfigEnv returns a new Sarama config with Kafka security options enabled or disabled depending on configured envirionment variables. If SASL_USERNAME and SASL_PASSWORD are set, a config with SASL and TLS enabled will be returned, else a config with the former disabled will be returned. If SASL is enabled SASL_MECHANISM must be set to SCRAM or PLAIN Supported SASL_MECHANISMS: SCRAM, PLAIN Future SASL_MECHANISMS: OAUTH2
func NewPlainConfig ¶
NewPlainConfig creates a new SASL PLAINTEXT enabled sarama config for communicating over TLS
Types ¶
type Consumer ¶
type Consumer struct { Ready chan bool Worker ConsumerWorker }
Consumer implements the sarama ConsumerGroupHandler interface.
func (*Consumer) Cleanup ¶
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). Each message received is handled by the Worker function on the consumer. Messages that process with an error are marked as read and will not be reprocessed.
type ConsumerController ¶
type ConsumerController struct {
// contains filtered or unexported fields
}
ConsumerController is an abstraction around consumer groups to make them easier to use. The methods used to build this object are exported in the event that this object is insufficient for the task.
func NewConsumerController ¶
func NewConsumerController(kafkaVersion, groupID string, kafkaPeers, topics []string, worker ConsumerWorker) (*ConsumerController, error)
NewConsumerController creates a new ConsumerController
func NewConsumerControllerEnv ¶
func NewConsumerControllerEnv(kafkaVersion, groupID string, tls bool, kafkaPeers, topics []string, worker ConsumerWorker) (*ConsumerController, error)
NewConsumerControllerEnv creates a new ConsumerController with security flags invoked based on SASL_USERNAME and SASL_PASSWORD envirionment variables. If SASL_USERNAME and SASL_PASSWORD are set, a ConsumerController with SASL enabled will be returned.
func NewSecureConsumerController ¶
func NewSecureConsumerController(kafkaVersion, groupID, saslUser, saslPass string, tls bool, kafkaPeers, topics []string, worker ConsumerWorker) (*ConsumerController, error)
NewSecureConsumerController creates a new ConsumerController with security flags invoked.
func (*ConsumerController) BeginConsuming ¶
func (c *ConsumerController) BeginConsuming(ctx context.Context, wg *sync.WaitGroup)
BeginConsuming starts consuming messages off of the kafka bus inside of a goroutine.
func (*ConsumerController) Close ¶
func (c *ConsumerController) Close() error
Close closes the ConsumerGroup
type ConsumerWorker ¶
type ConsumerWorker func(*sarama.ConsumerMessage) error
ConsumerWorker represents a function that operates on sarama.ConsumerMessage structs. In practice, this means that this function should be executed on each messages received from the bus. Refer to ConsumeClaim as an example.
type Data ¶
type Data struct { Events []storage.Event `json:"events"` EventReceivers []storage.EventReceiver `json:"event_receivers"` EventReceiverGroups []storage.EventReceiverGroup `json:"event_receiver_groups"` }
Data contains the data that created the event
type Mechanism ¶
type Mechanism int
Mechanism represents the SASL Authentication mechanism being used
type Message ¶
type Message struct { Success bool `json:"success"` // Extension to Cloud Events Spec ID string `json:"id"` // Cloud Events Spec 1.0.1 Specversion string `json:"specversion"` // Cloud Events Spec 1.0.1 Type string `json:"type"` // Cloud Events Spec 1.0.1 Source string `json:"source"` // Cloud Events Spec 1.0.1 APIVersion string `json:"api_version"` // Extension to Cloud Events Spec Name string `json:"name"` // Extension to Cloud Events Spec Version string `json:"version"` // Extension to Cloud Events Spec Release string `json:"release"` // Extension to Cloud Events Spec PlatformID string `json:"platform_id"` // Extension to Cloud Events Spec Package string `json:"package"` // Extension to Cloud Events Spec Data Data `json:"data"` // Cloud Events Spec 1.0.1 }
Message is the struct for kafka message events it contains information from the receipt that created the event Adheres to https://github.com/cloudevents/spec 1.0.1
func DecodeFromJSON ¶
DecodeFromJSON returns an Event from JSON
func NewEventReceiver ¶
func NewEventReceiver(e storage.EventReceiver) Message
NewEventReceiver returns a Message
func NewEventReceiverGroupComplete ¶
func NewEventReceiverGroupComplete(e storage.Event, erg storage.EventReceiverGroup) Message
NewEventReceiverGroupComplete returns a message
func NewEventReceiverGroupCreated ¶
func NewEventReceiverGroupCreated(e storage.EventReceiverGroup) Message
NewEventReceiverGroupCreated returns a Message
func NewEventReceiverGroupModified ¶
func NewEventReceiverGroupModified(e storage.EventReceiverGroup) Message
NewEventReceiverGroupModified returns a Message
type Producer ¶
type Producer interface { Async(string, interface{}) Send(string, interface{}) error ConsumeSuccesses() ConsumeErrors() Close() error }
Producer defines an interface for producing events
type SASLAuthentication ¶
func GetSASLAuthentication ¶
func GetSASLAuthentication() *SASLAuthentication
GetSASLCredentials returns SASLCredentials struct
func (*SASLAuthentication) SASLEnabled ¶
func (s *SASLAuthentication) SASLEnabled() bool
SASLEnabled returns true if SASL_USERNAME, SASL_PASSWORD, and a SASL_MECHANISM are set
type SCRAMClient ¶
type SCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
SCRAMClient client for doing SCRAM authentication through sarama. This implementation was taken from the sarama examples here: https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go
func (*SCRAMClient) Begin ¶
func (s *SCRAMClient) Begin(user, pass, authzID string) error
Begin begins SCRAM authentication
type TopicProducer ¶
func NewTopicProducer ¶
func NewTopicProducer(p Producer, topic string) TopicProducer
NewTopicProducer wraps the given producer into an interface for sending messages without knowledge of message-bus details, such as topic