eio

package
v0.0.0-...-f5c3557 Latest
Warning

This package is not in the latest version of its module.

Published: May 2, 2018 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package eio implements extensible, async io backends. Many of these provide batch operations, and you're completely free to add your own.

Producers

The following producers exist. Each one has a corresponding type, documented below, whose Args field lists the arguments it accepts.

Producer names are case-insensitive.

"Blackhole"  Blackhole
"File"       FileProducer
"HTTP"       HTTPProducer
"Stderr"     OutProducer
"Stdout"     OutProducer
"TestLog"    TestLogProducer


Constants

This section is empty.

Variables

var (
	// ClosedErrCh should be returned from Errs() when no errors can be
	// produced. This ensures that any receiver immediately returns and doesn't
	// block forever.
	ClosedErrCh chan error
)
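The reason a closed channel works here can be shown with a small, self-contained sketch. The variable closedErrCh below is a local stand-in for ClosedErrCh, and closing it at package initialization is an assumption about how the real variable is prepared.

```go
package main

import "fmt"

// closedErrCh is a local stand-in for eio.ClosedErrCh: an error channel
// that is already closed (assumed here to happen at package init).
var closedErrCh = func() chan error {
	ch := make(chan error)
	close(ch)
	return ch
}()

// drain counts the errors delivered on ch until the channel is closed.
func drain(ch <-chan error) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	// Ranging over a closed, empty channel ends at once instead of blocking.
	fmt.Println("errors seen:", drain(closedErrCh))

	// A plain receive also returns immediately, with ok == false.
	_, ok := <-closedErrCh
	fmt.Println("channel open:", ok)
}
```

A producer that can never fail can return such a channel from Errs() so that callers ranging or selecting on it are never stuck waiting.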

Functions

func RegisterConsumer

func RegisterConsumer(name string, nc MakeConsumer)

RegisterConsumer registers a Consumer for use. Names are case-insensitive.

func RegisterProducer

func RegisterProducer(name string, np MakeProducer)

RegisterProducer registers a Producer for use. Names are case-insensitive.

If your producer implements TopicProducer, it automatically becomes available through NewTopicProducer as well.
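As a rough illustration of case-insensitive registration, the sketch below keeps constructors in a map keyed by lower-cased name. The registry variable, the lower-case register and newProducer helpers, and countingProducer are all hypothetical. The real package's storage is unexported and may work differently.

```go
package main

import (
	"fmt"
	"strings"
)

// Producer is a trimmed-down stand-in for eio.Producer.
type Producer interface {
	Produce(b []byte)
}

// Args and MakeProducer mirror the documented signatures.
type Args map[string]interface{}
type MakeProducer func(args Args) (Producer, error)

// registry is hypothetical; it maps lower-cased names to constructors.
var registry = map[string]MakeProducer{}

// register stores np under a case-folded name.
func register(name string, np MakeProducer) {
	registry[strings.ToLower(name)] = np
}

// newProducer looks the name up with the same case folding.
func newProducer(name string, args Args) (Producer, error) {
	np, ok := registry[strings.ToLower(name)]
	if !ok {
		return nil, fmt.Errorf("producer %q is not registered", name)
	}
	return np(args)
}

// countingProducer is a hypothetical producer that only counts messages.
type countingProducer struct{ n int }

func (p *countingProducer) Produce(b []byte) { p.n++ }

func main() {
	register("Counting", func(Args) (Producer, error) {
		return &countingProducer{}, nil
	})

	// The lookup succeeds even though the case differs.
	p, err := newProducer("COUNTING", nil)
	fmt.Println(p != nil, err)
}
```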

Types

type Args

type Args map[string]interface{}

Args holds configuration whose concrete shape is not known until other options (such as the producer or consumer name) have been checked.

func (Args) ApplyTo

func (a Args) ApplyTo(i interface{}) (err error)

ApplyTo unmarshals the options into the given interface for simpler configuration.
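One way to picture this kind of map-to-struct unmarshalling is a JSON round trip. The sketch below is an assumption, not the package's confirmed implementation: applyTo and fileArgs are hypothetical names, and the real ApplyTo may use a different mechanism.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Args mirrors the documented type.
type Args map[string]interface{}

// applyTo copies the map into i by marshalling to JSON and back.
// This mechanism is an assumption made for the sketch.
func (a Args) applyTo(i interface{}) error {
	b, err := json.Marshal(a)
	if err != nil {
		return err
	}
	return json.Unmarshal(b, i)
}

// fileArgs is a hypothetical target shaped like FileProducer.Args.
type fileArgs struct {
	Path string
}

func main() {
	var fa fileArgs
	// encoding/json matches keys to field names case-insensitively,
	// so "path" fills the Path field.
	err := Args{"path": "/tmp/out.log"}.applyTo(&fa)
	fmt.Println(fa.Path, err)
}
```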

type Blackhole

type Blackhole struct{}

Blackhole drops every message it is given and returns nil slices from Next.

func (Blackhole) Close

func (Blackhole) Close() cog.Errors

Close implements Consumer/Producer.Close

func (Blackhole) Errs

func (Blackhole) Errs() <-chan error

Errs implements Producer.Errs

func (Blackhole) Next

func (Blackhole) Next() ([]byte, error)

Next implements Consumer.Next

func (Blackhole) Produce

func (Blackhole) Produce([]byte)

Produce implements Producer.Produce

func (Blackhole) Rotate

func (Blackhole) Rotate() error

Rotate implements Producer.Rotate

type Consumer

type Consumer interface {
	// Get messages from here.
	Next() ([]byte, error)

	// Tear the consumer down and wait for it to exit.
	//
	// If you use NewConsumer(), this is automatically called when the object
	// is GCd.
	Close() cog.Errors
}

A Consumer reads messages
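A typical read loop over this interface might look like the sketch below. The Consumer defined here is a local stand-in whose Close returns a plain error instead of cog.Errors, sliceConsumer and errDrained are hypothetical, and how a real consumer signals that it has nothing more to deliver is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Consumer is a stand-in for eio.Consumer. Close returns a plain error
// here (not cog.Errors) to keep the sketch free of dependencies.
type Consumer interface {
	Next() ([]byte, error)
	Close() error
}

// errDrained is a hypothetical sentinel for "no more messages".
var errDrained = errors.New("no more messages")

// sliceConsumer is a hypothetical consumer backed by a fixed slice.
type sliceConsumer struct{ msgs [][]byte }

func (c *sliceConsumer) Next() ([]byte, error) {
	if len(c.msgs) == 0 {
		return nil, errDrained
	}
	m := c.msgs[0]
	c.msgs = c.msgs[1:]
	return m, nil
}

func (c *sliceConsumer) Close() error { return nil }

// readAll calls Next until it fails, then closes the consumer and
// returns what was read along with the error that ended the loop.
func readAll(c Consumer) ([]string, error) {
	defer c.Close()
	var out []string
	for {
		b, err := c.Next()
		if err != nil {
			return out, err
		}
		out = append(out, string(b))
	}
}

func main() {
	c := &sliceConsumer{msgs: [][]byte{[]byte("a"), []byte("b")}}
	msgs, err := readAll(c)
	fmt.Println(msgs, err)
}
```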

func NewConsumer

func NewConsumer(name string, args Args) (Consumer, error)

NewConsumer creates a new consumer with the given arguments

type ErrorProducer

type ErrorProducer struct {
	// contains filtered or unexported fields
}

An ErrorProducer only ever returns errors. It's useful for testing error handling.

func (*ErrorProducer) Close

func (p *ErrorProducer) Close() (errs cog.Errors)

Close implements Producer.Close

func (*ErrorProducer) Errs

func (p *ErrorProducer) Errs() <-chan error

Errs implements Producer.Errs

func (*ErrorProducer) Produce

func (p *ErrorProducer) Produce([]byte)

Produce implements Producer.Produce

func (*ErrorProducer) Rotate

func (*ErrorProducer) Rotate() error

Rotate implements Producer.Rotate

type FileProducer

type FileProducer struct {
	Args struct {
		// Path of file to write to
		Path string
	}
	// contains filtered or unexported fields
}

FileProducer writes each message as a single line to the given file
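To illustrate line-per-message writing together with rotation, here is a minimal sketch that assumes POSIX rename semantics. lineWriter, open, and rotateDemo are hypothetical names, and the real FileProducer is asynchronous and reports failures through Errs() rather than returning them from Produce.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// lineWriter is a hypothetical, synchronous stand-in for FileProducer.
type lineWriter struct {
	path string
	f    *os.File
}

func open(path string) (*lineWriter, error) {
	w := &lineWriter{path: path}
	return w, w.Rotate()
}

// Rotate closes the current handle, if any, and reopens the path,
// creating the file again if it was moved away.
func (w *lineWriter) Rotate() error {
	if w.f != nil {
		if err := w.f.Close(); err != nil {
			return err
		}
	}
	f, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	w.f = f
	return nil
}

// Produce writes b followed by a newline.
func (w *lineWriter) Produce(b []byte) error {
	_, err := fmt.Fprintf(w.f, "%s\n", b)
	return err
}

// rotateDemo writes a line, moves the file aside as logrotate would,
// rotates, writes another line, and returns both files' contents.
func rotateDemo() (old, cur string, err error) {
	dir, err := os.MkdirTemp("", "eio-sketch")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "out.log")
	w, err := open(path)
	if err != nil {
		return "", "", err
	}
	defer func() { w.f.Close() }()

	if err = w.Produce([]byte("first")); err != nil {
		return "", "", err
	}
	if err = os.Rename(path, path+".1"); err != nil {
		return "", "", err
	}
	if err = w.Rotate(); err != nil {
		return "", "", err
	}
	if err = w.Produce([]byte("second")); err != nil {
		return "", "", err
	}

	a, err := os.ReadFile(path + ".1")
	if err != nil {
		return "", "", err
	}
	b, err := os.ReadFile(path)
	return string(a), string(b), err
}

func main() {
	old, cur, err := rotateDemo()
	fmt.Printf("%q %q %v\n", old, cur, err)
}
```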

func (*FileProducer) Close

func (p *FileProducer) Close() (es cog.Errors)

Close implements Producer.Close

func (*FileProducer) Errs

func (p *FileProducer) Errs() <-chan error

Errs implements Producer.Errs

func (*FileProducer) Produce

func (p *FileProducer) Produce(b []byte)

Produce implements Producer.Produce

func (*FileProducer) Rotate

func (p *FileProducer) Rotate() error

Rotate implements Producer.Rotate

type HTTPProducer

type HTTPProducer struct {
	Args struct {
		Servers    []string            // Backend servers to balance amongst (may include scheme)
		Endpoint   string              // Path to POST to
		Retries    uint                // How many times to retry failed requests
		BatchSize  uint                // How large a batch may grow before flushing
		BatchDelay ctime.HumanDuration // How long to wait before forcing a flush (0 = forever)

		InitialRetryDelay ctime.HumanDuration // Time to wait when first request fails
		MaxRetryBackoff   ctime.HumanDuration // Max duration to wait when retrying

		// Some servers don't know how to handle Chunked bodies.
		DisableChunked bool

		// MimeType to use instead of "application/octet-stream".
		MimeType string
	}
	// contains filtered or unexported fields
}

HTTPProducer implements batched POSTing. It allows for load balancing amongst a number of backend servers with automatic retries.

func (*HTTPProducer) Close

func (p *HTTPProducer) Close() (es cog.Errors)

Close implements Producer.Close

func (*HTTPProducer) Errs

func (p *HTTPProducer) Errs() <-chan error

Errs implements Producer.Errs

func (*HTTPProducer) Produce

func (p *HTTPProducer) Produce(b []byte)

Produce implements Producer.Produce

func (*HTTPProducer) Rotate

func (p *HTTPProducer) Rotate() error

Rotate implements Producer.Rotate

type LocalConsumer

type LocalConsumer struct {
	Args struct {
		// Name of the local topic to pull from
		Topic string
	}
	// contains filtered or unexported fields
}

LocalConsumer works on the other side of LocalProducer, consuming local messages

func (*LocalConsumer) Close

func (c *LocalConsumer) Close() (es cog.Errors)

Close implements Consumer.Close

func (*LocalConsumer) Next

func (c *LocalConsumer) Next() ([]byte, error)

Next implements Consumer.Next

type LocalProducer

type LocalProducer struct {
	Args struct {
		// Name of the local topic to push to
		Topic string
	}
	// contains filtered or unexported fields
}

LocalProducer is a channel-backed producer that works locally
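A channel-backed topic can be pictured as a map from topic name to channel, shared by the producing and consuming sides. The sketch below is hypothetical throughout: the topics map, the buffer size of 16, and the produceTo and next helpers are illustrative and do not reflect the package's unexported internals.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	mu     sync.Mutex
	topics = map[string]chan []byte{} // hypothetical topic table
)

// topic returns the channel for name, creating it on first use.
func topic(name string) chan []byte {
	mu.Lock()
	defer mu.Unlock()
	ch, ok := topics[name]
	if !ok {
		ch = make(chan []byte, 16) // buffer size chosen arbitrarily
		topics[name] = ch
	}
	return ch
}

// produceTo plays the role of LocalProducer.ProduceTo.
func produceTo(name string, b []byte) { topic(name) <- b }

// next plays the role of LocalConsumer.Next for a given topic.
func next(name string) []byte { return <-topic(name) }

func main() {
	produceTo("events", []byte("hello"))
	fmt.Println(string(next("events")))
}
```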

func (*LocalProducer) Close

func (p *LocalProducer) Close() cog.Errors

Close implements Producer.Close

func (*LocalProducer) Errs

func (p *LocalProducer) Errs() <-chan error

Errs implements Producer.Errs

func (*LocalProducer) Produce

func (p *LocalProducer) Produce(b []byte)

Produce implements Producer.Produce

func (*LocalProducer) ProduceTo

func (p *LocalProducer) ProduceTo(topic string, b []byte)

ProduceTo implements TopicProducer.ProduceTo

func (*LocalProducer) Rotate

func (p *LocalProducer) Rotate() error

Rotate implements Producer.Rotate

type MakeConsumer

type MakeConsumer func(args Args) (Consumer, error)

MakeConsumer creates a new Consumer

type MakeProducer

type MakeProducer func(args Args) (Producer, error)

MakeProducer creates a new Producer

type OutProducer

type OutProducer struct {
	// contains filtered or unexported fields
}

OutProducer writes to either stdout or stderr, depending on whether you create the producer with the name "stdout" or "stderr".

func (*OutProducer) Close

func (p *OutProducer) Close() (es cog.Errors)

Close implements Producer.Close

func (*OutProducer) Errs

func (p *OutProducer) Errs() <-chan error

Errs implements Producer.Errs

func (*OutProducer) Produce

func (p *OutProducer) Produce(b []byte)

Produce implements Producer.Produce

func (*OutProducer) Rotate

func (p *OutProducer) Rotate() error

Rotate implements Producer.Rotate

type Producer

type Producer interface {
	// Send everything here
	Produce(b []byte)

	// Select on this, otherwise a producer might block
	Errs() <-chan error

	// Some producers rotate (files). Call this to get them to reopen any
	// underlying files.
	//
	// The error is returned here so that logrotate (and friends) can quickly
	// determine if rotation succeeded.
	Rotate() error

	// Tear the producer down and flush any pending messages.
	//
	// If you use NewProducer(), this is automatically called when the object
	// is GCd.
	Close() cog.Errors
}

A Producer writes messages
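Because a producer may block when nobody reads from Errs(), callers usually drain that channel in the background. The sketch below shows one way to do so. The Producer here is a trimmed local stand-in, failingProducer and collectErrs are hypothetical, and closing the error channel by hand stands in for whatever Close() does in a real producer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Producer is a trimmed-down stand-in for eio.Producer.
type Producer interface {
	Produce(b []byte)
	Errs() <-chan error
}

// failingProducer is hypothetical: it reports one error per message on
// an unbuffered channel, so Produce blocks until someone receives.
type failingProducer struct{ errs chan error }

func (p *failingProducer) Produce(b []byte) {
	p.errs <- errors.New("failed to send: " + string(b))
}

func (p *failingProducer) Errs() <-chan error { return p.errs }

// collectErrs drains p.Errs() in the background until the channel is
// closed. The returned function waits for the drain and yields the errors.
func collectErrs(p Producer) (wait func() []error) {
	var (
		wg   sync.WaitGroup
		errs []error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range p.Errs() {
			errs = append(errs, err)
		}
	}()
	return func() []error {
		wg.Wait()
		return errs
	}
}

func main() {
	p := &failingProducer{errs: make(chan error)}
	wait := collectErrs(p)

	// Without the background drain, these calls would block forever.
	p.Produce([]byte("a"))
	p.Produce([]byte("b"))

	close(p.errs) // stands in for Close() in this sketch
	fmt.Println("errors collected:", len(wait()))
}
```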

func NewProducer

func NewProducer(name string, args Args) (Producer, error)

NewProducer creates a new producer with the given arguments

type TestLogProducer

type TestLogProducer struct {
	// contains filtered or unexported fields
}

TestLogProducer writes each message to the test log. This producer MUST receive a single argument, "log", which holds the test's logger (typically the t in `t *testing.T`).
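Pulling a required "log" argument out of Args could look roughly like the following. The logger interface, loggerFrom, and recorder are hypothetical. In a real test the value would typically be the *testing.T itself, and the real producer's validation is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// Args mirrors the documented type.
type Args map[string]interface{}

// logger is the subset of *testing.T this sketch relies on.
type logger interface {
	Log(args ...interface{})
}

// loggerFrom extracts the required "log" argument, failing when it is
// missing or does not provide a Log method.
func loggerFrom(args Args) (logger, error) {
	l, ok := args["log"].(logger)
	if !ok {
		return nil, errors.New(`"log" argument is missing or is not a logger`)
	}
	return l, nil
}

// recorder is a hypothetical logger that keeps lines in memory.
type recorder struct{ lines []string }

func (r *recorder) Log(args ...interface{}) {
	r.lines = append(r.lines, fmt.Sprint(args...))
}

func main() {
	rec := &recorder{}
	l, err := loggerFrom(Args{"log": rec})
	if err != nil {
		fmt.Println(err)
		return
	}
	l.Log("hello")
	fmt.Println(rec.lines)
}
```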

func (*TestLogProducer) Close

func (*TestLogProducer) Close() (errs cog.Errors)

Close implements Producer.Close

func (*TestLogProducer) Errs

func (*TestLogProducer) Errs() <-chan error

Errs implements Producer.Errs

func (*TestLogProducer) Produce

func (p *TestLogProducer) Produce(b []byte)

Produce implements Producer.Produce

func (*TestLogProducer) Rotate

func (*TestLogProducer) Rotate() error

Rotate implements Producer.Rotate

type TopicProducer

type TopicProducer interface {
	Producer

	// Send to the given topic
	ProduceTo(topic string, b []byte)
}

TopicProducer is just like Producer, except it allows you to send events to specific topics.
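Whether a given Producer also supports topics can be checked with a type assertion, which is presumably close to what NewTopicProducer does. The interfaces below are trimmed local stand-ins, and plain, topical, and asTopicProducer are hypothetical names.

```go
package main

import "fmt"

// Producer and TopicProducer are trimmed-down stand-ins for the
// interfaces documented in this package.
type Producer interface {
	Produce(b []byte)
}

type TopicProducer interface {
	Producer
	ProduceTo(topic string, b []byte)
}

// plain is a hypothetical producer without topic support.
type plain struct{}

func (plain) Produce([]byte) {}

// topical is a hypothetical producer that counts messages per topic.
type topical struct{ sent map[string]int }

func (tp topical) Produce(b []byte)                 { tp.ProduceTo("default", b) }
func (tp topical) ProduceTo(topic string, b []byte) { tp.sent[topic]++ }

// asTopicProducer returns p as a TopicProducer, or an error when p
// does not implement ProduceTo.
func asTopicProducer(p Producer) (TopicProducer, error) {
	tp, ok := p.(TopicProducer)
	if !ok {
		return nil, fmt.Errorf("%T does not implement TopicProducer", p)
	}
	return tp, nil
}

func main() {
	for _, p := range []Producer{plain{}, topical{sent: map[string]int{}}} {
		tp, err := asTopicProducer(p)
		if err != nil {
			fmt.Println("skip:", err)
			continue
		}
		tp.ProduceTo("events", []byte("hi"))
	}
}
```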

func NewTopicProducer

func NewTopicProducer(name string, args Args) (TopicProducer, error)

NewTopicProducer creates a new producer, provided that the producer implements TopicProducer.

Directories

Path Synopsis
Package kafka implements an eio producer and consumer for kafka.
