proxycore

package
v0.0.0-...-d61a097 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2023 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultRefreshWindow  = 10 * time.Second
	DefaultConnectTimeout = 10 * time.Second
	DefaultRefreshTimeout = 5 * time.Second
)
View Source
const (
	MaxMessages     = 1024
	MaxCoalesceSize = 16 * 1024 // TODO: What's a good value for this?
)
View Source
const (
	MaxStreams = 2048
)

Variables

View Source
var (
	Closed        = errors.New("connection closed")
	AlreadyClosed = errors.New("connection already closed")
)
View Source
var (
	StreamsExhausted     = errors.New("streams exhausted")
	AuthExpected         = errors.New("authentication required, but no authenticator provided")
	ProtocolNotSupported = errors.New("required protocol version is not supported")
)
View Source
var (
	ColumnNameNotFound = errors.New("column name not found")
	ColumnIsNull       = errors.New("column is null")
)
View Source
var IgnoreEndpoint = errors.New("ignore endpoint")
View Source
var (
	NoConnForHost = errors.New("no connection available for host")
)

Functions

func DecodeType

func DecodeType(dt datatype.DataType, version primitive.ProtocolVersion, bytes []byte) (interface{}, error)

func EncodeType

func EncodeType(dt datatype.DataType, version primitive.ProtocolVersion, val interface{}) ([]byte, error)

func GetOrCreateNopLogger

func GetOrCreateNopLogger(logger *zap.Logger) *zap.Logger

func LookupEndpoint

func LookupEndpoint(endpoint Endpoint) (string, error)

func MockDefaultOptionsHandler

func MockDefaultOptionsHandler(_ *MockClient, _ *frame.Frame) message.Message

func MockDefaultQueryHandler

func MockDefaultQueryHandler(cl *MockClient, frm *frame.Frame) message.Message

func MockDefaultRegisterHandler

func MockDefaultRegisterHandler(cl *MockClient, frm *frame.Frame) message.Message

func MockDefaultStartupHandler

func MockDefaultStartupHandler(_ *MockClient, _ *frame.Frame) message.Message

Types

type AddEvent

type AddEvent struct {
	Host *Host
}

type Authenticator

type Authenticator interface {
	InitialResponse(authenticator string) ([]byte, error)
	EvaluateChallenge(token []byte) ([]byte, error)
	Success(token []byte) error
}

func NewPasswordAuth

func NewPasswordAuth(username string, password string) Authenticator

type BootstrapEvent

type BootstrapEvent struct {
	Hosts []*Host
}

type ClientConn

type ClientConn struct {
	// contains filtered or unexported fields
}

func ConnectClient

func ConnectClient(ctx context.Context, endpoint Endpoint, config ClientConnConfig) (*ClientConn, error)

ConnectClient creates a new connection to an endpoint within a downstream cluster using TLS if specified.

func (*ClientConn) Close

func (c *ClientConn) Close() error

func (*ClientConn) Closing

func (c *ClientConn) Closing(err error)

func (*ClientConn) Err

func (c *ClientConn) Err() error

func (*ClientConn) Handshake

func (*ClientConn) Heartbeats

func (c *ClientConn) Heartbeats(connectTimeout time.Duration, version primitive.ProtocolVersion, heartbeatInterval time.Duration, idleTimeout time.Duration, logger *zap.Logger)

Heartbeats sends an OPTIONS request to the endpoint in order to keep the connection alive.

func (*ClientConn) Inflight

func (c *ClientConn) Inflight() int32

func (*ClientConn) IsClosed

func (c *ClientConn) IsClosed() chan struct{}

func (*ClientConn) Query

func (c *ClientConn) Query(ctx context.Context, version primitive.ProtocolVersion, query message.Message) (*ResultSet, error)

func (*ClientConn) QueryFrame

func (c *ClientConn) QueryFrame(ctx context.Context, frm *frame.Frame) (*ResultSet, error)

func (*ClientConn) Receive

func (c *ClientConn) Receive(reader io.Reader) error

func (*ClientConn) Send

func (c *ClientConn) Send(request Request) error

func (*ClientConn) SendAndReceive

func (c *ClientConn) SendAndReceive(ctx context.Context, f *frame.Frame) (*frame.Frame, error)

func (*ClientConn) SetKeyspace

func (c *ClientConn) SetKeyspace(ctx context.Context, version primitive.ProtocolVersion, keyspace string) error

type ClientConnConfig

type ClientConnConfig struct {
	PreparedCache PreparedCache
	Handler       EventHandler
	Logger        *zap.Logger
}

type Cluster

type Cluster struct {

	// the following are immutable after start up
	NegotiatedVersion primitive.ProtocolVersion
	Info              ClusterInfo
	// contains filtered or unexported fields
}

Cluster defines a downstream cluster that is being proxied to.

func ConnectCluster

func ConnectCluster(ctx context.Context, config ClusterConfig) (*Cluster, error)

ConnectCluster establishes control connections to each of the endpoints within a downstream cluster that is being proxied to.

func (*Cluster) ExecuteControlQuery

func (c *Cluster) ExecuteControlQuery(ctx context.Context, query string) (rs *ResultSet, err error)

func (*Cluster) Listen

func (c *Cluster) Listen(listener ClusterListener) error

func (*Cluster) OnEvent

func (c *Cluster) OnEvent(frame *frame.Frame)

func (*Cluster) OutageDuration

func (c *Cluster) OutageDuration() time.Duration

type ClusterConfig

type ClusterConfig struct {
	Version           primitive.ProtocolVersion
	Auth              Authenticator
	Resolver          EndpointResolver
	ReconnectPolicy   ReconnectPolicy
	RefreshWindow     time.Duration
	ConnectTimeout    time.Duration
	RefreshTimeout    time.Duration
	Logger            *zap.Logger
	HeartBeatInterval time.Duration
	IdleTimeout       time.Duration
}

type ClusterInfo

type ClusterInfo struct {
	Partitioner    string
	ReleaseVersion string
	CQLVersion     string
	LocalDC        string
	DSEVersion     string
}

type ClusterListener

type ClusterListener interface {
	OnEvent(event Event)
}

type ClusterListenerFunc

type ClusterListenerFunc func(event Event)

func (ClusterListenerFunc) OnEvent

func (f ClusterListenerFunc) OnEvent(event Event)

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(ctx context.Context, endpoint Endpoint, recv Receiver) (c *Conn, err error)

Connect creates a new connection to a server specified by the endpoint using TLS if specified

func NewConn

func NewConn(conn net.Conn, recv Receiver) *Conn

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) Err

func (c *Conn) Err() error

func (*Conn) IsClosed

func (c *Conn) IsClosed() chan struct{}

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

func (*Conn) Start

func (c *Conn) Start()

func (*Conn) Write

func (c *Conn) Write(sender Sender) error

func (*Conn) WriteBytes

func (c *Conn) WriteBytes(b []byte) error

type CqlError

type CqlError struct {
	Message message.Error
}

func (CqlError) Error

func (e CqlError) Error() string

type Endpoint

type Endpoint interface {
	fmt.Stringer
	Addr() string
	IsResolved() bool
	TLSConfig() *tls.Config
	Key() string
}

func NewEndpoint

func NewEndpoint(addr string) Endpoint

func NewEndpointTLS

func NewEndpointTLS(addr string, cfg *tls.Config) Endpoint

type EndpointResolver

type EndpointResolver interface {
	Resolve(ctx context.Context) ([]Endpoint, error)
	NewEndpoint(row Row) (Endpoint, error)
}

func NewResolver

func NewResolver(contactPoints ...string) EndpointResolver

func NewResolverWithDefaultPort

func NewResolverWithDefaultPort(contactPoints []string, defaultPort int, tlsConfig *tls.Config) EndpointResolver

type Event

type Event interface {
	// contains filtered or unexported methods
}

type EventHandler

type EventHandler interface {
	OnEvent(frm *frame.Frame)
}

type EventHandlerFunc

type EventHandlerFunc func(frm *frame.Frame)

func (EventHandlerFunc) OnEvent

func (f EventHandlerFunc) OnEvent(frm *frame.Frame)

type Host

type Host struct {
	Endpoint
	DC string
}

func NewHostFromRow

func NewHostFromRow(endpoint Endpoint, row Row) (*Host, error)

func (*Host) Key

func (h *Host) Key() string

func (*Host) String

func (h *Host) String() string

type LoadBalancer

type LoadBalancer interface {
	ClusterListener
	NewQueryPlan() QueryPlan
}

func NewRoundRobinLoadBalancer

func NewRoundRobinLoadBalancer() LoadBalancer

type MockClient

type MockClient struct {
	// contains filtered or unexported fields
}

func (*MockClient) Closing

func (c *MockClient) Closing(_ error)

func (*MockClient) InterceptQuery

func (c *MockClient) InterceptQuery(hdr *frame.Header, msg *message.Query) message.Message

func (MockClient) Keyspace

func (c MockClient) Keyspace() string

func (MockClient) Local

func (c MockClient) Local() MockHost

func (*MockClient) Receive

func (c *MockClient) Receive(reader io.Reader) error

func (*MockClient) Register

func (c *MockClient) Register(version primitive.ProtocolVersion)

type MockCluster

type MockCluster struct {
	DseVersion string
	Handlers   map[primitive.OpCode]MockRequestHandler
	// contains filtered or unexported fields
}

func NewMockCluster

func NewMockCluster(startIP net.IP, port int) *MockCluster

func (*MockCluster) Add

func (c *MockCluster) Add(ctx context.Context, n int) error

func (*MockCluster) Remove

func (c *MockCluster) Remove(n int)

func (*MockCluster) Shutdown

func (c *MockCluster) Shutdown()

func (*MockCluster) Start

func (c *MockCluster) Start(ctx context.Context, n int) error

func (*MockCluster) Stop

func (c *MockCluster) Stop(n int)

type MockHost

type MockHost struct {
	IP     string
	Port   int
	HostID *primitive.UUID
}

func (MockHost) String

func (h MockHost) String() string

type MockRequestHandler

type MockRequestHandler func(client *MockClient, frm *frame.Frame) message.Message

type MockRequestHandlers

type MockRequestHandlers map[primitive.OpCode]MockRequestHandler

func NewMockRequestHandlers

func NewMockRequestHandlers(overrides MockRequestHandlers) MockRequestHandlers

type MockServer

type MockServer struct {
	DseVersion string
	Handlers   map[primitive.OpCode]MockRequestHandler
	// contains filtered or unexported fields
}

func (*MockServer) Add

func (s *MockServer) Add(host MockHost)

func (*MockServer) Event

func (s *MockServer) Event(evt message.Event)

func (*MockServer) Remove

func (s *MockServer) Remove(host MockHost)

func (*MockServer) Serve

func (s *MockServer) Serve(ctx context.Context, maxVersion primitive.ProtocolVersion, local MockHost, peers []MockHost) error

func (*MockServer) Shutdown

func (s *MockServer) Shutdown()

type PreparedCache

type PreparedCache interface {
	// Store add an entry to the cache.
	Store(id string, entry *PreparedEntry)
	// Load retrieves an entry from the cache. `ok` is true if the entry is present; otherwise it's false.
	Load(id string) (entry *PreparedEntry, ok bool)
}

PreparedCache a thread-safe cache for storing prepared queries.

type PreparedEntry

type PreparedEntry struct {
	PreparedFrame *frame.RawFrame
}

PreparedEntry is an entry in the prepared cache.

type QueryPlan

type QueryPlan interface {
	Next() *Host
}

type Receiver

type Receiver interface {
	Receive(reader io.Reader) error
	Closing(err error)
}

type ReconnectEvent

type ReconnectEvent struct {
	Endpoint
}

type ReconnectPolicy

type ReconnectPolicy interface {
	NextDelay() time.Duration
	Reset()
	Clone() ReconnectPolicy
}

func NewReconnectPolicy

func NewReconnectPolicy() ReconnectPolicy

func NewReconnectPolicyWithDelays

func NewReconnectPolicyWithDelays(baseDelay, maxDelay time.Duration) ReconnectPolicy

type RemoveEvent

type RemoveEvent struct {
	Host *Host
}

type Request

type Request interface {
	// Frame returns the frame to be executed as part of the request.
	// This must be idempotent.
	Frame() interface{}

	// Execute is called when a request need to be retried.
	// This is currently only called for executing prepared requests (i.e. `EXECUTE` request frames). If `EXECUTE`
	// request frames are not expected then the implementation should `panic()`.
	//
	// If `next` is false then the request must be retried on the current node; otherwise, it should be retried on
	// another node which is usually then next node in a query plan.
	Execute(next bool)

	// OnClose is called when the underlying connection is closed.
	// No assumptions should be made about whether the request has been successfully sent; it is possible that
	// the request has been fully sent and no response was received before
	OnClose(err error)

	// OnResult is called when a response frame has been sent back from the connection.
	OnResult(raw *frame.RawFrame)
}

Request represents the data frame and lifecycle of a CQL native protocol request.

type ResultSet

type ResultSet struct {
	// contains filtered or unexported fields
}

func NewResultSet

func NewResultSet(rows *message.RowsResult, version primitive.ProtocolVersion) *ResultSet

func (*ResultSet) Row

func (rs *ResultSet) Row(i int) Row

func (ResultSet) RowCount

func (rs ResultSet) RowCount() int

type Row

type Row struct {
	// contains filtered or unexported fields
}

func (Row) ByName

func (r Row) ByName(n string) (interface{}, error)

func (Row) ByPos

func (r Row) ByPos(i int) (interface{}, error)

func (Row) InetByName

func (r Row) InetByName(n string) (net.IP, error)

func (Row) StringByName

func (r Row) StringByName(n string) (string, error)

func (Row) UUIDByName

func (r Row) UUIDByName(n string) (primitive.UUID, error)

type SchemaChangeEvent

type SchemaChangeEvent struct {
	Message *message.SchemaChangeEvent
}

type Sender

type Sender interface {
	Send(writer io.Writer) error
}

type SenderFunc

type SenderFunc func(writer io.Writer) error

func (SenderFunc) Send

func (s SenderFunc) Send(writer io.Writer) error

type Session

type Session struct {
	// contains filtered or unexported fields
}

func ConnectSession

func ConnectSession(ctx context.Context, cluster *Cluster, config SessionConfig) (*Session, error)

func (*Session) OnEvent

func (s *Session) OnEvent(event Event)

func (*Session) Send

func (s *Session) Send(host *Host, request Request) error

type SessionConfig

type SessionConfig struct {
	ReconnectPolicy ReconnectPolicy
	NumConns        int
	Keyspace        string
	Version         primitive.ProtocolVersion
	Auth            Authenticator
	Logger          *zap.Logger
	// PreparedCache a global cache share across sessions for storing previously prepared queries
	PreparedCache     PreparedCache
	ConnectTimeout    time.Duration
	HeartBeatInterval time.Duration
	IdleTimeout       time.Duration
}

type UnexpectedResponse

type UnexpectedResponse struct {
	Expected []string
	Received string
}

func (*UnexpectedResponse) Error

func (e *UnexpectedResponse) Error() string

type UpEvent

type UpEvent struct {
	Host *Host
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL