Documentation ¶
Index ¶
- Variables
- type Empty
- type Firehose
- func (p *Firehose) Close() error
- func (p *Firehose) CloseSync() error
- func (p *Firehose) Errors() <-chan error
- func (p *Firehose) Init() (Producer, error)
- func (p *Firehose) InitC(stream, _, _, accessKey, secretKey, region string, concurrency int) (Producer, error)
- func (p *Firehose) InitCWithEndpoint(stream, _, _, accessKey, secretKey, region string, concurrency int, ...) (Producer, error)
- func (p *Firehose) IsProducing() (isProducing bool)
- func (p *Firehose) Messages() <-chan *Message
- func (p *Firehose) NewEndpoint(endpoint, stream string) (err error)
- func (p *Firehose) ReInit()
- func (p *Firehose) Send(msg *Message)
- func (p *Firehose) SendSync(msg *Message)
- func (p *Firehose) TryToSend(msg *Message) error
- type KinesisProducer
- func (p *KinesisProducer) Close() error
- func (p *KinesisProducer) CloseSync() error
- func (p *KinesisProducer) Errors() <-chan error
- func (p *KinesisProducer) Init() (Producer, error)
- func (p *KinesisProducer) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, ...) (Producer, error)
- func (p *KinesisProducer) InitCWithEndpoint(stream, shard, shardIterType, accessKey, secretKey, region string, ...) (Producer, error)
- func (p *KinesisProducer) IsProducing() bool
- func (p *KinesisProducer) Messages() <-chan *Message
- func (p *KinesisProducer) NewEndpoint(endpoint, stream string) (err error)
- func (p *KinesisProducer) ReInit()
- func (p *KinesisProducer) Send(msg *Message)
- func (p *KinesisProducer) SendSync(msg *Message)
- func (p *KinesisProducer) TryToSend(msg *Message) error
- type Listener
- func (l *Listener) Close() error
- func (l *Listener) CloseSync() error
- func (l *Listener) Errors() <-chan error
- func (l *Listener) GetClient() *kinesisiface.KinesisAPI
- func (l *Listener) Init() (*Listener, error)
- func (l *Listener) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, ...) (*Listener, error)
- func (l *Listener) InitCWithEndpoint(stream, shard, shardIterType, accessKey, secretKey, region string, ...) (*Listener, error)
- func (l *Listener) InitWithStartTime(stream, shard, shardIterType, accessKey, secretKey, region string, ...) (*Listener, error)
- func (l *Listener) IsConsuming() bool
- func (l *Listener) IsListening() bool
- func (l *Listener) Listen(fn msgFn)
- func (l *Listener) NewEndpoint(endpoint, stream string) (err error)
- func (l *Listener) ReInit()
- func (l *Listener) Retrieve() (*Message, error)
- func (l *Listener) RetrieveFn(fn msgFn)
- type Message
- type Producer
Constants ¶
This section is empty.
Variables ¶
var ( // HTTPTimeout is the timeout for HTTP requests. HTTPTimeout = 60 * time.Second // DialTimeout is the timeout for the TCP dial. DialTimeout = 5 * time.Second // HandShakeTimeout is the timeout for the TCP handshake. HandShakeTimeout = 5 * time.Second )
var ( // ErrNullStream represents an error where the stream was not specified ErrNullStream = errors.New("A stream must be specified") // ErrNotActive represents an error where the stream is not ready for processing ErrNotActive = errors.New("The Stream is not yet active") )
var ( // ErrThroughputExceeded represents an error when the Kinesis throughput has been exceeded ErrThroughputExceeded = errors.New("Configured AWS Kinesis throughput has been exceeded") // ErrKinesisFailure represents a generic internal AWS Kinesis error ErrKinesisFailure = errors.New("AWS Kinesis internal failure") // ErrBadConcurrency represents an error when the provided concurrency value is invalid ErrBadConcurrency = errors.New("Concurrency must be greater than zero") // ErrDroppedMessage represents an error where the channel is full and messages are being dropped ErrDroppedMessage = errors.New("Channel is full, dropped message") )
var ErrMetaAuthentication = errors.New("Authentication error: failed to auth from meta. Your IAM roles are bad, or you need to specify an AccessKey and SecretKey")
ErrMetaAuthentication represents an error that occurred on authentication from meta
var ErrNilShardIterator = errors.New("Nil shard iterator")
ErrNilShardIterator is an error for when we get back a nil shard iterator
var ErrNilShardStatus = errors.New("Nil stream status")
ErrNilShardStatus is an error for when we get back a nil stream status
var ( // ShardIterTypes are the types of iterators to use within Kinesis ShardIterTypes shardIteratorTypes = map[int]string{ // contains filtered or unexported fields } )
Functions ¶
This section is empty.
Types ¶
type Empty ¶
type Empty struct{}
Empty is an empty struct. It is mostly used for counting semaphore purposes
type Firehose ¶
type Firehose struct {
// contains filtered or unexported fields
}
Firehose is a Producer
func (*Firehose) InitC ¶
func (p *Firehose) InitC(stream, _, _, accessKey, secretKey, region string, concurrency int) (Producer, error)
InitC initializes a producer for Kinesis Firehose with the specified params
func (*Firehose) InitCWithEndpoint ¶
func (p *Firehose) InitCWithEndpoint(stream, _, _, accessKey, secretKey, region string, concurrency int, endpoint string) (Producer, error)
InitCWithEndpoint initializes a producer for Kinesis Firehose with the specified params
func (*Firehose) IsProducing ¶
IsProducing returns true if Firehose is producing otherwise false
func (*Firehose) NewEndpoint ¶
NewEndpoint switches the endpoint of the firehose stream. This is useful for testing.
func (*Firehose) ReInit ¶
func (p *Firehose) ReInit()
ReInit re-initializes the shard iterator. Used in conjunction with NewEndpoint.
type KinesisProducer ¶
type KinesisProducer struct {
// contains filtered or unexported fields
}
KinesisProducer keeps a queue of messages on a channel and continually attempts to POST the records using the PutRecords method. If the messages were not sent successfully they are placed back on the queue to retry
func (*KinesisProducer) Close ¶
func (p *KinesisProducer) Close() error
Close stops queuing and producing and waits for all tasks to finish
func (*KinesisProducer) CloseSync ¶
func (p *KinesisProducer) CloseSync() error
CloseSync closes the Producer in a synchronous manner.
func (*KinesisProducer) Errors ¶
func (p *KinesisProducer) Errors() <-chan error
Errors gets the current error channel from the Producer
func (*KinesisProducer) Init ¶
func (p *KinesisProducer) Init() (Producer, error)
Init initializes a producer with the params specified in the configuration file
func (*KinesisProducer) InitC ¶
func (p *KinesisProducer) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (Producer, error)
InitC initializes a producer with the specified configuration: stream, shard, shard-iter-type, access-key, secret-key, region, and concurrency
func (*KinesisProducer) InitCWithEndpoint ¶
func (p *KinesisProducer) InitCWithEndpoint(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int, endpoint string) (Producer, error)
InitCWithEndpoint initializes a producer with the specified configuration: stream, shard, shard-iter-type, access-key, secret-key, region, concurrency, and endpoint
func (*KinesisProducer) IsProducing ¶
func (p *KinesisProducer) IsProducing() bool
IsProducing identifies whether or not the messages are queued for POSTing to the stream
func (*KinesisProducer) Messages ¶
func (p *KinesisProducer) Messages() <-chan *Message
Messages gets the current message channel from the producer
func (*KinesisProducer) NewEndpoint ¶
func (p *KinesisProducer) NewEndpoint(endpoint, stream string) (err error)
NewEndpoint re-initializes kinesis client with new endpoint. Used for testing with kinesalite
func (*KinesisProducer) ReInit ¶
func (p *KinesisProducer) ReInit()
ReInit re-initializes the shard iterator. Used in conjunction with NewEndpoint.
func (*KinesisProducer) Send ¶
func (p *KinesisProducer) Send(msg *Message)
Send a message to the queue for POSTing
func (*KinesisProducer) SendSync ¶
func (p *KinesisProducer) SendSync(msg *Message)
SendSync - blocking send. TODO: we might want a version of this that can be fed a timeout value and listen for cancellation channels. This version is for simplicity.
func (*KinesisProducer) TryToSend ¶
func (p *KinesisProducer) TryToSend(msg *Message) error
TryToSend tries to send the message, but if the channel is full it drops the message, and returns an error.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener represents a kinesis listener
func (*Listener) GetClient ¶
func (l *Listener) GetClient() *kinesisiface.KinesisAPI
GetClient returns the client
func (*Listener) Init ¶
Init initializes a listener with the params specified in the configuration file
func (*Listener) InitC ¶
func (l *Listener) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (*Listener, error)
InitC initializes a listener with the supplied params
func (*Listener) InitCWithEndpoint ¶
func (l *Listener) InitCWithEndpoint(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int, endpoint string) (*Listener, error)
InitCWithEndpoint initializes a listener with the supplied params
func (*Listener) InitWithStartTime ¶
func (l *Listener) InitWithStartTime(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int, startTime *time.Time) (*Listener, error)
InitWithStartTime initializes a listener with start time
func (*Listener) IsConsuming ¶
IsConsuming identifies whether or not the kinesis stream is being polled
func (*Listener) IsListening ¶
IsListening identifies whether or not messages and errors are being handled after consumption
func (*Listener) Listen ¶
func (l *Listener) Listen(fn msgFn)
Listen handles the consumed messages, errors and interrupts
func (*Listener) NewEndpoint ¶
NewEndpoint re-initializes kinesis client with new endpoint. Used for testing with kinesalite
func (*Listener) ReInit ¶
func (l *Listener) ReInit()
ReInit re-initializes the shard iterator. Used in conjunction with NewEndpoint.
func (*Listener) RetrieveFn ¶
func (l *Listener) RetrieveFn(fn msgFn)
RetrieveFn retrieves a message from the queue and applies the supplied function to the message
type Message ¶
type Message struct {
awsKinesis.Record
}
Message represents an item on the Kinesis stream
type Producer ¶
type Producer interface { Init() (Producer, error) InitC(stream, shard, shardIterType, accessKey, secretKey, region string, concurrency int) (Producer, error) NewEndpoint(endpoint, stream string) (err error) ReInit() IsProducing() bool Send(msg *Message) SendSync(msg *Message) TryToSend(msg *Message) error Close() error CloseSync() error }
Producer is an interface for sending messages to a stream of some sort.