Documentation ¶
Index ¶
- Constants
- Variables
- func ChecksumEntries(entries []Entry) Hash
- type Canceler
- type Downloader
- func (downloader *Downloader) CancelDownload(partition uint64)
- func (downloader *Downloader) Download(partition uint64) <-chan int
- func (downloader *Downloader) IsDownloading(partition uint64) bool
- func (downloader *Downloader) OnDownloadStop(cb func(partition uint64))
- func (downloader *Downloader) OnPanic(cb func(p interface{}))
- func (downloader *Downloader) Reset(partition uint64)
- type Entry
- type EntryFilter
- type HTTPTransferAgent
- func (transferAgent *HTTPTransferAgent) Attach(router *mux.Router)
- func (transferAgent *HTTPTransferAgent) DisableAllOutgoingTransfers()
- func (transferAgent *HTTPTransferAgent) DisableOutgoingTransfers(partition uint64)
- func (transferAgent *HTTPTransferAgent) EnableOutgoingTransfers(partition uint64)
- func (transferAgent *HTTPTransferAgent) StartTransfer(partition uint64, replica uint64)
- func (transferAgent *HTTPTransferAgent) StopAllTransfers()
- func (transferAgent *HTTPTransferAgent) StopTransfer(partition uint64, replica uint64)
- type HTTPTransferTransport
- type IncomingTransfer
- type JSONPartitionReader
- type OutgoingTransfer
- type PartitionChunk
- type PartitionDownloader
- type PartitionTransfer
- type PartitionTransferAgent
- type PartitionTransferDecoder
- type PartitionTransferEncoder
- type PartitionTransferFactory
- type PartitionTransferPartnerStrategy
- type PartitionTransferProposer
- type PartitionTransferTransport
- type RandomTransferPartnerStrategy
- type TransferDecoder
- type TransferEncoder
- type TransferFactory
- type TransferProposer
- func (transferProposer *TransferProposer) CancelTransferProposal(partition uint64, replica uint64)
- func (transferProposer *TransferProposer) CancelTransferProposals(partition uint64)
- func (transferProposer *TransferProposer) PendingProposals(partition uint64) int
- func (transferProposer *TransferProposer) QueueTransferProposal(partition uint64, replica uint64, after <-chan int) <-chan error
- func (transferProposer *TransferProposer) QueuedProposals() map[uint64]map[uint64]bool
Constants ¶
View Source
const DefaultChunkSize = 100
View Source
const DefaultEndpointURL = "/partitions/%d/keys"
View Source
const (
DefaultScanBufferSize = 100
)
View Source
const RetryTimeoutMax = 32
Variables ¶
View Source
var EBadResponse = errors.New("Node responded with a bad response")
View Source
var EEntryChecksum = errors.New("Unable to reproduce the checksum for the entry in the partition chunk")
View Source
var ETransferCancelled = errors.New("Cancelled")
Functions ¶
func ChecksumEntries ¶
func ChecksumEntries(entries []Entry) Hash
Types ¶
type Downloader ¶
type Downloader struct {
// contains filtered or unexported fields
}
func NewDownloader ¶
func NewDownloader(configController ClusterConfigController, transferTransport PartitionTransferTransport, transferPartnerStrategy PartitionTransferPartnerStrategy, transferFactory PartitionTransferFactory, partitionPool PartitionPool) *Downloader
func (*Downloader) CancelDownload ¶
func (downloader *Downloader) CancelDownload(partition uint64)
Important: CancelDownload should only be called by a transfer agent, and only after all transfer proposals waiting for this download have been cancelled.
func (*Downloader) Download ¶
func (downloader *Downloader) Download(partition uint64) <-chan int
func (*Downloader) IsDownloading ¶
func (downloader *Downloader) IsDownloading(partition uint64) bool
func (*Downloader) OnDownloadStop ¶
func (downloader *Downloader) OnDownloadStop(cb func(partition uint64))
OnDownloadStop registers a callback that will be invoked after a download for a partition is cancelled or completed. It is used only by tooling, in order to test the flow of the downloader code.
func (*Downloader) OnPanic ¶
func (downloader *Downloader) OnPanic(cb func(p interface{}))
OnPanic registers a callback that will be invoked if a panic occurs while writing keys from a transfer. It is used only by tooling, in order to test the flow of the downloader code.
func (*Downloader) Reset ¶
func (downloader *Downloader) Reset(partition uint64)
type EntryFilter ¶
type HTTPTransferAgent ¶
type HTTPTransferAgent struct {
// contains filtered or unexported fields
}
func NewDefaultHTTPTransferAgent ¶
func NewDefaultHTTPTransferAgent(configController ClusterConfigController, partitionPool PartitionPool) *HTTPTransferAgent
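NewDefaultHTTPTransferAgent is a convenience constructor: it requires only a config controller and a partition pool, whereas NewHTTPTransferAgent also takes the transfer proposer, partition downloader, and transfer factory as explicit arguments.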
func NewHTTPTransferAgent ¶
func NewHTTPTransferAgent(configController ClusterConfigController, transferProposer PartitionTransferProposer, partitionDownloader PartitionDownloader, transferFactory PartitionTransferFactory, partitionPool PartitionPool) *HTTPTransferAgent
func (*HTTPTransferAgent) Attach ¶
func (transferAgent *HTTPTransferAgent) Attach(router *mux.Router)
func (*HTTPTransferAgent) DisableAllOutgoingTransfers ¶
func (transferAgent *HTTPTransferAgent) DisableAllOutgoingTransfers()
func (*HTTPTransferAgent) DisableOutgoingTransfers ¶
func (transferAgent *HTTPTransferAgent) DisableOutgoingTransfers(partition uint64)
func (*HTTPTransferAgent) EnableOutgoingTransfers ¶
func (transferAgent *HTTPTransferAgent) EnableOutgoingTransfers(partition uint64)
func (*HTTPTransferAgent) StartTransfer ¶
func (transferAgent *HTTPTransferAgent) StartTransfer(partition uint64, replica uint64)
func (*HTTPTransferAgent) StopAllTransfers ¶
func (transferAgent *HTTPTransferAgent) StopAllTransfers()
func (*HTTPTransferAgent) StopTransfer ¶
func (transferAgent *HTTPTransferAgent) StopTransfer(partition uint64, replica uint64)
type HTTPTransferTransport ¶
type HTTPTransferTransport struct {
// contains filtered or unexported fields
}
func NewHTTPTransferTransport ¶
func NewHTTPTransferTransport(configController ClusterConfigController, httpClient *http.Client) *HTTPTransferTransport
func (*HTTPTransferTransport) SetEndpointURL ¶
func (transferTransport *HTTPTransferTransport) SetEndpointURL(endpointURL string) *HTTPTransferTransport
type IncomingTransfer ¶
type IncomingTransfer struct {
// contains filtered or unexported fields
}
func NewIncomingTransfer ¶
func NewIncomingTransfer(reader io.Reader) *IncomingTransfer
func (*IncomingTransfer) Cancel ¶
func (transfer *IncomingTransfer) Cancel()
func (*IncomingTransfer) NextChunk ¶
func (transfer *IncomingTransfer) NextChunk() (PartitionChunk, error)
func (*IncomingTransfer) UseFilter ¶
func (transfer *IncomingTransfer) UseFilter(entryFilter EntryFilter)
type JSONPartitionReader ¶
type JSONPartitionReader struct {
	PartitionTransfer PartitionTransfer
	// contains filtered or unexported fields
}
type OutgoingTransfer ¶
type OutgoingTransfer struct {
// contains filtered or unexported fields
}
func NewOutgoingTransfer ¶
func NewOutgoingTransfer(partition Partition, chunkSize int) *OutgoingTransfer
func (*OutgoingTransfer) Cancel ¶
func (transfer *OutgoingTransfer) Cancel()
func (*OutgoingTransfer) NextChunk ¶
func (transfer *OutgoingTransfer) NextChunk() (PartitionChunk, error)
func (*OutgoingTransfer) UseFilter ¶
func (transfer *OutgoingTransfer) UseFilter(entryFilter EntryFilter)
type PartitionChunk ¶
func (*PartitionChunk) IsEmpty ¶
func (partitionChunk *PartitionChunk) IsEmpty() bool
type PartitionDownloader ¶
type PartitionDownloader interface {
	// Starts the download process for a partition if it is not yet downloaded and
	// there isn't yet a download occurring for that partition.
	// Returns a channel that closes when the download is complete.
	// If the download is successful, all future calls to Download for that partition
	// should return that closed channel until CancelDownload is called,
	// which resets it.
	Download(partition uint64) <-chan int
	// Resets the downloader's internal state for this partition. The next time Download() is called
	// for this partition it should start a new download.
	Reset(partition uint64)
	// Returns a boolean indicating whether or not a download is in progress
	// for this partition.
	IsDownloading(partition uint64) bool
	// Cancels any download in progress. Resets internal state so the next
	// call to Download for a partition will start a new download and
	// return a new after channel.
	CancelDownload(partition uint64)
}
type PartitionTransfer ¶
type PartitionTransfer interface {
	NextChunk() (PartitionChunk, error)
	UseFilter(EntryFilter)
	Cancel()
}
type PartitionTransferAgent ¶
type PartitionTransferAgent interface {
	// Tell the partition transfer agent to start the holdership transfer process for this partition replica.
	StartTransfer(partition uint64, replica uint64)
	// Tell the partition transfer agent to stop any holdership transfer processes for this partition replica.
	StopTransfer(partition uint64, replica uint64)
	// Stop all holdership transfers for all partition replicas.
	StopAllTransfers()
	// Allow downloads of this partition from this node.
	EnableOutgoingTransfers(partition uint64)
	// Disallow future downloads of this partition from this node and cancel any currently running ones.
	DisableOutgoingTransfers(partition uint64)
	// Disallow future downloads of all partitions from this node and cancel any currently running ones.
	DisableAllOutgoingTransfers()
}
type PartitionTransferDecoder ¶
type PartitionTransferDecoder interface {
Decode() (PartitionTransfer, error)
}
type PartitionTransferFactory ¶
type PartitionTransferFactory interface {
	CreateIncomingTransfer(reader io.Reader) PartitionTransfer
	CreateOutgoingTransfer(partition Partition) (PartitionTransfer, error)
}
type PartitionTransferProposer ¶
type PartitionTransferProposer interface {
	QueueTransferProposal(partition uint64, replica uint64, after <-chan int) <-chan error
	CancelTransferProposal(partition uint64, replica uint64)
	CancelTransferProposals(partition uint64)
	PendingProposals(partition uint64) int
	QueuedProposals() map[uint64]map[uint64]bool
}
type RandomTransferPartnerStrategy ¶
type RandomTransferPartnerStrategy struct {
// contains filtered or unexported fields
}
func NewRandomTransferPartnerStrategy ¶
func NewRandomTransferPartnerStrategy(configController ClusterConfigController) *RandomTransferPartnerStrategy
func (*RandomTransferPartnerStrategy) ChooseTransferPartner ¶
func (partnerStrategy *RandomTransferPartnerStrategy) ChooseTransferPartner(partition uint64) uint64
type TransferDecoder ¶
type TransferDecoder struct {
// contains filtered or unexported fields
}
func NewTransferDecoder ¶
func NewTransferDecoder(reader io.Reader) *TransferDecoder
func (*TransferDecoder) Decode ¶
func (decoder *TransferDecoder) Decode() (PartitionTransfer, error)
type TransferEncoder ¶
type TransferEncoder struct {
// contains filtered or unexported fields
}
func NewTransferEncoder ¶
func NewTransferEncoder(transfer PartitionTransfer) *TransferEncoder
type TransferFactory ¶
type TransferFactory struct { }
func (*TransferFactory) CreateIncomingTransfer ¶
func (transferFactory *TransferFactory) CreateIncomingTransfer(reader io.Reader) PartitionTransfer
func (*TransferFactory) CreateOutgoingTransfer ¶
func (transferFactory *TransferFactory) CreateOutgoingTransfer(partition Partition) (PartitionTransfer, error)
type TransferProposer ¶
type TransferProposer struct {
// contains filtered or unexported fields
}
func NewTransferProposer ¶
func NewTransferProposer(configController ClusterConfigController) *TransferProposer
func (*TransferProposer) CancelTransferProposal ¶
func (transferProposer *TransferProposer) CancelTransferProposal(partition uint64, replica uint64)
func (*TransferProposer) CancelTransferProposals ¶
func (transferProposer *TransferProposer) CancelTransferProposals(partition uint64)
func (*TransferProposer) PendingProposals ¶
func (transferProposer *TransferProposer) PendingProposals(partition uint64) int
func (*TransferProposer) QueueTransferProposal ¶
func (transferProposer *TransferProposer) QueueTransferProposal(partition uint64, replica uint64, after <-chan int) <-chan error
func (*TransferProposer) QueuedProposals ¶
func (transferProposer *TransferProposer) QueuedProposals() map[uint64]map[uint64]bool
Click to show internal directories.
Click to hide internal directories.