filewalk

package
v0.0.0-...-bd556f4
Published: Dec 15, 2024 License: Apache-2.0 Imports: 10 Imported by: 19

README

Package cloudeng.io/file/filewalk

import cloudeng.io/file/filewalk

Package filewalk provides support for concurrent traversal of file system directories and files. It can traverse any filesystem that implements the FS interface and is intended to be usable with cloud storage systems such as AWS S3 or GCP's Cloud Storage. All compatible systems must implement some sort of hierarchical naming scheme, whether directory based (as per Unix/POSIX filesystems) or by convention (as per S3).

Variables

DefaultScanSize, DefaultConcurrentScans
// DefaultScanSize is the default ScanSize used when the WithScanSize
// option is not supplied.
DefaultScanSize = 1000
// DefaultConcurrentScans is the default number of prefixes/directories
// that will be scanned concurrently when the WithConcurrentScans option
// is not supplied.
DefaultConcurrentScans = 100

Types

Type Configuration
type Configuration struct {
	ConcurrentScans int
	ScanSize        int
}
Type Entry
type Entry struct {
	Name string
	Type fs.FileMode // Type is the Type portion of fs.FileMode
}
Methods
func (de Entry) IsDir() bool
Type EntryList
type EntryList []Entry
Functions
func EntriesFromInfoList(infos file.InfoList) EntryList
Methods
func (el EntryList) AppendBinary(data []byte) ([]byte, error)

AppendBinary appends a binary encoded instance of the EntryList to the supplied byte slice.

func (el *EntryList) DecodeBinary(data []byte) ([]byte, error)

DecodeBinary decodes the supplied data into the EntryList and returns the remaining data.

func (el EntryList) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (el *EntryList) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary implements encoding.BinaryUnmarshaler.

Type Error
type Error struct {
	Path string
	Err  error
}

Error implements error and provides additional detail on the error encountered.

Methods
func (e *Error) As(target interface{}) bool

As implements errors.As.

func (e Error) Error() string

Error implements error.

func (e Error) Is(target error) bool

Is implements errors.Is.

func (e Error) Unwrap() error

Unwrap implements errors.Unwrap.

Type FS
type FS interface {
	file.FS

	LevelScanner(path string) LevelScanner
}

FS represents the interface that is implemented for filesystems to be traversed/scanned.

Type Handler
type Handler[T any] interface {

	// Prefix is called to determine if a given level in the filesystem hierarchy
	// should be further examined or traversed. The file.Info is obtained via a call
	// to Lstat and hence will refer to a symlink itself if the prefix is a symlink.
	// If stop is true then traversal stops at this point. If a list of children
	// is returned then this list is traversed directly rather than obtaining
	// the children from the filesystem. This allows for both exclusions and
	// incremental processing in conjunction with a database to be implemented.
	// Any returned error is recorded, but traversal will continue unless stop is set.
	Prefix(ctx context.Context, state *T, prefix string, info file.Info, err error) (stop bool, children file.InfoList, returnErr error)

	// Contents is called, multiple times, to process the contents of a single
	// level in the filesystem hierarchy. Each such call contains at most the
	// number of items allowed for by the WithScanSize option. Note that
	// errors encountered whilst scanning the filesystem result in calls to
	// Done with the error encountered.
	Contents(ctx context.Context, state *T, prefix string, contents []Entry) (file.InfoList, error)

	// Done is called once all calls to Contents have been made or if Prefix returned
	// an error. Done will always be called if Prefix did not return true for stop.
	// Errors returned by Done are recorded and returned by the Walk method.
	// An error returned by Done does not terminate the walk.
	Done(ctx context.Context, state *T, prefix string, err error) error
}

Handler is implemented by clients of Walker to process the results of walking a filesystem hierarchy. The type parameter is used to instantiate a state variable that is passed to each of the methods.

Type LevelScanner
type LevelScanner interface {
	Scan(ctx context.Context, n int) bool
	Contents() []Entry
	Err() error
}
Type Option
type Option func(o *options)

Option represents options accepted by Walker.

Functions
func WithConcurrentScans(n int) Option

WithConcurrentScans can be used to change the number of prefixes/directories that can be scanned concurrently. The default is DefaultConcurrentScans.

func WithScanSize(n int) Option

WithScanSize sets the number of prefix/directory entries to be scanned in a single operation. The default is DefaultScanSize.

Type Stats
type Stats struct {
	SynchronousScans int64
}
Type Status
type Status struct {
	// SynchronousScans is the number of scans that were performed synchronously
	// as a fallback when all available goroutines are already occupied.
	SynchronousScans int64

	// SlowPrefix is a prefix that took longer than a certain duration
	// to scan. ScanDuration is the time spent scanning that prefix to
	// date. A SlowPrefix may be reported as slow before it has completed
	// scanning.
	SlowPrefix   string
	ScanDuration time.Duration
}

Status is used to communicate the status of an in-progress Walk operation.

Type Walker
type Walker[T any] struct {
	// contains filtered or unexported fields
}

Walker implements the filesystem walk.

Functions
func New[T any](fs FS, handler Handler[T], opts ...Option) *Walker[T]

New creates a new Walker instance.

Methods
func (w *Walker[T]) Configuration() Configuration
func (w *Walker[T]) Stats() Stats
func (w *Walker[T]) Walk(ctx context.Context, roots ...string) error

Walk traverses the hierarchies specified by each of the roots, calling the Handler's Prefix and Contents methods as it goes. Prefix will always be called before Contents for the same prefix, but no other ordering guarantees are provided.

Documentation

Overview

Package filewalk provides support for concurrent traversal of file system directories and files. It can traverse any filesystem that implements the FS interface and is intended to be usable with cloud storage systems such as AWS S3 or GCP's Cloud Storage. All compatible systems must implement some sort of hierarchical naming scheme, whether directory based (as per Unix/POSIX filesystems) or by convention (as per S3).

Index

Constants

This section is empty.

Variables

var (
	SkipAll = fs.SkipDir
	SkipDir = fs.SkipDir
)
var (
	// DefaultScanSize is the default ScanSize used when the WithScanSize
	// option is not supplied.
	DefaultScanSize = 1000
	// DefaultConcurrentScans is the default number of prefixes/directories
	// that will be scanned concurrently when the WithConcurrentScans option
	// is not supplied.
	DefaultConcurrentScans = 100
)

Functions

func ContentsOnly

func ContentsOnly(ctx context.Context, fs FS, start string, h ContentsHandler, opts ...Option) error

ContentsOnly provides a simplified API for walking the contents (files) of a directory hierarchy. Invocations of the ContentsHandler may be concurrent.

Types

type Configuration

type Configuration struct {
	ConcurrentScans int
	ScanSize        int
}

type ContentsHandler

type ContentsHandler func(ctx context.Context, prefix string, contents []Entry, err error) error

ContentsHandler can return fs.SkipAll to skip all subsequent content, or fs.SkipDir to skip the current directory only. All other errors are treated as fatal. Note that, depending on the order in which entries are encountered, SkipDir may also result in subdirectories being skipped.

type Entry

type Entry struct {
	Name string
	Type fs.FileMode // Type is the Type portion of fs.FileMode
}

func (Entry) IsDir

func (de Entry) IsDir() bool

type EntryList

type EntryList []Entry

func EntriesFromInfoList

func EntriesFromInfoList(infos file.InfoList) EntryList

func (EntryList) AppendBinary

func (el EntryList) AppendBinary(data []byte) ([]byte, error)

AppendBinary appends a binary encoded instance of the EntryList to the supplied byte slice.

func (*EntryList) DecodeBinary

func (el *EntryList) DecodeBinary(data []byte) ([]byte, error)

DecodeBinary decodes the supplied data into the EntryList and returns the remaining data.

func (EntryList) MarshalBinary

func (el EntryList) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*EntryList) UnmarshalBinary

func (el *EntryList) UnmarshalBinary(data []byte) (err error)

UnmarshalBinary implements encoding.BinaryUnmarshaler.

type Error

type Error struct {
	Path string
	Err  error
}

Error implements error and provides additional detail on the error encountered.

func (*Error) As

func (e *Error) As(target interface{}) bool

As implements errors.As.

func (Error) Error

func (e Error) Error() string

Error implements error.

func (Error) Is

func (e Error) Is(target error) bool

Is implements errors.Is.

func (Error) Unwrap

func (e Error) Unwrap() error

Unwrap implements errors.Unwrap.

type FS

type FS interface {
	file.FS

	LevelScanner(path string) LevelScanner
}

FS represents the interface that is implemented for filesystems to be traversed/scanned.

type Handler

type Handler[T any] interface {

	// Prefix is called to determine if a given level in the filesystem hierarchy
	// should be further examined or traversed. The file.Info is obtained via a call
	// to Lstat and hence will refer to a symlink itself if the prefix is a symlink.
	// If stop is true then traversal stops at this point. If a list of children
	// is returned then this list is traversed directly rather than obtaining
	// the children from the filesystem. This allows for both exclusions and
	// incremental processing in conjunction with a database to be implemented.
	// Any returned error is recorded, but traversal will continue unless stop is set.
	Prefix(ctx context.Context, state *T, prefix string, info file.Info, err error) (stop bool, children file.InfoList, returnErr error)

	// Contents is called, multiple times, to process the contents of a single
	// level in the filesystem hierarchy. Each such call contains at most the
	// number of items allowed for by the WithScanSize option. Note that
	// errors encountered whilst scanning the filesystem result in calls to
	// Done with the error encountered.
	Contents(ctx context.Context, state *T, prefix string, contents []Entry) (file.InfoList, error)

	// Done is called once all calls to Contents have been made or if Prefix returned
	// an error. Done will always be called if Prefix did not return true for stop.
	// Errors returned by Done are recorded and returned by the Walk method.
	// An error returned by Done does not terminate the walk.
	Done(ctx context.Context, state *T, prefix string, err error) error
}

Handler is implemented by clients of Walker to process the results of walking a filesystem hierarchy. The type parameter is used to instantiate a state variable that is passed to each of the methods.

type LevelScanner

type LevelScanner interface {
	Scan(ctx context.Context, n int) bool
	Contents() []Entry
	Err() error
}

type Option

type Option func(o *options)

Option represents options accepted by Walker.

func WithConcurrentScans

func WithConcurrentScans(n int) Option

WithConcurrentScans can be used to change the number of prefixes/directories that can be scanned concurrently. The default is DefaultConcurrentScans.

func WithDepth

func WithDepth(d int) Option

WithDepth sets the maximum depth of the traversal. A depth of 0 traverses only the specified roots, a depth of 1 also traverses one level below the roots, and so on. The default is -1, which denotes no limit on the depth.

func WithScanSize

func WithScanSize(n int) Option

WithScanSize sets the number of prefix/directory entries to be scanned in a single operation. The default is DefaultScanSize.

type Stats

type Stats struct {
	SynchronousScans int64
}

type Status

type Status struct {
	// SynchronousScans is the number of scans that were performed synchronously
	// as a fallback when all available goroutines are already occupied.
	SynchronousScans int64

	// SlowPrefix is a prefix that took longer than a certain duration
	// to scan. ScanDuration is the time spent scanning that prefix to
	// date. A SlowPrefix may be reported as slow before it has completed
	// scanning.
	SlowPrefix   string
	ScanDuration time.Duration
}

Status is used to communicate the status of an in-progress Walk operation.

type Walker

type Walker[T any] struct {
	// contains filtered or unexported fields
}

Walker implements the filesystem walk.

func New

func New[T any](fs FS, handler Handler[T], opts ...Option) *Walker[T]

New creates a new Walker instance.

func (*Walker[T]) Configuration

func (w *Walker[T]) Configuration() Configuration

func (*Walker[T]) Stats

func (w *Walker[T]) Stats() Stats

func (*Walker[T]) Walk

func (w *Walker[T]) Walk(ctx context.Context, roots ...string) error

Walk traverses the hierarchies specified by each of the roots, calling the Handler's Prefix and Contents methods as it goes. Prefix will always be called before Contents for the same prefix, but no other ordering guarantees are provided.

Directories

Path Synopsis
Package filewalktestutil provides utilities for testing code that uses filewalk.FS.
