logd

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2020 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package logd contains logd client functionality.

Client implements the logd socket protocol. For application use, Writer and Scanner provide APIs for writing to and reading from the log server.

Writer implements io.Writer, batching input and flushing to the server, either when the configured batch size is reached, or at a configured time interval.

Scanner has a similar API to bufio.Scanner. It can read either from the beginning of the log, or from an offset, keeping track of the current batch offset and message delta as well as calling for more batches as necessary.

Index

Constants

This section is empty.

Variables

View Source
var DefaultBenchConfig = &BenchConfig{
	Verbose: false,
}

DefaultBenchConfig is the default benchmarking configuration

View Source
var DefaultConfig = &Config{
	Verbose:              false,
	Host:                 "127.0.0.1:1774",
	Timeout:              10 * time.Second,
	ConnectTimeout:       -1,
	WriteTimeout:         -1,
	ReadTimeout:          -1,
	Count:                false,
	OutputPath:           "-",
	WaitInterval:         400 * time.Millisecond,
	ConnRetries:          50,
	ConnRetryInterval:    1 * time.Second,
	ConnRetryMaxInterval: 30 * time.Second,
	ConnRetryMultiplier:  2.0,

	BatchSize: 1024 * 64,
	InputPath: "-",

	Limit: 15,
}

DefaultConfig is the default client configuration

View Source
var ErrEmptyBatch = errors.New("attempted to send an empty batch")

ErrEmptyBatch is returned when an empty batch write is attempted.

View Source
var ErrNoState = errors.New("state uninitialized")

ErrNoState should be returned by StatePullers when the state hasn't stored any offset information yet.

View Source
var ErrProcessing = errors.New("message already being processed")
View Source
var ErrStopped = stderrors.New("stopped")

ErrStopped indicates the scanner was stopped

Functions

func IsRetryable

func IsRetryable(err error) bool

IsRetryable returns true if an error can be recovered from.

Types

type Backlog

type Backlog struct {
	Batch *protocol.Batch
	Err   error
}

type Backlogger

type Backlogger interface {
	// Backlog() should return a channel with the desired backfill size. The client
	// should attempt to write failed batches to the channel, but if the channel is
	// full, it will log the error and continue.
	Backlog() chan *Backlog
}

Backlogger saves failed batch writes so they can be attempted later.

type BenchConfig

type BenchConfig struct {
	Verbose bool
}

BenchConfig manages benchmark configuration

type Client

type Client struct {
	net.Conn
	// contains filtered or unexported fields
}

Client represents a connection to the database.

func Dial

func Dial(addr string) (*Client, error)

Dial returns a new instance of *Client with a connected Conn.

func DialConfig

func DialConfig(addr string, conf *Config) (*Client, error)

DialConfig returns a new, configured instance of *Client with a connected Conn.

func New

func New(conf *Config) *Client

New returns a new instance of Client without a net.Conn

func (*Client) Batch

func (c *Client) Batch(batch *protocol.Batch) (uint64, error)

Batch sends a BATCH request and returns the response. Batch does not retry. If you want reconnect functionality, use a Writer.

func (*Client) BatchRaw

func (c *Client) BatchRaw(b []byte) (uint64, error)

BatchRaw sends a BATCH request with a raw batch

func (*Client) Close

func (c *Client) Close() error

Close sends a CLOSE request and then closes the connection

func (*Client) Config

func (c *Client) Config() (*config.Config, error)

Config sends a CONFIG request, returning parts the server's configuration relevant to the client.

func (*Client) ReadOffset

func (c *Client) ReadOffset(topic []byte, offset uint64, limit int) (int, *protocol.BatchScanner, error)

ReadOffset sends a READ request, returning a scanner that can be used to iterate over the messages in the response.

func (*Client) SetConn

func (c *Client) SetConn(conn net.Conn) *Client

SetConn sets net.Conn for a client.

func (*Client) Stop

func (c *Client) Stop()

Stop causes any pending blocking operation to return ErrStopped

func (*Client) Tail

func (c *Client) Tail(topic []byte, limit int) (uint64, int, *protocol.BatchScanner, error)

Tail sends a TAIL request, returning the initial offset and a scanner starting from the first available batch.

type Config

type Config struct {

	// Verbose prints debugging information.
	Verbose bool `json:"verbose"`

	// Host defines the host:port to connect to.
	Host string `json:"host"`

	// Timeout defines the overall socket timeout. For more granular control,
	// set ConnectTimeout, WriteTimeout, and ReadTimeout.
	Timeout time.Duration `json:"timeout"`

	// ConnectTimeout defines the time limit for connecting to the server.
	ConnectTimeout time.Duration `json:"connect-timeout"`

	// WriteTimeout defines the time limit for writing to the server socket.
	WriteTimeout time.Duration `json:"write-timeout"`

	// ReadTimeout defines the time limit for reading from the server socket.
	ReadTimeout time.Duration `json:"read-timeout"`

	// Count prints some counts of messages written before exiting log-cli.
	Count bool `json:"count"`

	// OutputPath is the file log-cli will write output data to. Defaults to
	// standard out.
	OutputPath string `json:"output"`

	// WaitInterval, for Writer, determines the length of time, without any
	// messages written, before the batch is flushed to the server. For
	// Scanner, determines how long to wait before requesting new batches.
	// TODO these should be split out when writer and scanner have seperate
	// configs.
	WaitInterval time.Duration `json:"wait-interval"`

	// ConnRetries defines how many attempts to connect should happen. A
	// negative number will retry forever.
	ConnRetries int `json:"connection-retries"`

	// ConnRetryMaxInterval defines the initial amount of time to wait before
	// attempting to reconnect.
	ConnRetryInterval time.Duration `json:"connection-retry-interval"`

	// ConnRetryMaxInterval defines the maximum amount of time to wait before
	// attempting to reconnect.
	ConnRetryMaxInterval time.Duration `json:"connection-retry-max-interval"`

	// ConnRetryMultiplier defines the multiplier for reconnection attempts.
	// For example, if the multiplier is 2.0, the amount of time to wait will
	// double each time a reconnect is attempted, up to ConnRetryMaxInterval.
	ConnRetryMultiplier float64 `json:"connection-retry-multiplier"`

	// BatchSize is the maximum batch size to reach before flushing a batch to
	// the server.
	BatchSize int `json:"batch-size"`

	// InputPath defines the file from which log-cli reads messages. Defaults
	// to standard input.
	InputPath string `json:"input"`

	// Limit defines the minimum number of messages to read per read operation.
	Limit int `json:"limit"`

	// Offset defines the initial offset from which to read messages.
	Offset uint64 `json:"offset"`

	// ReadForever will read messages until the scanner is stopped.
	ReadForever bool `json:"read-forever"`

	// UseTail tells the scanner to begin reading from the beginning of the
	// log.
	UseTail bool `json:"use-tail"`
}

Config is used for client configuration

func DefaultTestConfig

func DefaultTestConfig(verbose bool) *Config

DefaultTestConfig returns a testing configuration

func NewConfig

func NewConfig() *Config

NewConfig returns a new default client configuration.

func (*Config) FromGeneralConfig

func (c *Config) FromGeneralConfig(conf *config.Config) *Config

FromGeneralConfig returns a new client config that copies common attributes.

func (*Config) String

func (c *Config) String() string

func (*Config) ToGeneralConfig

func (c *Config) ToGeneralConfig() *config.Config

ToGeneralConfig copies what is needed for shared modules (internal, protocol) to the server config struct.

func (*Config) Validate

func (c *Config) Validate() error

Validate returns an error pointing to incorrect values for the configuration, if any.

type Dialer

type Dialer interface {
	DialTimeout(network, addr string, timeout time.Duration) (net.Conn, error)
}

Dialer defines an interface for connecting to servers. It can be used for mocking in tests.

type ErrorHandler

type ErrorHandler interface {
	HandleError(err error)
}

type FileStatePuller

type FileStatePuller struct {
	// contains filtered or unexported fields
}

FileStatePuller tracks offset state in a file.

func NewFileStatePuller

func NewFileStatePuller(conf *Config, name string) *FileStatePuller

NewFileStatePuller returns a new instance of *FileStatePuller.

func (*FileStatePuller) Complete

func (m *FileStatePuller) Complete(off, delta uint64) error

Complete implements StatePuller interface.

func (*FileStatePuller) Get

func (m *FileStatePuller) Get() (uint64, uint64, error)

Get implements StatePuller interface.

func (*FileStatePuller) Setup

func (m *FileStatePuller) Setup() error

Setup implements internal.LifecycleManager.

func (*FileStatePuller) Shutdown

func (m *FileStatePuller) Shutdown() error

Shutdown implements internal.LifecycleManager.

func (*FileStatePuller) Start

func (m *FileStatePuller) Start(off, delta uint64) error

type MemoryStatePuller

type MemoryStatePuller struct {
	// contains filtered or unexported fields
}

func NewMemoryStatePuller

func NewMemoryStatePuller(conf *Config) *MemoryStatePuller

func (*MemoryStatePuller) Complete

func (m *MemoryStatePuller) Complete(off, delta uint64, err error) error

Complete implements StatePuller interface

func (*MemoryStatePuller) Get

func (m *MemoryStatePuller) Get() (uint64, uint64, error)

Get implements StatePuller interface

func (*MemoryStatePuller) Start

func (m *MemoryStatePuller) Start(off, delta uint64) error

type MockStatePusher

type MockStatePusher struct {
	// contains filtered or unexported fields
}

MockStatePusher saves pushed state so it can be read in tests

func NewMockStatePusher

func NewMockStatePusher() *MockStatePusher

NewMockStatePusher returns a new instance of MockStatePusher

func (*MockStatePusher) Next

func (m *MockStatePusher) Next() (uint64, bool)

Next returns the next offset, error, and batch, starting from the first. if there are no more pushed states, the last return value will be false

func (*MockStatePusher) Push

func (m *MockStatePusher) Push(off uint64) error

Push implements StatePusher

func (*MockStatePusher) SetError

func (m *MockStatePusher) SetError(err error)

SetError sets the error to be returned from calls to Push

type NoopBacklogger

type NoopBacklogger struct{}

func (*NoopBacklogger) Backlog

func (bl *NoopBacklogger) Backlog() chan *Backlog

type NoopErrorHandler

type NoopErrorHandler struct{}

func (*NoopErrorHandler) HandleError

func (eh *NoopErrorHandler) HandleError(err error)

type NoopStatePuller

type NoopStatePuller int

func (NoopStatePuller) Complete

func (h NoopStatePuller) Complete(off, delta uint64, err error) error

Complete implements StatePuller interface.

func (NoopStatePuller) Get

func (h NoopStatePuller) Get() (uint64, uint64, error)

Get implements StatePuller interface.

func (NoopStatePuller) Start

func (h NoopStatePuller) Start(off, delta uint64) error

Start implements StatePuller interface.

type NoopStatePusher

type NoopStatePusher struct {
}

NoopStatePusher discards input

func (*NoopStatePusher) Close

func (m *NoopStatePusher) Close() error

Close implements StatePusher

func (*NoopStatePusher) Push

func (m *NoopStatePusher) Push(off uint64) error

Push implements StatePusher

type Scanner

type Scanner struct {
	Client *Client
	// contains filtered or unexported fields
}

Scanner is used to read batches from the log, scanning message by message

func DialScanner

func DialScanner(addr string) (*Scanner, error)

DialScanner returns a new scanner with a default configuration

func DialScannerConfig

func DialScannerConfig(addr string, conf *Config) (*Scanner, error)

DialScannerConfig returns a new writer with a connection to addr

func NewScanner

func NewScanner(conf *Config, topic string) *Scanner

NewScanner returns a new instance of *Scanner.

func ScannerForClient

func ScannerForClient(c *Client) *Scanner

ScannerForClient returns a new scanner from a Client

func (*Scanner) Close

func (s *Scanner) Close() error

func (*Scanner) Complete

func (s *Scanner) Complete(err error) error

Complete marks the current message processing completed. It will panic if no StatePuller exists on the scanner.

func (*Scanner) Error

func (s *Scanner) Error() error

func (*Scanner) Message

func (s *Scanner) Message() *protocol.Message

Message returns the current message

func (*Scanner) Reset

func (s *Scanner) Reset()

Reset sets the scanner to it's initial values so it can be reused.

func (*Scanner) Scan

func (s *Scanner) Scan() bool

Scan reads the next message. If it encounters an error, it returns false.

func (*Scanner) SetLimit

func (s *Scanner) SetLimit(n int)

func (*Scanner) SetOffset

func (s *Scanner) SetOffset(off uint64)

func (*Scanner) SetTopic

func (s *Scanner) SetTopic(topic string)

SetTopic sets the topic for the scanner.

func (*Scanner) Start

func (s *Scanner) Start() error

Start marks the current message as processing. It will return ErrProcessing if the message is already being processed.

func (*Scanner) Stop

func (s *Scanner) Stop()

Stop causes the scanner to stop making requests and returns ErrStopped

func (*Scanner) Topic

func (s *Scanner) Topic() string

Topic returns the topic for the scanner.

func (*Scanner) UseTail

func (s *Scanner) UseTail()

func (*Scanner) WithStateHandler

func (s *Scanner) WithStateHandler(statem StatePuller) *Scanner

WithSetStatePuller sets the StatePuller on the Scanner.

type StateOutputter

type StateOutputter struct {
	// contains filtered or unexported fields
}

StateOutputter writes offsets to a file. Intended for use by command line applications.

func NewStateOutputter

func NewStateOutputter(f *os.File) *StateOutputter

NewStateOutputter returns a new oneee

func (*StateOutputter) Close

func (m *StateOutputter) Close() error

Close implements StatePusher

func (*StateOutputter) Push

func (m *StateOutputter) Push(off uint64) error

Push implements StatePusher

type StatePuller

type StatePuller interface {
	// Get returns the oldest message offset and delta that isn't already being
	// processed.
	Get() (uint64, uint64, error)

	// Start marks a message as being processed. It should return
	// ErrProcessing if the message is already being processed.
	Start(off, delta uint64) error

	// Complete marks a message as completed or failed. Failure is written when
	// err is not nil.
	Complete(off, delta uint64, err error) error
}

StatePuller pulls messages from the state and marks them completed or failed.

type StatePusher

type StatePusher interface {
	Push(off uint64) error
}

StatePusher saves recently written offsets for later processing.

type Writer

type Writer struct {
	Client *Client
	// contains filtered or unexported fields
}

Writer writes message batches to the log server.

func NewWriter

func NewWriter(conf *Config, topic string) *Writer

NewWriter returns a new instance of Writer for a topic

func (*Writer) Close

func (w *Writer) Close() error

Close implements the LogWriter interface

func (*Writer) Flush

func (w *Writer) Flush() error

Flush implements the LogWriter interface

func (*Writer) Reset

func (w *Writer) Reset(topic string)

Reset sets the writer to its initial values

func (*Writer) WithBacklog

func (w *Writer) WithBacklog(bl Backlogger) *Writer

func (*Writer) WithErrorHandler

func (w *Writer) WithErrorHandler(eh ErrorHandler) *Writer

func (*Writer) WithStateHandler

func (w *Writer) WithStateHandler(m StatePusher) *Writer

WithStateHandler sets a state pusher on the writer. It should be called as part of initialization.

func (*Writer) Write

func (w *Writer) Write(p []byte) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL