Documentation ¶
Overview ¶
Package storage defines an extensible storage interface. This package registers a "storage" config section that maps to the Config struct. Use NewDataStore(cfg) to initialize a DataStore with the provided config. The package provides default implementations for accessing local, S3 (and minio), and in-memory storage. Use NewCompositeDataStore to swap any portion of the DataStore interface with an external implementation (e.g. a cached protobuf store). The underlying storage is provided by the extensible "stow" library. You can use NewStowRawStore(cfg) to create a raw store based on any other stow-supported config (e.g. Azure Blob Storage).
Index ¶
- Constants
- Variables
- func IsExceedsLimit(err error) bool
- func IsExists(err error) bool
- func IsFailedWriteToCache(err error) bool
- func IsNotFound(err error) bool
- func MapStrings(mapper func(string) string, strings ...string) []string
- func MergeMaps(dst map[string]string, src ...map[string]string)
- func RegisterStowKind(kind string, f func(string) DataReference) error
- type CachingConfig
- type ComposedProtobufStore
- type Config
- type ConnectionConfig
- type Cursor
- type CursorState
- type DataReference
- type DataStore
- type DefaultProtobufStore
- type HTTPClientConfig
- type InMemoryStore
- func (s *InMemoryStore) Clear(ctx context.Context) error
- func (c InMemoryStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
- func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
- func (s *InMemoryStore) Delete(ctx context.Context, reference DataReference) error
- func (s *InMemoryStore) GetBaseContainerFQN(ctx context.Context) DataReference
- func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
- func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
- func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
- func (s *InMemoryStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, ...) (err error)
- type LimitsConfig
- type MemoryMetadata
- type Metadata
- type Options
- type ProtobufStore
- type RawStore
- type ReferenceConstructor
- type SignedURLConfig
- type SignedURLProperties
- type SignedURLResponse
- type StowConfig
- type StowMetadata
- type StowStore
- func (c StowStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
- func (s *StowStore) CreateContainer(ctx context.Context, container string) (stow.Container, error)
- func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
- func (s *StowStore) Delete(ctx context.Context, reference DataReference) error
- func (s *StowStore) GetBaseContainerFQN(ctx context.Context) DataReference
- func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
- func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
- func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error)
- func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
- func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, ...) error
- type Type
- type URLPathConstructor
Examples ¶
Constants ¶
const ( KiB int64 = 1024 MiB int64 = 1024 * KiB )
const FailureTypeLabel contextutils.Key = "failure_type"
const FlyteContentMD5 = "flyteContentMD5"
Variables ¶
var ( ErrExceedsLimit stdErrs.ErrorCode = "LIMIT_EXCEEDED" ErrFailedToWriteCache stdErrs.ErrorCode = "CACHE_WRITE_FAILED" )
var (
ConfigSection = config.MustRegisterSection(configSectionKey, defaultConfig)
)
Functions ¶
func IsExceedsLimit ¶
IsExceedsLimit gets a value indicating whether the root cause of error is a "limit exceeded" error.
func IsExists ¶
IsExists gets a value indicating whether the underlying error is "already exists" error.
func IsFailedWriteToCache ¶
func IsNotFound ¶
IsNotFound gets a value indicating whether the underlying error is a Not Found error.
func RegisterStowKind ¶
func RegisterStowKind(kind string, f func(string) DataReference) error
RegisterStowKind registers a new kind of stow store.
Types ¶
type CachingConfig ¶
type CachingConfig struct { // Maximum size of the cache where the Blob store data is cached in-memory // Refer to https://github.com/coocood/freecache to understand how to set the value // If not specified or set to 0, cache is not used // NOTE: if Object sizes are larger than 1/1024 of the cache size, the entry will not be written to the cache // Also refer to https://github.com/coocood/freecache/issues/17 to understand how to set the cache MaxSizeMegabytes int `` /* 149-byte string literal not displayed */ // sets the garbage collection target percentage: // a collection is triggered when the ratio of freshly allocated data // to live data remaining after the previous collection reaches this percentage. // refer to https://golang.org/pkg/runtime/debug/#SetGCPercent // If not specified or set to 0, GC percent is not tweaked TargetGCPercent int `json:"target_gc_percent" pflag:",Sets the garbage collection target percentage."` }
type ComposedProtobufStore ¶
type ComposedProtobufStore interface { RawStore ProtobufStore }
ComposedProtobufStore interface includes all the necessary data to allow a ProtobufStore to interact with storage through a RawStore.
type Config ¶
type Config struct { Type Type `json:"type" pflag:",Sets the type of storage to configure [s3/minio/local/mem/stow]."` // Deprecated: Please use StowConfig instead Connection ConnectionConfig `json:"connection"` Stow StowConfig `json:"stow,omitempty" pflag:",Storage config for stow backend."` // Container here is misleading, it refers to a Bucket (AWS S3) like blobstore entity. In some terms it could be a table InitContainer string `json:"container" pflag:",Initial container (in s3 a bucket) to create -if it doesn't exist-.'"` // By default, if this is not enabled, multiple containers are not supported by the storage layer. Only the configured `container` InitContainer will be allowed to request data from. But, if enabled, then data will be loaded from or written to any // container specified in the DataReference. MultiContainerEnabled bool `` /* 213-byte string literal not displayed */ // Caching is recommended to improve the performance of underlying systems. It caches the metadata and resolving // inputs is accelerated. The size of the cache is large so understand how to configure the cache. // TODO provide some default config choices // If this section is skipped, Caching is disabled Cache CachingConfig `json:"cache"` Limits LimitsConfig `json:"limits" pflag:",Sets limits for stores."` DefaultHTTPClient HTTPClientConfig `json:"defaultHttpClient" pflag:",Sets the default http client config."` SignedURL SignedURLConfig `json:"signedUrl" pflag:",Sets config for SignedURL."` }
Config is a common storage config.
type ConnectionConfig ¶
type ConnectionConfig struct { Endpoint config.URL `json:"endpoint" pflag:",URL for storage client to connect to."` AuthType string `json:"auth-type" pflag:",Auth Type to use [iam,accesskey]."` AccessKey string `json:"access-key" pflag:",Access key to use. Only required when authtype is set to accesskey."` SecretKey string `json:"secret-key" pflag:",Secret to use when accesskey is set."` Region string `json:"region" pflag:",Region to connect to."` DisableSSL bool `json:"disable-ssl" pflag:",Disables SSL connection. Should only be used for development."` }
ConnectionConfig defines connection configurations.
type Cursor ¶ added in v1.13.2
type Cursor struct {
// contains filtered or unexported fields
}
func NewCursorAtEnd ¶ added in v1.13.2
func NewCursorAtEnd() Cursor
func NewCursorAtStart ¶ added in v1.13.2
func NewCursorAtStart() Cursor
func NewCursorFromCustomPosition ¶ added in v1.13.2
type CursorState ¶ added in v1.13.2
type CursorState int
const ( // Enum representing state of the cursor AtStartCursorState CursorState = 0 AtEndCursorState CursorState = 1 AtCustomPosCursorState CursorState = 2 )
type DataReference ¶
type DataReference string
DataReference defines a reference to data location.
func (DataReference) Split ¶
func (r DataReference) Split() (scheme, container, key string, err error)
Split splits the data reference into parts.
func (DataReference) String ¶
func (r DataReference) String() string
type DataStore ¶
type DataStore struct { ComposedProtobufStore ReferenceConstructor // contains filtered or unexported fields }
DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. Today we rely on Stow for multi-cloud support, but this interface abstracts that part away.
func NewCompositeDataStore ¶
func NewCompositeDataStore(refConstructor ReferenceConstructor, composedProtobufStore ComposedProtobufStore) *DataStore
NewCompositeDataStore composes a new DataStore.
func NewDataStore ¶
NewDataStore creates a new Data Store with the supplied config.
Example ¶
testScope := promutils.NewTestScope() ctx := context.Background() fmt.Println("Creating in memory data store.") store, err := NewDataStore(&Config{ Type: TypeMemory, }, testScope.NewSubScope("exp_new")) if err != nil { fmt.Printf("Failed to create data store. Error: %v", err) } ref, err := store.ConstructReference(ctx, DataReference("root"), "subkey", "subkey2") if err != nil { fmt.Printf("Failed to construct data reference. Error: %v", err) } fmt.Printf("Constructed data reference [%v] and writing data to it.\n", ref) dataToStore := "hello world" err = store.WriteRaw(ctx, ref, int64(len(dataToStore)), Options{}, strings.NewReader(dataToStore)) if err != nil { fmt.Printf("Failed to write data. Error: %v", err) }
Output: Creating in memory data store. Constructed data reference [/root/subkey/subkey2] and writing data to it.
type DefaultProtobufStore ¶
type DefaultProtobufStore struct { RawStore // contains filtered or unexported fields }
DefaultProtobufStore implements ProtobufStore to marshal and unmarshal protobufs to/from a RawStore.
func NewDefaultProtobufStore ¶
func NewDefaultProtobufStore(store RawStore, scope promutils.Scope) DefaultProtobufStore
func NewDefaultProtobufStoreWithMetrics ¶
func NewDefaultProtobufStoreWithMetrics(store RawStore, metrics *protoMetrics) DefaultProtobufStore
func (DefaultProtobufStore) ReadProtobuf ¶
func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error
func (DefaultProtobufStore) WriteProtobuf ¶
func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error
type HTTPClientConfig ¶
type HTTPClientConfig struct { Headers map[string][]string `json:"headers" pflag:"-,Sets http headers to set on the http client."` Timeout config.Duration `json:"timeout" pflag:",Sets time out on the http client."` }
HTTPClientConfig encapsulates common settings that can be applied to an HTTP Client.
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
func (InMemoryStore) CopyRaw ¶
func (c InMemoryStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
CopyRaw is a naive implementation of copy that reads all data locally and then writes it to the destination. TODO: We should upstream an API change to stow to implement copy more natively. E.g. Use s3 copy: https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectUsingREST.html
func (*InMemoryStore) CreateSignedURL ¶
func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
CreateSignedURL creates a signed url with the provided properties.
func (*InMemoryStore) Delete ¶
func (s *InMemoryStore) Delete(ctx context.Context, reference DataReference) error
Delete removes the referenced data from the cache map.
func (*InMemoryStore) GetBaseContainerFQN ¶
func (s *InMemoryStore) GetBaseContainerFQN(ctx context.Context) DataReference
func (*InMemoryStore) Head ¶
func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
func (*InMemoryStore) List ¶ added in v1.13.2
func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
func (*InMemoryStore) ReadRaw ¶
func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
type LimitsConfig ¶
type LimitsConfig struct {
GetLimitMegabytes int64 `json:"maxDownloadMBs" pflag:",Maximum allowed download size (in MBs) per call."`
}
LimitsConfig specifies limits for storage package.
type MemoryMetadata ¶
type MemoryMetadata struct {
// contains filtered or unexported fields
}
func (MemoryMetadata) ContentMD5 ¶ added in v1.11.0
func (m MemoryMetadata) ContentMD5() string
func (MemoryMetadata) Etag ¶
func (m MemoryMetadata) Etag() string
func (MemoryMetadata) Exists ¶
func (m MemoryMetadata) Exists() bool
func (MemoryMetadata) Size ¶
func (m MemoryMetadata) Size() int64
type Metadata ¶
type Metadata interface { Exists() bool Size() int64 Etag() string // ContentMD5 retrieves the value of a special metadata tag added by the system that // contains the MD5 of the uploaded file. If there is no metadata attached // or that `FlyteContentMD5` key isn't set, ContentMD5 will return empty. ContentMD5() string }
Metadata is a placeholder for data reference metadata.
type Options ¶
type Options struct {
Metadata map[string]interface{}
}
Options holds storage options. It is used to pass Metadata (like headers for S3) and also tags or labels for objects.
type ProtobufStore ¶
type ProtobufStore interface { // ReadProtobuf retrieves the entire blob from blobstore and unmarshals it to the passed protobuf ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error // WriteProtobuf serializes and stores the protobuf. WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error }
ProtobufStore defines an interface for reading and writing protobuf messages
type RawStore ¶
type RawStore interface { // GetBaseContainerFQN returns a FQN DataReference with the configured base init container GetBaseContainerFQN(ctx context.Context) DataReference // CreateSignedURL creates a signed url with the provided properties. CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error) // Head gets metadata about the reference. This should generally be a light weight operation. Head(ctx context.Context, reference DataReference) (Metadata, error) // List gets a list of items given a prefix, using a paginated API List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) // WriteRaw stores a raw byte array. WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error // CopyRaw copies from source to destination. CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error // Delete removes the referenced data from the blob store. Delete(ctx context.Context, reference DataReference) error }
RawStore defines a low level interface for accessing and storing bytes.
type ReferenceConstructor ¶
type ReferenceConstructor interface { // ConstructReference creates a new dataReference that matches the storage structure. ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error) }
ReferenceConstructor defines an interface for building data reference paths.
type SignedURLConfig ¶
type SignedURLConfig struct {
StowConfigOverride map[string]string `json:"stowConfigOverride,omitempty" pflag:"-,Configuration for stow backend. Refer to github/flyteorg/stow"`
}
SignedURLConfig encapsulates configs specifically used for SignedURL behavior.
type SignedURLProperties ¶
type SignedURLProperties struct { // Scope defines the permission level allowed for the generated URL. Scope stow.ClientMethod // ExpiresIn defines the expiration duration for the URL. Setting it is strongly recommended. ExpiresIn time.Duration // ContentMD5 defines the expected hash of the generated file. Setting it is strongly recommended. ContentMD5 string // AddContentMD5Metadata adds ContentMD5 to the metadata of the signed URL if true. AddContentMD5Metadata bool }
SignedURLProperties encapsulates properties about the signedURL operation.
type SignedURLResponse ¶
type StowConfig ¶
type StowConfig struct { Kind string `json:"kind,omitempty" pflag:",Kind of Stow backend to use. Refer to github/flyteorg/stow"` Config map[string]string `json:"config,omitempty" pflag:",Configuration for stow backend. Refer to github/flyteorg/stow"` }
StowConfig defines configs for stow as defined in github.com/flyteorg/stow
type StowMetadata ¶
type StowMetadata struct {
// contains filtered or unexported fields
}
StowMetadata that will be returned
func (StowMetadata) ContentMD5 ¶ added in v1.11.0
func (s StowMetadata) ContentMD5() string
func (StowMetadata) Etag ¶
func (s StowMetadata) Etag() string
func (StowMetadata) Exists ¶
func (s StowMetadata) Exists() bool
func (StowMetadata) Size ¶
func (s StowMetadata) Size() int64
type StowStore ¶
type StowStore struct {
// contains filtered or unexported fields
}
StowStore implements RawStore to talk to a stow location store.
func NewStowRawStore ¶
func (StowStore) CopyRaw ¶
func (c StowStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
CopyRaw is a naive implementation of copy that reads all data locally and then writes it to the destination. TODO: We should upstream an API change to stow to implement copy more natively. E.g. Use s3 copy: https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectUsingREST.html
func (*StowStore) CreateContainer ¶
func (*StowStore) CreateSignedURL ¶
func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
func (*StowStore) Delete ¶
func (s *StowStore) Delete(ctx context.Context, reference DataReference) error
Delete removes the referenced data from the blob store.
func (*StowStore) GetBaseContainerFQN ¶
func (s *StowStore) GetBaseContainerFQN(ctx context.Context) DataReference
func (*StowStore) List ¶ added in v1.13.2
func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
func (*StowStore) LoadContainer ¶
func (*StowStore) ReadRaw ¶
func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
type URLPathConstructor ¶
type URLPathConstructor struct { }
URLPathConstructor implements ReferenceConstructor that assumes paths are URL-compatible.
func NewURLPathConstructor ¶
func NewURLPathConstructor() URLPathConstructor
func (URLPathConstructor) ConstructReference ¶
func (URLPathConstructor) ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error)