proxy

package
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2025 License: Apache-2.0, Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSpannerGrpcChannels = 4
	DefaultSpannerMinSession   = 100
	DefaultSpannerMaxSession   = 400
	DefaultConfigTableName     = "TableConfigurations"
	ExternalHostProjectID      = "default"
	ExternalHostInstanceID     = "default"
)

Defaults for Spanner Settings.

View Source
const Query = "Query"
View Source
const (
	SpannerConnectionString = "projects/%s/instances/%s/databases/%s"
)

Variables

View Source
var ErrProxyAlreadyConnected = errors.New("proxy already connected")
View Source
var ErrProxyClosed = errors.New("proxy closed")
View Source
var ErrProxyNotConnected = errors.New("proxy not connected")
View Source
var NewSpannerClient = func(ctx context.Context, config Config, ot *otelgo.OpenTelemetry) iface.SpannerClientInterface {

	if os.Getenv("SPANNER_EMULATOR_HOST") == "" {

		os.Setenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS", "true")
	}

	pool := grpc.WithConnectParams(grpc.ConnectParams{
		MinConnectTimeout: 10 * time.Second,
	})

	spc := spanner.DefaultSessionPoolConfig

	if config.SpannerConfig.MinSessions != 0 {
		spc.MinOpened = config.SpannerConfig.MinSessions
	}

	if config.SpannerConfig.MaxSessions != 0 {
		spc.MaxOpened = config.SpannerConfig.MaxSessions
	}

	spc.InactiveTransactionRemovalOptions = spanner.InactiveTransactionRemovalOptions{
		ActionOnInactiveTransaction: spanner.WarnAndClose,
	}

	cfg := spanner.ClientConfig{SessionPoolConfig: spc, UserAgent: config.UserAgent}

	if config.OtelConfig.Enabled && ot != nil {
		if config.OtelConfig.Metrics.Enabled {
			if config.OtelConfig.EnabledClientSideMetrics {

				spanner.EnableOpenTelemetryMetrics()
			}

			cfg.OpenTelemetryMeterProvider = ot.MeterProvider
		}

		if config.OtelConfig.Traces.Enabled {

			os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry")

			otel.SetTracerProvider(ot.TracerProvider)
		}
	}

	database := fmt.Sprintf(SpannerConnectionString, config.SpannerConfig.GCPProjectID, config.SpannerConfig.InstanceName, config.SpannerConfig.DatabaseName)

	opts := []option.ClientOption{
		option.WithGRPCConnectionPool(config.SpannerConfig.NumOfChannels),
		option.WithGRPCDialOption(pool),
	}

	if config.Endpoint != "" {
		config.Logger.Info("Using Spanner endpoint: " + config.Endpoint)
		opts = append(opts, option.WithEndpoint(config.Endpoint))
		if config.UsePlainText {
			opts = append(opts, option.WithoutAuthentication())
			opts = append(opts, option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
		} else {
			creds, credsErr := utilities.NewCred(config.CaCertificate, config.ClientCertificate, config.ClientKey)
			if credsErr != nil {
				config.Logger.Error(credsErr.Error())
				return nil
			}
			opts = append(opts, option.WithGRPCDialOption(grpc.WithTransportCredentials(creds)))
		}
	}

	client, err := spanner.NewClientWithConfig(ctx, database, cfg, opts...)

	if err != nil {
		config.Logger.Error("Failed to create client" + err.Error())
		return nil
	}

	return &SpannerClient{
		Client:          client,
		KeyspaceFlatter: config.KeyspaceFlatter,
	}
}

NewSpannerClient creates a new instance of SpannerClient

View Source
var (
	TCP_BIND_PORT = "0.0.0.0:%s"
)

Functions

func NewDefaultPreparedCache

func NewDefaultPreparedCache(size int) (proxycore.PreparedCache, error)

NewDefaultPreparedCache creates a new default prepared cache capping the max item capacity to `size`.

func Run

func Run(ctx context.Context, args []string) int

Run starts the proxy command. 'args' shouldn't include the executable (i.e. os.Args[1:]). It returns the exit code for the proxy.

func ValidateAndApplyDefaults

func ValidateAndApplyDefaults(cfg *UserConfig) error

ApplyDefaults applies default values to the configuration after it is loaded

Types

type CassandraToSpannerConfigs

type CassandraToSpannerConfigs struct {
	KeyspaceFlatter        bool   `yaml:"keyspaceFlatter"`
	ProjectID              string `yaml:"projectId"`
	ConfigTableName        string `yaml:"configTableName"`
	UseRowTTL              bool   `yaml:"useRowTTL"`
	UseRowTimestamp        bool   `yaml:"useRowTimestamp"`
	Endpoint               string `yaml:"endpoint"`
	CaCertificate          string `yaml:"caCertificate"`
	ClientCertificate      string `yaml:"clientCertificate"`
	ClientKey              string `yaml:"clientKey"`
	UsePlainText           bool   `yaml:"usePlainText"`
	ReadinessCheckEndpoint string `yaml:"readinessCheckEndpoint"`
}

CassandraToSpannerConfigs contains configurations for Cassandra to Spanner

type Config

type Config struct {
	Version           primitive.ProtocolVersion
	MaxVersion        primitive.ProtocolVersion
	Auth              proxycore.Authenticator
	Resolver          proxycore.EndpointResolver
	RetryPolicy       RetryPolicy
	Logger            *zap.Logger
	HeartBeatInterval time.Duration
	ConnectTimeout    time.Duration
	IdleTimeout       time.Duration
	RPCAddr           string
	DC                string
	Tokens            []string
	SpannerConfig     SpannerConfig
	OtelConfig        *OtelConfig
	ReleaseVersion    string
	Partitioner       string
	CQLVersion        string
	// PreparedCache a cache that stores prepared queries. If not set it uses the default implementation with a max
	// capacity of ~100MB.
	PreparedCache     proxycore.PreparedCache
	KeyspaceFlatter   bool
	UseRowTimestamp   bool
	UseRowTTL         bool
	Debug             bool
	UserAgent         string
	Endpoint          string
	CaCertificate     string
	ClientCertificate string
	ClientKey         string
	UsePlainText      bool
}

type Listener

type Listener struct {
	Name    string  `yaml:"name"`
	Port    int     `yaml:"port"`
	Spanner Spanner `yaml:"spanner"`
	Otel    Otel    `yaml:"otel"`
}

Listener represents each listener configuration

type Operation

type Operation struct {
	MaxCommitDelay   uint64 `yaml:"maxCommitDelay"`
	ReplayProtection bool   `yaml:"replayProtection"`
}

Spanner read/write operation settings.

type Otel

type Otel struct {
	Disabled bool `yaml:"disabled"`
}

Otel configures OpenTelemetry features

type OtelConfig

type OtelConfig struct {
	Enabled                  bool   `yaml:"enabled"`
	EnabledClientSideMetrics bool   `yaml:"enabledClientSideMetrics"`
	ServiceName              string `yaml:"serviceName"`
	HealthCheck              struct {
		Enabled  bool   `yaml:"enabled"`
		Endpoint string `yaml:"endpoint"`
	} `yaml:"healthcheck"`
	Metrics struct {
		Enabled  bool   `yaml:"enabled"`
		Endpoint string `yaml:"endpoint"`
	} `yaml:"metrics"`
	Traces struct {
		Enabled       bool    `yaml:"enabled"`
		Endpoint      string  `yaml:"endpoint"`
		SamplingRatio float64 `yaml:"samplingRatio"`
	} `yaml:"traces"`
	DisableRandomServiceInstanceIDKey bool `yaml:"disableRandomServiceInstanceIDKey"`
}

OtelConfig defines the structure of the YAML configuration

type PeerConfig

type PeerConfig struct {
	RPCAddr string   `yaml:"rpc-address"`
	DC      string   `yaml:"data-center,omitempty"`
	Tokens  []string `yaml:"tokens,omitempty"`
}

type PreparedQuery

type PreparedQuery struct {
	Query           string
	SelectedColumns []string
	PreparedColumns []string
}

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

func NewProxy

func NewProxy(ctx context.Context, config Config) (*Proxy, error)

func (*Proxy) Close

func (p *Proxy) Close() error

func (*Proxy) Connect

func (p *Proxy) Connect() error

func (*Proxy) IsIdempotent

func (p *Proxy) IsIdempotent(id []byte) bool

isIdempotent checks whether a prepared ID is idempotent. If the proxy receives a query that it's never prepared then this will also return false.

func (*Proxy) MaybeStorePreparedIdempotence

func (p *Proxy) MaybeStorePreparedIdempotence(raw *frame.RawFrame, msg message.Message)

MaybeStorePreparedIdempotence stores the idempotence of a "PREPARE" request's query. This information is used by future "EXECUTE" requests when they need to be retried.

func (*Proxy) OnEvent

func (p *Proxy) OnEvent(event proxycore.Event)

func (*Proxy) Ready

func (p *Proxy) Ready() bool

func (*Proxy) Serve

func (p *Proxy) Serve(l net.Listener) (err error)

Serve the proxy using the specified listener. It can be called multiple times with different listeners allowing them to share the same backend clusters.

type RetryDecision

type RetryDecision int

RetryDecision is a type used for deciding what to do when a request has failed.

const (
	// RetrySame should be returned when a request should be retried on the same host.
	RetrySame RetryDecision = iota
	// RetryNext should be returned when a request should be retried on the next host according to the request's query
	// plan.
	RetryNext
	// ReturnError should be returned when a request's original error should be forwarded along to the client.
	ReturnError
)

func (RetryDecision) String

func (r RetryDecision) String() string

type RetryPolicy

type RetryPolicy interface {
	// OnReadTimeout handles the retry decision for a server-side read timeout error (Read_timeout = 0x1200).
	// This occurs when a replica read request times out during a read query.
	OnReadTimeout(msg *message.ReadTimeout, retryCount int) RetryDecision

	// OnWriteTimeout handles the retry decision for a server-side write timeout error (Write_timeout = 0x1100).
	// This occurs when a replica write request times out during a write query.
	OnWriteTimeout(msg *message.WriteTimeout, retryCount int) RetryDecision

	// OnUnavailable handles the retry decision for a server-side unavailable exception (Unavailable = 0x1000).
	// This occurs when a coordinator determines that there are not enough replicas to handle a query at the requested
	// consistency level.
	OnUnavailable(msg *message.Unavailable, retryCount int) RetryDecision

	// OnErrorResponse handles the retry decision for other potentially recoverable errors.
	// This can be called for the following error types: server error (ServerError = 0x0000),
	// overloaded (Overloaded = 0x1001), truncate error (Truncate_error = 0x1003), read failure (Read_failure = 0x1300),
	// and write failure (Write_failure = 0x1500).
	OnErrorResponse(msg message.Error, retryCount int) RetryDecision
}

RetryPolicy is an interface for defining retry behavior when a server-side error occurs.

func NewDefaultRetryPolicy

func NewDefaultRetryPolicy() RetryPolicy

NewDefaultRetryPolicy creates a new default retry policy. The default retry policy takes a conservative approach to retrying requests. In most cases it retries only once in cases where a retry is likely to succeed.

type Sender

type Sender interface {
	Send(hdr *frame.Header, msg message.Message)
}

type Session

type Session struct {
	Min          uint64 `yaml:"min"`
	Max          uint64 `yaml:"max"`
	GrpcChannels int    `yaml:"grpcChannels"`
}

Session describes the settings for Spanner sessions

type Spanner

type Spanner struct {
	ProjectID       string    `yaml:"projectId"`
	InstanceID      string    `yaml:"instanceId"`
	DatabaseID      string    `yaml:"databaseId"`
	ConfigTableName string    `yaml:"configTableName"`
	Session         Session   `yaml:"Session"`
	Operation       Operation `yaml:"Operation"`
}

Spanner holds the Spanner database configuration

type SpannerClient

type SpannerClient struct {
	Client          *spanner.Client
	Logger          interface{}
	KeyspaceFlatter interface{}
}

func (*SpannerClient) GetClient

func (sc *SpannerClient) GetClient(ctx context.Context) (*spanner.Client, error)

type SpannerConfig

type SpannerConfig struct {
	DatabaseName     string
	ConfigTableName  string
	NumOfChannels    int
	InstanceName     string
	GCPProjectID     string
	MaxSessions      uint64
	MinSessions      uint64
	MaxCommitDelay   uint64
	ReplayProtection bool
	KeyspaceFlatter  bool
}

type UserConfig

type UserConfig struct {
	CassandraToSpannerConfigs CassandraToSpannerConfigs `yaml:"cassandra_to_spanner_configs"`
	Listeners                 []Listener                `yaml:"listeners"`
	Otel                      *OtelConfig               `yaml:"otel"`
	LoggerConfig              *utilities.LoggerConfig   `yaml:"loggerConfig"`
}

Config holds all the configuration data

func LoadConfig

func LoadConfig(filename string) (*UserConfig, error)

LoadConfig reads and parses the configuration from a YAML file

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL