Documentation ¶
Overview ¶
Package pubsub contains two generic interfaces for publishing data to queues, and for subscribing to and consuming data from those queues.
```go
// Publisher is a generic interface to encapsulate how we want our publishers
// to behave. Until we find reason to change, we're forcing all publishers
// to emit protobufs.
type Publisher interface {
	// Publish will publish a message.
	Publish(string, proto.Message) error
	// PublishRaw will publish a []byte message.
	PublishRaw(string, []byte) error
}

// Subscriber is a generic interface to encapsulate how we want our subscribers
// to behave. For now the system will auto stop if it encounters any errors. If
// a user encounters a closed channel, they should check the Err() method to see
// what happened.
type Subscriber interface {
	// Start will return a channel of raw messages.
	Start() <-chan SubscriberMessage
	// Err will contain any errors returned from the consumer connection.
	Err() error
	// Stop will initiate a graceful shutdown of the subscriber connection.
	Stop() error
}
```
Where a `SubscriberMessage` is an interface that gives implementations a hook for acknowledging or deleting messages. Take a look at the docs for each implementation in `pubsub` to see how they behave.
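As a sketch of the intended usage pattern, a consumer can range over the channel returned by Start(), call Done() on each message after processing it, and check Err() once the channel closes. The import path and the `handle` callback below are assumptions for illustration:

```go
package example

import (
	"log"

	"github.com/NYTimes/gizmo/pubsub" // import path assumed
)

// consume drains any pubsub.Subscriber implementation. The handle
// function is a hypothetical stand-in for application logic.
func consume(sub pubsub.Subscriber, handle func([]byte) error) error {
	for msg := range sub.Start() {
		if err := handle(msg.Message()); err != nil {
			log.Printf("handler error: %v", err)
			continue
		}
		// Acknowledge/delete the message only after it has been processed.
		if err := msg.Done(); err != nil {
			return err
		}
	}
	// The channel closed; check why.
	return sub.Err()
}
```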
There are currently two implementations of each of the `pubsub` interfaces:
For pubsub via Amazon's SNS/SQS, you can use the `SNSPublisher` and the `SQSSubscriber`.
For pubsub via Kafka topics, you can use the `KafkaPublisher` and the `KafkaSubscriber`.
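Because both pairs satisfy the same interfaces, callers can program against `Publisher` and `Subscriber` and pick a backend at wiring time. A minimal sketch of the AWS pair, assuming the gizmo `config` package:

```go
package example

import (
	"github.com/NYTimes/gizmo/config" // import path assumed
	"github.com/NYTimes/gizmo/pubsub"
)

// newAWSPair is a hypothetical helper that hides the AWS-backed
// implementations behind the generic interfaces.
func newAWSPair(snsCfg *config.SNS, sqsCfg *config.SQS) (pubsub.Publisher, pubsub.Subscriber, error) {
	pub, err := pubsub.NewSNSPublisher(snsCfg)
	if err != nil {
		return nil, nil, err
	}
	sub, err := pubsub.NewSQSSubscriber(sqsCfg)
	if err != nil {
		return nil, nil, err
	}
	return pub, sub, nil
}
```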
Index ¶
Constants ¶
This section is empty.
Variables ¶
```go
var (
	// KafkaRequiredAcks will be used in Kafka configs
	// to set the 'RequiredAcks' value.
	KafkaRequiredAcks = sarama.WaitForAll
)
```
var Log = logrus.New()
Log is the structured logger used throughout the package.
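Since Log is exported, callers can tune or replace it. For example, to quiet the package's logging (a sketch using the logrus API; the logrus import path may differ by version):

```go
package example

import (
	"github.com/NYTimes/gizmo/pubsub" // import path assumed
	"github.com/sirupsen/logrus"
)

func init() {
	// Only surface warnings and errors from the pubsub package.
	pubsub.Log.Level = logrus.WarnLevel
}
```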
Functions ¶
This section is empty.
Types ¶
type KafkaPublisher ¶
type KafkaPublisher struct {
// contains filtered or unexported fields
}
KafkaPublisher is an experimental publisher that provides an implementation for Kafka using the Shopify/sarama library.
func NewKafkaPublisher ¶
func NewKafkaPublisher(cfg *config.Kafka) (*KafkaPublisher, error)
NewKafkaPublisher will initiate a new experimental Kafka publisher.
func (*KafkaPublisher) Publish ¶
func (p *KafkaPublisher) Publish(key string, m proto.Message) error
Publish will marshal the proto message and emit it to the Kafka topic.
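A sketch of publishing, assuming gizmo's `config.Kafka` (the BrokerHosts/Topic field names are assumptions) and using a well-known protobuf wrapper type as the payload:

```go
package main

import (
	"log"

	"github.com/NYTimes/gizmo/config" // import path assumed
	"github.com/NYTimes/gizmo/pubsub"
	"github.com/golang/protobuf/ptypes/wrappers"
)

func main() {
	// Field names on config.Kafka are assumptions for illustration.
	pub, err := pubsub.NewKafkaPublisher(&config.Kafka{
		BrokerHosts: []string{"localhost:9092"},
		Topic:       "test-topic",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer pub.Stop()

	// The key string is the first argument; the payload must be a
	// proto.Message, here a protobuf wrapper type for simplicity.
	if err := pub.Publish("some-key", &wrappers.StringValue{Value: "hello"}); err != nil {
		log.Fatal(err)
	}
}
```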
func (*KafkaPublisher) PublishRaw ¶
func (p *KafkaPublisher) PublishRaw(key string, m []byte) error
PublishRaw will emit the byte array to the Kafka topic.
func (*KafkaPublisher) Stop ¶
func (p *KafkaPublisher) Stop() error
Stop will close the pub connection.
type KafkaSubMessage ¶
type KafkaSubMessage struct {
// contains filtered or unexported fields
}
KafkaSubMessage is a SubscriberMessage implementation that will broadcast the message's offset when Done() is called.
func (*KafkaSubMessage) Done ¶
func (m *KafkaSubMessage) Done() error
Done will emit the message's offset.
func (*KafkaSubMessage) Message ¶
func (m *KafkaSubMessage) Message() []byte
Message will return the message payload.
type KafkaSubscriber ¶
type KafkaSubscriber struct {
// contains filtered or unexported fields
}
KafkaSubscriber is an experimental subscriber implementation for Kafka. It is only capable of consuming a single partition, so multiple subscribers may be required depending on your setup.
func NewKafkaSubscriber ¶
func NewKafkaSubscriber(cfg *config.Kafka, offsetProvider func() int64, offsetBroadcast func(int64)) (*KafkaSubscriber, error)
NewKafkaSubscriber will initiate the experimental Kafka consumer.
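The two callbacks let the caller own offset persistence: the provider is consulted on start-up to find where to begin, and the broadcast is invoked as messages are marked Done(). A minimal in-memory sketch (a real deployment would read and write the offset from durable storage):

```go
package example

import (
	"sync/atomic"

	"github.com/NYTimes/gizmo/config" // import path assumed
	"github.com/NYTimes/gizmo/pubsub"
)

func newSubscriber(cfg *config.Kafka) (*pubsub.KafkaSubscriber, error) {
	// Track the latest acknowledged offset in memory only; this is
	// illustrative and loses its position on restart.
	var offset int64
	return pubsub.NewKafkaSubscriber(cfg,
		func() int64 { return atomic.LoadInt64(&offset) },
		func(o int64) { atomic.StoreInt64(&offset, o) },
	)
}
```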
func (*KafkaSubscriber) Err ¶
func (s *KafkaSubscriber) Err() error
Err will contain any errors that occurred during consumption. This method should be checked after a user encounters a closed channel.
func (*KafkaSubscriber) Start ¶
func (s *KafkaSubscriber) Start() <-chan SubscriberMessage
Start will start consuming messages on the Kafka topic partition and emit them to the returned channel. On start-up, it will call the offset provider func passed to the subscriber to look up the offset to start at. If it encounters any issues, it will populate the Err() error and close the returned channel.
func (*KafkaSubscriber) Stop ¶
func (s *KafkaSubscriber) Stop() error
Stop will block until the consumer has stopped consuming messages and return any errors seen on consumer close.
type Publisher ¶
```go
type Publisher interface {
	// Publish will publish a message.
	Publish(string, proto.Message) error
	// PublishRaw will publish a raw byte array as a message.
	PublishRaw(string, []byte) error
}
```
Publisher is a generic interface to encapsulate how we want our publishers to behave. Until we find reason to change, we're forcing all publishers to emit protobufs.
type SNSPublisher ¶
type SNSPublisher struct {
// contains filtered or unexported fields
}
SNSPublisher will accept AWS credentials and an SNS topic name and it will emit any publish events to it.
func NewSNSPublisher ¶
func NewSNSPublisher(cfg *config.SNS) (*SNSPublisher, error)
NewSNSPublisher will initiate the SNS client. If no credentials are passed in with the config, the publisher is instantiated with the AWS_ACCESS_KEY and the AWS_SECRET_KEY environment variables.
func (*SNSPublisher) Publish ¶
func (p *SNSPublisher) Publish(key string, m proto.Message) error
Publish will marshal the proto message and emit it to the SNS topic. The key will be used as the SNS message subject.
func (*SNSPublisher) PublishRaw ¶
func (p *SNSPublisher) PublishRaw(key string, m []byte) error
PublishRaw will emit the byte array to the SNS topic. The key will be used as the SNS message subject.
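Putting the pieces together, a sketch of emitting to SNS, assuming gizmo's `config.SNS` (the Region/Topic field names and the topic ARN are illustrative assumptions):

```go
package main

import (
	"log"

	"github.com/NYTimes/gizmo/config" // import path assumed
	"github.com/NYTimes/gizmo/pubsub"
)

func main() {
	// With no credentials in the config, AWS_ACCESS_KEY and
	// AWS_SECRET_KEY are read from the environment.
	pub, err := pubsub.NewSNSPublisher(&config.SNS{
		Region: "us-east-1",
		Topic:  "arn:aws:sns:us-east-1:123456789012:test-topic",
	})
	if err != nil {
		log.Fatal(err)
	}

	// The key becomes the SNS message subject.
	if err := pub.PublishRaw("my-subject", []byte("hello")); err != nil {
		log.Fatal(err)
	}
}
```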
type SQSMessage ¶
type SQSMessage struct {
// contains filtered or unexported fields
}
SQSMessage is the SQS implementation of `SubscriberMessage`.
func (*SQSMessage) Done ¶
func (m *SQSMessage) Done() error
Done will queue up a message to be deleted. By default, the `SQSDeleteBufferSize` will be 0, so this will block until the message has been deleted.
func (*SQSMessage) Message ¶
func (m *SQSMessage) Message() []byte
Message will decode protobuf-encoded message bodies and simply return a byte slice containing the message body for all other types.
type SQSSubscriber ¶
type SQSSubscriber struct {
// contains filtered or unexported fields
}
SQSSubscriber is an SQS client that allows a user to consume messages via the pubsub.Subscriber interface.
func NewSQSSubscriber ¶
func NewSQSSubscriber(cfg *config.SQS) (*SQSSubscriber, error)
NewSQSSubscriber will initiate a new Decrypter for the subscriber if a key file is provided. It will also fetch the SQS Queue Url and set up the SQS client.
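A sketch of consuming from SQS, assuming gizmo's `config.SQS` (the Region/QueueName field names are assumptions). Note that for SQS, Done() deletes the message from the queue:

```go
package main

import (
	"log"

	"github.com/NYTimes/gizmo/config" // import path assumed
	"github.com/NYTimes/gizmo/pubsub"
)

func main() {
	sub, err := pubsub.NewSQSSubscriber(&config.SQS{
		Region:    "us-east-1",
		QueueName: "test-queue",
	})
	if err != nil {
		log.Fatal(err)
	}

	for msg := range sub.Start() {
		log.Printf("got: %s", msg.Message())
		// Done deletes the message; with the default SQSDeleteBufferSize
		// of 0 this blocks until the deletion completes.
		if err := msg.Done(); err != nil {
			log.Printf("delete error: %v", err)
		}
	}
	if err := sub.Err(); err != nil {
		log.Fatal(err)
	}
}
```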
func (*SQSSubscriber) Err ¶
func (s *SQSSubscriber) Err() error
Err will contain any errors that occurred during consumption. This method should be checked after a user encounters a closed channel.
func (*SQSSubscriber) Start ¶
func (s *SQSSubscriber) Start() <-chan SubscriberMessage
Start will start consuming messages on the SQS queue and emit any messages to the returned channel. If it encounters any issues, it will populate the Err() error and close the returned channel.
func (*SQSSubscriber) Stop ¶
func (s *SQSSubscriber) Stop() error
Stop will block until the consumer has stopped consuming messages.
type Subscriber ¶
```go
type Subscriber interface {
	// Start will return a channel of raw messages.
	Start() <-chan SubscriberMessage
	// Err will contain any errors returned from the consumer connection.
	Err() error
	// Stop will initiate a graceful shutdown of the subscriber connection.
	Stop() error
}
```
Subscriber is a generic interface to encapsulate how we want our subscribers to behave. For now the system will auto stop if it encounters any errors. If a user encounters a closed channel, they should check the Err() method to see what happened.
type SubscriberMessage ¶
SubscriberMessage is an interface to encapsulate subscriber messages and provide a mechanism for acknowledging messages _after_ they've been processed.
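The definition is elided from this page; judging from the KafkaSubMessage and SQSMessage implementations documented above, it presumably looks like:

```go
type SubscriberMessage interface {
	// Message returns the raw message payload.
	Message() []byte
	// Done acknowledges the message after processing; what acknowledgement
	// means is implementation-specific (broadcasting an offset for Kafka,
	// deleting the message for SQS).
	Done() error
}
```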