pipeline

package
v0.33.3
Warning

This package is not in the latest version of its module.

Published: Jan 23, 2025
License: GPL-3.0
Imports: 13
Imported by: 2

Documentation

Constants

This section is empty.

Variables

var FilterWalkDirEntryExcludeDir = &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeDir}

Excludes directories; other entries are left as FilterUnknown.

var FilterWalkDirEntryExcludeFile = &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeFile}

Excludes regular files; other entries are left as FilterUnknown.

var FilterWalkDirEntryIncludeDir = &filterWalkDirEntryIncludeDir{}

Includes directories; other entries are left as FilterUnknown.

var FilterWalkDirEntryIncludeFile = &filterWalkDirEntryIncludeFile{}

Includes regular files; other entries are left as FilterUnknown.

Functions

func Batch

func Batch[T any](
	ctx context.Context,
	inputChan <-chan T,
	batchSize int,
) <-chan []T

Reads up to batchSize entries from the input channel and sends them as a slice (of length <= batchSize) to the output channel.

Batching is done on an unbuffered channel, one batch at a time, so it blocks until the next batch can be consumed.

Batching may be stopped early via context.Context.Done(); see context.WithCancel(), context.WithTimeout() and context.WithDeadline().

func Filter

func Filter[T any](
	ctx context.Context,
	inputChan <-chan T,
	filterRule FilterRule[T],
) <-chan T

Filters an input channel according to filterRule.

Filtering is done on an unbuffered channel, one item at a time, so it blocks until the next item can be consumed.

Filtering may be stopped early via context.Context.Done(); see context.WithCancel(), context.WithTimeout() and context.WithDeadline().

func FromContext

func FromContext(ctx context.Context) *zap.SugaredLogger

Returns the logger stored in the context, or the default logger if there is none.

Loggers are carried in the context to avoid passing them explicitly everywhere, which would make the APIs cumbersome to use.

func NewContext

func NewContext(parentCtx context.Context, logger *zap.SugaredLogger) context.Context

Creates a new context carrying the given logger.

In the returned context (and any context derived from it), the given logger supersedes any existing one. The parent context's logger, if any, is left untouched.

func ParallelProcess

func ParallelProcess[I any, O any](
	ctx context.Context,
	maxParallelProcessors int,
	inputChan <-chan I,
	processor Processor[I, O],
	finalize Finalize[O],
) (outputChan <-chan O)

Processes items in parallel (fan-out) with at most maxParallelProcessors workers.

Note that each item may be processed in a different goroutine.

Note that processor() operations using io.Reader's Read() are **NOT** thread-safe, since they advance the read offset. However, each item is expected to be processed by only one goroutine at a time, so this should not be a concern.

Processing may be stopped early via context.Context.Done(); see context.WithCancel(), context.WithTimeout() and context.WithDeadline(). If any processor() returns an error, processing is **NOT** stopped early; the context must be canceled explicitly.

The processor() should check context.Context to know when to stop its work early, for instance by using http.NewRequestWithContext() or by checking context.Context.Done() explicitly.

The finalize function is called with finishedInput == true if all parallel workers consumed all of their input, and false if at least one of them stopped early.

Code is based on https://go.dev/blog/pipelines#bounded-parallelism

func PrepareWriteChunks

func PrepareWriteChunks(
	ctx context.Context,
	w io.WriterAt,
	size int64,
	chunkSize int64,
) (outputChan <-chan WriteableChunk)

func Process

func Process[I any, O any](
	ctx context.Context,
	inputChan <-chan I,
	processor Processor[I, O],
	finalize Finalize[O],
) (outputChan <-chan O)

General framework to process items from one channel to another.

Processing is done on an unbuffered output channel, one item at a time, so it blocks until the next item can be consumed.

Processing may be stopped early via context.Context.Done(); see context.WithCancel(), context.WithTimeout() and context.WithDeadline().

An optional finalize function may be provided; it is called after all processing is done and before the output channel is closed.

The context may also carry a logger, which can be retrieved with ContextLogger() and defaults to the module logger. A new logger is derived from it that includes both the inputChan and outputChan values, and this derived logger is passed to the processor.

func RangeGenerator

func RangeGenerator(ctx context.Context, n int) <-chan int

Generates the integers in the interval [0, n).

func ReadChunks

func ReadChunks(
	ctx context.Context,
	r io.ReaderAt,
	size int64,
	chunkSize int64,
) (outputChan <-chan ReadableChunk)

Reads content by splitting it into chunks (sections), each with its own ChunkReader (an io.SectionReader).

Chunks are sent to a channel that is closed after the last chunk is produced. The channel is unbuffered, so a chunk is not sent until the receiver is ready to consume it.

Generation may be stopped early via context.Context.Done(); see context.WithCancel(), context.WithTimeout() and context.WithDeadline().

func ReadFileChunks

func ReadFileChunks(ctx context.Context, f *os.File, chunkSize int64) (<-chan ReadableChunk, error)

Reads a file into chunks (sections), each with its own ChunkReader (an io.SectionReader).

A wrapper around ReadChunks() that obtains the size from f.Stat().Size().

func SliceItemConsumer

func SliceItemConsumer[S ~[]T, T any](ctx context.Context, inputChan <-chan T) (result S, err error)

func SliceItemGenerator

func SliceItemGenerator[T any](ctx context.Context, slice []T) <-chan T

Sends all entries of slice to the returned channel.

func SliceItemLimitedConsumer

func SliceItemLimitedConsumer[S ~[]T, T any](ctx context.Context, limit int, inputChan <-chan T) (result S, err error)

func WalkDirEntries

func WalkDirEntries(
	ctx context.Context,
	root string,
	checkPath fs.WalkDirFunc,
) (outputChan <-chan WalkDirEntry)

WalkDirEntries recursively scans files/directories from a root directory

checkPath() may return fs.SkipDir or fs.SkipAll to control the walk. If provided (non-nil), it is called before anything else; see the fs.WalkDirFunc documentation. It may be used to omit hidden folders (e.g. ".git") and the like.

Each file/directory entry may carry an associated error, which can either stop the walk or be ignored. By default, if no checkPath is provided, the walk keeps going.

func WalkDirFilterHiddenDirs

func WalkDirFilterHiddenDirs(path string, d fs.DirEntry, err error) error

Skips any entry whose name starts with a dot (".").

Types

type ChunkReader

type ChunkReader interface {
	io.Reader
	io.ReaderAt
}

type ChunkWriter

type ChunkWriter interface {
	io.Writer
	io.WriterAt
}

type FilterNil

type FilterNil[T any] struct{}

Only passes forward nil elements (possibly useful for counting them).

func (FilterNil[T]) Filter

func (r FilterNil[T]) Filter(ctx context.Context, entry T) FilterStatus

type FilterNonNil

type FilterNonNil[T any] struct{}

Only passes forward non-nil elements.

func (FilterNonNil[T]) Filter

func (r FilterNonNil[T]) Filter(ctx context.Context, entry T) FilterStatus

type FilterRule

type FilterRule[T any] interface {
	Filter(ctx context.Context, entry T) FilterStatus
}

Given an entry, decides whether it should be included in or excluded from the output.

If FilterUnknown is returned, filtering keeps going. Entries whose status is still FilterUnknown at the end are included.

func RecursiveFilterRuleLog

func RecursiveFilterRuleLog[T any](filterRule FilterRule[T]) FilterRule[T]

Recursively wraps the given rule and all of its nested rules with logging.

Rules may implement FilterRuleRecursiveWrapper in order to keep the recursion going.

type FilterRuleAll

type FilterRuleAll[T any] struct {
	All []FilterRule[T]
}

All must match (either FilterInclude or FilterExclude); FilterUnknown results are skipped.

func (FilterRuleAll[T]) Filter

func (r FilterRuleAll[T]) Filter(ctx context.Context, entry T) FilterStatus

func (FilterRuleAll[T]) RecursiveWrap

func (r FilterRuleAll[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]

type FilterRuleAnd

type FilterRuleAnd[T any] struct {
	And []FilterRule[T]
}

All values must match FilterInclude or FilterUnknown. If so, return FilterInclude. Otherwise, return FilterExclude

func (FilterRuleAnd[T]) Filter

func (r FilterRuleAnd[T]) Filter(ctx context.Context, entry T) FilterStatus

func (FilterRuleAnd[T]) RecursiveWrap

func (r FilterRuleAnd[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]

type FilterRuleAny

type FilterRuleAny[T any] struct {
	Any []FilterRule[T]
}

Any known value is used (either FilterInclude or FilterExclude); FilterUnknown results are skipped.

func (FilterRuleAny[T]) Filter

func (r FilterRuleAny[T]) Filter(ctx context.Context, entry T) FilterStatus

func (FilterRuleAny[T]) RecursiveWrap

func (r FilterRuleAny[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]

type FilterRuleFirst

type FilterRuleFirst[T any] struct {
	Filters []FilterRule[T]
}

All values must match FilterInclude or FilterUnknown. If so, return FilterInclude. Otherwise, return FilterExclude

func (FilterRuleFirst[T]) Filter

func (r FilterRuleFirst[T]) Filter(ctx context.Context, entry T) FilterStatus

func (FilterRuleFirst[T]) RecursiveWrap

func (r FilterRuleFirst[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]

type FilterRuleIncludeOnly

type FilterRuleIncludeOnly[T any] struct {
	Pattern FilterRule[T]
}

Excludes every entry for which Pattern did not return FilterInclude, i.e. entries that returned either FilterExclude or FilterUnknown.

func (FilterRuleIncludeOnly[T]) Filter

func (r FilterRuleIncludeOnly[T]) Filter(ctx context.Context, entry T) FilterStatus

func (FilterRuleIncludeOnly[T]) RecursiveWrap

func (r FilterRuleIncludeOnly[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]

type FilterRuleLog

type FilterRuleLog[T any] struct {
	Rule FilterRule[T]
}

Logs the evaluation of the given rule.

ContextLogger() is used to get the logger

func (FilterRuleLog[T]) Filter

func (r FilterRuleLog[T]) Filter(ctx context.Context, entry T) (status FilterStatus)

func (FilterRuleLog[T]) MarshalJSON

func (r FilterRuleLog[T]) MarshalJSON() ([]byte, error)

type FilterRuleNot

type FilterRuleNot[T any] struct {
	Not FilterRule[T]
}

Inverts FilterInclude <-> FilterExclude; FilterUnknown is kept as-is.

func (FilterRuleNot[T]) Filter

func (r FilterRuleNot[T]) Filter(ctx context.Context, entry T) FilterStatus

func (FilterRuleNot[T]) RecursiveWrap

func (r FilterRuleNot[T]) RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]

type FilterRuleRecursiveWrapper

type FilterRuleRecursiveWrapper[T any] interface {
	// Wrap any children with the given wrapper and return a new instance of the rule
	RecursiveWrap(wrapper filterRuleWrapper[T]) FilterRule[T]
}

type FilterStatus

type FilterStatus int
const (
	// keep going until an explicit FilterInclude or FilterExclude
	FilterUnknown FilterStatus = iota
	// The entry should be included in the output
	FilterInclude
	// The entry should NOT be included in the output
	FilterExclude
)

type FilterWalkDirEntryIncludeGlobMatch

type FilterWalkDirEntryIncludeGlobMatch struct {
	Pattern       string
	CancelOnError func(error)
}

Includes entries whose name matches the glob pattern.

Note that only file names (basenames) are matched, not the whole path.

Non-matching files are NOT excluded; they are just left as FilterUnknown. To exclude matching files instead, use &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeGlobMatch{...}}; to exclude non-matching files, wrap the rule in &FilterRuleIncludeOnly[WalkDirEntry]{...}.

func (FilterWalkDirEntryIncludeGlobMatch) Filter

type FilterWalkDirEntryIncludeRegExp

type FilterWalkDirEntryIncludeRegExp struct {
	Regexp        *regexp.Regexp
	CancelOnError func(error)
}

Includes entries whose name matches the regular expression.

Note that only file names (basenames) are matched, not the whole path.

Non-matching files are NOT excluded; they are just left as FilterUnknown. To exclude matching files instead, use &FilterRuleNot[WalkDirEntry]{FilterWalkDirEntryIncludeRegExp{...}}; to exclude non-matching files, wrap the rule in &FilterRuleIncludeOnly[WalkDirEntry]{...}.

func (FilterWalkDirEntryIncludeRegExp) Filter

func (FilterWalkDirEntryIncludeRegExp) MarshalJSON

func (r FilterWalkDirEntryIncludeRegExp) MarshalJSON() ([]byte, error)

type Finalize

type Finalize[O any] func(ctx context.Context, finishedInput bool) (output O, status ProcessStatus)

Finalize processing the input.

If finishedInput is true, the input channel was exhausted/closed and all input items were processed. If it is false, processing was aborted early by context.Context.Done() or a ProcessAbort status.

The context also contains a logger which can be retrieved from the context.

type ProcessStatus

type ProcessStatus int
const (
	// Send the output to the channel
	ProcessOutput ProcessStatus = iota
	// Skip sending the output to the channel, but keep processing
	ProcessSkip
	// Stop processing altogether
	ProcessAbort
)

type Processor

type Processor[I any, O any] func(ctx context.Context, input I) (output O, status ProcessStatus)

Process the input into an output.

The processor should check context.Context to know when to stop its work early, for instance by using http.NewRequestWithContext() or by checking context.Context.Done() explicitly.

The context also contains a logger which can be retrieved from the context.

type ProcessorResult

type ProcessorResult[I any, O any] struct {
	Input  I
	Output O
	Err    error
}

Helper type for producing output paired with its input and error.
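A small sketch of how a result type shaped like ProcessorResult lets errors travel alongside their inputs instead of aborting processing. `parseAll` is a hypothetical helper; in a pipeline, the same struct would typically serve as the output type O of a Processor.

```go
package main

import (
	"fmt"
	"strconv"
)

// ProcessorResult mirrors the struct documented above.
type ProcessorResult[I any, O any] struct {
	Input  I
	Output O
	Err    error
}

// parseAll keeps each input next to its output and error rather than
// stopping at the first failure.
func parseAll(inputs []string) []ProcessorResult[string, int] {
	results := make([]ProcessorResult[string, int], 0, len(inputs))
	for _, in := range inputs {
		n, err := strconv.Atoi(in)
		results = append(results, ProcessorResult[string, int]{Input: in, Output: n, Err: err})
	}
	return results
}

func main() {
	for _, r := range parseAll([]string{"1", "x", "3"}) {
		if r.Err != nil {
			fmt.Printf("%q: error: %v\n", r.Input, r.Err)
			continue
		}
		fmt.Printf("%q -> %d\n", r.Input, r.Output)
	}
}
```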

type ReadableChunk

type ReadableChunk struct {
	Reader      ChunkReader
	StartOffset int64
	TotalSize   int64
}

type SimpleWalkDirEntry

type SimpleWalkDirEntry[T fs.DirEntry] struct {
	Object T
	// contains filtered or unexported fields
}

func NewSimpleWalkDirEntry

func NewSimpleWalkDirEntry[T fs.DirEntry](path string, dirEntry T, err error) *SimpleWalkDirEntry[T]

func (*SimpleWalkDirEntry[T]) DirEntry

func (e *SimpleWalkDirEntry[T]) DirEntry() fs.DirEntry

func (*SimpleWalkDirEntry[T]) Err

func (e *SimpleWalkDirEntry[T]) Err() error

func (*SimpleWalkDirEntry[T]) Path

func (e *SimpleWalkDirEntry[T]) Path() string

type WalkDirEntry

type WalkDirEntry interface {
	Path() string
	DirEntry() fs.DirEntry
	Err() error
}

Entries are produced by WalkDirEntries and can then be filtered (WalkDirEntries -> FilterWalkDirEntries). See the fs.WalkDirFunc documentation.

type WriteableChunk

type WriteableChunk struct {
	Writer      ChunkWriter
	StartOffset int64
	EndOffset   int64
}
