transfer

package
v0.0.0-...-ac41614 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2022 License: MIT Imports: 17 Imported by: 3

Documentation

Index

Constants

View Source
const DefaultChunkSize = 100
View Source
const DefaultEndpointURL = "/partitions/%d/keys"
View Source
const (
	DefaultScanBufferSize = 100
)
View Source
const RetryTimeoutMax = 32

Variables

View Source
var EBadResponse = errors.New("Node responded with a bad response")
View Source
var EEntryChecksum = errors.New("Unable to reproduce the checksum for the entry in the partition chunk")
View Source
var ETransferCancelled = errors.New("Cancelled")

Functions

func ChecksumEntries

func ChecksumEntries(entries []Entry) Hash

Types

type Canceler

type Canceler struct {
	Cancel func()
}

type Downloader

type Downloader struct {
	// contains filtered or unexported fields
}

func NewDownloader

func NewDownloader(configController ClusterConfigController, transferTransport PartitionTransferTransport, transferPartnerStrategy PartitionTransferPartnerStrategy, transferFactory PartitionTransferFactory, partitionPool PartitionPool) *Downloader

func (*Downloader) CancelDownload

func (downloader *Downloader) CancelDownload(partition uint64)

Important! This should only be called by a transfer agent if all transfer proposals waiting for this download have been cancelled first

func (*Downloader) Download

func (downloader *Downloader) Download(partition uint64) <-chan int

func (*Downloader) IsDownloading

func (downloader *Downloader) IsDownloading(partition uint64) bool

func (*Downloader) OnDownloadStop

func (downloader *Downloader) OnDownloadStop(cb func(partition uint64))

A callback that will be invoked after a download for a partition is cancelled or completed. Used only for tooling in order to test the flow of the downloader code

func (*Downloader) OnPanic

func (downloader *Downloader) OnPanic(cb func(p interface{}))

A callback that will be invoked if there is a panic that occurs while writing keys from a transfer. Used only for tooling in order to test the flow of the downloader code

func (*Downloader) Reset

func (downloader *Downloader) Reset(partition uint64)

type Entry

type Entry struct {
	Site   string
	Bucket string
	Key    string
	Value  *SiblingSet
}

type EntryFilter

type EntryFilter func(Entry) bool

type HTTPTransferAgent

type HTTPTransferAgent struct {
	// contains filtered or unexported fields
}

func NewDefaultHTTPTransferAgent

func NewDefaultHTTPTransferAgent(configController ClusterConfigController, partitionPool PartitionPool) *HTTPTransferAgent

An easy constructor

func NewHTTPTransferAgent

func NewHTTPTransferAgent(configController ClusterConfigController, transferProposer PartitionTransferProposer, partitionDownloader PartitionDownloader, transferFactory PartitionTransferFactory, partitionPool PartitionPool) *HTTPTransferAgent

func (*HTTPTransferAgent) Attach

func (transferAgent *HTTPTransferAgent) Attach(router *mux.Router)

func (*HTTPTransferAgent) DisableAllOutgoingTransfers

func (transferAgent *HTTPTransferAgent) DisableAllOutgoingTransfers()

func (*HTTPTransferAgent) DisableOutgoingTransfers

func (transferAgent *HTTPTransferAgent) DisableOutgoingTransfers(partition uint64)

func (*HTTPTransferAgent) EnableOutgoingTransfers

func (transferAgent *HTTPTransferAgent) EnableOutgoingTransfers(partition uint64)

func (*HTTPTransferAgent) StartTransfer

func (transferAgent *HTTPTransferAgent) StartTransfer(partition uint64, replica uint64)

func (*HTTPTransferAgent) StopAllTransfers

func (transferAgent *HTTPTransferAgent) StopAllTransfers()

func (*HTTPTransferAgent) StopTransfer

func (transferAgent *HTTPTransferAgent) StopTransfer(partition uint64, replica uint64)

type HTTPTransferTransport

type HTTPTransferTransport struct {
	// contains filtered or unexported fields
}

func NewHTTPTransferTransport

func NewHTTPTransferTransport(configController ClusterConfigController, httpClient *http.Client) *HTTPTransferTransport

func (*HTTPTransferTransport) Get

func (transferTransport *HTTPTransferTransport) Get(nodeID uint64, partition uint64) (io.Reader, func(), error)

func (*HTTPTransferTransport) SetEndpointURL

func (transferTransport *HTTPTransferTransport) SetEndpointURL(endpointURL string) *HTTPTransferTransport

type IncomingTransfer

type IncomingTransfer struct {
	// contains filtered or unexported fields
}

func NewIncomingTransfer

func NewIncomingTransfer(reader io.Reader) *IncomingTransfer

func (*IncomingTransfer) Cancel

func (transfer *IncomingTransfer) Cancel()

func (*IncomingTransfer) NextChunk

func (transfer *IncomingTransfer) NextChunk() (PartitionChunk, error)

func (*IncomingTransfer) UseFilter

func (transfer *IncomingTransfer) UseFilter(entryFilter EntryFilter)

type JSONPartitionReader

type JSONPartitionReader struct {
	PartitionTransfer PartitionTransfer
	// contains filtered or unexported fields
}

func (*JSONPartitionReader) Read

func (partitionReader *JSONPartitionReader) Read(p []byte) (n int, err error)

type OutgoingTransfer

type OutgoingTransfer struct {
	// contains filtered or unexported fields
}

func NewOutgoingTransfer

func NewOutgoingTransfer(partition Partition, chunkSize int) *OutgoingTransfer

func (*OutgoingTransfer) Cancel

func (transfer *OutgoingTransfer) Cancel()

func (*OutgoingTransfer) NextChunk

func (transfer *OutgoingTransfer) NextChunk() (PartitionChunk, error)

func (*OutgoingTransfer) UseFilter

func (transfer *OutgoingTransfer) UseFilter(entryFilter EntryFilter)

type PartitionChunk

type PartitionChunk struct {
	Index    uint64
	Entries  []Entry
	Checksum Hash
}

func (*PartitionChunk) IsEmpty

func (partitionChunk *PartitionChunk) IsEmpty() bool

type PartitionDownloader

type PartitionDownloader interface {
	// Starts the download process for a partition if it is not yet downloaded and
	// there isn't yet a download occurring for that partition.
	// Returns a channel that closes when the download is complete
	// If the download is successful all future calls to Download for that partition
	// should return that closed channel until CancelDownload is called
	// which resets it
	Download(partition uint64) <-chan int
	// Resets the downloader's internal state for this partition. Next time Download() is called
	// for this partition it should start a new download
	Reset(partition uint64)
	// Returns a boolean indicating whether or not a download is in progress
	// for this partition
	IsDownloading(partition uint64) bool
	// Cancels any download in progress. Resets internal state so next
	// call to Download for a partition will start a new download and
	// return a new after channel
	CancelDownload(partition uint64)
}

type PartitionTransfer

type PartitionTransfer interface {
	NextChunk() (PartitionChunk, error)
	UseFilter(EntryFilter)
	Cancel()
}

type PartitionTransferAgent

type PartitionTransferAgent interface {
	// Tell the partition transfer agent to start the holdership transfer process for this partition replica
	StartTransfer(partition uint64, replica uint64)
	// Tell the partition transfer agent to stop any holdership transfer processes for this partition replica
	StopTransfer(partition uint64, replica uint64)
	// Stop all holdership transfers for all partition replicas
	StopAllTransfers()
	// Allow downloads of this partition from this node
	EnableOutgoingTransfers(partition uint64)
	// Disallow future downloads of this partition from this node and cancel any currently running ones
	DisableOutgoingTransfers(partition uint64)
	// Disallow future downloads of all partition from this node and cancel any currently running ones
	DisableAllOutgoingTransfers()
}

type PartitionTransferDecoder

type PartitionTransferDecoder interface {
	Decode() (PartitionTransfer, error)
}

type PartitionTransferEncoder

type PartitionTransferEncoder interface {
	Encode() (io.Reader, error)
}

type PartitionTransferFactory

type PartitionTransferFactory interface {
	CreateIncomingTransfer(reader io.Reader) PartitionTransfer
	CreateOutgoingTransfer(partition Partition) (PartitionTransfer, error)
}

type PartitionTransferPartnerStrategy

type PartitionTransferPartnerStrategy interface {
	ChooseTransferPartner(partition uint64) uint64
}

type PartitionTransferProposer

type PartitionTransferProposer interface {
	QueueTransferProposal(partition uint64, replica uint64, after <-chan int) <-chan error
	CancelTransferProposal(partition uint64, replica uint64)
	CancelTransferProposals(partition uint64)
	PendingProposals(partition uint64) int
	QueuedProposals() map[uint64]map[uint64]bool
}

type PartitionTransferTransport

type PartitionTransferTransport interface {
	Get(nodeID uint64, partition uint64) (io.Reader, func(), error)
}

type RandomTransferPartnerStrategy

type RandomTransferPartnerStrategy struct {
	// contains filtered or unexported fields
}

func NewRandomTransferPartnerStrategy

func NewRandomTransferPartnerStrategy(configController ClusterConfigController) *RandomTransferPartnerStrategy

func (*RandomTransferPartnerStrategy) ChooseTransferPartner

func (partnerStrategy *RandomTransferPartnerStrategy) ChooseTransferPartner(partition uint64) uint64

type TransferDecoder

type TransferDecoder struct {
	// contains filtered or unexported fields
}

func NewTransferDecoder

func NewTransferDecoder(reader io.Reader) *TransferDecoder

func (*TransferDecoder) Decode

func (decoder *TransferDecoder) Decode() (PartitionTransfer, error)

type TransferEncoder

type TransferEncoder struct {
	// contains filtered or unexported fields
}

func NewTransferEncoder

func NewTransferEncoder(transfer PartitionTransfer) *TransferEncoder

func (*TransferEncoder) Encode

func (encoder *TransferEncoder) Encode() (io.Reader, error)

type TransferFactory

type TransferFactory struct {
}

func (*TransferFactory) CreateIncomingTransfer

func (transferFactory *TransferFactory) CreateIncomingTransfer(reader io.Reader) PartitionTransfer

func (*TransferFactory) CreateOutgoingTransfer

func (transferFactory *TransferFactory) CreateOutgoingTransfer(partition Partition) (PartitionTransfer, error)

type TransferProposer

type TransferProposer struct {
	// contains filtered or unexported fields
}

func NewTransferProposer

func NewTransferProposer(configController ClusterConfigController) *TransferProposer

func (*TransferProposer) CancelTransferProposal

func (transferProposer *TransferProposer) CancelTransferProposal(partition uint64, replica uint64)

func (*TransferProposer) CancelTransferProposals

func (transferProposer *TransferProposer) CancelTransferProposals(partition uint64)

func (*TransferProposer) PendingProposals

func (transferProposer *TransferProposer) PendingProposals(partition uint64) int

func (*TransferProposer) QueueTransferProposal

func (transferProposer *TransferProposer) QueueTransferProposal(partition uint64, replica uint64, after <-chan int) <-chan error

func (*TransferProposer) QueuedProposals

func (transferProposer *TransferProposer) QueuedProposals() map[uint64]map[uint64]bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL