pipeline

package
v0.0.0-...-ee1b036
Published: Dec 16, 2016 License: Apache-2.0 Imports: 9 Imported by: 1

Documentation

Overview

Package pipeline implements a low-level pipelined API for SLO uploads.

You should only use this package if you need a customized upload pipeline. The github.com/ibmjstart/swiftlygo.SloUploader should be sufficient for basic uploads.

Most of the functions defined in this package are stages in an upload pipeline that communicate with channels of type FileChunk. A FileChunk is a representation of a chunk of a file that is being uploaded, but is also how we represent SLO manifests before uploading them.

To use the pipeline, either start with the BuildChunks source that creates a channel of FileChunks or make your own data source. Pass channels of FileChunks to each stage, and use the return value of one stage as input to the next.

Most stages accept an errors channel on which they report nonfatal errors. It is generally sufficient to create a single errors channel and pass it to every stage. Ensure that you drain the errors channel, though, or your pipeline will block on the first error that it encounters.
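The stage-with-errors-channel shape described above can be sketched with plain channels. The `chunk` type and `upcase` stage below are illustrative stand-ins, not this package's code; the point is the pattern of passing bad input onward as a nonfatal error while draining the errors channel concurrently.

```go
package main

import (
	"fmt"
	"strings"
)

// chunk is a stand-in for pipeline.FileChunk in this illustration.
type chunk struct {
	Number uint
	Data   []byte
}

// upcase is an illustrative stage: it transforms each chunk, reporting
// nonfatal problems on errs and passing good chunks downstream.
func upcase(in <-chan chunk, errs chan<- error) <-chan chunk {
	out := make(chan chunk)
	go func() {
		defer close(out)
		for c := range in {
			if len(c.Data) == 0 {
				errs <- fmt.Errorf("chunk %d has no data", c.Number)
				continue // nonfatal: skip this chunk, keep going
			}
			c.Data = []byte(strings.ToUpper(string(c.Data)))
			out <- c
		}
	}()
	return out
}

func main() {
	errs := make(chan error)
	// Drain errors concurrently; an undrained errors channel would block
	// the stage on its first error, stalling the whole pipeline.
	go func() {
		for err := range errs {
			fmt.Println("nonfatal:", err)
		}
	}()

	src := make(chan chunk)
	go func() {
		defer close(src)
		src <- chunk{Number: 0, Data: []byte("hello")}
		src <- chunk{Number: 1} // no Data: triggers a nonfatal error
	}()

	for c := range upcase(src, errs) {
		fmt.Printf("chunk %d: %s\n", c.Number, c.Data)
	}
	close(errs)
}
```

The return-a-receive-channel convention is what lets each stage's output feed directly into the next stage's input.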

Index

Constants

This section is empty.

Variables

var UploadBufferSize uint = 1024 * 4

UploadBufferSize is the size of the data buffer that each ReadHashAndUpload goroutine will use to read data from the hard drive. This works best as a multiple of the hard drive sector size for an internal hard drive. If using a network-mounted hard drive, some experimentation may be needed to find an optimal value.

var UploadMaxAttempts uint = 5

UploadMaxAttempts is the number of times that each ReadHashAndUpload goroutine will retry a failing upload before moving on to the next one.

var UploadRetryBaseWait time.Duration = time.Second

UploadRetryBaseWait is the shortest time unit that each ReadHashAndUpload goroutine will wait between upload attempts.

Functions

func BuildChunks

func BuildChunks(dataSize, chunkSize uint) (<-chan FileChunk, uint)

BuildChunks sends back a channel of FileChunk structs, each with a Size of chunkSize or less and with its Number field set sequentially from 0 upward. It also returns the number of chunks that it will yield on the channel. The final chunk's Size will be less than chunkSize when chunkSize does not evenly divide dataSize. Both dataSize and chunkSize must be greater than zero, and chunkSize must not be larger than dataSize.
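The chunk count and sizes follow from ceiling division. This standalone sketch shows the arithmetic; it is an illustration of the documented behavior, not the package's implementation:

```go
package main

import "fmt"

// chunkSizes mirrors the arithmetic described above: ceiling division for
// the chunk count, with a short final chunk when chunkSize does not divide
// dataSize evenly. Illustrative only, not the package's code.
func chunkSizes(dataSize, chunkSize uint) []uint {
	count := dataSize / chunkSize
	if dataSize%chunkSize != 0 {
		count++ // one extra, shorter chunk for the remainder
	}
	sizes := make([]uint, 0, count)
	remaining := dataSize
	for remaining > 0 {
		s := chunkSize
		if remaining < chunkSize {
			s = remaining
		}
		sizes = append(sizes, s)
		remaining -= s
	}
	return sizes
}

func main() {
	fmt.Println(chunkSizes(10, 4)) // 10 bytes in 4-byte chunks: [4 4 2]
}
```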

func Consume

func Consume(channel <-chan FileChunk)

Consume reads the channel until it is empty, consigning its contents to the void.

func Containerizer

func Containerizer(chunks <-chan FileChunk, errors chan<- error, container string) <-chan FileChunk

Containerizer assigns each FileChunk the provided container.

func Counter

func Counter(chunks <-chan FileChunk) (<-chan FileChunk, <-chan Count)

Counter provides basic information on the data that passes through it. Be sure to read the outbound Count channel, or it will block the flow of data through the stage.

func Divide

func Divide(chunks <-chan FileChunk, divisor uint) []chan FileChunk

Divide distributes the input channel across divisor new channels, which are returned in a slice.
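One way to sketch such a distribution is round-robin over buffered channels. The documentation does not say which distribution policy the package uses, so the policy below is purely illustrative:

```go
package main

import "fmt"

// divide distributes values from in across divisor output channels.
// Round-robin is one possible policy; this package does not document
// which distribution it uses, so this sketch is illustrative only.
func divide(in <-chan int, divisor uint) []chan int {
	outs := make([]chan int, divisor)
	for i := range outs {
		outs[i] = make(chan int, 16)
	}
	go func() {
		i := 0
		for v := range in {
			outs[i%len(outs)] <- v
			i++
		}
		for _, o := range outs {
			close(o)
		}
	}()
	return outs
}

func main() {
	in := make(chan int, 4)
	for v := 1; v <= 4; v++ {
		in <- v
	}
	close(in)
	for i, out := range divide(in, 2) {
		for v := range out {
			fmt.Printf("worker %d got %d\n", i, v)
		}
	}
}
```

Dividing a channel like this is how a pipeline runs a slow stage (such as uploading) with several concurrent workers.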

func Filter

func Filter(chunks <-chan FileChunk, errors chan<- error, filter func(FileChunk) (bool, error)) <-chan FileChunk

Filter applies the provided closure to every FileChunk, passing on only the FileChunks for which the closure returns true. If the closure returns an error, that error is sent on the errors channel.

func Fork

func Fork(chunks <-chan FileChunk) (<-chan FileChunk, <-chan FileChunk)

Fork copies the input to two output channels, allowing a pipeline to diverge.
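A Fork-style fan-out can be sketched with plain channels. The implementation below is illustrative, not the package's source; the important property is that every value is copied to both outputs, so both branches must be consumed or the stage blocks on the slower one:

```go
package main

import "fmt"

// fork copies every value from in to both outputs. Both returned channels
// must be drained, or the goroutine blocks on the slower branch.
// Illustration of the pattern, not the package's code.
func fork(in <-chan int) (<-chan int, <-chan int) {
	a := make(chan int)
	b := make(chan int)
	go func() {
		defer close(a)
		defer close(b)
		for v := range in {
			a <- v
			b <- v
		}
	}()
	return a, b
}

func main() {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	a, b := fork(in)
	// Drain one branch concurrently so the fork never blocks.
	sum := make(chan int)
	go func() {
		total := 0
		for v := range b {
			total += v
		}
		sum <- total
	}()
	for v := range a {
		fmt.Println("branch a:", v)
	}
	fmt.Println("branch b sum:", <-sum)
}
```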

func HashData deprecated

func HashData(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk

HashData attaches the hash of a FileChunk's data. Do not give it FileChunks without Data attached. It returns errors if you do.

Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.

func Join

func Join(chans ...<-chan FileChunk) <-chan FileChunk

Join performs a fan-in on the many input channels, combining their data into a single output channel.
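The usual fan-in shape in Go uses a sync.WaitGroup so the output closes only after every input has closed. This sketch shows the pattern; it is not the package's source:

```go
package main

import (
	"fmt"
	"sync"
)

// join fans in many input channels to one output, closing the output only
// after every input has closed. Illustrative sketch of the pattern.
func join(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, ch := range chans {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(ch)
	}
	go func() {
		wg.Wait() // all inputs drained; safe to close the output
		close(out)
	}()
	return out
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	close(a)
	b <- 3
	close(b)

	total := 0
	for v := range join(a, b) {
		total += v
	}
	fmt.Println("total:", total) // total: 6
}
```

Join is the natural counterpart to Divide: distribute work across channels, then fan the results back into one stream.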

func Json

func Json(chunks <-chan FileChunk, errors chan<- error) <-chan []byte

Json converts the incoming FileChunks into JSON, sending any conversion errors back on its errors channel.

func ManifestBuilder

func ManifestBuilder(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk

ManifestBuilder accepts FileChunks and creates SLO manifests out of them. If there are more than 1000 chunks, it will emit multiple FileChunks, each of which contains an SLO manifest for that region of the file. The FileChunks that are emitted have a Number (their manifest number), Data (the JSON of the manifest), and a Size (the number of bytes of manifest JSON). They will need to be assigned an Object and Container before they can be uploaded.

func Map

func Map(chunks <-chan FileChunk, errors chan<- error, operation func(FileChunk) (FileChunk, error)) <-chan FileChunk

Map applies the provided operation to each chunk that passes through it. It sends errors from the operation to the errors channel, and will not send on a FileChunk that caused an error in the operation.

func ObjectNamer

func ObjectNamer(chunks <-chan FileChunk, errors chan<- error, nameFormat string) <-chan FileChunk

ObjectNamer assigns names to objects based on their Number and Size. Use a Printf-style string to format the names, with %[1]d referring to the Number and %[2]d referring to the Size.
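The indexed verbs work as in the standard fmt package. This sketch shows what the stage would do for one chunk; the format string is a hypothetical example, not one the package prescribes:

```go
package main

import "fmt"

// objectName mimics what an ObjectNamer stage would do for one chunk:
// %[1]d refers to the chunk's Number and %[2]d to its Size. The format
// string passed in is the caller's choice; the one in main is hypothetical.
func objectName(nameFormat string, number, size uint) string {
	return fmt.Sprintf(nameFormat, number, size)
}

func main() {
	fmt.Println(objectName("bigfile-chunk-%[1]d-size-%[2]d", 7, 1024))
	// bigfile-chunk-7-size-1024
}
```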

func ReadData deprecated

func ReadData(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt) <-chan FileChunk

ReadData populates the FileChunk structs that come in on the chunks channel with the data from the dataSource corresponding to each chunk's region of the file, and sends its errors back on the errors channel. In order to work, ReadData needs chunks with the Size and Offset properties set.

Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.

func ReadHashAndUpload

func ReadHashAndUpload(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt, dest auth.Destination) <-chan FileChunk

ReadHashAndUpload reads the data, performs the hash, and uploads it. Its monolithic design isn't very modular, but it reads the file and discards the data within a single function, which saves a lot of memory. Use this if memory footprint is a major concern. ReadHashAndUpload requires that incoming chunks have the Size, Number, Offset, Object, and Container properties already set.

func Separate

func Separate(chunks <-chan FileChunk, errors chan<- error, condition func(FileChunk) (bool, error)) (<-chan FileChunk, <-chan FileChunk)

Separate divides the input channel into two output channels based on some condition. If the condition is true, the current chunk goes to the first output channel, otherwise it goes to the second.

func UploadData deprecated

func UploadData(chunks <-chan FileChunk, errors chan<- error, dest auth.Destination, retryWait time.Duration) <-chan FileChunk

UploadData sends FileChunks to object storage via the provided destination. It places the objects in their Container with their Object name and checks the md5 of the upload, retrying on failure. It requires all fields of the FileChunk to be filled out before attempting an upload, and will send errors if it encounters FileChunks with missing fields. The retryWait argument is the base wait before a retry is attempted.

Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.

func UploadManifests

func UploadManifests(manifests <-chan FileChunk, errors chan<- error, dest auth.Destination) <-chan FileChunk

UploadManifests treats the incoming FileChunks as manifests and uploads them with the special SLO manifest headers.

Types

type Count

type Count struct {
	Bytes   uint
	Chunks  uint
	Elapsed time.Duration
}

Count represents basic statistics about the data that has passed through a Counter pipeline stage. It records the total Bytes of data that it has seen, as well as the number of chunks and the duration since the associated Counter stage was started. This information can be used to calculate statistics about the pipeline's performance, especially when multiple counters in different pipeline regions are employed.

func (Count) Rate

func (c Count) Rate() float64

Rate returns the rate of data flow in bytes per second.

func (Count) RateKBPS

func (c Count) RateKBPS() float64

RateKBPS returns the rate of data flow in kilobytes per second.

func (Count) RateKiBPS

func (c Count) RateKiBPS() float64

RateKiBPS returns the rate of data flow in kibibytes per second.

func (Count) RateMBPS

func (c Count) RateMBPS() float64

RateMBPS returns the rate of data flow in megabytes per second.

func (Count) RateMiBPS

func (c Count) RateMiBPS() float64

RateMiBPS returns the rate of data flow in mebibytes per second.

type FileChunk

type FileChunk struct {
	Number    uint
	Object    string
	Container string
	Hash      string
	Data      []byte
	Size      uint
	Offset    uint
}

FileChunk represents a single region of a file.

Number represents how many chunks into a given file this chunk is.
Object is the name that this FileChunk will bear within object storage.
Container is the object storage container that this chunk will be uploaded into.
Hash is the md5 sum of this FileChunk's Data.
Data is a slice of the original file of length Size.
Size is the length of the Data slice if the FileChunk represents a normal file chunk, or the apparent size of the manifest if it represents a manifest file.
Offset is the index of the first byte in the file that is included in Data.

func (FileChunk) MarshalJSON

func (f FileChunk) MarshalJSON() ([]byte, error)

MarshalJSON defines the transformation from a FileChunk to an SLO manifest entry.

func (FileChunk) Path

func (f FileChunk) Path() string

Path returns the path that this FileChunk will be uploaded to in object storage.
