Documentation ¶
Overview ¶
Package controllerhost is a Cherami Controller implemented as a TChannel Thrift Service. MCP is responsible for the following high-level load balancing functions in Cherami:
- Creation and Deletion of Extents
- Assignment of In/Store hosts to Extents
- Assignment of Producers to InputHosts
- Assignment of Consumers to OutputHosts
- Cluster re-balancing when hosts join / leave the cluster
Index ¶
- Constants
- Variables
- func GetDrainExtentTimeout() time.Duration
- func SetDrainExtentTimeout(timeout time.Duration)
- type Boolean
- type ConsGroupUpdatedEvent
- type Context
- type ControllerDynamicConfig
- type Dfdd
- type DistancePlacement
- type Event
- func NewConsGroupUpdatedEvent(dstID, consGroupID, extentID, outputHostID string) Event
- func NewExtentCreatedEvent(dstID string, inHostID string, extentID string, storeIDs []string) Event
- func NewExtentDownEvent(sealSeq int64, dstID string, extentID string) Event
- func NewInputHostFailedEvent(hostUUID string) Event
- func NewInputHostNotificationEvent(dstID, inputHostID, extentID string, storeIDs []string, ...) Event
- func NewOutputHostNotificationEvent(dstID, consGroupID, outputHostID, reason, reasonContext string, ...) Event
- func NewRemoteExtentPrimaryStoreDownEvent(storeID string, extentID string) Event
- func NewStartReplicationForRemoteZoneExtent(dstID string, extentID string, storeIDs []string, ...) Event
- func NewStoreExtentStatusOutOfSyncEvent(dstID string, extentID string, storeID string, ...) Event
- func NewStoreHostFailedEvent(hostUUID string) Event
- type EventPipeline
- type ExtentCreatedEvent
- type ExtentDownEvent
- type InputHostFailedEvent
- type InputHostNotificationEvent
- type InputPlacementConfig
- type LockMgr
- type Mcp
- func (mcp *Mcp) CreateConsumerGroup(ctx thrift.Context, createRequest *shared.CreateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
- func (mcp *Mcp) CreateDestination(ctx thrift.Context, createRequest *shared.CreateDestinationRequest) (*shared.DestinationDescription, error)
- func (mcp *Mcp) CreateRemoteZoneConsumerGroupExtent(ctx thrift.Context, createRequest *shared.CreateConsumerGroupExtentRequest) error
- func (mcp *Mcp) CreateRemoteZoneExtent(ctx thrift.Context, createRequest *shared.CreateExtentRequest) (*shared.CreateExtentResult_, error)
- func (mcp *Mcp) DeleteConsumerGroup(ctx thrift.Context, deleteRequest *shared.DeleteConsumerGroupRequest) error
- func (mcp *Mcp) DeleteDestination(ctx thrift.Context, deleteRequest *shared.DeleteDestinationRequest) error
- func (mcp *Mcp) ExtentsUnreachable(ctx thrift.Context, extentsUnreachableRequest *a.ExtentsUnreachableRequest) error
- func (mcp *Mcp) GetCapacities(ctx thrift.Context, getCapacitiesRequest *c.GetCapacitiesRequest) (*c.GetCapacitiesResult_, error)
- func (mcp *Mcp) GetInputHosts(ctx thrift.Context, inReq *c.GetInputHostsRequest) (*c.GetInputHostsResult_, error)
- func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsRequest) (*c.GetOutputHostsResult_, error)
- func (mcp *Mcp) GetQueueDepthInfo(ctx thrift.Context, inReq *c.GetQueueDepthInfoRequest) (*c.GetQueueDepthInfoResult_, error)
- func (mcp *Mcp) RemoveCapacities(ctx thrift.Context, removeCapacitiesRequest *c.RemoveCapacitiesRequest) error
- func (mcp *Mcp) ReportConsumerGroupExtentMetric(ctx thrift.Context, request *c.ReportConsumerGroupExtentMetricRequest) error
- func (mcp *Mcp) ReportConsumerGroupMetric(ctx thrift.Context, request *c.ReportConsumerGroupMetricRequest) error
- func (mcp *Mcp) ReportDestinationExtentMetric(ctx thrift.Context, request *c.ReportDestinationExtentMetricRequest) error
- func (mcp *Mcp) ReportDestinationMetric(ctx thrift.Context, request *c.ReportDestinationMetricRequest) error
- func (mcp *Mcp) ReportNodeMetric(ctx thrift.Context, request *c.ReportNodeMetricRequest) error
- func (mcp *Mcp) ReportStoreExtentMetric(ctx thrift.Context, request *c.ReportStoreExtentMetricRequest) error
- func (mcp *Mcp) Start(thriftService []thrift.TChanServer)
- func (mcp *Mcp) Stop()
- func (mcp *Mcp) UpdateConsumerGroup(ctx thrift.Context, updateRequest *shared.UpdateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
- func (mcp *Mcp) UpdateDestination(ctx thrift.Context, updateRequest *shared.UpdateDestinationRequest) (*shared.DestinationDescription, error)
- func (mcp *Mcp) UpsertInputHostCapacities(ctx thrift.Context, ...) error
- func (mcp *Mcp) UpsertOutputHostCapacities(ctx thrift.Context, ...) error
- func (mcp *Mcp) UpsertStoreCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertStoreCapacitiesRequest) error
- func (mcp *Mcp) ValidateCgZoneConfig(ctx thrift.Context, logger bark.Logger, ...) (bool, error)
- func (mcp *Mcp) ValidateDestZoneConfig(ctx thrift.Context, logger bark.Logger, ...) (bool, error)
- type MetadataMgr
- type OutputHostNotificationEvent
- type OutputPlacementConfig
- type Placement
- type QueueDepthCache
- type QueueDepthCacheEntry
- type QueueDepthCacheJSONFields
- type RemoteExtentPrimaryStoreDownEvent
- type RetryableEventExecutor
- type StartReplicationForRemoteZoneExtent
- type StoreExtentStatusOutOfSyncEvent
- type StoreHostFailedEvent
- type StorePlacementConfig
Constants ¶
const ( // QueueDepthTabulationString can be added to a destination or CG owner email to request queue depth tabulation // Note that Google allows something like this: gbailey+queueDepthTabulation@uber.com // The above is still a valid email and will be delivered to gbailey@uber.com QueueDepthTabulationString = `queueDepthTabulation` )
Variables ¶
var ( // ErrTooManyUnHealthy is returned when there are too many open but unhealthy extents for a destination ErrTooManyUnHealthy = &shared.InternalServiceError{Message: "Too many open, but unhealthy extents for destination"} // ErrPublishToKafkaDestination is returned on invoking GetInputHosts on a Kafka destination ErrPublishToKafkaDestination = &shared.BadRequestError{Message: "Cannot publish to Kafka destinations"} // ErrPublishToReceiveOnlyDestination is returned on invoking GetInputHosts on a receive-only destination ErrPublishToReceiveOnlyDestination = &shared.BadRequestError{Message: "Cannot publish to receive-only destinations"} )
var ( // ErrMalformedUUID is returned when the UUID in the request is malformed ErrMalformedUUID = &shared.BadRequestError{Message: "Malformed UUID in request"} // ErrNoHealthyExtent is returned where there are no healthy extents in the system ErrNoHealthyExtent = &shared.InternalServiceError{Message: "No healthy extent found for destination"} // ErrEventQueueFull is returned when the internal event queue is full and // as a result, some action cannot be taken by the controller ErrEventQueueFull = &shared.InternalServiceError{Message: "EventQueue full"} // ErrTryLock is a temporary error that is thrown by the API // when it loses the race to do the computation needed to refreh the // API result cache. This should only happen during initial // bootstrap of Controller. ErrTryLock = &shared.InternalServiceError{Message: "Failed to acquire lock, backoff and retry"} // service error, most likely, metadata read/write // errors. ErrUnavailable = &shared.InternalServiceError{Message: "Service unavailable, backoff and retry"} // ErrDestinationNotExists is returned for a missing destination ErrDestinationNotExists = &shared.EntityNotExistsError{Message: "Destination does not exist"} // ErrConsumerGroupNotExists is returned for a missing consumer group ErrConsumerGroupNotExists = &shared.EntityNotExistsError{Message: "ConsumerGroup does not exist"} // ErrDestinationDisabled is returned after a destination is deleted ErrDestinationDisabled = &shared.EntityDisabledError{Message: "Destination is not enabled"} // ErrConsumerGroupDisabled is returned after a consumer group is deleted ErrConsumerGroupDisabled = &shared.EntityDisabledError{Message: "Consumer group is not enabled"} )
var ( // IntervalBtwnScans is the time that the scanner will sleep between scans. It is exported to allow tests to modify it. IntervalBtwnScans = time.Minute )
Functions ¶
func GetDrainExtentTimeout ¶
GetDrainExtentTimeout returns the current drainExtentTimeout
func SetDrainExtentTimeout ¶
SetDrainExtentTimeout overrides the drain extent timeout to the given value. This method is intended only for unit tests.
Types ¶
type Boolean ¶
type Boolean bool
Boolean is an alias for bool that can be used as a generic value type i.e. interface{}
type ConsGroupUpdatedEvent ¶
type ConsGroupUpdatedEvent struct {
// contains filtered or unexported fields
}
ConsGroupUpdatedEvent is generated when a new extent is available to the consumer group for consumption. Action will be to schedule notification to the concerned output hosts
func (*ConsGroupUpdatedEvent) Handle ¶
func (event *ConsGroupUpdatedEvent) Handle(context *Context) error
Handle schedules output host notifications
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context holds the run-time context for controller
type ControllerDynamicConfig ¶
type ControllerDynamicConfig struct { NumPublisherExtentsByPath []string `name:"numPublisherExtentsByPath" default:"/=4"` NumConsumerExtentsByPath []string `name:"numConsumerExtentsByPath" default:"/=8"` NumRemoteConsumerExtentsByPath []string `name:"numRemoteConsumerExtentsByPath" default:"/=4"` // configs for multi_zone consumer group ActiveZone string `name:"activeZone" default:""` FailoverMode string `name:"failoverMode" default:"disabled"` }
ControllerDynamicConfig contains the config parameters needed for controller
type Dfdd ¶
type Dfdd interface { common.Daemon // ReportHostGoingDown reports a host as going down // for planned deployment or maintenance ReportHostGoingDown(service string, hostID string) // GetHostState returns a tuple representing the current // dfdd host state and the duration for which the host // has been in that state GetHostState(service string, hostID string) (dfddHostState, time.Duration) }
Dfdd (Discovery and Failure Detection Daemon) is a background task that keeps track of the healthy members for all cherami services. It is also the place where any custom failure detection logic (on top of Ringpop) must go.
func NewDfdd ¶
func NewDfdd(context *Context, timeSource common.TimeSource) Dfdd
NewDfdd creates and returns an instance of the discovery and failure detection daemon. Dfdd will monitor the health of Input/Output/Store hosts and trigger Input/Output/StoreHostFailedEvent for every host that failed. It currently does not maintain a list of healthy hosts for every service; that's a WIP.
type DistancePlacement ¶
type DistancePlacement struct {
// contains filtered or unexported fields
}
DistancePlacement holds the context and distance map
func (*DistancePlacement) PickInputHost ¶
PickInputHost picks an input host with certain distance from the store hosts
func (*DistancePlacement) PickOutputHost ¶
PickOutputHost picks an output host with certain distance from the store hosts
func (*DistancePlacement) PickStoreHosts ¶
func (p *DistancePlacement) PickStoreHosts(count int) ([]*common.HostInfo, error)
PickStoreHosts picks n store hosts with certain distance between store replicas
type Event ¶
type Event interface { // Handle is the handler for the event. Handle(context *Context) error // Done is a callback to indicate that the // event processing is done. This is // an opportunity to cleanup any state that // the event had. The err arg is set to the // error value from the last attempt. Nil // indicates the handler finished successfully. Done(context *Context, err error) }
Event represents the API interface that all events must obey. The API is defined to follow the try-catch-finally pattern. When a new event arrives, the Handle() method will be called to handle it. If Handle returns a retryable error, it will be retried one or more times. After the event is processed (success or failure), the Done method will be called.
func NewConsGroupUpdatedEvent ¶
NewConsGroupUpdatedEvent creates and returns a ConsGroupUpdatedEvent
func NewExtentCreatedEvent ¶
NewExtentCreatedEvent creates and returns an ExtentCreatedEvent
func NewExtentDownEvent ¶
NewExtentDownEvent creates and returns an ExtentDownEvent
func NewInputHostFailedEvent ¶
NewInputHostFailedEvent creates and returns an InputHostFailedEvent
func NewInputHostNotificationEvent ¶
func NewInputHostNotificationEvent(dstID, inputHostID, extentID string, storeIDs []string, reason, reasonContext string, notificationType admin.NotificationType) Event
NewInputHostNotificationEvent creates and returns an InputHostNotificationEvent
func NewOutputHostNotificationEvent ¶
func NewOutputHostNotificationEvent(dstID, consGroupID, outputHostID, reason, reasonContext string, notificationType admin.NotificationType) Event
NewOutputHostNotificationEvent creates and returns an OutputHostNotificationEvent
func NewRemoteExtentPrimaryStoreDownEvent ¶
NewRemoteExtentPrimaryStoreDownEvent creates and returns a RemoteExtentPrimaryStoreDownEvent
func NewStartReplicationForRemoteZoneExtent ¶
func NewStartReplicationForRemoteZoneExtent(dstID string, extentID string, storeIDs []string, remoteExtentPrimaryStore string) Event
NewStartReplicationForRemoteZoneExtent creates and returns a StartReplicationForRemoteZoneExtent
func NewStoreExtentStatusOutOfSyncEvent ¶
func NewStoreExtentStatusOutOfSyncEvent(dstID string, extentID string, storeID string, desiredStatus shared.ExtentStatus) Event
NewStoreExtentStatusOutOfSyncEvent creates and returns a StoreExtentStatusOutOfSyncEvent
func NewStoreHostFailedEvent ¶
NewStoreHostFailedEvent creates and returns a StoreHostFailedEvent
type EventPipeline ¶
type EventPipeline interface { common.Daemon // Add adds an event to the event pipeline // returns true on success // returns false when the event queue is full Add(event Event) bool // GetRetryableEventExecutor returns the executor for // running retryable events GetRetryableEventExecutor() RetryableEventExecutor }
EventPipeline represents the execution pipeline that handles events in the background
func NewEventPipeline ¶
func NewEventPipeline(context *Context, nWorkers int) EventPipeline
NewEventPipeline creates a new instance of EventPipeline. The returned pipeline will have nWorkers number of concurrent worker routines that handle events in the background
type ExtentCreatedEvent ¶
type ExtentCreatedEvent struct {
// contains filtered or unexported fields
}
ExtentCreatedEvent is generated by the creation of a new extent. The action will be to schedule notifications to concerned input hosts.
func (*ExtentCreatedEvent) Handle ¶
func (event *ExtentCreatedEvent) Handle(context *Context) error
Handle handles the creation of a new extent. Following are the async actions to be triggered on creation of an extent:
a. For every input host that serves an open extent for the DST 1. Add an InputHostNotificationEvent to reconfigure its clients b. For the input host that serves the newly created extent 1. Add an InputHostNotificationEvent to reconfigure ALL
type ExtentDownEvent ¶
type ExtentDownEvent struct {
// contains filtered or unexported fields
}
ExtentDownEvent is triggered whenever an extent becomes unreachable and needs to be Sealed
func (*ExtentDownEvent) Done ¶
func (event *ExtentDownEvent) Done(context *Context, err error)
Done does cleanup for ExtentDownEvent
func (*ExtentDownEvent) Handle ¶
func (event *ExtentDownEvent) Handle(context *Context) error
Handle seals an extent and updates metadata
type InputHostFailedEvent ¶
type InputHostFailedEvent struct {
// contains filtered or unexported fields
}
InputHostFailedEvent is triggered when an input host fails
func (*InputHostFailedEvent) Handle ¶
func (event *InputHostFailedEvent) Handle(context *Context) error
Handle handles an InputHostFailedEvent. All it does is to list all OPEN extents for the input host and enqueue an ExtentDownEvent for each one of them.
type InputHostNotificationEvent ¶
type InputHostNotificationEvent struct {
// contains filtered or unexported fields
}
InputHostNotificationEvent is generated to notify input hosts about a new extent
func (*InputHostNotificationEvent) Handle ¶
func (event *InputHostNotificationEvent) Handle(context *Context) error
Handle sends notification to an input host
type InputPlacementConfig ¶
type InputPlacementConfig struct {
AdminStatus string `name:"adminStatus" default:"enabled"`
}
InputPlacementConfig contains the config parameters needed for inputhost placement
type LockMgr ¶
type LockMgr interface { // TryLock attempts to acquire the lock // identified by the given key. If it // cannot acquire the lock within the // timeout, returns false. On success, // returns true TryLock(key string, timeout time.Duration) bool // Unlock releases the lock identified // by the key Unlock(key string) // Size returns the number of // keys currently held by the // lockmgr Size() int64 }
LockMgr represents the API for any implementation that manages a set of locks, where each lock is identified by a string key
func NewLockMgr ¶
NewLockMgr creates and returns a new instance of LockMgr. The returned lockMgr uses a hashtable of hash tables for maintaining the mapping from (key -> lock). The first level table is fixed size and each entry in this table contains a lock, which needs to be acquired to get access to the second level hashtable. The second level hashtable maps the key to lock and grows dynamically.
@param concurrency
Number of entries in the first level table. This directly translates into the number of concurrent threads that can be using the lockMgr at any given point of time. Ideally, this number MUST be the number of cores.
@param hashfn
The hash function to index onto the first level table
@param logger
Logger to use
type Mcp ¶
Mcp implements the ExtentController interface
func NewController ¶
func NewController(cfg configure.CommonAppConfig, sVice *common.Service, metadataClient m.TChanMetadataService, zoneFailoverManager common.ZoneFailoverManager) (*Mcp, []thrift.TChanServer)
NewController creates and returns a new instance of Mcp controller
func (*Mcp) CreateConsumerGroup ¶
func (mcp *Mcp) CreateConsumerGroup(ctx thrift.Context, createRequest *shared.CreateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
CreateConsumerGroup creates local and remote consumer group
func (*Mcp) CreateDestination ¶
func (mcp *Mcp) CreateDestination(ctx thrift.Context, createRequest *shared.CreateDestinationRequest) (*shared.DestinationDescription, error)
CreateDestination creates local and remote destination
func (*Mcp) CreateRemoteZoneConsumerGroupExtent ¶
func (mcp *Mcp) CreateRemoteZoneConsumerGroupExtent(ctx thrift.Context, createRequest *shared.CreateConsumerGroupExtentRequest) error
CreateRemoteZoneConsumerGroupExtent creates a cg extent that originates from another zone
func (*Mcp) CreateRemoteZoneExtent ¶
func (mcp *Mcp) CreateRemoteZoneExtent(ctx thrift.Context, createRequest *shared.CreateExtentRequest) (*shared.CreateExtentResult_, error)
CreateRemoteZoneExtent creates an extent that originates from another zone
func (*Mcp) DeleteConsumerGroup ¶
func (mcp *Mcp) DeleteConsumerGroup(ctx thrift.Context, deleteRequest *shared.DeleteConsumerGroupRequest) error
DeleteConsumerGroup deletes local and remote consumer group
func (*Mcp) DeleteDestination ¶
func (mcp *Mcp) DeleteDestination(ctx thrift.Context, deleteRequest *shared.DeleteDestinationRequest) error
DeleteDestination deletes local and remote destination
func (*Mcp) ExtentsUnreachable ¶
func (mcp *Mcp) ExtentsUnreachable(ctx thrift.Context, extentsUnreachableRequest *a.ExtentsUnreachableRequest) error
ExtentsUnreachable is a way for other services to notify the Controller about an unreachable extent. When this notification is received, controller will enqueue an ExtentDownEvent to seal the extent asynchronously. A successful return code from this API does not guarantee that the extent is sealed, it acknowledges that the notification was successfully processed.
func (*Mcp) GetCapacities ¶
func (mcp *Mcp) GetCapacities(ctx thrift.Context, getCapacitiesRequest *c.GetCapacitiesRequest) (*c.GetCapacitiesResult_, error)
GetCapacities todo
func (*Mcp) GetInputHosts ¶
func (mcp *Mcp) GetInputHosts(ctx thrift.Context, inReq *c.GetInputHostsRequest) (*c.GetInputHostsResult_, error)
GetInputHosts finds and returns the in hosts serving the given destination. The algorithm is as follows:
- Lookup result cache for precomputed result. If (a) the cache entry is expired, or (b) any of the cached hosts has failed, or (c) nHealthy extents is below threshold, then recompute the result and update the cache; else return the result from the cache.
Recompute Logic: 1. List all OPEN extents for this destination 2. If the number of open but healthy extents < threshold, try creating a new extent 3. Return all inputhosts serving the healthy open extents 4. Record result in the result cache
func (*Mcp) GetOutputHosts ¶
func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsRequest) (*c.GetOutputHostsResult_, error)
GetOutputHosts finds and returns the out hosts serving the given destination. The algorithm is as follows:
- Lookup result cache for precomputed result. If (a) the cache entry is expired, or (b) any of the cached hosts has failed, or (c) nHealthy extents is below threshold, then recompute the result and update the cache; else return the result from the cache.
Recompute logic: 1. List all the consumable extents for this consumer group (from the consumer_group_extents table) 2. Attempt to add new unadded open extents (from destination_extents) to consumer_group 3. If there are unhealthy extents (due to out host failure), repair up to a threshold 4. Record result in resultsCache and return all consumable output hosts
func (*Mcp) GetQueueDepthInfo ¶
func (mcp *Mcp) GetQueueDepthInfo(ctx thrift.Context, inReq *c.GetQueueDepthInfoRequest) (*c.GetQueueDepthInfoResult_, error)
GetQueueDepthInfo returns queue depth backlog info for a consumer group
func (*Mcp) RemoveCapacities ¶
func (mcp *Mcp) RemoveCapacities(ctx thrift.Context, removeCapacitiesRequest *c.RemoveCapacitiesRequest) error
RemoveCapacities todo
func (*Mcp) ReportConsumerGroupExtentMetric ¶
func (mcp *Mcp) ReportConsumerGroupExtentMetric(ctx thrift.Context, request *c.ReportConsumerGroupExtentMetricRequest) error
ReportConsumerGroupExtentMetric records the incoming set of extent metrics from an out host
func (*Mcp) ReportConsumerGroupMetric ¶
func (mcp *Mcp) ReportConsumerGroupMetric(ctx thrift.Context, request *c.ReportConsumerGroupMetricRequest) error
ReportConsumerGroupMetric records the incoming set of consumer group metrics from an out host
func (*Mcp) ReportDestinationExtentMetric ¶
func (mcp *Mcp) ReportDestinationExtentMetric(ctx thrift.Context, request *c.ReportDestinationExtentMetricRequest) error
ReportDestinationExtentMetric records the incoming set of extent metrics from inputhost
func (*Mcp) ReportDestinationMetric ¶
func (mcp *Mcp) ReportDestinationMetric(ctx thrift.Context, request *c.ReportDestinationMetricRequest) error
ReportDestinationMetric records the incoming set of destination metrics from inputhost
func (*Mcp) ReportNodeMetric ¶
ReportNodeMetric records the incoming set of metrics from a Cherami node
func (*Mcp) ReportStoreExtentMetric ¶
func (mcp *Mcp) ReportStoreExtentMetric(ctx thrift.Context, request *c.ReportStoreExtentMetricRequest) error
ReportStoreExtentMetric records the incoming set of extent metrics from a store host
func (*Mcp) Start ¶
func (mcp *Mcp) Start(thriftService []thrift.TChanServer)
Start starts the controller service
func (*Mcp) UpdateConsumerGroup ¶
func (mcp *Mcp) UpdateConsumerGroup(ctx thrift.Context, updateRequest *shared.UpdateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)
UpdateConsumerGroup updates local and remote consumer group
func (*Mcp) UpdateDestination ¶
func (mcp *Mcp) UpdateDestination(ctx thrift.Context, updateRequest *shared.UpdateDestinationRequest) (*shared.DestinationDescription, error)
UpdateDestination updates local and remote destination
func (*Mcp) UpsertInputHostCapacities ¶
func (mcp *Mcp) UpsertInputHostCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertInputHostCapacitiesRequest) error
UpsertInputHostCapacities todo
func (*Mcp) UpsertOutputHostCapacities ¶
func (mcp *Mcp) UpsertOutputHostCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertOutputHostCapacitiesRequest) error
UpsertOutputHostCapacities todo
func (*Mcp) UpsertStoreCapacities ¶
func (mcp *Mcp) UpsertStoreCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertStoreCapacitiesRequest) error
UpsertStoreCapacities todo
type MetadataMgr ¶
type MetadataMgr interface { // ReadDestination returns the destination desc for the given uuid ReadDestination(dstID string, dstPath string) (*shared.DestinationDescription, error) // ReadExtentStats returns the extent stats for the given extent; it is not scalable, and therefore is to be used for debugging only ReadExtentStats(dstID string, extentID string) (*shared.ExtentStats, error) // ReadStoreExtentStats returns the extent stats for the given extent and store ReadStoreExtentStats(extentID string, storeID string) (*shared.ExtentStats, error) // ReadConsumerGroupExtent returns the consumer group extent stats corresponding to the given dst/cg/ext. ReadConsumerGroupExtent(dstID string, cgID string, extentID string) (*shared.ConsumerGroupExtent, error) // ListDestinations returns an list of adestinations ListDestinations() ([]*shared.DestinationDescription, error) // ListDestinationsPage returns an list of adestinations ListDestinationsPage(mReq *shared.ListDestinationsRequest) (*shared.ListDestinationsResult_, error) // ListDestinationsByUUID returns an iterator to the destinations ListDestinationsByUUID() ([]*shared.DestinationDescription, error) // ListDestinationExtentsByStatus lists extents by dstID/status // The returned type is a list of DestinationExtent objects as // opposed to list of Extent objects ListDestinationExtentsByStatus(dstID string, filterByStatus []shared.ExtentStatus) ([]*m.DestinationExtent, error) // ListExtentsByDstIDStatus lists extents dstID/Status ListExtentsByDstIDStatus(dstID string, filterByStatus []shared.ExtentStatus) ([]*shared.ExtentStats, error) // ListExtentsByInputIDStatus lists extents for dstID/InputUUID/Status ListExtentsByInputIDStatus(inputID string, status *shared.ExtentStatus) ([]*shared.ExtentStats, error) // ListExtentsByStoreIDStatus lists extents by storeID/Status ListExtentsByStoreIDStatus(storeID string, status *shared.ExtentStatus) ([]*shared.ExtentStats, error) // ListExtentsByReplicationStatus lists extents 
by storeID/ReplicationStatus ListExtentsByReplicationStatus(storeID string, status *shared.ExtentReplicaReplicationStatus) ([]*shared.ExtentStats, error) // ListExtentsByConsumerGroup lists all extents for the given destination / consumer group ListExtentsByConsumerGroup(dstID string, cgID string, filterByStatus []shared.ConsumerGroupExtentStatus) ([]*shared.ConsumerGroupExtent, error) // ListExtentsByConsumerGroupLite lists all extents for the given destination / consumer group // this api only returns a few interesting columns for each consumer group extent in the // result. For detailed info, see ListExtentsByConsumerGroup ListExtentsByConsumerGroupLite(dstID string, cgID string, filterByStatus []shared.ConsumerGroupExtentStatus) ([]*m.ConsumerGroupExtentLite, error) // CreateExtent creates a new extent for the given destination and marks the status as OPEN CreateExtent(dstID string, extentID string, inhostID string, storeIDs []string) (*shared.CreateExtentResult_, error) // CreateRemoteZoneExtent creates a new remote zone extent for the given destination and marks the status as OPEN CreateRemoteZoneExtent(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string, consumerGroupVisibility string) (*shared.CreateExtentResult_, error) // AddExtentToConsumerGroup adds an open extent to consumer group for consumption AddExtentToConsumerGroup(dstID string, cgID string, extentID string, outHostID string, storeIDs []string) error // ListConsumerGroupsByDstID lists all consumer groups for a given destination uuid ListConsumerGroupsByDstID(dstID string) ([]*shared.ConsumerGroupDescription, error) // ListConsumerGroupsPage lists all consumer groups for a given destination uuid ListConsumerGroupsPage(mReq *shared.ListConsumerGroupRequest) (*shared.ListConsumerGroupResult_, error) // UpdateOutHost changes the out host for the consumer group extent UpdateOutHost(dstID string, cgID string, extentID string, outHostID 
string) error // DeleteConsumerGroup deletes the consumer group status DeleteConsumerGroup(dstID string, cgName string) error // SealExtent seals the extent if it was previously sealed SealExtent(dstID string, extentID string) error // UpdateDestinationDLQCursors updates the DLQCursor on a destination UpdateDestinationDLQCursors(dstID string, mergeBefore common.UnixNanoTime, purgeBefore common.UnixNanoTime) error // MoveExtent moves an extent MoveExtent(fromDstID, toDstID, extentID, cgUUID string) error // ReadConsumerGroup reads a consumer group ReadConsumerGroup(dstID, dstPath, cgUUID, cgName string) (*shared.ConsumerGroupDescription, error) // ReadConsumerGroupByUUID reads a consumer group by UUID ReadConsumerGroupByUUID(cgUUID string) (*shared.ConsumerGroupDescription, error) // UpdateExtentStatus updates the status of an extent UpdateExtentStatus(dstID, extID string, status shared.ExtentStatus) error // UpdateRemoteExtentPrimaryStore updates remoteExtentPrimaryStore UpdateRemoteExtentPrimaryStore(dstID string, extentID string, remoteExtentPrimaryStore string) (*m.UpdateExtentStatsResult_, error) // UpdateConsumerGroupExtentStatus updates the status of a consumer group extent UpdateConsumerGroupExtentStatus(cgID, extID string, status shared.ConsumerGroupExtentStatus) error // DeleteDestination marks a destination to be deleted DeleteDestination(dstID string) error }
MetadataMgr manages extents. It exposes easy to use CRUD API on top of the cassandra metadata client API
func NewMetadataMgr ¶
func NewMetadataMgr(mClient m.TChanMetadataService, m3Client metrics.Client, logger bark.Logger) MetadataMgr
NewMetadataMgr creates and returns a new instance of MetadataMgr
type OutputHostNotificationEvent ¶
type OutputHostNotificationEvent struct {
// contains filtered or unexported fields
}
OutputHostNotificationEvent is generated to notify output hosts about a new extent
func (*OutputHostNotificationEvent) Handle ¶
func (event *OutputHostNotificationEvent) Handle(context *Context) error
Handle sends notification to an output host
type OutputPlacementConfig ¶
type OutputPlacementConfig struct {
AdminStatus string `name:"adminStatus" default:"enabled"`
}
OutputPlacementConfig contains the config parameters needed for outputhost placement
type Placement ¶
type Placement interface { // PickInputHost picks an input host with certain distance from the store hosts PickInputHost(storeHosts []*common.HostInfo) (*common.HostInfo, error) // PickOutputHost picks an output host with certain distance from the store hosts PickOutputHost(storeHosts []*common.HostInfo) (*common.HostInfo, error) // PickStoreHosts picks n store hosts with certain distance between store replicas PickStoreHosts(count int) ([]*common.HostInfo, error) }
Placement is the placement strategy interface for picking hosts
func NewDistancePlacement ¶
NewDistancePlacement initializes a new placement topology
type QueueDepthCache ¶
QueueDepthCache caches the queueDepth results for every consumer-group based on the previous run on the queueDepth calculator.
type QueueDepthCacheEntry ¶
type QueueDepthCacheEntry struct { // Time is the cache entry time Time common.UnixNanoTime // BacklogAvailable is the available backlog BacklogAvailable int64 BacklogUnavailable int64 // BacklogInflight is the in flight message count BacklogInflight int64 // BacklogDLQ is the number of messages in DLQ BacklogDLQ int64 }
QueueDepthCacheEntry is a cache structure for testing queue depth
type QueueDepthCacheJSONFields ¶
type QueueDepthCacheJSONFields struct { CacheTime common.UnixNanoTime `json:"cache_time,omitempty"` BacklogAvailable int64 `json:"backlog_available"` BacklogInflight int64 `json:"backlog_inflight"` BacklogDLQ int64 `json:"backlog_dlq"` }
QueueDepthCacheJSONFields is the json fields for QueueDepthCacheEntry
type RemoteExtentPrimaryStoreDownEvent ¶
type RemoteExtentPrimaryStoreDownEvent struct {
// contains filtered or unexported fields
}
RemoteExtentPrimaryStoreDownEvent is triggered for an extent when the primary store host that's responsible for replicating that extent from origin zone is down
func (*RemoteExtentPrimaryStoreDownEvent) Handle ¶
func (event *RemoteExtentPrimaryStoreDownEvent) Handle(context *Context) error
Handle handles a RemoteExtentPrimaryStoreDownEvent. This event is triggered when a store host that acts as the primary replicator for an extent is down for a period greater than a typical deployment interval (~5 mins max). The action taken is to assign a new primary replicator for the affected extent
type RetryableEventExecutor ¶
type RetryableEventExecutor interface { // Submit takes a retryable event for // executing in the background. Returns // true on success, false otherwise Submit(event Event) bool // Stop stops the executor Stop() }
RetryableEventExecutor is an executor for executing retryable event handlers. The executor has built-in retry/backoff logic for retryable failures.
func NewRetryableEventExecutor ¶
func NewRetryableEventExecutor(context *Context, maxWorkers int) RetryableEventExecutor
NewRetryableEventExecutor returns a new instance of RetryableEventExecutor. This executor spins up a separate goroutine for processing every event that's submitted. maxWorkers is an upper bound on the number of events that can be processed simultaneously. There are a few reasons for having a separate executor for retries as opposed to using the event pipeline:
- Retries can span up to an hour
- Retry workers are mostly idle (sleeping), so we can have lots of them in the background without consuming any resources
- We want the ability to spin up goroutines when lots of retries are needed, but in steady state we don't need that many goroutines
- Retries should not impact or delay the processing of other events
type StartReplicationForRemoteZoneExtent ¶
type StartReplicationForRemoteZoneExtent struct {
// contains filtered or unexported fields
}
StartReplicationForRemoteZoneExtent is triggered when a remote zone extent is created or the primary store is switched
func (*StartReplicationForRemoteZoneExtent) Handle ¶
func (event *StartReplicationForRemoteZoneExtent) Handle(context *Context) error
Handle handles a StartReplicationForRemoteZoneExtent. This handler calls store to start replication. The primary store will be issued with a remote replication request. The rest of the stores will be issued with a re-replication request. This is the fast path to notify store to start or resume a replication. If the notification is lost, the slow path (a periodic job in store) will kick in to start replication.
type StoreExtentStatusOutOfSyncEvent ¶
type StoreExtentStatusOutOfSyncEvent struct {
// contains filtered or unexported fields
}
StoreExtentStatusOutOfSyncEvent is triggered when one of the extent replicas (store) is out of sync with others i.e. the extent is SEALED but one of the stores still reports it as OPEN
func (*StoreExtentStatusOutOfSyncEvent) Handle ¶
func (event *StoreExtentStatusOutOfSyncEvent) Handle(context *Context) error
Handle handles a StoreExtentStatusOutOfSyncEvent. This handler reissues the SealExtent call to an out-of-sync store host without updating metadata state. This handler assumes that the extent was previously sealed successfully in at least one store.
type StoreHostFailedEvent ¶
type StoreHostFailedEvent struct {
// contains filtered or unexported fields
}
StoreHostFailedEvent is triggered when a store host fails
func (*StoreHostFailedEvent) Handle ¶
func (event *StoreHostFailedEvent) Handle(context *Context) error
Handle handles a StoreHostFailedEvent. This handler will list all OPEN extents for the store and enqueue an ExtentDownEvent for each one of them.
type StorePlacementConfig ¶
type StorePlacementConfig struct { AdminStatus string `name:"adminStatus" default:"enabled"` // MinFreeDiskSpaceBytes represents the min // required free disk space on a store host // to host an extent. The default value // translates to 40GB which is 2 percent // for a 2TB drive MinFreeDiskSpaceBytes int64 `name:"minFreeDiskSpaceBytes" default:"40000000000"` }
StorePlacementConfig contains the config parameters needed for store placement