Documentation
¶
Overview ¶
Package logd contains logd client functionality.
Client implements the logd socket protocol. For application use, Writer and Scanner provide APIs for writing to and reading from the log server.
Writer implements io.Writer, batching input and flushing to the server, either when the configured batch size is reached, or at a configured time interval.
Scanner has a similar API to bufio.Scanner. It can read either from the beginning of the log, or from an offset, keeping track of the current batch offset and message delta as well as calling for more batches as necessary.
Index ¶
- Variables
- func IsRetryable(err error) bool
- type Backlog
- type Backlogger
- type BenchConfig
- type Client
- func (c *Client) Batch(batch *protocol.Batch) (uint64, error)
- func (c *Client) BatchRaw(b []byte) (uint64, error)
- func (c *Client) Close() error
- func (c *Client) Config() (*config.Config, error)
- func (c *Client) ReadOffset(topic []byte, offset uint64, limit int) (int, *protocol.BatchScanner, error)
- func (c *Client) SetConn(conn net.Conn) *Client
- func (c *Client) Stop()
- func (c *Client) Tail(topic []byte, limit int) (uint64, int, *protocol.BatchScanner, error)
- type Config
- type Dialer
- type ErrorHandler
- type FileStatePuller
- type MemoryStatePuller
- type MockStatePusher
- type NoopBacklogger
- type NoopErrorHandler
- type NoopStatePuller
- type NoopStatePusher
- type Scanner
- func (s *Scanner) Close() error
- func (s *Scanner) Complete(err error) error
- func (s *Scanner) Error() error
- func (s *Scanner) Message() *protocol.Message
- func (s *Scanner) Reset()
- func (s *Scanner) Scan() bool
- func (s *Scanner) SetLimit(n int)
- func (s *Scanner) SetOffset(off uint64)
- func (s *Scanner) SetTopic(topic string)
- func (s *Scanner) Start() error
- func (s *Scanner) Stop()
- func (s *Scanner) Topic() string
- func (s *Scanner) UseTail()
- func (s *Scanner) WithStateHandler(statem StatePuller) *Scanner
- type StateOutputter
- type StatePuller
- type StatePusher
- type Writer
- func (w *Writer) Close() error
- func (w *Writer) Flush() error
- func (w *Writer) Reset(topic string)
- func (w *Writer) WithBacklog(bl Backlogger) *Writer
- func (w *Writer) WithErrorHandler(eh ErrorHandler) *Writer
- func (w *Writer) WithStateHandler(m StatePusher) *Writer
- func (w *Writer) Write(p []byte) (int, error)
Constants ¶
This section is empty.
Variables ¶
var DefaultBenchConfig = &BenchConfig{ Verbose: false, }
DefaultBenchConfig is the default benchmarking configuration
var DefaultConfig = &Config{ Verbose: false, Host: "127.0.0.1:1774", Timeout: 10 * time.Second, ConnectTimeout: -1, WriteTimeout: -1, ReadTimeout: -1, Count: false, OutputPath: "-", WaitInterval: 400 * time.Millisecond, ConnRetries: 50, ConnRetryInterval: 1 * time.Second, ConnRetryMaxInterval: 30 * time.Second, ConnRetryMultiplier: 2.0, BatchSize: 1024 * 64, InputPath: "-", Limit: 15, }
DefaultConfig is the default client configuration
var ErrEmptyBatch = errors.New("attempted to send an empty batch")
ErrEmptyBatch is returned when an empty batch write is attempted.
var ErrNoState = errors.New("state uninitialized")
ErrNoState should be returned by StatePullers when the state hasn't stored any offset information yet.
var ErrProcessing = errors.New("message already being processed")
var ErrStopped = stderrors.New("stopped")
ErrStopped indicates the scanner was stopped
Functions ¶
func IsRetryable ¶
IsRetryable returns true if an error can be recovered from.
Types ¶
type Backlogger ¶
type Backlogger interface { // Backlog() should return a channel with the desired backfill size. The client // should attempt to write failed batches to the channel, but if the channel is // full, it will log the error and continue. Backlog() chan *Backlog }
Backlogger saves failed batch writes so they can be attempted later.
type BenchConfig ¶
type BenchConfig struct {
Verbose bool
}
BenchConfig manages benchmark configuration
type Client ¶
Client represents a connection to the database.
func DialConfig ¶
DialConfig returns a new, configured instance of *Client with a connected Conn.
func (*Client) Batch ¶
Batch sends a BATCH request and returns the response. Batch does not retry. If you want reconnect functionality, use a Writer.
func (*Client) Config ¶
Config sends a CONFIG request, returning parts the server's configuration relevant to the client.
func (*Client) ReadOffset ¶
func (c *Client) ReadOffset(topic []byte, offset uint64, limit int) (int, *protocol.BatchScanner, error)
ReadOffset sends a READ request, returning a scanner that can be used to iterate over the messages in the response.
type Config ¶
type Config struct { // Verbose prints debugging information. Verbose bool `json:"verbose"` // Host defines the host:port to connect to. Host string `json:"host"` // Timeout defines the overall socket timeout. For more granular control, // set ConnectTimeout, WriteTimeout, and ReadTimeout. Timeout time.Duration `json:"timeout"` // ConnectTimeout defines the time limit for connecting to the server. ConnectTimeout time.Duration `json:"connect-timeout"` // WriteTimeout defines the time limit for writing to the server socket. WriteTimeout time.Duration `json:"write-timeout"` // ReadTimeout defines the time limit for reading from the server socket. ReadTimeout time.Duration `json:"read-timeout"` // Count prints some counts of messages written before exiting log-cli. Count bool `json:"count"` // OutputPath is the file log-cli will write output data to. Defaults to // standard out. OutputPath string `json:"output"` // WaitInterval, for Writer, determines the length of time, without any // messages written, before the batch is flushed to the server. For // Scanner, determines how long to wait before requesting new batches. // TODO these should be split out when writer and scanner have seperate // configs. WaitInterval time.Duration `json:"wait-interval"` // ConnRetries defines how many attempts to connect should happen. A // negative number will retry forever. ConnRetries int `json:"connection-retries"` // ConnRetryMaxInterval defines the initial amount of time to wait before // attempting to reconnect. ConnRetryInterval time.Duration `json:"connection-retry-interval"` // ConnRetryMaxInterval defines the maximum amount of time to wait before // attempting to reconnect. ConnRetryMaxInterval time.Duration `json:"connection-retry-max-interval"` // ConnRetryMultiplier defines the multiplier for reconnection attempts. // For example, if the multiplier is 2.0, the amount of time to wait will // double each time a reconnect is attempted, up to ConnRetryMaxInterval. ConnRetryMultiplier float64 `json:"connection-retry-multiplier"` // BatchSize is the maximum batch size to reach before flushing a batch to // the server. BatchSize int `json:"batch-size"` // InputPath defines the file from which log-cli reads messages. Defaults // to standard input. InputPath string `json:"input"` // Limit defines the minimum number of messages to read per read operation. Limit int `json:"limit"` // Offset defines the initial offset from which to read messages. Offset uint64 `json:"offset"` // ReadForever will read messages until the scanner is stopped. ReadForever bool `json:"read-forever"` // UseTail tells the scanner to begin reading from the beginning of the // log. UseTail bool `json:"use-tail"` }
Config is used for client configuration
func DefaultTestConfig ¶
DefaultTestConfig returns a testing configuration
func (*Config) FromGeneralConfig ¶
FromGeneralConfig returns a new client config that copies common attributes.
func (*Config) ToGeneralConfig ¶
ToGeneralConfig copies what is needed for shared modules (internal, protocol) to the server config struct.
type Dialer ¶
type Dialer interface {
DialTimeout(network, addr string, timeout time.Duration) (net.Conn, error)
}
Dialer defines an interface for connecting to servers. It can be used for mocking in tests.
type ErrorHandler ¶
type ErrorHandler interface {
HandleError(err error)
}
type FileStatePuller ¶
type FileStatePuller struct {
// contains filtered or unexported fields
}
FileStatePuller tracks offset state in a file.
func NewFileStatePuller ¶
func NewFileStatePuller(conf *Config, name string) *FileStatePuller
NewFileStatePuller returns a new instance of *FileStatePuller.
func (*FileStatePuller) Complete ¶
func (m *FileStatePuller) Complete(off, delta uint64) error
Complete implements StatePuller interface.
func (*FileStatePuller) Get ¶
func (m *FileStatePuller) Get() (uint64, uint64, error)
Get implements StatePuller interface.
func (*FileStatePuller) Setup ¶
func (m *FileStatePuller) Setup() error
Setup implements internal.LifecycleManager.
func (*FileStatePuller) Shutdown ¶
func (m *FileStatePuller) Shutdown() error
Shutdown implements internal.LifecycleManager.
func (*FileStatePuller) Start ¶
func (m *FileStatePuller) Start(off, delta uint64) error
type MemoryStatePuller ¶
type MemoryStatePuller struct {
// contains filtered or unexported fields
}
func NewMemoryStatePuller ¶
func NewMemoryStatePuller(conf *Config) *MemoryStatePuller
func (*MemoryStatePuller) Complete ¶
func (m *MemoryStatePuller) Complete(off, delta uint64, err error) error
Complete implements StatePuller interface
func (*MemoryStatePuller) Get ¶
func (m *MemoryStatePuller) Get() (uint64, uint64, error)
Get implements StatePuller interface
func (*MemoryStatePuller) Start ¶
func (m *MemoryStatePuller) Start(off, delta uint64) error
type MockStatePusher ¶
type MockStatePusher struct {
// contains filtered or unexported fields
}
MockStatePusher saves pushed state so it can be read in tests
func NewMockStatePusher ¶
func NewMockStatePusher() *MockStatePusher
NewMockStatePusher returns a new instance of MockStatePusher
func (*MockStatePusher) Next ¶
func (m *MockStatePusher) Next() (uint64, bool)
Next returns the next offset, error, and batch, starting from the first. if there are no more pushed states, the last return value will be false
func (*MockStatePusher) Push ¶
func (m *MockStatePusher) Push(off uint64) error
Push implements StatePusher
func (*MockStatePusher) SetError ¶
func (m *MockStatePusher) SetError(err error)
SetError sets the error to be returned from calls to Push
type NoopBacklogger ¶
type NoopBacklogger struct{}
func (*NoopBacklogger) Backlog ¶
func (bl *NoopBacklogger) Backlog() chan *Backlog
type NoopErrorHandler ¶
type NoopErrorHandler struct{}
func (*NoopErrorHandler) HandleError ¶
func (eh *NoopErrorHandler) HandleError(err error)
type NoopStatePuller ¶
type NoopStatePuller int
func (NoopStatePuller) Complete ¶
func (h NoopStatePuller) Complete(off, delta uint64, err error) error
Complete implements StatePuller interface.
func (NoopStatePuller) Get ¶
func (h NoopStatePuller) Get() (uint64, uint64, error)
Get implements StatePuller interface.
func (NoopStatePuller) Start ¶
func (h NoopStatePuller) Start(off, delta uint64) error
Start implements StatePuller interface.
type NoopStatePusher ¶
type NoopStatePusher struct { }
NoopStatePusher discards input
func (*NoopStatePusher) Close ¶
func (m *NoopStatePusher) Close() error
Close implements StatePusher
func (*NoopStatePusher) Push ¶
func (m *NoopStatePusher) Push(off uint64) error
Push implements StatePusher
type Scanner ¶
type Scanner struct { Client *Client // contains filtered or unexported fields }
Scanner is used to read batches from the log, scanning message by message
func DialScanner ¶
DialScanner returns a new scanner with a default configuration
func DialScannerConfig ¶
DialScannerConfig returns a new writer with a connection to addr
func NewScanner ¶
NewScanner returns a new instance of *Scanner.
func ScannerForClient ¶
ScannerForClient returns a new scanner from a Client
func (*Scanner) Complete ¶
Complete marks the current message processing completed. It will panic if no StatePuller exists on the scanner.
func (*Scanner) Reset ¶
func (s *Scanner) Reset()
Reset sets the scanner to it's initial values so it can be reused.
func (*Scanner) Start ¶
Start marks the current message as processing. It will return ErrProcessing if the message is already being processed.
func (*Scanner) Stop ¶
func (s *Scanner) Stop()
Stop causes the scanner to stop making requests and returns ErrStopped
func (*Scanner) WithStateHandler ¶
func (s *Scanner) WithStateHandler(statem StatePuller) *Scanner
WithSetStatePuller sets the StatePuller on the Scanner.
type StateOutputter ¶
type StateOutputter struct {
// contains filtered or unexported fields
}
StateOutputter writes offsets to a file. Intended for use by command line applications.
func NewStateOutputter ¶
func NewStateOutputter(f *os.File) *StateOutputter
NewStateOutputter returns a new oneee
func (*StateOutputter) Push ¶
func (m *StateOutputter) Push(off uint64) error
Push implements StatePusher
type StatePuller ¶
type StatePuller interface { // Get returns the oldest message offset and delta that isn't already being // processed. Get() (uint64, uint64, error) // Start marks a message as being processed. It should return // ErrProcessing if the message is already being processed. Start(off, delta uint64) error // Complete marks a message as completed or failed. Failure is written when // err is not nil. Complete(off, delta uint64, err error) error }
StatePuller pulls messages from the state and marks them completed or failed.
type StatePusher ¶
StatePusher saves recently written offsets for later processing.
type Writer ¶
type Writer struct { Client *Client // contains filtered or unexported fields }
Writer writes message batches to the log server.
func (*Writer) WithBacklog ¶
func (w *Writer) WithBacklog(bl Backlogger) *Writer
func (*Writer) WithErrorHandler ¶
func (w *Writer) WithErrorHandler(eh ErrorHandler) *Writer
func (*Writer) WithStateHandler ¶
func (w *Writer) WithStateHandler(m StatePusher) *Writer
WithStateHandler sets a state pusher on the writer. It should be called as part of initialization.