objstore

package module
v0.0.0-...-063ea38
Published: Dec 17, 2024 License: Apache-2.0 Imports: 25 Imported by: 153

README

Thanos Object Storage Client

objstore is a Go module providing a unified interface and efficient clients for working with various object storage providers.

Features:

  • Ability to perform common operations with clear contract against most popular object storages.
  • High focus on efficiency and reliability required for distributed databases on object storages.
  • Optional built-in YAML based configuration definition for consistent configuration.
  • Optional Prometheus metric instrumentation for bucket operations.

This module is battle-tested and used at high scale in production by projects like Thanos, Loki, Cortex, Mimir, Tempo, Parca and more.

Contributing

Contributions are very welcome! See our CONTRIBUTING.md for more information.

Community

Thanos is an open source project and we value and welcome new contributors and members of the community. There are several ways to get in touch with the community.

Adopters

See the Adopters List: https://github.com/thanos-io/thanos/blob/main/website/data/adopters.yml

Background

This library was initially developed as a Thanos objstore package. Thanos uses object storage as primary storage for metrics and metadata related to them. This package ended up being used by other projects like Cortex, Loki, Mimir, Tempo, Parca and more.

Given its reusability, the Thanos community promoted this package to a standalone Go module with a smaller set of dependencies.

Maintainers

See MAINTAINERS.md

How to use objstore

The core of this module is the Bucket interface:

// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
	io.Closer
	BucketReader

	Provider() ObjProvider

	// Upload the contents of the reader as an object into the bucket.
	// Upload should be idempotent.
	Upload(ctx context.Context, name string, r io.Reader) error

	// Delete removes the object with the given name.
	// If object does not exist in the moment of deletion, Delete should throw error.
	Delete(ctx context.Context, name string) error

	// Name returns the bucket name for the provider.
	Name() string
}

All provider implementations have to implement the Bucket interface, which covers the common read and write operations supported by all object providers. If you want to limit code that performs bucket operations to read-only access (a smart idea that lets you restrict access permissions), you can use the BucketReader interface:

// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
	// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
	// object name including the prefix of the inspected directory.
	// Entries are passed to function in sorted order.
	Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

	// IterWithAttributes calls f for each entry in the given directory similar to Iter.
	// In addition to Name, it also includes requested object attributes in the argument to f.
	//
	// Attributes can be requested using IterOption.
	// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
	IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

	// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
	SupportedIterOptions() []IterOptionType

	// Get returns a reader for the given object name.
	Get(ctx context.Context, name string) (io.ReadCloser, error)

	// GetRange returns a new range reader for the given object name and range.
	GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)

	// Exists checks if the given object exists in the bucket.
	Exists(ctx context.Context, name string) (bool, error)

	// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
	IsObjNotFoundErr(err error) bool

	// IsAccessDeniedErr returns true if access to object is denied.
	IsAccessDeniedErr(err error) bool

	// Attributes returns information about the specified object.
	Attributes(ctx context.Context, name string) (ObjectAttributes, error)
}

Those interfaces represent the object storage operations your code can use from objstore clients.
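
For illustration, here is a minimal sketch of a helper that only needs read access and therefore accepts BucketReader; the function name and object paths are invented for this example:

package example

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/thanos-io/objstore"
)

// listAndPrint needs only read access, so it accepts objstore.BucketReader
// rather than objstore.Bucket.
func listAndPrint(ctx context.Context, bkt objstore.BucketReader, dir, name string) error {
	// Iterate over the directory (non-recursive by default); entries arrive sorted.
	if err := bkt.Iter(ctx, dir, func(obj string) error {
		fmt.Println("found:", obj)
		return nil
	}); err != nil {
		return err
	}

	// Fetch a single object and copy its contents to stdout.
	rc, err := bkt.Get(ctx, name)
	if err != nil {
		return err
	}
	defer rc.Close()
	_, err = io.Copy(os.Stdout, rc)
	return err
}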

Factory

Generally, you have two ways of using the objstore module:

The first is to import the provider you want, e.g. github.com/thanos-io/objstore/providers/s3, and instantiate it with the available constructor (e.g. NewBucket).

The second option is to use the factory NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string), which instantiates the object storage client based on the provided YAML file. The YAML file generally has the following format:

type: <PROVIDER_TYPE>
config:
  <PROVIDER_TYPE specific options>

The exact options depend on the provider and are described in the sections below.
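
As a hedged sketch of the second approach (the factory import path github.com/thanos-io/objstore/client, the exact return type, and the file name bucket.yaml are assumptions of this example), instantiating a bucket from such a YAML file could look like:

package main

import (
	"context"
	"os"
	"strings"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/objstore/client"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Read the YAML configuration in the type/config format shown above.
	// "bucket.yaml" is a hypothetical path.
	confContentYaml, err := os.ReadFile("bucket.yaml")
	if err != nil {
		panic(err)
	}

	// Instantiate the provider-specific client behind the common Bucket interface.
	bkt, err := client.NewBucket(logger, confContentYaml, prometheus.NewRegistry(), "example")
	if err != nil {
		panic(err)
	}
	defer bkt.Close()

	// Any Bucket operation now works regardless of the configured provider.
	if err := bkt.Upload(context.Background(), "demo/hello.txt", strings.NewReader("hello")); err != nil {
		panic(err)
	}
}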

NOTE: All code snippets are auto-generated from code and up-to-date.

Check out the Thanos documentation to see how Thanos uses this module.

Supported Providers (Clients)

Current object storage client implementations:

| Provider | Maturity | Aimed For | Auto-tested on CI | Maintainers |
|---|---|---|---|---|
| Google Cloud Storage | Stable | Production Usage | yes | @bwplotka |
| AWS/S3 (and all S3-compatible storages e.g. disk-based Minio) | Stable | Production Usage | yes | @bwplotka |
| Azure Storage Account | Stable | Production Usage | no | @vglafirov, @phillebaba |
| OpenStack Swift | Beta (working PoC) | Production Usage | yes | @FUSAKLA |
| Tencent COS | Beta | Production Usage | no | @jojohappy, @hanjm |
| AliYun OSS | Beta | Production Usage | no | @shaulboozhiao, @wujinhu |
| Baidu BOS | Beta | Production Usage | no | @yahaa |
| Local Filesystem | Stable | Testing and Demo only | yes | @bwplotka |
| Oracle Cloud Infrastructure Object Storage | Beta | Production Usage | yes | @aarontams, @gaurav-05, @ericrrath |
| HuaweiCloud OBS | Beta | Production Usage | no | @setoru |

Missing support for some object storage? Check out the "How to add a new client to Thanos?" section below.

NOTE: Currently Thanos requires strong consistency (write-read) for object store implementation for singleton Compaction purposes.

S3

Thanos uses the minio client library to upload Prometheus data into AWS S3.

NOTE: The S3 client was designed for AWS S3, but it can be configured against other S3-compatible object storages, e.g. Ceph.

The S3 object storage yaml configuration definition:

type: S3
config:
  bucket: ""
  endpoint: ""
  region: ""
  disable_dualstack: false
  aws_sdk_auth: false
  access_key: ""
  insecure: false
  signature_version2: false
  secret_key: ""
  session_token: ""
  put_user_metadata: {}
  http_config:
    idle_conn_timeout: 1m30s
    response_header_timeout: 2m
    insecure_skip_verify: false
    tls_handshake_timeout: 10s
    expect_continue_timeout: 1s
    max_idle_conns: 100
    max_idle_conns_per_host: 100
    max_conns_per_host: 0
    tls_config:
      ca_file: ""
      cert_file: ""
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
    disable_compression: false
  trace:
    enable: false
  list_objects_version: ""
  bucket_lookup_type: auto
  send_content_md5: true
  disable_multipart: false
  part_size: 67108864
  sse_config:
    type: ""
    kms_key_id: ""
    kms_encryption_context: {}
    encryption_key: ""
  sts_endpoint: ""
prefix: ""

At a minimum, you will need to provide a value for the bucket, endpoint, access_key, and secret_key keys. The rest of the keys are optional.

However, if you set aws_sdk_auth: true, Thanos will use the default authentication methods of the AWS SDK for Go based on known environment variables (AWS_PROFILE, AWS_WEB_IDENTITY_TOKEN_FILE, etc.) and known AWS config files (~/.aws/config). If you turn this on, then bucket and endpoint are the only required config keys.

The field prefix can be used to transparently use prefixes in your S3 bucket. This allows you to separate blocks coming from different sources into paths with different prefixes, making it easier to understand what's going on (i.e. you don't have to use Thanos tooling to know where each block came from).

The AWS region to endpoint mapping can be found in the AWS documentation.

By default, the library prefers using dual-stack endpoints. You can explicitly disable this behaviour by setting disable_dualstack: true.

Make sure you use a correct signature version. Currently AWS requires signature v4, so it needs signature_version2: false. If you don't specify it, you will get an Access Denied error. On the other hand, several S3 compatible APIs use signature_version2: true.

You can configure the timeout settings for the HTTP client by setting the http_config.idle_conn_timeout and http_config.response_header_timeout keys. As a rule of thumb, if you are seeing errors like timeout awaiting response headers in your logs, you may want to increase the value of http_config.response_header_timeout.

Please refer to the documentation of the Transport type in the net/http package for detailed information on what each option does.

part_size is specified in bytes and refers to the minimum file size used for multipart uploads, as some custom S3 implementations may have different requirements. A value of 0 means to use a default 128 MiB size.

Set list_objects_version: "v1" for S3 compatible APIs that don't support ListObjectsV2 (e.g. some versions of Ceph). Default value ("") is equivalent to "v2".

http_config.tls_config allows configuring TLS connections. Please refer to the document of tls_config for detailed information on what each option does.

bucket_lookup_type can be auto, virtual-hosted or path. Read more about it here.

For debug and testing purposes you can set

  • insecure: true to switch to plain insecure HTTP instead of HTTPS

  • http_config.insecure_skip_verify: true to disable TLS certificate verification (if your S3 based storage is using a self-signed certificate, for example)

  • trace.enable: true to enable the minio client's verbose logging. Each request and response will be logged into the debug logger, so debug level logging must be enabled for this functionality.

S3 Server-Side Encryption

SSE can be configured using sse_config. SSE-S3, SSE-KMS, and SSE-C are supported.

  • If type is set to SSE-S3 you do not need to configure other options.

  • If type is set to SSE-KMS you must set kms_key_id. The kms_encryption_context is optional, as AWS provides a default encryption context.

  • If type is set to SSE-C you must provide a path to the encryption key using encryption_key.

If the SSE Config block is set but the type is not one of SSE-S3, SSE-KMS, or SSE-C, an error is raised.

You will also need to apply the following AWS IAM policy for the user to access the KMS key:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KMSAccess",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey",
                "kms:Encrypt",
                "kms:Decrypt"
            ],
            "Resource": "arn:aws:kms:<region>:<account>:key/<KMS key id>"
        }
    ]
}
Credentials

By default Thanos will try to retrieve credentials from the following sources:

  1. From config file if BOTH access_key and secret_key are present.
  2. From the standard AWS environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  3. From ~/.aws/credentials
  4. IAM credentials retrieved from an instance profile.

NOTE: Getting the access key from the config file and the secret key from another method (and vice versa) is not supported.

AWS Policies

Example working AWS IAM policy for user:

  • For deployment (policy for Thanos services):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket>/*",
                "arn:aws:s3:::<bucket>"
            ]
        }
    ]
}

(No bucket policy)

To test the policy, set env vars for S3 access to an empty, unused bucket, as well as:

THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI,OBS
THANOS_ALLOW_EXISTING_BUCKET_USE=true

And run: GOCACHE=off go test -v -run TestObjStore_AcceptanceTest_e2e ./pkg/...

  • For testing (policy to run e2e tests):

We need access to CreateBucket and DeleteBucket and access to all buckets:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObject",
                "s3:CreateBucket",
                "s3:DeleteBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket>/*",
                "arn:aws:s3:::<bucket>"
            ]
        }
    ]
}

With this policy you should be able to set THANOS_TEST_OBJSTORE_SKIP=GCS,AZURE,SWIFT,COS,ALIYUNOSS,OCI,OBS, unset S3_BUCKET, and run all tests using make test.

Details about AWS policies: https://docs.aws.amazon.com/AmazonS3/latest/dev/using-with-s3-actions.html

STS Endpoint

If you want to use IAM credentials retrieved from an instance profile, Thanos needs to authenticate through AWS STS. For this purpose you can specify your own STS endpoint.

By default Thanos will use the endpoint https://sts.amazonaws.com and the endpoints corresponding to the AWS region.

GCS

To configure a Google Cloud Storage bucket as an object store, you need to set bucket to the GCS bucket name and configure Google Application credentials.

For example:

type: GCS
config:
  bucket: ""
  service_account: ""
  use_grpc: false
  grpc_conn_pool_size: 0
  http_config:
    idle_conn_timeout: 0s
    response_header_timeout: 0s
    insecure_skip_verify: false
    tls_handshake_timeout: 0s
    expect_continue_timeout: 0s
    max_idle_conns: 0
    max_idle_conns_per_host: 0
    max_conns_per_host: 0
    tls_config:
      ca_file: ""
      cert_file: ""
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
    disable_compression: false
  chunk_size_bytes: 0
prefix: ""
Using GOOGLE_APPLICATION_CREDENTIALS

Application credentials are configured via a JSON file; only the bucket needs to be specified. The client looks for credentials in the following order:

  1. A JSON file whose path is specified by the GOOGLE_APPLICATION_CREDENTIALS environment variable.
  2. A JSON file in a location known to the gcloud command-line tool. On Windows, this is %APPDATA%/gcloud/application_default_credentials.json. On other systems, $HOME/.config/gcloud/application_default_credentials.json.
  3. On Google App Engine it uses the appengine.AccessToken function.
  4. On Google Compute Engine and Google App Engine Managed VMs, it fetches credentials from the metadata server. (In this final case any provided scopes are ignored.)

You can read more about how to get an application credentials JSON file at https://cloud.google.com/docs/authentication/production

Using an inline Service Account

Another possibility is to inline the ServiceAccount into the Thanos configuration and only maintain one file. This feature was added so that the Prometheus Operator only needs to take care of one secret file.

type: GCS
config:
  bucket: "thanos"
  service_account: |-
    {
      "type": "service_account",
      "project_id": "project",
      "private_key_id": "abcdefghijklmnopqrstuvwxyz12345678906666",
      "private_key": "-----BEGIN PRIVATE KEY-----\...\n-----END PRIVATE KEY-----\n",
      "client_email": "project@thanos.iam.gserviceaccount.com",
      "client_id": "123456789012345678901",
      "auth_uri": "https://accounts.google.com/o/oauth2/auth",
      "token_uri": "https://oauth2.googleapis.com/token",
      "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
      "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/thanos%40gitpods.iam.gserviceaccount.com"
    }
GCS Policies

Note: GCS Policies should be applied at the project level, not at the bucket level

For deployment:

Storage Object Creator and Storage Object Viewer

For testing:

Storage Object Admin for ability to create and delete temporary buckets.

To test that the policy is working as expected, exec into the sidecar container, e.g.:

kubectl exec -it -n <namespace> <prometheus with sidecar pod name> -c <sidecar container name> -- /bin/sh

Then test that you can at least list objects in the bucket, e.g.:

thanos tools bucket ls --objstore.config="${OBJSTORE_CONFIG}"
Azure

To use Azure Storage as a Thanos object store, you need to pre-create a storage account from the Azure portal or using the Azure CLI. Follow the instructions from the Azure Storage Documentation: https://docs.microsoft.com/en-us/azure/storage/common/storage-quickstart-create-account

The config file format is the following:

type: AZURE
config:
  storage_account: ""
  storage_account_key: ""
  storage_connection_string: ""
  storage_create_container: false
  container: ""
  endpoint: ""
  user_assigned_id: ""
  max_retries: 0
  reader_config:
    max_retry_requests: 0
  pipeline_config:
    max_tries: 0
    try_timeout: 0s
    retry_delay: 0s
    max_retry_delay: 0s
  http_config:
    idle_conn_timeout: 0s
    response_header_timeout: 0s
    insecure_skip_verify: false
    tls_handshake_timeout: 0s
    expect_continue_timeout: 0s
    max_idle_conns: 0
    max_idle_conns_per_host: 0
    max_conns_per_host: 0
    tls_config:
      ca_file: ""
      cert_file: ""
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
    disable_compression: false
  msi_resource: ""
prefix: ""

If msi_resource is used, authentication is done via system-assigned managed identity. The value for Azure should be https://<storage-account-name>.blob.core.windows.net.

If user_assigned_id is used, authentication is done via user-assigned managed identity. When using user_assigned_id, msi_resource defaults to https://<storage_account>.<endpoint>.

If storage_connection_string is set, the storage_account and endpoint values will not be used. Use this method over storage_account_key if you need to authenticate via a SAS token.

The generic max_retries will be used as the value for pipeline_config's max_tries and reader_config's max_retry_requests. For more control, max_retries can be ignored (set to 0) and specific retry values can be set instead.

OpenStack Swift

Thanos uses the ncw/swift client to upload Prometheus data into OpenStack Swift.

Below is an example configuration file for Thanos to use an OpenStack Swift container as an object store. Note that if the name of a user, project or tenant is used, one must also specify its domain by ID or name. Various examples for OpenStack authentication can be found in the official documentation.

By default, OpenStack Swift has a maximum file size limit of 5 GiB. Thanos index files are often larger than that. To resolve this issue, Thanos uses Static Large Objects (SLO), which are uploaded as segments. These are by default put into the segments directory of the same container. The default limit for using SLO is 1 GiB, which is also the maximum size of a segment. If you don't want to use the same container for the segments (best practice is to use <container_name>_segments to avoid polluting the listing of the container objects), you can use the large_object_segments_container_name option to override the default and put the segments into another container. In rare cases you can switch to Dynamic Large Objects (DLO) by setting use_dynamic_large_objects to true, but use it with caution since it relies even more on eventual consistency.

type: SWIFT
config:
  auth_version: 0
  auth_url: ""
  username: ""
  user_domain_name: ""
  user_domain_id: ""
  user_id: ""
  password: ""
  domain_id: ""
  domain_name: ""
  application_credential_id: ""
  application_credential_name: ""
  application_credential_secret: ""
  project_id: ""
  project_name: ""
  project_domain_id: ""
  project_domain_name: ""
  region_name: ""
  container_name: ""
  large_object_chunk_size: 1073741824
  large_object_segments_container_name: ""
  retries: 3
  connect_timeout: 10s
  timeout: 5m
  use_dynamic_large_objects: false
  http_config:
    idle_conn_timeout: 1m30s
    response_header_timeout: 2m
    insecure_skip_verify: false
    tls_handshake_timeout: 10s
    expect_continue_timeout: 1s
    max_idle_conns: 100
    max_idle_conns_per_host: 100
    max_conns_per_host: 0
    tls_config:
      ca_file: ""
      cert_file: ""
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
    disable_compression: false
prefix: ""
Tencent COS

To use Tencent COS as an object store, you first need a Tencent Cloud account and an object storage bucket. Details can be found in the Tencent Cloud documentation: https://cloud.tencent.com/document/product/436

To configure your Tencent account to use COS as an object store, set these parameters in YAML format, stored in a file:

type: COS
config:
  bucket: ""
  region: ""
  app_id: ""
  endpoint: ""
  secret_key: ""
  secret_id: ""
  http_config:
    idle_conn_timeout: 1m30s
    response_header_timeout: 2m
    insecure_skip_verify: false
    tls_handshake_timeout: 10s
    expect_continue_timeout: 1s
    max_idle_conns: 100
    max_idle_conns_per_host: 100
    max_conns_per_host: 0
    tls_config:
      ca_file: ""
      cert_file: ""
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
    disable_compression: false
prefix: ""

The secret_key and secret_id fields are required. The http_config field is optional and can be used to tune HTTP transport settings. There are two ways to configure the required bucket information:

  1. Provide the values of bucket, region and app_id keys.
  2. Provide the value of the endpoint key in URL format when you want to specify a VPC internal endpoint. Please refer to the endpoint documentation for more detail.
AliYun OSS

In order to use AliYun OSS object storage, you should first create a bucket with the proper storage class and ACLs, and get the access key on the AliYun cloud. Go to https://www.alibabacloud.com/product/oss for more detail.

The AliYun OSS object storage yaml configuration definition:

type: ALIYUNOSS
config:
  endpoint: ""
  bucket: ""
  access_key_id: ""
  access_key_secret: ""
prefix: ""
Baidu BOS

In order to use Baidu BOS object storage, you should apply for a Baidu Account and create an object storage bucket first. Refer to Baidu Cloud Documents for more details. The Baidu BOS object storage yaml configuration definition:

type: BOS
config:
  bucket: ""
  endpoint: ""
  access_key: ""
  secret_key: ""
prefix: ""
Filesystem

This storage type is used when the user wants to store and access the bucket in the local filesystem. We treat the filesystem the same way we would treat object storage, so all optimizations for remote buckets apply even though the files may be local.

NOTE: This storage type is experimental and might be inefficient. It is NOT advised to use it as the main storage for metrics in a production environment. In particular, there is no planned support for distributed filesystems like NFS. This is mainly useful for testing and demos.

Filesystem "object storage" yaml configuration definition:

type: FILESYSTEM
config:
  directory: ""
prefix: ""
Oracle Cloud Infrastructure Object Storage

To configure Oracle Cloud Infrastructure (OCI) Object Storage as a Thanos Object Store, you need to provide appropriate authentication credentials to your OCI tenancy. The OCI object storage client implementation for Thanos supports default keypair, instance principal, and OKE workload identity authentication.

API Signing Key

The default API signing key authentication provider leverages the same configuration as the OCI CLI, which is usually stored at $HOME/.oci/config or in variables whose names start with the string OCI_CLI. You can also use environment variables that start with TF_VAR. If the same configuration is found in multiple places, the provider will prefer the first one.

The following example configures the provider to look for an existing API signing key for authentication:

type: OCI
config:
  provider: "default"
  bucket: ""
  compartment_ocid: ""
  part_size: ""                   // Optional part size to override the OCI default of 128 MiB, value is in bytes.
  max_request_retries: ""         // Optional maximum number of retries for a request.
  request_retry_interval: ""      // Optional sleep duration in seconds between retry requests.
  http_config:
    idle_conn_timeout: 1m30s      // Optional maximum amount of time an idle (keep-alive) connection will remain idle before closing itself. Zero means no limit.
    response_header_timeout: 2m   // Optional amount of time to wait for a server's response headers after fully writing the request.
    tls_handshake_timeout: 10s    // Optional maximum amount of time to wait for a TLS handshake. Zero means no timeout.
    expect_continue_timeout: 1s   // Optional amount of time to wait for a server's first response headers. Zero means no timeout and causes the body to be sent immediately.
    insecure_skip_verify: false   // Optional. If true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.
    max_idle_conns: 100           // Optional maximum number of idle (keep-alive) connections across all hosts. Zero means no limit.
    max_idle_conns_per_host: 100  // Optional maximum idle (keep-alive) connections to keep per-host. If zero, DefaultMaxIdleConnsPerHost=2 is used.
    max_conns_per_host: 0         // Optional maximum total number of connections per host.
    disable_compression: false    // Optional. If true, prevents the Transport from requesting compression.
    client_timeout: 90s           // Optional time limit for requests made by the HTTP Client.
Instance Principal Provider

For Example:

type: OCI
config:
  provider: "instance-principal"
  bucket: ""
  compartment_ocid: ""

You can also include any of the optional configuration just like the example in Default Provider.

Raw Provider

For Example:

type: OCI
config:
  provider: "raw"
  bucket: ""
  compartment_ocid: ""
  tenancy_ocid: ""
  user_ocid: ""
  region: ""
  fingerprint: ""
  privatekey: ""
  passphrase: ""         // Optional passphrase to encrypt the private API Signing key

You can also include any of the optional configuration just like the example in Default Provider.

OKE Workload Identity Provider

For Example:

type: OCI
config:
  provider: "oke-workload-identity"
  bucket: ""
  region: ""

The bucket and region fields are required. The region field identifies the bucket region.

HuaweiCloud OBS

To use HuaweiCloud OBS as an object store, you first need a HuaweiCloud account and an object storage bucket. More details: HuaweiCloud OBS

To configure your HuaweiCloud account to use OBS as an object store, set these parameters in YAML format, stored in a file:

type: OBS
config:
  bucket: ""
  endpoint: ""
  access_key: ""
  secret_key: ""
  http_config:
    idle_conn_timeout: 1m30s
    response_header_timeout: 2m
    insecure_skip_verify: false
    tls_handshake_timeout: 10s
    expect_continue_timeout: 1s
    max_idle_conns: 100
    max_idle_conns_per_host: 100
    max_conns_per_host: 0
    tls_config:
      ca_file: ""
      cert_file: ""
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
    disable_compression: false
prefix: ""

The access_key and secret_key fields are required. The http_config field is optional and can be used to tune HTTP transport settings.

How to add a new client to Thanos?

The following checklist describes how to add a new Go client to the supported providers:

  1. Create a new directory under ./providers/<provider>.
  2. Implement the objstore.Bucket interface.
  3. Add a NewTestBucket constructor for testing purposes that creates and deletes a temporary bucket.
  4. Use the created NewTestBucket in the ForeachStore method to ensure we can run tests against the new provider. (In PR)
  5. Run the TestObjStoreAcceptanceTest against your provider to ensure it fits. Fix any errors found until the test passes. (In PR)
  6. Add the client implementation to the factory in the factory code. (Use as small a number of flags as possible in every command.)
  7. Add the client struct config to cfggen to allow config auto generation.

At that point, anyone can use your provider by spec.

Documentation

Constants

const (
	OpIter       = "iter"
	OpGet        = "get"
	OpGetRange   = "get_range"
	OpExists     = "exists"
	OpUpload     = "upload"
	OpDelete     = "delete"
	OpAttributes = "attributes"
)
const DirDelim = "/"

DirDelim is the delimiter used to model a directory structure in an object store bucket.

Variables

var ErrOptionNotSupported = errors.New("iter option is not supported")

Functions

func AcceptanceTest

func AcceptanceTest(t *testing.T, bkt Bucket)

func CreateTemporaryTestBucketName

func CreateTemporaryTestBucketName(t testing.TB) string

func DownloadDir

func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error

DownloadDir downloads all objects found in the directory into the local directory.

func DownloadFile

func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) (err error)

DownloadFile downloads the src file from the bucket to dst. If dst is an existing directory, a file with the same name as the source is created in dst. If the destination file already exists, it will be overwritten.

func EmptyBucket

func EmptyBucket(t testing.TB, ctx context.Context, bkt Bucket)

EmptyBucket deletes all objects from bucket. This operation is required to properly delete bucket as a whole. It is used for testing only. TODO(bplotka): Add retries.

func NewTLSConfig

func NewTLSConfig(cfg *TLSConfig) (*tls.Config, error)

NewTLSConfig creates a new tls.Config from the given TLSConfig.

func NopCloserWithSize

func NopCloserWithSize(r io.Reader) io.ReadCloser

NopCloserWithSize returns a ReadCloser with a no-op Close method wrapping the provided Reader r. Returned ReadCloser also implements Size method.

func TryToGetSize

func TryToGetSize(r io.Reader) (int64, error)

TryToGetSize tries to get upfront size from reader. Some implementations may return only size of unread data in the reader, so it's best to call this method before doing any reading.

TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.

func UploadDir

func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error

UploadDir uploads all files in srcdir to the bucket into a top-level directory named dstdir. It is the caller's responsibility to clean up a partial upload in case of failure.
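
For example, a brief sketch (the paths are illustrative) of uploading a local directory with bounded concurrency:

package example

import (
	"context"
	"os"

	"github.com/go-kit/log"

	"github.com/thanos-io/objstore"
)

// uploadBlock copies the local ./data/block directory into the bucket under
// blocks/block, running up to four uploads in parallel.
func uploadBlock(ctx context.Context, bkt objstore.Bucket) error {
	logger := log.NewLogfmtLogger(os.Stderr)
	return objstore.UploadDir(ctx, logger, bkt, "./data/block", "blocks/block",
		objstore.WithUploadConcurrency(4))
}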

func UploadFile

func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string) error

UploadFile uploads the file with the given name to the bucket. It is the caller's responsibility to clean up a partial upload in case of failure.

func ValidateIterOptions

func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error

func WrapWith

func WrapWith(b Bucket, metrics *Metrics) *metricBucket

WrapWith takes a `bucket` and `metrics` and returns an instrumented bucket. Similar to WrapWithMetrics, but `metrics` can be passed separately as an argument.

func WrapWithMetrics

func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBucket

WrapWithMetrics takes a bucket and registers metrics with the given registry for operations run against the bucket.
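
For example, a minimal sketch of instrumenting an existing bucket; the "mybucket" label is just an illustrative name:

package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/objstore"
)

// instrument wraps a Bucket so every operation against it is counted and
// timed in the given Prometheus registry.
func instrument(bkt objstore.Bucket, reg prometheus.Registerer) objstore.Bucket {
	return objstore.WrapWithMetrics(bkt, reg, "mybucket")
}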

Types

type Bucket

type Bucket interface {
	io.Closer
	BucketReader

	Provider() ObjProvider

	// Upload the contents of the reader as an object into the bucket.
	// Upload should be idempotent.
	Upload(ctx context.Context, name string, r io.Reader) error

	// Delete removes the object with the given name.
	// If object does not exist in the moment of deletion, Delete should throw error.
	Delete(ctx context.Context, name string) error

	// Name returns the bucket name for the provider.
	Name() string
}

Bucket provides read and write access to an object storage bucket. NOTE: We assume strong consistency for write-read flow.

func NewPrefixedBucket

func NewPrefixedBucket(bkt Bucket, prefix string) Bucket

func WithDelay

func WithDelay(bkt Bucket, delay time.Duration) Bucket

type BucketReader

type BucketReader interface {

	// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
	// object name including the prefix of the inspected directory.
	// Entries are passed to function in sorted order.
	Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

	// IterWithAttributes calls f for each entry in the given directory similar to Iter.
	// In addition to Name, it also includes requested object attributes in the argument to f.
	//
	// Attributes can be requested using IterOption.
	// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
	IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

	// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
	SupportedIterOptions() []IterOptionType

	// Get returns a reader for the given object name.
	Get(ctx context.Context, name string) (io.ReadCloser, error)

	// GetRange returns a new range reader for the given object name and range.
	GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)

	// Exists checks if the given object exists in the bucket.
	Exists(ctx context.Context, name string) (bool, error)

	// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
	IsObjNotFoundErr(err error) bool

	// IsAccessDeniedErr returns true if access to object is denied.
	IsAccessDeniedErr(err error) bool

	// Attributes returns information about the specified object.
	Attributes(ctx context.Context, name string) (ObjectAttributes, error)
}

BucketReader provides read access to an object storage bucket.

type DownloadOption

type DownloadOption func(params *downloadParams)

DownloadOption configures the provided params.

func WithDownloadIgnoredPaths

func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadOption

WithDownloadIgnoredPaths is an option to set the paths to not be downloaded.

func WithFetchConcurrency

func WithFetchConcurrency(concurrency int) DownloadOption

WithFetchConcurrency is an option to set the concurrency of the download operation.

type InMemBucket

type InMemBucket struct {
	// contains filtered or unexported fields
}

InMemBucket implements the objstore.Bucket interfaces against local memory. Methods from Bucket interface are thread-safe. Objects are assumed to be immutable.

func NewInMemBucket

func NewInMemBucket() *InMemBucket

NewInMemBucket returns a new in memory Bucket. NOTE: Returned bucket is just a naive in memory bucket implementation. For test use cases only.
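
For example, a small sketch of a unit test that uses the in-memory bucket to assert on uploaded content:

package example

import (
	"context"
	"strings"
	"testing"

	"github.com/thanos-io/objstore"
)

func TestWithInMemBucket(t *testing.T) {
	bkt := objstore.NewInMemBucket()
	ctx := context.Background()

	// Upload a small object into memory.
	if err := bkt.Upload(ctx, "dir/obj", strings.NewReader("payload")); err != nil {
		t.Fatal(err)
	}

	// Exists and Objects() let the test assert on the stored state.
	ok, err := bkt.Exists(ctx, "dir/obj")
	if err != nil || !ok {
		t.Fatalf("expected object to exist, ok=%v err=%v", ok, err)
	}
	if got := bkt.Objects()["dir/obj"]; string(got) != "payload" {
		t.Fatalf("unexpected content: %q", got)
	}
}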

func (*InMemBucket) Attributes

func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttributes, error)

Attributes returns information about the specified object.

func (*InMemBucket) Close

func (b *InMemBucket) Close() error

func (*InMemBucket) Delete

func (b *InMemBucket) Delete(_ context.Context, name string) error

Delete removes all data prefixed with the dir.

func (*InMemBucket) Exists

func (b *InMemBucket) Exists(_ context.Context, name string) (bool, error)

Exists checks if the given directory exists in memory.

func (*InMemBucket) Get

func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error)

Get returns a reader for the given object name.

func (*InMemBucket) GetRange

func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error)

GetRange returns a new range reader for the given object name and range.

func (*InMemBucket) IsAccessDeniedErr

func (b *InMemBucket) IsAccessDeniedErr(err error) bool

IsAccessDeniedErr returns true if access to object is denied.

func (*InMemBucket) IsObjNotFoundErr

func (b *InMemBucket) IsObjNotFoundErr(err error) bool

IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.

func (*InMemBucket) Iter

func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, options ...IterOption) error

Iter calls f for each entry in the given directory. The argument to f is the full object name including the prefix of the inspected directory.

func (*InMemBucket) IterWithAttributes

func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

func (*InMemBucket) Name

func (b *InMemBucket) Name() string

Name returns the bucket name.

func (*InMemBucket) Objects

func (b *InMemBucket) Objects() map[string][]byte

Objects returns a copy of the internally stored objects. NOTE: For assert purposes.

func (*InMemBucket) Provider

func (b *InMemBucket) Provider() ObjProvider

func (*InMemBucket) SupportedIterOptions

func (i *InMemBucket) SupportedIterOptions() []IterOptionType

func (*InMemBucket) Upload

func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error

Upload writes the file specified in src into memory.

type InstrumentedBucket

type InstrumentedBucket interface {
	Bucket

	// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
	// objstore_bucket_operation_failures_total metric.
	WithExpectedErrs(IsOpFailureExpectedFunc) Bucket

	// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
	// objstore_bucket_operation_failures_total metric.
	// TODO(bwplotka): Remove this when moved to Go 1.14 and replace with InstrumentedBucketReader.
	ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

InstrumentedBucket is a Bucket with optional instrumentation control on reader.

func WithNoopInstr

func WithNoopInstr(bkt Bucket) InstrumentedBucket

type InstrumentedBucketReader

type InstrumentedBucketReader interface {
	BucketReader

	// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
	// objstore_bucket_operation_failures_total metric.
	ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

InstrumentedBucketReader is a BucketReader with optional instrumentation control.

type IsOpFailureExpectedFunc

type IsOpFailureExpectedFunc func(error) bool

IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment objstore_bucket_operation_failures_total metric.

type IterObjectAttributes

type IterObjectAttributes struct {
	Name string
	// contains filtered or unexported fields
}

func (*IterObjectAttributes) LastModified

func (i *IterObjectAttributes) LastModified() (time.Time, bool)

LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available.

func (*IterObjectAttributes) SetLastModified

func (i *IterObjectAttributes) SetLastModified(t time.Time)

type IterOption

type IterOption struct {
	Type  IterOptionType
	Apply func(params *IterParams)
}

IterOption configures the provided params.

func WithRecursiveIter

func WithRecursiveIter() IterOption

WithRecursiveIter is an option that can be applied to Iter() to recursively list objects in the bucket.

func WithUpdatedAt

func WithUpdatedAt() IterOption

WithUpdatedAt is an option that can be applied to Iter() to include the last modified time in the attributes. NB: Prefixes may not report last modified time. This option is currently supported for the azure, s3, bos, gcs and filesystem providers.
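
A minimal sketch (assuming the configured provider supports the UpdatedAt option) of requesting last-modified timestamps during iteration:

package example

import (
	"context"
	"fmt"

	"github.com/thanos-io/objstore"
)

// listWithTimes lists a directory and prints each entry's last-modified time
// when the provider reports one.
func listWithTimes(ctx context.Context, bkt objstore.Bucket, dir string) error {
	return bkt.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error {
		if t, ok := attrs.LastModified(); ok {
			fmt.Println(attrs.Name, t)
		} else {
			fmt.Println(attrs.Name)
		}
		return nil
	}, objstore.WithUpdatedAt())
}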

type IterOptionType

type IterOptionType int

IterOptionType is used for type-safe option support checking.

const (
	Recursive IterOptionType = iota
	UpdatedAt
)

type IterParams

type IterParams struct {
	Recursive    bool
	LastModified bool
}

IterParams holds the Iter() parameters and is used by objstore clients implementations.

func ApplyIterOptions

func ApplyIterOptions(options ...IterOption) IterParams

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func BucketMetrics

func BucketMetrics(reg prometheus.Registerer, name string) *Metrics

type ObjProvider

type ObjProvider string
const (
	MEMORY     ObjProvider = "MEMORY"
	FILESYSTEM ObjProvider = "FILESYSTEM"
	GCS        ObjProvider = "GCS"
	S3         ObjProvider = "S3"
	AZURE      ObjProvider = "AZURE"
	SWIFT      ObjProvider = "SWIFT"
	COS        ObjProvider = "COS"
	ALIYUNOSS  ObjProvider = "ALIYUNOSS"
	BOS        ObjProvider = "BOS"
	OCI        ObjProvider = "OCI"
	OBS        ObjProvider = "OBS"
)

type ObjectAttributes

type ObjectAttributes struct {
	// Size is the object size in bytes.
	Size int64 `json:"size"`

	// LastModified is the timestamp the object was last modified.
	LastModified time.Time `json:"last_modified"`
}

type ObjectSizer

type ObjectSizer interface {
	// ObjectSize returns the size of the object in bytes, or error if it is not available.
	ObjectSize() (int64, error)
}

ObjectSizer can return size of object.

type ObjectSizerReadCloser

type ObjectSizerReadCloser struct {
	io.ReadCloser
	Size func() (int64, error)
}

func (ObjectSizerReadCloser) ObjectSize

func (o ObjectSizerReadCloser) ObjectSize() (int64, error)

ObjectSize implements ObjectSizer.

type PrefixedBucket

type PrefixedBucket struct {
	// contains filtered or unexported fields
}

func (*PrefixedBucket) Attributes

func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error)

Attributes returns information about the specified object.

func (*PrefixedBucket) Close

func (p *PrefixedBucket) Close() error

func (*PrefixedBucket) Delete

func (p *PrefixedBucket) Delete(ctx context.Context, name string) error

Delete removes the object with the given name. If the object does not exist at the moment of deletion, Delete should throw an error.

func (*PrefixedBucket) Exists

func (p *PrefixedBucket) Exists(ctx context.Context, name string) (bool, error)

Exists checks if the given object exists in the bucket.

func (*PrefixedBucket) Get

func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error)

Get returns a reader for the given object name.

func (*PrefixedBucket) GetRange

func (p *PrefixedBucket) GetRange(ctx context.Context, name string, off int64, length int64) (io.ReadCloser, error)

GetRange returns a new range reader for the given object name and range.

func (*PrefixedBucket) IsAccessDeniedErr

func (p *PrefixedBucket) IsAccessDeniedErr(err error) bool

IsAccessDeniedErr returns true if access to object is denied.

func (*PrefixedBucket) IsObjNotFoundErr

func (p *PrefixedBucket) IsObjNotFoundErr(err error) bool

IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.

func (*PrefixedBucket) Iter

func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error

Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full object name including the prefix of the inspected directory. Entries are passed to function in sorted order.

func (*PrefixedBucket) IterWithAttributes

func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error

func (*PrefixedBucket) Name

func (p *PrefixedBucket) Name() string

Name returns the bucket name for the provider.

func (*PrefixedBucket) Provider

func (p *PrefixedBucket) Provider() ObjProvider

func (*PrefixedBucket) SupportedIterOptions

func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType

func (*PrefixedBucket) Upload

func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error

Upload the contents of the reader as an object into the bucket. Upload should be idempotent.

type TLSConfig

type TLSConfig struct {
	// The CA cert to use for the targets.
	CAFile string `yaml:"ca_file"`
	// The client cert file for the targets.
	CertFile string `yaml:"cert_file"`
	// The client key file for the targets.
	KeyFile string `yaml:"key_file"`
	// Used to verify the hostname for the targets.
	ServerName string `yaml:"server_name"`
	// Disable target certificate validation.
	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
}

TLSConfig configures the options for TLS connections.

type UploadOption

type UploadOption func(params *uploadParams)

UploadOption configures the provided params.

func WithUploadConcurrency

func WithUploadConcurrency(concurrency int) UploadOption

WithUploadConcurrency is an option to set the concurrency of the upload operation.

Directories

Path Synopsis
providers
bos
cos
gcs
Package gcs implements common object storage abstractions against Google Cloud Storage.
obs
oci
oss
s3
Package s3 implements common object storage abstractions against s3-compatible APIs.
swift
Package swift implements common object storage abstractions against OpenStack swift APIs.
scripts
test
