fileio

package
v3.0.0-...-7ba4d6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 17 Imported by: 0

Documentation

Overview

Package fileio provides transforms for matching and reading files.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func MatchAll

func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection

MatchAll finds all files matching the glob patterns given by the incoming PCollection<string> and returns a PCollection<FileMetadata> of the matching files. MatchAll accepts a variadic number of MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty matches are allowed if the pattern contains a wildcard.

Example
package main

import (
	"context"
	"log"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	globs := beam.Create(s, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz")
	matches := fileio.MatchAll(s, globs)
	debug.Print(s, matches)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

func MatchContinuously

func MatchContinuously(
	s beam.Scope,
	glob string,
	interval time.Duration,
	opts ...MatchContOptionFn,
) beam.PCollection

MatchContinuously finds all files matching the glob pattern at the given interval and returns a PCollection<FileMetadata> of the matching files. MatchContinuously accepts a variadic number of MatchContOptionFn that can be used to configure:

  • Start: start time for matching files. Defaults to the current timestamp
  • End: end time for matching files. Defaults to the maximum timestamp
  • DuplicateAllow: allow emitting matches that have already been observed. Defaults to false
  • DuplicateAllowIfModified: allow emitting matches that have already been observed if the file has been modified since the last observation. Defaults to false
  • DuplicateSkip: skip emitting matches that have already been observed. Defaults to true
  • ApplyWindow: assign each element to an individual window with a fixed size equivalent to the interval. Defaults to false, i.e. all elements will reside in the global window
Example
package main

import (
	"context"
	"log"
	"time"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	matches := fileio.MatchContinuously(s, "gs://path/to/*.json", 10*time.Second)
	debug.Print(s, matches)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

func MatchFiles

func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection

MatchFiles finds all files matching the glob pattern and returns a PCollection<FileMetadata> of the matching files. MatchFiles accepts a variadic number of MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty matches are allowed if the pattern contains a wildcard.

Example
package main

import (
	"context"
	"log"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	matches := fileio.MatchFiles(s, "gs://path/to/*.gz")
	debug.Print(s, matches)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

func ReadMatches

func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection

ReadMatches accepts the result of MatchFiles, MatchAll or MatchContinuously as a PCollection<FileMetadata> and converts it to a PCollection<ReadableFile>. The ReadableFile can be used to retrieve file metadata, open the file for reading or read the entire file into memory. ReadMatches accepts a variadic number of ReadOptionFn that can be used to configure the compression type of the files and treatment of directories. By default, the compression type is determined by the file extension and directories are skipped.

Example
package main

import (
	"context"
	"log"

	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/io/fileio"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/beamx"
	"github.com/Beamdust/beam-fork/sdks/v3/go/pkg/beam/x/debug"
)

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	matches := fileio.MatchFiles(s, "gs://path/to/*.gz")
	files := fileio.ReadMatches(s, matches)
	debug.Print(s, files)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

Types

type FileMetadata

type FileMetadata struct {
	Path         string
	Size         int64
	LastModified time.Time
}

FileMetadata contains metadata about a file, namely its path, size in bytes and last modified time.

type MatchContOptionFn

type MatchContOptionFn func(*matchContOption)

MatchContOptionFn is a function that can be passed to MatchContinuously to configure options for matching files.

func MatchApplyWindow

func MatchApplyWindow() MatchContOptionFn

MatchApplyWindow specifies that each element will be assigned to an individual window.

func MatchDuplicateAllow

func MatchDuplicateAllow() MatchContOptionFn

MatchDuplicateAllow specifies that file path matches will not be deduplicated.

func MatchDuplicateAllowIfModified

func MatchDuplicateAllowIfModified() MatchContOptionFn

MatchDuplicateAllowIfModified specifies that file path matches will be deduplicated unless the file has been modified since it was last observed.

func MatchDuplicateSkip

func MatchDuplicateSkip() MatchContOptionFn

MatchDuplicateSkip specifies that file path matches will be deduplicated.

func MatchEnd

func MatchEnd(end time.Time) MatchContOptionFn

MatchEnd specifies the end time for matching files.

func MatchStart

func MatchStart(start time.Time) MatchContOptionFn

MatchStart specifies the start time for matching files.

type MatchOptionFn

type MatchOptionFn func(*matchOption)

MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for matching files.

func MatchEmptyAllow

func MatchEmptyAllow() MatchOptionFn

MatchEmptyAllow specifies that empty matches are allowed.

func MatchEmptyAllowIfWildcard

func MatchEmptyAllowIfWildcard() MatchOptionFn

MatchEmptyAllowIfWildcard specifies that empty matches are allowed if the pattern contains a wildcard.

func MatchEmptyDisallow

func MatchEmptyDisallow() MatchOptionFn

MatchEmptyDisallow specifies that empty matches are not allowed.

type ReadOptionFn

type ReadOptionFn func(*readOption)

ReadOptionFn is a function that can be passed to ReadMatches to configure options for reading files.

func ReadAutoCompression

func ReadAutoCompression() ReadOptionFn

ReadAutoCompression specifies that the compression type of files should be auto-detected.

func ReadDirectoryDisallow

func ReadDirectoryDisallow() ReadOptionFn

ReadDirectoryDisallow specifies that directories are not allowed.

func ReadDirectorySkip

func ReadDirectorySkip() ReadOptionFn

ReadDirectorySkip specifies that directories are skipped.

func ReadGzip

func ReadGzip() ReadOptionFn

ReadGzip specifies that files have been compressed using gzip.

func ReadUncompressed

func ReadUncompressed() ReadOptionFn

ReadUncompressed specifies that files have not been compressed.

type ReadableFile

type ReadableFile struct {
	Metadata    FileMetadata
	Compression compressionType
}

ReadableFile is a wrapper around a FileMetadata and compressionType that can be used to obtain a file descriptor or read the file's contents.

func (ReadableFile) Open

func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error)

Open opens the file for reading. The compression type is determined by the Compression field of the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from the file extension. It is the caller's responsibility to close the returned reader.

func (ReadableFile) Read

func (f ReadableFile) Read(ctx context.Context) (data []byte, err error)

Read reads the entire file into memory and returns the contents.

func (ReadableFile) ReadString

func (f ReadableFile) ReadString(ctx context.Context) (string, error)

ReadString reads the entire file into memory and returns the contents as a string.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL