Documentation ¶
Overview ¶
like bufio.Writer (Copyright 2009 The Go Authors. All rights reserved.) but with instrumentation around flushing because this codebase is the only user: * doesn't implement the entire bufio.Writer api because it doesn't need to * and some simplifications can be made, less edgecases etc
Index ¶
- func NewKeepSafe(initialCap int, periodKeep time.Duration) *keepSafe
- func NewSlowChan(backend chan []byte, sleep time.Duration) chan []byte
- func Pickle(dp *Datapoint) []byte
- type Conn
- type Datapoint
- type Destination
- func (dest *Destination) Flush() error
- func (dest *Destination) GetMatcher() matcher.Matcher
- func (dest *Destination) Match(s []byte) bool
- func (dest *Destination) Run()
- func (dest *Destination) Shutdown() error
- func (dest *Destination) Snapshot() *Destination
- func (dest *Destination) Update(opts map[string]string) error
- func (dest *Destination) UpdateMatcher(matcher matcher.Matcher)
- func (dest *Destination) WaitOnline() chan struct{}
- type Spool
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewKeepSafe ¶
func NewSlowChan ¶
note: goroutine doesn't get cleaned until backend closes. don't instantiate gazillions of these
Types ¶
type Conn ¶
type Conn struct { In chan []byte // contains filtered or unexported fields }
Conn represents a connection to a tcp endpoint. As long as conn.isAlive(), caller may write data to conn.In when no longer alive, caller must call either getRedo or clearRedo: * getRedo to get the last bunch of data which may have not made it to the tcp endpoint. After calling getRedo(), no data may be written to conn.In it also clears the keepSafe buffer * clearRedo: releases the keepSafe buffer NOTE: in the future, this design can be much simpler: keepSafe doesn't need a separate structure, we could just have an in-line buffer in between the dest and the conn since we write buffered chunks in the bufWriter, may as well "keep those safe". e.g. buffered writing and keepSafe can be the same buffer. but this requires significant refactoring.
func (*Conn) Close ¶
func (c *Conn) Close()
Close closes the connection and releases all resources, with the exception of the keepSafe buffer. because the caller of conn needs a chance to collect that data
func (*Conn) HandleData ¶
func (c *Conn) HandleData()
type Datapoint ¶
func ParseDataPoint ¶
type Destination ¶
type Destination struct { Matcher matcher.Matcher `json:"matcher"` Addr string `json:"address"` // tcp dest Instance string `json:"instance"` // Optional carbon instance name, useful only with consistent hashing SpoolDir string // where to store spool files (if enabled) Key string // unique key per destination, based on routeName and destination addr/port combination Spool bool `json:"spool"` // spool metrics to disk while dest down? Pickle bool `json:"pickle"` // send in pickle format? Online bool `json:"online"` // state of connection online/offline. SlowNow bool `json:"slowNow"` // did we have to drop packets in current loop SlowLastLoop bool `json:"slowLastLoop"` // "" last loop SpoolBufSize int SpoolMaxBytesPerFile int64 SpoolSyncEvery int64 SpoolSyncPeriod time.Duration SpoolSleep time.Duration // how long to wait between stores to spool UnspoolSleep time.Duration // how long to wait between loads from spool RouteName string // set in/via Run() In chan []byte `json:"-"` // incoming metrics // contains filtered or unexported fields }
func New ¶
func New(routeName, prefix, sub, regex, addr, spoolDir string, spool, pickle bool, periodFlush, periodReConn time.Duration, connBufSize, ioBufSize, spoolBufSize int, spoolMaxBytesPerFile, spoolSyncEvery int64, spoolSyncPeriod, spoolSleep, unspoolSleep time.Duration) (*Destination, error)
New creates a destination object. Note that it still needs to be told to run via Run().
func (*Destination) Flush ¶
func (dest *Destination) Flush() error
func (*Destination) GetMatcher ¶
func (dest *Destination) GetMatcher() matcher.Matcher
func (*Destination) Match ¶
func (dest *Destination) Match(s []byte) bool
func (*Destination) Run ¶
func (dest *Destination) Run()
func (*Destination) Shutdown ¶
func (dest *Destination) Shutdown() error
func (*Destination) Snapshot ¶
func (dest *Destination) Snapshot() *Destination
a "basic" static copy of the dest, not actually running
func (*Destination) Update ¶
func (dest *Destination) Update(opts map[string]string) error
can't be changed yet: pickle, spool, flush, reconn
func (*Destination) UpdateMatcher ¶
func (dest *Destination) UpdateMatcher(matcher matcher.Matcher)
func (*Destination) WaitOnline ¶
func (dest *Destination) WaitOnline() chan struct{}
type Spool ¶
type Spool struct { InRT chan []byte InBulk chan []byte Out chan []byte // contains filtered or unexported fields }
sits in front of nsqd diskqueue. provides buffering (to accept input while storage is slow / sync() runs -every 1000 items- etc) QoS (RT vs Bulk) and controllable i/o rates
func NewSpool ¶
func NewSpool(key, spoolDir string, bufSize int, maxBytesPerFile, syncEvery int64, syncPeriod, spoolSleep, unspoolSleep time.Duration) *Spool
parameters should be tuned so that: can buffer packets for the duration of 1 sync buffer no more then needed, esp if we know the queue is slower then the ingest rate
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer implements buffering for an io.Writer object. If an error occurs writing to a Writer, no more data will be accepted and all subsequent writes will return the error. After all data has been written, the client should call the Flush method to guarantee all data has been forwarded to the underlying io.Writer.
func NewWriter ¶
NewWriterSize returns a new Writer whose buffer has at least the specified size. If the argument io.Writer is already a Writer with large enough size, it returns the underlying Writer.
func (*Writer) Buffered ¶
Buffered returns the number of bytes that have been written into the current buffer.