Documentation ¶
Overview ¶
Like bufio.Writer (Copyright 2009 The Go Authors. All rights reserved.) but with instrumentation around flushing. Because this codebase is the only user:
- it doesn't implement the entire bufio.Writer API, because it doesn't need to
- some simplifications can be made, e.g. fewer edge cases
Index ¶
- func NewKeepSafe(initialCap int, periodKeep time.Duration) *keepSafe
- func NewSlowChan(backend chan []byte, sleep time.Duration) chan []byte
- func SetLogger(l *logging.Logger)
- type Conn
- type Datapoint
- type Destination
- func (dest *Destination) Flush() error
- func (dest *Destination) GetMatcher() matcher.Matcher
- func (dest *Destination) Match(s []byte) bool
- func (dest *Destination) Run()
- func (dest *Destination) Shutdown() error
- func (dest *Destination) Snapshot() *Destination
- func (dest *Destination) Update(opts map[string]string) error
- func (dest *Destination) UpdateMatcher(matcher matcher.Matcher)
- func (dest *Destination) WaitOnline() chan struct{}
- type Spool
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewKeepSafe ¶
func NewKeepSafe(initialCap int, periodKeep time.Duration) *keepSafe
func NewSlowChan ¶
func NewSlowChan(backend chan []byte, sleep time.Duration) chan []byte
Note: the goroutine doesn't get cleaned up until the backend channel closes, so don't instantiate gazillions of these.
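A minimal usage sketch; the values and the import path are assumptions for illustration. NewSlowChan hands out items from the backend channel at a throttled pace, and closing the backend is what lets its internal goroutine exit:

package main

import (
	"fmt"
	"time"

	"github.com/graphite-ng/carbon-relay-ng/destination" // assumed import path, adjust as needed
)

func main() {
	backend := make(chan []byte, 3)
	slow := destination.NewSlowChan(backend, 100*time.Millisecond) // reads from backend at a throttled pace

	backend <- []byte("a")
	backend <- []byte("b")
	backend <- []byte("c")
	close(backend) // lets the internal goroutine exit; forgetting this leaks it

	// Read a fixed number of items instead of ranging, since this documentation
	// does not state whether the returned channel is closed when backend closes.
	for i := 0; i < 3; i++ {
		fmt.Println(string(<-slow))
	}
}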
Types ¶
type Conn ¶
type Conn struct {
	In chan []byte
	// contains filtered or unexported fields
}
Conn represents a connection to a TCP endpoint.
func (*Conn) HandleData ¶
func (c *Conn) HandleData()
func (*Conn) HandleStatus ¶
func (c *Conn) HandleStatus()
type Destination ¶
type Destination struct {
	Matcher  matcher.Matcher `json:"matcher"`
	Addr     string          `json:"address"`  // tcp dest
	Instance string          `json:"instance"` // Optional carbon instance name, useful only with consistent hashing
	SpoolDir string          // where to store spool files (if enabled)
	Spool    bool            `json:"spool"`  // spool metrics to disk while dest down?
	Pickle   bool            `json:"pickle"` // send in pickle format?
	Online   bool            `json:"online"` // state of connection online/offline.

	SlowNow      bool `json:"slowNow"`      // did we have to drop packets in current loop
	SlowLastLoop bool `json:"slowLastLoop"` // "" last loop

	SpoolBufSize         int
	SpoolMaxBytesPerFile int64
	SpoolSyncEvery       int64
	SpoolSyncPeriod      time.Duration
	SpoolSleep           time.Duration // how long to wait between stores to spool
	UnspoolSleep         time.Duration // how long to wait between loads from spool

	// set in/via Run()
	In chan []byte `json:"-"` // incoming metrics
	// contains filtered or unexported fields
}
func New ¶
func New(prefix, sub, regex, addr, spoolDir string, spool, pickle bool, periodFlush, periodReConn time.Duration, connBufSize, ioBufSize, spoolBufSize int, spoolMaxBytesPerFile, spoolSyncEvery int64, spoolSyncPeriod, spoolSleep, unspoolSleep time.Duration) (*Destination, error)
New creates a destination object. Note that it still needs to be told to run via Run().
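A hedged construction sketch (not a full program), reusing the assumed import path from the earlier NewSlowChan example; the address, spool directory, sizes and durations are illustrative placeholders, not recommended values:

dest, err := destination.New(
	"", "", "",                   // prefix, sub, regex (empty: no filtering, assumed)
	"graphite.example.com:2003",  // addr: hypothetical tcp dest
	"/var/spool/carbon-relay-ng", // spoolDir
	true,                         // spool to disk while the dest is down
	false,                        // plain text, not pickle
	time.Second,                  // periodFlush
	10*time.Second,               // periodReConn
	30000,                        // connBufSize
	2000,                         // ioBufSize
	10000,                        // spoolBufSize
	200*1024*1024,                // spoolMaxBytesPerFile
	10000,                        // spoolSyncEvery
	time.Second,                  // spoolSyncPeriod
	500*time.Microsecond,         // spoolSleep
	10*time.Microsecond,          // unspoolSleep
)
if err != nil {
	log.Fatal(err)
}
dest.Run()                                    // In is only set up in/via Run()
dest.In <- []byte("some.metric 1 1234567890") // hand a metric to the destination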
func (*Destination) Flush ¶
func (dest *Destination) Flush() error
func (*Destination) GetMatcher ¶
func (dest *Destination) GetMatcher() matcher.Matcher
func (*Destination) Match ¶
func (dest *Destination) Match(s []byte) bool
func (*Destination) Run ¶
func (dest *Destination) Run()
func (*Destination) Shutdown ¶
func (dest *Destination) Shutdown() error
func (*Destination) Snapshot ¶
func (dest *Destination) Snapshot() *Destination
a "basic" static copy of the dest, not actually running
func (*Destination) Update ¶
func (dest *Destination) Update(opts map[string]string) error
The following settings can't be changed yet: pickle, spool, flush, reconn.
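A sketch of a runtime update, given a dest as above. The "addr" option key is an assumption inferred from the constructor parameter of the same name; check the source for the exact set of supported keys:

// "addr" is an assumed option key, not confirmed by this documentation
if err := dest.Update(map[string]string{"addr": "other-host.example.com:2003"}); err != nil {
	log.Println("update rejected:", err)
}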
func (*Destination) UpdateMatcher ¶
func (dest *Destination) UpdateMatcher(matcher matcher.Matcher)
func (*Destination) WaitOnline ¶
func (dest *Destination) WaitOnline() chan struct{}
type Spool ¶
type Spool struct {
	InRT   chan []byte
	InBulk chan []byte
	Out    chan []byte
	// contains filtered or unexported fields
}
Spool sits in front of the nsqd diskqueue. It provides buffering (to accept input while storage is slow, sync() runs every 1000 items, etc.), QoS (RT vs Bulk), and controllable I/O rates.
func NewSpool ¶
func NewSpool(key, spoolDir string, bufSize int, maxBytesPerFile, syncEvery int64, syncPeriod, spoolSleep, unspoolSleep time.Duration) *Spool
Parameters should be tuned so that the spool can buffer packets for the duration of one sync, and buffers no more than needed, especially if we know the queue is slower than the ingest rate.
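A hedged construction sketch along those lines, reusing the assumed import from the NewSlowChan example; the key and all numbers are illustrative, not tuned recommendations:

sp := destination.NewSpool(
	"graphite.example.com:2003",  // key: assumed to identify the destination this spool serves
	"/var/spool/carbon-relay-ng", // spoolDir
	10000,                        // bufSize: roughly one sync's worth of packets
	200*1024*1024,                // maxBytesPerFile
	10000,                        // syncEvery
	time.Second,                  // syncPeriod
	500*time.Microsecond,         // spoolSleep
	10*time.Microsecond,          // unspoolSleep
)
sp.InRT <- []byte("realtime.metric 1 1234567890") // RT: latency-sensitive input
sp.InBulk <- []byte("bulk.metric 1 1234567890")   // Bulk: lower-priority input
buf := <-sp.Out                                    // consume buffered/spooled data
_ = buf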
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer implements buffering for an io.Writer object. If an error occurs writing to a Writer, no more data will be accepted and all subsequent writes will return the error. After all data has been written, the client should call the Flush method to guarantee all data has been forwarded to the underlying io.Writer.
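This Writer mirrors bufio.Writer's write/flush contract, so the usage pattern is the same as with the standard library. A sketch using bufio itself for illustration, since this package's own NewWriter signature is not shown here:

package main

import (
	"bufio"
	"log"
	"os"
)

func main() {
	w := bufio.NewWriterSize(os.Stdout, 4096) // buffered writer with a 4 KiB buffer
	if _, err := w.Write([]byte("some.metric 1 1234567890\n")); err != nil {
		log.Fatal(err) // once a write fails, all subsequent writes return the error
	}
	if err := w.Flush(); err != nil { // flush to guarantee data reached the underlying writer
		log.Fatal(err)
	}
}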
func NewWriter ¶
NewWriter returns a new Writer whose buffer has at least the specified size. If the argument io.Writer is already a Writer with large enough size, it returns the underlying Writer.
func (*Writer) Buffered ¶
Buffered returns the number of bytes that have been written into the current buffer.