Documentation ¶
Index ¶
- Variables
- func New(ctx context.Context, tp trace.TracerProvider, logger logr.Logger, c *Config) (*serviceImpl, error)
- func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error)
- func NewMinioWatcher(ctx context.Context, tp trace.TracerProvider, logger logr.Logger, ...) (*minioWatcher, error)
- type BlobEvent
- type Cleanup
- type Config
- type FilesystemConfig
- type MinioConfig
- type MinioEvent
- type MinioEventS3
- type MinioEventS3Bucket
- type MinioEventS3Object
- type MinioEventSet
- type Service
- type Watcher
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrWatchTimeout = errors.New("watcher timed out") ErrBucketMismatch = errors.New("bucket mismatch") )
Functions ¶
func NewFilesystemWatcher ¶
func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error)
func NewMinioWatcher ¶
func NewMinioWatcher( ctx context.Context, tp trace.TracerProvider, logger logr.Logger, config *MinioConfig, ) (*minioWatcher, error)
Types ¶
type BlobEvent ¶
type BlobEvent struct { // Name of the watcher that received this blob. WatcherName string // Retention period for this blob. RetentionPeriod *time.Duration // Directory where the transfer is moved to once processing has completed // successfully. CompletedDir string // Whether the top-level directory is meant to be stripped. StripTopLevelDir bool // Key of the blob. Key string // Whether the blob is a directory (fs watcher) IsDir bool // Bucket where the blob lives. Bucket string `json:"Bucket,omitempty"` }
BlobEvent is a serializable event that describes a blob.
BlobEvent can be sent over the wire, i.e. they're serializable. Receivers, typicially Temporal activities, can download the blob via the service implementation in this package.
TODO: use signed URLs to simplify access to buckets?
func NewBlobEventWithBucket ¶
type Config ¶
type Config struct { Filesystem []*FilesystemConfig Minio []*MinioConfig Embedded *MinioConfig }
func (Config) CompletedDirs ¶
type FilesystemConfig ¶
type FilesystemConfig struct { Name string Path string CompletedDir string Ignore string Inotify bool RetentionPeriod *time.Duration StripTopLevelDir bool // PollInterval sets the length of time between filesystem polls (default: // 200ms). If Inotify is true then PollInterval is ignored. PollInterval time.Duration }
See filesystem.go for more.
type MinioConfig ¶
type MinioConfig struct { Name string RedisAddress string RedisList string RedisFailedList string RedisPopTimeout time.Duration Region string Endpoint string PathStyle bool Profile string Key string Secret string Token string Bucket string URL string RetentionPeriod *time.Duration StripTopLevelDir bool // PollInterval sets the length of time between Redis polls (default: 1s). PollInterval time.Duration }
See minio.go for more.
type MinioEvent ¶
type MinioEvent struct { Name string `json:"eventName"` S3 MinioEventS3 `json:"s3"` }
MinioEvent represents the event delivered by Minio (S3) via Redis.
For reference:
{ "eventVersion": "2.0", "eventSource": "minio:s3", "awsRegion": "", "eventTime": "2019-10-01T15:28:22Z", "eventName": "s3:ObjectCreated:CompleteMultipartUpload", "userIdentity": { "principalId": "36J9X8EZI4KEV1G7EHXA" }, "requestParameters": { "accessKey": "36J9X8EZI4KEV1G7EHXA", "region": "", "sourceIPAddress": "172.20.0.1" }, "responseElements": { "content-length": "291", "x-amz-request-id": "15C98F7AC9D60CA6", "x-minio-deployment-id": "bcc2f9ce-65f2-4558-a455-b8176012f89b", "x-minio-origin-endpoint": "http://172.20.0.5:9000" }, "s3": { "s3SchemaVersion": "1.0", "configurationId": "Config", "bucket": { "name": "sips", "ownerIdentity": { "principalId": "36J9X8EZI4KEV1G7EHXA" }, "arn": "arn:aws:s3:::sips" }, "object": { "key": "y25.gif", "size": 100, "eTag": "b0814df70de0779da2b0b3f9c676c64d-1", "contentType": "image/gif", "userMetadata": { "X-Minio-Internal-actual-size": "100", "content-type": "image/gif" }, "versionId": "1", "sequencer": "15C98F7ACA94598C" } }, "source": { "host": "172.20.0.1", "port": "", "userAgent": "MinIO (linux; amd64) minio-go/v6.0.32 mc/DEVELOPMENT.GOGET" } }
func (MinioEvent) String ¶
func (e MinioEvent) String() string
type MinioEventS3 ¶
type MinioEventS3 struct { Bucket MinioEventS3Bucket `json:"bucket"` Object MinioEventS3Object `json:"object"` }
type MinioEventS3Bucket ¶
type MinioEventS3Bucket struct {
Name string `json:"name"`
}
type MinioEventS3Object ¶
type MinioEventS3Object struct {
Key string `json:"key"`
}
type MinioEventSet ¶
type MinioEventSet struct { Event []MinioEvent EventTime string }
type Service ¶
type Service interface { // Watchers return all known watchers. Watchers() []Watcher // Return a watcher given its name. ByName(name string) (Watcher, error) // Download copies the watcherName file or directory identified by key to // dest. Download(ctx context.Context, dest, watcherName, key string) error // Delete blob given an event. Delete(ctx context.Context, watcherName, key string) error // Dipose blob into the completedDir directory. Dispose(ctx context.Context, watcherName, key string) error }
type Watcher ¶
type Watcher interface { // Watch waits until a blob is dispatched. // After the event is successfully processed by the receiver, the returned // Cleanup function must be executed to mark the event as processed. // Implementors must not return a nil function. Watch(ctx context.Context) (*BlobEvent, Cleanup, error) // Download copies the file or directory identified by key to dest. Download(ctx context.Context, dest, key string) error // OpenBucket returns the bucket where the blobs can be found. OpenBucket(ctx context.Context) (*blob.Bucket, error) RetentionPeriod() *time.Duration CompletedDir() string StripTopLevelDir() bool // Full path of the watched bucket when available, empty string otherwise. Path() string fmt.Stringer // It should return the name of the watcher. }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.