osio

package module
v0.0.0-...-6113ae4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

README

Go Reference License Build Status Coverage Status Go Report Card

Osio is an object storage wrapper to expose a posix-like read-only interface to objects stored in a bucket. It can be used to pass an object reference to functions requiring an io.ReadSeeker or an io.ReaderAt whereas object stores only expose the equivalent of io.ReaderAt.

Osio is adapted in the case where you will only be accessing a small subset of the bytes of the remote object, for example:

Under the hood, osio splits the remote object into blocks of fixed sizes (by default 128k), and keeps an lru cache of the already downloaded blocks. Subsequent reads from the object will be populated by the contents of these cached blocks. An Osio adapter is safe for concurrent usage, and mechanisms are in place do de-duplicate reads to the source object in case of concurrent access.

Osio has support for the following handlers:

  • Google Storage,
  • Amazon S3,
  • Plain HTTP.

Example Usage

Google Storage - Zip extraction

The following example shows how to extract a single file from a (large) zip archive stored on a Google Cloud Storage bucket.

import(
    "github.com/airbusgeo/osio"
    "github.com/airbusgeo/osio/gcs"
)
func ExampleGSHandle_zip() {
    ctx := context.Background()
    gcsr, err := gcs.Handle(ctx)
    /* handle error, typically if credentials could not be found, network down ,etc... */
    gcsa, _ = osio.NewAdapter(gcsr)

    file := "gs://bucket/path/to/large/archive.zip"
    obj, err := gcsa.Reader(file)
    if err != nil {
        return fmt.Errorf("open %s: %w", file, err)
    }
    zipf, err := zip.NewReader(obj, obj.Size())
    if err != nil {
        return fmt.Errorf("zip corrupted?: %w", err)
    }
    for _, f := range zipf.File {
        if f.Name == "mytargetfile.txt" {
            fr, err := f.Open()
            dstf, err := os.Create("/local/mytargetfile.txt")
            _, err = io.Copy(dstf, fr)
            fr.Close()
            err = dstf.Close()
            //fmt.Printf("extracted %s\n", f.Name)
        }
    }
}

Amazon S3 - Zip extraction

import(
    aws3 "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/airbusgeo/osio"
    "github.com/airbusgeo/osio/s3"
)
func WithS3Region(region string) func(opts *aws3.Options) {
	return func(opts *aws3.Options) {
		opts.Region = region
	}
}

func ExampleS3Handle_zip() {
	ctx := context.Background()

	cfg, _ := config.LoadDefaultConfig(ctx)
	s3cl := aws3.NewFromConfig(cfg, WithS3Region("eu-central-1"))
	s3r, _ := s3.Handle(ctx, osio.S3Client(s3cl), osio.S3RequestPayer())
	osr, _ := osio.NewAdapter(s3r)

	uri := "s3://sentinel-s2-l1c-zips/S2A_MSIL1C_20210630T074611_N0300_R135_T48XWN_20210630T082841.zip"
	obj, _ := osr.Reader(uri)
	zipf, _ := zip.NewReader(obj, obj.Size())

	for _, f := range zipf.File {
		fmt.Printf("%s\n", f.Name)
		break
	}

	// Output:
	// S2A_MSIL1C_20210630T074611_N0300_R135_T48XWN_20210630T082841.SAFE/MTD_MSIL1C.xml
}

GDAL I/O handler

Osio is used by the GDAL godal bindings to enable GDAL to directly access files stored on a bucket. (Note: this mechanism only really makes sense when accessing file formats that are object-storage friendly, e.g. cogeotiffs )

ctx := context.Background()
gcsr, err := gcs.Handle(ctx)
gcs, _ = osio.NewAdapter(gcsr)
godal.RegisterVSIAdapter("gs://", gcs)
dataset,err := godal.Open("gs://bucket/path/to/cog.tif")
...

Contributing and TODOs

PRs are welcome! If you want to work on any of these things, please open an issue to coordinate.

  • Azure handler

Documentation

Index

Constants

View Source
const (
	DefaultBlockSize       = 128 * 1024
	DefaultNumCachedBlocks = 100
)

Variables

View Source
var StdLogger stdLogger

StdLogger is a Logger using golang's standard library logger

Functions

func BlockSize

func BlockSize(blockSize string) interface {
	AdapterOption
}

BlockSize is an option to set the size of the blocks that will be cached. If not provided, the adapter will use 128kb blocks.

BlockSize will panic if the given string does not represent a strictly positive number of bytes

func NumCachedBlocks

func NumCachedBlocks(n int) interface {
	AdapterOption
}

NumCachedBlocks is an option to set the number of blocks to cache in the default lru implementation. It is ignored if you are passing your own cache implementation through BlockCache

func Retries

func Retries(retries int) interface {
	AdapterOption
}

Retries is an option to set the number of times a ReadAt() will be retried if it returns a temporary/transient error

func SizeCache

func SizeCache(numEntries int) interface {
	AdapterOption
}

SizeCache is an option that determines how many key sizes will be cached by the adapter. Having a size cache speeds up the opening of files by not requiring that a lookup to the KeyStreamerAt for the object size.

func SplitRanges deprecated

func SplitRanges(splitRanges bool) interface {
	AdapterOption
}

SplitRanges is an option to prevent making MultiRead try to merge consecutive ranges into a single block request

Deprecated: osio now automatically splits a request into individual blocks when needed

func WithLogger

func WithLogger(logger Logger) interface {
	AdapterOption
}

WithLogger is an option to make the adapter log requests that were not served from the lru cache, i.e. that logs each request to the underlying KeyStreamerAt

Types

type Adapter

type Adapter struct {
	// contains filtered or unexported fields
}

Adapter caches fixed-sized chunks of a KeyStreamerAt, and exposes ReadAt(key string, buf []byte, offset int64) (int, error) that feeds from its internal cache, only falling back to the provided KeyStreamerAt whenever data could not be retrieved from its internal cache, while ensuring that concurrent requests only result in a single call to the source reader.

func NewAdapter

func NewAdapter(keyStreamer KeyStreamerAt, opts ...AdapterOption) (*Adapter, error)

NewStreamingAdapter creates a caching adapter around the provided KeyStreamerAt.

NewStreamingAdapter will only return an error if you do not provide plausible options (e.g. negative number of blocks or sizes, nil caches, etc...)

func (*Adapter) ReadAt

func (a *Adapter) ReadAt(key string, p []byte, off int64) (int, error)

func (*Adapter) ReadAtMulti

func (a *Adapter) ReadAtMulti(key string, bufs [][]byte, offsets []int64) ([]int, error)

func (*Adapter) Reader

func (a *Adapter) Reader(key string) (*Reader, error)

func (*Adapter) Size

func (a *Adapter) Size(key string) (int64, error)

type AdapterOption

type AdapterOption interface {
	// contains filtered or unexported methods
}

func BlockCache

func BlockCache(bc BlockCacher) AdapterOption

BlockCache is an option to make Adapter use the specified block cacher. If not provided, the Adapter will use an internal lru cache holding up to 100 blocks of data

type BlockCacher

type BlockCacher interface {
	Add(key string, blockID uint, data []byte)
	Get(key string, blockID uint) ([]byte, bool)
}

BlockCacher is the interface that wraps block caching functionality

Add inserts data to the cache for the given key and blockID.

Get fetches the data for the given key and blockID. It returns the data and wether the data was found in the cache or not

type Client

type Client interface {
	Do(*http.Request) (*http.Response, error)
}

type HTTPHandler

type HTTPHandler struct {
	// contains filtered or unexported fields
}

func HTTPHandle

func HTTPHandle(ctx context.Context, opts ...HTTPOption) (*HTTPHandler, error)

HTTPHandle creates a KeyReaderAt suitable for constructing an Adapter that accesses objects using the http protocol

func (*HTTPHandler) ReadAt

func (h *HTTPHandler) ReadAt(key string, p []byte, off int64) (int, int64, error)

func (*HTTPHandler) StreamAt

func (h *HTTPHandler) StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error)

type HTTPOption

type HTTPOption func(o *HTTPHandler)

HTTPOption is an option that can be passed to RegisterHandler

func HTTPBasicAuth

func HTTPBasicAuth(username, password string) HTTPOption

HTTPBasicAuth sets user/pwd for each request

func HTTPClient

func HTTPClient(cl Client) HTTPOption

HTTPClient sets the http.Client that will be used by the handler

func HTTPHeader

func HTTPHeader(key, value string) HTTPOption

HTTPHeader sets a header on http request. Useful to add api keys.

type KeyStreamerAt

type KeyStreamerAt interface {
	// StreamAt returns a io.ReadCloser on a section from the resource identified by key
	// starting at offset off. It returns any error encountered.
	//
	// If the stream fails because the object does not exist, StreamAt must return syscall.ENOENT
	// (or a wrapped error of syscall.ENOENT)
	//
	// The reader returned by StreamAt must follow the standard io.ReadCloser convention with respect
	// to error handling.
	//
	// Clients of StreamAt can execute parallel StreamAt calls on the same input source.
	//
	// If called with off==0, StreamAt must also return the total object size in its second
	// return value
	//
	// The caller of StreamAt is responsible for closing the stream.
	StreamAt(key string, off int64, n int64) (io.ReadCloser, int64, error)
}

KeyStreamerAt is the second interface a handler can implement.

• StreamAt should return ENOENT in case of an error due to an inexistant file. This non-existant status is cached by the Adapter in order to prevent subsequent calls to the same key.

• StreamAt should return the total size of the object when called with a 0 offset. This is required in order to implement the io.Seeker interface, and to detect out of bounds accesses without incurring a network access. If you do not rely on this functionality, your implementation may return math.MaxInt64

type LRUCache

type LRUCache struct {
	// contains filtered or unexported fields
}

func NewLRUCache

func NewLRUCache(numEntries int) (*LRUCache, error)

func (*LRUCache) Add

func (cg *LRUCache) Add(key string, id uint, data []byte)

func (*LRUCache) Get

func (cg *LRUCache) Get(key string, id uint) ([]byte, bool)

func (*LRUCache) Purge

func (cg *LRUCache) Purge()

func (*LRUCache) PurgeKey

func (cg *LRUCache) PurgeKey(prefix string)

type Logger

type Logger interface {
	Log(key string, offset, length int64)
}

Logger is used to optionally log requests to the underlying KetStreamerAt

type NamedOnceMutex

type NamedOnceMutex interface {
	//Lock tries to acquire a lock on a keyed resource. If the keyed resource is not already locked,
	//Lock aquires a lock to the resource and returns true. If the keyed resource is already locked,
	//Lock waits until the resource has been unlocked and returns false
	Lock(key interface{}) bool
	//TryLock tries to acquire a lock on a keyed resource. If the keyed resource is not already locked,
	//TryLock aquires a lock to the resource and returns true. If the keyed resource is already locked,
	//TryLock returns false immediately
	TryLock(key interface{}) bool
	//Unlock a keyed resource. Should be called by a client whose call to Lock returned true once the
	//resource is ready for consumption by other clients
	Unlock(key interface{})
}

NamedOnceMutex is a locker on arbitrary lock names.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func (*Reader) Read

func (r *Reader) Read(buf []byte) (int, error)

func (*Reader) ReadAt

func (r *Reader) ReadAt(buf []byte, off int64) (int, error)

func (*Reader) ReadAtMulti

func (r *Reader) ReadAtMulti(bufs [][]byte, offs []int64) ([]int, error)

func (*Reader) Seek

func (r *Reader) Seek(off int64, nWhence int) (int64, error)

func (*Reader) Size

func (r *Reader) Size() int64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL