helix

package module
v0.0.0-...-896f72c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2016 License: GPL-3.0 Imports: 6 Imported by: 0

README

go-helix

A apache-helix implemented in golang

Originally based on https://github.com/yichen/gohelix

Roadmap
  • Full HelixAdmin
  • Full HelixManager
  • Full Spectator
  • Controller
Features
  • Multi node

    • partitioning
    • discovery
    • co-location
  • Fault tolarant

    • replication
    • fault detection
    • recovery
  • Cluster expansion

    • throttle movement
    • redistribute data
TODO
  • PROPERTYSTORE
  • timer tasks
  • chroot bug
  • super cluster
  • tag
  • too many WaitUntilConnected
  • metrics
  • constraint
  • HelixMultiClusterController, HelixStateTransitionHandler, HelixTaskExecutor.onMessage, RoutingTableProvider

Documentation

Overview

Package helix is the base pkg required for Helix. In this package, we provide several abstraction:

HelixAdmin HelixManager HelixTimerTask StateMachineEngine ClusterMessagingService

Besides, The following structs are defined in helix pkg:

StateModel Context

Index

Constants

View Source
const (
	ConstaintTypeState   = "STATE_CONSTRAINT"
	ConstaintTypeMessage = "MESSAGE_CONSTRAINT"
)
View Source
const (
	MessageStateNew           = "NEW"
	MessageStateRead          = "READ"
	MessageStateUnprocessable = "UNPROCESSABLE"
)
View Source
const (
	MessageTypeStateTransition  = "STATE_TRANSITION"
	MessageTypeScheduler        = "SCHEDULER_MSG"
	MessageTypeUserDefine       = "USER_DEFINE_MSG"
	MessageTypeController       = "CONTROLLER_MSG"
	MessageTypeTaskReply        = "TASK_REPLY"
	MessageTypeNoOp             = "NO_OP"
	MessageTypeParticipantError = "PARTICIPANT_ERROR_REPORT"
)
View Source
const (
	RebalancerModeFullAuto    = "FULL_AUTO"
	RebalancerModeSemiAuto    = "SEMI_AUTO"
	RebalancerModeCustomized  = "CUSTOMIZED"
	RebalancerModeUserDefined = "USER_DEFINED"
	RebalancerModeTask        = "TASK"
)
----------------------------------------------------------
|FULL_AUTO     |  SEMI_AUTO | CUSTOMIZED|  USER_DEFINED  |
---------------------------------------------------------|

LOCATION | HELIX | APP | APP | APP |

      ---------------------------------------------------------|
STATE | HELIX        |  HELIX     |  APP      |      APP       |
      ----------------------------------------------------------
View Source
const (
	StateModelLeaderStandby      = "LeaderStandby"
	StateModelMasterSlave        = "MasterSlave"
	StateModelOnlineOffline      = "OnlineOffline"
	StateModelDefaultSchemata    = "STORAGE_DEFAULT_SM_SCHEMATA"
	StateModelSchedulerTaskQueue = "SchedulerTaskQueue"
	StateModelTask               = "Task"
)

Variables

View Source
var (
	// ErrClusterNotSetup means the helix data structure in zookeeper /{CLUSTER_NAME}
	// is not correct or does not exist
	ErrClusterNotSetup = errors.New("Cluster not setup")

	// ErrNodeAlreadyExists the zookeeper node exists when it is not expected to
	ErrNodeAlreadyExists = errors.New("Node already exists in cluster")

	// ErrNodeNotExist the zookeeper node does not exist when it is expected to
	ErrNodeNotExist = errors.New("Node does not exist in config for cluster")

	// ErrInstanceNotExist the instance of a cluster does not exist when it is expected to
	ErrInstanceNotExist = errors.New("Node does not exist in instances for cluster")

	// ErrStateModelDefNotExist the state model definition is expected to exist in zookeeper
	ErrStateModelDefNotExist = errors.New("State model not exist in cluster")

	// ErrResourceExists the resource already exists in cluster and cannot be added again
	ErrResourceExists = errors.New("Resource already exists in cluster")

	// ErrResourceNotExists the resource does not exists and cannot be removed
	ErrResourceNotExists = errors.New("Resource not exists in cluster")
)
View Source
var (
	// ErrEnsureParticipantConfig is returned when participant configuration cannot be
	// created in zookeeper
	ErrEnsureParticipantConfig = errors.New("Participant configuration could not be added")

	// ErrInvalidAddResourceOption is returned when user provides a invalid resource to add.
	ErrInvalidAddResourceOption = errors.New("Invalid AddResourceOption")

	ErrInvalidClusterName = errors.New("Invalid cluster name")

	ErrEmptyStateModel = errors.New("Register at least one valid state model before connecting.")

	// ErrNotConnected is returned when call a function without calling Connect() beforehand.
	ErrNotConnected = errors.New("Not connected yet")

	ErrPartialSuccess = errors.New("Partial success")

	ErrDupStateModelName = errors.New("Register a state model over once")

	ErrSessionChanged = errors.New("SessionID changed")

	ErrDupStateTransition = errors.New("Register a state model transition over once")

	ErrNotEmpty = errors.New("Not empty")

	ErrInvalidArgument = errors.New("Invalid arguments")

	ErrInvalidMessage = errors.New("Invalid message")

	ErrDupOperation = errors.New("Duplicated operation")

	ErrUnkownMessageType = errors.New("Unknown message type")

	ErrSystem = errors.New("Unknown system error")
)
View Source
var (
	ErrNotImplemented = errors.New("Not implemented")
)
View Source
var HelixBuiltinStateModels = map[string][]byte{
	StateModelLeaderStandby: []byte(`
{
  "id" : "LeaderStandby",
  "mapFields" : {
    "DROPPED.meta" : {
      "count" : "-1"
    },
    "LEADER.meta" : {
      "count" : "1"
    },
    "LEADER.next" : {
      "DROPPED" : "STANDBY",
      "STANDBY" : "STANDBY",
      "OFFLINE" : "STANDBY"
    },
    "OFFLINE.meta" : {
      "count" : "-1"
    },
    "OFFLINE.next" : {
      "DROPPED" : "DROPPED",
      "STANDBY" : "STANDBY",
      "LEADER" : "STANDBY"
    },
    "STANDBY.meta" : {
      "count" : "R"
    },
    "STANDBY.next" : {
      "DROPPED" : "OFFLINE",
      "OFFLINE" : "OFFLINE",
      "LEADER" : "LEADER"
    }
  },
  "listFields" : {
    "STATE_PRIORITY_LIST" : [ "LEADER", "STANDBY", "OFFLINE", "DROPPED" ],
    "STATE_TRANSITION_PRIORITYLIST" : [ "LEADER-STANDBY", "STANDBY-LEADER", "OFFLINE-STANDBY", "STANDBY-OFFLINE", "OFFLINE-DROPPED" ]
  },
  "simpleFields" : {
    "INITIAL_STATE" : "OFFLINE"
  }
}
`),

	StateModelMasterSlave: []byte(`
{
  "id" : "MasterSlave",
  "mapFields" : {
    "DROPPED.meta" : {
      "count" : "-1"
    },
    "ERROR.meta" : {
      "count" : "-1"
    },
    "ERROR.next" : {
      "DROPPED" : "DROPPED",
      "OFFLINE" : "OFFLINE"
    },
    "MASTER.meta" : {
      "count" : "1"
    },
    "MASTER.next" : {
      "SLAVE" : "SLAVE",
      "DROPPED" : "SLAVE",
      "OFFLINE" : "SLAVE"
    },
    "OFFLINE.meta" : {
      "count" : "-1"
    },
    "OFFLINE.next" : {
      "SLAVE" : "SLAVE",
      "DROPPED" : "DROPPED",
      "MASTER" : "SLAVE"
    },
    "SLAVE.meta" : {
      "count" : "R"
    },
    "SLAVE.next" : {
      "DROPPED" : "OFFLINE",
      "OFFLINE" : "OFFLINE",
      "MASTER" : "MASTER"
    }
  },
  "listFields" : {
    "STATE_PRIORITY_LIST" : [ "MASTER", "SLAVE", "OFFLINE", "DROPPED", "ERROR" ],
    "STATE_TRANSITION_PRIORITYLIST" : [ "MASTER-SLAVE", "SLAVE-MASTER", "OFFLINE-SLAVE", "SLAVE-OFFLINE", "OFFLINE-DROPPED" ]
  },
  "simpleFields" : {
    "INITIAL_STATE" : "OFFLINE"
  }
}
`),

	StateModelOnlineOffline: []byte(`
{
  "id" : "OnlineOffline",
  "mapFields" : {
    "DROPPED.meta" : {
      "count" : "-1"
    },
    "OFFLINE.meta" : {
      "count" : "-1"
    },
    "OFFLINE.next" : {
      "DROPPED" : "DROPPED",
      "ONLINE" : "ONLINE"
    },
    "ONLINE.meta" : {
      "count" : "R"
    },
    "ONLINE.next" : {
      "DROPPED" : "OFFLINE",
      "OFFLINE" : "OFFLINE"
    }
  },
  "listFields" : {
    "STATE_PRIORITY_LIST" : [ "ONLINE", "OFFLINE", "DROPPED" ],
    "STATE_TRANSITION_PRIORITYLIST" : [ "OFFLINE-ONLINE", "ONLINE-OFFLINE", "OFFLINE-DROPPED" ]
  },
  "simpleFields" : {
    "INITIAL_STATE" : "OFFLINE"
  }
}	
`),

	StateModelDefaultSchemata: []byte(`
{
  "id" : "STORAGE_DEFAULT_SM_SCHEMATA",
  "mapFields" : {
    "DROPPED.meta" : {
      "count" : "-1"
    },
    "ERROR.meta" : {
      "count" : "-1"
    },
    "ERROR.next" : {
      "DROPPED" : "DROPPED",
      "OFFLINE" : "OFFLINE"
    },
    "MASTER.meta" : {
      "count" : "N"
    },
    "MASTER.next" : {
      "DROPPED" : "OFFLINE",
      "OFFLINE" : "OFFLINE"
    },
    "OFFLINE.meta" : {
      "count" : "-1"
    },
    "OFFLINE.next" : {
      "DROPPED" : "DROPPED",
      "MASTER" : "MASTER"
    }
  },
  "listFields" : {
    "STATE_PRIORITY_LIST" : [ "MASTER", "OFFLINE", "DROPPED", "ERROR" ],
    "STATE_TRANSITION_PRIORITYLIST" : [ "MASTER-OFFLINE", "OFFLINE-MASTER" ]
  },
  "simpleFields" : {
    "INITIAL_STATE" : "OFFLINE"
  }
}
`),

	StateModelSchedulerTaskQueue: []byte(`
{
  "id" : "SchedulerTaskQueue",
  "mapFields" : {
    "COMPLETED.meta" : {
      "count" : "1"
    },
    "COMPLETED.next" : {
      "DROPPED" : "DROPPED",
      "COMPLETED" : "COMPLETED"
    },
    "DROPPED.meta" : {
      "count" : "-1"
    },
    "DROPPED.next" : {
      "DROPPED" : "DROPPED"
    },
    "OFFLINE.meta" : {
      "count" : "-1"
    },
    "OFFLINE.next" : {
      "DROPPED" : "DROPPED",
      "OFFLINE" : "OFFLINE",
      "COMPLETED" : "COMPLETED"
    }
  },
  "listFields" : {
    "STATE_PRIORITY_LIST" : [ "COMPLETED", "OFFLINE", "DROPPED" ],
    "STATE_TRANSITION_PRIORITYLIST" : [ "OFFLINE-COMPLETED", "OFFLINE-DROPPED", "COMPLETED-DROPPED" ]
  },
  "simpleFields" : {
    "INITIAL_STATE" : "OFFLINE"
  }
}
`),

	StateModelTask: []byte(`
{
  "id" : "Task",
  "mapFields" : {
    "COMPLETED.meta" : {
      "count" : "-1"
    },
    "COMPLETED.next" : {
      "STOPPED" : "INIT",
      "DROPPED" : "DROPPED",
      "RUNNING" : "INIT",
      "INIT" : "INIT",
      "COMPLETED" : "COMPLETED",
      "TASK_ERROR" : "INIT",
      "TIMED_OUT" : "INIT"
    },
    "DROPPED.meta" : {
      "count" : "-1"
    },
    "DROPPED.next" : {
      "DROPPED" : "DROPPED"
    },
    "INIT.meta" : {
      "count" : "-1"
    },
    "INIT.next" : {
      "STOPPED" : "RUNNING",
      "DROPPED" : "DROPPED",
      "RUNNING" : "RUNNING",
      "INIT" : "INIT",
      "COMPLETED" : "RUNNING",
      "TASK_ERROR" : "RUNNING",
      "TIMED_OUT" : "RUNNING"
    },
    "RUNNING.meta" : {
      "count" : "-1"
    },
    "RUNNING.next" : {
      "STOPPED" : "STOPPED",
      "DROPPED" : "DROPPED",
      "RUNNING" : "RUNNING",
      "INIT" : "INIT",
      "COMPLETED" : "COMPLETED",
      "TASK_ERROR" : "TASK_ERROR",
      "TIMED_OUT" : "TIMED_OUT"
    },
    "STOPPED.meta" : {
      "count" : "-1"
    },
    "STOPPED.next" : {
      "STOPPED" : "STOPPED",
      "DROPPED" : "DROPPED",
      "RUNNING" : "RUNNING",
      "INIT" : "INIT",
      "COMPLETED" : "RUNNING",
      "TASK_ERROR" : "RUNNING",
      "TIMED_OUT" : "RUNNING"
    },
    "TASK_ERROR.meta" : {
      "count" : "-1"
    },
    "TASK_ERROR.next" : {
      "STOPPED" : "INIT",
      "DROPPED" : "DROPPED",
      "RUNNING" : "INIT",
      "INIT" : "INIT",
      "COMPLETED" : "INIT",
      "TIMED_OUT" : "INIT",
      "TASK_ERROR" : "TASK_ERROR"
    },
    "TIMED_OUT.meta" : {
      "count" : "-1"
    },
    "TIMED_OUT.next" : {
      "STOPPED" : "INIT",
      "DROPPED" : "DROPPED",
      "RUNNING" : "INIT",
      "INIT" : "INIT",
      "COMPLETED" : "INIT",
      "TASK_ERROR" : "INIT",
      "TIMED_OUT" : "TIMED_OUT"
    }
  },
  "listFields" : {
    "STATE_PRIORITY_LIST" : [ "INIT", "RUNNING", "STOPPED", "COMPLETED", "TIMED_OUT", "TASK_ERROR", "DROPPED" ],
    "STATE_TRANSITION_PRIORITYLIST" : [ "INIT-RUNNING", "RUNNING-STOPPED", "RUNNING-COMPLETED", "RUNNING-TIMED_OUT", "RUNNING-TASK_ERROR", "STOPPED-RUNNING", "INIT-DROPPED", "RUNNING-DROPPED", "COMPLETED-DROPPED", "STOPPED-DROPPED", "TIMED_OUT-DROPPED", "TASK_ERROR-DROPPED", "RUNNING-INIT", "COMPLETED-INIT", "STOPPED-INIT", "TIMED_OUT-INIT", "TASK_ERROR-INIT" ]
  },
  "simpleFields" : {
    "INITIAL_STATE" : "INIT"
  }
}`),
}

Functions

This section is empty.

Types

type AddResourceOption

type AddResourceOption struct {
	Partitions               int
	StateModel               string
	RebalancerMode           string
	RebalanceStrategy        string
	BucketSize               int
	MaxPartitionsPerInstance int
}

func DefaultAddResourceOption

func DefaultAddResourceOption(partitions int, stateModel string) AddResourceOption

func (AddResourceOption) Valid

func (opt AddResourceOption) Valid() bool

type ChangeNotification

type ChangeNotification struct {
	ChangeType ChangeNotificationType
	ChangeData interface{}
}

func (ChangeNotification) String

func (ctx ChangeNotification) String() string

type ChangeNotificationType

type ChangeNotificationType uint8
const (
	ExternalViewChanged       ChangeNotificationType = 0
	LiveInstanceChanged       ChangeNotificationType = 1
	IdealStateChanged         ChangeNotificationType = 2
	CurrentStateChanged       ChangeNotificationType = 3
	InstanceConfigChanged     ChangeNotificationType = 4
	ControllerMessagesChanged ChangeNotificationType = 5
	InstanceMessagesChanged   ChangeNotificationType = 6
	ControllerChanged         ChangeNotificationType = 7

	CallbackInit     ChangeNotificationType = 101
	CallbackInvoke   ChangeNotificationType = 102
	CallbackFinalize ChangeNotificationType = 103
)

func (ChangeNotificationType) IsCallbackInvoke

func (ct ChangeNotificationType) IsCallbackInvoke() bool

func (ChangeNotificationType) String

func (ct ChangeNotificationType) String() string

type ClusterMessagingService

type ClusterMessagingService interface {
	Send(*model.Message) error

	// RegisterMessageHandler will register a message handler for given type of message.
	// StateMachineEngine is the factory of message handler.
	RegisterMessageHandlerFactory(messageType string, factory StateMachineEngine)
}

ClusterMessagingService can be used to send cluster wide messages.

Send message to a specific component in the cluster[participant, controller]. Broadcast message to all participants. Send message to instances that hold a specific resource.

type Context

type Context struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Context is a goroutine safe property bag for storing data between listeners and callbacks. TODO mv to model pkg

func NewContext

func NewContext(m HelixManager) *Context

NewContext creates a new Context instance.

func (*Context) Get

func (c *Context) Get(key string) interface{}

Get gets the value of a key.

func (Context) Manager

func (c Context) Manager() HelixManager

func (*Context) Set

func (c *Context) Set(key string, value interface{})

Set sets a key value pair.

func (*Context) SetNX

func (c *Context) SetNX(key string, value interface{}) (ok bool)

SetNX is Set if Not eXists.

type ControllerChangeListener

type ControllerChangeListener func(ctx *Context)

ControllerChangeListener is triggered when controller changes.

type CurrentStateChangeListener

type CurrentStateChangeListener func(instance string, currentState []*model.CurrentState, ctx *Context)

CurrentStateChangeListener is triggered when the current state of a participant changed.

type ExternalViewChangeListener

type ExternalViewChangeListener func(externalViews []*model.ExternalView, ctx *Context)

ExternalViewChangeListener is triggered when the external view is updated.

type HelixAdmin

type HelixAdmin interface {

	// Connect will connect to the storage.
	Connect() error

	// Disconnect will disconnect from the storage and release related resources.
	Disconnect()

	// AddCluster add a managed cluster to Helix.
	AddCluster(cluster string) error

	// EnableCluster disable/enable a cluster.
	EnableCluster(cluster string, yes bool) error

	// AddClusterToGrandCluster adds a cluster and also adds this cluster as a resource group in the super cluster.
	AddClusterToGrandCluster(cluster, grandCluster string) error

	// Clusters return all Helix managed clusters under '/'.
	Clusters() ([]string, error)

	// DropCluster removes a Helix managed cluster.
	DropCluster(cluster string) error

	// IsClusterSetup checks if a cluster is setup ok.
	IsClusterSetup(cluster string) (bool, error)

	// AllowParticipantAutoJoin permits a partitipant work without calling AddInstance beforehand.
	// By default this feature is off.
	AllowParticipantAutoJoin(cluster string, yes bool) error

	// ControllerLeader returns the active controller leader of a cluster.
	ControllerLeader(cluster string) string

	// ControllerHistory returns all controller instance names in history.
	ControllerHistory(cluster string) ([]string, error)

	// Add a node to a cluster.
	// node has the form of host_port.
	AddNode(cluster string, node string) error

	// DropNode drops a node from a cluster.
	DropNode(cluster string, node string) error

	// Add an instance to a cluster.
	AddInstance(cluster string, ic *model.InstanceConfig) error

	// AddInstanceTag adds a tag to an instance.
	AddInstanceTag(cluster, instance, tag string) error

	// RemoveInstanceTag removes a tag from an instance.
	RemoveInstanceTag(cluster, instance, tag string) error

	// DropInstance drops an instance from a cluster.
	DropInstance(cluster string, ic *model.InstanceConfig) error

	// Instances returns a list of instances participating under a cluster.
	Instances(cluster string) ([]string, error)

	// LiveInstances returns a list of live instances participating under a cluster.
	LiveInstances(cluster string) ([]string, error)

	// InstancesWithTag returns a list of resources in a cluster with a tag.
	InstancesWithTag(cluster, tag string) ([]string, error)

	// InstanceConfig returns configuration information of an instance in a cluster.
	InstanceConfig(cluster, instance string) (*model.InstanceConfig, error)

	// Enable or disable an instance.
	EnableInstance(cluster, instance string, yes bool) error

	// ResourceIdealState returns ideal state for a resource.
	ResourceIdealState(cluster, resource string) (*model.IdealState, error)

	// SetResourceIdealState sets ideal state for a resource.
	SetResourceIdealState(cluster, resource string, is *model.IdealState) error

	// Add a resource to a cluster.
	AddResource(cluster string, resource string, option AddResourceOption) error

	// DropResource removes the specified resource from the cluster.
	DropResource(cluster string, resource string) error

	// Resources returns a list of resources managed by the helix cluster.
	Resources(cluster string) ([]string, error)

	// ResourcesWithTag returns a list of resources in a cluster with a tag.
	ResourcesWithTag(cluster, tag string) ([]string, error)

	// EnableResource enables/disables the specified resource in the cluster.
	EnableResource(cluster string, resource string, yes bool) error

	// ScaleResource hot change partition num of a resource.
	ScaleResource(cluster string, resource string, partitions int) error

	// EnablePartitions disable or enable a list of partitions on an instance.
	EnablePartitions(cluster, resource string, partitions []string, yes bool) error

	// AddStateModelDef adds a state model to a cluster.
	AddStateModelDef(cluster string, stateModel string, definition *model.StateModelDef) error

	// StateModelDefs gets a list of state model names under a cluster.
	StateModelDefs(cluster string) ([]string, error)

	// StateModelDef returns a state model definition in a cluster.
	StateModelDef(cluster, stateModel string) (*model.StateModelDef, error)

	// ResourceExternalView gets external view for a resource.
	ResourceExternalView(cluster string, resource string) (*model.ExternalView, error)

	// Rebalance a resource in cluster.
	Rebalance(cluster string, resource string, replica int) error

	// SetConfig set the configuration values for the cluster, defined by the config scope.
	// TODO
	SetConfig(cluster string, scope HelixConfigScope, properties map[string]string, ident ...string) error

	// DropConfig will drop properties of keys from a configuration in simple field.
	DropConfig(scope HelixConfigScope, keys []string, ident ...string) error

	// GetConfig obtains the configuration value of a property, defined by a config scope.
	// TODO
	GetConfig(cluster string, scope HelixConfigScope, keys []string, ident ...string) (map[string]interface{}, error)

	AddConstaint()
	RemoveConstaint()
	Constraints()

	// SetInstallPath will setup the local helix installation base path.
	// TODO kill this
	SetInstallPath(path string)
}

HelixAdmin handles the administration task for the Helix cluster.

type HelixConfigScope

type HelixConfigScope string
const (
	ConfigScopeCluster     HelixConfigScope = "CLUSTER"
	ConfigScopeParticipant HelixConfigScope = "PARTICIPANT"
	ConfigScopeResource    HelixConfigScope = "RESOURCE"
	ConfigScopeConstraint  HelixConfigScope = "CONSTRAINT"
	ConfigScopePartition   HelixConfigScope = "PARTITION"
)

type HelixDefinedState

type HelixDefinedState string
const (
	HelixDefinedStateOnline  HelixDefinedState = "ONLINE"
	HelixDefinedStateOffline HelixDefinedState = "OFFLINE"

	HelixDefinedStateLeader  HelixDefinedState = "LEADER"
	HelixDefinedStateStandby HelixDefinedState = "STANDBY"

	HelixDefinedStateMaster HelixDefinedState = "MASTER"
	HelixDefinedStateSlave  HelixDefinedState = "SLAVE"

	// The DROPPED state is used to signify a replica that was served by a given participant, but is no longer served.
	HelixDefinedStateDropped HelixDefinedState = "DROPPED"

	// The ERROR state is used whenever the participant serving a partition encountered an error and cannot continue to serve the partition.
	HelixDefinedStateError HelixDefinedState = "ERROR"
)

type HelixManager

type HelixManager interface {

	// Connect will connect manager to storage and start housekeeping.
	Connect() error

	// Disconnect will disconnect manager from storage.
	Disconnect()

	// IsConnected checks if the connection is alive.
	// There is no need to invoke Connect again if IsConnected return false.
	IsConnected() bool

	// Cluster returns the cluster name associated with this cluster manager.
	Cluster() string

	// IsLeader checks if this is a controller and a leader of the cluster.
	IsLeader() bool

	// PropertyStore returns the cluster properties store that work with Record.
	// TODO
	PropertyStore()

	// Instance returns the instance name used to connect to the cluster.
	Instance() string

	// InstanceType returns the manager instance type.
	InstanceType() InstanceType

	// SessionID returns the session id associated with the connection to cluster data store.
	SessionID() string

	// AddPreConnectCallback adds a callback that is invoked before a participant joins the cluster.
	AddPreConnectCallback(PreConnectCallback)

	// AddPostConnectCallback adds a callback that is invoked after a participant joins the cluster.
	AddPostConnectCallback(PostConnectCallback)

	// AddExternalViewChangeListener add a listener to external view changes.
	AddExternalViewChangeListener(ExternalViewChangeListener) error

	// AddLiveInstanceChangeListener add a listener to live instance changes.
	AddLiveInstanceChangeListener(LiveInstanceChangeListener) error

	// AddCurrentStateChangeListener add a listener to current state changes of the specified instance.
	AddCurrentStateChangeListener(instance, sessionID string, listener CurrentStateChangeListener) error

	// AddMessageListener adds a listener to the messages of an instance.
	AddMessageListener(instance string, listener MessageListener) error

	// AddControllerMessageListener add a listener to controller messages.
	AddControllerMessageListener(MessageListener) error

	// AddControllerListener add a listener to respond to controller changes.
	// Used in distributed cluster controller. TODO
	AddControllerListener(ControllerChangeListener) error

	// AddIdealStateChangeListener add a listener to the cluster ideal state changes.
	AddIdealStateChangeListener(IdealStateChangeListener) error

	// AddInstanceConfigChangeListener add a listener to instance config changes.
	AddInstanceConfigChangeListener(InstanceConfigChangeListener) error

	// RemoveListener removes the listener.
	// If the same listener was used for multiple changes, all change notifications will be removed.
	RemoveListener(path string, lisenter interface{}) error

	// MessagingService returns ClusterMessagingService which can be used to send cluster wide messages.
	MessagingService() ClusterMessagingService

	// ClusterManagementTool provides admin interface to setup and modify cluster.
	ClusterManagementTool() HelixAdmin

	// StateMachineEngine returns the sme of the participant.
	StateMachineEngine() StateMachineEngine
}

HelixManager is a facade component that connects each system component with the controller.

type HelixTimerTask

type HelixTimerTask interface {

	// Start.
	Start()

	// Stop.
	Stop()
}

HelixTimerTask is an interface for defining a generic task to run periodically.

type IdealStateChangeListener

type IdealStateChangeListener func(idealState []*model.IdealState, ctx *Context)

IdealStateChangeListener is triggered when the ideal state changed.

type InstanceConfigChangeListener

type InstanceConfigChangeListener func(configs []*model.InstanceConfig, ctx *Context)

InstanceConfigChangeListener is triggered when the instance configs are updated.

type InstanceType

type InstanceType string
const (
	InstanceTypeParticipant           InstanceType = "PARTICIPANT"
	InstanceTypeSpectator             InstanceType = "SPECTATOR"
	InstanceTypeControllerStandalone  InstanceType = "CONTROLLER"
	InstanceTypeControllerDistributed InstanceType = "CONTROLLER_PARTICIPANT"
	InstanceTypeAdministrator         InstanceType = "ADMINISTRATOR"
)

func (InstanceType) IsController

func (it InstanceType) IsController() bool

func (InstanceType) IsControllerDistributed

func (it InstanceType) IsControllerDistributed() bool

func (InstanceType) IsControllerStandalone

func (it InstanceType) IsControllerStandalone() bool

func (InstanceType) IsParticipant

func (it InstanceType) IsParticipant() bool

func (InstanceType) IsSpectator

func (it InstanceType) IsSpectator() bool

type LiveInstanceChangeListener

type LiveInstanceChangeListener func(liveInstances []*model.LiveInstance, ctx *Context)

LiveInstanceChangeListener is triggered when live instances of the cluster are updated.

type MessageHandler

type MessageHandler interface {
	HandleMessage(*model.Message) error
}

type MessageListener

type MessageListener func(instance string, messages []*model.Message, ctx *Context)

MessageListener is triggered when the instance received new messages.

type PostConnectCallback

type PostConnectCallback func()

type PreConnectCallback

type PreConnectCallback func()

type StateMachineEngine

type StateMachineEngine interface {

	// RegisterStateModel associates state trasition functions with the participant.
	RegisterStateModel(stateModelDef string, sm *StateModel) error

	// RemoveStateModel disconnects a state transition with the participant.
	RemoveStateModel(stateModelDef string) error

	// StateModel returns a state model by name.
	StateModel(stateModelDef string) (*StateModel, bool)

	// CreateMessageHandler will create a handler to handle the message.
	CreateMessageHandler(message *model.Message, ctx *ChangeNotification) MessageHandler

	// MessageType returns the message type this state machine engine is interested in.
	MessageType() string

	// Reset will reset the state of the engine.
	Reset()
}

Helix participant manager uses StateMachineEngine to register/remove state model transition. The state transition handles state transition messages.

type StateModel

type StateModel struct {
	// contains filtered or unexported fields
}

StateModel is a collection of state transitions and their handlers.

func NewStateModel

func NewStateModel() *StateModel

NewStateModel creates an empty state model.

func (*StateModel) AddTransition

func (sm *StateModel) AddTransition(fromState string, toState string,
	handler func(*model.Message, *Context)) error

AddTransition add a state transition handler to the state model.

func (*StateModel) AddTransitions

func (sm *StateModel) AddTransitions(transitions []Transition) error

AddTransition add a state transition handler to the state model.

func (*StateModel) ExportDiagram

func (sm *StateModel) ExportDiagram(outfile string) error

ExportDiagram exports the state model transitions to a diagram.

func (*StateModel) Handler

func (sm *StateModel) Handler(fromState, toState string) func(*model.Message, *Context)

type Transition

type Transition struct {
	FromState string
	ToState   string
	Handler   func(*model.Message, *Context)
}

Transition associates a handler function with the state transition from the from state to the to state.

Directories

Path Synopsis
apps
Package controller provides implementation of the default Helix controller.
Package controller provides implementation of the default Helix controller.
pipeline
Package pipleline handles registry/pipeline/state definitions.
Package pipleline handles registry/pipeline/state definitions.
rebalancer
Package rebalancer is an abstraction of Helix rebalancer.
Package rebalancer is an abstraction of Helix rebalancer.
Package store provides storage for helix.
Package store provides storage for helix.
raft
Helix storage implemented with Raft protocol.
Helix storage implemented with Raft protocol.
zk

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL