Documentation ¶
Index ¶
- Variables
- func Batch[T any](ctx context.Context, inputChan <-chan T, batchSize int) <-chan []T
- func Filter[T any](ctx context.Context, inputChan <-chan T, filterRule FilterRule[T]) <-chan T
- func FromContext(ctx context.Context) *zap.SugaredLogger
- func NewContext(parentCtx context.Context, logger *zap.SugaredLogger) context.Context
- func ParallelProcess[I any, O any](ctx context.Context, maxParallelProcessors int, inputChan <-chan I, ...) (outputChan <-chan O)
- func PrepareWriteChunks(ctx context.Context, w io.WriterAt, size int64, chunkSize int64) (outputChan <-chan WriteableChunk)
- func Process[I any, O any](ctx context.Context, inputChan <-chan I, processor Processor[I, O], ...) (outputChan <-chan O)
- func RangeGenerator(ctx context.Context, n int) <-chan int
- func ReadChunks(ctx context.Context, r io.ReaderAt, size int64, chunkSize int64) (outputChan <-chan ReadableChunk)
- func ReadFileChunks(ctx context.Context, f *os.File, chunkSize int64) (<-chan ReadableChunk, error)
- func SliceItemConsumer[S ~[]T, T any](ctx context.Context, inputChan <-chan T) (result S, err error)
- func SliceItemGenerator[T any](ctx context.Context, slice []T) <-chan T
- func SliceItemLimitedConsumer[S ~[]T, T any](ctx context.Context, limit int, inputChan <-chan T) (result S, err error)
- func WalkDirEntries(ctx context.Context, root string, checkPath fs.WalkDirFunc) (outputChan <-chan WalkDirEntry)
- func WalkDirFilterHiddenDirs(path string, d fs.DirEntry, err error) error
- type ChunkReader
- type ChunkWriter
- type FilterNil
- type FilterNonNil
- type FilterRule
- type FilterRuleAll
- type FilterRuleAnd
- type FilterRuleAny
- type FilterRuleFirst
- type FilterRuleIncludeOnly
- type FilterRuleLog
- type FilterRuleNot
- type FilterRuleRecursiveWrapper
- type FilterStatus
- type FilterWalkDirEntryIncludeGlobMatch
- type FilterWalkDirEntryIncludeRegExp
- type Finalize
- type ProcessStatus
- type Processor
- type ProcessorResult
- type ReadableChunk
- type SimpleWalkDirEntry
- type WalkDirEntry
- type WriteableChunk
Constants ¶
This section is empty.
Variables ¶
var FilterWalkDirEntryExcludeDir = &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeDir}
Excludes directories; others are left as unknown.
var FilterWalkDirEntryExcludeFile = &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeFile}
Excludes regular files; others are left as unknown.
var FilterWalkDirEntryIncludeDir = &filterWalkDirEntryIncludeDir{}
Includes directories; others are left as unknown.
var FilterWalkDirEntryIncludeFile = &filterWalkDirEntryIncludeFile{}
Includes regular files; others are left as unknown.
Functions ¶
func Batch ¶
func Batch[T any](ctx context.Context, inputChan <-chan T, batchSize int) <-chan []T
Reads batchSize entries from the input channel and sends them as a slice of at most batchSize items on the output channel.
Batching is done on an unbuffered channel, one by one, so it will block until the next batch can be consumed.
Batching may be stopped early by context.Context.Done(), see context.WithCancel(), context.WithTimeout() and context.WithDeadline()
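A minimal usage sketch (assuming this package is imported as pipeline; the actual import path is not shown in this documentation):
ctx := context.Background()
numbers := pipeline.RangeGenerator(ctx, 10) // generates 0..9
batches := pipeline.Batch(ctx, numbers, 4)  // yields []int slices of length <= 4
for batch := range batches {
    fmt.Println(batch) // [0 1 2 3], then [4 5 6 7], then [8 9]
}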
func Filter ¶
func Filter[T any](ctx context.Context, inputChan <-chan T, filterRule FilterRule[T]) <-chan T
Filters an input channel.
Filtering is done on an unbuffered channel, one by one, so it will block until the next item can be consumed.
Filtering may be stopped early by context.Context.Done(), see context.WithCancel(), context.WithTimeout() and context.WithDeadline()
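A sketch with a hypothetical rule (evenOnly is not part of this package; it is declared at package level and shown here only to illustrate the FilterRule interface):
// evenOnly includes even numbers and excludes odd ones.
type evenOnly struct{}

func (evenOnly) Filter(ctx context.Context, entry int) pipeline.FilterStatus {
    if entry%2 == 0 {
        return pipeline.FilterInclude
    }
    return pipeline.FilterExclude
}

evens := pipeline.Filter(ctx, pipeline.RangeGenerator(ctx, 10), evenOnly{})
for v := range evens {
    fmt.Println(v) // 0, 2, 4, 6, 8
}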
func FromContext ¶
func FromContext(ctx context.Context) *zap.SugaredLogger
Gets the logger from the context, or returns the default logger.
Loggers are carried in the context to avoid passing them explicitly everywhere, which would make APIs cumbersome to use.
func NewContext ¶
func NewContext(parentCtx context.Context, logger *zap.SugaredLogger) context.Context
Creates a new context with the given logger.
Any existing logger is superseded by the given one in the returned context and contexts derived from it. The parent context's logger (if any) is untouched.
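A wiring sketch (zap.NewDevelopment is just one way to build a *zap.SugaredLogger):
logger, _ := zap.NewDevelopment()
ctx := pipeline.NewContext(context.Background(), logger.Sugar())
// Downstream code, e.g. inside a Processor, can then retrieve it:
pipeline.FromContext(ctx).Infow("processing item", "step", "example")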
func ParallelProcess ¶
func ParallelProcess[I any, O any](ctx context.Context, maxParallelProcessors int, inputChan <-chan I, processor Processor[I, O], finalize Finalize[O]) (outputChan <-chan O)
Processes items in parallel (fan-out) with a maximum number of parallel workers.
Note that each item may be processed in a different goroutine.
Note that processor() operations using io.Reader/Read() are **NOT** thread-safe, as they advance the offset/pointer. However, each item is expected to be processed only once, so in practice this is not a concern.
Processing may be stopped early by context.Context.Done(), see context.WithCancel(), context.WithTimeout() and context.WithDeadline(). If a processor() returns an error, processing is **NOT** stopped early; one must cancel the context explicitly.
The processor() should check context.Context in order to know when to stop its work early; for instance, use http.NewRequestWithContext() or check context.Context.Done() explicitly.
Finalize is called with finishedInput == true if all parallel workers exhausted their input, and false if at least one of them stopped early.
Code is based on https://go.dev/blog/pipelines#bounded-parallelism
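An illustrative sketch with 4 workers (assuming a nil Finalize is accepted, as the Process() documentation suggests it is optional):
squares := pipeline.ParallelProcess(ctx, 4, pipeline.RangeGenerator(ctx, 100),
    func(ctx context.Context, n int) (int, pipeline.ProcessStatus) {
        return n * n, pipeline.ProcessOutput
    },
    nil, // no Finalize step
)
for v := range squares {
    fmt.Println(v) // output order across workers is not guaranteed
}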
func PrepareWriteChunks ¶
func PrepareWriteChunks(ctx context.Context, w io.WriterAt, size int64, chunkSize int64) (outputChan <-chan WriteableChunk)
func Process ¶
func Process[I any, O any](ctx context.Context, inputChan <-chan I, processor Processor[I, O], finalize Finalize[O]) (outputChan <-chan O)
General framework to process items from one channel to another.
Processing is done to an unbuffered output channel, one by one, so it will block until the next item can be consumed.
Processing may be stopped early by context.Context.Done(), see context.WithCancel(), context.WithTimeout() and context.WithDeadline()
The function finalize may be provided and is called after all the processing is done and before the channel is closed.
The context may also contain a logger, which can be retrieved with FromContext() and defaults to the module logger. A new logger is derived from it, including both the inputChan and outputChan values, and this new logger is passed to the processor.
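A sketch combining a Processor with a Finalize step that appends a sentinel value (both functions are illustrative):
doubled := pipeline.Process(ctx, pipeline.RangeGenerator(ctx, 3),
    func(ctx context.Context, n int) (int, pipeline.ProcessStatus) {
        return n * 2, pipeline.ProcessOutput
    },
    func(ctx context.Context, finishedInput bool) (int, pipeline.ProcessStatus) {
        if finishedInput {
            return -1, pipeline.ProcessOutput // sentinel: input fully processed
        }
        return 0, pipeline.ProcessSkip // aborted early: emit nothing extra
    },
)
for v := range doubled {
    fmt.Println(v) // 0, 2, 4, -1
}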
func RangeGenerator ¶
func RangeGenerator(ctx context.Context, n int) <-chan int
Generates integers in the interval [0, n).
func ReadChunks ¶
func ReadChunks(ctx context.Context, r io.ReaderAt, size int64, chunkSize int64) (outputChan <-chan ReadableChunk)
Reads content, splitting it into chunks (sections), each with its own ChunkReader (io.SectionReader).
Chunks are produced in a channel that is closed after the last chunk is produced. The channel is not buffered; it won't send until the other side is ready to consume.
Generation may be stopped early by context.Context.Done(), see context.WithCancel(), context.WithTimeout() and context.WithDeadline()
func ReadFileChunks ¶
func ReadFileChunks(ctx context.Context, f *os.File, chunkSize int64) (<-chan ReadableChunk, error)
Reads a file, splitting it into chunks (sections), each with its own ChunkReader (io.SectionReader).
A wrapper on top of ReadChunks() that queries the file size via f.Stat().
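A chunked-read sketch (the file name is hypothetical; ChunkReader's methods are not listed in this documentation, so the chunk bodies are left unread):
f, err := os.Open("input.dat")
if err != nil {
    log.Fatal(err)
}
defer f.Close()
chunks, err := pipeline.ReadFileChunks(ctx, f, 1024*1024) // 1 MiB chunks
if err != nil {
    log.Fatal(err)
}
for chunk := range chunks {
    fmt.Println("chunk at offset", chunk.StartOffset, "total size", chunk.TotalSize)
}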
func SliceItemConsumer ¶
func SliceItemConsumer[S ~[]T, T any](ctx context.Context, inputChan <-chan T) (result S, err error)
func SliceItemGenerator ¶
func SliceItemGenerator[T any](ctx context.Context, slice []T) <-chan T
Sends all entries of the slice to a channel.
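A round-trip sketch (slice to channel and back; note the explicit slice type argument on the consumer):
ch := pipeline.SliceItemGenerator(ctx, []string{"a", "b", "c"})
items, err := pipeline.SliceItemConsumer[[]string](ctx, ch)
if err != nil {
    log.Fatal(err)
}
fmt.Println(items) // [a b c]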
func SliceItemLimitedConsumer ¶
func SliceItemLimitedConsumer[S ~[]T, T any](ctx context.Context, limit int, inputChan <-chan T) (result S, err error)
func WalkDirEntries ¶
func WalkDirEntries( ctx context.Context, root string, checkPath fs.WalkDirFunc, ) (outputChan <-chan WalkDirEntry)
WalkDirEntries recursively scans files/directories from a root directory.
checkPath() may be used to return fs.SkipDir or fs.SkipAll to control the walk process. If provided (non-nil), it's called before anything else. See the fs.WalkDirFunc documentation. It may be used to omit hidden folders (e.g. ".git") and the like.
Each file/directory may carry an associated error; checkPath may choose to ignore it and keep going. By default, if no checkPath is provided, the walk keeps going.
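A walk sketch (assuming WalkDirEntry exposes the Path()/Err() accessors shown on SimpleWalkDirEntry below):
entries := pipeline.WalkDirEntries(ctx, "/some/root", pipeline.WalkDirFilterHiddenDirs)
for entry := range entries {
    if err := entry.Err(); err != nil {
        continue // each entry may carry its own error
    }
    fmt.Println(entry.Path())
}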
func WalkDirFilterHiddenDirs ¶
func WalkDirFilterHiddenDirs(path string, d fs.DirEntry, err error) error
Types ¶
type FilterNil ¶
type FilterNil[T any] struct{}
Only passes forward the nil elements (useful for counting them, for instance).
type FilterNonNil ¶
type FilterNonNil[T any] struct{}
Only passes forward the non-nil elements.
func (FilterNonNil[T]) Filter ¶
func (r FilterNonNil[T]) Filter(ctx context.Context, entry T) FilterStatus
type FilterRule ¶
type FilterRule[T any] interface { Filter(ctx context.Context, entry T) FilterStatus }
Given an entry, says whether it should be included in or excluded from the output.
If FilterUnknown is returned, the filtering keeps going. Entries that ultimately remain FilterUnknown are included.
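A composition sketch using the rule types below (evenOnly is the hypothetical rule from the Filter example above; multipleOfTen is an equally hypothetical FilterRule[int]):
rule := &pipeline.FilterRuleAll[int]{All: []pipeline.FilterRule[int]{
    evenOnly{},
    &pipeline.FilterRuleNot[int]{Not: multipleOfTen{}},
}}
// Combines both rules; see FilterRuleAll below for how statuses are merged.
filtered := pipeline.Filter(ctx, pipeline.RangeGenerator(ctx, 30), rule)
for v := range filtered {
    fmt.Println(v)
}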
func RecursiveFilterRuleLog ¶
func RecursiveFilterRuleLog[T any](filterRule FilterRule[T]) FilterRule[T]
Recursively wraps all required filter elements with logging
Rules may implement FilterRuleRecursiveWrapper in order to keep the recursion going.
type FilterRuleAll ¶
type FilterRuleAll[T any] struct { All []FilterRule[T] }
All rules must match (either FilterInclude or FilterExclude; FilterUnknown is skipped)
func (FilterRuleAll[T]) Filter ¶
func (r FilterRuleAll[T]) Filter(ctx context.Context, entry T) FilterStatus
func (FilterRuleAll[T]) RecursiveWrap ¶
func (r FilterRuleAll[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
type FilterRuleAnd ¶
type FilterRuleAnd[T any] struct { And []FilterRule[T] }
All values must match FilterInclude or FilterUnknown. If so, returns FilterInclude. Otherwise, returns FilterExclude.
func (FilterRuleAnd[T]) Filter ¶
func (r FilterRuleAnd[T]) Filter(ctx context.Context, entry T) FilterStatus
func (FilterRuleAnd[T]) RecursiveWrap ¶
func (r FilterRuleAnd[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
type FilterRuleAny ¶
type FilterRuleAny[T any] struct { Any []FilterRule[T] }
Any known value is used (either FilterInclude or FilterExclude; FilterUnknown is skipped)
func (FilterRuleAny[T]) Filter ¶
func (r FilterRuleAny[T]) Filter(ctx context.Context, entry T) FilterStatus
func (FilterRuleAny[T]) RecursiveWrap ¶
func (r FilterRuleAny[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
type FilterRuleFirst ¶
type FilterRuleFirst[T any] struct { Filters []FilterRule[T] }
The first known value (either FilterInclude or FilterExclude) returned by Filters is used; FilterUnknown values are skipped.
func (FilterRuleFirst[T]) Filter ¶
func (r FilterRuleFirst[T]) Filter(ctx context.Context, entry T) FilterStatus
func (FilterRuleFirst[T]) RecursiveWrap ¶
func (r FilterRuleFirst[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
type FilterRuleIncludeOnly ¶
type FilterRuleIncludeOnly[T any] struct { Pattern FilterRule[T] }
Excludes every entry for which Pattern did not return FilterInclude; this applies to both FilterExclude and FilterUnknown.
func (FilterRuleIncludeOnly[T]) Filter ¶
func (r FilterRuleIncludeOnly[T]) Filter(ctx context.Context, entry T) FilterStatus
func (FilterRuleIncludeOnly[T]) RecursiveWrap ¶
func (r FilterRuleIncludeOnly[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
type FilterRuleLog ¶
type FilterRuleLog[T any] struct { Rule FilterRule[T] }
Logs the given rule.
FromContext() is used to get the logger.
func (FilterRuleLog[T]) Filter ¶
func (r FilterRuleLog[T]) Filter(ctx context.Context, entry T) (status FilterStatus)
func (FilterRuleLog[T]) MarshalJSON ¶
func (r FilterRuleLog[T]) MarshalJSON() ([]byte, error)
type FilterRuleNot ¶
type FilterRuleNot[T any] struct { Not FilterRule[T] }
Inverts FilterInclude<->FilterExclude, keeps FilterUnknown
func (FilterRuleNot[T]) Filter ¶
func (r FilterRuleNot[T]) Filter(ctx context.Context, entry T) FilterStatus
func (FilterRuleNot[T]) RecursiveWrap ¶
func (r FilterRuleNot[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
type FilterRuleRecursiveWrapper ¶
type FilterRuleRecursiveWrapper[T any] interface {
    // Wrap any children with the given wrapper and return a new instance of the rule
    RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
}
type FilterStatus ¶
type FilterStatus int
const (
    // Keep going until an explicit FilterInclude or FilterExclude
    FilterUnknown FilterStatus = iota
    // The entry should be included in the output
    FilterInclude
    // The entry should NOT be included in the output
    FilterExclude
)
type FilterWalkDirEntryIncludeGlobMatch ¶
Include entries whose name matches the glob pattern.
Note that only file names (basename) are matched, not the whole path.
Non-matching files are NOT excluded; they are just left as unknown. To exclude them, use &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeGlobMatch{...}}
func (FilterWalkDirEntryIncludeGlobMatch) Filter ¶
func (r FilterWalkDirEntryIncludeGlobMatch) Filter(ctx context.Context, entry WalkDirEntry) FilterStatus
type FilterWalkDirEntryIncludeRegExp ¶
Include entries whose name matches the regular expression.
Note that only file names (basename) are matched, not the whole path.
Non-matching files are NOT excluded; they are just left as unknown. To exclude them, use &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeRegExp{...}}
func (FilterWalkDirEntryIncludeRegExp) Filter ¶
func (r FilterWalkDirEntryIncludeRegExp) Filter(ctx context.Context, entry WalkDirEntry) FilterStatus
func (FilterWalkDirEntryIncludeRegExp) MarshalJSON ¶
func (r FilterWalkDirEntryIncludeRegExp) MarshalJSON() ([]byte, error)
type Finalize ¶
type Finalize[O any] func(ctx context.Context, finishedInput bool) (output O, status ProcessStatus)
Finalizes processing of the input.
If finishedInput is true, the input channel was exhausted/closed and all input items were processed. If it's false, processing was aborted early by context.Context.Done() or a ProcessAbort status.
The context also contains a logger, which can be retrieved with FromContext().
type ProcessStatus ¶
type ProcessStatus int
const (
    // Send the output to the channel
    ProcessOutput ProcessStatus = iota
    // Skip sending the output to the channel, but keep processing
    ProcessSkip
    // Stop processing altogether
    ProcessAbort
)
type Processor ¶
type Processor[I any, O any] func(ctx context.Context, input I) (output O, status ProcessStatus)
Processes the input into an output.
The processor should check context.Context in order to know when to stop its work early; for instance, use http.NewRequestWithContext() or check context.Context.Done() explicitly.
The context also contains a logger, which can be retrieved with FromContext().
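A sketch of a context-aware processor (the URL-fetching logic is illustrative only):
var fetchStatus pipeline.Processor[string, int] = func(ctx context.Context, url string) (int, pipeline.ProcessStatus) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return 0, pipeline.ProcessSkip // bad input: skip this item
    }
    resp, err := http.DefaultClient.Do(req) // fails fast once ctx is canceled
    if err != nil {
        return 0, pipeline.ProcessAbort // treat errors (incl. cancellation) as abort here
    }
    resp.Body.Close()
    return resp.StatusCode, pipeline.ProcessOutput
}
// Example wiring (assuming a nil Finalize is accepted, as the docs suggest it is optional):
urls := pipeline.SliceItemGenerator(ctx, []string{"https://example.com"})
for s := range pipeline.Process(ctx, urls, fetchStatus, nil) {
    fmt.Println(s)
}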
type ProcessorResult ¶
A helper to produce output paired with its input and error.
type ReadableChunk ¶
type ReadableChunk struct {
    Reader      ChunkReader
    StartOffset int64
    TotalSize   int64
}
type SimpleWalkDirEntry ¶
type SimpleWalkDirEntry[T fs.DirEntry] struct {
    Object T
    // contains filtered or unexported fields
}
func NewSimpleWalkDirEntry ¶
func NewSimpleWalkDirEntry[T fs.DirEntry](path string, dirEntry T, err error) *SimpleWalkDirEntry[T]
func (*SimpleWalkDirEntry[T]) DirEntry ¶
func (e *SimpleWalkDirEntry[T]) DirEntry() fs.DirEntry
func (*SimpleWalkDirEntry[T]) Err ¶
func (e *SimpleWalkDirEntry[T]) Err() error
func (*SimpleWalkDirEntry[T]) Path ¶
func (e *SimpleWalkDirEntry[T]) Path() string
type WalkDirEntry ¶
Entries flow from WalkDirEntries into FilterWalkDirEntries. See the fs.WalkDirFunc documentation.
type WriteableChunk ¶
type WriteableChunk struct {
    Writer      ChunkWriter
    StartOffset int64
    EndOffset   int64
}