Documentation
¶
Index ¶
- Variables
- func GetNamespaceConfig(namespaces []model.NamespaceConfig, namespace string) *model.NamespaceConfig
- func NewK8SClientConfig() *rest.Config
- func NewK8SClientset(config *rest.Config) kubernetes.Interface
- type Client
- type Coordinator
- type MetadataContainer
- type MetadataProvider
- type NodeAvailabilityListener
- type NodeController
- type NodeStatus
- type ResourceInterface
- type RpcProvider
- type ServerContext
- type ShardAssignmentsProvider
- type ShardController
- type SwapNodeAction
- type Version
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrMetadataNotInitialized = errors.New("metadata not initialized") ErrMetadataBadVersion = errors.New("metadata bad version") )
View Source
var (
ErrNamespaceNotFound = errors.New("namespace not found")
)
Functions ¶
func GetNamespaceConfig ¶ added in v0.10.0
func GetNamespaceConfig(namespaces []model.NamespaceConfig, namespace string) *model.NamespaceConfig
func NewK8SClientConfig ¶
func NewK8SClientset ¶
func NewK8SClientset(config *rest.Config) kubernetes.Interface
Types ¶
type Client ¶
type Client[Resource resource] interface { Upsert(namespace, name string, resource *Resource) (*Resource, error) Delete(namespace, name string) error Get(namespace, name string) (*Resource, error) }
func K8SConfigMaps ¶
func K8SConfigMaps(kc kubernetes.Interface) Client[corev1.ConfigMap]
type Coordinator ¶
type Coordinator interface { io.Closer ShardAssignmentsProvider InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error ElectedLeader(namespace string, shard int64, metadata model.ShardMetadata) error ShardDeleted(namespace string, shard int64) error NodeAvailabilityListener FindServerAddressByInternalAddress(internalAddress string) (*model.ServerAddress, bool) ClusterStatus() model.ClusterStatus }
func NewCoordinator ¶
func NewCoordinator(metadataProvider MetadataProvider, clusterConfigProvider func() (model.ClusterConfig, error), clusterConfigNotificationsCh chan any, rpc RpcProvider) (Coordinator, error)
type MetadataContainer ¶
type MetadataContainer struct { ClusterStatus *model.ClusterStatus `json:"clusterStatus"` Version Version `json:"version"` }
type MetadataProvider ¶
type MetadataProvider interface { io.Closer Get() (cs *model.ClusterStatus, version Version, err error) Store(cs *model.ClusterStatus, expectedVersion Version) (newVersion Version, err error) }
func NewMetadataProviderConfigMap ¶
func NewMetadataProviderConfigMap(kc k8s.Interface, namespace, name string) MetadataProvider
func NewMetadataProviderFile ¶
func NewMetadataProviderFile(path string) MetadataProvider
func NewMetadataProviderMemory ¶
func NewMetadataProviderMemory() MetadataProvider
type NodeAvailabilityListener ¶
type NodeAvailabilityListener interface {
}type NodeController ¶
type NodeController interface { io.Closer Status() NodeStatus SetStatus(status NodeStatus) }
The NodeController takes care of checking the health-status of each node and to push all the service discovery updates.
func NewNodeController ¶
func NewNodeController(addr model.ServerAddress, shardAssignmentsProvider ShardAssignmentsProvider, nodeAvailabilityListener NodeAvailabilityListener, rpc RpcProvider) NodeController
type ResourceInterface ¶
type ResourceInterface[Resource resource] interface { Create(ctx context.Context, resource *Resource, opts metav1.CreateOptions) (*Resource, error) Update(ctx context.Context, resource *Resource, opts metav1.UpdateOptions) (*Resource, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*Resource, error) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*Resource, error) }
type RpcProvider ¶
type RpcProvider interface { PushShardAssignments(ctx context.Context, node model.ServerAddress) (proto.OxiaCoordination_PushShardAssignmentsClient, error) NewTerm(ctx context.Context, node model.ServerAddress, req *proto.NewTermRequest) (*proto.NewTermResponse, error) BecomeLeader(ctx context.Context, node model.ServerAddress, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error) AddFollower(ctx context.Context, node model.ServerAddress, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error) GetStatus(ctx context.Context, node model.ServerAddress, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error) DeleteShard(ctx context.Context, node model.ServerAddress, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error) GetHealthClient(node model.ServerAddress) (grpc_health_v1.HealthClient, io.Closer, error) ClearPooledConnections(node model.ServerAddress) }
func NewRpcProvider ¶
func NewRpcProvider(pool common.ClientPool) RpcProvider
type ServerContext ¶ added in v0.11.4
type ServerContext struct { Addr model.ServerAddress Shards common.Set[int64] }
type ShardAssignmentsProvider ¶
type ShardAssignmentsProvider interface {
WaitForNextUpdate(ctx context.Context, currentValue *proto.ShardAssignments) (*proto.ShardAssignments, error)
}
type ShardController ¶
type ShardController interface { io.Closer HandleNodeFailure(failedNode model.ServerAddress) SyncServerAddress() SwapNode(from model.ServerAddress, to model.ServerAddress) error DeleteShard() Term() int64 Leader() *model.ServerAddress Status() model.ShardStatus }
The ShardController is responsible to handle all the state transition for a given a shard e.g. electing a new leader.
func NewShardController ¶
func NewShardController(namespace string, shard int64, namespaceConfig *model.NamespaceConfig, shardMetadata model.ShardMetadata, rpc RpcProvider, coordinator Coordinator) ShardController
type SwapNodeAction ¶
type SwapNodeAction struct { Shard int64 From model.ServerAddress To model.ServerAddress }
Click to show internal directories.
Click to hide internal directories.