Documentation ¶
Index ¶
- Constants
- Variables
- func IsLambdaTimeout(err error) bool
- type AvailableLink
- type AvailableLinks
- func (l *AvailableLinks) AddAvailable(link manageableLink, nolimit bool) bool
- func (l *AvailableLinks) GetRequestPipe() *AvailableLink
- func (l *AvailableLinks) Len() int
- func (l *AvailableLinks) OffsetLimit(offset int, max int) int
- func (l *AvailableLinks) Reset()
- func (l *AvailableLinks) SetLimit(limit int) int
- type Backer
- type BackerGetter
- type BackupIterator
- type Backups
- func (b *Backups) Availables() int
- func (b *Backups) GetByHash(hash uint64) (*Instance, bool)
- func (b *Backups) GetByKey(key string) (*Instance, bool)
- func (b *Backups) GetByLocation(loc int, required int) (*Instance, bool)
- func (b *Backups) Invalidate()
- func (b *Backups) Iter() *BackupIterator
- func (b *Backups) Len() int
- func (b *Backups) Locator() *protocol.BackupLocator
- func (b *Backups) Reserve(fallback mapreduce.Iterator) int
- func (b *Backups) ResetCandidates(required int, candidates []*Instance)
- func (b *Backups) Start(target *Instance) int
- func (b *Backups) StartByIndex(i int, target *Instance) (*Instance, bool)
- func (b *Backups) Stop(target *Instance)
- type CandidateProvider
- type CandidateQueue
- type CandidatesProvider
- type ClusterManager
- type Connection
- func (conn *Connection) BindInstance(ins *Instance) *Connection
- func (conn *Connection) ClearResponses()
- func (conn *Connection) Close() error
- func (conn *Connection) CloseAndWait() error
- func (conn *Connection) CloseWithReason(reason string, block bool) error
- func (conn *Connection) IsClosed() bool
- func (conn *Connection) IsSameWorker(another *Connection) bool
- func (conn *Connection) SendControl(ctrl *types.Control) error
- func (conn *Connection) SendPing(payload []byte) error
- func (conn *Connection) SendRequest(req *types.Request, args ...interface{}) error
- func (conn *Connection) ServeLambda()
- func (conn *Connection) String() string
- func (conn *Connection) Writer() *resp.RequestWriter
- type DefaultInstanceEnumerator
- type Delegate
- type DelegateBackerAdapter
- type Deployment
- type Instance
- func (ins *Instance) AbandonLambda()
- func (ins *Instance) AssignBackups(numBak int, candidates []*Instance)
- func (ins *Instance) Close()
- func (ins *Instance) CollectData()
- func (ins *Instance) Degrade()
- func (ins *Instance) Description() string
- func (ins *Instance) Dispatch(cmd types.Command) error
- func (ins *Instance) DispatchWithOptions(cmd types.Command, opts int) error
- func (ins *Instance) Expire()
- func (ins *Instance) FlagDataCollected(ok string)
- func (ins *Instance) ForbidBacking() bool
- func (ins *Instance) GetShadowInstance() *Instance
- func (ins *Instance) HandleRequests()
- func (ins *Instance) IsActive() bool
- func (ins *Instance) IsBacking(includingPrepare bool) bool
- func (ins *Instance) IsBusy(cmd types.Command) (uint64, bool)
- func (ins *Instance) IsClosed() bool
- func (ins *Instance) IsReclaimed() bool
- func (ins *Instance) IsRecovering() bool
- func (ins *Instance) Migrate() error
- func (ins *Instance) Occupancy(mode types.InstanceOccupancyMode) float64
- func (ins *Instance) Phase() uint32
- func (ins *Instance) ReserveBacking() error
- func (ins *Instance) ResetDue(delay bool, reason string)
- func (ins *Instance) ResumeServing()
- func (ins *Instance) SetDue(due int64, delay bool, reason string, args ...interface{})
- func (ins *Instance) StartBacking(bakIns *Instance, bakId int, total int) bool
- func (ins *Instance) StartDelegation() int
- func (ins *Instance) StartRecovery() int
- func (ins *Instance) Status() uint64
- func (ins *Instance) StatusDescription() string
- func (ins *Instance) StopBacking(bakIns *Instance)
- func (ins *Instance) String() string
- func (ins *Instance) TryFlagValidated(conn *Connection, sid string, flags int64) (*Connection, time.Duration, error)
- func (ins *Instance) Validate(opts ...*ValidateOption) (*Connection, time.Duration, error)
- func (ins *Instance) WarmUp()
- type InstanceEnumerator
- type InstanceManager
- type LambdaError
- type LambdaErrorType
- type LinkBucket
- type LinkManager
- func (m *LinkManager) AddDataLink(link *Connection) bool
- func (m *LinkManager) Close()
- func (m *LinkManager) DataLinks() *AvailableLinks
- func (m *LinkManager) FlagAvailableForRequest(link *Connection) bool
- func (m *LinkManager) GetAvailableForRequest() *AvailableLink
- func (m *LinkManager) GetControl() *Connection
- func (m *LinkManager) GetLastControl() *Connection
- func (m *LinkManager) InvalidateControl(link manageableLink)
- func (m *LinkManager) RemoveDataLink(link *Connection)
- func (m *LinkManager) Reset()
- func (m *LinkManager) SetControl(link *Connection)
- func (m *LinkManager) SetMaxActiveDataLinks(num int)
- type Meta
- func (m *Meta) AddChunk(key string, sz int64) (num int, size uint64)
- func (m *Meta) DecreaseSize(dec int64) uint64
- func (m *Meta) EffectiveCapacity() uint64
- func (m *Meta) FromProtocolMeta(meta *protocol.Meta) (bool, error)
- func (m *Meta) IncreaseSize(inc int64) uint64
- func (m *Meta) ModifiedOccupancy(adjustment uint64) float64
- func (m *Meta) NumChunks() int
- func (m *Meta) Reconcile(meta *protocol.ShortMeta)
- func (m *Meta) RemoveChunk(key string, sz int64) (num int, size uint64)
- func (m *Meta) ReservedCapacity() uint64
- func (m *Meta) ResetCapacity(capacity uint64, effective uint64)
- func (m *Meta) Size() uint64
- func (m *Meta) ToBackupPayload(id uint64, key int, total int, maxChunkSize uint64) ([]byte, error)
- func (m *Meta) ToDelegatePayload(id uint64, key int, total int, maxChunkSize uint64) (*protocol.Meta, []byte, error)
- func (m *Meta) ToPayload(id uint64) ([]byte, error)
- func (m *Meta) ToProtocolMeta(id uint64) *protocol.Meta
- type Relocator
- type ValidateOption
Constants ¶
const ( ConnectionOpen uint32 = iota ConnectionClosing ConnectionClosed )
const ( INSTANCE_MASK_STATUS_START = 0x0000000F INSTANCE_MASK_STATUS_CONNECTION = 0x000000F0 INSTANCE_MASK_STATUS_BACKING = 0x00000F00 INSTANCE_MASK_STATUS_LIFECYCLE = 0x0000F000 INSTANCE_MASK_STATUS_FAILURE = 0xF0000000 // Start status INSTANCE_UNSTARTED = 0 INSTANCE_RUNNING = 1 INSTANCE_CLOSED = 2 INSTANCE_SHADOW = 15 // Connection status // Activate: Sleeping -> Activating -> Active // Retry: Activating/Active -> Activate (validating) // Abandon: Activating/Active -> Sleeping (validating, warmup) // Sleep: Active -> Sleeping // Switch: Active -> Maybe (Unmanaged) // Sleep: Maybe -> Sleeping INSTANCE_SLEEPING = 0 INSTANCE_ACTIVATING = 1 INSTANCE_ACTIVE = 2 INSTANCE_MAYBE = 3 // Backing status INSTANCE_RECOVERING = 1 INSTANCE_BACKING = 2 // Lifecycle status PHASE_ACTIVE = 0 // Instance is actively serving main repository and backup PHASE_BACKING_ONLY = 1 // Instance is expiring and serving backup only, warmup should be degraded. PHASE_RECLAIMED = 2 // Instance has been reclaimed. PHASE_EXPIRED = 3 // Instance is expired, no invocation will be made, and it is safe to recycle. // Abnormal status FAILURE_MAX_QUEUE_REACHED = 1 MAX_CONCURRENCY = 2 IN_CONCURRENCY = 1 OUT_CONCURRENCY = 2 TEMP_MAP_SIZE = 10 BACKING_DISABLED = 0 BACKING_RESERVED = 1 BACKING_ENABLED = 2 BACKING_FORBID = 3 DESCRIPTION_UNSTARTED = "unstarted" DESCRIPTION_CLOSED = "closed" DESCRIPTION_SLEEPING = "sleeping" DESCRIPTION_ACTIVATING = "activating" DESCRIPTION_ACTIVE = "active" DESCRIPTION_MAYBE = "unmanaged" DESCRIPTION_UNDEFINED = "undefined" DISPATCH_OPT_BUSY_CHECK = 0x0001 DISPATCH_OPT_RELOCATED = 0x0002 NUM_REQUEST_UNIT = 0x0000000000000001 NUM_WRITE_REQUEST_UNIT = 0x0000000100000000 NUM_REQUEST_MASK = 0x00000000FFFFFFFF NUM_WRITE_REQUEST_MASK = 0xFFFFFFFF00000000 NUM_WRITE_REQUEST_BITS = 32 )
const ( LinkBucketSize = 10 UnlimitedActiveLinks = 0 // Max spare links // Set to 2 to reserve 1 for ready and 1 for buffering. // Set to MAX_CONCURRENCY + 1 to avoid links being frequently closed and re-created. // Can be overridden dynamically by instance.SetMaxActiveDataLinks(). ActiveLinks = 2 )
Variables ¶
var ( ErrConnectionClosed = errors.New("connection closed") ErrMissingResponse = errors.New("missing response") ErrUnexpectedCommand = errors.New("unexpected command") ErrUnexpectedType = errors.New("unexpected type") ErrMissingRequest = errors.New("missing request") ErrUnexpectedSendRequest = errors.New("unexpected SendRequest call") )
var ( CM ClusterManager WarmTimeout = config.InstanceWarmTimeout TriggerTimeout = 1 * time.Second // Triggering cost is about 20ms, set large enough to avoid exceeded timeout // TODO: Make RTT dynamic, global or per instance. RTT = 20 * time.Millisecond DefaultConnectTimeout = 1000 * time.Millisecond // Decide by RTT. MaxConnectTimeout = 1 * time.Second PromisedGoodDue = 1 * time.Second // Keep consistent with lambda/lifetime/timeout.TICK_ERROR_EXTEND MinValidationInterval = RTT // MinValidationInterval The minimum interval between validations. MaxValidationFailure = 3 BackoffFactor = 2 MaxControlRequestSize = int64(200000) // 200KB, which can be transmitted in 20ms. DefaultPingPayload = []byte{} AwsSession = awsSession.Must(awsSession.NewSessionWithOptions(awsSession.Options{ SharedConfigState: awsSession.SharedConfigEnable, })) // Errors ErrInstanceClosed = errors.New("instance closed") ErrInstanceReclaimed = errors.New("instance reclaimed") ErrInstanceSleeping = errors.New("instance is sleeping") ErrInstanceRecovering = errors.New("instance is recovering") ErrReservationFailed = errors.New("reservation failed") ErrDuplicatedSession = errors.New("session has started") ErrNotCtrlLink = errors.New("not control link") ErrInstanceValidated = errors.New("instance has been validated by another connection") ErrInstanceBusy = errors.New("instance busy") ErrWarmupReturn = errors.New("return from warmup") ErrUnknown = errors.New("unknown error") ErrValidationTimeout = &LambdaError{error: errors.New("funciton validation timeout"), typ: LambdaErrorTimeout} ErrCapacityExceeded = errors.New("capacity exceeded") ErrQueueTimeout = &LambdaError{error: errors.New("queue timeout"), typ: LambdaErrorTimeout} ErrRelocationFailed = errors.New("relocation failed") )
var ( ErrLinkRequestClosed = &LambdaError{error: errors.New("link request closed")} ErrLinkRequestTimeout = &LambdaError{error: errors.New("link request timeout"), typ: LambdaErrorTimeout} ErrLinkManagerReset = &LambdaError{error: errors.New("link manager reset")} ErrNilLink = &LambdaError{error: errors.New("unexpected nil link")} // Keep following variables as false. They are only for unit tests. UnitTestMTC1 = false )
Functions ¶
func IsLambdaTimeout ¶
Types ¶
type AvailableLink ¶
type AvailableLink struct {
// contains filtered or unexported fields
}
func (*AvailableLink) Close ¶
func (l *AvailableLink) Close()
func (*AvailableLink) Closed ¶
func (l *AvailableLink) Closed() <-chan struct{}
func (*AvailableLink) Error ¶
func (l *AvailableLink) Error() error
func (*AvailableLink) Request ¶
func (l *AvailableLink) Request() chan<- *types.Request
func (*AvailableLink) SetTimeout ¶
func (l *AvailableLink) SetTimeout(d time.Duration)
type AvailableLinks ¶
type AvailableLinks struct {
// contains filtered or unexported fields
}
func (*AvailableLinks) AddAvailable ¶
func (l *AvailableLinks) AddAvailable(link manageableLink, nolimit bool) bool
func (*AvailableLinks) GetRequestPipe ¶
func (l *AvailableLinks) GetRequestPipe() *AvailableLink
func (*AvailableLinks) Len ¶
func (l *AvailableLinks) Len() int
func (*AvailableLinks) OffsetLimit ¶
func (l *AvailableLinks) OffsetLimit(offset int, max int) int
func (*AvailableLinks) Reset ¶
func (l *AvailableLinks) Reset()
func (*AvailableLinks) SetLimit ¶
func (l *AvailableLinks) SetLimit(limit int) int
type BackerGetter ¶
type BackerGetter interface {
// contains filtered or unexported methods
}
type BackupIterator ¶
type BackupIterator struct {
// contains filtered or unexported fields
}
func (*BackupIterator) Len ¶
func (iter *BackupIterator) Len() int
func (*BackupIterator) Next ¶
func (iter *BackupIterator) Next() bool
func (*BackupIterator) Value ¶
func (iter *BackupIterator) Value() (int, interface{})
type Backups ¶
type Backups struct {
// contains filtered or unexported fields
}
Backups for an instance. Unless otherwise specified, all operations are not thread-safe.
func NewBackups ¶
func NewBackupsFromInstances ¶
func NewBackupsFromInstances(ins *Instance, backups []*Instance, adapter BackerGetter) *Backups
func (*Backups) Availables ¶
func (*Backups) GetByLocation ¶
func (*Backups) Invalidate ¶
func (b *Backups) Invalidate()
func (*Backups) Iter ¶
func (b *Backups) Iter() *BackupIterator
func (*Backups) Locator ¶
func (b *Backups) Locator() *protocol.BackupLocator
func (*Backups) ResetCandidates ¶
func (*Backups) StartByIndex ¶
type CandidateProvider ¶
type CandidateProvider func() *Instance
type CandidateQueue ¶
type CandidateQueue struct {
// contains filtered or unexported fields
}
func NewCandidateQueue ¶
func NewCandidateQueue(bufsize int, provider CandidatesProvider) *CandidateQueue
func (*CandidateQueue) Candidates ¶
func (q *CandidateQueue) Candidates() <-chan *Instance
func (*CandidateQueue) Close ¶
func (q *CandidateQueue) Close()
type CandidatesProvider ¶
type CandidatesProvider interface {
LoadCandidates(*CandidateQueue, []*Instance) int
}
type ClusterManager ¶
type ClusterManager interface { InstanceManager Relocator }
type Connection ¶
TODO: use bsm/pool
func NewConnection ¶
func NewConnection(cn net.Conn) *Connection
func (*Connection) BindInstance ¶
func (conn *Connection) BindInstance(ins *Instance) *Connection
func (*Connection) ClearResponses ¶
func (conn *Connection) ClearResponses()
func (*Connection) Close ¶
func (conn *Connection) Close() error
func (*Connection) CloseAndWait ¶
func (conn *Connection) CloseAndWait() error
func (*Connection) CloseWithReason ¶
func (conn *Connection) CloseWithReason(reason string, block bool) error
Close signals that the connection should be closed. The function close() will be called later to perform the actual operation.
func (*Connection) IsClosed ¶
func (conn *Connection) IsClosed() bool
func (*Connection) IsSameWorker ¶
func (conn *Connection) IsSameWorker(another *Connection) bool
func (*Connection) SendControl ¶
func (conn *Connection) SendControl(ctrl *types.Control) error
func (*Connection) SendPing ¶
func (conn *Connection) SendPing(payload []byte) error
SendPing sends a ping with piggybacked information.
func (*Connection) SendRequest ¶
func (conn *Connection) SendRequest(req *types.Request, args ...interface{}) error
func (*Connection) ServeLambda ¶
func (conn *Connection) ServeLambda()
Blocks on peeking the type of the next lambda message, then handles the incoming lambda store response.
Fields: field 0: conn id; field 1: req id; field 2: chunk id; field 3: obj val.
func (*Connection) String ¶
func (conn *Connection) String() string
func (*Connection) Writer ¶
func (conn *Connection) Writer() *resp.RequestWriter
type DefaultInstanceEnumerator ¶
type DefaultInstanceEnumerator struct {
mapreduce.Enumerator
}
func NewInstanceEnumerator ¶
func NewInstanceEnumerator(instances []*Instance) *DefaultInstanceEnumerator
func (*DefaultInstanceEnumerator) Instance ¶
func (enum *DefaultInstanceEnumerator) Instance(i int) *Instance
type Delegate ¶
type Delegate struct {
*Instance
}
Delegate offers a new Backup implementation for delegation.
type DelegateBackerAdapter ¶
type DelegateBackerAdapter struct { }
type Deployment ¶
type Deployment struct { Block int // contains filtered or unexported fields }
func NewDeployment ¶
func NewDeployment(name string, id uint64) *Deployment
func (*Deployment) Id ¶
func (d *Deployment) Id() uint64
func (*Deployment) Name ¶
func (d *Deployment) Name() string
type Instance ¶
type Instance struct { *Deployment Meta // contains filtered or unexported fields }
func NewInstanceFromDeployment ¶
func NewInstanceFromDeployment(dp *Deployment, id uint64) *Instance
func (*Instance) AbandonLambda ¶
func (ins *Instance) AbandonLambda()
func (*Instance) AssignBackups ¶
func (*Instance) CollectData ¶
func (ins *Instance) CollectData()
func (*Instance) Description ¶
func (*Instance) DispatchWithOptions ¶
func (*Instance) FlagDataCollected ¶
func (*Instance) ForbidBacking ¶
func (*Instance) GetShadowInstance ¶
func (*Instance) HandleRequests ¶
func (ins *Instance) HandleRequests()
Handles incoming client requests. This is the lambda-facing goroutine.
func (*Instance) IsReclaimed ¶
func (*Instance) IsRecovering ¶
func (*Instance) Occupancy ¶
func (ins *Instance) Occupancy(mode types.InstanceOccupancyMode) float64
func (*Instance) ReserveBacking ¶
Check if the instance is available for serving as a backup for specified instance. Return false if the instance is backing another instance.
func (*Instance) StartBacking ¶
Start serving as the backup for specified instance. Return false if the instance is backing another instance.
func (*Instance) StartDelegation ¶
StartDelegation delegates the instance to parallel buffer instances. Return # of delegates
func (*Instance) StartRecovery ¶
StartRecovery starts parallel recovery mode. Return # of ready backups
func (*Instance) StatusDescription ¶
func (*Instance) StopBacking ¶
Stop serving as a backup
func (*Instance) TryFlagValidated ¶
func (ins *Instance) TryFlagValidated(conn *Connection, sid string, flags int64) (*Connection, time.Duration, error)
Flags the instance as validated by the specified connection. This also validates that the connection belongs to the instance by setting the instance field of the connection.
func (*Instance) Validate ¶
func (ins *Instance) Validate(opts ...*ValidateOption) (*Connection, time.Duration, error)
type InstanceEnumerator ¶
type InstanceEnumerator interface { mapreduce.Enumerator Instance(i int) *Instance }
type InstanceManager ¶
type LambdaError ¶
type LambdaError struct {
// contains filtered or unexported fields
}
func (*LambdaError) IsTimeout ¶
func (e *LambdaError) IsTimeout() bool
type LambdaErrorType ¶
type LambdaErrorType int
const ( LambdaErrorUncategoried LambdaErrorType = iota LambdaErrorTimeout )
type LinkBucket ¶
type LinkBucket struct {
// contains filtered or unexported fields
}
func (*LinkBucket) Reset ¶
func (b *LinkBucket) Reset()
type LinkManager ¶
type LinkManager struct {
// contains filtered or unexported fields
}
func NewLinkManager ¶
func NewLinkManager(ins *Instance) *LinkManager
func (*LinkManager) AddDataLink ¶
func (m *LinkManager) AddDataLink(link *Connection) bool
func (*LinkManager) Close ¶
func (m *LinkManager) Close()
func (*LinkManager) DataLinks ¶
func (m *LinkManager) DataLinks() *AvailableLinks
func (*LinkManager) FlagAvailableForRequest ¶
func (m *LinkManager) FlagAvailableForRequest(link *Connection) bool
func (*LinkManager) GetAvailableForRequest ¶
func (m *LinkManager) GetAvailableForRequest() *AvailableLink
func (*LinkManager) GetControl ¶
func (m *LinkManager) GetControl() *Connection
func (*LinkManager) GetLastControl ¶
func (m *LinkManager) GetLastControl() *Connection
func (*LinkManager) InvalidateControl ¶
func (m *LinkManager) InvalidateControl(link manageableLink)
func (*LinkManager) RemoveDataLink ¶
func (m *LinkManager) RemoveDataLink(link *Connection)
func (*LinkManager) Reset ¶
func (m *LinkManager) Reset()
func (*LinkManager) SetControl ¶
func (m *LinkManager) SetControl(link *Connection)
func (*LinkManager) SetMaxActiveDataLinks ¶
func (m *LinkManager) SetMaxActiveDataLinks(num int)
type Meta ¶
type Meta struct { // Sequence of the last confirmed log. Logs are stored by sequence. Term uint64 // Total transmission size for restoring all confirmed logs. Updates uint64 // Rank for lambda to decide if a fast recovery is required. DiffRank float64 // Hash of the last confirmed log. Hash string // Sequence of snapshot. SnapshotTerm uint64 // Total transmission size for restoring all confirmed logs from start to SnapShotSeq. SnapshotUpdates uint64 // Total size of snapshot for transmission. SnapshotSize uint64 // Flag showing whether meta is out of sync with the corresponding lambda. Stale bool // Capacity of the instance. Capacity uint64 // contains filtered or unexported fields }
FULL = (Updates - SnapshotUpdates + SnapshotSize) / Bandwidth + (Term - SnapshotTerm + 1) * RTT
INCREMENTAL = (Updates - LastUpdates) / Bandwidth + (Seq - LastSeq) * RTT
Condition: FULL < INCREMENTAL
func (*Meta) DecreaseSize ¶
func (*Meta) EffectiveCapacity ¶
func (*Meta) IncreaseSize ¶
func (*Meta) ModifiedOccupancy ¶
func (*Meta) ReservedCapacity ¶
func (*Meta) ResetCapacity ¶
func (*Meta) ToBackupPayload ¶
func (*Meta) ToDelegatePayload ¶
type Relocator ¶
type Relocator interface { // Relocate relocates the chunk specified by the meta(interface{}) and chunkId(int). // Returns the instance the chunk is relocated to. Relocate(interface{}, int, types.Command) (*Instance, error) // TryRelocate tests and relocates the chunk specified by the meta(interface{}) and chunkId(int). // Returns the instance, triggered or not (bool), and error if the chunk is triggered. TryRelocate(interface{}, int, types.Command) (*Instance, bool, error) }
type ValidateOption ¶
type ValidateOption struct { Notifier chan struct{} Validated *Connection Error error // Options WarmUp bool Command types.Command }