Documentation ¶
Index ¶
- Constants
- Variables
- func Check(err error)
- func NewFollowerCache(followers []*streaming_api.Server, multiStreamPercent int) *followerCache
- func NewLoadBalancer(config *loadbalancerConfig, opts ...grpc.ServerOption) (*grpc.Server, error)
- func NewServer(config *ServerConfig, opts ...grpc.ServerOption) (*grpc.Server, error)
- func PanicValue(fn func()) (recovered interface{})
- func RunService(ctx context.Context, targetDir string, service string)
- func SetupTLSConfig(cfg TLSConfig) (*tls.Config, error)
- func StopService(ctx context.Context)
- type Agent
- type AgentConfig
- type Arc
- type ArcConfig
- type Bundler
- type Config
- type DistributedLoci
- func (d *DistributedLoci) Append(record *streaming_api.ProduceRequest) (uint64, error)
- func (d *DistributedLoci) ClosePoint(pointId string) error
- func (d *DistributedLoci) GetFollowers(req *streaming_api.GetFollowersRequest) ([]*streaming_api.Server, error)
- func (d *DistributedLoci) GetMetadata(pointId string) (PointMetadata, error)
- func (d *DistributedLoci) GetPoints() []string
- func (d *DistributedLoci) GetServers(req *streaming_api.GetServersRequest) ([]*streaming_api.Server, error)
- func (d *DistributedLoci) Join(rpcAddr string, rule ParticipationRule) error
- func (d *DistributedLoci) Leave(rpcAddr string) error
- func (d *DistributedLoci) OnChange(batch []ring.ShardResponsibility)
- func (d *DistributedLoci) Read(pointId string, pos uint64) (uint64, []byte, error)
- func (d *DistributedLoci) ReadAt(pointId string, b []byte, off uint64) (int, error)
- func (d *DistributedLoci) ReadWithLimit(pointId string, pos uint64, chunkSize uint64, limit uint64) (uint64, []byte, error)
- func (d *DistributedLoci) Remove() error
- func (d *DistributedLoci) Shutdown() error
- type FSM
- type FSMRecordResponse
- type Follower
- type FollowerResolverHelper
- type GetFollowerer
- type GetServerer
- type GetServersPromise
- type GetServersRequest
- type GetServersResponse
- type InMemoryPointStore
- type LBResolverHelper
- type Locus
- func (l *Locus) Append(pointId string, b []byte) (n uint64, pos uint64, err error)
- func (l *Locus) Close(pointId string) error
- func (l *Locus) CloseAll() error
- func (l *Locus) GetMetadata(pointId string) (PointMetadata, error)
- func (l *Locus) GetPoints() []string
- func (l *Locus) Read(pointId string, pos uint64, chunkSize uint64, limit uint64) (uint64, []byte, error)
- func (l *Locus) ReadAt(pointId string, b []byte, off uint64) (int, error)
- func (l *Locus) Remove(pointId string) error
- func (l *Locus) RemoveAll() error
- func (l *Locus) Reset() error
- type LocusHelper
- type LocusStreamLayer
- type ParticipationRule
- type Picker
- type Point
- func (p *Point) Append(b []byte) (n uint64, pos uint64, err error)
- func (p *Point) Close() error
- func (p *Point) GetLastAccessed() time.Time
- func (p *Point) GetMetadata() PointMetadata
- func (p *Point) Open() error
- func (p *Point) Read(pos uint64, chunkSize uint64, limit uint64) (uint64, []byte, error)
- func (p *Point) ReadAt(b []byte, off uint64) (int, error)
- type PointMetadata
- type Promise
- type RPC
- type RPCResponse
- type RecordEntriesPipeline
- type RecordEntriesPromise
- type RecordEntriesRequest
- type RecordEntriesResponse
- type RecordPromise
- type RecordRequest
- type RecordResponse
- type RedisPointStore
- type ReplicateCommandPromise
- type ReplicationClusterConfig
- type ReplicationClusterHandler
- type RequestBundler
- type RequestType
- type Resolver
- type ResolverHelperConfig
- type Server
- type ServerAddress
- type ServerConfig
- type ServerID
- type Store
- type StreamLayer
- type StreamingConfig
- type StreamingManager
- func (s *StreamingManager) ConsumeStream(req *streaming_api.ConsumeRequest, ...) error
- func (s *StreamingManager) GetMetadata(ctx context.Context, req *streaming_api.MetadataRequest) (*streaming_api.MetadataResponse, error)
- func (s *StreamingManager) ProduceStream(stream streaming_api.Streaming_ProduceStreamServer) error
- type TCPStreamLayer
- type TLSConfig
- type Transport
- func (transport *Transport) Close() error
- func (transport *Transport) CloseStreams()
- func (transport *Transport) Consumer() <-chan RPC
- func (transport *Transport) IsShutdown() bool
- func (transport *Transport) LocalAddr() ServerAddress
- func (transport *Transport) PrepareCommandTransport(target ServerAddress) (RecordEntriesPipeline, error)
- func (transport *Transport) SendGetServersRequest(target ServerAddress, req *GetServersRequest, resp *GetServersResponse) error
- func (transport *Transport) SendRecordEntriesRequest(target ServerAddress, req *RecordEntriesRequest, resp *RecordEntriesResponse) error
- type TransportConfig
Constants ¶
const DefaultMaxChunkSize = 256 // DefaultMaxChunkSize defines the default value of the size of the chunk that can be processed by this server.
const DefaultMultiStreamPercent = 100 // DefaultMultiStreamPercent defines the default value for the multi-stream percent, which is 100.
const RingRPC = 1
const TestName = "goutube"
Variables ¶
var ( // ErrArcShutdown is returned when operations are requested against an // inactive Raft. ErrArcShutdown = errors.New("arc is already shutdown") // ErrEnqueueTimeout is returned when a command fails due to a timeout. ErrEnqueueTimeout = errors.New("timed out enqueuing operation") // ErrStoreNullPointer is returned when the provided ArcConfig has nil Log ErrStoreNullPointer = errors.New("store cannot be nil") // ErrFSMNullPointer is returned when the provided ArcConfig has nil FSM ErrFSMNullPointer = errors.New("FSM cannot be nil") )
var ( CAFile = configFile("ca.pem") ServerCertFile = configFile("server.pem") ServerKeyFile = configFile("server-key.pem") ClientCertFile = configFile("client.pem") ClientKeyFile = configFile("client-key.pem") RootClientCertFile = configFile("root-client.pem") RootClientKeyFile = configFile("root-client-key.pem") NobodyClientCertFile = configFile("nobody-client.pem") NobodyClientKeyFile = configFile("nobody-client-key.pem") ACLModelFile = configFile("model.conf") ACLPolicyFile = configFile("policy.csv") )
var ( ErrCannotHandleRequest = errors.New("couldn't handle the request") ErrMultiStreamMetadataCorrupted = errors.New("metadata cannot be verified") ErrWorkersNotFound = errors.New("workers not found") )
var ( // ErrTransportShutdown is returned when operations on a transport are // invoked after it's been terminated. ErrTransportShutdown = errors.New("transport shutdown") // ErrPipelineShutdown is returned when the pipeline is closed. ErrPipelineShutdown = errors.New("command pipeline closed") )
var (
ErrMaxChunkSizeInvalid = errors.New("configuration error: max chunk size cannot be zero")
)
var (
ErrPointNotFoundInMemory = errors.New("point not found in the store")
)
Functions ¶
func NewFollowerCache ¶
func NewFollowerCache(followers []*streaming_api.Server, multiStreamPercent int) *followerCache
NewFollowerCache caches the provided followers. multiStreamPercent is used to calculate the percentage of followers that will be used to read the stream in parallel.
func NewLoadBalancer ¶
func NewLoadBalancer(config *loadbalancerConfig, opts ...grpc.ServerOption) (*grpc.Server, error)
func NewServer ¶
func NewServer(config *ServerConfig, opts ...grpc.ServerOption) (*grpc.Server, error)
func PanicValue ¶
func PanicValue(fn func()) (recovered interface{})
func RunService ¶
RunService builds and runs a service in a target directory.
func StopService ¶
Types ¶
type Agent ¶
type Agent struct { AgentConfig // contains filtered or unexported fields }
func NewAgent ¶
func NewAgent(config AgentConfig) (*Agent, error)
type AgentConfig ¶
type AgentConfig struct { DataDir string BindAddr string RPCPort int ReplicationPort int NodeName string SeedAddresses []string VirtualNodeCount int ACLModelFile string ACLPolicyFile string ServerTLSConfig *tls.Config // Served to clients. PeerTLSConfig *tls.Config // Servers so they can connect with and replicate each other. LeaderAddresses []string // Addresses of the servers which will set this server as one of its loadbalancers (for replication). Rule ParticipationRule // Rule describes how this server takes part in the ring (peer-to-peer architecture) and/or replication. MaxChunkSize uint64 // MaxChunkSize defines the size of the chunk that can be processed by this server. MultiStreamPercent int // MultiStreamPercent is the percentage of the followers to return upon a GetMetadata request. }
func (AgentConfig) RPCAddr ¶
func (c AgentConfig) RPCAddr() (string, error)
func (AgentConfig) ReplicationRPCAddr ¶
func (c AgentConfig) ReplicationRPCAddr() (string, error)
type Arc ¶
func (Arc) GetFollowers ¶
func (state Arc) GetFollowers() []Server
GetFollowers gets the addresses of its loadbalancers.
type ArcConfig ¶
type ArcConfig struct { // Dialer StreamLayer StreamLayer Logger hclog.Logger // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply // the timeout by (SnapshotSize / TimeoutScale). Timeout time.Duration MaxChunkSize uint64 Bundler Bundler // contains filtered or unexported fields }
type Config ¶
type Config struct { Distributed struct { LocalID string StreamLayer *LocusStreamLayer StoreAddress string Logger hclog.Logger Rule ParticipationRule RPCAddress string Ring *ring.Ring MaxChunkSize uint64 } Point struct { TickTime time.Duration CloseTimeout time.Duration // contains filtered or unexported fields } }
type DistributedLoci ¶
type DistributedLoci struct {
// contains filtered or unexported fields
}
func NewDistributedLoci ¶
func NewDistributedLoci(dataDir string, config Config) ( *DistributedLoci, error, )
func (*DistributedLoci) Append ¶
func (d *DistributedLoci) Append(record *streaming_api.ProduceRequest) (uint64, error)
func (*DistributedLoci) ClosePoint ¶
func (d *DistributedLoci) ClosePoint(pointId string) error
func (*DistributedLoci) GetFollowers ¶
func (d *DistributedLoci) GetFollowers(req *streaming_api.GetFollowersRequest) ([]*streaming_api.Server, error)
func (*DistributedLoci) GetMetadata ¶ added in v0.1.2
func (d *DistributedLoci) GetMetadata(pointId string) (PointMetadata, error)
func (*DistributedLoci) GetPoints ¶
func (d *DistributedLoci) GetPoints() []string
func (*DistributedLoci) GetServers ¶
func (d *DistributedLoci) GetServers(req *streaming_api.GetServersRequest) ([]*streaming_api.Server, error)
func (*DistributedLoci) Join ¶
func (d *DistributedLoci) Join(rpcAddr string, rule ParticipationRule) error
func (*DistributedLoci) Leave ¶
func (d *DistributedLoci) Leave(rpcAddr string) error
func (*DistributedLoci) OnChange ¶ added in v0.1.2
func (d *DistributedLoci) OnChange(batch []ring.ShardResponsibility)
OnChange is called to notify this server when a new server joins the ring at the position next to this server. When that happens, this server needs to hand off the points that it should no longer handle.
func (*DistributedLoci) ReadWithLimit ¶ added in v0.1.2
func (*DistributedLoci) Remove ¶
func (d *DistributedLoci) Remove() error
func (*DistributedLoci) Shutdown ¶
func (d *DistributedLoci) Shutdown() error
type FSM ¶
type FSM interface { Apply(command *RecordRequest) *FSMRecordResponse Read(key string, offset uint64) (uint64, []byte, error) }
type FSMRecordResponse ¶
type FSMRecordResponse struct { StoreKey interface{} StoreValue interface{} Response interface{} }
type Follower ¶
type Follower struct {
// contains filtered or unexported fields
}
func NewFollower ¶
func NewFollower(ServerAddress ServerAddress) (*Follower, error)
type FollowerResolverHelper ¶
type FollowerResolverHelper struct { streaming_api.UnimplementedFollowerResolverHelperServer *ResolverHelperConfig }
func NewFollowerResolverHelper ¶
func NewFollowerResolverHelper(config *ResolverHelperConfig) (*FollowerResolverHelper, error)
func (*FollowerResolverHelper) GetFollowers ¶
func (r *FollowerResolverHelper) GetFollowers(ctx context.Context, req *streaming_api.GetFollowersRequest) (*streaming_api.GetFollowersResponse, error)
type GetFollowerer ¶
type GetFollowerer interface {
GetFollowers(*streaming_api.GetFollowersRequest) ([]*streaming_api.Server, error)
}
type GetServerer ¶
type GetServerer interface {
GetServers(*streaming_api.GetServersRequest) ([]*streaming_api.Server, error)
}
type GetServersPromise ¶
type GetServersPromise struct {
// contains filtered or unexported fields
}
func (*GetServersPromise) Request ¶
func (c *GetServersPromise) Request() *GetServersRequest
func (*GetServersPromise) Response ¶
func (c *GetServersPromise) Response() interface{}
type GetServersRequest ¶
type GetServersRequest struct { }
type GetServersResponse ¶
type GetServersResponse struct {
Response interface{}
}
type InMemoryPointStore ¶
type InMemoryPointStore struct {
// contains filtered or unexported fields
}
InMemoryPointStore manages points and their last offsets.
func NewInMomoryPointStore ¶
func NewInMomoryPointStore(dir string) (*InMemoryPointStore, error)
func (*InMemoryPointStore) AddPointEvent ¶
func (store *InMemoryPointStore) AddPointEvent(pointId string, offset uint64) error
func (*InMemoryPointStore) GetPointEvent ¶
func (store *InMemoryPointStore) GetPointEvent(pointId string) (uint64, error)
type LBResolverHelper ¶
type LBResolverHelper struct { streaming_api.UnimplementedLBResolverHelperServer *ResolverHelperConfig }
func NewLBResolverHelper ¶
func NewLBResolverHelper(config *ResolverHelperConfig) (*LBResolverHelper, error)
func (*LBResolverHelper) GetServers ¶
func (r *LBResolverHelper) GetServers(ctx context.Context, req *streaming_api.GetServersRequest) (*streaming_api.GetServersResponse, error)
type Locus ¶
type Locus struct { Config Config // contains filtered or unexported fields }
func (*Locus) GetMetadata ¶ added in v0.1.2
func (l *Locus) GetMetadata(pointId string) (PointMetadata, error)
type LocusHelper ¶
type LocusStreamLayer ¶
type LocusStreamLayer struct {
// contains filtered or unexported fields
}
func NewStreamLayer ¶
func NewStreamLayer( ln net.Listener, serverTLSConfig, peerTLSConfig *tls.Config, ) *LocusStreamLayer
func (*LocusStreamLayer) Addr ¶
func (s *LocusStreamLayer) Addr() net.Addr
func (*LocusStreamLayer) Close ¶
func (s *LocusStreamLayer) Close() error
func (*LocusStreamLayer) Dial ¶
func (s *LocusStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
type ParticipationRule ¶
type ParticipationRule uint8
const ( StandaloneLeaderRule ParticipationRule = iota // StandaloneLeaderRule server only participates in the sharding and doesn't need replication. LeaderRule // LeaderRule server participates in the sharding, but also has followers for replication. FollowerRule // FollowerRule server only participates in the replication. LoadBalancerRule // LoadBalancerRule server acts as an entry point to servers with other rules. )
type Point ¶
func (*Point) GetLastAccessed ¶
func (*Point) GetMetadata ¶ added in v0.1.2
func (p *Point) GetMetadata() PointMetadata
type PointMetadata ¶ added in v0.1.2
type PointMetadata struct {
// contains filtered or unexported fields
}
type RPC ¶
type RPC struct { Command interface{} RespChan chan<- RPCResponse }
RPC has a command, and provides a response mechanism.
type RPCResponse ¶
type RPCResponse struct { Response interface{} Error error }
RPCResponse captures both a response and a potential error.
type RecordEntriesPipeline ¶
type RecordEntriesPipeline interface { // SendRecordEntriesRequest is used to add another request to the pipeline. // The send may block, which is an effective form of back-pressure. SendRecordEntriesRequest(req *RecordEntriesRequest, resp *RecordEntriesResponse) (Promise, error) // Consumer returns a channel that can be used to consume // response futures when they are ready. Consumer() <-chan Promise // Close closes the pipeline and cancels all inflight RPCs Close() error }
RecordEntriesPipeline is used for pipelining RecordEntries requests. It is used to increase the replication throughput by masking latency and better utilizing bandwidth.
type RecordEntriesPromise ¶
type RecordEntriesPromise struct {
// contains filtered or unexported fields
}
func (*RecordEntriesPromise) Request ¶
func (c *RecordEntriesPromise) Request() *RecordEntriesRequest
func (*RecordEntriesPromise) Response ¶
func (c *RecordEntriesPromise) Response() interface{}
type RecordEntriesRequest ¶
type RecordEntriesRequest struct {
Entries []*RecordRequest
}
type RecordEntriesResponse ¶
type RecordEntriesResponse struct { LastOff uint64 Response interface{} }
type RecordPromise ¶
type RecordPromise struct {
// contains filtered or unexported fields
}
func (*RecordPromise) Request ¶
func (c *RecordPromise) Request() *RecordRequest
func (*RecordPromise) Response ¶
func (c *RecordPromise) Response() interface{}
type RecordRequest ¶
type RecordRequest struct {
Data []byte
}
type RecordResponse ¶
type RecordResponse struct { LastOff uint64 Response interface{} }
type RedisPointStore ¶
type RedisPointStore struct {
// contains filtered or unexported fields
}
RedisPointStore manages points and their last offsets.
func NewRedisPointStore ¶
func NewRedisPointStore(address string, dir string) (*RedisPointStore, error)
func (*RedisPointStore) AddPointEvent ¶
func (rps *RedisPointStore) AddPointEvent(pointId string, offset uint64) error
func (*RedisPointStore) GetPointEvent ¶
func (rps *RedisPointStore) GetPointEvent(pointId string) (uint64, error)
type ReplicateCommandPromise ¶
type ReplicateCommandPromise struct {
// contains filtered or unexported fields
}
func NewReplicateCommandPromise ¶
func NewReplicateCommandPromise(req *RecordRequest, expected interface{}) *ReplicateCommandPromise
type ReplicationClusterHandler ¶
type ReplicationClusterHandler interface { Join(rpcAddr string, rule ParticipationRule) error Leave(rpcAddr string) error }
ReplicationClusterHandler is the interface used to get notified when a new member joins, or an existing member leaves, the replication cluster.
type RequestBundler ¶
type RequestBundler struct { }
func (*RequestBundler) Build ¶
func (rb *RequestBundler) Build(header interface{}, key interface{}, value interface{}) ( []byte, error, )
type Resolver ¶
type Resolver struct {
// contains filtered or unexported fields
}
func (*Resolver) Build ¶
func (r *Resolver) Build( target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, ) ( resolver.Resolver, error, )
func (*Resolver) ResolveNow ¶
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions)
type ResolverHelperConfig ¶
type ResolverHelperConfig struct { GetServerer GetServerer GetFollowerer GetFollowerer }
type Server ¶
type Server struct { // Address is its network address that a transport can contact. Address ServerAddress }
Server tracks the information about a single server in a configuration.
type ServerAddress ¶
type ServerAddress string
ServerAddress is a network address for a server that a transport can contact.
type ServerConfig ¶
type ServerConfig struct { StreamingConfig *StreamingConfig ResolverHelperConfig *ResolverHelperConfig Rule ParticipationRule }
type StreamLayer ¶
type StreamLayer interface { net.Listener // Dial is used to create a new outgoing connection Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) }
StreamLayer is used with the NetworkTransport to provide the low level stream abstraction.
type StreamingConfig ¶
type StreamingConfig struct { Locus LocusHelper Authorizer *authorizer }
type StreamingManager ¶
type StreamingManager struct { streaming_api.UnimplementedStreamingServer *StreamingConfig }
func NewStreamingServer ¶
func NewStreamingServer(config *StreamingConfig) (*StreamingManager, error)
func (*StreamingManager) ConsumeStream ¶
func (s *StreamingManager) ConsumeStream(req *streaming_api.ConsumeRequest, stream streaming_api.Streaming_ConsumeStreamServer) error
func (*StreamingManager) GetMetadata ¶ added in v0.1.2
func (s *StreamingManager) GetMetadata(ctx context.Context, req *streaming_api.MetadataRequest) (*streaming_api.MetadataResponse, error)
func (*StreamingManager) ProduceStream ¶
func (s *StreamingManager) ProduceStream(stream streaming_api.Streaming_ProduceStreamServer) error
type TCPStreamLayer ¶
type TCPStreamLayer struct {
// contains filtered or unexported fields
}
TCPStreamLayer implements StreamLayer interface for plain TCP.
func (*TCPStreamLayer) Accept ¶
func (t *TCPStreamLayer) Accept() (c net.Conn, err error)
Accept implements the net.Listener interface.
func (*TCPStreamLayer) Addr ¶
func (t *TCPStreamLayer) Addr() net.Addr
Addr implements the net.Listener interface.
func (*TCPStreamLayer) Close ¶
func (t *TCPStreamLayer) Close() (err error)
Close implements the net.Listener interface.
func (*TCPStreamLayer) Dial ¶
func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
Dial implements the StreamLayer interface.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport provides a network based transport that can be used to communicate with Raft on remote machines. It requires an underlying stream layer to provide a stream abstraction, which can be simple TCP, TLS, etc.
This transport is very simple and lightweight. Each RPC request is framed by sending a byte that indicates the MsgPack encoded request.
The response is an error string followed by the response object, both are encoded using MsgPack.
func NewTransportWithConfig ¶
func NewTransportWithConfig( config *TransportConfig, ) *Transport
NewTransportWithConfig creates a new network transport with the given config struct
func (*Transport) CloseStreams ¶
func (transport *Transport) CloseStreams()
CloseStreams closes the current streams.
func (*Transport) IsShutdown ¶
IsShutdown is used to check if the transport is shutdown.
func (*Transport) LocalAddr ¶
func (transport *Transport) LocalAddr() ServerAddress
LocalAddr implements the Transport interface.
func (*Transport) PrepareCommandTransport ¶
func (transport *Transport) PrepareCommandTransport(target ServerAddress) (RecordEntriesPipeline, error)
PrepareCommandTransport returns an interface that can be used to pipeline SendRecordEntriesRequest requests.
func (*Transport) SendGetServersRequest ¶
func (transport *Transport) SendGetServersRequest(target ServerAddress, req *GetServersRequest, resp *GetServersResponse) error
SendGetServersRequest requests the target to provide the list of its loadbalancers.
func (*Transport) SendRecordEntriesRequest ¶
func (transport *Transport) SendRecordEntriesRequest(target ServerAddress, req *RecordEntriesRequest, resp *RecordEntriesResponse) error
SendRecordEntriesRequest implements the Transport interface.
type TransportConfig ¶
type TransportConfig struct { Logger hclog.Logger // Dialer Stream StreamLayer // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply // the timeout by (SnapshotSize / TimeoutScale). Timeout time.Duration MaxPool int }
TransportConfig encapsulates configuration for the network transport layer.
Source Files ¶
- agent.go
- arc.go
- authorizer.go
- cmd.go
- config.go
- const.go
- distributed.go
- files.go
- fsm.go
- in_memory_store.go
- loadbalancer.go
- locus.go
- picker.go
- point.go
- promise.go
- redis_store.go
- replicate.go
- replication_cluster.go
- resolver.go
- server.go
- streaming.go
- svc_util.go
- tcp_transport.go
- tls.go
- transport.go
- types.go
- util.go