handoff

package
v1.7.0
Published: Sep 27, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

README

Hinted Handoff Queue

Hinted handoff is a concept that is used in Cassandra and Amazon's Dynamo for dealing with failed remote writes.

The files queue.go, limiter.go and node_processor.go (and their tests) were taken from InfluxDB 0.11.1:

https://github.com/influxdata/influxdb/tree/v0.11.1/services/hh

These files were released under the MIT license.

Documentation

Constants

const (
	GiB = 1 << 30

	// DefaultMaxAge is the default maximum amount of time that a hinted handoff write
	// can stay in the queue.  After this time, the write will be purged.
	DefaultMaxAge = 7 * 24 * time.Hour

	// DefaultRetryRateLimit is the default rate at which hinted handoffs are retried.
	// The rate is in bytes per second. A value of 0 disables the rate limit.
	DefaultRetryRateLimit = 0

	// DefaultRetryInterval is the default amount of time the system waits before
	// attempting to flush hinted handoff queues. With each failure of a hinted
	// handoff write, this retry interval increases exponentially until it reaches
	// the maximum (DefaultRetryMaxInterval).
	DefaultRetryInterval = 10 * time.Millisecond

	// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
	// will ever be.
	DefaultRetryMaxInterval = 10 * time.Second

	// DefaultPurgeInterval is the amount of time the system waits before attempting
	// to purge hinted handoff data due to age or inactive nodes.
	DefaultPurgeInterval = time.Hour
)

const (
	// DefaultMaxSize is the default maximum size of all disk backed queues in bytes.
	DefaultMaxSize = 10 * GiB
)
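
The retry constants above describe an interval that grows exponentially on each failure until it reaches a ceiling. The following is a minimal sketch of that behaviour, not the package's own code: the helper name `nextInterval` is hypothetical, and the growth factor of 2 is an assumption, since the documentation only says "exponentially".

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from DefaultRetryInterval and DefaultRetryMaxInterval.
const (
	retryInterval    = 10 * time.Millisecond
	retryMaxInterval = 10 * time.Second
)

// nextInterval is a hypothetical helper. It doubles the current interval
// after a failed write (the factor of 2 is an assumption) and caps the
// result at max.
func nextInterval(current, max time.Duration) time.Duration {
	next := current * 2
	if next > max {
		return max
	}
	return next
}

func main() {
	interval := retryInterval
	for failures := 1; failures <= 12; failures++ {
		interval = nextInterval(interval, retryMaxInterval)
		fmt.Printf("after failure %d: wait %v\n", failures, interval)
	}
}
```

A real implementation would presumably reset the interval to its starting value after a successful write; that step is left out here.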

Variables

var (
	ErrNotOpen     = fmt.Errorf("queue not open")
	ErrQueueFull   = fmt.Errorf("queue is full")
	ErrSegmentFull = fmt.Errorf("segment is full")
)

Possible errors returned by a hinted handoff queue.

Functions

func NewRateLimiter

func NewRateLimiter(limit int64) *limiter

NewRateLimiter returns a new limiter that restricts a process to limit units per second. limit is the maximum amount that can be used per second and should be > 0; a limit <= 0 disables rate limiting.
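
To illustrate the contract described for the limit parameter, here is a small sketch of a bytes-per-second throttle. It is not the package's unexported limiter: the `throttle` type and its `delayFor` method are hypothetical, and the only behaviour taken from the documentation is that a limit <= 0 disables limiting.

```go
package main

import (
	"fmt"
	"time"
)

// throttle is a hypothetical stand-in for the package's unexported limiter.
type throttle struct {
	limit int64 // bytes per second; <= 0 means unlimited
}

// delayFor returns how long a sender should pause after transmitting n bytes
// so that its average rate stays at or below the limit.
func (t throttle) delayFor(n int64) time.Duration {
	if t.limit <= 0 || n <= 0 {
		return 0
	}
	return time.Duration(float64(n) / float64(t.limit) * float64(time.Second))
}

func main() {
	limited := throttle{limit: 1 << 20} // 1 MiB per second
	unlimited := throttle{limit: 0}     // rate limiting disabled

	fmt.Println(limited.delayFor(512 << 10))
	fmt.Println(unlimited.delayFor(512 << 10))
}
```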

Types

type DiskBackedQueue added in v1.4.4

type DiskBackedQueue struct {
	// contains filtered or unexported fields
}

DiskBackedQueue is a bounded, disk-backed, append-only type that combines queue and log semantics. Byte slices can be appended and read back in order. The queue maintains a pointer to the current head byte slice and can re-read from the head until it has been advanced.

Internally, the queue writes byte slices to multiple segment files so that disk space can be reclaimed. When a segment file grows larger than the max segment size, a new file is created. A segment is removed after the head pointer has advanced past its last entry. The first segment is the head, and the last segment is the tail. Reads come from the head segment and writes go to the tail segment.

A queue can be configured with a maximum size. Once the total size of all segments on disk would exceed that limit, writes fail.

┌─────┐
│Head │
├─────┘
│
▼
┌─────────────────┐ ┌─────────────────┐┌─────────────────┐
│Segment 1 - 10MB │ │Segment 2 - 10MB ││Segment 3 - 10MB │
└─────────────────┘ └─────────────────┘└─────────────────┘
                                                         ▲
                                                         │
                                                         │
                                                   ┌─────┤
                                                   │Tail │
                                                   └─────┘
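
The head/tail and segment behaviour described above can be sketched in memory. This is an illustrative analogue only: `memQueue`, `segment`, and `errQueueFull` are hypothetical names, nothing is written to disk, and the real type's file format and locking are not modelled.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var errQueueFull = errors.New("queue is full")

// segment holds a run of entries and tracks its own size in bytes.
type segment struct {
	entries [][]byte
	size    int64
}

// memQueue is a hypothetical in-memory analogue of the disk-backed queue.
type memQueue struct {
	segments       []*segment // segments[0] is the head, the last is the tail
	pos            int        // index of the head entry within segments[0]
	maxSegmentSize int64
	maxSize        int64
	total          int64
}

func newMemQueue(maxSegmentSize, maxSize int64) *memQueue {
	return &memQueue{segments: []*segment{{}}, maxSegmentSize: maxSegmentSize, maxSize: maxSize}
}

// trim drops fully consumed head segments, reclaiming their space.
func (q *memQueue) trim() {
	for len(q.segments) > 1 && q.pos >= len(q.segments[0].entries) {
		q.total -= q.segments[0].size
		q.segments = q.segments[1:]
		q.pos = 0
	}
}

// Append writes to the tail segment, starting a new one when the tail is full.
func (q *memQueue) Append(b []byte) error {
	n := int64(len(b))
	if q.total+n > q.maxSize {
		return errQueueFull
	}
	tail := q.segments[len(q.segments)-1]
	if tail.size > 0 && tail.size+n > q.maxSegmentSize {
		tail = &segment{}
		q.segments = append(q.segments, tail)
	}
	tail.entries = append(tail.entries, b)
	tail.size += n
	q.total += n
	q.trim()
	return nil
}

// Current returns the head entry without consuming it.
func (q *memQueue) Current() ([]byte, error) {
	head := q.segments[0]
	if q.pos >= len(head.entries) {
		return nil, io.EOF
	}
	return head.entries[q.pos], nil
}

// Advance moves the head pointer past the current entry.
func (q *memQueue) Advance() error {
	if q.pos >= len(q.segments[0].entries) {
		return io.EOF
	}
	q.pos++
	q.trim()
	return nil
}

func main() {
	q := newMemQueue(8, 16) // 8-byte segments, 16 bytes in total
	for _, s := range []string{"aaaa", "bbbb", "cccc", "dddd"} {
		if err := q.Append([]byte(s)); err != nil {
			fmt.Println("append:", err)
		}
	}
	fmt.Println("fifth append:", q.Append([]byte("e")))
	for {
		b, err := q.Current()
		if err != nil {
			break
		}
		fmt.Printf("%s (segments: %d)\n", b, len(q.segments))
		q.Advance()
	}
}
```

Note that space is reclaimed only when a whole segment is dropped, which is why the sketch tracks size per segment rather than per entry.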

func NewDiskBackedQueue added in v1.4.4

func NewDiskBackedQueue(dir string, opts ...DiskBackedQueueOption) *DiskBackedQueue

NewDiskBackedQueue creates a queue that stores its segments in dir and will not consume more than its configured maximum size on disk.

func (*DiskBackedQueue) Advance added in v1.4.4

func (l *DiskBackedQueue) Advance() error

Advance moves the head pointer to the next byte slice in the queue.

func (*DiskBackedQueue) Append added in v1.4.4

func (l *DiskBackedQueue) Append(b []byte) error

Append appends a byte slice to the end of the queue.

func (*DiskBackedQueue) Close added in v1.4.4

func (l *DiskBackedQueue) Close() error

Close stops the queue for reading and writing.

func (*DiskBackedQueue) Current added in v1.4.4

func (l *DiskBackedQueue) Current() ([]byte, error)

Current returns the current byte slice at the head of the queue.

func (*DiskBackedQueue) DiskUsage added in v1.4.4

func (l *DiskBackedQueue) DiskUsage() int64

DiskUsage returns the total size on disk used by the queue.

func (*DiskBackedQueue) LastModified added in v1.4.4

func (l *DiskBackedQueue) LastModified() (time.Time, error)

LastModified returns the last time the queue was modified.

func (*DiskBackedQueue) Open added in v1.4.4

func (l *DiskBackedQueue) Open() error

Open opens the queue for reading and writing.

func (*DiskBackedQueue) Position added in v1.4.4

func (l *DiskBackedQueue) Position() (*queuePos, error)

func (*DiskBackedQueue) PurgeOlderThan added in v1.4.4

func (l *DiskBackedQueue) PurgeOlderThan(when time.Time) error

func (*DiskBackedQueue) Remove added in v1.4.4

func (l *DiskBackedQueue) Remove() error

Remove removes all underlying file-based resources for the queue. It is an error to call this on an open queue.

func (*DiskBackedQueue) SetMaxSegmentSize added in v1.4.4

func (l *DiskBackedQueue) SetMaxSegmentSize(size int64) error

SetMaxSegmentSize updates the max segment size for new and existing segments.

type DiskBackedQueueOption added in v1.4.4

type DiskBackedQueueOption func(*DiskBackedQueue)

func WithDiskBackedQueueMaxSize added in v1.4.4

func WithDiskBackedQueueMaxSize(size int64) DiskBackedQueueOption
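
DiskBackedQueueOption follows the functional options pattern: the constructor takes a variadic list of functions, each of which mutates the value being built. The sketch below shows the general shape with hypothetical names (`store`, `storeOption`, `withMaxSize`, `newStore`). That the constructor starts from a 10 GiB default matching DefaultMaxSize is an assumption, not something the documentation states.

```go
package main

import "fmt"

// store is a hypothetical type standing in for DiskBackedQueue.
type store struct {
	dir     string
	maxSize int64
}

// storeOption mirrors the shape of DiskBackedQueueOption.
type storeOption func(*store)

// withMaxSize returns an option that overrides the default size limit.
func withMaxSize(size int64) storeOption {
	return func(s *store) { s.maxSize = size }
}

// newStore applies defaults first, then each option in order.
func newStore(dir string, opts ...storeOption) *store {
	s := &store{dir: dir, maxSize: 10 << 30} // assumed default of 10 GiB
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	fmt.Println(newStore("/tmp/hh").maxSize)
	fmt.Println(newStore("/tmp/hh", withMaxSize(1<<20)).maxSize)
}
```

The pattern keeps the constructor signature stable as new settings are added, at the cost of making defaults less visible at the call site.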

type Queue

type Queue interface {
	Advance() error
	Append([]byte) error
	Close() error
	Current() ([]byte, error)
	DiskUsage() int64
	Open() error
	PurgeOlderThan(time.Time) error
	SetMaxSegmentSize(size int64) error
}

type WriteReplayer

type WriteReplayer struct {
	PurgeInterval    time.Duration // Interval between periodic purge checks
	RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	MaxAge           time.Duration // Maximum age queue data can get before purging.
	RetryRateLimit   int64         // Limits the rate data is sent to node.
	// contains filtered or unexported fields
}

WriteReplayer encapsulates a queue of hinted-handoff data for a node, and the transmission of the data to the node.

func NewWriteReplayer

func NewWriteReplayer(queue Queue, c tcpClient, metrics metrics.Registrar, targetNodeIndex string, opts ...WriteReplayerOption) *WriteReplayer

NewWriteReplayer returns a new WriteReplayer for the given node, using queue to hold the hinted-handoff data.

func (*WriteReplayer) Open

func (w *WriteReplayer) Open(done chan struct{}) error

Open opens the WriteReplayer. It reads and writes the data held in its queue and starts transmitting that data to the node. A WriteReplayer must be opened before it can accept hinted data.

func (*WriteReplayer) SendWrite

func (w *WriteReplayer) SendWrite() (int, error)

SendWrite attempts to send the current block of hinted data to the target node. If successful, it returns the number of bytes sent and advances to the next block. Otherwise, it returns EOF when there is no more data or the node is inactive.
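
The read, send, then advance sequence described for SendWrite can be sketched against a small interface. Everything here is illustrative: `headQueue`, `sliceQueue`, `sendOne`, and `errUnreachable` are hypothetical names, and the transport is a plain function rather than the package's TCP client.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var errUnreachable = errors.New("node unreachable")

// headQueue is the subset of the Queue interface this sketch needs.
type headQueue interface {
	Current() ([]byte, error)
	Advance() error
}

// sliceQueue is a hypothetical in-memory queue used only for the demo.
type sliceQueue struct{ blocks [][]byte }

func (q *sliceQueue) Current() ([]byte, error) {
	if len(q.blocks) == 0 {
		return nil, io.EOF
	}
	return q.blocks[0], nil
}

func (q *sliceQueue) Advance() error {
	if len(q.blocks) == 0 {
		return io.EOF
	}
	q.blocks = q.blocks[1:]
	return nil
}

// sendOne reads the head block, hands it to send, and advances only on
// success, so a failed block stays at the head and is retried next time.
func sendOne(q headQueue, send func([]byte) error) (int, error) {
	b, err := q.Current()
	if err != nil {
		return 0, err // io.EOF when the queue is empty
	}
	if err := send(b); err != nil {
		return 0, err
	}
	if err := q.Advance(); err != nil {
		return 0, err
	}
	return len(b), nil
}

func main() {
	q := &sliceQueue{blocks: [][]byte{[]byte("one"), []byte("three")}}
	failOnce := true
	send := func(b []byte) error {
		if failOnce {
			failOnce = false
			return errUnreachable
		}
		return nil
	}
	for {
		n, err := sendOne(q, send)
		if err == io.EOF {
			break
		}
		fmt.Println(n, err)
	}
}
```

Advancing only after a successful send gives at-least-once delivery: a block may be sent twice if the send succeeds but the advance fails, yet it is never skipped.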

func (*WriteReplayer) Write

func (w *WriteReplayer) Write(points []*rpc.Point) error

type WriteReplayerOption

type WriteReplayerOption func(*WriteReplayer)

func WithWriteReplayerLogger

func WithWriteReplayerLogger(log *logger.Logger) WriteReplayerOption
