Documentation ¶
Index ¶
- Constants
- Variables
- func NewRateLimiter(limit int64) *limiter
- type DiskBackedQueue
- func (l *DiskBackedQueue) Advance() error
- func (l *DiskBackedQueue) Append(b []byte) error
- func (l *DiskBackedQueue) Close() error
- func (l *DiskBackedQueue) Current() ([]byte, error)
- func (l *DiskBackedQueue) DiskUsage() int64
- func (l *DiskBackedQueue) LastModified() (time.Time, error)
- func (l *DiskBackedQueue) Open() error
- func (l *DiskBackedQueue) Position() (*queuePos, error)
- func (l *DiskBackedQueue) PurgeOlderThan(when time.Time) error
- func (l *DiskBackedQueue) Remove() error
- func (l *DiskBackedQueue) SetMaxSegmentSize(size int64) error
- type DiskBackedQueueOption
- type Queue
- type WriteReplayer
- type WriteReplayerOption
Constants ¶
const (
	GiB = 1 << 30

	// DefaultMaxAge is the default maximum amount of time that a hinted handoff write
	// can stay in the queue. After this time, the write will be purged.
	DefaultMaxAge = 7 * 24 * time.Hour

	// DefaultRetryRateLimit is the default rate that hinted handoffs will be retried.
	// The rate is in bytes per second. A value of 0 disables the rate limit.
	DefaultRetryRateLimit = 0

	// DefaultRetryInterval is the default amount of time the system waits before
	// attempting to flush hinted handoff queues. With each failure of a hinted
	// handoff write, this retry interval increases exponentially until it reaches
	// the maximum
	DefaultRetryInterval = 10 * time.Millisecond

	// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
	// will ever be.
	DefaultRetryMaxInterval = 10 * time.Second

	// DefaultPurgeInterval is the amount of time the system waits before attempting
	// to purge hinted handoff data due to age or inactive nodes.
	DefaultPurgeInterval = time.Hour
)
const (
	// DefaultMaxSize is the default maximum size of all disk backed queues in bytes.
	DefaultMaxSize = 10 * GiB
)
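The comment on DefaultRetryInterval describes an interval that grows exponentially on each failure until it reaches DefaultRetryMaxInterval. The sketch below illustrates that growth under one assumption: the interval doubles on every failure. The documentation does not state the growth factor, and nextRetryInterval is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from DefaultRetryInterval and DefaultRetryMaxInterval.
const (
	retryInterval    = 10 * time.Millisecond
	retryMaxInterval = 10 * time.Second
)

// nextRetryInterval is a hypothetical helper. It doubles the current
// interval (an assumed growth factor) and caps the result at max.
func nextRetryInterval(cur, max time.Duration) time.Duration {
	next := cur * 2
	if next > max {
		return max
	}
	return next
}

func main() {
	// Print the interval used before each of the first 12 retries.
	d := retryInterval
	for i := 0; i < 12; i++ {
		fmt.Println(d)
		d = nextRetryInterval(d, retryMaxInterval)
	}
}
```

With these values the interval stops growing once it reaches the 10 second cap, so a node that stays down is retried at a steady, bounded rate.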
Variables ¶
var (
	ErrNotOpen     = fmt.Errorf("queue not open")
	ErrQueueFull   = fmt.Errorf("queue is full")
	ErrSegmentFull = fmt.Errorf("segment is full")
)
Possible errors returned by a hinted handoff queue.
Functions ¶
func NewRateLimiter ¶
func NewRateLimiter(limit int64) *limiter
NewRateLimiter returns a new limiter configured to restrict a process to the limit per second. limit is the maximum amount that can be used per second. The limit should be > 0. A limit <= 0 disables rate limiting.
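The returned limiter type is unexported, so its methods are not shown here. As a rough illustration of the rule described above (a positive limit throttles, a limit <= 0 does not), the following sketch computes how long a send of n units should take. It assumes the limit is expressed in bytes per second, as DefaultRetryRateLimit is. delayFor is a hypothetical helper, not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// delayFor is a hypothetical helper, not part of the package. It returns how
// long sending n bytes should take at limit bytes per second. A limit <= 0
// means "unlimited", so no delay is applied.
func delayFor(n, limit int64) time.Duration {
	if limit <= 0 || n <= 0 {
		return 0
	}
	return time.Duration(float64(n) / float64(limit) * float64(time.Second))
}

func main() {
	fmt.Println(delayFor(512, 1024)) // uses half of the per-second budget
	fmt.Println(delayFor(512, 0))    // limit disabled, no delay
}
```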
Types ¶
type DiskBackedQueue ¶ added in v1.4.4
type DiskBackedQueue struct {
// contains filtered or unexported fields
}
DiskBackedQueue is a bounded, disk-backed, append-only type that combines queue and log semantics. Byte slices can be appended and read back in order. The queue maintains a pointer to the current head byte slice and can re-read from the head until it has been advanced.
Internally, the queue writes byte slices to multiple segment files so that disk space can be reclaimed. When a segment file is larger than the max segment size, a new file is created. A segment is removed after the head pointer has advanced past its last entry. The first segment is the head, and the last segment is the tail. Reads come from the head segment and writes go to the tail segment.
A queue can have a max size configured so that writes fail when the total size of all segments on disk exceeds it.
┌─────┐
│Head │
├─────┘
│
▼
┌─────────────────┐ ┌─────────────────┐┌─────────────────┐
│Segment 1 - 10MB │ │Segment 2 - 10MB ││Segment 3 - 10MB │
└─────────────────┘ └─────────────────┘└─────────────────┘
                                                         ▲
                                                         │
                                                         │
                                                   ┌─────┤
                                                   │Tail │
                                                   └─────┘
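The head-pointer semantics described above can be sketched with a small in-memory model. This is only an illustration: memQueue, errEmpty, and the size accounting are hypothetical stand-ins, and the real type persists entries to segment files rather than a slice.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errQueueFull = errors.New("queue is full")  // mirrors ErrQueueFull
	errEmpty     = errors.New("queue is empty") // hypothetical; the real queue may signal this differently
)

// memQueue is an in-memory stand-in for the documented semantics:
// append at the tail, re-read the head until it is advanced.
type memQueue struct {
	entries [][]byte
	size    int64 // total bytes currently held
	maxSize int64 // appends that would exceed this fail
}

// Append copies b onto the tail, or fails if the size bound would be exceeded.
func (q *memQueue) Append(b []byte) error {
	if q.size+int64(len(b)) > q.maxSize {
		return errQueueFull
	}
	q.entries = append(q.entries, append([]byte(nil), b...))
	q.size += int64(len(b))
	return nil
}

// Current returns the head entry without consuming it.
func (q *memQueue) Current() ([]byte, error) {
	if len(q.entries) == 0 {
		return nil, errEmpty
	}
	return q.entries[0], nil
}

// Advance drops the head entry and reclaims its space.
func (q *memQueue) Advance() error {
	if len(q.entries) == 0 {
		return errEmpty
	}
	q.size -= int64(len(q.entries[0]))
	q.entries = q.entries[1:]
	return nil
}

func main() {
	q := &memQueue{maxSize: 8}
	_ = q.Append([]byte("abc"))
	_ = q.Append([]byte("def"))

	a, _ := q.Current()
	b, _ := q.Current()
	fmt.Printf("%s %s\n", a, b) // the head is re-read until Advance is called

	_ = q.Advance()
	c, _ := q.Current()
	fmt.Printf("%s\n", c)

	fmt.Println(q.Append([]byte("ghijkl"))) // would exceed maxSize, so it fails
}
```

Separating Current from Advance lets a caller retry a failed send of the head entry without losing it, which is the property a hinted-handoff queue relies on.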
func NewDiskBackedQueue ¶ added in v1.4.4
func NewDiskBackedQueue(dir string, opts ...DiskBackedQueueOption) *DiskBackedQueue
NewDiskBackedQueue creates a queue that will store segments in dir and that will consume no more than its configured max size on disk.
func (*DiskBackedQueue) Advance ¶ added in v1.4.4
func (l *DiskBackedQueue) Advance() error
Advance moves the head pointer to the next byte slice in the queue.
func (*DiskBackedQueue) Append ¶ added in v1.4.4
func (l *DiskBackedQueue) Append(b []byte) error
Append appends a byte slice to the end of the queue.
func (*DiskBackedQueue) Close ¶ added in v1.4.4
func (l *DiskBackedQueue) Close() error
Close stops the queue for reading and writing.
func (*DiskBackedQueue) Current ¶ added in v1.4.4
func (l *DiskBackedQueue) Current() ([]byte, error)
Current returns the current byte slice at the head of the queue.
func (*DiskBackedQueue) DiskUsage ¶ added in v1.4.4
func (l *DiskBackedQueue) DiskUsage() int64
DiskUsage returns the total size on disk used by the queue.
func (*DiskBackedQueue) LastModified ¶ added in v1.4.4
func (l *DiskBackedQueue) LastModified() (time.Time, error)
LastModified returns the last time the queue was modified.
func (*DiskBackedQueue) Open ¶ added in v1.4.4
func (l *DiskBackedQueue) Open() error
Open opens the queue for reading and writing.
func (*DiskBackedQueue) Position ¶ added in v1.4.4
func (l *DiskBackedQueue) Position() (*queuePos, error)
func (*DiskBackedQueue) PurgeOlderThan ¶ added in v1.4.4
func (l *DiskBackedQueue) PurgeOlderThan(when time.Time) error
func (*DiskBackedQueue) Remove ¶ added in v1.4.4
func (l *DiskBackedQueue) Remove() error
Remove removes all underlying file-based resources for the queue. It is an error to call this on an open queue.
func (*DiskBackedQueue) SetMaxSegmentSize ¶ added in v1.4.4
func (l *DiskBackedQueue) SetMaxSegmentSize(size int64) error
SetMaxSegmentSize updates the max segment size for new and existing segments.
type DiskBackedQueueOption ¶ added in v1.4.4
type DiskBackedQueueOption func(*DiskBackedQueue)
func WithDiskBackedQueueMaxSize ¶ added in v1.4.4
func WithDiskBackedQueueMaxSize(size int64) DiskBackedQueueOption
type WriteReplayer ¶
type WriteReplayer struct {
	PurgeInterval    time.Duration // Interval between periodic purge checks
	RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	MaxAge           time.Duration // Maximum age queue data can get before purging.
	RetryRateLimit   int64         // Limits the rate data is sent to node.
	// contains filtered or unexported fields
}
WriteReplayer encapsulates a queue of hinted-handoff data for a node, and the transmission of the data to the node.
func NewWriteReplayer ¶
func NewWriteReplayer(queue Queue, c tcpClient, metrics metrics.Registrar, targetNodeIndex string, opts ...WriteReplayerOption) *WriteReplayer
NewWriteReplayer returns a new WriteReplayer for the given node, using queue for the hinted-handoff data.
func (*WriteReplayer) Open ¶
func (w *WriteReplayer) Open(done chan struct{}) error
Open opens the WriteReplayer. It will read and write data present in the queue, and start transmitting data to the node. A WriteReplayer must be opened before it can accept hinted data.
func (*WriteReplayer) SendWrite ¶
func (w *WriteReplayer) SendWrite() (int, error)
SendWrite attempts to send the current block of hinted data to the target node. If successful, it returns the number of bytes it sent and advances to the next block. Otherwise, it returns EOF when there is no more data or the node is inactive.
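A caller that wants to flush everything currently queued could call SendWrite in a loop until it reports EOF. The sketch below shows that loop against a fake sender so it runs on its own. blockSender, fakeSender, and drain are hypothetical names, and treating the documented EOF as io.EOF is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// blockSender is a hypothetical interface with the same method shape as
// (*WriteReplayer).SendWrite.
type blockSender interface {
	SendWrite() (int, error)
}

// fakeSender stands in for a WriteReplayer: each call "sends" one block
// and reports io.EOF once no blocks remain.
type fakeSender struct{ blocks [][]byte }

func (f *fakeSender) SendWrite() (int, error) {
	if len(f.blocks) == 0 {
		return 0, io.EOF
	}
	n := len(f.blocks[0])
	f.blocks = f.blocks[1:]
	return n, nil
}

// drain calls SendWrite until it reports io.EOF and returns the total
// number of bytes sent. Any other error stops the loop and is returned.
func drain(s blockSender) (int, error) {
	total := 0
	for {
		n, err := s.SendWrite()
		if errors.Is(err, io.EOF) {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += n
	}
}

func main() {
	s := &fakeSender{blocks: [][]byte{[]byte("abc"), []byte("de")}}
	total, err := drain(s)
	fmt.Println(total, err)
}
```

Because EOF covers both "no more data" and "node inactive", a loop like this cannot tell the two apart from the return value alone.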
type WriteReplayerOption ¶
type WriteReplayerOption func(*WriteReplayer)
func WithWriteReplayerLogger ¶
func WithWriteReplayerLogger(log *logger.Logger) WriteReplayerOption