watcher

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWatchTimeout   = errors.New("watcher timed out")
	ErrBucketMismatch = errors.New("bucket mismatch")
)

Functions

func New

func New(ctx context.Context, tp trace.TracerProvider, logger logr.Logger, c *Config) (*serviceImpl, error)

func NewFilesystemWatcher

func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error)

func NewMinioWatcher

func NewMinioWatcher(
	ctx context.Context,
	tp trace.TracerProvider,
	logger logr.Logger,
	config *MinioConfig,
) (*minioWatcher, error)

Types

type BlobEvent

type BlobEvent struct {
	// Name of the watcher that received this blob.
	WatcherName string

	// Retention period for this blob.
	RetentionPeriod *time.Duration

	// Directory where the transfer is moved to once processing has completed
	// successfully.
	CompletedDir string

	// Whether the top-level directory is meant to be stripped.
	StripTopLevelDir bool

	// Key of the blob.
	Key string

	// Whether the blob is a directory (fs watcher)
	IsDir bool

	// Bucket where the blob lives.
	Bucket string `json:"Bucket,omitempty"`
}

BlobEvent is a serializable event that describes a blob.

BlobEvent can be sent over the wire, i.e. they're serializable. Receivers, typicially Temporal activities, can download the blob via the service implementation in this package.

TODO: use signed URLs to simplify access to buckets?

func NewBlobEvent

func NewBlobEvent(w Watcher, key string, isDir bool) *BlobEvent

func NewBlobEventWithBucket

func NewBlobEventWithBucket(w Watcher, bucket, key string) *BlobEvent

func (BlobEvent) String

func (e BlobEvent) String() string

type Cleanup

type Cleanup func(ctx context.Context) error

type Config

type Config struct {
	Filesystem []*FilesystemConfig
	Minio      []*MinioConfig
	Embedded   *MinioConfig
}

func (Config) CompletedDirs

func (c Config) CompletedDirs() []string

type FilesystemConfig

type FilesystemConfig struct {
	Name         string
	Path         string
	CompletedDir string
	Ignore       string
	Inotify      bool

	RetentionPeriod  *time.Duration
	StripTopLevelDir bool

	// PollInterval sets the length of time between filesystem polls (default:
	// 200ms). If Inotify is true then PollInterval is ignored.
	PollInterval time.Duration
}

See filesystem.go for more.

type MinioConfig

type MinioConfig struct {
	Name            string
	RedisAddress    string
	RedisList       string
	RedisFailedList string
	RedisPopTimeout time.Duration
	Region          string
	Endpoint        string
	PathStyle       bool
	Profile         string
	Key             string
	Secret          string
	Token           string
	Bucket          string
	URL             string

	RetentionPeriod  *time.Duration
	StripTopLevelDir bool

	// PollInterval sets the length of time between Redis polls (default: 1s).
	PollInterval time.Duration
}

See minio.go for more.

type MinioEvent

type MinioEvent struct {
	Name string       `json:"eventName"`
	S3   MinioEventS3 `json:"s3"`
}

MinioEvent represents the event delivered by Minio (S3) via Redis.

For reference:

{
    "eventVersion": "2.0",
    "eventSource": "minio:s3",
    "awsRegion": "",
    "eventTime": "2019-10-01T15:28:22Z",
    "eventName": "s3:ObjectCreated:CompleteMultipartUpload",
    "userIdentity": {
        "principalId": "36J9X8EZI4KEV1G7EHXA"
    },
    "requestParameters": {
        "accessKey": "36J9X8EZI4KEV1G7EHXA",
        "region": "",
        "sourceIPAddress": "172.20.0.1"
    },
    "responseElements": {
        "content-length": "291",
        "x-amz-request-id": "15C98F7AC9D60CA6",
        "x-minio-deployment-id": "bcc2f9ce-65f2-4558-a455-b8176012f89b",
        "x-minio-origin-endpoint": "http://172.20.0.5:9000"
    },
    "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "Config",
        "bucket": {
            "name": "sips",
            "ownerIdentity": {
                "principalId": "36J9X8EZI4KEV1G7EHXA"
            },
            "arn": "arn:aws:s3:::sips"
        },
        "object": {
            "key": "y25.gif",
            "size": 100,
            "eTag": "b0814df70de0779da2b0b3f9c676c64d-1",
            "contentType": "image/gif",
            "userMetadata": {
                "X-Minio-Internal-actual-size": "100",
                "content-type": "image/gif"
            },
            "versionId": "1",
            "sequencer": "15C98F7ACA94598C"
        }
    },
    "source": {
        "host": "172.20.0.1",
        "port": "",
        "userAgent": "MinIO (linux; amd64) minio-go/v6.0.32 mc/DEVELOPMENT.GOGET"
    }
}

func (MinioEvent) String

func (e MinioEvent) String() string

type MinioEventS3

type MinioEventS3 struct {
	Bucket MinioEventS3Bucket `json:"bucket"`
	Object MinioEventS3Object `json:"object"`
}

type MinioEventS3Bucket

type MinioEventS3Bucket struct {
	Name string `json:"name"`
}

type MinioEventS3Object

type MinioEventS3Object struct {
	Key string `json:"key"`
}

type MinioEventSet

type MinioEventSet struct {
	Event     []MinioEvent
	EventTime string
}

type Service

type Service interface {
	// Watchers return all known watchers.
	Watchers() []Watcher

	// Return a watcher given its name.
	ByName(name string) (Watcher, error)

	// Download copies the watcherName file or directory identified by key to
	// dest.
	Download(ctx context.Context, dest, watcherName, key string) error

	// Delete blob given an event.
	Delete(ctx context.Context, watcherName, key string) error

	// Dipose blob into the completedDir directory.
	Dispose(ctx context.Context, watcherName, key string) error
}

type Watcher

type Watcher interface {
	// Watch waits until a blob is dispatched.
	// After the event is successfully processed by the receiver, the returned
	// Cleanup function must be executed to mark the event as processed.
	// Implementors must not return a nil function.
	Watch(ctx context.Context) (*BlobEvent, Cleanup, error)

	// Download copies the file or directory identified by key to dest.
	Download(ctx context.Context, dest, key string) error

	// OpenBucket returns the bucket where the blobs can be found.
	OpenBucket(ctx context.Context) (*blob.Bucket, error)

	RetentionPeriod() *time.Duration
	CompletedDir() string
	StripTopLevelDir() bool

	// Full path of the watched bucket when available, empty string otherwise.
	Path() string

	fmt.Stringer // It should return the name of the watcher.
}

Directories

Path Synopsis
Package fake is a generated GoMock package.
Package fake is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL