pubsub

package
v0.0.0-...-1747e0c Latest
Warning

This package is not in the latest version of its module.

Published: Dec 19, 2015 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package pubsub contains two generic interfaces: one for publishing data to queues and one for subscribing to those queues and consuming data from them.

// Publisher is a generic interface to encapsulate how we want our publishers
// to behave. Until we find reason to change, we're forcing all publishers
// to emit protobufs.
type Publisher interface {
    // Publish will publish a message.
    Publish(string, proto.Message) error
    // PublishRaw will publish a []byte message.
    PublishRaw(string, []byte) error
}

// Subscriber is a generic interface to encapsulate how we want our subscribers
// to behave. For now the system will auto stop if it encounters any errors. If
// a user encounters a closed channel, they should check the Err() method to see
// what happened.
type Subscriber interface {
    // Start will return a channel of raw messages
    Start() <-chan SubscriberMessage
    // Err will contain any errors returned from the consumer connection.
    Err() error
    // Stop will initiate a graceful shutdown of the subscriber connection
    Stop() error
}

A `SubscriberMessage` is an interface that gives implementations a hook for acknowledging or deleting messages. Take a look at the docs for each implementation in `pubsub` to see how they behave.

There are currently 2 implementations of each `pubsub` interface:

For pubsub via Amazon's SNS/SQS, you can use the `SNSPublisher` and the `SQSSubscriber`.

For pubsub via Kafka topics, you can use the `KafkaPublisher` and the `KafkaSubscriber`.

Constants

This section is empty.

Variables

var (
	// KafkaRequiredAcks will be used in Kafka configs
	// to set the 'RequiredAcks' value.
	KafkaRequiredAcks = sarama.WaitForAll
)
var Log = logrus.New()

Log is the structured logger used throughout the package.

Functions

func GetKafkaPartitions

func GetKafkaPartitions(brokerHosts []string, topic string) (partitions []int32, err error)

GetKafkaPartitions is a helper function to look up which partitions are available via the given brokers for the given topic. This should be called only on startup.

Types

type KafkaPublisher

type KafkaPublisher struct {
	// contains filtered or unexported fields
}

KafkaPublisher is an experimental publisher that provides an implementation for Kafka using the Shopify/sarama library.

func NewKafkaPublisher

func NewKafkaPublisher(cfg *config.Kafka) (*KafkaPublisher, error)

NewKafkaPublisher will initiate a new experimental Kafka publisher.

func (*KafkaPublisher) Publish

func (p *KafkaPublisher) Publish(key string, m proto.Message) error

Publish will marshal the proto message and emit it to the Kafka topic.

func (*KafkaPublisher) PublishRaw

func (p *KafkaPublisher) PublishRaw(key string, m []byte) error

PublishRaw will emit the byte array to the Kafka topic.

func (*KafkaPublisher) Stop

func (p *KafkaPublisher) Stop() error

Stop will close the pub connection.

type KafkaSubMessage

type KafkaSubMessage struct {
	// contains filtered or unexported fields
}

KafkaSubMessage is a SubscriberMessage implementation that will broadcast the message's offset when Done() is called.

func (*KafkaSubMessage) Done

func (m *KafkaSubMessage) Done() error

Done will emit the message's offset.

func (*KafkaSubMessage) Message

func (m *KafkaSubMessage) Message() []byte

Message will return the message payload.

type KafkaSubscriber

type KafkaSubscriber struct {
	// contains filtered or unexported fields
}

KafkaSubscriber is an experimental subscriber implementation for Kafka. It is only capable of consuming a single partition, so multiple subscribers may be required depending on your setup.
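When several single-partition subscribers are running, one way to process their output in one place is to fan the channels in. The sketch below is an assumption about how a caller might do this, not something the package provides: `Merge`, `rawMessage` and `partition` are hypothetical names, and `partition` only simulates the channel a per-partition subscriber's Start() would return.

```go
package main

import (
	"fmt"
	"sync"
)

// SubscriberMessage mirrors the interface documented in this package.
type SubscriberMessage interface {
	Message() []byte
	Done() error
}

// rawMessage is a hypothetical message type used only to drive the example.
type rawMessage []byte

func (m rawMessage) Message() []byte { return m }
func (m rawMessage) Done() error     { return nil }

// Merge forwards messages from every input channel to a single output channel
// and closes the output once all inputs have closed.
func Merge(inputs ...<-chan SubscriberMessage) <-chan SubscriberMessage {
	out := make(chan SubscriberMessage)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan SubscriberMessage) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// partition simulates the channel a per-partition subscriber's Start() returns.
func partition(bodies ...string) <-chan SubscriberMessage {
	ch := make(chan SubscriberMessage, len(bodies))
	for _, b := range bodies {
		ch <- rawMessage(b)
	}
	close(ch)
	return ch
}

func main() {
	merged := Merge(partition("p0-a", "p0-b"), partition("p1-a"))
	count := 0
	for msg := range merged {
		count++
		_ = msg.Message()
	}
	fmt.Println("messages received:", count)
}
```

Ordering is preserved only within each input channel, not across them, and each subscriber's Err() still has to be checked individually after the merged channel closes.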

func NewKafkaSubscriber

func NewKafkaSubscriber(cfg *config.Kafka, offsetProvider func() int64, offsetBroadcast func(int64)) (*KafkaSubscriber, error)

NewKafkaSubscriber will initiate the experimental Kafka consumer.
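The two function arguments have the shapes `func() int64` and `func(int64)`. A minimal sketch of a value that could supply both is shown below. `OffsetStore` and its methods are hypothetical names, the store is in-memory only (so it forgets its offset on restart), and whether the subscriber expects the stored value to be the last processed offset or the next one to read is not stated in these docs.

```go
package main

import (
	"fmt"
	"sync"
)

// OffsetStore is a hypothetical in-memory holder for a single partition's offset.
type OffsetStore struct {
	mu     sync.Mutex
	offset int64
}

// Provide has the shape of the offsetProvider argument: func() int64.
func (s *OffsetStore) Provide() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.offset
}

// Broadcast has the shape of the offsetBroadcast argument: func(int64).
func (s *OffsetStore) Broadcast(offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offset = offset
}

func main() {
	store := &OffsetStore{}

	// Method values can be passed wherever plain funcs of these types are expected.
	var provider func() int64 = store.Provide
	var broadcast func(int64) = store.Broadcast

	broadcast(42)
	fmt.Println(provider())
}
```

The mutex is there because the broadcast func may be invoked from a different goroutine than the one that reads the offset; a durable implementation would write to a file or database instead of a struct field.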

func (*KafkaSubscriber) Err

func (s *KafkaSubscriber) Err() error

Err will contain any errors that occurred during consumption. This method should be checked after a user encounters a closed channel.

func (*KafkaSubscriber) Start

func (s *KafkaSubscriber) Start() <-chan SubscriberMessage

Start will start consuming messages on the Kafka topic partition and emit any messages to the returned channel. On startup, it will call the offset provider func given to the subscriber to look up the offset to start at. If it encounters any issues, it will populate the Err() error and close the returned channel.

func (*KafkaSubscriber) Stop

func (s *KafkaSubscriber) Stop() error

Stop will block until the consumer has stopped consuming messages and return any errors seen on consumer close.

type Publisher

type Publisher interface {
	// Publish will publish a message.
	Publish(string, proto.Message) error
	// PublishRaw will publish a raw byte array as a message.
	PublishRaw(string, []byte) error
}

Publisher is a generic interface to encapsulate how we want our publishers to behave. Until we find reason to change, we're forcing all publishers to emit protobufs.
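Code that only ever sends raw bytes can depend on a narrower interface than Publisher, which also makes it easy to substitute an in-memory double in tests. The sketch below is an illustration under assumptions: `RawPublisher`, `MemoryPublisher`, `record` and `Announce` are hypothetical names that do not exist in this package, and the proto-based Publish method is left out so the example needs no third-party imports. Any type with a `PublishRaw(string, []byte) error` method satisfies `RawPublisher`.

```go
package main

import "fmt"

// RawPublisher is a hypothetical narrower interface covering only PublishRaw.
type RawPublisher interface {
	PublishRaw(key string, payload []byte) error
}

// record is one captured publish call.
type record struct {
	Key     string
	Payload []byte
}

// MemoryPublisher is a hypothetical test double that records what it is given.
type MemoryPublisher struct {
	Records []record
}

func (p *MemoryPublisher) PublishRaw(key string, payload []byte) error {
	// Copy the payload so later changes by the caller do not alter the record.
	p.Records = append(p.Records, record{Key: key, Payload: append([]byte(nil), payload...)})
	return nil
}

// Announce is example application code that depends only on RawPublisher.
func Announce(pub RawPublisher, user string) error {
	return pub.PublishRaw("user.created", []byte(user))
}

func main() {
	pub := &MemoryPublisher{}
	if err := Announce(pub, "alice"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("%s %s\n", pub.Records[0].Key, pub.Records[0].Payload)
}
```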

type SNSPublisher

type SNSPublisher struct {
	// contains filtered or unexported fields
}

SNSPublisher will accept AWS credentials and an SNS topic name, and it will emit any published messages to that topic.

func NewSNSPublisher

func NewSNSPublisher(cfg *config.SNS) (*SNSPublisher, error)

NewSNSPublisher will initiate the SNS client.

func (*SNSPublisher) Publish

func (p *SNSPublisher) Publish(key string, m proto.Message) error

Publish will marshal the proto message and emit it to the SNS topic. The key will be used as the SNS message subject.

func (*SNSPublisher) PublishRaw

func (p *SNSPublisher) PublishRaw(key string, m []byte) error

PublishRaw will emit the byte array to the SNS topic. The key will be used as the SNS message subject.

type SQSMessage

type SQSMessage struct {
	// contains filtered or unexported fields
}

SQSMessage is the SQS implementation of `SubscriberMessage`.

func (*SQSMessage) Done

func (m *SQSMessage) Done() error

Done will queue up a message to be deleted. By default, the `SQSDeleteBufferSize` will be 0, so this will block until the message has been deleted.

func (*SQSMessage) Message

func (m *SQSMessage) Message() []byte

Message will decode protobufed message bodies and simply return a byte slice containing the message body for all other types.

type SQSSubscriber

type SQSSubscriber struct {
	// contains filtered or unexported fields
}

SQSSubscriber is an SQS client that allows a user to consume messages via the pubsub.Subscriber interface.

func NewSQSSubscriber

func NewSQSSubscriber(cfg *config.SQS) (*SQSSubscriber, error)

NewSQSSubscriber will initiate a new Decrypter for the subscriber if a key file is provided. It will also fetch the SQS queue URL and set up the SQS client.

func (*SQSSubscriber) Err

func (s *SQSSubscriber) Err() error

Err will contain any errors that occurred during consumption. This method should be checked after a user encounters a closed channel.

func (*SQSSubscriber) Start

func (s *SQSSubscriber) Start() <-chan SubscriberMessage

Start will start consuming messages on the SQS queue and emit any messages to the returned channel. If it encounters any issues, it will populate the Err() error and close the returned channel.

func (*SQSSubscriber) Stop

func (s *SQSSubscriber) Stop() error

Stop will block until the consumer has stopped consuming messages.

type Subscriber

type Subscriber interface {
	// Start will return a channel of raw messages.
	Start() <-chan SubscriberMessage
	// Err will contain any errors returned from the consumer connection.
	Err() error
	// Stop will initiate a graceful shutdown of the subscriber connection.
	Stop() error
}

Subscriber is a generic interface to encapsulate how we want our subscribers to behave. For now the system will auto stop if it encounters any errors. If a user encounters a closed channel, they should check the Err() method to see what happened.

type SubscriberMessage

type SubscriberMessage interface {
	Message() []byte
	Done() error
}

SubscriberMessage is an interface that encapsulates subscriber messages and provides a mechanism for acknowledging messages _after_ they've been processed.
