Documentation
¶
Index ¶
- Constants
- Variables
- func NewDefaultPreparedCache(size int) (proxycore.PreparedCache, error)
- func Run(ctx context.Context, args []string) int
- func ValidateAndApplyDefaults(cfg *UserConfig) error
- type CassandraToSpannerConfigs
- type Config
- type Listener
- type Operation
- type Otel
- type OtelConfig
- type PeerConfig
- type PreparedQuery
- type Proxy
- func (p *Proxy) Close() error
- func (p *Proxy) Connect() error
- func (p *Proxy) IsIdempotent(id []byte) bool
- func (p *Proxy) MaybeStorePreparedIdempotence(raw *frame.RawFrame, msg message.Message)
- func (p *Proxy) OnEvent(event proxycore.Event)
- func (p *Proxy) Ready() bool
- func (p *Proxy) Serve(l net.Listener) (err error)
- type RetryDecision
- type RetryPolicy
- type Sender
- type Session
- type Spanner
- type SpannerClient
- type SpannerConfig
- type UserConfig
Constants ¶
// Defaults for Spanner settings.
// NOTE(review): where these defaults are applied is not visible here —
// presumably in ValidateAndApplyDefaults when the matching YAML value is unset; confirm.
const (
	// DefaultSpannerGrpcChannels is the default number of gRPC channels (4).
	DefaultSpannerGrpcChannels = 4
	// DefaultSpannerMinSession is the default minimum session count (100).
	DefaultSpannerMinSession = 100
	// DefaultSpannerMaxSession is the default maximum session count (400).
	DefaultSpannerMaxSession = 400
	// DefaultConfigTableName is the default name of the configuration table.
	DefaultConfigTableName = "TableConfigurations"
)
Defaults for Spanner Settings.
// Query is the string constant "Query".
// NOTE(review): its use is not visible here — presumably a label for query
// operations; confirm against the call sites.
const Query = "Query"
const (
	// SpannerConnectionString is the fmt template for a fully qualified Spanner
	// database path. NewSpannerClient fills the three %s verbs, in order, with
	// the GCP project ID, the instance name and the database name.
	SpannerConnectionString = "projects/%s/instances/%s/databases/%s"
)
Variables ¶
// ErrProxyAlreadyConnected is the sentinel error "proxy already connected".
// NOTE(review): presumably returned by Proxy.Connect on a repeated call — confirm.
var ErrProxyAlreadyConnected = errors.New("proxy already connected")

// ErrProxyClosed is the sentinel error "proxy closed".
// NOTE(review): presumably returned once Proxy.Close has been called — confirm.
var ErrProxyClosed = errors.New("proxy closed")

// ErrProxyNotConnected is the sentinel error "proxy not connected".
// NOTE(review): presumably returned when the proxy is used before Proxy.Connect — confirm.
var ErrProxyNotConnected = errors.New("proxy not connected")
var NewSpannerClient = func(ctx context.Context, config Config, ot *otelgo.OpenTelemetry) iface.SpannerClientInterface { if os.Getenv("SPANNER_EMULATOR_HOST") == "" { os.Setenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS", "true") } pool := grpc.WithConnectParams(grpc.ConnectParams{ MinConnectTimeout: 10 * time.Second, }) spc := spanner.DefaultSessionPoolConfig if config.SpannerConfig.MinSessions != 0 { spc.MinOpened = config.SpannerConfig.MinSessions } if config.SpannerConfig.MaxSessions != 0 { spc.MaxOpened = config.SpannerConfig.MaxSessions } spc.InactiveTransactionRemovalOptions = spanner.InactiveTransactionRemovalOptions{ ActionOnInactiveTransaction: spanner.WarnAndClose, } cfg := spanner.ClientConfig{SessionPoolConfig: spc, UserAgent: config.UserAgent} if config.OtelConfig.Enabled && ot != nil { if config.OtelConfig.Metrics.Enabled { if config.OtelConfig.EnabledClientSideMetrics { spanner.EnableOpenTelemetryMetrics() } cfg.OpenTelemetryMeterProvider = ot.MeterProvider } if config.OtelConfig.Traces.Enabled { os.Setenv("GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING", "opentelemetry") otel.SetTracerProvider(ot.TracerProvider) } } database := fmt.Sprintf(SpannerConnectionString, config.SpannerConfig.GCPProjectID, config.SpannerConfig.InstanceName, config.SpannerConfig.DatabaseName) client, err := spanner.NewClientWithConfig(ctx, database, cfg, option.WithGRPCConnectionPool(config.SpannerConfig.NumOfChannels), option.WithGRPCDialOption(pool)) if err != nil { config.Logger.Error("Failed to create client" + err.Error()) return nil } return &SpannerClient{ Client: client, KeyspaceFlatter: config.KeyspaceFlatter, } }
NewSpannerClient creates a new instance of SpannerClient
var (
	// TCP_BIND_PORT is a format string for a listen address on 0.0.0.0; the %s
	// verb takes the port.
	// NOTE(review): declared as a mutable var with an ALL_CAPS name; the name
	// is exported, so it is left unchanged here.
	TCP_BIND_PORT = "0.0.0.0:%s"
)
Functions ¶
func NewDefaultPreparedCache ¶
func NewDefaultPreparedCache(size int) (proxycore.PreparedCache, error)
NewDefaultPreparedCache creates a new default prepared cache capping the max item capacity to `size`.
func Run ¶
Run starts the proxy command. 'args' shouldn't include the executable (i.e. os.Args[1:]). It returns the exit code for the proxy.
func ValidateAndApplyDefaults ¶
func ValidateAndApplyDefaults(cfg *UserConfig) error
ValidateAndApplyDefaults applies default values to the configuration after it is loaded.
Types ¶
type CassandraToSpannerConfigs ¶
// CassandraToSpannerConfigs contains configurations for Cassandra to Spanner.
// It is the type of the "cassandra_to_spanner_configs" section of UserConfig.
type CassandraToSpannerConfigs struct {
	// KeyspaceFlatter is read from the YAML key "keyspaceFlatter".
	// NOTE(review): presumably copied into Config.KeyspaceFlatter — confirm.
	KeyspaceFlatter bool `yaml:"keyspaceFlatter"`
	// ProjectID is read from the YAML key "projectId".
	ProjectID string `yaml:"projectId"`
	// ConfigTableName is read from the YAML key "configTableName".
	ConfigTableName string `yaml:"configTableName"`
	// UseRowTTL is read from the YAML key "useRowTTL".
	UseRowTTL bool `yaml:"useRowTTL"`
	// UseRowTimestamp is read from the YAML key "useRowTimestamp".
	UseRowTimestamp bool `yaml:"useRowTimestamp"`
}
CassandraToSpannerConfigs contains configurations for Cassandra to Spanner
type Config ¶
// Config is the proxy configuration value that NewSpannerClient receives.
type Config struct {
	Version     primitive.ProtocolVersion
	MaxVersion  primitive.ProtocolVersion
	Auth        proxycore.Authenticator
	Resolver    proxycore.EndpointResolver
	RetryPolicy RetryPolicy
	// Logger is used by NewSpannerClient to report a client-creation failure.
	Logger            *zap.Logger
	HeartBeatInterval time.Duration
	ConnectTimeout    time.Duration
	IdleTimeout       time.Duration
	RPCAddr           string
	DC                string
	Tokens            []string
	// SpannerConfig supplies the database coordinates, session-pool sizes and
	// gRPC channel count read by NewSpannerClient.
	SpannerConfig SpannerConfig
	// OtelConfig is a pointer; NewSpannerClient reads its Enabled, Metrics and
	// Traces settings.
	OtelConfig     *OtelConfig
	ReleaseVersion string
	Partitioner    string
	CQLVersion     string
	// PreparedCache a cache that stores prepared queries. If not set it uses the default implementation with a max
	// capacity of ~100MB.
	PreparedCache proxycore.PreparedCache
	// KeyspaceFlatter is copied into the SpannerClient built by NewSpannerClient.
	KeyspaceFlatter bool
	UseRowTimestamp bool
	UseRowTTL       bool
	Debug           bool
	// UserAgent is passed to the Spanner client configuration by NewSpannerClient.
	UserAgent string
}
type Listener ¶
// Listener represents each listener configuration. UserConfig holds a slice
// of these under the YAML key "listeners".
type Listener struct {
	// Name is read from the YAML key "name".
	Name string `yaml:"name"`
	// Port is read from the YAML key "port".
	Port int `yaml:"port"`
	// Spanner is the Spanner database configuration for this listener.
	Spanner Spanner `yaml:"spanner"`
	// Otel is the per-listener OpenTelemetry switch (see Otel.Disabled).
	Otel Otel `yaml:"otel"`
}
Listener represents each listener configuration
type Operation ¶
// Operation holds Spanner read/write operation settings.
type Operation struct {
	// MaxCommitDelay is read from the YAML key "maxCommitDelay".
	// NOTE(review): the unit of this value is not visible here — confirm.
	MaxCommitDelay uint64 `yaml:"maxCommitDelay"`
	// ReplayProtection is read from the YAML key "replayProtection".
	ReplayProtection bool `yaml:"replayProtection"`
}
Operation holds Spanner read/write operation settings.
type Otel ¶
// Otel configures OpenTelemetry features for a single Listener.
type Otel struct {
	// Disabled is read from the YAML key "disabled".
	// NOTE(review): presumably turns OpenTelemetry off for the owning listener — confirm.
	Disabled bool `yaml:"disabled"`
}
Otel configures OpenTelemetry features
type OtelConfig ¶
// OtelConfig defines the OpenTelemetry section of the YAML configuration
// (the "otel" key of UserConfig).
type OtelConfig struct {
	// Enabled is the master switch: NewSpannerClient applies the Metrics and
	// Traces settings only when it is true and it was given a non-nil
	// OpenTelemetry handle.
	Enabled bool `yaml:"enabled"`
	// EnabledClientSideMetrics, together with Metrics.Enabled, makes
	// NewSpannerClient call spanner.EnableOpenTelemetryMetrics.
	EnabledClientSideMetrics bool `yaml:"enabledClientSideMetrics"`
	// ServiceName is read from the YAML key "serviceName".
	ServiceName string `yaml:"serviceName"`
	// HealthCheck is read from the YAML key "healthcheck".
	HealthCheck struct {
		Enabled  bool   `yaml:"enabled"`
		Endpoint string `yaml:"endpoint"`
	} `yaml:"healthcheck"`
	// Metrics is read from the YAML key "metrics". When Metrics.Enabled is
	// true, NewSpannerClient hands the meter provider to the Spanner client.
	Metrics struct {
		Enabled  bool   `yaml:"enabled"`
		Endpoint string `yaml:"endpoint"`
	} `yaml:"metrics"`
	// Traces is read from the YAML key "traces". When Traces.Enabled is true,
	// NewSpannerClient sets GOOGLE_API_GO_EXPERIMENTAL_TELEMETRY_PLATFORM_TRACING
	// to "opentelemetry" and installs the tracer provider globally.
	Traces struct {
		Enabled       bool    `yaml:"enabled"`
		Endpoint      string  `yaml:"endpoint"`
		SamplingRatio float64 `yaml:"samplingRatio"`
	} `yaml:"traces"`
}
OtelConfig defines the structure of the OpenTelemetry ("otel") section of the YAML configuration.
type PeerConfig ¶
type PreparedQuery ¶
type Proxy ¶
type Proxy struct {
// contains filtered or unexported fields
}
func (*Proxy) IsIdempotent ¶
IsIdempotent checks whether a prepared ID is idempotent. If the proxy receives a query that it has never prepared, this will also return false.
func (*Proxy) MaybeStorePreparedIdempotence ¶
MaybeStorePreparedIdempotence stores the idempotence of a "PREPARE" request's query. This information is used by future "EXECUTE" requests when they need to be retried.
type RetryDecision ¶
// RetryDecision is an integer-backed enumeration returned by RetryPolicy
// methods; its values are RetrySame, RetryNext and ReturnError.
type RetryDecision int
RetryDecision is a type used for deciding what to do when a request has failed.
// The possible RetryDecision values, numbered from zero with iota.
const (
	// RetrySame should be returned when a request should be retried on the same host.
	RetrySame RetryDecision = iota
	// RetryNext should be returned when a request should be retried on the next host according to the request's query
	// plan.
	RetryNext
	// ReturnError should be returned when a request's original error should be forwarded along to the client.
	ReturnError
)
func (RetryDecision) String ¶
func (r RetryDecision) String() string
type RetryPolicy ¶
// RetryPolicy is an interface for defining retry behavior when a server-side error occurs.
// Each method receives the error message and the number of retries so far and
// returns a RetryDecision.
type RetryPolicy interface {
	// OnReadTimeout handles the retry decision for a server-side read timeout error (Read_timeout = 0x1200).
	// This occurs when a replica read request times out during a read query.
	OnReadTimeout(msg *message.ReadTimeout, retryCount int) RetryDecision
	// OnWriteTimeout handles the retry decision for a server-side write timeout error (Write_timeout = 0x1100).
	// This occurs when a replica write request times out during a write query.
	OnWriteTimeout(msg *message.WriteTimeout, retryCount int) RetryDecision
	// OnUnavailable handles the retry decision for a server-side unavailable error (Unavailable = 0x1000).
	// This occurs when a coordinator determines that there are not enough replicas to handle a query at the requested
	// consistency level.
	OnUnavailable(msg *message.Unavailable, retryCount int) RetryDecision
	// OnErrorResponse handles the retry decision for other potentially recoverable errors.
	// This can be called for the following error types: server error (ServerError = 0x0000),
	// overloaded (Overloaded = 0x1001), truncate error (Truncate_error = 0x1003), read failure (Read_failure = 0x1300),
	// and write failure (Write_failure = 0x1500).
	OnErrorResponse(msg message.Error, retryCount int) RetryDecision
}
RetryPolicy is an interface for defining retry behavior when a server-side error occurs.
func NewDefaultRetryPolicy ¶
func NewDefaultRetryPolicy() RetryPolicy
NewDefaultRetryPolicy creates a new default retry policy. The default retry policy takes a conservative approach to retrying requests. In most cases it retries only once in cases where a retry is likely to succeed.
type Session ¶
// Session describes the settings for Spanner sessions.
// NOTE(review): these presumably feed SpannerConfig.MinSessions, MaxSessions
// and NumOfChannels, which NewSpannerClient reads — confirm the mapping.
type Session struct {
	// Min is read from the YAML key "min".
	Min uint64 `yaml:"min"`
	// Max is read from the YAML key "max".
	Max uint64 `yaml:"max"`
	// GrpcChannels is read from the YAML key "grpcChannels".
	GrpcChannels int `yaml:"grpcChannels"`
}
Session describes the settings for Spanner sessions
type Spanner ¶
// Spanner holds the Spanner database configuration of a Listener.
type Spanner struct {
	// ProjectID is read from the YAML key "projectId".
	ProjectID string `yaml:"projectId"`
	// InstanceID is read from the YAML key "instanceId".
	InstanceID string `yaml:"instanceId"`
	// DatabaseID is read from the YAML key "databaseId".
	DatabaseID string `yaml:"databaseId"`
	// ConfigTableName is read from the YAML key "configTableName".
	ConfigTableName string `yaml:"configTableName"`
	// Session holds the session settings.
	// NOTE(review): the YAML key "Session" is capitalized, unlike the
	// lower-camel-case keys used by the other fields; config files must match it exactly.
	Session Session `yaml:"Session"`
	// Operation holds the read/write operation settings.
	// NOTE(review): the YAML key "Operation" is capitalized as well.
	Operation Operation `yaml:"Operation"`
}
Spanner holds the Spanner database configuration
type SpannerClient ¶
type SpannerConfig ¶
type UserConfig ¶
// UserConfig holds all the configuration data read from the YAML file; it is
// the type returned by LoadConfig and accepted by ValidateAndApplyDefaults.
type UserConfig struct {
	// CassandraToSpannerConfigs is read from the YAML key "cassandra_to_spanner_configs".
	CassandraToSpannerConfigs CassandraToSpannerConfigs `yaml:"cassandra_to_spanner_configs"`
	// Listeners is read from the YAML key "listeners"; one entry per listener.
	Listeners []Listener `yaml:"listeners"`
	// Otel is read from the YAML key "otel". It is a pointer, so it is nil
	// when the section is absent.
	Otel *OtelConfig `yaml:"otel"`
	// LoggerConfig is read from the YAML key "loggerConfig". It is a pointer,
	// so it is nil when the section is absent.
	LoggerConfig *utilities.LoggerConfig `yaml:"loggerConfig"`
}
UserConfig holds all the configuration data.
func LoadConfig ¶
func LoadConfig(filename string) (*UserConfig, error)
LoadConfig reads and parses the configuration from a YAML file