common

package
v1.18.0-rc3
Warning

This package is not in the latest version of its module.

Published: Apr 5, 2017 License: MIT Imports: 51 Imported by: 156

README

This directory holds the common code required by all the different services.

The structure within this folder is roughly as follows:

config.go - holds all the config items
util.go - holds utility code such as creating the tchannel, bootstrapping ringpop, etc.
*types.go - holds the types for a specific interface (e.g. servicetypes.go holds the object and interface definitions for all common stuff used by our services)
*.go - business logic implementing the interface (e.g. service.go implements the methods specific to servicetypes.go)

Documentation


Constants

const (
	// SequenceBegin refers to the beginning of an extent
	SequenceBegin = 0
	// SequenceEnd refers to the end of an extent
	SequenceEnd = math.MaxInt64
	// CallerUserName is the name of the thrift context header that contains the current user name
	CallerUserName = "user-name"
	// CallerHostName is the name of the thrift context header that contains the current host name
	CallerHostName = "host-name"
	// CallerServiceName is the name of the thrift context header that contains the current service name
	CallerServiceName = "cn"
)

Extent sequence const

const (
	// DLQMaxMergeAge is the maximum time that we expect a partition to exist
	DLQMaxMergeAge = common.UnixNanoTime(int64(time.Hour)) * 24 * 1 // one day

	// TenancyProd is the tenancy of production
	// Deployment name can be in the format of <tenancy>_<zone>
	TenancyProd = "prod"

	// InputHostForRemoteExtent is a special (and fake) input host ID for remote extent
	InputHostForRemoteExtent = "88888888-8888-8888-8888-888888888888"
)
const (
	// InputServiceName refers to the name of the cherami input service
	InputServiceName = "cherami-inputhost"
	// OutputServiceName refers to the name of the cherami output service
	OutputServiceName = "cherami-outputhost"
	// FrontendServiceName refers to the name of the cherami frontend service
	FrontendServiceName = "cherami-frontendhost"
	// ControllerServiceName refers to the name of the cherami controller service
	ControllerServiceName = "cherami-controllerhost"
	// StoreServiceName refers to the name of the cherami store service
	StoreServiceName = "cherami-storehost"
	// ReplicatorServiceName refers to the name of the cherami replicator service
	ReplicatorServiceName = "cherami-replicator"
)
const (
	// MaxHostOverallConn is the maximum overall connection limit for this host
	// TODO: Need to figure out the suitable values
	MaxHostOverallConn = 100000
	// HostOverallConnLimit is the overall connection limit for this host
	HostOverallConnLimit = 10000

	// MaxHostPerSecondConn is the maximum per-second rate limit for this host
	// TODO: Need to figure out the suitable values
	MaxHostPerSecondConn = 10000
	// HostPerSecondConnLimit is the per second rate limit for this host
	HostPerSecondConnLimit = 1000

	// MaxHostPerConnMsgsLimitPerSecond is the maximum for the per-connection messages limit
	// TODO: Need to figure out the suitable values
	MaxHostPerConnMsgsLimitPerSecond = 800000
	// HostPerConnMsgsLimitPerSecond is the per connection messages limit
	HostPerConnMsgsLimitPerSecond = 80000

	// MaxHostPerExtentMsgsLimitPerSecond is the maximum for the per-extent messages limit
	// TODO: Need to figure out the suitable values
	MaxHostPerExtentMsgsLimitPerSecond = 200000
	// HostPerExtentMsgsLimitPerSecond is the per extent messages limit
	HostPerExtentMsgsLimitPerSecond = 20000

	// MaxHostMaxConnPerDestination is the maximum for max connections per destination
	// TODO: Need to figure out the suitable values
	MaxHostMaxConnPerDestination = 10000
	// HostMaxConnPerDestination is the max connections per destination
	HostMaxConnPerDestination = 1000
)

Some default values for the limits. TODO: this will be moved behind a separate "limits" interface which is also dynamically tunable.

const (
	FlushThreshold int           = 64
	FlushTimeout   time.Duration = 5 * time.Millisecond
)

Flush stream thresholds, used by the "pumps" that wrap the websocket stream and provide a Go channel interface for reading from and writing to the stream. These thresholds control how often we do a "Flush" on the tchannel stream. Currently configured to flush every 64 messages sent or every 5 milliseconds, whichever is sooner.
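
The "every 64 messages or every 5 milliseconds" rule can be illustrated with a small loop. The following is only a sketch under stated assumptions: `countingStream`, `pump`, and `runPump` are hypothetical names that do not come from this package, the stream is a simple counter rather than a websocket or tchannel stream, and a periodic ticker stands in for whatever timer the real pumps use.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented thresholds.
const (
	flushThreshold = 64
	flushTimeout   = 5 * time.Millisecond
)

// countingStream is a hypothetical stand-in for a buffered stream.
type countingStream struct {
	written, flushed, flushes int
}

func (s *countingStream) Write(string) { s.written++ }

func (s *countingStream) Flush() {
	s.flushed = s.written
	s.flushes++
}

// pump writes every message from msgs to s and flushes after
// flushThreshold unflushed writes or when the ticker fires,
// whichever happens first.
func pump(msgs <-chan string, s *countingStream) {
	ticker := time.NewTicker(flushTimeout)
	defer ticker.Stop()
	unflushed := 0
	flush := func() {
		if unflushed > 0 {
			s.Flush()
			unflushed = 0
		}
	}
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				flush() // flush whatever is left before exiting
				return
			}
			s.Write(m)
			unflushed++
			if unflushed >= flushThreshold {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// runPump feeds n messages through pump and reports the counters.
func runPump(n int) (written, flushed, flushes int) {
	msgs := make(chan string)
	s := &countingStream{}
	done := make(chan struct{})
	go func() {
		pump(msgs, s)
		close(done)
	}()
	for i := 0; i < n; i++ {
		msgs <- fmt.Sprintf("msg-%d", i)
	}
	close(msgs)
	<-done
	return s.written, s.flushed, s.flushes
}

func main() {
	written, flushed, flushes := runPump(130)
	fmt.Println(written, flushed, flushes)
}
```

The exact number of flushes depends on timing, but because each flush covers at most 64 messages, 130 messages always need at least three.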

const MaxDuration time.Duration = 1<<62 - 1

MaxDuration is the maximum time duration

const TagAckID = `ackID`

TagAckID is the logging tag for AckId

const TagAddr = `addr`

TagAddr is the logging tag for address

const TagCnsPth = `cnsPth`

TagCnsPth is the logging tag for Consumer group Path

const TagCnsm = `cnsmID`

TagCnsm is the logging tag for Consumer Group UUID

const TagCnsmID = `cnsmID`

TagCnsmID is the logging tag for the consumer ID

const TagCtrl = `ctrlID`

TagCtrl is the logging tag for Extent Controller UUID

const TagDLQID = `dlqID`

TagDLQID is the logging tag for a Dead Letter Queue destination UUID

const TagDbPath = "dbpath"

TagDbPath is the path to the db of the extent in manyrocks

const TagDeploymentName = `deployment`

TagDeploymentName is the logging tag for deployment name

const TagDplName = `deploymentName`

TagDplName is the logging tag for deployment name

const TagDst = `destID`

TagDst is the tag for Destination UUID

const TagDstPth = `dstPth`

TagDstPth is the logging tag for Destination Path

const TagErr = `err`

TagErr is the tag for error object message

const TagEvent = `event`

TagEvent is for "event" from Discovery and Failure Detection Daemon

const TagExt = `extnID`

TagExt is the logging tag for Extent UUID

const TagExtentCacheSize = `extentCacheSize`

TagExtentCacheSize is the logging tag for PathCache ExtentCache map size

const TagExtentStatus = `extStatus`

TagExtentStatus is for extent status

const TagFrnt = `frntID`

TagFrnt is the logging tag for Frontend UUID

const TagHostConnLimit = "hostconnlimit"

TagHostConnLimit is the log tag for the host connection limit

const TagHostIP = `hostIP`

TagHostIP is the logging tag for host IP

const TagHostName = `hostName`

TagHostName is the logging tag for host name

const TagHostPort = "hostport"

TagHostPort is the log tag for hostport

const TagIn = `inhoID`

TagIn is the logging tag for Inputhost UUID

const TagInPubConnID = `inPubConnID`

TagInPubConnID is the logging tag for input pubconnection ID

const TagInPutAckID = `inPutAckID`

TagInPutAckID is the logging tag for PutMessageAck ID

const TagInReplicaHost = `inReplicaHost`

TagInReplicaHost is the logging tag for replica host on input

const TagModule = `module`

TagModule is the logging tag used to identify the module within a service

const TagMsgID = `msgID`

TagMsgID is the logging tag for MsgId

const TagOut = `outhID`

TagOut is the logging tag for Outputhost UUID

const TagReconfigureID = `reconfigID`

TagReconfigureID is the logging tag for reconfiguration identifiers

const TagReconfigureType = `reconfigType`

TagReconfigureType is the logging tag for reconfiguration type

const TagReplicator = "replicatorID"

TagReplicator is the logging tag for replicator host UUID

const TagRunnerName = "runnerName"

TagRunnerName is the log tag for runner name, value is basic, timers, dlqTimedout, etc.

const TagSeq = `seq`

TagSeq is for sequence number

const TagService = "service"

TagService is the log tag for the service

const TagSlowDownSeconds = `slowDownSecs`

TagSlowDownSeconds is the logging tag for slow down time on consconnection

const TagState = `state`

TagState is for "state" in event handlers

const TagStor = `storID`

TagStor is the logging tag for StoreHost UUID

const TagTbSleep = "tokenbucketduration"

TagTbSleep is the log tag for token bucket sleep duration

const TagTenancy = `tenancy`

TagTenancy is the logging tag for tenancy

const TagUnknowPth = `unknowPth`

TagUnknowPth is the logging tag for an unknown path

const TagUpdateUUID = `updateUUID`

TagUpdateUUID is the logging tag for reconfiguration update UUIDs

const TagZoneName = `zoneName`

TagZoneName is the logging tag for zone name

const (
	// UUIDStringLength is the length of an UUID represented as a hex string
	UUIDStringLength = 36 // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
)

Variables

var ConsumerGroupRegex = PathRegex

ConsumerGroupRegex regex for consumer group path

var ErrInsufficientHosts = errors.New("Not enough hosts to serve the request")

ErrInsufficientHosts is thrown when there are not enough hosts to serve the request

var ErrListenerAlreadyExist = errors.New("Listener already exist for the service")

ErrListenerAlreadyExist is thrown on a duplicate AddListener call from the same listener

var (
	// ErrLoadReportThrottled is the error returned by LoadReporter when it runs out of tokens to serve the request
	ErrLoadReportThrottled = errors.New("Too many load reports from the host.")
)
var ErrNoClient = &cherami.InternalServiceError{Message: "Unable to create client"}

ErrNoClient is returned when the host is already shut down

var ErrUUIDLookupFailed = errors.New("Cannot find ip:port corresponding to uuid")

ErrUUIDLookupFailed is thrown when a uuid cannot be mapped to addr

var ErrUnknownService = errors.New("Service not tracked by RingpopMonitor")

ErrUnknownService is thrown for a service that is not tracked by this instance

var PathDLQRegex = regexp.MustCompile(`^/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*.dlq$`)

PathDLQRegex regex for dlq destination path

var PathRegex = regexp.MustCompile(`^/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*$`)

PathRegex regex for destination path
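
To see what the pattern accepts, the sketch below recompiles the same expression locally (the variable name `pathRegex` is only for illustration, so the example does not need to import this package). A path must have exactly two slash-prefixed segments, and each segment must contain at least one ASCII letter alongside any word characters or dots.

```go
package main

import (
	"fmt"
	"regexp"
)

// pathRegex is a local copy of the PathRegex pattern shown above.
var pathRegex = regexp.MustCompile(`^/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*$`)

func main() {
	paths := []string{
		"/JobPlatform/TripEvents", // two segments, both contain letters
		"/a.b/c_1",                // dots and underscores are allowed
		"/123/456",                // no letter in either segment
		"/only_one_segment",       // second segment missing
		"no/leading/slash",        // must start with a slash
	}
	for _, p := range paths {
		fmt.Printf("%q matches: %v\n", p, pathRegex.MatchString(p))
	}
}
```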

var PathRegexAllowUUID, _ = regexp.Compile(`^(/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*|[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12})$`)

PathRegexAllowUUID For special destinations (e.g. Dead letter queues) we allow a string UUID as path

var ServiceToPort = map[string]string{
	InputServiceName:      "4240",
	OutputServiceName:     "4254",
	StoreServiceName:      "4253",
	FrontendServiceName:   "4922",
	ControllerServiceName: "5425",
	ReplicatorServiceName: "6280",
}

ServiceToPort is the mapping from service name to port. This map should be kept in sync with the port numbers in the config file; it is used by the command line. We need this because, for the command line, we cannot generate the config path automatically.

var UUIDRegex, _ = regexp.Compile(`^[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}$`)

UUIDRegex regex for uuid
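
As a quick illustration, the following sketch recompiles the same expression under the local name `uuidRegex` and checks a few strings, including the fake input host ID from the constants section. The local constant `uuidStringLength` mirrors UUIDStringLength; both local names are assumptions made so the example stands alone.

```go
package main

import (
	"fmt"
	"regexp"
)

// uuidStringLength mirrors the documented UUIDStringLength.
const uuidStringLength = 36

// uuidRegex is a local copy of the UUIDRegex pattern shown above.
var uuidRegex = regexp.MustCompile(`^[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}$`)

func main() {
	candidates := []string{
		"88888888-8888-8888-8888-888888888888", // value of InputHostForRemoteExtent
		"354754BD-B73E-4D20-8021-AB93A3D145C0", // upper-case hex digits also match
		"354754bd-b73e-4d20-8021-ab93a3d145c",  // last group is one digit short
		"/JobPlatform/TripEvents",              // a destination path, not a UUID
	}
	for _, s := range candidates {
		fmt.Println(len(s) == uuidStringLength, uuidRegex.MatchString(s))
	}
}
```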

Functions

func AdminNotificationTypePtr

func AdminNotificationTypePtr(notificationType admin.NotificationType) *admin.NotificationType

AdminNotificationTypePtr makes a copy and returns the pointer to an admin.NotificationType.

func AwaitWaitGroup

func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool

AwaitWaitGroup calls Wait on the given wait group. Returns true if the Wait() call returned before the timeout, and false if it did not.
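
One common way to get this behaviour is to run Wait in a helper goroutine and select on its completion against a timer. The sketch below shows that technique; it is not taken from the package source, and `awaitWaitGroup` and `demo` are local, hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// awaitWaitGroup reports whether wg.Wait returned before the timeout.
func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

// demo exercises one group that finishes in time and one that does not.
func demo() (finished, timedOut bool) {
	var fast sync.WaitGroup
	fast.Add(1)
	go func() {
		defer fast.Done()
		time.Sleep(5 * time.Millisecond)
	}()
	finished = awaitWaitGroup(&fast, time.Second)

	var slow sync.WaitGroup
	slow.Add(1)
	timedOut = !awaitWaitGroup(&slow, 20*time.Millisecond)
	slow.Done() // release the helper goroutine that is still blocked in Wait
	return finished, timedOut
}

func main() {
	finished, timedOut := demo()
	fmt.Println(finished, timedOut)
}
```

Note that with this approach the helper goroutine stays blocked until the wait group eventually completes, so a timeout does not cancel the underlying work.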

func BoolPtr

func BoolPtr(v bool) *bool

BoolPtr makes a copy and returns the pointer to a bool.
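
Helpers of this shape are useful because Go cannot take the address of a literal, while structs with optional fields (such as Thrift-generated ones) typically use pointers. A minimal sketch of the idea, with local lower-case helpers and a hypothetical `options` struct that is not part of this package:

```go
package main

import "fmt"

// boolPtr and int64Ptr copy their argument and return the copy's address.
func boolPtr(v bool) *bool    { return &v }
func int64Ptr(v int64) *int64 { return &v }

// options is a hypothetical struct with optional (pointer) fields.
type options struct {
	Enabled *bool
	Limit   *int64
}

func main() {
	limit := int64(100)
	opts := options{Enabled: boolPtr(true), Limit: int64Ptr(limit)}
	limit = 200 // opts.Limit points at a copy, so it still holds 100
	fmt.Println(*opts.Enabled, *opts.Limit, limit)
}
```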

func BootstrapRingpop

func BootstrapRingpop(rp *ringpop.Ringpop, ipaddr string, port int, cfg configure.CommonServiceConfig) error

BootstrapRingpop tries to bootstrap the given ringpop instance using the hosts list

func CQLTimestampToUnixNano

func CQLTimestampToUnixNano(milliseconds int64) int64

CQLTimestampToUnixNano converts CQL timestamp to UnixNano
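
A CQL timestamp counts milliseconds since the Unix epoch, so the conversion amounts to a multiplication by one million. The following sketch shows the arithmetic with a local function; it is an illustration, not the package's source.

```go
package main

import (
	"fmt"
	"time"
)

// cqlTimestampToUnixNano converts milliseconds since the epoch to nanoseconds.
func cqlTimestampToUnixNano(milliseconds int64) int64 {
	return milliseconds * int64(time.Millisecond)
}

func main() {
	ns := cqlTimestampToUnixNano(1500)
	fmt.Println(ns, time.Unix(0, ns).UTC())
}
```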

func CalculateRate

func CalculateRate(last, curr SequenceNumber, lastTime, currTime UnixNanoTime) float64

CalculateRate does a simple rate calculation
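
The documentation does not spell out the formula. A plausible reading, given the nanosecond timestamps, is "change in sequence number per second". The sketch below implements that reading with local stand-in types; the per-second unit and the zero-interval guard are assumptions, not confirmed behaviour of CalculateRate.

```go
package main

import "fmt"

// Local stand-ins for the package's SequenceNumber and UnixNanoTime types.
type sequenceNumber int64
type unixNanoTime int64

// calculateRate returns the change in sequence number per second.
// Assumption: a zero-length interval yields a rate of 0.
func calculateRate(last, curr sequenceNumber, lastTime, currTime unixNanoTime) float64 {
	elapsed := float64(currTime - lastTime)
	if elapsed == 0 {
		return 0
	}
	return float64(curr-last) * 1e9 / elapsed
}

func main() {
	// 200 messages over 2 seconds.
	fmt.Println(calculateRate(100, 300, 0, 2e9))
}
```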

func CheramiChecksumOptionPtr

func CheramiChecksumOptionPtr(checksumOption cherami.ChecksumOption) *cherami.ChecksumOption

CheramiChecksumOptionPtr makes a copy and returns the pointer to a CheramiChecksumOption.

func CheramiConsumerGroupExtentStatusPtr

func CheramiConsumerGroupExtentStatusPtr(status metadata.ConsumerGroupExtentStatus) *metadata.ConsumerGroupExtentStatus

CheramiConsumerGroupExtentStatusPtr makes a copy and returns the pointer to a CheramiConsumerGroupExtentStatus.

func CheramiConsumerGroupStatusPtr

func CheramiConsumerGroupStatusPtr(status cherami.ConsumerGroupStatus) *cherami.ConsumerGroupStatus

CheramiConsumerGroupStatusPtr makes a copy and returns the pointer to a CheramiConsumerGroupStatus.

func CheramiDestinationStatusPtr

func CheramiDestinationStatusPtr(status cherami.DestinationStatus) *cherami.DestinationStatus

CheramiDestinationStatusPtr makes a copy and returns the pointer to a CheramiDestinationStatus.

func CheramiDestinationType

func CheramiDestinationType(internalDestinationType shared.DestinationType) (cheramiDestinationType cherami.DestinationType, err error)

CheramiDestinationType converts from shared.DestinationType to cherami.DestinationType

func CheramiDestinationTypePtr

func CheramiDestinationTypePtr(destType cherami.DestinationType) *cherami.DestinationType

CheramiDestinationTypePtr makes a copy and returns the pointer to a CheramiDestinationType.

func CheramiInputHostCommandTypePtr

func CheramiInputHostCommandTypePtr(cmdType cherami.InputHostCommandType) *cherami.InputHostCommandType

CheramiInputHostCommandTypePtr makes a copy and returns the pointer to a CheramiInputHostCommandType.

func CheramiOutputHostCommandTypePtr

func CheramiOutputHostCommandTypePtr(cmdType cherami.OutputHostCommandType) *cherami.OutputHostCommandType

CheramiOutputHostCommandTypePtr makes a copy and returns the pointer to a CheramiOutputHostCommandType.

func CheramiProtocolPtr

func CheramiProtocolPtr(protocol cherami.Protocol) *cherami.Protocol

CheramiProtocolPtr makes a copy and returns the pointer to a CheramiProtocol.

func CheramiStatusPtr

func CheramiStatusPtr(status cherami.Status) *cherami.Status

CheramiStatusPtr makes a copy and returns the pointer to a CheramiStatus.

func ClassifyErrorByType

func ClassifyErrorByType(in error) metrics.ErrorClass

ClassifyErrorByType gives the metrics error class for any cherami or common error

func ConstructAckID

func ConstructAckID(sessionID uint16, ackMgrID uint16, seqNum uint32, address int64) string

ConstructAckID is a helper routine to construct the ackID from the given args

func ConsumerGroupExtentMetricsPtr

func ConsumerGroupExtentMetricsPtr(cgExtMetrics controller.ConsumerGroupExtentMetrics) *controller.ConsumerGroupExtentMetrics

ConsumerGroupExtentMetricsPtr makes a copy and returns the pointer to a ConsumerGroupExtentMetrics.

func ConsumerGroupMetricsPtr

func ConsumerGroupMetricsPtr(cgMetrics controller.ConsumerGroupMetrics) *controller.ConsumerGroupMetrics

ConsumerGroupMetricsPtr makes a copy and returns the pointer to a ConsumerGroupMetrics.

func ContainsEmpty added in v1.26.0

func ContainsEmpty(a []string) bool

ContainsEmpty scans a string slice for an empty string, returning true if one is found

func ContainsString added in v1.26.0

func ContainsString(a []string, x string) bool

ContainsString scans a string slice for a matching string, returning true if one is found

func ConvertDownstreamErrors

func ConvertDownstreamErrors(l bark.Logger, in error) (metrics.ErrorClass, error)

ConvertDownstreamErrors is a helper function to convert an error from the metadata client or controller client to a client-cherami.thrift error that can be returned to the caller. It also classifies the error for metrics.

func CreateCassandraKeyspace

func CreateCassandraKeyspace(s *gocql.Session, keyspace string, replicas int, overwrite bool) (err error)

CreateCassandraKeyspace creates the keyspace using this session for given replica count

func CreateHyperbahnClient

func CreateHyperbahnClient(ch *tchannel.Channel, bootstrapFile string) *hyperbahn.Client

CreateHyperbahnClient returns a hyperbahn client

func CreateInputHostAdminClient

func CreateInputHostAdminClient(ch *tchannel.Channel, hostPort string) (admin.TChanInputHostAdmin, error)

CreateInputHostAdminClient creates and returns tchannel client for the input host admin API

func CreateOutputHostAdminClient

func CreateOutputHostAdminClient(ch *tchannel.Channel, hostPort string) (admin.TChanOutputHostAdmin, error)

CreateOutputHostAdminClient creates and returns tchannel client for the output host admin API

func CreateRingpop

func CreateRingpop(service string, ch *tchannel.Channel, ipaddr string, port int) *(ringpop.Ringpop)

CreateRingpop instantiates the ringpop for the provided channel and host.

func DestinationExtentMetricsPtr

func DestinationExtentMetricsPtr(dstExtMetrics controller.DestinationExtentMetrics) *controller.DestinationExtentMetrics

DestinationExtentMetricsPtr makes a copy and returns the pointer to a DestinationExtentMetrics.

func DestinationMetricsPtr

func DestinationMetricsPtr(dstMetrics controller.DestinationMetrics) *controller.DestinationMetrics

DestinationMetricsPtr makes a copy and returns the pointer to a DestinationMetrics.

func DropCassandraKeyspace

func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error)

DropCassandraKeyspace drops the given keyspace, if it exists

func ExtrapolateDifference

func ExtrapolateDifference(observedA, observedB SequenceNumber, observedARate, observedBRate float64, observedATime, observedBTime, extrapolatedTime UnixNanoTime, maxExtrapolationTime Seconds) (extrapolated int64)

ExtrapolateDifference calculates the extrapolated difference between two observed values with rates, at some arbitrary time. It is assumed that A > B, so if B is extrapolated to be greater than A, the difference will be presumed to be zero.

func FindNearestInt added in v1.26.0

func FindNearestInt(target int64, nums ...int64) (nearest int64)

FindNearestInt finds the integer that is closest to the given 'target'
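
A minimal sketch of such a search follows. It is a local illustration rather than the package's implementation; the behaviour for an empty list (return the target) and for ties (first candidate wins) are assumptions.

```go
package main

import "fmt"

// absDiff returns |a - b| without overflowing for distant int64 values.
func absDiff(a, b int64) uint64 {
	if a > b {
		return uint64(a) - uint64(b)
	}
	return uint64(b) - uint64(a)
}

// findNearestInt returns the element of nums closest to target.
// Assumptions: with no candidates the target itself is returned,
// and on a tie the earlier candidate wins.
func findNearestInt(target int64, nums ...int64) (nearest int64) {
	if len(nums) == 0 {
		return target
	}
	nearest = nums[0]
	best := absDiff(target, nearest)
	for _, n := range nums[1:] {
		if d := absDiff(target, n); d < best {
			nearest, best = n, d
		}
	}
	return nearest
}

func main() {
	fmt.Println(findNearestInt(10, 1, 8, 13, 20))
}
```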

func Float64Ptr

func Float64Ptr(v float64) *float64

Float64Ptr makes a copy and returns the pointer to a float64.

func FmtAckID

func FmtAckID(s string) string

FmtAckID formats a string to be used with TagAckID

func FmtAddr

func FmtAddr(i int64) string

FmtAddr formats an int64 to be used with TagAddr

func FmtCnsPth

func FmtCnsPth(s string) string

FmtCnsPth formats a string to be used with TagCnsPth

func FmtCnsm

func FmtCnsm(s string) string

FmtCnsm formats a string to be used with TagCnsm

func FmtCnsmID

func FmtCnsmID(s int) string

FmtCnsmID formats an int to be used with TagCnsmID

func FmtCtrl

func FmtCtrl(s string) string

FmtCtrl formats a string to be used with TagCtrl

func FmtDLQID

func FmtDLQID(s string) string

FmtDLQID formats a string to be used with TagDLQID

func FmtDplName

func FmtDplName(s string) string

FmtDplName formats a string to be used with TagDplName

func FmtDst

func FmtDst(s string) string

FmtDst formats a string to be used with TagDst

func FmtDstPth

func FmtDstPth(s string) string

FmtDstPth formats a string to be used with TagDstPth

func FmtExt

func FmtExt(s string) string

FmtExt formats a string to be used with TagExt

func FmtExtentStatus added in v1.26.0

func FmtExtentStatus(status shared.ExtentStatus) string

FmtExtentStatus formats ExtentStatus to be used with TagExtentStatus

func FmtFrnt

func FmtFrnt(s string) string

FmtFrnt formats a string to be used with TagFrnt

func FmtHostConnLimit

func FmtHostConnLimit(s int) string

FmtHostConnLimit formats an int to be used with TagHostConnLimit

func FmtHostIP

func FmtHostIP(s string) string

FmtHostIP formats a string to be used with TagHostIP

func FmtHostName

func FmtHostName(s string) string

FmtHostName formats a string to be used with TagHostName

func FmtHostPort

func FmtHostPort(s string) string

FmtHostPort formats a string to be used with TagHostPort

func FmtIn

func FmtIn(s string) string

FmtIn formats a string to be used with TagIn

func FmtInPubConnID

func FmtInPubConnID(s int) string

FmtInPubConnID formats an int to be used with TagInPubConnID

func FmtInPutAckID

func FmtInPutAckID(s string) string

FmtInPutAckID formats a string to be used with TagInPutAckID

func FmtInReplicaHost

func FmtInReplicaHost(s string) string

FmtInReplicaHost formats a string to be used with TagInReplicaHost

func FmtMsgID

func FmtMsgID(s string) string

FmtMsgID formats a string to be used with TagMsgID

func FmtOut

func FmtOut(s string) string

FmtOut formats a string to be used with TagOut

func FmtReconfigureID

func FmtReconfigureID(s string) string

FmtReconfigureID formats a string to be used with TagReconfigureID

func FmtReconfigureType

func FmtReconfigureType(s admin.NotificationType) string

FmtReconfigureType formats admin.NotificationType to be used with TagReconfigureType

func FmtService

func FmtService(s string) string

FmtService formats a string to be used with TagService

func FmtSlowDown

func FmtSlowDown(s time.Duration) string

FmtSlowDown formats a time.Duration to be used with TagSlowDownSeconds

func FmtStor

func FmtStor(s string) string

FmtStor formats a string to be used with TagStor

func FmtTbSleep

func FmtTbSleep(s time.Duration) string

FmtTbSleep formats a time.Duration to be used with TagTbSleep

func FmtTenancy

func FmtTenancy(s string) string

FmtTenancy formats a string to be used with TagTenancy

func FmtZoneName

func FmtZoneName(s string) string

FmtZoneName formats a string to be used with TagZoneName

func GetConnectionKey

func GetConnectionKey(host *cherami.HostAddress) string

GetConnectionKey is used to create a key used by connections for looking up connections

func GetDLQPathNameFromCGName

func GetDLQPathNameFromCGName(CGName string) (string, error)

GetDLQPathNameFromCGName returns the DLQ destination name for the given consumer group. Typically you pass the consumer group name to get a DLQ path name. DEVNOTE: DO NOT QUERY A DLQ DESTINATION BY THIS NAME. This name is for reporting purposes only. All destination APIs support passing the DLQ UUID as the path.

func GetDateTag

func GetDateTag() string

GetDateTag returns the current date used for tagging daily metric

func GetDefaultLogger

func GetDefaultLogger() bark.Logger

GetDefaultLogger is a utility routine to get the default logger

func GetDirectoryName

func GetDirectoryName(path string) (string, error)

GetDirectoryName function gives the directory name given a path used for destination or consumer groups

func GetEnvVariableFromHostPort

func GetEnvVariableFromHostPort(hostPort string) (envVar string)

GetEnvVariableFromHostPort gets the environment variable corresponding to this host port. XXX: this can be removed once we move to ringpop labels and exchange websocket port as part of the ringpop metadata

func GetLocalClusterInfo

func GetLocalClusterInfo(deployment string) (zone string, tenancy string)

GetLocalClusterInfo gets the zone and tenancy from the given deployment
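
The constants section notes that a deployment name can have the form `<tenancy>_<zone>`. Under that convention, the split could look like the sketch below. The handling of a name without an underscore (whole name is the tenancy, zone is empty) is an assumption, and `localClusterInfo` is a local name, not the package function.

```go
package main

import (
	"fmt"
	"strings"
)

// localClusterInfo splits "<tenancy>_<zone>" into its two parts.
// Assumption: a name without an underscore is all tenancy, with no zone.
func localClusterInfo(deployment string) (zone string, tenancy string) {
	parts := strings.SplitN(deployment, "_", 2)
	tenancy = parts[0]
	if len(parts) == 2 {
		zone = parts[1]
	}
	return zone, tenancy
}

func main() {
	zone, tenancy := localClusterInfo("prod_zone1")
	fmt.Println(zone, tenancy)
}
```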

func GetOpenAppendStreamRequestHTTP

func GetOpenAppendStreamRequestHTTP(httpHeader http.Header) (req *store.OpenAppendStreamRequest, err error)

GetOpenAppendStreamRequestHTTP extracts OpenAppendStreamRequest from http headers

func GetOpenAppendStreamRequestHeaders

func GetOpenAppendStreamRequestHeaders(req *store.OpenAppendStreamRequest) (headers map[string]string)

GetOpenAppendStreamRequestHeaders converts an OpenAppendStreamRequest struct to headers to pass as tchannel headers to OpenAppendStream

func GetOpenAppendStreamRequestStruct

func GetOpenAppendStreamRequestStruct(headers map[string]string) (req *store.OpenAppendStreamRequest, err error)

GetOpenAppendStreamRequestStruct extracts OpenAppendStreamRequest from tchannel headers

func GetOpenReadStreamRequestHTTP

func GetOpenReadStreamRequestHTTP(httpHeader http.Header) (req *store.OpenReadStreamRequest, err error)

GetOpenReadStreamRequestHTTP extracts OpenReadStreamRequest from http headers

func GetOpenReadStreamRequestHTTPHeaders

func GetOpenReadStreamRequestHTTPHeaders(req *store.OpenReadStreamRequest) http.Header

GetOpenReadStreamRequestHTTPHeaders converts an OpenReadStreamRequest struct to http headers for OpenReadStream

func GetOpenReadStreamRequestHeaders

func GetOpenReadStreamRequestHeaders(req *store.OpenReadStreamRequest) (headers map[string]string)

GetOpenReadStreamRequestHeaders converts an OpenReadStreamRequest struct to headers to pass as tchannel headers to OpenReadStream

func GetOpenReadStreamRequestStruct

func GetOpenReadStreamRequestStruct(headers map[string]string) (req *store.OpenReadStreamRequest, err error)

GetOpenReadStreamRequestStruct extracts OpenReadStreamRequest from tchannel headers

func GetOpenReplicationReadStreamRequestHTTPHeaders

func GetOpenReplicationReadStreamRequestHTTPHeaders(req *OpenReplicationReadStreamRequest) http.Header

GetOpenReplicationReadStreamRequestHTTPHeaders converts an OpenReplicationReadStreamRequest struct to http headers for OpenReplicationReadStreamRequest

func GetOpenReplicationRemoteReadStreamRequestHTTPHeaders

func GetOpenReplicationRemoteReadStreamRequestHTTPHeaders(req *OpenReplicationRemoteReadStreamRequest) http.Header

GetOpenReplicationRemoteReadStreamRequestHTTPHeaders converts an OpenReplicationRemoteReadStreamRequest struct to http headers for OpenReplicationRemoteReadStreamRequest

func GetRandInt64

func GetRandInt64(min int64, max int64) int64

GetRandInt64 is used to get a 64 bit random number between min and max

func GetTagsFromPath

func GetTagsFromPath(path string) (string, error)

GetTagsFromPath returns the tag name for a path, based on the directory path name passed in. Typically you pass a consumer group name or a destination path name to get a tag name.

func Int16Ptr added in v1.26.0

func Int16Ptr(v int16) *int16

Int16Ptr makes a copy and returns the pointer to an int16.

func Int32Ptr

func Int32Ptr(v int32) *int32

Int32Ptr makes a copy and returns the pointer to an int32.

func Int64Ptr

func Int64Ptr(v int64) *int64

Int64Ptr makes a copy and returns the pointer to an int64.

func IntPtr

func IntPtr(v int) *int

IntPtr makes a copy and returns the pointer to an int.

func InternalChecksumOptionPtr

func InternalChecksumOptionPtr(checksumOption shared.ChecksumOption) *shared.ChecksumOption

InternalChecksumOptionPtr makes a copy and returns the pointer to an internal shared ChecksumOption.

func InternalConsumerGroupStatusPtr

func InternalConsumerGroupStatusPtr(status shared.ConsumerGroupStatus) *shared.ConsumerGroupStatus

InternalConsumerGroupStatusPtr makes a copy and returns the pointer to an internal shared ConsumerGroupStatus.

func InternalConsumerGroupTypePtr

func InternalConsumerGroupTypePtr(cgType shared.ConsumerGroupType) *shared.ConsumerGroupType

InternalConsumerGroupTypePtr makes a copy and returns the pointer to an internal shared ConsumerGroupType.

func InternalDestinationStatusPtr

func InternalDestinationStatusPtr(status shared.DestinationStatus) *shared.DestinationStatus

InternalDestinationStatusPtr makes a copy and returns the pointer to an internal shared DestinationStatus.

func InternalDestinationTypePtr

func InternalDestinationTypePtr(destType shared.DestinationType) *shared.DestinationType

InternalDestinationTypePtr makes a copy and returns the pointer to an internal shared DestinationType.

func InternalExtentReplicaReplicationStatusTypePtr

func InternalExtentReplicaReplicationStatusTypePtr(status shared.ExtentReplicaReplicationStatus) *shared.ExtentReplicaReplicationStatus

InternalExtentReplicaReplicationStatusTypePtr makes a copy and returns the pointer to an ExtentReplicaReplicationStatus.

func IsDLQDestination

func IsDLQDestination(dstDesc *shared.DestinationDescription) bool

IsDLQDestination checks whether a destination is dlq type

func IsDLQDestinationPath

func IsDLQDestinationPath(path string) bool

IsDLQDestinationPath checks whether a destination path is dlq type

func IsDevelopmentEnvironment

func IsDevelopmentEnvironment(deploymentName string) (devEnv bool)

IsDevelopmentEnvironment detects if we are running in a development environment

func IsRemoteZoneExtent

func IsRemoteZoneExtent(extentOriginZone string, localZone string) bool

IsRemoteZoneExtent returns whether the extent is a remote zone extent

func IsRetryableTChanErr

func IsRetryableTChanErr(err error) bool

IsRetryableTChanErr returns true if the given tchannel error is a retryable error.

func IsValidServiceName

func IsValidServiceName(input string) bool

IsValidServiceName returns true if the given input is a valid service name, false otherwise

func LoadCassandraSchema

func LoadCassandraSchema(cqlshpath string, fileName string, keyspace string) (err error)

LoadCassandraSchema loads the schema from the given .cql file on this keyspace using cqlsh

func MaxInt

func MaxInt(a, b int) int

MaxInt returns the larger of the two given integers

func MaxInt64

func MaxInt64(a, b int64) int64

MaxInt64 returns the larger of the two given integers

func MetadataConsumerGroupExtentStatusPtr

func MetadataConsumerGroupExtentStatusPtr(status metadata.ConsumerGroupExtentStatus) *metadata.ConsumerGroupExtentStatus

MetadataConsumerGroupExtentStatusPtr makes a copy and returns the pointer to a MetadataConsumerGroupExtentStatus.

func MetadataExtentReplicaStatusPtr

func MetadataExtentReplicaStatusPtr(status shared.ExtentReplicaStatus) *shared.ExtentReplicaStatus

MetadataExtentReplicaStatusPtr makes a copy and returns the pointer to a MetadataExtentReplicaStatus.

func MetadataExtentStatusPtr

func MetadataExtentStatusPtr(status shared.ExtentStatus) *shared.ExtentStatus

MetadataExtentStatusPtr makes a copy and returns the pointer to a MetadataExtentStatus.

func MinInt

func MinInt(a, b int) int

MinInt returns the smaller of the two given integers

func MinInt64

func MinInt64(a, b int64) int64

MinInt64 returns the smaller of the two given integers

func NewCassandraCluster

func NewCassandraCluster(clusterHosts string) *gocql.ClusterConfig

NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts

func NewMetricReporterWithHostname

func NewMetricReporterWithHostname(cfg configure.CommonServiceConfig) metrics.Reporter

NewMetricReporterWithHostname creates a statsd/simple reporter based on config

func NewTestMetricsReporter added in v0.2.0

func NewTestMetricsReporter() metrics.Reporter

NewTestMetricsReporter creates a test reporter that allows registration of handler functions

func NodeMetricsPtr

func NodeMetricsPtr(nodeMetrics controller.NodeMetrics) *controller.NodeMetrics

NodeMetricsPtr makes a copy and returns the pointer to a NodeMetrics.

func OverrideValueByPrefix

func OverrideValueByPrefix(logFn func() bark.Logger, path string, overrides []string, defaultVal int64, valName string) int64

OverrideValueByPrefix takes a list of override rules in the form 'prefix=val' and a given string, and determines the most specific rule that applies to the given string. It then replaces the given default value with the override value. logFn is a logging closure that allows lazy instantiation of a logger interface to log error conditions and override status. valName is used for logging purposes, to differentiate multiple instantiations in the same context.

As an example, you could override a parameter, like the number of desired extents, according to various destination paths. We could try to have 8 extents by default, and give destinations beginning with /test only 1, and give a particular destination specifically a higher amount. To achieve this, we could configure the overrides like this:

overrides := []string{`=8`, `/test=1`, `/JobPlatform/TripEvents$=16`}
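The rule selection described above can be sketched as a longest-prefix match. This is a minimal sketch, not the package's implementation: overrideByPrefix is a hypothetical helper, logging is omitted, and treating a trailing `$` in a rule as an end-of-string anchor (by matching against the path with `$` appended) is an assumption inferred from the example rules.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// overrideByPrefix is a hypothetical stand-in for OverrideValueByPrefix.
// It returns the value of the longest rule prefix that matches path,
// or defaultVal when no rule applies.
func overrideByPrefix(path string, overrides []string, defaultVal int64) int64 {
	best, bestLen := defaultVal, -1
	// Assumption: a trailing "$" in a rule anchors it to the end of the path.
	target := path + "$"
	for _, rule := range overrides {
		i := strings.LastIndex(rule, "=")
		if i < 0 {
			continue // malformed rule: no '='
		}
		val, err := strconv.ParseInt(rule[i+1:], 10, 64)
		if err != nil {
			continue // malformed rule: value is not an integer
		}
		prefix := rule[:i]
		if strings.HasPrefix(target, prefix) && len(prefix) > bestLen {
			best, bestLen = val, len(prefix)
		}
	}
	return best
}

func main() {
	overrides := []string{`=8`, `/test=1`, `/JobPlatform/TripEvents$=16`}
	for _, p := range []string{"/prod/orders", "/test/foo", "/JobPlatform/TripEvents"} {
		fmt.Println(p, overrideByPrefix(p, overrides, 4))
	}
}
```

In this sketch the empty prefix in `=8` matches every path, so it acts as the configured default and the defaultVal argument is only returned when the rule list has no applicable entry.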

func RWLockReadAndConditionalWrite

func RWLockReadAndConditionalWrite(m *sync.RWMutex, readFn func() bool, writeFn func())

RWLockReadAndConditionalWrite implements the RWLock Read+Read&Conditional-Write pattern. m is the RWMutex covering a shared resource. readFn is a function that returns true if a write on the shared resource is required. writeFn is a function that updates the shared resource. The result of the read/write can be returned by capturing return variables in your provided functions.
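One common reading of this pattern is: check under the read lock, and only if a write is needed, take the write lock, check again, and then write. The following is a minimal sketch under that assumption; readAndConditionalWrite and lazyValue are hypothetical names, and the package's actual locking sequence may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// readAndConditionalWrite is a hypothetical re-implementation of the pattern.
func readAndConditionalWrite(m *sync.RWMutex, readFn func() bool, writeFn func()) {
	m.RLock()
	needWrite := readFn()
	m.RUnlock()
	if !needWrite {
		return
	}
	m.Lock()
	defer m.Unlock()
	// Re-check under the write lock: another goroutine may have written
	// between RUnlock and Lock.
	if readFn() {
		writeFn()
	}
}

// lazyValue is a hypothetical shared resource that is initialized once.
type lazyValue struct {
	mu     sync.RWMutex
	val    int
	set    bool
	writes int
}

// Get returns the value, initializing it on first use. The result is
// passed out of the closures through the captured named return variable.
func (l *lazyValue) Get() (result int) {
	readAndConditionalWrite(&l.mu,
		func() bool {
			result = l.val
			return !l.set
		},
		func() {
			l.val, l.set = 42, true
			l.writes++
			result = l.val
		},
	)
	return result
}

func main() {
	lv := &lazyValue{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lv.Get()
		}()
	}
	wg.Wait()
	fmt.Println(lv.Get(), lv.writes)
}
```

The second readFn call is what keeps concurrent callers from writing twice; without it, every goroutine that saw the uninitialized state would run writeFn.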

func RandomBytes

func RandomBytes(size int) []byte

RandomBytes generates random bytes of given size

func RolePtr

func RolePtr(role controller.Role) *controller.Role

RolePtr makes a copy and returns the pointer to a Role.

func SKUPtr

func SKUPtr(sku controller.SKU) *controller.SKU

SKUPtr makes a copy and returns the pointer to a SKU.

func ServiceLoop

func ServiceLoop(port int, cfg configure.CommonAppConfig, service *Service)

ServiceLoop runs the http admin endpoints. This is a blocking call.

func SetupServerConfig

func SetupServerConfig(configurator configure.Configure) configure.CommonAppConfig

SetupServerConfig reads on-disk config (in config/)

func ShortenGUIDString

func ShortenGUIDString(s string) string

ShortenGUIDString takes a string with one or more GUIDs and elides them to make it more human readable. It turns "354754bd-b73e-4d20-8021-ab93a3d145c0:67af70c5-f45e-4b3d-9d20-6758195e2ff4:3:2" into "354754bd:67af70c5:3:2"
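The elision shown in the example can be approximated with a regular expression that keeps only the first eight hex digits of each GUID. This is a sketch, assuming GUIDs in the canonical 8-4-4-4-12 form; shortenGUIDs is a hypothetical name and the package may implement this differently.

```go
package main

import (
	"fmt"
	"regexp"
)

// guidRE matches a canonical GUID and captures its first 8 hex digits.
var guidRE = regexp.MustCompile(
	`([0-9a-fA-F]{8})-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}`)

// shortenGUIDs replaces every GUID in s with its first 8 hex digits.
func shortenGUIDs(s string) string {
	return guidRE.ReplaceAllString(s, "${1}")
}

func main() {
	in := "354754bd-b73e-4d20-8021-ab93a3d145c0:67af70c5-f45e-4b3d-9d20-6758195e2ff4:3:2"
	fmt.Println(shortenGUIDs(in))
}
```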

func SpinWaitOnCondition

func SpinWaitOnCondition(condition ConditionFunc, timeout time.Duration) bool

SpinWaitOnCondition busy waits for a given condition to be true until the timeout. Returns true if the condition was satisfied, false on timeout.
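A polling loop with a deadline is one way to get this behavior. The sketch below is an illustration only: spinWait, pollInterval and demoTimeout are hypothetical names, and the 1ms sleep between checks is an assumption, not the package's actual interval.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	pollInterval = time.Millisecond       // assumption: pause between checks
	demoTimeout  = 200 * time.Millisecond // hypothetical timeout for the demo
)

// spinWait polls condition until it returns true or timeout elapses.
// It returns true if the condition was satisfied, false on timeout.
func spinWait(condition func() bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for !condition() {
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(pollInterval)
	}
	return true
}

func main() {
	var ready int32
	go func() {
		time.Sleep(10 * time.Millisecond)
		atomic.StoreInt32(&ready, 1)
	}()
	ok := spinWait(func() bool { return atomic.LoadInt32(&ready) == 1 }, demoTimeout)
	fmt.Println(ok)
}
```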

func SplitHostPort

func SplitHostPort(hostPort string) (string, int, error)

SplitHostPort takes an x.x.x.x:yyyy string and splits it into host and port
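The same result can be obtained with the standard library by combining net.SplitHostPort with strconv.Atoi. This is a sketch of equivalent behavior under that assumption (splitHostPort is a hypothetical name); the package's own parsing and error values may differ.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// splitHostPort splits "host:port" into a host string and a numeric port.
func splitHostPort(hostPort string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(hostPort)
	if err != nil {
		return "", 0, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, err
	}
	return host, port, nil
}

func main() {
	host, port, err := splitHostPort("10.0.0.1:4922")
	fmt.Println(host, port, err)
}
```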

func StartEKG

func StartEKG(lclLg bark.Logger)

StartEKG starts a goroutine to check the heartbeats

func StoreExtentMetricsPtr

func StoreExtentMetricsPtr(storeExtMetrics controller.StoreExtentMetrics) *controller.StoreExtentMetrics

StoreExtentMetricsPtr makes a copy and returns the pointer to a StoreExtentMetrics.

func StringPtr

func StringPtr(v string) *string

StringPtr makes a copy and returns the pointer to a string.
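The *Ptr helpers in this package all follow the same shape: the argument is passed by value, so the returned pointer refers to a copy rather than to the caller's variable. A minimal sketch, using a hypothetical stringPtr and a hypothetical request struct with an optional field:

```go
package main

import "fmt"

// stringPtr returns a pointer to a copy of v. Because v is a parameter,
// taking its address never aliases the caller's variable.
func stringPtr(v string) *string {
	return &v
}

// request is a hypothetical struct with an optional (pointer) field.
type request struct {
	Path *string
}

func main() {
	name := "/test/destination"
	req := request{Path: stringPtr(name)}
	name = "changed" // does not affect req.Path
	fmt.Println(*req.Path, name)
}
```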

func StringSetEqual added in v1.26.0

func StringSetEqual(a, b []string) bool

StringSetEqual checks two string slices for set equality (i.e. non-ordered, discounting duplicates). For example, StringSetEqual([]string{`a`,`a`,`b`,`b`}, []string{`a`,`a`,`b`}) == true. DEVNOTE: This is O(N^2), so don't use it with large N; it is better if len(a) > len(b) when you have duplicates.
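A quadratic check of this kind can be written as two containment passes. The sketch below illustrates the semantics only; stringSetEqual and contains are hypothetical names and the package's loop structure may differ.

```go
package main

import "fmt"

// contains reports whether s occurs in set (linear scan).
func contains(set []string, s string) bool {
	for _, x := range set {
		if x == s {
			return true
		}
	}
	return false
}

// stringSetEqual reports whether a and b hold the same distinct strings,
// ignoring order and duplicates. It is O(len(a) * len(b)).
func stringSetEqual(a, b []string) bool {
	for _, x := range a {
		if !contains(b, x) {
			return false
		}
	}
	for _, y := range b {
		if !contains(a, y) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(stringSetEqual([]string{`a`, `a`, `b`, `b`}, []string{`a`, `a`, `b`}))
	fmt.Println(stringSetEqual([]string{`a`}, []string{`a`, `b`}))
}
```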

func TSPtr

func TSPtr(v time.Time) *time.Time

TSPtr makes a copy and returns the pointer to a Time.

func UUIDHashCode

func UUIDHashCode(key string) uint32

UUIDHashCode is a hash function for hashing a string uuid. If the uuid is malformed, the hash function always returns 0 as the hash value.

func UUIDToUint16

func UUIDToUint16(s string) uint16

UUIDToUint16 uses the UUID and returns a uint16 out of it

func Uint32Ptr

func Uint32Ptr(v uint32) *uint32

Uint32Ptr makes a copy and returns the pointer to a uint32.

func Uint64Ptr

func Uint64Ptr(v uint64) *uint64

Uint64Ptr makes a copy and returns the pointer to a uint64.

func UnixNanoToCQLTimestamp

func UnixNanoToCQLTimestamp(timestamp int64) int64

UnixNanoToCQLTimestamp converts UnixNano to CQL timestamp

func WSStart

func WSStart(listenAddress string, port int, wsservice WSService)

WSStart registers websocket handlers and spins up the websocket server. This is not a blocking call.

func WaitTimeout

func WaitTimeout(timeout time.Duration, fn func()) bool

WaitTimeout waits for the given func to finish until the timeout elapses (returns true if it timed out)
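Note the return convention: true means the timeout fired, not that fn completed. One way to get this behavior is to run fn in a goroutine and select on its completion and a timer. This is a sketch under that assumption; waitTimeout, shortWait and longWait are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	shortWait = 20 * time.Millisecond // hypothetical timeouts for the demo
	longWait  = 2 * time.Second
)

// waitTimeout runs fn in a goroutine and waits for it to return.
// It returns true if timeout elapsed first, false if fn finished in time.
func waitTimeout(timeout time.Duration, fn func()) bool {
	done := make(chan struct{})
	go func() {
		fn()
		close(done)
	}()
	select {
	case <-done:
		return false
	case <-time.After(timeout):
		return true
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(5 * time.Millisecond)
	}()
	// A typical use is bounding a WaitGroup wait.
	fmt.Println(waitTimeout(longWait, wg.Wait))
}
```

In this sketch, fn keeps running in its goroutine after a timeout; the caller only stops waiting for it.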

Types

type AckID

type AckID struct {
	MutatedID CombinedID
	Address   int64
}

AckID designates a consumer message to ack/nack

func AckIDFromString

func AckIDFromString(ackID string) (*AckID, error)

AckIDFromString deserializes a string into the object.

func (AckID) ConstructCombinedID

func (a AckID) ConstructCombinedID(sessionID uint64, ackMgrID uint64, seqNum uint64) CombinedID

ConstructCombinedID constructs the combinedID from the session, ackmgr and the seq numbers based on the bit masks

func (AckID) ToString

func (a AckID) ToString() string

ToString serializes the AckID object into a base64-encoded string. The first 64 bits of the AckID are as follows:

16 bit - Session ID (constructed from the uuid)
16 bit - Monotonically increasing number to identify all unique ack managers within a host
32 bit - Sequence Number within the AckManager, which is used to update the data structure within the ack manager

The reason for having the above fields in the ackID is as follows:

sessionID - to make sure the ack is to the same outputhost (let's say to prevent a bad client)
ackID - to uniquely identify the ack managers within this outputhost
seqNum - to identify the data structure within the ackMgr
Address - to make sure we validate the ack based on what we get from the store

We need all of the above to prevent collisions.
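The 16/16/32 layout above can be illustrated with plain shifts and truncating conversions. This is a sketch only: the names are hypothetical, and placing the session ID in the most significant bits is an assumption based on the order in which the fields are listed. The package's ConstructCombinedID takes uint64 arguments and applies bit masks, which this sketch replaces with narrower parameter types.

```go
package main

import "fmt"

// combinedID mirrors the documented layout:
// [16 bit session][16 bit ack manager][32 bit sequence number].
type combinedID uint64

// construct packs the three fields into one 64-bit value.
// Assumption: the session ID occupies the most significant bits.
func construct(sessionID, ackMgrID uint16, seqNum uint32) combinedID {
	return combinedID(uint64(sessionID)<<48 | uint64(ackMgrID)<<32 | uint64(seqNum))
}

// deconstruct extracts the fields again; the narrowing conversions
// keep only the low 16 or 32 bits after shifting.
func (c combinedID) deconstruct() (uint16, uint16, uint32) {
	return uint16(c >> 48), uint16(c >> 32), uint32(c)
}

func main() {
	id := construct(0xBEEF, 7, 12345)
	session, ackMgr, seq := id.deconstruct()
	fmt.Printf("%#016x %#x %d %d\n", uint64(id), session, ackMgr, seq)
}
```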

type CliHelper

type CliHelper interface {
	// GetDefaultOwnerEmail is used to get the default owner email
	GetDefaultOwnerEmail() string
	// SetDefaultOwnerEmail is used to set the default owner email to be used in the CLI
	SetDefaultOwnerEmail(string)
	// GetCanonicalZone is used to get the canonical zone name from the
	// given zone. This is useful in cases where we have short names for
	// zones. For example, if "zone1" and  "z1" both map to "zone1", then
	// use "zone1"
	GetCanonicalZone(zone string) (string, error)
	// SetCanonicalZones is used to populate all valid zones that can be given from CLI
	SetCanonicalZones(map[string]string)
}

CliHelper is the interface to help with the args passed to CLI

func NewCliHelper

func NewCliHelper() CliHelper

NewCliHelper is used to create an Uber-specific CliHelper

type ClientFactory

type ClientFactory interface {
	// GetAdminControllerClient gets the thrift version of the controller client
	GetAdminControllerClient() (admin.TChanControllerHostAdmin, error)
	// GetThriftStoreClient gets the thrift version of the store client
	GetThriftStoreClient(replica string, pathUUID string) (store.TChanBStore, error)
	// ReleaseThriftStoreClient releases the ref on this pathUUID
	ReleaseThriftStoreClient(pathUUID string)
	// GetThriftStoreClientUUID first resolves the store UUID to an address, before creating a thrift client
	GetThriftStoreClientUUID(storeUUID string, contextID string) (store.TChanBStore, string, error)
	// GetControllerClient gets the thrift client for making calls to Cherami Controller
	GetControllerClient() (controller.TChanController, error)
	// GetReplicatorClient gets the thrift client for making calls to local Replicator
	GetReplicatorClient() (replicator.TChanReplicator, error)
}

ClientFactory is the thrift clients interface which is used to get thrift clients for this service. This makes sure we don't allocate new thrift clients all the time and also makes sure the returned client is up and running by consulting the ringpop monitor. TODO: Add store, in, out, etc. here so that we don't end up creating new thrift clients all the time.

func NewClientFactory

func NewClientFactory(rpm RingpopMonitor, ch *tchannel.Channel, log bark.Logger) ClientFactory

NewClientFactory just instantiates a thriftClientImpl object

type CombinedID

type CombinedID uint64

CombinedID is the one which holds session, ackmgr and seqnum together

func (CombinedID) DeconstructCombinedID

func (c CombinedID) DeconstructCombinedID() (uint16, uint16, uint32)

DeconstructCombinedID deconstructs the combinedID

type ConcurrentMap

type ConcurrentMap interface {
	// Get returns the value for the given key
	Get(key string) (interface{}, bool)
	// Contains returns true if the key exists and false otherwise
	Contains(key string) bool
	// Put records the mapping from given key to value
	Put(key string, value interface{})
	// PutIfNotExist records the key value mapping only
	// if the mapping does not already exist
	PutIfNotExist(key string, value interface{}) bool
	// Remove deletes the key from the map
	Remove(key string)
	// Iter returns an iterator to the map
	Iter() MapIterator
	// Size returns the number of items in the map
	Size() int
}

ConcurrentMap is a generic interface for any implementation of a dictionary or a key value lookup table that is thread safe.

func NewShardedConcurrentMap

func NewShardedConcurrentMap(initialCap int, hashfn HashFunc) ConcurrentMap

NewShardedConcurrentMap returns an instance of ShardedConcurrentMap

ShardedConcurrentMap is a thread-safe map that maintains up to nShards maps internally, allowing up to nShards writers to be active at the same time. This map *does not* use re-entrant locks, so accessing the map during iteration can cause a deadlock.

@param initialCap

The initial size for the map

@param hashfn

The hash function to use for sharding
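The sharding idea can be sketched as an array of independently locked maps, with the hash function choosing the shard for each key. This is an illustration, not the package's implementation: shardedMap, fnvHash and the shard count of 32 are assumptions, and iteration is left out.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const nShards = 32 // assumption: the real shard count may differ

// shard is one independently locked partition of the map.
type shard struct {
	sync.RWMutex
	items map[string]interface{}
}

// shardedMap is a hypothetical, reduced version of a sharded concurrent map.
type shardedMap struct {
	shards [nShards]*shard
	hashfn func(string) uint32
}

func newShardedMap(hashfn func(string) uint32) *shardedMap {
	m := &shardedMap{hashfn: hashfn}
	for i := range m.shards {
		m.shards[i] = &shard{items: make(map[string]interface{})}
	}
	return m
}

// shardFor picks the shard that owns key.
func (m *shardedMap) shardFor(key string) *shard {
	return m.shards[m.hashfn(key)%nShards]
}

func (m *shardedMap) Put(key string, value interface{}) {
	s := m.shardFor(key)
	s.Lock()
	s.items[key] = value
	s.Unlock()
}

func (m *shardedMap) Get(key string) (interface{}, bool) {
	s := m.shardFor(key)
	s.RLock()
	defer s.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

// PutIfNotExist stores value only if key is absent; it reports whether it stored.
func (m *shardedMap) PutIfNotExist(key string, value interface{}) bool {
	s := m.shardFor(key)
	s.Lock()
	defer s.Unlock()
	if _, ok := s.items[key]; ok {
		return false
	}
	s.items[key] = value
	return true
}

// Size sums the shard sizes, locking one shard at a time.
func (m *shardedMap) Size() int {
	n := 0
	for _, s := range m.shards {
		s.RLock()
		n += len(s.items)
		s.RUnlock()
	}
	return n
}

// fnvHash is a hypothetical HashFunc built on FNV-1a.
func fnvHash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func main() {
	m := newShardedMap(fnvHash)
	m.Put("a", 1)
	fmt.Println(m.PutIfNotExist("a", 2), m.PutIfNotExist("b", 2), m.Size())
}
```

Writers to different shards never contend on the same lock, which is where the extra write concurrency comes from.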

type ConditionFunc

type ConditionFunc func() bool

ConditionFunc represents an expression that evaluates to true when some condition is satisfied and false otherwise

type CounterBank

type CounterBank struct {
	// contains filtered or unexported fields
}

CounterBank represents a set of counters that all belong to the same group, e.g. dst or extent counters. A CounterBank supports methods to inc/dec/get counter values. Each of these methods takes an index that represents the offset of the counter within the counter bank. All operations supported by CounterBank are atomic.

func NewCounterBank

func NewCounterBank(size int) *CounterBank

NewCounterBank returns a new instance of counterBank containing size number of counters.

func (*CounterBank) Add

func (c *CounterBank) Add(idx int, delta int64) int64

Add adds the given value to the counter at the given offset.

func (*CounterBank) Decrement

func (c *CounterBank) Decrement(idx int) int64

Decrement decrements the counter at the given offset index and returns the new value

func (*CounterBank) Get

func (c *CounterBank) Get(idx int) int64

Get returns the counter value at the given offset index

func (*CounterBank) GetAndReset

func (c *CounterBank) GetAndReset(idx int) int64

GetAndReset resets the counter value for given offset to zero and returns the old value atomically

func (*CounterBank) Increment

func (c *CounterBank) Increment(idx int) int64

Increment increments the counter at the given offset index and returns the new value

func (*CounterBank) Set

func (c *CounterBank) Set(idx int, value int64) int64

Set sets the counter at the given offset to the specified value
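A bank of atomically updated counters can be built on a slice of int64 and the sync/atomic package. The sketch below assumes that approach; counterBank is a hypothetical name, and Set and Decrement are omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counterBank is a hypothetical fixed-size group of atomic counters.
type counterBank struct {
	counters []int64
}

func newCounterBank(size int) *counterBank {
	return &counterBank{counters: make([]int64, size)}
}

// Increment adds one to the counter at idx and returns the new value.
func (c *counterBank) Increment(idx int) int64 {
	return atomic.AddInt64(&c.counters[idx], 1)
}

// Add adds delta to the counter at idx and returns the new value.
func (c *counterBank) Add(idx int, delta int64) int64 {
	return atomic.AddInt64(&c.counters[idx], delta)
}

// Get returns the current value of the counter at idx.
func (c *counterBank) Get(idx int) int64 {
	return atomic.LoadInt64(&c.counters[idx])
}

// GetAndReset atomically swaps the counter at idx to zero and
// returns the value it held before.
func (c *counterBank) GetAndReset(idx int) int64 {
	return atomic.SwapInt64(&c.counters[idx], 0)
}

func main() {
	bank := newCounterBank(2)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			bank.Increment(0)
		}()
	}
	wg.Wait()
	fmt.Println(bank.GetAndReset(0), bank.Get(0))
}
```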

type Daemon

type Daemon interface {
	Start()
	Stop()
}

Daemon is the base interface implemented by background tasks within cherami

type GeometricRollingAverage

type GeometricRollingAverage float64

GeometricRollingAverage is the value of a geometrically diminishing rolling average

func (*GeometricRollingAverage) GetGeometricRollingAverage

func (avg *GeometricRollingAverage) GetGeometricRollingAverage() float64

GetGeometricRollingAverage returns the result of the geometric rolling average

func (*GeometricRollingAverage) SetGeometricRollingAverage

func (avg *GeometricRollingAverage) SetGeometricRollingAverage(val float64)

SetGeometricRollingAverage adds a value to the geometric rolling average
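In a geometrically diminishing average, each new sample is blended into the running value so that the influence of older samples shrinks by a constant factor per update. The sketch below shows the idea with a hypothetical weight of 0.5; the names and the weight are assumptions, and the package's actual falloff constant and initialization behavior are not shown in this documentation.

```go
package main

import "fmt"

// weight is the share given to each new sample.
// Assumption: chosen for illustration; not the package's constant.
const weight = 0.5

// rollingAvg is a hypothetical geometrically decaying average.
type rollingAvg float64

// set blends val into the average; older samples lose
// a factor of (1 - weight) of their influence on each call.
func (a *rollingAvg) set(val float64) {
	*a = rollingAvg(float64(*a)*(1-weight) + val*weight)
}

// get returns the current average.
func (a *rollingAvg) get() float64 {
	return float64(*a)
}

func main() {
	var avg rollingAvg
	for _, v := range []float64{10, 10, 0} {
		avg.set(v)
		fmt.Println(avg.get())
	}
}
```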

type HTTPHandler

type HTTPHandler struct {
	// contains filtered or unexported fields
}

HTTPHandler contains the http handlers for controller

func NewHTTPHandler

func NewHTTPHandler(cfg configure.CommonAppConfig, service *Service) *HTTPHandler

NewHTTPHandler returns a new instance of http handler. This call must be followed by a call to httpHandler.Register().

func (*HTTPHandler) Register

func (handler *HTTPHandler) Register(mux *http.ServeMux)

Register registers all the http handlers supported by controller. This method does not start a http server.

type HashFunc

type HashFunc func(string) uint32

HashFunc represents a hash function for string

type Heartbeat

type Heartbeat struct {
	// contains filtered or unexported fields
}

Heartbeat is just a timestamp and an identifier

func NewHeartbeat

func NewHeartbeat(id *string) *Heartbeat

NewHeartbeat creates a new Heartbeat object

func (*Heartbeat) Beat

func (h *Heartbeat) Beat()

Beat updates the timestamp on the Heartbeat

func (*Heartbeat) CloseHeartbeat

func (h *Heartbeat) CloseHeartbeat()

CloseHeartbeat removes the given heart from the Heartbeat system

type HostAckIDGenerator

type HostAckIDGenerator interface {
	// GetNextAckID is the routine which gets the next ackID
	GetNextAckID() uint32
}

HostAckIDGenerator is the per-host ackID generator. Right now, it is a monotonically increasing uint32.

func NewHostAckIDGenerator

func NewHostAckIDGenerator(startFrom uint32) HostAckIDGenerator

NewHostAckIDGenerator returns a HostAckIDGenerator object and starts from the given value
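A monotonically increasing uint32 that is safe for concurrent callers can be sketched with an atomic add. The names below are hypothetical, and whether the first returned ID is startFrom or startFrom+1 in the real generator is not stated here; this sketch returns startFrom+1 first.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ackIDGen is a hypothetical per-host ack ID generator.
type ackIDGen struct {
	next uint32
}

func newAckIDGen(startFrom uint32) *ackIDGen {
	return &ackIDGen{next: startFrom}
}

// GetNextAckID atomically increments the counter and returns the new value.
// Assumption: the first ID handed out is startFrom+1.
func (g *ackIDGen) GetNextAckID() uint32 {
	return atomic.AddUint32(&g.next, 1)
}

func main() {
	gen := newAckIDGen(10)
	fmt.Println(gen.GetNextAckID(), gen.GetNextAckID())
}
```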

type HostHardwareInfo

type HostHardwareInfo struct {
	Sku  string
	Rack string
	Zone string
}

HostHardwareInfo is the type that contains hardware properties about a cherami host

type HostHardwareInfoReader

type HostHardwareInfoReader interface {
	// Read reads and returns the hardware info
	// corresponding to the given hostname
	Read(hostname string) (*HostHardwareInfo, error)
}

HostHardwareInfoReader is the interface for any implementation that vends hardware info related to a given hostname

func NewHostHardwareInfoReader

func NewHostHardwareInfoReader(mClient m.TChanMetadataService) HostHardwareInfoReader

NewHostHardwareInfoReader creates and returns an implementation of hardwareInfoReader that uses Cassandra as the backing store

type HostIDHeartbeater

type HostIDHeartbeater interface {
	Daemon
}

HostIDHeartbeater keeps the host uuid to ip address mapping alive by periodically heartbeating to the registry. Currently, the uuid to addr mapping is stored within a cassandra table.

func NewHostIDHeartbeater

func NewHostIDHeartbeater(mClient metadata.TChanMetadataService, hostID string, hostAddr string, hostName string, log bark.Logger) HostIDHeartbeater

NewHostIDHeartbeater creates and returns a new instance of HostIDHeartbeater

type HostInfo

type HostInfo struct {
	UUID string
	Addr string // ip:port
	Name string
	Sku  string
	Rack string
	Zone string
}

HostInfo is a type that contains the info about a cherami host

func (*HostInfo) String

func (hi *HostInfo) String() string

type Item

type Item struct {
	Value interface{} // The value of the item; arbitrary.
	Key   int64       // The key of the item in the queue.
	// contains filtered or unexported fields
}

An Item is something we manage in a key queue.

type LoadReporter

type LoadReporter interface {
	ReportHostMetric(metrics controller.NodeMetrics) error
	ReportDestinationMetric(destinationUUID string, metrics controller.DestinationMetrics) error
	ReportDestinationExtentMetric(destinationUUID string, extentUUID string, metrics controller.DestinationExtentMetrics) error
	ReportConsumerGroupMetric(destinationUUID string, consumerGroupUUID string, metrics controller.ConsumerGroupMetrics) error
	ReportConsumerGroupExtentMetric(destinationUUID string, consumerGroupUUID string, extentUUID string, metrics controller.ConsumerGroupExtentMetrics) error
	ReportStoreExtentMetric(extentUUID string, metrics controller.StoreExtentMetrics) error
}

LoadReporter is used by each component interested in reporting load to Controller

type LoadReporterDaemon

type LoadReporterDaemon interface {
	Daemon
}

LoadReporterDaemon is used for periodically reporting load to controller

type LoadReporterDaemonFactory

type LoadReporterDaemonFactory interface {
	CreateReporter(interval time.Duration, source LoadReporterSource, logger bark.Logger) LoadReporterDaemon
}

LoadReporterDaemonFactory is used to create a daemon task for reporting load to controller

func NewLoadReporterDaemonFactory

func NewLoadReporterDaemonFactory(hostUUID string, sku controller.SKU, role controller.Role,
	clientFactory ClientFactory, tickerFactory TickerSourceFactory, tokenBucketFactory TokenBucketFactory, logger bark.Logger) LoadReporterDaemonFactory

NewLoadReporterDaemonFactory is used to create a factory for creating LoadReporterDaemon

type LoadReporterSource

type LoadReporterSource interface {
	Report(reporter LoadReporter)
}

LoadReporterSource is implemented by any component reporting load to controller

type MapEntry

type MapEntry struct {
	// Key represents the key
	Key string
	// Value represents the value
	Value interface{}
}

MapEntry represents a key-value entry within the map

type MapIterator

type MapIterator interface {
	// Close closes the iterator
	// and releases any allocated resources
	Close()
	// Entries returns a channel of MapEntry
	// objects that can be used in a range loop
	Entries() <-chan *MapEntry
}

MapIterator represents the interface for map iterators

type MinHeap

type MinHeap []*Item

A MinHeap implements heap.Interface and holds Items.

func (MinHeap) Len

func (mh MinHeap) Len() int

Len is the function needed by heap Interface

func (MinHeap) Less

func (mh MinHeap) Less(i, j int) bool

Less is the function needed by heap Interface

func (*MinHeap) Pop

func (mh *MinHeap) Pop() interface{}

Pop is the function needed by heap Interface

func (*MinHeap) Push

func (mh *MinHeap) Push(x interface{})

Push is the function needed by heap Interface

func (MinHeap) Swap

func (mh MinHeap) Swap(i, j int)

Swap is the function needed by heap Interface
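Types that implement heap.Interface are normally driven through the functions of the standard container/heap package rather than by calling Push and Pop directly. The sketch below uses hypothetical local copies (item, minHeap) that order items by Key; it leaves out the unexported fields of the real Item.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical reduced version of Item.
type item struct {
	Value interface{}
	Key   int64
}

// minHeap orders items by ascending Key.
type minHeap []*item

func (mh minHeap) Len() int            { return len(mh) }
func (mh minHeap) Less(i, j int) bool  { return mh[i].Key < mh[j].Key }
func (mh minHeap) Swap(i, j int)       { mh[i], mh[j] = mh[j], mh[i] }
func (mh *minHeap) Push(x interface{}) { *mh = append(*mh, x.(*item)) }

// Pop removes and returns the last element; container/heap has already
// moved the minimum there before calling it.
func (mh *minHeap) Pop() interface{} {
	old := *mh
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // drop the reference so it can be collected
	*mh = old[:n-1]
	return it
}

// drainSorted pushes the keys onto a heap and pops them in ascending order.
func drainSorted(keys ...int64) []int64 {
	mh := &minHeap{}
	for _, k := range keys {
		heap.Push(mh, &item{Key: k, Value: fmt.Sprintf("item-%d", k)})
	}
	out := make([]int64, 0, len(keys))
	for mh.Len() > 0 {
		out = append(out, heap.Pop(mh).(*item).Key)
	}
	return out
}

func main() {
	fmt.Println(drainSorted(5, 1, 3))
}
```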

type MockLoadReporterDaemon

type MockLoadReporterDaemon struct {
	mock.Mock
}

MockLoadReporterDaemon is the mock of common.LoadReporterDaemon interface

func (*MockLoadReporterDaemon) Start

func (m *MockLoadReporterDaemon) Start()

Start is the mock implementation for Start function on common.LoadReporterDaemon

func (*MockLoadReporterDaemon) Stop

func (m *MockLoadReporterDaemon) Stop()

Stop is the mock implementation for Stop function on common.LoadReporterDaemon

type MockLoadReporterDaemonFactory

type MockLoadReporterDaemonFactory struct {
	mock.Mock
}

MockLoadReporterDaemonFactory is the mock of common.LoadReporterDaemonFactory interface

func (*MockLoadReporterDaemonFactory) CreateReporter

func (m *MockLoadReporterDaemonFactory) CreateReporter(interval time.Duration, source LoadReporterSource, logger bark.Logger) LoadReporterDaemon

CreateReporter is the mock implementation for CreateReporter function on common.LoadReporterDaemonFactory

type MockRingpopMonitor

type MockRingpopMonitor struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MockRingpopMonitor is an implementation of RingpopMonitor for UTs

func NewMockRingpopMonitor

func NewMockRingpopMonitor() *MockRingpopMonitor

NewMockRingpopMonitor returns a new instance

func (*MockRingpopMonitor) Add

func (rpm *MockRingpopMonitor) Add(service string, uuid string, addr string)

Add adds the given (uuid, addr) mapping

func (*MockRingpopMonitor) AddListener

func (rpm *MockRingpopMonitor) AddListener(service string, name string, ch chan<- *RingpopListenerEvent) error

AddListener adds a listener for this service.

func (*MockRingpopMonitor) FindHostForAddr

func (rpm *MockRingpopMonitor) FindHostForAddr(service string, addr string) (*HostInfo, error)

FindHostForAddr finds and returns the host for the given address

func (*MockRingpopMonitor) FindHostForKey

func (rpm *MockRingpopMonitor) FindHostForKey(service string, key string) (*HostInfo, error)

FindHostForKey finds and returns the host responsible for handling the given key

func (*MockRingpopMonitor) FindRandomHost added in v0.2.0

func (rpm *MockRingpopMonitor) FindRandomHost(service string) (*HostInfo, error)

FindRandomHost finds and returns a random host for the given service

func (*MockRingpopMonitor) GetBootstrappedChannel

func (rpm *MockRingpopMonitor) GetBootstrappedChannel() chan struct{}

GetBootstrappedChannel returns a channel, which will be closed once ringpop is bootstrapped

func (*MockRingpopMonitor) GetHosts

func (rpm *MockRingpopMonitor) GetHosts(service string) ([]*HostInfo, error)

GetHosts retrieves all the members for the given service

func (*MockRingpopMonitor) IsHostHealthy

func (rpm *MockRingpopMonitor) IsHostHealthy(service string, uuid string) bool

IsHostHealthy returns true if the given host is healthy and false otherwise

func (*MockRingpopMonitor) Remove

func (rpm *MockRingpopMonitor) Remove(service string, uuid string)

Remove removes the host identified by given uuid from rpm

func (*MockRingpopMonitor) RemoveListener

func (rpm *MockRingpopMonitor) RemoveListener(service string, name string) error

RemoveListener removes a listener for this service.

func (*MockRingpopMonitor) ResolveUUID

func (rpm *MockRingpopMonitor) ResolveUUID(service string, uuid string) (string, error)

ResolveUUID converts the given UUID into a host:port string. Returns an error on lookup failure.

func (*MockRingpopMonitor) SetMetadata

func (rpm *MockRingpopMonitor) SetMetadata(key string, data string) error

SetMetadata sets the given metadata on the rp instance

func (*MockRingpopMonitor) Start

func (rpm *MockRingpopMonitor) Start()

Start starts the ringpop monitor

func (*MockRingpopMonitor) Stop

func (rpm *MockRingpopMonitor) Stop()

Stop attempts to stop the RingpopMonitor routines

type MockService

type MockService struct {
	mock.Mock
}

MockService is the mock of common.SCommon interface

func (*MockService) GetClientFactory

func (m *MockService) GetClientFactory() ClientFactory

GetClientFactory is a mock of the corresponding common. routine

func (*MockService) GetConfig

func (m *MockService) GetConfig() configure.CommonServiceConfig

GetConfig returns the AppConfig for this service

func (*MockService) GetDConfigClient

func (m *MockService) GetDConfigClient() dconfig.Client

GetDConfigClient is a mock of the corresponding common. routine

func (*MockService) GetHostName

func (m *MockService) GetHostName() string

GetHostName returns the name of host running the service

func (*MockService) GetHostPort

func (m *MockService) GetHostPort() string

GetHostPort is a mock of the corresponding common. routine

func (*MockService) GetHostUUID

func (m *MockService) GetHostUUID() string

GetHostUUID returns the uuid of host running the service

func (*MockService) GetLoadReporterDaemonFactory

func (m *MockService) GetLoadReporterDaemonFactory() LoadReporterDaemonFactory

GetLoadReporterDaemonFactory is the factory interface for creating load reporters

func (*MockService) GetMetricsReporter

func (m *MockService) GetMetricsReporter() metrics.Reporter

GetMetricsReporter is a mock of the corresponding common. routine

func (*MockService) GetRingpopMonitor

func (m *MockService) GetRingpopMonitor() RingpopMonitor

GetRingpopMonitor is a mock of the corresponding common. routine

func (*MockService) GetTChannel

func (m *MockService) GetTChannel() *tchannel.Channel

GetTChannel is a mock of the corresponding common. routine

func (*MockService) GetWSConnector

func (m *MockService) GetWSConnector() WSConnector

GetWSConnector is a mock of the corresponding common. routine

func (*MockService) IsLimitsEnabled

func (m *MockService) IsLimitsEnabled() bool

IsLimitsEnabled is the implementation of the corresponding routine

func (*MockService) Report

func (m *MockService) Report(reporter LoadReporter)

Report is the implementation for reporting host specific load to controller

func (*MockService) SetClientFactory

func (m *MockService) SetClientFactory(cf ClientFactory)

SetClientFactory is a mock of the corresponding common. routine

func (*MockService) Start

func (m *MockService) Start(thriftService []thrift.TChanServer)

Start is a mock of the corresponding common. routine

func (*MockService) Stop

func (m *MockService) Stop()

Stop is a mock of the corresponding common. routine

type MockTimeSource

type MockTimeSource struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MockTimeSource is a time source for unit tests

func NewMockTimeSource

func NewMockTimeSource() *MockTimeSource

NewMockTimeSource returns a new instance of a controllable time source

func (*MockTimeSource) Advance

func (ts *MockTimeSource) Advance(d time.Duration)

Advance advances the clock by the specified duration

func (*MockTimeSource) Now

func (ts *MockTimeSource) Now() time.Time

Now returns the current time
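A controllable time source lets tests move the clock forward without sleeping. The following is a minimal sketch of the idea, with hypothetical names; starting the clock at the Unix epoch is an assumption made to keep the example deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// mockClock is a hypothetical controllable time source.
type mockClock struct {
	sync.RWMutex
	now time.Time
}

// newMockClock starts the clock at the Unix epoch (an assumption).
func newMockClock() *mockClock {
	return &mockClock{now: time.Unix(0, 0)}
}

// Now returns the clock's current time.
func (c *mockClock) Now() time.Time {
	c.RLock()
	defer c.RUnlock()
	return c.now
}

// Advance moves the clock forward by d.
func (c *mockClock) Advance(d time.Duration) {
	c.Lock()
	c.now = c.now.Add(d)
	c.Unlock()
}

func main() {
	clock := newMockClock()
	start := clock.Now()
	clock.Advance(90 * time.Minute)
	fmt.Println(clock.Now().Sub(start))
}
```

Code under test would take the clock as a dependency and call Now on it instead of time.Now.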

type OneShotTimer

type OneShotTimer interface {
	// Chan returns the underlying channel
	// for receiving expired timer event
	Chan() <-chan time.Time
	// Reset resets the timer with the
	// new duration
	Reset(d time.Duration) bool
	// Stop stops the timer
	Stop() bool
}

OneShotTimer is an interface around golang Timer to help with unit tests
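An interface of this shape can be satisfied by a thin wrapper around the standard time.Timer, which tests can then swap for a fake. The sketch below shows only the real-timer side; oneShotTimer, realTimer and the delay constants are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

const (
	shortDelay = 10 * time.Millisecond // hypothetical delays for the demo
	longDelay  = time.Hour
)

// oneShotTimer is a local copy of the interface shape.
type oneShotTimer interface {
	Chan() <-chan time.Time
	Reset(d time.Duration) bool
	Stop() bool
}

// realTimer adapts *time.Timer to oneShotTimer.
type realTimer struct {
	t *time.Timer
}

func newRealTimer(d time.Duration) oneShotTimer {
	return &realTimer{t: time.NewTimer(d)}
}

// Chan exposes the timer's channel, since interfaces cannot have fields.
func (r *realTimer) Chan() <-chan time.Time { return r.t.C }

// Reset and Stop delegate to the underlying timer.
func (r *realTimer) Reset(d time.Duration) bool { return r.t.Reset(d) }
func (r *realTimer) Stop() bool                 { return r.t.Stop() }

func main() {
	timer := newRealTimer(shortDelay)
	<-timer.Chan()
	fmt.Println("fired; Stop reports", timer.Stop())
}
```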

type OpenAppendInWebsocketStream

type OpenAppendInWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenAppendInWebsocketStream is a wrapper for websocket to work with OpenAppendStream (hopefully rename to OpenAppendStream)

func NewOpenAppendInWebsocketStream

func NewOpenAppendInWebsocketStream(stream websocket.StreamServer) *OpenAppendInWebsocketStream

NewOpenAppendInWebsocketStream returns a new OpenAppendInWebsocketStream object

func (*OpenAppendInWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenAppendInWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenAppendInWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenAppendInWebsocketStream) SetResponseHeaders

func (s *OpenAppendInWebsocketStream) SetResponseHeaders(headers map[string]string) error

SetResponseHeaders sets the response headers.

func (*OpenAppendInWebsocketStream) Write

Write writes a result to the response stream

type OpenAppendOutWebsocketStream

type OpenAppendOutWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenAppendOutWebsocketStream is a wrapper for websocket to work with OpenAppendStream (hopefully rename to OpenAppendStream)

func NewOpenAppendOutWebsocketStream

func NewOpenAppendOutWebsocketStream(stream websocket.StreamClient) *OpenAppendOutWebsocketStream

NewOpenAppendOutWebsocketStream returns a new OpenAppendOutWebsocketStream object

func (*OpenAppendOutWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenAppendOutWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenAppendOutWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenAppendOutWebsocketStream) ResponseHeaders

func (s *OpenAppendOutWebsocketStream) ResponseHeaders() (map[string]string, error)

ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface

func (*OpenAppendOutWebsocketStream) Write

Write writes a result to the response stream

type OpenConsumerInWebsocketStream

type OpenConsumerInWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenConsumerInWebsocketStream is a wrapper for websocket to work with OpenConsumerStream

func NewOpenConsumerInWebsocketStream

func NewOpenConsumerInWebsocketStream(stream websocket.StreamServer) *OpenConsumerInWebsocketStream

NewOpenConsumerInWebsocketStream returns a new OpenConsumerInWebsocketStream object

func (*OpenConsumerInWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenConsumerInWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenConsumerInWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenConsumerInWebsocketStream) SetResponseHeaders

func (s *OpenConsumerInWebsocketStream) SetResponseHeaders(headers map[string]string) error

SetResponseHeaders sets the response headers.

func (*OpenConsumerInWebsocketStream) Write

Write writes a result to the response stream

type OpenConsumerOutWebsocketStream

type OpenConsumerOutWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenConsumerOutWebsocketStream is a wrapper for websocket to work with OpenConsumerStream

func NewOpenConsumerOutWebsocketStream

func NewOpenConsumerOutWebsocketStream(stream websocket.StreamClient) *OpenConsumerOutWebsocketStream

NewOpenConsumerOutWebsocketStream returns a new OpenConsumerOutWebsocketStream object

func (*OpenConsumerOutWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenConsumerOutWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenConsumerOutWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenConsumerOutWebsocketStream) ResponseHeaders

func (s *OpenConsumerOutWebsocketStream) ResponseHeaders() (map[string]string, error)

ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface

func (*OpenConsumerOutWebsocketStream) Write

Write writes a result to the response stream

type OpenPublisherInWebsocketStream

type OpenPublisherInWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenPublisherInWebsocketStream is a wrapper for websocket to work with OpenPublisherStream

func NewOpenPublisherInWebsocketStream

func NewOpenPublisherInWebsocketStream(stream websocket.StreamServer) *OpenPublisherInWebsocketStream

NewOpenPublisherInWebsocketStream returns a new OpenPublisherInWebsocketStream object

func (*OpenPublisherInWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenPublisherInWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenPublisherInWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenPublisherInWebsocketStream) SetResponseHeaders

func (s *OpenPublisherInWebsocketStream) SetResponseHeaders(headers map[string]string) error

SetResponseHeaders sets the response headers.

func (*OpenPublisherInWebsocketStream) Write

Write writes a result to the response stream

type OpenPublisherOutWebsocketStream

type OpenPublisherOutWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenPublisherOutWebsocketStream is a wrapper for websocket to work with OpenPublisherStream

func NewOpenPublisherOutWebsocketStream

func NewOpenPublisherOutWebsocketStream(stream websocket.StreamClient) *OpenPublisherOutWebsocketStream

NewOpenPublisherOutWebsocketStream returns a new OpenPublisherOutWebsocketStream object

func (*OpenPublisherOutWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenPublisherOutWebsocketStream) Flush

Flush flushes all written arguments.

func (*OpenPublisherOutWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenPublisherOutWebsocketStream) ResponseHeaders

func (s *OpenPublisherOutWebsocketStream) ResponseHeaders() (map[string]string, error)

ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface

func (*OpenPublisherOutWebsocketStream) Write

Write writes a result to the response stream

type OpenReadInWebsocketStream

type OpenReadInWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenReadInWebsocketStream is a wrapper for websocket to work with OpenReadStream

func NewOpenReadInWebsocketStream

func NewOpenReadInWebsocketStream(stream websocket.StreamServer) *OpenReadInWebsocketStream

NewOpenReadInWebsocketStream returns a new OpenReadInWebsocketStream object

func (*OpenReadInWebsocketStream) Done

func (s *OpenReadInWebsocketStream) Done() error

Done closes the request stream and should be called after all arguments have been written.

func (*OpenReadInWebsocketStream) Flush

func (s *OpenReadInWebsocketStream) Flush() error

Flush flushes all written arguments.

func (*OpenReadInWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenReadInWebsocketStream) SetResponseHeaders

func (s *OpenReadInWebsocketStream) SetResponseHeaders(headers map[string]string) error

SetResponseHeaders sets the response headers.

func (*OpenReadInWebsocketStream) Write

Write writes a result to the response stream

type OpenReadOutWebsocketStream

type OpenReadOutWebsocketStream struct {
	// contains filtered or unexported fields
}

OpenReadOutWebsocketStream is a wrapper for websocket to work with OpenReadStream

func NewOpenReadOutWebsocketStream

func NewOpenReadOutWebsocketStream(stream websocket.StreamClient) *OpenReadOutWebsocketStream

NewOpenReadOutWebsocketStream returns a new OpenReadOutWebsocketStream object

func (*OpenReadOutWebsocketStream) Done

Done closes the request stream and should be called after all arguments have been written.

func (*OpenReadOutWebsocketStream) Flush

func (s *OpenReadOutWebsocketStream) Flush() error

Flush flushes all written arguments.

func (*OpenReadOutWebsocketStream) Read

Read returns the next argument, if any is available.

func (*OpenReadOutWebsocketStream) ResponseHeaders

func (s *OpenReadOutWebsocketStream) ResponseHeaders() (map[string]string, error)

ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface

func (*OpenReadOutWebsocketStream) Write

Write writes a result to the response stream

type OpenReplicationReadStreamRequest

type OpenReplicationReadStreamRequest struct {
	store.OpenReadStreamRequest
}

OpenReplicationReadStreamRequest is the request type for OpenReplicationReadStream API

func GetOpenReplicationReadStreamRequestHTTP

func GetOpenReplicationReadStreamRequestHTTP(httpHeader http.Header) (req *OpenReplicationReadStreamRequest, err error)

GetOpenReplicationReadStreamRequestHTTP extracts OpenReplicationReadStreamRequest from http headers

type OpenReplicationRemoteReadStreamRequest

type OpenReplicationRemoteReadStreamRequest struct {
	store.OpenReadStreamRequest
}

OpenReplicationRemoteReadStreamRequest is the request type for OpenReplicationRemoteReadStream API

func GetOpenReplicationRemoteReadStreamRequestHTTP

func GetOpenReplicationRemoteReadStreamRequestHTTP(httpHeader http.Header) (req *OpenReplicationRemoteReadStreamRequest, err error)

GetOpenReplicationRemoteReadStreamRequestHTTP extracts OpenReplicationRemoteReadStreamRequest from http headers

type RingpopEventType

type RingpopEventType int

RingpopEventType is a value type that identifies a RingpopListener event.

const (
	// HostAddedEvent indicates that a new host joined the ring
	HostAddedEvent RingpopEventType = 1 << iota
	// HostRemovedEvent indicates that a host left the ring
	HostRemovedEvent
)
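Because the constants are declared with `1 << iota`, each event type occupies its own bit (1 and 2). The sketch below redeclares the type locally so it runs on its own; the `describe` helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// RingpopEventType mirrors the declaration above. It is redeclared here
// only so that the sketch compiles without the package.
type RingpopEventType int

const (
	HostAddedEvent   RingpopEventType = 1 << iota // 1
	HostRemovedEvent                              // 2
)

// describe is a hypothetical helper that names an event type.
func describe(t RingpopEventType) string {
	switch t {
	case HostAddedEvent:
		return "added"
	case HostRemovedEvent:
		return "removed"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(int(HostAddedEvent), describe(HostAddedEvent))
	fmt.Println(int(HostRemovedEvent), describe(HostRemovedEvent))
}
```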

type RingpopListenerEvent

type RingpopListenerEvent struct {
	// Type returns the RingpopEventType
	Type RingpopEventType
	// Key returns the HostUUID that
	// this event is about
	Key string
}

RingpopListenerEvent represents any event that a RingpopListener can get notified about.

type RingpopMonitor

type RingpopMonitor interface {
	// Start starts the RingpopMonitor
	Start()
	// Stop stops the RingpopMonitor
	Stop()
	// GetBootstrappedChannel returns a channel, which will be closed once ringpop is bootstrapped
	GetBootstrappedChannel() chan struct{}
	// GetHosts retrieves all the members for the given service
	GetHosts(service string) ([]*HostInfo, error)
	// FindHostForAddr finds and returns the host for the given service:addr
	FindHostForAddr(service string, addr string) (*HostInfo, error)
	// FindHostForKey finds and returns the host responsible for handling the given (service, key)
	FindHostForKey(service string, key string) (*HostInfo, error)
	// FindRandomHost finds and returns a random host responsible for handling the given service
	FindRandomHost(service string) (*HostInfo, error)
	// IsHostHealthy returns true if the given (service, host) is healthy
	IsHostHealthy(service string, uuid string) bool
	// ResolveUUID resolves a host UUID to an IP address, if the host is found
	ResolveUUID(service string, uuid string) (string, error)
	// AddListener adds a listener for this service.
	// The listener will get notified on the given
	// channel, whenever there is host joining/leaving
	// the ring.
	// @service: The service to be listened on
	// @name: The name for identifying the listener
	// @notifyChannel: The channel on which the caller receives notifications
	AddListener(service string, name string, notifyChannel chan<- *RingpopListenerEvent) error
	// RemoveListener removes a listener for this service.
	RemoveListener(service string, name string) error
	// SetMetadata is used to set the given metadata on this ringpop instance
	SetMetadata(key string, data string) error
}

RingpopMonitor monitors and maintains a list of healthy hosts for a set of ring pop services
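As a rough illustration of the listener contract described for AddListener, the sketch below consumes membership events from a channel and keeps a set of live host UUIDs. The event types are redeclared locally, `applyEvents` is a hypothetical consumer, and the events are pushed by hand rather than by a real RingpopMonitor.

```go
package main

import "fmt"

// Local copies of the event types documented above, so that the sketch
// compiles without the package.
type RingpopEventType int

const (
	HostAddedEvent RingpopEventType = 1 << iota
	HostRemovedEvent
)

type RingpopListenerEvent struct {
	Type RingpopEventType
	Key  string
}

// applyEvents is a hypothetical consumer. It folds membership events
// into a set of live host UUIDs and returns when the channel is closed.
func applyEvents(events <-chan *RingpopListenerEvent) map[string]bool {
	live := make(map[string]bool)
	for ev := range events {
		switch ev.Type {
		case HostAddedEvent:
			live[ev.Key] = true
		case HostRemovedEvent:
			delete(live, ev.Key)
		}
	}
	return live
}

func main() {
	// In real use this channel would be handed to AddListener; here the
	// events are pushed by hand.
	events := make(chan *RingpopListenerEvent, 3)
	events <- &RingpopListenerEvent{Type: HostAddedEvent, Key: "uuid-a"}
	events <- &RingpopListenerEvent{Type: HostAddedEvent, Key: "uuid-b"}
	events <- &RingpopListenerEvent{Type: HostRemovedEvent, Key: "uuid-a"}
	close(events)

	live := applyEvents(events)
	fmt.Println(len(live), live["uuid-b"])
}
```

A buffered channel is used so that a slow consumer does not immediately block the sender; how the package itself behaves when the notification channel is full is not stated in this documentation.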

func NewRingpopMonitor

func NewRingpopMonitor(rp *ringpop.Ringpop, services []string, resolver UUIDResolver, hwInfoReader HostHardwareInfoReader, log bark.Logger) RingpopMonitor

NewRingpopMonitor returns a new instance of RingpopMonitor.

rp: Ringpop instance of the local node

services: list of services we need to track

resolver (UUIDResolver): Resolver instance that can map uuids to addrs and vice-versa

hwInfoReader (HostHardwareInfoReader): HwInfoReader instance that can get the hosts' hardware spec

type SCommon

type SCommon interface {
	// GetTChannel returns the tchannel for this service
	GetTChannel() *tchannel.Channel

	// GetRingpopMonitor returns the RingpopMonitor for this service
	GetRingpopMonitor() RingpopMonitor

	// GetHostPort returns the host port for this service
	GetHostPort() string

	// GetHostName returns the name of host running the service
	GetHostName() string

	// GetHostUUID returns the uuid of this host
	GetHostUUID() string

	// Start creates the channel, starts & boots ringpop on the given streaming server
	Start(thriftService []thrift.TChanServer)

	// Stop stops the service
	Stop()

	// GetConfig returns the AppConfig for this service
	GetConfig() configure.CommonServiceConfig

	// GetMetricsReporter() returns the root metric reporter for this service
	GetMetricsReporter() metrics.Reporter

	// GetDConfigClient() returns the dynamic config client for this service
	GetDConfigClient() dconfig.Client

	// GetClientFactory returns the thrift client interface for getting thrift clients for this service
	GetClientFactory() ClientFactory

	// SetClientFactory allows changing the client factory used for getting thrift clients
	SetClientFactory(cf ClientFactory)

	// GetWSConnector returns websocket connector for establishing websocket connections
	GetWSConnector() WSConnector

	// GetLoadReporterDaemonFactory is the factory interface for creating load reporters
	GetLoadReporterDaemonFactory() LoadReporterDaemonFactory

	// Report is the implementation for reporting host specific load to controller
	Report(reporter LoadReporter)

	// IsLimitsEnabled is used to see if we need to enforce limits on this service
	IsLimitsEnabled() bool
}

SCommon is the interface which must be implemented by all the services

type Seconds

type Seconds float64

Seconds is time as seconds, either relative or absolute since the epoch

func DurationToSeconds

func DurationToSeconds(t time.Duration) Seconds

DurationToSeconds converts a time.Duration to Seconds

type SequenceNumber

type SequenceNumber int64

SequenceNumber is an int64 that represents the sequence of messages in an extent

func ExtrapolateValue

func ExtrapolateValue(observed SequenceNumber, observedRate float64, observedTime, extrapolatedTime UnixNanoTime, maxExtrapolationTime Seconds) (extrapolated SequenceNumber)

ExtrapolateValue extrapolates a value based on an observed value and rate at a given time
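The exact clamping and rounding rules of ExtrapolateValue are not shown in this documentation. The sketch below is one plausible reading, under three stated assumptions: the extrapolation is linear in elapsed time, the elapsed time is capped at maxExtrapolationTime, and a target time earlier than the observation yields no change. The function `extrapolate` and the local type declarations are illustrative only.

```go
package main

import "fmt"

// Local copies of the documented types.
type (
	SequenceNumber int64
	UnixNanoTime   int64
	Seconds        float64
)

// extrapolate is a hypothetical linear extrapolation:
// observed + rate * min(elapsed, maxWindow), with elapsed floored at 0.
func extrapolate(observed SequenceNumber, rate float64, observedTime, target UnixNanoTime, maxWindow Seconds) SequenceNumber {
	elapsed := Seconds(float64(target-observedTime) / 1e9)
	if elapsed < 0 {
		elapsed = 0 // assumption: never extrapolate backwards
	}
	if elapsed > maxWindow {
		elapsed = maxWindow // assumption: cap at the maximum window
	}
	return observed + SequenceNumber(rate*float64(elapsed))
}

func main() {
	const second = UnixNanoTime(1e9)
	// 100 observed at t=0, growing at 10 per second.
	fmt.Println(extrapolate(100, 10, 0, 2*second, 5))  // 120
	fmt.Println(extrapolate(100, 10, 0, 60*second, 5)) // 150
}
```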

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service contains the objects specific to this service

func NewService

func NewService(serviceName string, uuid string, cfg configure.CommonServiceConfig, resolver UUIDResolver, hostHWInfoReader HostHardwareInfoReader, reporter metrics.Reporter, dClient dconfig.Client) *Service

NewService instantiates a Service, the object that holds all the common stuff shared by all the services. TODO: have a better name for Service.

func (*Service) GetClientFactory

func (h *Service) GetClientFactory() ClientFactory

GetClientFactory returns the ClientFactory interface for this service

func (*Service) GetConfig

func (h *Service) GetConfig() configure.CommonServiceConfig

GetConfig returns the AppConfig for this service

func (*Service) GetDConfigClient

func (h *Service) GetDConfigClient() dconfig.Client

GetDConfigClient returns the dconfig client

func (*Service) GetHostName

func (h *Service) GetHostName() string

GetHostName returns the name of host running the service

func (*Service) GetHostPort

func (h *Service) GetHostPort() string

GetHostPort returns the host port for this service

func (*Service) GetHostUUID

func (h *Service) GetHostUUID() string

GetHostUUID returns the uuid for this service

func (*Service) GetLoadReporterDaemonFactory

func (h *Service) GetLoadReporterDaemonFactory() LoadReporterDaemonFactory

GetLoadReporterDaemonFactory is the factory interface for creating load reporters

func (*Service) GetMetricsReporter

func (h *Service) GetMetricsReporter() metrics.Reporter

GetMetricsReporter returns the metrics reporter for this service

func (*Service) GetRingpopMonitor

func (h *Service) GetRingpopMonitor() RingpopMonitor

GetRingpopMonitor returns the RingpopMonitor for this service

func (*Service) GetTChannel

func (h *Service) GetTChannel() *tchannel.Channel

GetTChannel returns the tchannel for this service

func (*Service) GetWSConnector

func (h *Service) GetWSConnector() WSConnector

GetWSConnector returns websocket connector for establishing websocket connections

func (*Service) IsLimitsEnabled

func (h *Service) IsLimitsEnabled() bool

IsLimitsEnabled returns whether limits are enabled

func (*Service) Report

func (h *Service) Report(reporter LoadReporter)

Report is used for reporting Host specific load to controller

func (*Service) SetClientFactory

func (h *Service) SetClientFactory(cf ClientFactory)

SetClientFactory allows changing the client factory used for getting thrift clients

func (*Service) Start

func (h *Service) Start(thriftServices []thrift.TChanServer)

Start starts a TChannel-Thrift service

func (*Service) Stop

func (h *Service) Stop()

Stop destroys ringpop for that service, and closes the associated listening tchannel

func (*Service) UpdateAdvertisedName

func (h *Service) UpdateAdvertisedName(deploymentName string)

UpdateAdvertisedName is used to update the advertised name to be deployment specific

type ShardedConcurrentMap

type ShardedConcurrentMap struct {
	// contains filtered or unexported fields
}

ShardedConcurrentMap is an implementation of ConcurrentMap that internally uses multiple sharded maps to increase parallelism
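The idea of sharding can be sketched as follows: each key is hashed to one of N independent maps, each guarded by its own lock, so writers to different shards do not contend. This is an illustration under assumptions, not the package's implementation; the FNV hash, the shard count, and the names `shardedMap` and `shardFor` are all choices made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shard is one independently locked partition of the map.
type shard struct {
	sync.RWMutex
	items map[string]interface{}
}

// shardedMap is a hypothetical sharded map keyed by string.
type shardedMap struct {
	shards []*shard
}

func newShardedMap(nShards int) *shardedMap {
	m := &shardedMap{shards: make([]*shard, nShards)}
	for i := range m.shards {
		m.shards[i] = &shard{items: make(map[string]interface{})}
	}
	return m
}

// shardFor picks a shard by hashing the key (FNV-1a is an assumption).
func (m *shardedMap) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return m.shards[h.Sum32()%uint32(len(m.shards))]
}

func (m *shardedMap) Put(key string, value interface{}) {
	s := m.shardFor(key)
	s.Lock()
	s.items[key] = value
	s.Unlock()
}

func (m *shardedMap) Get(key string) (interface{}, bool) {
	s := m.shardFor(key)
	s.RLock()
	defer s.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

// PutIfNotExist checks and inserts under a single shard lock.
func (m *shardedMap) PutIfNotExist(key string, value interface{}) bool {
	s := m.shardFor(key)
	s.Lock()
	defer s.Unlock()
	if _, ok := s.items[key]; ok {
		return false
	}
	s.items[key] = value
	return true
}

// Size sums the shard sizes; it is not an atomic snapshot.
func (m *shardedMap) Size() int {
	n := 0
	for _, s := range m.shards {
		s.RLock()
		n += len(s.items)
		s.RUnlock()
	}
	return n
}

func main() {
	m := newShardedMap(8)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Put(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println(m.Size())                     // 100
	fmt.Println(m.PutIfNotExist("key-1", -1)) // false
}
```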

func (*ShardedConcurrentMap) Contains

func (cmap *ShardedConcurrentMap) Contains(key string) bool

Contains returns true if the key exists and false otherwise

func (*ShardedConcurrentMap) Get

func (cmap *ShardedConcurrentMap) Get(key string) (interface{}, bool)

Get returns the value corresponding to the key, if it exists

func (*ShardedConcurrentMap) Iter

func (cmap *ShardedConcurrentMap) Iter() MapIterator

Iter returns an iterator to the map. This map does not use re-entrant locks, so accessing or modifying the map during iteration can cause a deadlock.

func (*ShardedConcurrentMap) Put

func (cmap *ShardedConcurrentMap) Put(key string, value interface{})

Put records the given key value mapping. Overwrites previous values

func (*ShardedConcurrentMap) PutIfNotExist

func (cmap *ShardedConcurrentMap) PutIfNotExist(key string, value interface{}) bool

PutIfNotExist records the mapping if there is no mapping for this key already. Returns true if the mapping was recorded, false otherwise.

func (*ShardedConcurrentMap) Remove

func (cmap *ShardedConcurrentMap) Remove(key string)

Remove deletes the given key from the map

func (*ShardedConcurrentMap) Size

func (cmap *ShardedConcurrentMap) Size() int

Size returns the number of items in the map

type TickerSource

type TickerSource interface {
	Ticks() <-chan time.Time
	Stop()
}

TickerSource is an interface created over time.Ticker so we can easily unit test any component which uses time.Ticker functionality.

type TickerSourceFactory

type TickerSourceFactory interface {
	CreateTicker(interval time.Duration) TickerSource
}

TickerSourceFactory is the interface mainly used for injecting mock implementations of Ticker for unit testing

func NewRealTimeTickerFactory

func NewRealTimeTickerFactory() TickerSourceFactory

NewRealTimeTickerFactory creates an instance of TickerSourceFactory used by service code

type TimeSource

type TimeSource interface {
	Now() time.Time
}

TimeSource is an interface for any entity that provides the current time. It's primarily used to mock out time sources in unit tests

func NewRealTimeSource

func NewRealTimeSource() TimeSource

NewRealTimeSource returns a time source that serves real wall clock time using CLOCK_REALTIME

type Timer

type Timer struct {
	*time.Timer
}

Timer is a wrapper for time.Timer which does a safe reset of the timer.

func NewTimer

func NewTimer(d time.Duration) *Timer

NewTimer returns a Timer object to be used to perform timer functionality

func (*Timer) Chan

func (t *Timer) Chan() <-chan time.Time

Chan returns the underlying channel for receiving timer expired events

func (*Timer) Reset

func (t *Timer) Reset(d time.Duration) bool

Reset resets the Timer to expire after duration 'd' reliably

func (*Timer) Stop

func (t *Timer) Stop() bool

Stop stops the timer

type TimerFactory

type TimerFactory interface {
	NewTimer(d time.Duration) OneShotTimer
}

TimerFactory vends OneShotTimers

func NewTimerFactory

func NewTimerFactory() TimerFactory

NewTimerFactory creates and returns a new factory for OneShotTimers

type TokenBucket

type TokenBucket interface {
	// TryConsume attempts to take count tokens from the
	// bucket. Returns true on success, false
	// otherwise along with the duration for the next refill
	TryConsume(count int) (bool, time.Duration)
	// Consume waits up to timeout duration to take count
	// tokens from the bucket. Returns true if count
	// tokens were acquired before timeout, false
	// otherwise
	Consume(count int, timeout time.Duration) bool
}

TokenBucket is the interface for any implementation of a token bucket rate limiter

func NewTokenBucket

func NewTokenBucket(rps int, timeSource TimeSource) TokenBucket

NewTokenBucket creates and returns a new token bucket rate limiter that replenishes the bucket every 100 milliseconds. Thread safe.

@param rps: desired rate per second

Golang.org has an alternative implementation of the rate limiter. On benchmarking, golang's implementation was roughly 1.6x to 4x slower. In addition, it does a lot more than what we need. These are the benchmarks under different scenarios:

BenchmarkTokenBucketParallel      50000000   40.7 ns/op
BenchmarkGolangRateParallel       10000000   150 ns/op
BenchmarkTokenBucketParallel-8    20000000   124 ns/op
BenchmarkGolangRateParallel-8     10000000   208 ns/op
BenchmarkTokenBucketParallel      50000000   37.8 ns/op
BenchmarkGolangRateParallel       10000000   153 ns/op
BenchmarkTokenBucketParallel-8    10000000   129 ns/op
BenchmarkGolangRateParallel-8     10000000   208 ns/op

type TokenBucketFactory

type TokenBucketFactory interface {
	CreateTokenBucket(rps int, timeSource TimeSource) TokenBucket
}

TokenBucketFactory is an interface mainly used for injecting mock implementation of TokenBucket for unit testing

func NewTokenBucketFactory

func NewTokenBucketFactory() TokenBucketFactory

NewTokenBucketFactory creates an instance of factory used for creating TokenBucket instances

type UUIDResolver

type UUIDResolver interface {
	// Lookup returns the host addr corresponding to the uuid
	Lookup(uuid string) (string, error)
	// ReverseLookup returns the uuid corresponding to the host addr
	ReverseLookup(addr string) (string, error)
	// ClearCache clears the in-memory cache
	ClearCache()
}

UUIDResolver maps UUIDs to IP addrs and vice-versa

func NewUUIDResolver

func NewUUIDResolver(mClient metadata.TChanMetadataService) UUIDResolver

NewUUIDResolver returns an instance of UUIDResolver that can be used to resolve host uuids to ip:port addresses and vice-versa. The returned resolver uses Cassandra as the backend store for persisting the mapping. The resolver also maintains an in-memory cache for fast lookups. Thread safe.
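The cache-in-front-of-a-store arrangement described above can be sketched like this. It is a minimal illustration under assumptions: the backing store is replaced by a plain function, only the forward lookup is shown, and `cachingResolver` and `newCachingResolver` are hypothetical names rather than the package's types.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// cachingResolver is a hypothetical resolver that consults an in-memory
// cache before falling back to a slower backend.
type cachingResolver struct {
	mu      sync.RWMutex
	cache   map[string]string
	backend func(uuid string) (string, error) // stands in for the metadata store
}

func newCachingResolver(backend func(string) (string, error)) *cachingResolver {
	return &cachingResolver{cache: make(map[string]string), backend: backend}
}

// Lookup returns the cached address if present, otherwise asks the
// backend and remembers a successful answer.
func (r *cachingResolver) Lookup(uuid string) (string, error) {
	r.mu.RLock()
	addr, ok := r.cache[uuid]
	r.mu.RUnlock()
	if ok {
		return addr, nil
	}

	addr, err := r.backend(uuid)
	if err != nil {
		return "", err
	}
	r.mu.Lock()
	r.cache[uuid] = addr
	r.mu.Unlock()
	return addr, nil
}

// ClearCache drops every cached entry.
func (r *cachingResolver) ClearCache() {
	r.mu.Lock()
	r.cache = make(map[string]string)
	r.mu.Unlock()
}

func main() {
	calls := 0
	store := map[string]string{"uuid-1": "10.0.0.1:4240"}
	r := newCachingResolver(func(uuid string) (string, error) {
		calls++
		if addr, ok := store[uuid]; ok {
			return addr, nil
		}
		return "", errors.New("uuid not found")
	})

	r.Lookup("uuid-1")
	addr, _ := r.Lookup("uuid-1") // served from the cache
	fmt.Println(addr, calls)

	r.ClearCache()
	r.Lookup("uuid-1") // goes back to the backend
	fmt.Println(calls)
}
```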

type UnixNanoTime

type UnixNanoTime int64

UnixNanoTime is Unix time as nanoseconds since Jan 1st, 1970, 00:00 GMT

func Now

func Now() UnixNanoTime

Now returns the current time as a UnixNanoTime

func (UnixNanoTime) ToSeconds

func (u UnixNanoTime) ToSeconds() Seconds

ToSeconds turns a relative or absolute UnixNanoTime to float Seconds

func (UnixNanoTime) ToSecondsFmt

func (u UnixNanoTime) ToSecondsFmt() string

ToSecondsFmt turns a relative or absolute UnixNanoTime to float Seconds, and returns 'never' if the input is zero

type WSConnector

type WSConnector interface {
	OpenPublisherStream(hostPort string, requestHeader http.Header) (stream.BInOpenPublisherStreamOutCall, error)
	AcceptPublisherStream(w http.ResponseWriter, r *http.Request) (serverStream.BInOpenPublisherStreamInCall, error)
	OpenConsumerStream(hostPort string, requestHeader http.Header) (stream.BOutOpenConsumerStreamOutCall, error)
	AcceptConsumerStream(w http.ResponseWriter, r *http.Request) (serverStream.BOutOpenConsumerStreamInCall, error)
	OpenAppendStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenAppendStreamOutCall, error)
	AcceptAppendStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenAppendStreamInCall, error)
	OpenReadStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenReadStreamOutCall, error)
	AcceptReadStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenReadStreamInCall, error)
	OpenReplicationReadStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenReadStreamOutCall, error)
	AcceptReplicationReadStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenReadStreamInCall, error)
	OpenReplicationRemoteReadStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenReadStreamOutCall, error)
	AcceptReplicationRemoteReadStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenReadStreamInCall, error)
}

WSConnector takes care of establishing connection via websocket stream

func NewWSConnector

func NewWSConnector() WSConnector

NewWSConnector creates a WSConnector

type WSService

type WSService interface {
	RegisterWSHandler() *http.ServeMux
}

WSService is the interface which should be implemented by websocket service
