Documentation ¶
Overview ¶
Package eio implements extensible, asynchronous I/O backends. Many of these provide batch operations, and you can register your own.
Producers ¶
The following producers exist. Each name maps to a corresponding type; its arguments, if any, are documented below in the type's Args field.
Names are case-insensitive.
- "Blackhole": Blackhole
- "File": FileProducer
- "HTTP": HTTPProducer
- "Stderr": OutProducer
- "Stdout": OutProducer
- "TestLog": TestLogProducer
Index ¶
- Variables
- func RegisterConsumer(name string, nc MakeConsumer)
- func RegisterProducer(name string, np MakeProducer)
- type Args
- type Blackhole
- type Consumer
- type ErrorProducer
- type FileProducer
- type HTTPProducer
- type LocalConsumer
- type LocalProducer
- type MakeConsumer
- type MakeProducer
- type OutProducer
- type Producer
- type TestLogProducer
- type TopicProducer
Constants ¶
This section is empty.
Variables ¶
var (
	// ClosedErrCh should be returned from Errs() when no errors can be
	// produced. This ensures that any receiver immediately returns and doesn't
	// block forever.
	ClosedErrCh chan error
)
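The guarantee described in the comment follows from how Go treats closed channels: a receive on a closed channel never blocks. The sketch below assumes that ClosedErrCh is a channel that has already been closed, as its name suggests; `closedErrCh` and `drain` are local, hypothetical names and not part of the package.

```go
package main

import "fmt"

// closedErrCh is a local stand-in for eio.ClosedErrCh, assuming the real
// variable is an already-closed channel.
var closedErrCh = func() chan error {
	ch := make(chan error)
	close(ch)
	return ch
}()

// drain receives from errs until the channel is closed and reports how many
// errors it saw. On an already-closed channel the loop exits immediately.
func drain(errs <-chan error) int {
	n := 0
	for range errs {
		n++
	}
	return n
}

func main() {
	fmt.Println(drain(closedErrCh))
}
```

A producer that can never fail could return such a channel from Errs(), so that a caller ranging or selecting over it returns right away instead of waiting forever.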
Functions ¶
func RegisterConsumer ¶
func RegisterConsumer(name string, nc MakeConsumer)
RegisterConsumer registers a Consumer for use. Names are case-insensitive.
func RegisterProducer ¶
func RegisterProducer(name string, np MakeProducer)
RegisterProducer registers a Producer for use. Names are case-insensitive.
If your producer implements TopicProducer, it will automatically be made available.
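One way a case-insensitive registry like this could work is sketched below. It is not the package's implementation: the `makeProducer` signature is an assumption (the MakeProducer declaration is not shown on this page), and `producer`, `register`, `lookup`, and `discard` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Args mirrors the package's Args type.
type Args map[string]interface{}

// producer is a cut-down, hypothetical stand-in for eio.Producer.
type producer interface {
	Produce(b []byte)
}

// makeProducer is an assumed constructor signature.
type makeProducer func(args Args) (producer, error)

var registry = map[string]makeProducer{}

// register stores the constructor under a lowercased key so that later
// lookups ignore case.
func register(name string, mp makeProducer) {
	registry[strings.ToLower(name)] = mp
}

// lookup finds a constructor regardless of how the name is capitalized.
func lookup(name string) (makeProducer, bool) {
	mp, ok := registry[strings.ToLower(name)]
	return mp, ok
}

// discard is a producer that drops every message.
type discard struct{}

func (discard) Produce([]byte) {}

func main() {
	register("Blackhole", func(Args) (producer, error) { return discard{}, nil })
	_, ok := lookup("BLACKHOLE")
	fmt.Println(ok)
}
```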
Types ¶
type Args ¶
type Args map[string]interface{}
Args holds configuration values that cannot be mapped onto a concrete structure until other options have been checked.
type Blackhole ¶
type Blackhole struct{}
Blackhole drops everything and returns nil slices
type Consumer ¶
type Consumer interface {
	// Get messages from here.
	Next() ([]byte, error)

	// Tear the consumer down and wait for it to exit.
	//
	// If you use NewConsumer(), this is automatically called when the object
	// is GCd.
	Close() cog.Errors
}
A Consumer reads messages
type ErrorProducer ¶
type ErrorProducer struct {
// contains filtered or unexported fields
}
An ErrorProducer only ever returns errors. It's useful for testing error handling.
func (*ErrorProducer) Close ¶
func (p *ErrorProducer) Close() (errs cog.Errors)
Close implements Producer.Close
func (*ErrorProducer) Errs ¶
func (p *ErrorProducer) Errs() <-chan error
Errs implements Producer.Errs
func (*ErrorProducer) Produce ¶
func (p *ErrorProducer) Produce([]byte)
Produce implements Producer.Produce
func (*ErrorProducer) Rotate ¶
func (*ErrorProducer) Rotate() error
Rotate implements Producer.Rotate
type FileProducer ¶
type FileProducer struct {
	Args struct {
		// Path of file to write to
		Path string
	}
	// contains filtered or unexported fields
}
FileProducer writes each message as a single line to the given file
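The sketch below illustrates the two behaviours this type documents: one message per line, and reopening the file on rotation. It is a simplified, synchronous stand-in, not FileProducer itself; `lineFile`, `openAppend`, and `demo` are hypothetical names, and renaming an open file as done here assumes a POSIX-like filesystem.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// lineFile is a hypothetical stand-in for a file-backed producer.
type lineFile struct {
	path string
	f    *os.File
}

// openAppend opens path for appending, creating it if needed.
func openAppend(path string) (*os.File, error) {
	return os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
}

func newLineFile(path string) (*lineFile, error) {
	f, err := openAppend(path)
	if err != nil {
		return nil, err
	}
	return &lineFile{path: path, f: f}, nil
}

// produce writes b followed by a newline. The message is copied first so the
// caller's slice is never modified.
func (l *lineFile) produce(b []byte) error {
	line := append(append(make([]byte, 0, len(b)+1), b...), '\n')
	_, err := l.f.Write(line)
	return err
}

// rotate closes the current handle and reopens the configured path, which
// creates a fresh file if the old one was renamed away.
func (l *lineFile) rotate() error {
	if err := l.f.Close(); err != nil {
		return err
	}
	f, err := openAppend(l.path)
	if err != nil {
		return err
	}
	l.f = f
	return nil
}

// demo mimics logrotate: write, rename the file away, rotate, write again.
// It returns the contents of the rotated file and of the current file.
func demo() (string, string, error) {
	dir, err := os.MkdirTemp("", "eio-sketch")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "app.log")
	lf, err := newLineFile(path)
	if err != nil {
		return "", "", err
	}
	defer func() { lf.f.Close() }()

	if err := lf.produce([]byte("one")); err != nil {
		return "", "", err
	}
	if err := os.Rename(path, path+".1"); err != nil {
		return "", "", err
	}
	if err := lf.rotate(); err != nil {
		return "", "", err
	}
	if err := lf.produce([]byte("two")); err != nil {
		return "", "", err
	}

	rotated, err := os.ReadFile(path + ".1")
	if err != nil {
		return "", "", err
	}
	current, err := os.ReadFile(path)
	if err != nil {
		return "", "", err
	}
	return string(rotated), string(current), nil
}

func main() {
	rotated, current, err := demo()
	fmt.Printf("%q %q %v\n", rotated, current, err)
}
```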
func (*FileProducer) Close ¶
func (p *FileProducer) Close() (es cog.Errors)
Close implements Producer.Close
func (*FileProducer) Errs ¶
func (p *FileProducer) Errs() <-chan error
Errs implements Producer.Errs
func (*FileProducer) Produce ¶
func (p *FileProducer) Produce(b []byte)
Produce implements Producer.Produce
func (*FileProducer) Rotate ¶
func (p *FileProducer) Rotate() error
Rotate implements Producer.Rotate
type HTTPProducer ¶
type HTTPProducer struct {
	Args struct {
		Servers           []string            // Backend servers to balance amongst (may include scheme)
		Endpoint          string              // Path to POST to
		Retries           uint                // How many times to retry failed requests
		BatchSize         uint                // How large a batch may grow before flushing
		BatchDelay        ctime.HumanDuration // How long to wait before forcing a flush (0 = forever)
		InitialRetryDelay ctime.HumanDuration // Time to wait when first request fails
		MaxRetryBackoff   ctime.HumanDuration // Max duration to wait when retrying

		// Some servers don't know how to handle Chunked bodies.
		DisableChunked bool

		// MimeType to use instead of "application/octet-stream".
		MimeType string
	}
	// contains filtered or unexported fields
}
HTTPProducer implements batched POSTing. It allows for load balancing amongst a number of backend servers with automatic retries.
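The InitialRetryDelay and MaxRetryBackoff arguments suggest a capped backoff between retries. The page does not say how the delay grows, so the sketch below assumes simple doubling; `backoff` is a hypothetical helper, and plain time.Duration stands in for ctime.HumanDuration.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before retry number attempt (0 = first retry),
// doubling from initial and never exceeding max. Doubling is an assumption
// made for this sketch.
func backoff(initial, max time.Duration, attempt uint) time.Duration {
	d := initial
	if d > max {
		return max
	}
	for i := uint(0); i < attempt; i++ {
		d *= 2
		if d >= max {
			// Stop early so repeated doubling cannot overflow.
			return max
		}
	}
	return d
}

func main() {
	for attempt := uint(0); attempt < 6; attempt++ {
		fmt.Println(attempt, backoff(100*time.Millisecond, time.Second, attempt))
	}
}
```

With an initial delay of 100ms and a cap of 1s, the delays under this assumption are 100ms, 200ms, 400ms, 800ms, and then 1s for every later attempt.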
func (*HTTPProducer) Close ¶
func (p *HTTPProducer) Close() (es cog.Errors)
Close implements Producer.Close
func (*HTTPProducer) Errs ¶
func (p *HTTPProducer) Errs() <-chan error
Errs implements Producer.Errs
func (*HTTPProducer) Produce ¶
func (p *HTTPProducer) Produce(b []byte)
Produce implements Producer.Produce
func (*HTTPProducer) Rotate ¶
func (p *HTTPProducer) Rotate() error
Rotate implements Producer.Rotate
type LocalConsumer ¶
type LocalConsumer struct {
	Args struct {
		// Name of the local topic to pull from
		Topic string
	}
	// contains filtered or unexported fields
}
LocalConsumer works on the other side of LocalProducer, consuming local messages
func (*LocalConsumer) Close ¶
func (c *LocalConsumer) Close() (es cog.Errors)
Close implements Consumer.Close
func (*LocalConsumer) Next ¶
func (c *LocalConsumer) Next() ([]byte, error)
Next implements Consumer.Next
type LocalProducer ¶
type LocalProducer struct {
	Args struct {
		// Name of the local topic to push to
		Topic string
	}
	// contains filtered or unexported fields
}
LocalProducer is a channel-backed producer that works locally
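A channel-backed topic of the kind LocalProducer and LocalConsumer share could look roughly like the sketch below. This is an illustration only: the `topics` map, the buffer size, and the `topic`, `produceTo`, and `next` helpers are all assumptions, not the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu     sync.Mutex
	topics = map[string]chan []byte{}
)

// topic returns the channel for name, creating it on first use so producers
// and consumers can start in any order.
func topic(name string) chan []byte {
	mu.Lock()
	defer mu.Unlock()
	ch, ok := topics[name]
	if !ok {
		ch = make(chan []byte, 16) // buffer size chosen arbitrarily
		topics[name] = ch
	}
	return ch
}

// produceTo pushes a message onto the named topic.
func produceTo(name string, b []byte) {
	topic(name) <- b
}

// next blocks until a message is available on the named topic.
func next(name string) []byte {
	return <-topic(name)
}

func main() {
	produceTo("events", []byte("hello"))
	fmt.Println(string(next("events")))
}
```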
func (*LocalProducer) Close ¶
func (p *LocalProducer) Close() cog.Errors
Close implements Producer.Close
func (*LocalProducer) Errs ¶
func (p *LocalProducer) Errs() <-chan error
Errs implements Producer.Errs
func (*LocalProducer) Produce ¶
func (p *LocalProducer) Produce(b []byte)
Produce implements Producer.Produce
func (*LocalProducer) ProduceTo ¶
func (p *LocalProducer) ProduceTo(topic string, b []byte)
ProduceTo implements TopicProducer.ProduceTo
func (*LocalProducer) Rotate ¶
func (p *LocalProducer) Rotate() error
Rotate implements Producer.Rotate
type MakeConsumer ¶
MakeConsumer creates a new Consumer
type MakeProducer ¶
MakeProducer creates a new Producer
type OutProducer ¶
type OutProducer struct {
// contains filtered or unexported fields
}
OutProducer writes to either stdout or stderr, depending on whether the producer is created with the name "stdout" or "stderr".
func (*OutProducer) Close ¶
func (p *OutProducer) Close() (es cog.Errors)
Close implements Producer.Close
func (*OutProducer) Produce ¶
func (p *OutProducer) Produce(b []byte)
Produce implements Producer.Produce
type Producer ¶
type Producer interface {
	// Send everything here
	Produce(b []byte)

	// Select on this, otherwise a producer might block
	Errs() <-chan error

	// Some producers rotate (files). Call this to get them to reopen any
	// underlying files.
	//
	// The error is returned here so that logrotate (and friends) can quickly
	// determine if rotation succeeded.
	Rotate() error

	// Tear the producer down and flush any pending messages.
	//
	// If you use NewProducer(), this is automatically called when the object
	// is GCd.
	Close() cog.Errors
}
A Producer writes messages
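The comment on Errs() warns that a producer may block if nobody reads its errors. A minimal sketch of a caller that avoids this by draining Errs() in a separate goroutine follows. The `producer` interface is a cut-down stand-in (Rotate is omitted, and Close returns nothing because cog.Errors is not reproduced here), `failing` is a hypothetical implementation, and closing the error channel in Close is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// producer is a cut-down stand-in for eio.Producer.
type producer interface {
	Produce(b []byte)
	Errs() <-chan error
	Close()
}

// failing is a hypothetical producer that reports an error for every empty
// message. Its error channel is unbuffered, so Produce blocks until the
// error is received.
type failing struct{ errs chan error }

func (f *failing) Produce(b []byte) {
	if len(b) == 0 {
		f.errs <- errors.New("empty message")
	}
}

func (f *failing) Errs() <-chan error { return f.errs }

// Close closes the error channel, which ends any range loop over Errs().
func (f *failing) Close() { close(f.errs) }

// sendAll produces every message while a goroutine drains Errs(), so the
// producer can never stall on an unread error. It returns the number of
// errors observed.
func sendAll(p producer, msgs [][]byte) int {
	done := make(chan int)
	go func() {
		n := 0
		for range p.Errs() {
			n++
		}
		done <- n
	}()
	for _, m := range msgs {
		p.Produce(m)
	}
	p.Close()
	return <-done
}

func main() {
	p := &failing{errs: make(chan error)}
	fmt.Println(sendAll(p, [][]byte{[]byte("a"), nil, []byte("b"), {}}))
}
```

Without the draining goroutine, the first empty message in this sketch would block Produce forever.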
type TestLogProducer ¶
type TestLogProducer struct {
// contains filtered or unexported fields
}
TestLogProducer writes each message to the test log. This producer MUST receive a single argument, "log", which holds the test's log (typically the t in `t *testing.T`).
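The required "log" argument might be extracted from Args along the lines of the sketch below. The `logger` interface is an assumption about what the producer needs (a Log method, which *testing.T provides); `loggerFromArgs` and `recorder` are hypothetical names, and the real producer may validate its argument differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Args mirrors the package's Args type.
type Args map[string]interface{}

// logger is the subset of *testing.T this sketch assumes is needed.
type logger interface {
	Log(args ...interface{})
}

// loggerFromArgs pulls the "log" argument out of args and checks that it can
// be used as a logger.
func loggerFromArgs(args Args) (logger, error) {
	v, ok := args["log"]
	if !ok {
		return nil, errors.New(`missing required argument "log"`)
	}
	l, ok := v.(logger)
	if !ok {
		return nil, fmt.Errorf(`argument "log" has type %T, which has no Log method`, v)
	}
	return l, nil
}

// recorder is a fake logger that keeps what it is given, standing in for t.
type recorder struct{ lines []string }

func (r *recorder) Log(args ...interface{}) {
	r.lines = append(r.lines, fmt.Sprint(args...))
}

func main() {
	rec := &recorder{}
	l, err := loggerFromArgs(Args{"log": rec})
	if err != nil {
		panic(err)
	}
	l.Log("hello")
	fmt.Println(rec.lines)
}
```

In a real test, the argument map would carry the test handle itself, for example `Args{"log": t}`.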
func (*TestLogProducer) Close ¶
func (*TestLogProducer) Close() (errs cog.Errors)
Close implements Producer.Close
func (*TestLogProducer) Errs ¶
func (*TestLogProducer) Errs() <-chan error
Errs implements Producer.Errs
func (*TestLogProducer) Produce ¶
func (p *TestLogProducer) Produce(b []byte)
Produce implements Producer.Produce
func (*TestLogProducer) Rotate ¶
func (*TestLogProducer) Rotate() error
Rotate implements Producer.Rotate
type TopicProducer ¶
type TopicProducer interface {
	Producer

	// Send to the given topic
	ProduceTo(topic string, b []byte)
}
TopicProducer is just like Producer, except it allows you to send events to specific topics.
func NewTopicProducer ¶
func NewTopicProducer(name string, args Args) (TopicProducer, error)
NewTopicProducer creates a new producer, provided that the producer implements TopicProducer.