Documentation ¶
Overview ¶
Package fileio provides transforms for matching and reading files.
Index ¶
- func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection
- func MatchContinuously(s beam.Scope, glob string, interval time.Duration, opts ...MatchContOptionFn) beam.PCollection
- func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection
- func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
- type FileMetadata
- type MatchContOptionFn
- type MatchOptionFn
- type ReadOptionFn
- type ReadableFile
Functions ¶
func MatchAll ¶
func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection
MatchAll finds all files matching the glob patterns given by the incoming PCollection<string> and returns a PCollection<FileMetadata> of the matching files. MatchAll accepts a variadic number of MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty matches are allowed if the pattern contains a wildcard.
Example ¶
package main

import (
	"context"
	"log"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	globs := beam.Create(s, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz")
	matches := fileio.MatchAll(s, globs)
	debug.Print(s, matches)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
func MatchContinuously ¶
func MatchContinuously(
	s beam.Scope,
	glob string,
	interval time.Duration,
	opts ...MatchContOptionFn,
) beam.PCollection
MatchContinuously finds all files matching the glob pattern at the given interval and returns a PCollection<FileMetadata> of the matching files. MatchContinuously accepts a variadic number of MatchContOptionFn that can be used to configure:
- Start: start time for matching files. Defaults to the current timestamp
- End: end time for matching files. Defaults to the maximum timestamp
- DuplicateAllow: allow emitting matches that have already been observed. Defaults to false
- DuplicateAllowIfModified: allow emitting matches that have already been observed if the file has been modified since the last observation. Defaults to false
- DuplicateSkip: skip emitting matches that have already been observed. Defaults to true
- ApplyWindow: assign each element to an individual window with a fixed size equivalent to the interval. Defaults to false, i.e. all elements will reside in the global window
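The three duplicate options above differ only in what happens when a path is seen again. The following is a minimal standard-library sketch of that decision, not the package's implementation: `dedupPolicy`, `observed`, `shouldEmit` and `minute` are hypothetical names introduced for illustration, and the comparison on modification time is an assumption about how "modified since the last observation" could be decided.

```go
package main

import (
	"fmt"
	"time"
)

// dedupPolicy is a hypothetical stand-in for the three duplicate options.
type dedupPolicy int

const (
	duplicateSkip            dedupPolicy = iota // emit each path once
	duplicateAllow                              // emit every observation
	duplicateAllowIfModified                    // emit again only after a modification
)

// observed records the last modification time seen for each path.
type observed map[string]time.Time

// shouldEmit reports whether a match should be emitted under the given
// policy, and records the observation for later calls.
func (o observed) shouldEmit(policy dedupPolicy, path string, modified time.Time) bool {
	last, seen := o[path]
	o[path] = modified
	switch policy {
	case duplicateAllow:
		return true
	case duplicateAllowIfModified:
		return !seen || modified.After(last)
	default: // duplicateSkip
		return !seen
	}
}

// minute returns a fixed reference time plus n minutes, for the demo below.
func minute(n int) time.Time {
	return time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(n) * time.Minute)
}

func main() {
	for _, policy := range []dedupPolicy{duplicateSkip, duplicateAllow, duplicateAllowIfModified} {
		o := observed{}
		first := o.shouldEmit(policy, "a.json", minute(0))
		unchanged := o.shouldEmit(policy, "a.json", minute(0))
		modified := o.shouldEmit(policy, "a.json", minute(1))
		fmt.Println(policy, first, unchanged, modified)
	}
}
```

With the package itself, the same choice would be expressed by passing `MatchDuplicateSkip()`, `MatchDuplicateAllow()` or `MatchDuplicateAllowIfModified()` to `MatchContinuously`.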
Example ¶
package main

import (
	"context"
	"log"
	"time"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	matches := fileio.MatchContinuously(s, "gs://path/to/*.json", 10*time.Second)
	debug.Print(s, matches)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
func MatchFiles ¶
func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection
MatchFiles finds all files matching the glob pattern and returns a PCollection<FileMetadata> of the matching files. MatchFiles accepts a variadic number of MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty matches are allowed if the pattern contains a wildcard.
Example ¶
package main

import (
	"context"
	"log"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	matches := fileio.MatchFiles(s, "gs://path/to/*.gz")
	debug.Print(s, matches)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
func ReadMatches ¶
func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
ReadMatches accepts the result of MatchFiles, MatchAll or MatchContinuously as a PCollection<FileMetadata> and converts it to a PCollection<ReadableFile>. The ReadableFile can be used to retrieve file metadata, open the file for reading or read the entire file into memory. ReadMatches accepts a variadic number of ReadOptionFn that can be used to configure the compression type of the files and treatment of directories. By default, the compression type is determined by the file extension and directories are skipped.
Example ¶
package main

import (
	"context"
	"log"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	matches := fileio.MatchFiles(s, "gs://path/to/*.gz")
	files := fileio.ReadMatches(s, matches)
	debug.Print(s, files)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Types ¶
type FileMetadata ¶
FileMetadata contains metadata about a file, namely its path, size in bytes and last modified time.
type MatchContOptionFn ¶
type MatchContOptionFn func(*matchContOption)
MatchContOptionFn is a function that can be passed to MatchContinuously to configure options for matching files.
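The signature `func(*matchContOption)` follows Go's functional options pattern: each option is a function that mutates an unexported settings struct, and the transform applies the options in order on top of its defaults. The sketch below illustrates the pattern only. The struct `option`, its fields, and the helpers `withApplyWindow`, `withDuplicatesAllowed` and `newOption` are hypothetical and do not reflect the package's unexported types.

```go
package main

import "fmt"

// option is a hypothetical settings struct; its fields are assumptions
// chosen for illustration.
type option struct {
	applyWindow     bool
	allowDuplicates bool
}

// optionFn mutates an option, mirroring the shape func(*matchContOption).
type optionFn func(*option)

// withApplyWindow returns an optionFn that enables windowing.
func withApplyWindow() optionFn {
	return func(o *option) { o.applyWindow = true }
}

// withDuplicatesAllowed returns an optionFn that disables deduplication.
func withDuplicatesAllowed() optionFn {
	return func(o *option) { o.allowDuplicates = true }
}

// newOption starts from the defaults and applies each optionFn in order,
// so later options override earlier ones.
func newOption(opts ...optionFn) option {
	o := option{} // defaults: no windowing, duplicates skipped
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", newOption())
	fmt.Printf("%+v\n", newOption(withApplyWindow(), withDuplicatesAllowed()))
}
```

Because the settings struct is unexported, callers can only configure the transform through the exported option constructors, which keeps the defaults under the package's control.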
func MatchApplyWindow ¶
func MatchApplyWindow() MatchContOptionFn
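MatchApplyWindow specifies that each element will be assigned to an individual window with a fixed size equivalent to the matching interval. Without it, all elements reside in the global window.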
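To make the fixed-size windowing concrete, the sketch below computes which window a timestamp would fall into when the window size equals the matching interval. It assumes windows are aligned to the Unix epoch and that timestamps have millisecond precision. Both are assumptions for illustration, and `windowStartMillis` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// windowStartMillis returns the start of the fixed-size window that
// contains tsMillis, assuming windows are aligned to the Unix epoch.
// The remainder is adjusted so that negative timestamps round down.
func windowStartMillis(tsMillis, sizeMillis int64) int64 {
	rem := tsMillis % sizeMillis
	if rem < 0 {
		rem += sizeMillis
	}
	return tsMillis - rem
}

func main() {
	interval := 10 * time.Second
	ts := time.Date(2024, 1, 1, 0, 0, 25, 0, time.UTC)

	start := windowStartMillis(ts.UnixMilli(), interval.Milliseconds())
	end := start + interval.Milliseconds()

	// The element observed at 00:00:25 lands in the window [00:00:20, 00:00:30).
	fmt.Println(time.UnixMilli(start).UTC(), "to", time.UnixMilli(end).UTC())
}
```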
func MatchDuplicateAllow ¶
func MatchDuplicateAllow() MatchContOptionFn
MatchDuplicateAllow specifies that file path matches will not be deduplicated.
func MatchDuplicateAllowIfModified ¶
func MatchDuplicateAllowIfModified() MatchContOptionFn
MatchDuplicateAllowIfModified specifies that file path matches will be deduplicated unless the file has been modified since it was last observed.
func MatchDuplicateSkip ¶
func MatchDuplicateSkip() MatchContOptionFn
MatchDuplicateSkip specifies that file path matches will be deduplicated.
func MatchEnd ¶
func MatchEnd(end time.Time) MatchContOptionFn
MatchEnd specifies the end time for matching files.
func MatchStart ¶
func MatchStart(start time.Time) MatchContOptionFn
MatchStart specifies the start time for matching files.
type MatchOptionFn ¶
type MatchOptionFn func(*matchOption)
MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for matching files.
func MatchEmptyAllow ¶
func MatchEmptyAllow() MatchOptionFn
MatchEmptyAllow specifies that empty matches are allowed.
func MatchEmptyAllowIfWildcard ¶
func MatchEmptyAllowIfWildcard() MatchOptionFn
MatchEmptyAllowIfWildcard specifies that empty matches are allowed if the pattern contains a wildcard.
func MatchEmptyDisallow ¶
func MatchEmptyDisallow() MatchOptionFn
MatchEmptyDisallow specifies that empty matches are not allowed.
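The three empty-match options can be read as a single decision: given a glob that matched nothing, is that an error? The sketch below models that decision with the standard library only. It is not the package's implementation: `emptyTreatment`, `allowEmpty` and `checkMatches` are hypothetical names, and treating `*` as the only wildcard character is a simplifying assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// emptyTreatment is a hypothetical stand-in for the three empty-match options.
type emptyTreatment int

const (
	emptyAllow emptyTreatment = iota
	emptyAllowIfWildcard
	emptyDisallow
)

var errEmptyMatch = errors.New("no files matched")

// allowEmpty reports whether an empty result is acceptable for glob.
// Assumption: "*" is the only wildcard character considered.
func allowEmpty(t emptyTreatment, glob string) bool {
	switch t {
	case emptyAllow:
		return true
	case emptyAllowIfWildcard:
		return strings.Contains(glob, "*")
	default: // emptyDisallow
		return false
	}
}

// checkMatches returns an error when matches is empty and the treatment
// does not permit an empty result for glob.
func checkMatches(t emptyTreatment, glob string, matches []string) error {
	if len(matches) == 0 && !allowEmpty(t, glob) {
		return fmt.Errorf("%w: %q", errEmptyMatch, glob)
	}
	return nil
}

func main() {
	fmt.Println(checkMatches(emptyAllowIfWildcard, "gs://path/to/*.gz", nil))
	fmt.Println(checkMatches(emptyAllowIfWildcard, "gs://path/to/file.gz", nil))
	fmt.Println(checkMatches(emptyDisallow, "gs://path/to/*.gz", nil))
}
```

With the package itself, the stricter behaviour would be requested by passing `MatchEmptyDisallow()` to `MatchFiles` or `MatchAll`.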
type ReadOptionFn ¶
type ReadOptionFn func(*readOption)
ReadOptionFn is a function that can be passed to ReadMatches to configure options for reading files.
func ReadAutoCompression ¶
func ReadAutoCompression() ReadOptionFn
ReadAutoCompression specifies that the compression type of files should be auto-detected from the file extension.
func ReadDirectoryDisallow ¶
func ReadDirectoryDisallow() ReadOptionFn
ReadDirectoryDisallow specifies that directories are not allowed.
func ReadDirectorySkip ¶
func ReadDirectorySkip() ReadOptionFn
ReadDirectorySkip specifies that directories are skipped.
func ReadGzip ¶
func ReadGzip() ReadOptionFn
ReadGzip specifies that files have been compressed using gzip.
func ReadUncompressed ¶
func ReadUncompressed() ReadOptionFn
ReadUncompressed specifies that files have not been compressed.
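The two directory options, ReadDirectorySkip and ReadDirectoryDisallow, differ in whether a matched directory is silently dropped or reported as an error. The sketch below illustrates that difference with the standard library only. It assumes a directory is identified by a trailing slash in its path, which is an assumption made for this example, and `directoryTreatment`, `isDirectory` and `filterDirectories` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// directoryTreatment is a hypothetical stand-in for the two directory options.
type directoryTreatment int

const (
	directorySkip directoryTreatment = iota
	directoryDisallow
)

// isDirectory assumes that a trailing slash marks a directory.
func isDirectory(path string) bool {
	return strings.HasSuffix(path, "/")
}

// filterDirectories returns the non-directory paths. Under directorySkip,
// directories are dropped; under directoryDisallow, the first directory
// encountered produces an error.
func filterDirectories(t directoryTreatment, paths []string) ([]string, error) {
	var files []string
	for _, p := range paths {
		if !isDirectory(p) {
			files = append(files, p)
			continue
		}
		if t == directoryDisallow {
			return nil, fmt.Errorf("path %q is a directory", p)
		}
	}
	return files, nil
}

func main() {
	paths := []string{"gs://path/to/", "gs://path/to/a.gz"}
	fmt.Println(filterDirectories(directorySkip, paths))
	fmt.Println(filterDirectories(directoryDisallow, paths))
}
```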
type ReadableFile ¶
type ReadableFile struct {
	Metadata    FileMetadata
	Compression compressionType
}
ReadableFile is a wrapper around a FileMetadata and compressionType that can be used to obtain a file descriptor or read the file's contents.
func (ReadableFile) Open ¶
func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error)
Open opens the file for reading. The compression type is determined by the Compression field of the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from the file extension. It is the caller's responsibility to close the returned reader.
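The behaviour described for Open can be sketched with the standard library: resolve the automatic setting from the file extension, wrap the raw stream in a decompressing reader if needed, and leave closing to the caller. This is an illustration under stated assumptions, not the package's code. The names `compression`, `resolve`, `open`, `readString` and `gzipBytes` are hypothetical, and recognising only the `.gz` extension is an assumption.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// compression is a hypothetical stand-in for the unexported compressionType.
type compression int

const (
	compressionAuto compression = iota
	compressionGzip
	compressionUncompressed
)

// resolve maps compressionAuto to a concrete type using the file extension.
// Assumption: only ".gz" is recognised as gzip.
func resolve(c compression, path string) compression {
	if c != compressionAuto {
		return c
	}
	if strings.HasSuffix(path, ".gz") {
		return compressionGzip
	}
	return compressionUncompressed
}

// open wraps raw according to the resolved compression type.
// The caller is responsible for closing the returned reader.
func open(raw io.Reader, c compression, path string) (io.ReadCloser, error) {
	if resolve(c, path) == compressionGzip {
		zr, err := gzip.NewReader(raw)
		if err != nil {
			return nil, err
		}
		return zr, nil
	}
	return io.NopCloser(raw), nil
}

// readString opens data as if it were the file at path, reads it fully,
// and closes the reader before returning.
func readString(data []byte, c compression, path string) (string, error) {
	rc, err := open(bytes.NewReader(data), c, path)
	if err != nil {
		return "", err
	}
	defer rc.Close()
	b, err := io.ReadAll(rc)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// gzipBytes compresses s in memory; writes to a bytes.Buffer do not fail,
// so the errors are ignored in this demo helper.
func gzipBytes(s string) []byte {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte(s))
	zw.Close()
	return buf.Bytes()
}

func main() {
	s, err := readString(gzipBytes("hello"), compressionAuto, "greeting.txt.gz")
	fmt.Println(s, err)
}
```

The explicit `nil` return on error avoids handing the caller a non-nil interface that wraps a nil `*gzip.Reader`.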
func (ReadableFile) Read ¶
func (f ReadableFile) Read(ctx context.Context) (data []byte, err error)
Read reads the entire file into memory and returns the contents.
func (ReadableFile) ReadString ¶
func (f ReadableFile) ReadString(ctx context.Context) (string, error)
ReadString reads the entire file into memory and returns the contents as a string.