controllerhost

package
v0.0.0-...-f5d5260

Warning: This package is not in the latest version of its module.
Published: Oct 4, 2019 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package controllerhost is a Cherami Controller implemented as a TChannel Thrift service. The controller (MCP) is responsible for the following high-level load-balancing functions in Cherami:

  • Creation and Deletion of Extents
  • Assignment of In/Store hosts to Extents
  • Assignment of Producers to InputHosts
  • Assignment of Consumers to OutputHosts
  • Cluster re-balancing when hosts join / leave the cluster

Constants

const (

	// QueueDepthTabulationString can be added to a destination or consumer group (CG) owner email to request queue depth tabulation.
	// Google mail supports plus-addressing, e.g. gbailey+queueDepthTabulation@uber.com;
	// this is still a valid address and will be delivered to gbailey@uber.com
	QueueDepthTabulationString = `queueDepthTabulation`
)
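
The tag is meant to travel inside a plus-addressed owner email. A minimal sketch of how such a tag could be detected, assuming a case-sensitive substring match on the local part of the address (the text before `@`). The helper `wantsQueueDepthTabulation` is hypothetical; the controller's actual matching rule is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// QueueDepthTabulationString mirrors the exported constant above.
const QueueDepthTabulationString = `queueDepthTabulation`

// wantsQueueDepthTabulation is a hypothetical helper: it reports whether the
// local part of an owner email (before the last '@') carries the tag.
// Assumption: the match is a case-sensitive substring check.
func wantsQueueDepthTabulation(ownerEmail string) bool {
	local := ownerEmail
	if at := strings.LastIndex(ownerEmail, "@"); at >= 0 {
		local = ownerEmail[:at]
	}
	return strings.Contains(local, QueueDepthTabulationString)
}

func main() {
	fmt.Println(wantsQueueDepthTabulation("gbailey+queueDepthTabulation@uber.com")) // true
	fmt.Println(wantsQueueDepthTabulation("gbailey@uber.com"))                      // false
}
```

Restricting the check to the local part keeps a domain that happens to contain the tag from triggering tabulation.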

Variables

var (
	// ErrTooManyUnHealthy is returned when there are too many open but unhealthy extents for a destination
	ErrTooManyUnHealthy = &shared.InternalServiceError{Message: "Too many open, but unhealthy extents for destination"}
	// ErrPublishToKafkaDestination is returned on invoking GetInputHosts on a Kafka destination
	ErrPublishToKafkaDestination = &shared.BadRequestError{Message: "Cannot publish to Kafka destinations"}
	// ErrPublishToReceiveOnlyDestination is returned on invoking GetInputHosts on a receive-only destination
	ErrPublishToReceiveOnlyDestination = &shared.BadRequestError{Message: "Cannot publish to receive-only destinations"}
)
var (
	// ErrMalformedUUID is returned when the UUID in the request is malformed
	ErrMalformedUUID = &shared.BadRequestError{Message: "Malformed UUID in request"}
	// ErrNoHealthyExtent is returned when there are no healthy extents for the destination
	ErrNoHealthyExtent = &shared.InternalServiceError{Message: "No healthy extent found for destination"}
	// ErrEventQueueFull is returned when the internal event queue is full and
	// as a result, some action cannot be taken by the controller
	ErrEventQueueFull = &shared.InternalServiceError{Message: "EventQueue full"}
	// ErrTryLock is a temporary error that is returned by the API
	// when it loses the race to do the computation needed to refresh the
	// API result cache. This should only happen during the initial
	// bootstrap of Controller.
	ErrTryLock = &shared.InternalServiceError{Message: "Failed to acquire lock, backoff and retry"}
	// ErrUnavailable indicates any kind of intermittent
	// service error, most likely, metadata read/write
	// errors.
	ErrUnavailable = &shared.InternalServiceError{Message: "Service unavailable, backoff and retry"}

	// ErrDestinationNotExists is returned for a missing destination
	ErrDestinationNotExists = &shared.EntityNotExistsError{Message: "Destination does not exist"}
	// ErrConsumerGroupNotExists is returned for a missing consumer group
	ErrConsumerGroupNotExists = &shared.EntityNotExistsError{Message: "ConsumerGroup does not exist"}
	// ErrDestinationDisabled is returned after a destination is deleted
	ErrDestinationDisabled = &shared.EntityDisabledError{Message: "Destination is not enabled"}
	// ErrConsumerGroupDisabled is returned after a consumer group is deleted
	ErrConsumerGroupDisabled = &shared.EntityDisabledError{Message: "Consumer group is not enabled"}
)
var (

	// IntervalBtwnScans is the time that the scanner will sleep between scans. It is exported to allow tests to modify it.
	IntervalBtwnScans = time.Minute
)

Functions

func GetDrainExtentTimeout

func GetDrainExtentTimeout() time.Duration

GetDrainExtentTimeout returns the current drainExtentTimeout

func SetDrainExtentTimeout

func SetDrainExtentTimeout(timeout time.Duration)

SetDrainExtentTimeout overrides the drain extent timeout with the given value. This method is intended for unit tests only.

Types

type Boolean

type Boolean bool

Boolean is a defined type with bool as its underlying type; it can be used as a generic value type, i.e., stored in an interface{}.

type ConsGroupUpdatedEvent

type ConsGroupUpdatedEvent struct {
	// contains filtered or unexported fields
}

ConsGroupUpdatedEvent is generated when a new extent becomes available to the consumer group for consumption. The action is to schedule notifications to the affected output hosts.

func (*ConsGroupUpdatedEvent) Done

func (event *ConsGroupUpdatedEvent) Done(context *Context, err error)

Done provides the default callback for all events

func (*ConsGroupUpdatedEvent) Handle

func (event *ConsGroupUpdatedEvent) Handle(context *Context) error

Handle schedules output host notifications

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context holds the run-time context for the controller

type ControllerDynamicConfig

type ControllerDynamicConfig struct {
	NumPublisherExtentsByPath      []string `name:"numPublisherExtentsByPath" default:"/=4"`
	NumConsumerExtentsByPath       []string `name:"numConsumerExtentsByPath" default:"/=8"`
	NumRemoteConsumerExtentsByPath []string `name:"numRemoteConsumerExtentsByPath" default:"/=4"`

	// configs for multi_zone consumer group
	ActiveZone   string `name:"activeZone" default:""`
	FailoverMode string `name:"failoverMode" default:"disabled"`
}

ControllerDynamicConfig contains the config parameters needed for the controller

type Dfdd

type Dfdd interface {
	common.Daemon
	// ReportHostGoingDown reports a host as going down
	// for planned deployment or maintenance
	ReportHostGoingDown(service string, hostID string)
	// GetHostState returns a tuple representing the current
	// dfdd host state and the duration for which the host
	// has been in that state
	GetHostState(service string, hostID string) (dfddHostState, time.Duration)
}

Dfdd (Discovery and Failure Detection Daemon) is a background task that keeps track of the healthy members of all Cherami services. It is also where any custom failure detection logic (on top of Ringpop) must go.

func NewDfdd

func NewDfdd(context *Context, timeSource common.TimeSource) Dfdd

NewDfdd creates and returns an instance of the discovery and failure detection daemon. Dfdd monitors the health of input, output, and store hosts and triggers the corresponding Input/Output/StoreHostFailedEvent for every host that fails. It currently does not maintain a list of healthy hosts for every service; that is a work in progress.

type DistancePlacement

type DistancePlacement struct {
	// contains filtered or unexported fields
}

DistancePlacement holds the context and distance map

func (*DistancePlacement) PickInputHost

func (p *DistancePlacement) PickInputHost(storeHosts []*common.HostInfo) (*common.HostInfo, error)

PickInputHost picks an input host at a certain distance from the store hosts

func (*DistancePlacement) PickOutputHost

func (p *DistancePlacement) PickOutputHost(storeHosts []*common.HostInfo) (*common.HostInfo, error)

PickOutputHost picks an output host at a certain distance from the store hosts

func (*DistancePlacement) PickStoreHosts

func (p *DistancePlacement) PickStoreHosts(count int) ([]*common.HostInfo, error)

PickStoreHosts picks count store hosts with a certain distance between store replicas

type Event

type Event interface {
	// Handle is the handler for the event.
	Handle(context *Context) error
	// Done is a callback to indicate that the
	// event processing is done. This is
	// an opportunity to cleanup any state that
	// the event had. The err arg is set to the
	// error value from the last attempt. Nil
	// indicates the handler finished successfully.
	Done(context *Context, err error)
}

Event is the interface that all events must implement. It follows the try-catch-finally pattern: when a new event arrives, its Handle() method is called to handle it. If Handle returns a retryable error, the event is retried one or more times. After the event is processed (success or failure), the Done method is called.

func NewConsGroupUpdatedEvent

func NewConsGroupUpdatedEvent(dstID, consGroupID, extentID, outputHostID string) Event

NewConsGroupUpdatedEvent creates and returns a ConsGroupUpdatedEvent

func NewExtentCreatedEvent

func NewExtentCreatedEvent(dstID string, inHostID string, extentID string, storeIDs []string) Event

NewExtentCreatedEvent creates and returns an ExtentCreatedEvent

func NewExtentDownEvent

func NewExtentDownEvent(sealSeq int64, dstID string, extentID string) Event

NewExtentDownEvent creates and returns an ExtentDownEvent

func NewInputHostFailedEvent

func NewInputHostFailedEvent(hostUUID string) Event

NewInputHostFailedEvent creates and returns an InputHostFailedEvent

func NewInputHostNotificationEvent

func NewInputHostNotificationEvent(dstID, inputHostID, extentID string, storeIDs []string, reason, reasonContext string, notificationType admin.NotificationType) Event

NewInputHostNotificationEvent creates and returns an InputHostNotificationEvent

func NewOutputHostNotificationEvent

func NewOutputHostNotificationEvent(dstID, consGroupID, outputHostID, reason, reasonContext string, notificationType admin.NotificationType) Event

NewOutputHostNotificationEvent creates and returns an OutputHostNotificationEvent

func NewRemoteExtentPrimaryStoreDownEvent

func NewRemoteExtentPrimaryStoreDownEvent(storeID string, extentID string) Event

NewRemoteExtentPrimaryStoreDownEvent creates and returns a RemoteExtentPrimaryStoreDownEvent

func NewStartReplicationForRemoteZoneExtent

func NewStartReplicationForRemoteZoneExtent(dstID string, extentID string, storeIDs []string, remoteExtentPrimaryStore string) Event

NewStartReplicationForRemoteZoneExtent creates and returns a StartReplicationForRemoteZoneExtent

func NewStoreExtentStatusOutOfSyncEvent

func NewStoreExtentStatusOutOfSyncEvent(dstID string, extentID string, storeID string, desiredStatus shared.ExtentStatus) Event

NewStoreExtentStatusOutOfSyncEvent creates and returns an event signaling that a store's status for an extent is out of sync with desiredStatus

func NewStoreHostFailedEvent

func NewStoreHostFailedEvent(hostUUID string) Event

NewStoreHostFailedEvent creates and returns a StoreHostFailedEvent

type EventPipeline

type EventPipeline interface {
	common.Daemon
	// Add adds an event to the event pipeline
	// returns true on success
	// returns false when the event queue is full
	Add(event Event) bool
	// GetRetryableEventExecutor returns the executor for
	// running retryable events
	GetRetryableEventExecutor() RetryableEventExecutor
}

EventPipeline represents the execution pipeline that handles events in the background

func NewEventPipeline

func NewEventPipeline(context *Context, nWorkers int) EventPipeline

NewEventPipeline creates a new instance of EventPipeline. The returned pipeline has nWorkers concurrent worker goroutines that handle events in the background.

type ExtentCreatedEvent

type ExtentCreatedEvent struct {
	// contains filtered or unexported fields
}

ExtentCreatedEvent is generated by the creation of a new extent. The action is to schedule notifications to the concerned input hosts.

func (*ExtentCreatedEvent) Done

func (event *ExtentCreatedEvent) Done(context *Context, err error)

Done provides the default callback for all events

func (*ExtentCreatedEvent) Handle

func (event *ExtentCreatedEvent) Handle(context *Context) error

Handle handles the creation of a new extent. The following asynchronous actions are triggered when an extent is created:

  a. For every input host that serves an open extent for the destination:
     1. Add an InputHostNotificationEvent to reconfigure its clients.
  b. For the input host that serves the newly created extent:
     1. Add an InputHostNotificationEvent to reconfigure ALL.

type ExtentDownEvent

type ExtentDownEvent struct {
	// contains filtered or unexported fields
}

ExtentDownEvent is triggered whenever an extent becomes unreachable and needs to be sealed

func (*ExtentDownEvent) Done

func (event *ExtentDownEvent) Done(context *Context, err error)

Done does cleanup for ExtentDownEvent

func (*ExtentDownEvent) Handle

func (event *ExtentDownEvent) Handle(context *Context) error

Handle seals an extent and updates metadata

type InputHostFailedEvent

type InputHostFailedEvent struct {
	// contains filtered or unexported fields
}

InputHostFailedEvent is triggered when an input host fails

func (*InputHostFailedEvent) Done

func (event *InputHostFailedEvent) Done(context *Context, err error)

Done provides the default callback for all events

func (*InputHostFailedEvent) Handle

func (event *InputHostFailedEvent) Handle(context *Context) error

Handle handles an InputHostFailedEvent. It lists all OPEN extents for the input host and enqueues an ExtentDownEvent for each of them.

type InputHostNotificationEvent

type InputHostNotificationEvent struct {
	// contains filtered or unexported fields
}

InputHostNotificationEvent is generated to notify input hosts about a new extent

func (*InputHostNotificationEvent) Done

func (event *InputHostNotificationEvent) Done(context *Context, err error)

Done provides the default callback for all events

func (*InputHostNotificationEvent) Handle

func (event *InputHostNotificationEvent) Handle(context *Context) error

Handle sends notification to an input host

type InputPlacementConfig

type InputPlacementConfig struct {
	AdminStatus string `name:"adminStatus" default:"enabled"`
}

InputPlacementConfig contains the config parameters needed for input host placement

type LockMgr

type LockMgr interface {
	// TryLock attempts to acquire the lock
	// identified by the given key. If it
	// cannot acquire the lock within the
	// timeout, returns false. On success,
	// returns true
	TryLock(key string, timeout time.Duration) bool
	// Unlock releases the lock identified
	// by the key
	Unlock(key string)
	// Size returns the number of
	// keys currently held by the
	// lockmgr
	Size() int64
}

LockMgr represents the API for any implementation that manages a set of locks, where each lock is identified by a string key

func NewLockMgr

func NewLockMgr(concurrency int, hashfn common.HashFunc, logger bark.Logger) (LockMgr, error)

NewLockMgr creates and returns a new instance of LockMgr. The returned lockMgr uses a hash table of hash tables to maintain the mapping from key to lock. The first-level table has a fixed size, and each entry in it contains a lock that must be acquired to access the corresponding second-level hash table. The second-level hash table maps each key to its lock and grows dynamically.

@param concurrency

Number of entries in the first-level table. This directly translates into the number of concurrent threads that can use the lockMgr at any given point in time. Ideally, this number should equal the number of cores.

@param hashfn

The hash function used to index into the first-level table.

@param logger

Logger to use.

type Mcp

type Mcp struct {
	*common.Service
	// contains filtered or unexported fields
}

Mcp implements the ExtentController interface

func NewController

func NewController(cfg configure.CommonAppConfig, sVice *common.Service, metadataClient m.TChanMetadataService, zoneFailoverManager common.ZoneFailoverManager) (*Mcp, []thrift.TChanServer)

NewController creates and returns a new instance of Mcp controller

func (*Mcp) CreateConsumerGroup

func (mcp *Mcp) CreateConsumerGroup(ctx thrift.Context, createRequest *shared.CreateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)

CreateConsumerGroup creates local and remote consumer group

func (*Mcp) CreateDestination

func (mcp *Mcp) CreateDestination(ctx thrift.Context, createRequest *shared.CreateDestinationRequest) (*shared.DestinationDescription, error)

CreateDestination creates local and remote destination

func (*Mcp) CreateRemoteZoneConsumerGroupExtent

func (mcp *Mcp) CreateRemoteZoneConsumerGroupExtent(ctx thrift.Context, createRequest *shared.CreateConsumerGroupExtentRequest) error

CreateRemoteZoneConsumerGroupExtent creates a consumer group extent that originates from another zone

func (*Mcp) CreateRemoteZoneExtent

func (mcp *Mcp) CreateRemoteZoneExtent(ctx thrift.Context, createRequest *shared.CreateExtentRequest) (*shared.CreateExtentResult_, error)

CreateRemoteZoneExtent creates an extent that originates from another zone

func (*Mcp) DeleteConsumerGroup

func (mcp *Mcp) DeleteConsumerGroup(ctx thrift.Context, deleteRequest *shared.DeleteConsumerGroupRequest) error

DeleteConsumerGroup deletes local and remote consumer group

func (*Mcp) DeleteDestination

func (mcp *Mcp) DeleteDestination(ctx thrift.Context, deleteRequest *shared.DeleteDestinationRequest) error

DeleteDestination deletes local and remote destination

func (*Mcp) ExtentsUnreachable

func (mcp *Mcp) ExtentsUnreachable(ctx thrift.Context, extentsUnreachableRequest *a.ExtentsUnreachableRequest) error

ExtentsUnreachable is a way for other services to notify the Controller about an unreachable extent. When this notification is received, the controller enqueues an ExtentDownEvent to seal the extent asynchronously. A successful return code from this API does not guarantee that the extent is sealed; it only acknowledges that the notification was successfully processed.

func (*Mcp) GetCapacities

func (mcp *Mcp) GetCapacities(ctx thrift.Context, getCapacitiesRequest *c.GetCapacitiesRequest) (*c.GetCapacitiesResult_, error)

GetCapacities todo

func (*Mcp) GetInputHosts

func (mcp *Mcp) GetInputHosts(ctx thrift.Context, inReq *c.GetInputHostsRequest) (*c.GetInputHostsResult_, error)

GetInputHosts finds and returns the input hosts serving the given destination. The algorithm is as follows:

  1. Look up the result cache for a precomputed result. If
     a. the cache entry is expired, or
     b. any of the cached hosts has failed, or
     c. the number of healthy extents is below the threshold,
     then recompute the result and update the cache; otherwise, return the result from the cache.

Recompute logic:

  1. List all OPEN extents for this destination.
  2. If the number of open but healthy extents is below the threshold, try creating a new extent.
  3. Return all input hosts serving the healthy open extents.
  4. Record the result in the result cache.

func (*Mcp) GetOutputHosts

func (mcp *Mcp) GetOutputHosts(ctx thrift.Context, inReq *c.GetOutputHostsRequest) (*c.GetOutputHostsResult_, error)

GetOutputHosts finds and returns the output hosts serving the given destination. The algorithm is as follows:

  1. Look up the result cache for a precomputed result. If
     a. the cache entry is expired, or
     b. any of the cached hosts has failed, or
     c. the number of healthy extents is below the threshold,
     then recompute the result and update the cache; otherwise, return the result from the cache.

Recompute logic:

  1. List all consumable extents for this consumer group (from the consumer_group_extents table).
  2. Attempt to add open extents that have not yet been added (from destination_extents) to the consumer group.
  3. If there are unhealthy extents (due to output host failure), repair up to a threshold.
  4. Record the result in resultsCache and return all consumable output hosts.

func (*Mcp) GetQueueDepthInfo

func (mcp *Mcp) GetQueueDepthInfo(ctx thrift.Context, inReq *c.GetQueueDepthInfoRequest) (*c.GetQueueDepthInfoResult_, error)

GetQueueDepthInfo returns queue depth (backlog) information for a consumer group

func (*Mcp) RemoveCapacities

func (mcp *Mcp) RemoveCapacities(ctx thrift.Context, removeCapacitiesRequest *c.RemoveCapacitiesRequest) error

RemoveCapacities todo

func (*Mcp) ReportConsumerGroupExtentMetric

func (mcp *Mcp) ReportConsumerGroupExtentMetric(ctx thrift.Context, request *c.ReportConsumerGroupExtentMetricRequest) error

ReportConsumerGroupExtentMetric records the incoming set of extent metrics from an out host

func (*Mcp) ReportConsumerGroupMetric

func (mcp *Mcp) ReportConsumerGroupMetric(ctx thrift.Context, request *c.ReportConsumerGroupMetricRequest) error

ReportConsumerGroupMetric records the incoming set of extent metrics from an out host

func (*Mcp) ReportDestinationExtentMetric

func (mcp *Mcp) ReportDestinationExtentMetric(ctx thrift.Context, request *c.ReportDestinationExtentMetricRequest) error

ReportDestinationExtentMetric records the incoming set of extent metrics from an input host

func (*Mcp) ReportDestinationMetric

func (mcp *Mcp) ReportDestinationMetric(ctx thrift.Context, request *c.ReportDestinationMetricRequest) error

ReportDestinationMetric records the incoming set of extent metrics from an input host

func (*Mcp) ReportNodeMetric

func (mcp *Mcp) ReportNodeMetric(ctx thrift.Context, request *c.ReportNodeMetricRequest) error

ReportNodeMetric records the incoming set of metrics from a Cherami node

func (*Mcp) ReportStoreExtentMetric

func (mcp *Mcp) ReportStoreExtentMetric(ctx thrift.Context, request *c.ReportStoreExtentMetricRequest) error

ReportStoreExtentMetric records the incoming set of extent metrics from a store host

func (*Mcp) Start

func (mcp *Mcp) Start(thriftService []thrift.TChanServer)

Start starts the controller service

func (*Mcp) Stop

func (mcp *Mcp) Stop()

Stop stops the controller service

func (*Mcp) UpdateConsumerGroup

func (mcp *Mcp) UpdateConsumerGroup(ctx thrift.Context, updateRequest *shared.UpdateConsumerGroupRequest) (*shared.ConsumerGroupDescription, error)

UpdateConsumerGroup updates local and remote consumer group

func (*Mcp) UpdateDestination

func (mcp *Mcp) UpdateDestination(ctx thrift.Context, updateRequest *shared.UpdateDestinationRequest) (*shared.DestinationDescription, error)

UpdateDestination updates local and remote destination

func (*Mcp) UpsertInputHostCapacities

func (mcp *Mcp) UpsertInputHostCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertInputHostCapacitiesRequest) error

UpsertInputHostCapacities todo

func (*Mcp) UpsertOutputHostCapacities

func (mcp *Mcp) UpsertOutputHostCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertOutputHostCapacitiesRequest) error

UpsertOutputHostCapacities todo

func (*Mcp) UpsertStoreCapacities

func (mcp *Mcp) UpsertStoreCapacities(ctx thrift.Context, upsertCapacitiesRequest *c.UpsertStoreCapacitiesRequest) error

UpsertStoreCapacities todo

func (*Mcp) ValidateCgZoneConfig

func (mcp *Mcp) ValidateCgZoneConfig(ctx thrift.Context, logger bark.Logger, updateRequest *shared.UpdateConsumerGroupRequest) (bool, error)

ValidateCgZoneConfig validates the zone configs for an UpdateConsumerGroupRequest

func (*Mcp) ValidateDestZoneConfig

func (mcp *Mcp) ValidateDestZoneConfig(ctx thrift.Context, logger bark.Logger, updateRequest *shared.UpdateDestinationRequest) (bool, error)

ValidateDestZoneConfig validates the zone configs for an UpdateDestinationRequest

type MetadataMgr

type MetadataMgr interface {
	// ReadDestination returns the destination desc for the given uuid
	ReadDestination(dstID string, dstPath string) (*shared.DestinationDescription, error)
	// ReadExtentStats returns the extent stats for the given extent; it is not scalable, and therefore is to be used for debugging only
	ReadExtentStats(dstID string, extentID string) (*shared.ExtentStats, error)
	// ReadStoreExtentStats returns the extent stats for the given extent and store
	ReadStoreExtentStats(extentID string, storeID string) (*shared.ExtentStats, error)
	// ReadConsumerGroupExtent returns the consumer group extent stats corresponding to the given dst/cg/ext.
	ReadConsumerGroupExtent(dstID string, cgID string, extentID string) (*shared.ConsumerGroupExtent, error)
	// ListDestinations returns a list of destinations
	ListDestinations() ([]*shared.DestinationDescription, error)
	// ListDestinationsPage returns a page of destinations
	ListDestinationsPage(mReq *shared.ListDestinationsRequest) (*shared.ListDestinationsResult_, error)
	// ListDestinationsByUUID returns a list of destinations by UUID
	ListDestinationsByUUID() ([]*shared.DestinationDescription, error)
	// ListDestinationExtentsByStatus lists extents by dstID/status
	// The returned type is a list of DestinationExtent objects as
	// opposed to list of Extent objects
	ListDestinationExtentsByStatus(dstID string, filterByStatus []shared.ExtentStatus) ([]*m.DestinationExtent, error)
	// ListExtentsByDstIDStatus lists extents dstID/Status
	ListExtentsByDstIDStatus(dstID string, filterByStatus []shared.ExtentStatus) ([]*shared.ExtentStats, error)
	// ListExtentsByInputIDStatus lists extents by inputID/Status
	ListExtentsByInputIDStatus(inputID string, status *shared.ExtentStatus) ([]*shared.ExtentStats, error)
	// ListExtentsByStoreIDStatus lists extents by storeID/Status
	ListExtentsByStoreIDStatus(storeID string, status *shared.ExtentStatus) ([]*shared.ExtentStats, error)
	// ListExtentsByReplicationStatus lists extents by storeID/ReplicationStatus
	ListExtentsByReplicationStatus(storeID string, status *shared.ExtentReplicaReplicationStatus) ([]*shared.ExtentStats, error)
	// ListExtentsByConsumerGroup lists all extents for the given destination / consumer group
	ListExtentsByConsumerGroup(dstID string, cgID string, filterByStatus []shared.ConsumerGroupExtentStatus) ([]*shared.ConsumerGroupExtent, error)
	// ListExtentsByConsumerGroupLite lists all extents for the given destination / consumer group
	// this api only returns a few interesting columns for each consumer group extent in the
	// result. For detailed info, see ListExtentsByConsumerGroup
	ListExtentsByConsumerGroupLite(dstID string, cgID string, filterByStatus []shared.ConsumerGroupExtentStatus) ([]*m.ConsumerGroupExtentLite, error)
	// CreateExtent creates a new extent for the given destination and marks the status as OPEN
	CreateExtent(dstID string, extentID string, inhostID string, storeIDs []string) (*shared.CreateExtentResult_, error)
	// CreateRemoteZoneExtent creates a new remote zone extent for the given destination and marks the status as OPEN
	CreateRemoteZoneExtent(dstID string, extentID string, inhostID string, storeIDs []string, originZone string, remoteExtentPrimaryStore string, consumerGroupVisibility string) (*shared.CreateExtentResult_, error)
	// AddExtentToConsumerGroup adds an open extent to consumer group for consumption
	AddExtentToConsumerGroup(dstID string, cgID string, extentID string, outHostID string, storeIDs []string) error
	// ListConsumerGroupsByDstID lists all consumer groups for a given destination uuid
	ListConsumerGroupsByDstID(dstID string) ([]*shared.ConsumerGroupDescription, error)
	// ListConsumerGroupsPage lists all consumer groups for a given destination uuid
	ListConsumerGroupsPage(mReq *shared.ListConsumerGroupRequest) (*shared.ListConsumerGroupResult_, error)
	// UpdateOutHost changes the out host for the consumer group extent
	UpdateOutHost(dstID string, cgID string, extentID string, outHostID string) error
	// DeleteConsumerGroup deletes the consumer group status
	DeleteConsumerGroup(dstID string, cgName string) error
	// SealExtent seals the given extent
	SealExtent(dstID string, extentID string) error
	// UpdateDestinationDLQCursors updates the DLQCursor on a destination
	UpdateDestinationDLQCursors(dstID string, mergeBefore common.UnixNanoTime, purgeBefore common.UnixNanoTime) error
	// MoveExtent moves an extent
	MoveExtent(fromDstID, toDstID, extentID, cgUUID string) error
	// ReadConsumerGroup reads a consumer group
	ReadConsumerGroup(dstID, dstPath, cgUUID, cgName string) (*shared.ConsumerGroupDescription, error)
	// ReadConsumerGroupByUUID reads a consumer group by UUID
	ReadConsumerGroupByUUID(cgUUID string) (*shared.ConsumerGroupDescription, error)
	// UpdateExtentStatus updates the status of an extent
	UpdateExtentStatus(dstID, extID string, status shared.ExtentStatus) error
	// UpdateRemoteExtentPrimaryStore updates remoteExtentPrimaryStore
	UpdateRemoteExtentPrimaryStore(dstID string, extentID string, remoteExtentPrimaryStore string) (*m.UpdateExtentStatsResult_, error)
	// UpdateConsumerGroupExtentStatus updates the status of a consumer group extent
	UpdateConsumerGroupExtentStatus(cgID, extID string, status shared.ConsumerGroupExtentStatus) error
	// DeleteDestination marks a destination to be deleted
	DeleteDestination(dstID string) error
}

MetadataMgr manages extents. It exposes an easy-to-use CRUD API on top of the Cassandra metadata client API

func NewMetadataMgr

func NewMetadataMgr(mClient m.TChanMetadataService, m3Client metrics.Client, logger bark.Logger) MetadataMgr

NewMetadataMgr creates and returns a new instance of MetadataMgr

type OutputHostNotificationEvent

type OutputHostNotificationEvent struct {
	// contains filtered or unexported fields
}

OutputHostNotificationEvent is generated to notify output hosts about a new extent

func (*OutputHostNotificationEvent) Done

func (event *OutputHostNotificationEvent) Done(context *Context, err error)

Done provides the default callback for all events

func (*OutputHostNotificationEvent) Handle

func (event *OutputHostNotificationEvent) Handle(context *Context) error

Handle sends notification to an output host

type OutputPlacementConfig

type OutputPlacementConfig struct {
	AdminStatus string `name:"adminStatus" default:"enabled"`
}

OutputPlacementConfig contains the config parameters needed for output host placement

type Placement

type Placement interface {
	// PickInputHost picks an input host at a certain distance from the store hosts
	PickInputHost(storeHosts []*common.HostInfo) (*common.HostInfo, error)
	// PickOutputHost picks an output host at a certain distance from the store hosts
	PickOutputHost(storeHosts []*common.HostInfo) (*common.HostInfo, error)
	// PickStoreHosts picks count store hosts with a certain distance between store replicas
	PickStoreHosts(count int) ([]*common.HostInfo, error)
}

Placement is the placement strategy interface for picking hosts

func NewDistancePlacement

func NewDistancePlacement(context *Context) (Placement, error)

NewDistancePlacement initializes a new placement topology

type QueueDepthCache

type QueueDepthCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

QueueDepthCache caches the queue depth results for every consumer group, based on the previous run of the queue depth calculator.

type QueueDepthCacheEntry

type QueueDepthCacheEntry struct {
	// Time is the cache entry time
	Time common.UnixNanoTime
	// BacklogAvailable is the available backlog
	BacklogAvailable int64
	// BacklogUnavailable is the unavailable backlog (only useful for timer queues)
	BacklogUnavailable int64
	// BacklogInflight is the in flight message count
	BacklogInflight int64
	// BacklogDLQ is the number of messages in DLQ
	BacklogDLQ int64
}

QueueDepthCacheEntry is a cache structure for testing queue depth

type QueueDepthCacheJSONFields

type QueueDepthCacheJSONFields struct {
	CacheTime        common.UnixNanoTime `json:"cache_time,omitempty"`
	BacklogAvailable int64               `json:"backlog_available"`
	BacklogInflight  int64               `json:"backlog_inflight"`
	BacklogDLQ       int64               `json:"backlog_dlq"`
}

QueueDepthCacheJSONFields defines the JSON fields for QueueDepthCacheEntry

type RemoteExtentPrimaryStoreDownEvent

type RemoteExtentPrimaryStoreDownEvent struct {
	// contains filtered or unexported fields
}

RemoteExtentPrimaryStoreDownEvent is triggered for an extent when the primary store host that's responsible for replicating that extent from origin zone is down

func (*RemoteExtentPrimaryStoreDownEvent) Done

func (event *RemoteExtentPrimaryStoreDownEvent) Done(context *Context, err error)

Done provides the default callback for all events

func (*RemoteExtentPrimaryStoreDownEvent) Handle

func (event *RemoteExtentPrimaryStoreDownEvent) Handle(context *Context) error

Handle handles a RemoteExtentPrimaryStoreDownEvent. This event is triggered when a store host that acts as the primary replicator for an extent is down for a period greater than a typical deployment interval (~5 mins max). The action taken is to assign a new primary replicator for the affected extent

type RetryableEventExecutor

type RetryableEventExecutor interface {
	// Submit takes a retryable event for
	// executing in the background. Returns
	// true on success, false otherwise
	Submit(event Event) bool
	// Stop stops the executor
	Stop()
}

RetryableEventExecutor is an executor for retryable event handlers. The executor has built-in retry/backoff logic for retryable failures.

func NewRetryableEventExecutor

func NewRetryableEventExecutor(context *Context, maxWorkers int) RetryableEventExecutor

NewRetryableEventExecutor returns a new instance of RetryableEventExecutor. This executor spins up a separate goroutine to process every event that is submitted; maxWorkers is an upper bound on the number of events that can be processed simultaneously. There are a few reasons for having a separate executor for retries, as opposed to using the event pipeline:

  • Retries can span up to an hour
  • Retry workers are mostly idle (sleeping), so we can have many of them in the background without consuming significant resources
  • We want the ability to spin up goroutines when many retries are needed, but in steady state we don't need that many goroutines
  • Retries should not impact or delay the processing of other events

type StartReplicationForRemoteZoneExtent

type StartReplicationForRemoteZoneExtent struct {
	// contains filtered or unexported fields
}

StartReplicationForRemoteZoneExtent is triggered when a remote zone extent is created or the primary store is switched

func (*StartReplicationForRemoteZoneExtent) Done

func (event *StartReplicationForRemoteZoneExtent) Done(context *Context, err error)

Done provides the default callback for all events.

func (*StartReplicationForRemoteZoneExtent) Handle

func (event *StartReplicationForRemoteZoneExtent) Handle(context *Context) error

Handle handles a StartReplicationForRemoteZoneExtent. This handler calls store to start replication: the primary store is issued a remote replication request, and the rest of the stores are issued a re-replication request. This is the fast path for notifying store to start or resume replication. If the notification is lost, the slow path (a periodic job in store) kicks in to start replication.
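The handler sends different requests to the primary store and to the other stores. Below is a minimal sketch of that split. `replicationClient`, its `RemoteReplicate`/`ReReplicate` methods, `notifyStores` and `recordingClient` are hypothetical stand-ins, not the store's real Thrift API. The sketch collects errors instead of failing, on the assumption that the periodic slow path covers lost notifications.

```go
package main

import (
	"errors"
	"fmt"
)

// replicationClient is a hypothetical view of the two store calls involved.
type replicationClient interface {
	RemoteReplicate(storeID, extentID string) error // primary: pull from the origin zone
	ReReplicate(storeID, extentID string) error     // other stores: re-replication request
}

// notifyStores is the fast path: one request per store. Errors are collected
// for logging only; store's periodic job (slow path) covers lost requests.
func notifyStores(c replicationClient, extentID, primary string, stores []string) []error {
	var errs []error
	for _, s := range stores {
		var err error
		if s == primary {
			err = c.RemoteReplicate(s, extentID)
		} else {
			err = c.ReReplicate(s, extentID)
		}
		if err != nil {
			errs = append(errs, fmt.Errorf("store %s: %w", s, err))
		}
	}
	return errs
}

// recordingClient records calls and fails for one store, for the example.
type recordingClient struct {
	calls     []string
	failStore string
}

func (r *recordingClient) result(storeID string) error {
	if storeID == r.failStore {
		return errors.New("unreachable")
	}
	return nil
}

func (r *recordingClient) RemoteReplicate(storeID, _ string) error {
	r.calls = append(r.calls, "remote:"+storeID)
	return r.result(storeID)
}

func (r *recordingClient) ReReplicate(storeID, _ string) error {
	r.calls = append(r.calls, "rereplicate:"+storeID)
	return r.result(storeID)
}

func main() {
	c := &recordingClient{failStore: "s3"}
	errs := notifyStores(c, "ext-1", "s1", []string{"s1", "s2", "s3"})
	fmt.Println(c.calls) // [remote:s1 rereplicate:s2 rereplicate:s3]
	fmt.Println(errs)    // [store s3: unreachable]
}
```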

type StoreExtentStatusOutOfSyncEvent

type StoreExtentStatusOutOfSyncEvent struct {
	// contains filtered or unexported fields
}

StoreExtentStatusOutOfSyncEvent is triggered when one of the extent replicas (stores) is out of sync with the others, i.e., the extent is SEALED but one of the stores still reports it as OPEN.

func (*StoreExtentStatusOutOfSyncEvent) Done

func (event *StoreExtentStatusOutOfSyncEvent) Done(context *Context, err error)

Done provides the default callback for all events.

func (*StoreExtentStatusOutOfSyncEvent) Handle

func (event *StoreExtentStatusOutOfSyncEvent) Handle(context *Context) error

Handle handles a StoreExtentStatusOutOfSyncEvent. This handler reissues the SealExtent call to an out-of-sync store host without updating metadata state. It assumes that the extent was previously sealed successfully in at least one store.
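This handler has two parts: detecting which replicas are out of sync, and resealing only those. Below is a minimal sketch of both. `extentStatus`, `outOfSyncStores`, `resealStores` and the injected `seal` function are hypothetical. In particular, `seal` is not the real SealExtent signature. Returning on the first failed reseal is an assumed choice that would let a retry mechanism try again.

```go
package main

import (
	"fmt"
	"sort"
)

type extentStatus string

const (
	statusOpen   extentStatus = "OPEN"
	statusSealed extentStatus = "SEALED"
)

// outOfSyncStores returns, in sorted order, the stores that still report
// OPEN for an extent whose metadata status is SEALED.
func outOfSyncStores(metaStatus extentStatus, reported map[string]extentStatus) []string {
	var out []string
	if metaStatus != statusSealed {
		return out
	}
	for store, st := range reported {
		if st == statusOpen {
			out = append(out, store)
		}
	}
	sort.Strings(out) // map iteration order is random
	return out
}

// resealStores reissues the seal to each out-of-sync store. Metadata is
// deliberately left untouched; the extent is assumed to be sealed in at
// least one store already.
func resealStores(stores []string, seal func(storeID string) error) error {
	for _, s := range stores {
		if err := seal(s); err != nil {
			return fmt.Errorf("reseal on %s: %w", s, err)
		}
	}
	return nil
}

func main() {
	reported := map[string]extentStatus{"s1": statusSealed, "s2": statusOpen, "s3": statusSealed}
	stale := outOfSyncStores(statusSealed, reported)
	err := resealStores(stale, func(id string) error {
		fmt.Println("SealExtent ->", id) // SealExtent -> s2
		return nil
	})
	fmt.Println(stale, err) // [s2] <nil>
}
```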

type StoreHostFailedEvent

type StoreHostFailedEvent struct {
	// contains filtered or unexported fields
}

StoreHostFailedEvent is triggered when a store host fails

func (*StoreHostFailedEvent) Done

func (event *StoreHostFailedEvent) Done(context *Context, err error)

Done provides the default callback for all events.

func (*StoreHostFailedEvent) Handle

func (event *StoreHostFailedEvent) Handle(context *Context) error

Handle handles a StoreHostFailedEvent. This handler will list all OPEN extents for the store and enqueue an ExtentDownEvent for each one of them.

type StorePlacementConfig

type StorePlacementConfig struct {
	AdminStatus string `name:"adminStatus" default:"enabled"`
	// MinFreeDiskSpaceBytes represents the min
	// required free disk space on a store host
	// to host an extent. The default value
	// translates to 40GB which is 2 percent
	// for a 2TB drive
	MinFreeDiskSpaceBytes int64 `name:"minFreeDiskSpaceBytes" default:"40000000000"`
}

StorePlacementConfig contains the config parameters needed for store placement
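The default MinFreeDiskSpaceBytes of 40,000,000,000 bytes is 40 GB. Since 40 GB / 2,000 GB = 2%, this matches the comment in the struct. The sketch below checks that arithmetic with integers. It also shows one way the threshold might filter candidate store hosts. `storeHost`, `eligibleStores` and the inclusive `>=` comparison are assumptions. AdminStatus is copied but not interpreted, because its semantics are not described here.

```go
package main

import "fmt"

// StorePlacementConfig as listed; the tags are only read by a config loader.
type StorePlacementConfig struct {
	AdminStatus           string `name:"adminStatus" default:"enabled"`
	MinFreeDiskSpaceBytes int64  `name:"minFreeDiskSpaceBytes" default:"40000000000"`
}

// storeHost is a hypothetical summary of a store host's disk state.
type storeHost struct {
	UUID          string
	FreeDiskBytes int64
}

// eligibleStores keeps hosts with at least MinFreeDiskSpaceBytes free.
func eligibleStores(cfg StorePlacementConfig, hosts []storeHost) []string {
	var out []string
	for _, h := range hosts {
		if h.FreeDiskBytes >= cfg.MinFreeDiskSpaceBytes {
			out = append(out, h.UUID)
		}
	}
	return out
}

func main() {
	cfg := StorePlacementConfig{AdminStatus: "enabled", MinFreeDiskSpaceBytes: 40000000000}
	const twoTB int64 = 2000000000000
	fmt.Println(cfg.MinFreeDiskSpaceBytes * 100 / twoTB) // 2 (percent of a 2TB drive)
	hosts := []storeHost{{"s1", 50000000000}, {"s2", 10000000000}}
	fmt.Println(eligibleStores(cfg, hosts)) // [s1]
}
```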
