Documentation ¶
Overview ¶
Package pipeline implements a low-level pipelined API for SLO uploads.
You should only use this package if you need a customized upload pipeline; the SloUploader in github.com/ibmjstart/swiftlygo should be sufficient for basic uploads.
Most of the functions defined in this package are stages in an upload pipeline that communicate with channels of type FileChunk. A FileChunk is a representation of a chunk of a file that is being uploaded, but is also how we represent SLO manifests before uploading them.
To use the pipeline, either start with the BuildChunks source that creates a channel of FileChunks or make your own data source. Pass channels of FileChunks to each stage, and use the return value of one stage as input to the next.
Most stages accept an errors channel on which they report nonfatal errors. It is generally sufficient to create a single errors channel and pass it to every stage. Be sure to drain the errors channel, though, or the pipeline will block on the first error it encounters.
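For example, a complete pipeline might be wired together as in the sketch below. The container name, the object name formats, and the upload function itself are illustrative assumptions, and dest is presumed to be an already-authenticated auth.Destination.

    import (
    	"log"
    	"os"

    	"github.com/ibmjstart/swiftlygo/auth"
    	"github.com/ibmjstart/swiftlygo/pipeline"
    )

    // upload sketches one way to assemble the stages; the names and
    // container here are placeholders, not recommendations.
    func upload(dest auth.Destination, file *os.File, fileSize, chunkSize uint) {
    	errs := make(chan error)
    	go func() {
    		for err := range errs {
    			log.Println(err) // drain nonfatal errors so no stage blocks
    		}
    	}()
    	chunks, _ := pipeline.BuildChunks(fileSize, chunkSize)
    	chunks = pipeline.ObjectNamer(chunks, errs, "chunk-%[1]d-size-%[2]d")
    	chunks = pipeline.Containerizer(chunks, errs, "uploads")
    	chunks = pipeline.ReadHashAndUpload(chunks, errs, file, dest)
    	manifests := pipeline.ManifestBuilder(chunks, errs)
    	manifests = pipeline.ObjectNamer(manifests, errs, "manifest-%[1]d")
    	manifests = pipeline.Containerizer(manifests, errs, "uploads")
    	pipeline.Consume(pipeline.UploadManifests(manifests, errs, dest))
    	close(errs)
    }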
Index ¶
- Variables
- func BuildChunks(dataSize, chunkSize uint) (<-chan FileChunk, uint)
- func Consume(channel <-chan FileChunk)
- func Containerizer(chunks <-chan FileChunk, errors chan<- error, container string) <-chan FileChunk
- func Counter(chunks <-chan FileChunk) (<-chan FileChunk, <-chan Count)
- func Divide(chunks <-chan FileChunk, divisor uint) []chan FileChunk
- func Filter(chunks <-chan FileChunk, errors chan<- error, ...) <-chan FileChunk
- func Fork(chunks <-chan FileChunk) (<-chan FileChunk, <-chan FileChunk)
- func HashData(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk (deprecated)
- func Join(chans ...<-chan FileChunk) <-chan FileChunk
- func Json(chunks <-chan FileChunk, errors chan<- error) <-chan []byte
- func ManifestBuilder(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk
- func Map(chunks <-chan FileChunk, errors chan<- error, ...) <-chan FileChunk
- func ObjectNamer(chunks <-chan FileChunk, errors chan<- error, nameFormat string) <-chan FileChunk
- func ReadData(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt) <-chan FileChunk (deprecated)
- func ReadHashAndUpload(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt, ...) <-chan FileChunk
- func Separate(chunks <-chan FileChunk, errors chan<- error, ...) (<-chan FileChunk, <-chan FileChunk)
- func UploadData(chunks <-chan FileChunk, errors chan<- error, dest auth.Destination, ...) <-chan FileChunk (deprecated)
- func UploadManifests(manifests <-chan FileChunk, errors chan<- error, dest auth.Destination) <-chan FileChunk
- type Count
- type FileChunk
Constants ¶
This section is empty.
Variables ¶
var UploadBufferSize uint = 1024 * 4
UploadBufferSize is the size of the data buffer that each ReadHashAndUpload goroutine will use to read data from the hard drive. This works best as a multiple of the hard drive sector size for an internal hard drive. If using a network-mounted hard drive, some experimentation may be needed to find an optimal value.
var UploadMaxAttempts uint = 5
UploadMaxAttempts is the number of times that each ReadHashAndUpload goroutine will retry a failing upload before moving on to the next one.
var UploadRetryBaseWait time.Duration = time.Second
UploadRetryBaseWait is the shortest time unit that each ReadHashAndUpload goroutine will wait between upload attempts.
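All three are plain package-level variables, so they can be tuned before a pipeline starts. A small sketch; the values below are illustrative, not recommendations:

    import (
    	"time"

    	"github.com/ibmjstart/swiftlygo/pipeline"
    )

    func init() {
    	pipeline.UploadBufferSize = 1024 * 8           // e.g. two 4 KiB sectors
    	pipeline.UploadMaxAttempts = 10                // retry harder on a flaky network
    	pipeline.UploadRetryBaseWait = 2 * time.Second // wait longer between attempts
    }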
Functions ¶
func BuildChunks ¶
func BuildChunks(dataSize, chunkSize uint) (<-chan FileChunk, uint)
BuildChunks sends back a channel of FileChunk structs, each with a Size of chunkSize or less and each with its Number field set sequentially from 0 upward. It also returns the number of chunks that it will yield on the channel. The final chunk's Size will be less than chunkSize when chunkSize does not evenly divide the data, since that chunk only needs to hold the remainder. Both dataSize and chunkSize must be greater than zero, and chunkSize must not be larger than dataSize.
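A small sketch of planning a 10 MiB upload in 1 MiB pieces; the printed fields are those BuildChunks is documented to set:

    chunks, count := pipeline.BuildChunks(10*1024*1024, 1024*1024)
    fmt.Println(count) // 10
    for chunk := range chunks {
    	fmt.Println(chunk.Number, chunk.Size)
    }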
func Consume ¶
func Consume(channel <-chan FileChunk)
Consume reads the channel until it is empty, consigning its contents to the void.
func Containerizer ¶
func Containerizer(chunks <-chan FileChunk, errors chan<- error, container string) <-chan FileChunk
Containerizer assigns each FileChunk the provided container.
func Counter ¶
func Counter(chunks <-chan FileChunk) (<-chan FileChunk, <-chan Count)
Counter provides basic information on the data that passes through it. Be sure to drain the outbound Count channel, or it will block the flow of data through this stage.
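A sketch of draining the Count channel concurrently, assuming chunks and errs are already in scope and the logging policy is your own:

    chunks, counts := pipeline.Counter(chunks)
    go func() {
    	for count := range counts {
    		log.Printf("progress: %+v", count) // drain so Counter never blocks
    	}
    }()
    // continue building the pipeline from chunks...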
func Divide ¶
func Divide(chunks <-chan FileChunk, divisor uint) []chan FileChunk
Divide distributes the input channel across divisor new channels, which are returned in a slice.
func Filter ¶
func Filter(chunks <-chan FileChunk, errors chan<- error, filter func(FileChunk) (bool, error)) <-chan FileChunk
Filter applies the provided closure to every FileChunk, passing along only those FileChunks for which the closure returns true. If the closure returns an error, that error is passed along on the errors channel.
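For instance, a sketch that drops empty chunks, assuming chunks and errs are already in scope:

    nonEmpty := pipeline.Filter(chunks, errs, func(chunk pipeline.FileChunk) (bool, error) {
    	return chunk.Size > 0, nil // keep only chunks that contain data
    })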
func HashData
deprecated
func HashData(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk
func Join ¶
func Join(chans ...<-chan FileChunk) <-chan FileChunk
Join performs a fan-in on the many input channels, combining their data into a single output channel.
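Divide and Join are natural partners for running a slow stage in parallel. A sketch, assuming chunks, errs, file, and dest are in scope:

    branches := pipeline.Divide(chunks, 4) // fan out across four workers
    uploaded := make([]<-chan pipeline.FileChunk, len(branches))
    for i, branch := range branches {
    	uploaded[i] = pipeline.ReadHashAndUpload(branch, errs, file, dest)
    }
    merged := pipeline.Join(uploaded...) // fan the results back in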
func Json ¶
func Json(chunks <-chan FileChunk, errors chan<- error) <-chan []byte
Json converts the incoming FileChunks into JSON, sending any conversion errors back on its errors channel.
func ManifestBuilder ¶
func ManifestBuilder(chunks <-chan FileChunk, errors chan<- error) <-chan FileChunk
ManifestBuilder accepts FileChunks and creates SLO manifests out of them. If there are more than 1000 chunks, it will emit multiple FileChunks, each of which contains an SLO manifest for that region of the file. The FileChunks that are emitted have a Number (their manifest number), Data (the JSON of the manifest), and a Size (the number of bytes of manifest JSON). They will need to be assigned an Object and Container before they can be uploaded.
func Map ¶
func Map(chunks <-chan FileChunk, errors chan<- error, operation func(FileChunk) (FileChunk, error)) <-chan FileChunk
Map applies the provided operation to each chunk that passes through it. It sends errors from the operation to the errors channel, and will not pass along any FileChunk whose operation returned an error.
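A sketch that prefixes every Object name, assuming chunks and errs are in scope; the prefix is arbitrary:

    prefixed := pipeline.Map(chunks, errs, func(chunk pipeline.FileChunk) (pipeline.FileChunk, error) {
    	chunk.Object = "backup-" + chunk.Object
    	return chunk, nil
    })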
func ObjectNamer ¶
func ObjectNamer(chunks <-chan FileChunk, errors chan<- error, nameFormat string) <-chan FileChunk
ObjectNamer assigns names to objects based on their Number and Size. Use a Printf-style format string for the names, with %[1]d referring to the Number and %[2]d referring to the Size.
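For example, with an illustrative format string:

    named := pipeline.ObjectNamer(chunks, errs, "mydata-chunk-%[1]d-size-%[2]d")
    // chunk 3 of size 1048576 is named "mydata-chunk-3-size-1048576"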
func ReadData
deprecated
func ReadData(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt) <-chan FileChunk
ReadData populates the FileChunk structs that come in on the chunks channel with data from the region of dataSource corresponding to each chunk, and sends its errors back on the errors channel. In order to work, ReadData needs incoming chunks to have the Size and Offset properties set.
Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.
func ReadHashAndUpload ¶
func ReadHashAndUpload(chunks <-chan FileChunk, errors chan<- error, dataSource io.ReaderAt, dest auth.Destination) <-chan FileChunk
ReadHashAndUpload reads the data, performs the hash, and uploads it. Its monolithic design isn't very modular, but it reads the file and discards the data within a single function, which saves a lot of memory. Use this if memory footprint is a major concern. ReadHashAndUpload requires that incoming chunks have the Size, Number, Offset, Object, and Container properties already set.
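A sketch of this stage, assuming named carries chunks whose Size, Number, Offset, Object, and Container are already set, errs is an errors channel, and dest is an authenticated auth.Destination; the file name is hypothetical:

    file, err := os.Open("large.bin")
    if err != nil {
    	log.Fatal(err)
    }
    defer file.Close()
    uploaded := pipeline.ReadHashAndUpload(named, errs, file, dest)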
func Separate ¶
func Separate(chunks <-chan FileChunk, errors chan<- error, condition func(FileChunk) (bool, error)) (<-chan FileChunk, <-chan FileChunk)
Separate divides the input channel into two output channels based on some condition. If the condition is true, the current chunk goes to the first output channel, otherwise it goes to the second.
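A sketch that routes chunks by size, assuming chunks and errs are in scope; the 1 MiB threshold is arbitrary:

    big, small := pipeline.Separate(chunks, errs, func(chunk pipeline.FileChunk) (bool, error) {
    	return chunk.Size >= 1024*1024, nil // true goes to the first channel
    })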
func UploadData
deprecated
func UploadData(chunks <-chan FileChunk, errors chan<- error, dest auth.Destination, retryWait time.Duration) <-chan FileChunk
UploadData sends FileChunks to object storage via the provided destination. It places the objects in their Container with their Object name and checks the md5 of the upload, retrying on failure. It requires all fields of the FileChunk to be filled out before attempting an upload, and will send errors if it encounters FileChunks with missing fields. The retryWait parameter is the base wait before a retry is attempted.
Deprecated: This consumes unnecessary memory. Use ReadHashAndUpload instead.
func UploadManifests ¶
func UploadManifests(manifests <-chan FileChunk, errors chan<- error, dest auth.Destination) <-chan FileChunk
UploadManifests treats the incoming FileChunks as manifests and uploads them with the special SLO manifest headers.
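A sketch of finishing a pipeline with this stage, assuming uploaded came from ReadHashAndUpload and that the manifest name format and container are placeholders:

    manifests := pipeline.ManifestBuilder(uploaded, errs)
    manifests = pipeline.ObjectNamer(manifests, errs, "manifest-%[1]d")
    manifests = pipeline.Containerizer(manifests, errs, "uploads")
    pipeline.Consume(pipeline.UploadManifests(manifests, errs, dest))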
Types ¶
type Count ¶
Count represents basic statistics about the data that has passed through a Counter pipeline stage. It records the total Bytes of data that it has seen, as well as the number of chunks and the duration since the associated Counter stage was started. This information can be used to calculate statistics about the pipeline's performance, especially when multiple counters in different pipeline regions are employed.
type FileChunk ¶
type FileChunk struct {
	Number    uint
	Object    string
	Container string
	Hash      string
	Data      []byte
	Size      uint
	Offset    uint
}
FileChunk represents a single region of a file.
Number represents how many chunks into a given file this chunk is.
Object is the name that this FileChunk will bear within object storage.
Container is the object storage Container that this chunk will be uploaded into.
Hash is the md5 sum of this FileChunk's Data.
Data is a slice of the original file of length Size.
Size is the length of the Data slice if the FileChunk represents a normal file chunk, or the apparent size of the manifest if it represents a manifest file.
Offset is the index of the first byte in the file that is included in Data.
func (FileChunk) MarshalJSON ¶
func (chunk FileChunk) MarshalJSON() ([]byte, error)
MarshalJSON defines the transformation from a FileChunk to an SLO manifest entry.
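Swift's SLO format lists each segment as a JSON object with path, etag, and size_bytes fields, so the marshaled output presumably takes the shape shown in the comment below (values illustrative):

    entry, err := json.Marshal(chunk) // invokes FileChunk's MarshalJSON
    if err != nil {
    	log.Fatal(err)
    }
    fmt.Println(string(entry))
    // e.g. {"path":"uploads/chunk-3","etag":"5d41402abc4b2a76b9719d911017c592","size_bytes":1048576}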