proxycore

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2022 License: Apache-2.0 Imports: 28 Imported by: 2

Documentation

Index

Constants

View Source
const (
	DefaultRefreshWindow  = 10 * time.Second
	DefaultConnectTimeout = 10 * time.Second
	DefaultRefreshTimeout = 5 * time.Second
)
View Source
const (
	MaxMessages     = 1024
	MaxCoalesceSize = 16 * 1024 // TODO: What's a good value for this?
)
View Source
const (
	MaxStreams = 2048
)

Variables

View Source
var (
	Closed        = errors.New("connection closed")
	AlreadyClosed = errors.New("connection already closed")
)
View Source
var (
	StreamsExhausted     = errors.New("streams exhausted")
	AuthExpected         = errors.New("authentication required, but no authenticator provided")
	ProtocolNotSupported = errors.New("required protocol version is not supported")
)
View Source
var (
	ColumnNameNotFound = errors.New("column name not found")
)
View Source
var (
	NoConnForHost = errors.New("no connection available for host")
)

Functions

func DecodeType

func DecodeType(dt datatype.DataType, version primitive.ProtocolVersion, bytes []byte) (interface{}, error)

func EncodeType

func EncodeType(dt datatype.DataType, version primitive.ProtocolVersion, val interface{}) ([]byte, error)

func GetOrCreateNopLogger

func GetOrCreateNopLogger(logger *zap.Logger) *zap.Logger

func LookupEndpoint

func LookupEndpoint(endpoint Endpoint) (string, error)

func MockDefaultOptionsHandler

func MockDefaultOptionsHandler(_ *MockClient, _ *frame.Frame) message.Message

func MockDefaultQueryHandler

func MockDefaultQueryHandler(cl *MockClient, frm *frame.Frame) message.Message

func MockDefaultRegisterHandler

func MockDefaultRegisterHandler(cl *MockClient, frm *frame.Frame) message.Message

func MockDefaultStartupHandler

func MockDefaultStartupHandler(_ *MockClient, _ *frame.Frame) message.Message

Types

type AddEvent

type AddEvent struct {
	Host *Host
}

type Authenticator

type Authenticator interface {
	InitialResponse(authenticator string) ([]byte, error)
	EvaluateChallenge(token []byte) ([]byte, error)
	Success(token []byte) error
}

func NewPasswordAuth

func NewPasswordAuth(username string, password string) Authenticator

type BootstrapEvent

type BootstrapEvent struct {
	Hosts []*Host
}

type ClientConn

type ClientConn struct {
	// contains filtered or unexported fields
}

func ConnectClient

func ConnectClient(ctx context.Context, endpoint Endpoint, config ClientConnConfig) (*ClientConn, error)

ConnectClient creates a new connection to an endpoint within a downstream cluster using TLS if specified.

func (*ClientConn) Close

func (c *ClientConn) Close() error

func (*ClientConn) Closing

func (c *ClientConn) Closing(err error)

func (*ClientConn) Err

func (c *ClientConn) Err() error

func (*ClientConn) Handshake

func (*ClientConn) Heartbeats

func (c *ClientConn) Heartbeats(connectTimeout time.Duration, version primitive.ProtocolVersion, heartbeatInterval time.Duration, idleTimeout time.Duration, logger *zap.Logger)

Heartbeats sends an OPTIONS request to the endpoint in order to keep the connection alive.

func (*ClientConn) Inflight

func (c *ClientConn) Inflight() int32

func (*ClientConn) IsClosed

func (c *ClientConn) IsClosed() chan struct{}

func (*ClientConn) Query

func (c *ClientConn) Query(ctx context.Context, version primitive.ProtocolVersion, query message.Message) (*ResultSet, error)

func (*ClientConn) Receive

func (c *ClientConn) Receive(reader io.Reader) error

func (*ClientConn) Send

func (c *ClientConn) Send(request Request) error

func (*ClientConn) SendAndReceive

func (c *ClientConn) SendAndReceive(ctx context.Context, f *frame.Frame) (*frame.Frame, error)

func (*ClientConn) SetKeyspace

func (c *ClientConn) SetKeyspace(ctx context.Context, version primitive.ProtocolVersion, keyspace string) error

type ClientConnConfig

type ClientConnConfig struct {
	PreparedCache PreparedCache
	Handler       EventHandler
	Logger        *zap.Logger
}

type Cluster

type Cluster struct {

	// the following are immutable after start up
	NegotiatedVersion primitive.ProtocolVersion
	Info              ClusterInfo
	// contains filtered or unexported fields
}

Cluster defines a downstream cluster that is being proxied to.

func ConnectCluster

func ConnectCluster(ctx context.Context, config ClusterConfig) (*Cluster, error)

ConnectCluster establishes control connections to each of the endpoints within a downstream cluster that is being proxied to.

func (*Cluster) Listen

func (c *Cluster) Listen(listener ClusterListener) error

func (*Cluster) OnEvent

func (c *Cluster) OnEvent(frame *frame.Frame)

func (*Cluster) OutageDuration added in v0.1.0

func (c *Cluster) OutageDuration() time.Duration

type ClusterConfig

type ClusterConfig struct {
	Version           primitive.ProtocolVersion
	Auth              Authenticator
	Resolver          EndpointResolver
	ReconnectPolicy   ReconnectPolicy
	RefreshWindow     time.Duration
	ConnectTimeout    time.Duration
	RefreshTimeout    time.Duration
	Logger            *zap.Logger
	HeartBeatInterval time.Duration
	IdleTimeout       time.Duration
}

type ClusterInfo

type ClusterInfo struct {
	Partitioner    string
	ReleaseVersion string
	CQLVersion     string
}

type ClusterListener

type ClusterListener interface {
	OnEvent(event Event)
}

type ClusterListenerFunc

type ClusterListenerFunc func(event Event)

func (ClusterListenerFunc) OnEvent

func (f ClusterListenerFunc) OnEvent(event Event)

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(ctx context.Context, endpoint Endpoint, recv Receiver) (c *Conn, err error)

Connect creates a new connection to a server specified by the endpoint using TLS if specified

func NewConn

func NewConn(conn net.Conn, recv Receiver) *Conn

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) Err

func (c *Conn) Err() error

func (*Conn) IsClosed

func (c *Conn) IsClosed() chan struct{}

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

func (*Conn) Start

func (c *Conn) Start()

func (*Conn) Write

func (c *Conn) Write(sender Sender) error

func (*Conn) WriteBytes

func (c *Conn) WriteBytes(b []byte) error

type CqlError

type CqlError struct {
	Message message.Error
}

func (CqlError) Error

func (e CqlError) Error() string

type Endpoint

type Endpoint interface {
	fmt.Stringer
	Addr() string
	IsResolved() bool
	TlsConfig() *tls.Config
	Key() string
}

func NewEndpoint

func NewEndpoint(addr string) Endpoint

type EndpointResolver

type EndpointResolver interface {
	Resolve() ([]Endpoint, error)
	NewEndpoint(row Row) (Endpoint, error)
}

func NewResolver

func NewResolver(contactPoints ...string) EndpointResolver

func NewResolverWithDefaultPort

func NewResolverWithDefaultPort(contactPoints []string, defaultPort int) EndpointResolver

type Event added in v0.1.0

type Event interface {
	// contains filtered or unexported methods
}

type EventHandler

type EventHandler interface {
	OnEvent(frm *frame.Frame)
}

type EventHandlerFunc

type EventHandlerFunc func(frm *frame.Frame)

func (EventHandlerFunc) OnEvent

func (f EventHandlerFunc) OnEvent(frm *frame.Frame)

type Host

type Host struct {
	// contains filtered or unexported fields
}

func NewHostFromRow

func NewHostFromRow(endpoint Endpoint, _ Row) (*Host, error)

func (*Host) Endpoint

func (h *Host) Endpoint() Endpoint

func (*Host) Key

func (h *Host) Key() string

func (*Host) String

func (h *Host) String() string

type LoadBalancer

type LoadBalancer interface {
	ClusterListener
	NewQueryPlan() QueryPlan
}

func NewRoundRobinLoadBalancer

func NewRoundRobinLoadBalancer() LoadBalancer

type MockClient

type MockClient struct {
	// contains filtered or unexported fields
}

func (MockClient) Closing

func (c MockClient) Closing(_ error)

func (*MockClient) InterceptQuery

func (c *MockClient) InterceptQuery(hdr *frame.Header, msg *message.Query) message.Message

func (MockClient) Keyspace

func (c MockClient) Keyspace() string

func (MockClient) Local

func (c MockClient) Local() MockHost

func (*MockClient) Receive

func (c *MockClient) Receive(reader io.Reader) error

func (*MockClient) Register

func (c *MockClient) Register(version primitive.ProtocolVersion)

type MockCluster

type MockCluster struct {
	Handlers map[primitive.OpCode]MockRequestHandler
	// contains filtered or unexported fields
}

func NewMockCluster

func NewMockCluster(startIP net.IP, port int) *MockCluster

func (*MockCluster) Add

func (c *MockCluster) Add(ctx context.Context, n int) error

func (*MockCluster) Remove

func (c *MockCluster) Remove(n int)

func (*MockCluster) Shutdown

func (c *MockCluster) Shutdown()

func (*MockCluster) Start

func (c *MockCluster) Start(ctx context.Context, n int) error

func (*MockCluster) Stop

func (c *MockCluster) Stop(n int)

type MockHost

type MockHost struct {
	IP     string
	Port   int
	HostID *primitive.UUID
}

func (MockHost) String

func (h MockHost) String() string

type MockRequestHandler

type MockRequestHandler func(client *MockClient, frm *frame.Frame) message.Message

type MockRequestHandlers

type MockRequestHandlers map[primitive.OpCode]MockRequestHandler

func NewMockRequestHandlers

func NewMockRequestHandlers(overrides MockRequestHandlers) MockRequestHandlers

type MockServer

type MockServer struct {
	Handlers map[primitive.OpCode]MockRequestHandler
	// contains filtered or unexported fields
}

func (*MockServer) Add

func (s *MockServer) Add(host MockHost)

func (*MockServer) Event

func (s *MockServer) Event(evt message.Event)

func (*MockServer) Remove

func (s *MockServer) Remove(host MockHost)

func (*MockServer) Serve

func (s *MockServer) Serve(ctx context.Context, maxVersion primitive.ProtocolVersion, local MockHost, peers []MockHost) error

func (*MockServer) Shutdown

func (s *MockServer) Shutdown()

type PreparedCache

type PreparedCache interface {
	// Store add an entry to the cache.
	Store(id string, entry *PreparedEntry)
	// Load retrieves an entry from the cache. `ok` is true if the entry is present; otherwise it's false.
	Load(id string) (entry *PreparedEntry, ok bool)
}

PreparedCache a thread-safe cache for storing prepared queries.

type PreparedEntry

type PreparedEntry struct {
	PreparedFrame *frame.RawFrame
}

PreparedEntry is an entry in the prepared cache.

type QueryPlan

type QueryPlan interface {
	Next() *Host
}

type Receiver

type Receiver interface {
	Receive(reader io.Reader) error
	Closing(err error)
}

type ReconnectEvent

type ReconnectEvent struct {
	Endpoint
}

type ReconnectPolicy

type ReconnectPolicy interface {
	NextDelay() time.Duration
	Reset()
	Clone() ReconnectPolicy
}

func NewReconnectPolicy

func NewReconnectPolicy() ReconnectPolicy

func NewReconnectPolicyWithDelays

func NewReconnectPolicyWithDelays(baseDelay, maxDelay time.Duration) ReconnectPolicy

type RemoveEvent

type RemoveEvent struct {
	Host *Host
}

type Request

type Request interface {
	// Frame returns the frame to be executed as part of the request.
	// This must be idempotent.
	Frame() interface{}

	// Execute is called when a request need to be retried.
	// This is currently only called for executing prepared requests (i.e. `EXECUTE` request frames). If `EXECUTE`
	// request frames are not expected then the implementation should `panic()`.
	//
	// If `next` is false then the request must be retried on the current node; otherwise, it should be retried on
	// another node which is usually then next node in a query plan.
	Execute(next bool)

	// OnClose is called when the underlying connection is closed.
	// No assumptions should be made about whether the request has been successfully sent; it is possible that
	// the request has been fully sent and no response was received before
	OnClose(err error)

	// OnResult is called when a response frame has been sent back from the connection.
	OnResult(raw *frame.RawFrame)
}

Request represents the data frame and lifecycle of a CQL native protocol request.

type ResultSet

type ResultSet struct {
	// contains filtered or unexported fields
}

func NewResultSet

func NewResultSet(rows *message.RowsResult, version primitive.ProtocolVersion) *ResultSet

func (*ResultSet) Row

func (rs *ResultSet) Row(i int) Row

func (ResultSet) RowCount

func (rs ResultSet) RowCount() int

type Row

type Row struct {
	// contains filtered or unexported fields
}

func (Row) ByName

func (r Row) ByName(n string) (interface{}, error)

func (Row) ByPos

func (r Row) ByPos(i int) (interface{}, error)

type SchemaChangeEvent

type SchemaChangeEvent struct {
	Message *message.SchemaChangeEvent
}

type Sender

type Sender interface {
	Send(writer io.Writer) error
}

type SenderFunc

type SenderFunc func(writer io.Writer) error

func (SenderFunc) Send

func (s SenderFunc) Send(writer io.Writer) error

type Session

type Session struct {
	// contains filtered or unexported fields
}

func ConnectSession

func ConnectSession(ctx context.Context, cluster *Cluster, config SessionConfig) (*Session, error)

func (*Session) OnEvent

func (s *Session) OnEvent(event Event)

func (*Session) Send

func (s *Session) Send(host *Host, request Request) error

type SessionConfig

type SessionConfig struct {
	ReconnectPolicy ReconnectPolicy
	NumConns        int
	Keyspace        string
	Version         primitive.ProtocolVersion
	Auth            Authenticator
	Logger          *zap.Logger
	// PreparedCache a global cache share across sessions for storing previously prepared queries
	PreparedCache     PreparedCache
	ConnectTimeout    time.Duration
	HeartBeatInterval time.Duration
	IdleTimeout       time.Duration
}

type UnexpectedResponse

type UnexpectedResponse struct {
	Expected []string
	Received string
}

func (*UnexpectedResponse) Error

func (e *UnexpectedResponse) Error() string

type UpEvent added in v0.1.0

type UpEvent struct {
	Host *Host
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL