Documentation ¶
Index ¶
- Constants
- Variables
- func AdminNotificationTypePtr(notificationType admin.NotificationType) *admin.NotificationType
- func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool
- func BoolPtr(v bool) *bool
- func BootstrapRingpop(rp *ringpop.Ringpop, ipaddr string, port int, ...) error
- func CQLTimestampToUnixNano(milliseconds int64) int64
- func CalculateRate(last, curr SequenceNumber, lastTime, currTime UnixNanoTime) float64
- func CheramiChecksumOptionPtr(checksumOption cherami.ChecksumOption) *cherami.ChecksumOption
- func CheramiConsumerGroupExtentStatusPtr(status metadata.ConsumerGroupExtentStatus) *metadata.ConsumerGroupExtentStatus
- func CheramiConsumerGroupStatusPtr(status cherami.ConsumerGroupStatus) *cherami.ConsumerGroupStatus
- func CheramiDestinationStatusPtr(status cherami.DestinationStatus) *cherami.DestinationStatus
- func CheramiDestinationType(internalDestinationType shared.DestinationType) (cheramiDestinationType cherami.DestinationType, err error)
- func CheramiDestinationTypePtr(destType cherami.DestinationType) *cherami.DestinationType
- func CheramiInputHostCommandTypePtr(cmdType cherami.InputHostCommandType) *cherami.InputHostCommandType
- func CheramiOutputHostCommandTypePtr(cmdType cherami.OutputHostCommandType) *cherami.OutputHostCommandType
- func CheramiProtocolPtr(protocol cherami.Protocol) *cherami.Protocol
- func CheramiStatusPtr(status cherami.Status) *cherami.Status
- func ClassifyErrorByType(in error) metrics.ErrorClass
- func ConstructAckID(sessionID uint16, ackMgrID uint16, seqNum uint32, address int64) string
- func ConsumerGroupExtentMetricsPtr(cgExtMetrics controller.ConsumerGroupExtentMetrics) *controller.ConsumerGroupExtentMetrics
- func ConsumerGroupMetricsPtr(cgMetrics controller.ConsumerGroupMetrics) *controller.ConsumerGroupMetrics
- func ContainsEmpty(a []string) bool
- func ContainsString(a []string, x string) bool
- func ConvertDownstreamErrors(l bark.Logger, in error) (metrics.ErrorClass, error)
- func CreateCassandraKeyspace(s *gocql.Session, keyspace string, replicas int, overwrite bool) (err error)
- func CreateHyperbahnClient(ch *tchannel.Channel, bootstrapFile string) *hyperbahn.Client
- func CreateInputHostAdminClient(ch *tchannel.Channel, hostPort string) (admin.TChanInputHostAdmin, error)
- func CreateOutputHostAdminClient(ch *tchannel.Channel, hostPort string) (admin.TChanOutputHostAdmin, error)
- func CreateRingpop(service string, ch *tchannel.Channel, ipaddr string, port int) *(ringpop.Ringpop)
- func DestinationExtentMetricsPtr(dstExtMetrics controller.DestinationExtentMetrics) *controller.DestinationExtentMetrics
- func DestinationMetricsPtr(dstMetrics controller.DestinationMetrics) *controller.DestinationMetrics
- func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error)
- func ExtrapolateDifference(observedA, observedB SequenceNumber, observedARate, observedBRate float64, ...) (extrapolated int64)
- func FindNearestInt(target int64, nums ...int64) (nearest int64)
- func Float64Ptr(v float64) *float64
- func FmtAckID(s string) string
- func FmtAddr(i int64) string
- func FmtCnsPth(s string) string
- func FmtCnsm(s string) string
- func FmtCnsmID(s int) string
- func FmtCtrl(s string) string
- func FmtDLQID(s string) string
- func FmtDplName(s string) string
- func FmtDst(s string) string
- func FmtDstPth(s string) string
- func FmtExt(s string) string
- func FmtExtentStatus(status shared.ExtentStatus) string
- func FmtFrnt(s string) string
- func FmtHostConnLimit(s int) string
- func FmtHostIP(s string) string
- func FmtHostName(s string) string
- func FmtHostPort(s string) string
- func FmtIn(s string) string
- func FmtInPubConnID(s int) string
- func FmtInPutAckID(s string) string
- func FmtInReplicaHost(s string) string
- func FmtMsgID(s string) string
- func FmtOut(s string) string
- func FmtReconfigureID(s string) string
- func FmtReconfigureType(s admin.NotificationType) string
- func FmtService(s string) string
- func FmtSlowDown(s time.Duration) string
- func FmtStor(s string) string
- func FmtTbSleep(s time.Duration) string
- func FmtTenancy(s string) string
- func FmtZoneName(s string) string
- func GetConnectionKey(host *cherami.HostAddress) string
- func GetDLQPathNameFromCGName(CGName string) (string, error)
- func GetDateTag() string
- func GetDefaultLogger() bark.Logger
- func GetDirectoryName(path string) (string, error)
- func GetEnvVariableFromHostPort(hostPort string) (envVar string)
- func GetLocalClusterInfo(deployment string) (zone string, tenancy string)
- func GetOpenAppendStreamRequestHTTP(httpHeader http.Header) (req *store.OpenAppendStreamRequest, err error)
- func GetOpenAppendStreamRequestHeaders(req *store.OpenAppendStreamRequest) (headers map[string]string)
- func GetOpenAppendStreamRequestStruct(headers map[string]string) (req *store.OpenAppendStreamRequest, err error)
- func GetOpenReadStreamRequestHTTP(httpHeader http.Header) (req *store.OpenReadStreamRequest, err error)
- func GetOpenReadStreamRequestHTTPHeaders(req *store.OpenReadStreamRequest) http.Header
- func GetOpenReadStreamRequestHeaders(req *store.OpenReadStreamRequest) (headers map[string]string)
- func GetOpenReadStreamRequestStruct(headers map[string]string) (req *store.OpenReadStreamRequest, err error)
- func GetOpenReplicationReadStreamRequestHTTPHeaders(req *OpenReplicationReadStreamRequest) http.Header
- func GetOpenReplicationRemoteReadStreamRequestHTTPHeaders(req *OpenReplicationRemoteReadStreamRequest) http.Header
- func GetRandInt64(min int64, max int64) int64
- func GetTagsFromPath(path string) (string, error)
- func Int16Ptr(v int16) *int16
- func Int32Ptr(v int32) *int32
- func Int64Ptr(v int64) *int64
- func IntPtr(v int) *int
- func InternalChecksumOptionPtr(checksumOption shared.ChecksumOption) *shared.ChecksumOption
- func InternalConsumerGroupStatusPtr(status shared.ConsumerGroupStatus) *shared.ConsumerGroupStatus
- func InternalConsumerGroupTypePtr(cgType shared.ConsumerGroupType) *shared.ConsumerGroupType
- func InternalDestinationStatusPtr(status shared.DestinationStatus) *shared.DestinationStatus
- func InternalDestinationTypePtr(destType shared.DestinationType) *shared.DestinationType
- func InternalExtentReplicaReplicationStatusTypePtr(status shared.ExtentReplicaReplicationStatus) *shared.ExtentReplicaReplicationStatus
- func IsDLQDestination(dstDesc *shared.DestinationDescription) bool
- func IsDLQDestinationPath(path string) bool
- func IsDevelopmentEnvironment(deploymentName string) (devEnv bool)
- func IsRemoteZoneExtent(extentOriginZone string, localZone string) bool
- func IsRetryableTChanErr(err error) bool
- func IsValidServiceName(input string) bool
- func LoadCassandraSchema(cqlshpath string, fileName string, keyspace string) (err error)
- func MaxInt(a, b int) int
- func MaxInt64(a, b int64) int64
- func MetadataConsumerGroupExtentStatusPtr(status metadata.ConsumerGroupExtentStatus) *metadata.ConsumerGroupExtentStatus
- func MetadataExtentReplicaStatusPtr(status shared.ExtentReplicaStatus) *shared.ExtentReplicaStatus
- func MetadataExtentStatusPtr(status shared.ExtentStatus) *shared.ExtentStatus
- func MinInt(a, b int) int
- func MinInt64(a, b int64) int64
- func NewCassandraCluster(clusterHosts string) *gocql.ClusterConfig
- func NewMetricReporterWithHostname(cfg configure.CommonServiceConfig) metrics.Reporter
- func NewTestMetricsReporter() metrics.Reporter
- func NodeMetricsPtr(nodeMetrics controller.NodeMetrics) *controller.NodeMetrics
- func OverrideValueByPrefix(logFn func() bark.Logger, path string, overrides []string, defaultVal int64, ...) int64
- func RWLockReadAndConditionalWrite(m *sync.RWMutex, readFn func() bool, writeFn func())
- func RandomBytes(size int) []byte
- func RolePtr(role controller.Role) *controller.Role
- func SKUPtr(sku controller.SKU) *controller.SKU
- func ServiceLoop(port int, cfg configure.CommonAppConfig, service *Service)
- func SetupServerConfig(configurator configure.Configure) configure.CommonAppConfig
- func ShortenGUIDString(s string) string
- func SpinWaitOnCondition(condition ConditionFunc, timeout time.Duration) bool
- func SplitHostPort(hostPort string) (string, int, error)
- func StartEKG(lclLg bark.Logger)
- func StoreExtentMetricsPtr(storeExtMetrics controller.StoreExtentMetrics) *controller.StoreExtentMetrics
- func StringPtr(v string) *string
- func StringSetEqual(a, b []string) bool
- func TSPtr(v time.Time) *time.Time
- func UUIDHashCode(key string) uint32
- func UUIDToUint16(s string) uint16
- func Uint32Ptr(v uint32) *uint32
- func Uint64Ptr(v uint64) *uint64
- func UnixNanoToCQLTimestamp(timestamp int64) int64
- func WSStart(listenAddress string, port int, wsservice WSService)
- func WaitTimeout(timeout time.Duration, fn func()) bool
- type AckID
- type CliHelper
- type ClientFactory
- type CombinedID
- type ConcurrentMap
- type ConditionFunc
- type CounterBank
- type Daemon
- type GeometricRollingAverage
- type HTTPHandler
- type HashFunc
- type Heartbeat
- type HostAckIDGenerator
- type HostHardwareInfo
- type HostHardwareInfoReader
- type HostIDHeartbeater
- type HostInfo
- type Item
- type LoadReporter
- type LoadReporterDaemon
- type LoadReporterDaemonFactory
- type LoadReporterSource
- type MapEntry
- type MapIterator
- type MinHeap
- type MockLoadReporterDaemon
- type MockLoadReporterDaemonFactory
- type MockRingpopMonitor
- func (rpm *MockRingpopMonitor) Add(service string, uuid string, addr string)
- func (rpm *MockRingpopMonitor) AddListener(service string, name string, ch chan<- *RingpopListenerEvent) error
- func (rpm *MockRingpopMonitor) FindHostForAddr(service string, addr string) (*HostInfo, error)
- func (rpm *MockRingpopMonitor) FindHostForKey(service string, key string) (*HostInfo, error)
- func (rpm *MockRingpopMonitor) FindRandomHost(service string) (*HostInfo, error)
- func (rpm *MockRingpopMonitor) GetBootstrappedChannel() chan struct{}
- func (rpm *MockRingpopMonitor) GetHosts(service string) ([]*HostInfo, error)
- func (rpm *MockRingpopMonitor) IsHostHealthy(service string, uuid string) bool
- func (rpm *MockRingpopMonitor) Remove(service string, uuid string)
- func (rpm *MockRingpopMonitor) RemoveListener(service string, name string) error
- func (rpm *MockRingpopMonitor) ResolveUUID(service string, uuid string) (string, error)
- func (rpm *MockRingpopMonitor) SetMetadata(key string, data string) error
- func (rpm *MockRingpopMonitor) Start()
- func (rpm *MockRingpopMonitor) Stop()
- type MockService
- func (m *MockService) GetClientFactory() ClientFactory
- func (m *MockService) GetConfig() configure.CommonServiceConfig
- func (m *MockService) GetDConfigClient() dconfig.Client
- func (m *MockService) GetHostName() string
- func (m *MockService) GetHostPort() string
- func (m *MockService) GetHostUUID() string
- func (m *MockService) GetLoadReporterDaemonFactory() LoadReporterDaemonFactory
- func (m *MockService) GetMetricsReporter() metrics.Reporter
- func (m *MockService) GetRingpopMonitor() RingpopMonitor
- func (m *MockService) GetTChannel() *tchannel.Channel
- func (m *MockService) GetWSConnector() WSConnector
- func (m *MockService) IsLimitsEnabled() bool
- func (m *MockService) Report(reporter LoadReporter)
- func (m *MockService) SetClientFactory(cf ClientFactory)
- func (m *MockService) Start(thriftService []thrift.TChanServer)
- func (m *MockService) Stop()
- type MockTimeSource
- type OneShotTimer
- type OpenAppendInWebsocketStream
- func (s *OpenAppendInWebsocketStream) Done() error
- func (s *OpenAppendInWebsocketStream) Flush() error
- func (s *OpenAppendInWebsocketStream) Read() (*store.AppendMessage, error)
- func (s *OpenAppendInWebsocketStream) SetResponseHeaders(headers map[string]string) error
- func (s *OpenAppendInWebsocketStream) Write(arg *store.AppendMessageAck) error
- type OpenAppendOutWebsocketStream
- func (s *OpenAppendOutWebsocketStream) Done() error
- func (s *OpenAppendOutWebsocketStream) Flush() error
- func (s *OpenAppendOutWebsocketStream) Read() (*store.AppendMessageAck, error)
- func (s *OpenAppendOutWebsocketStream) ResponseHeaders() (map[string]string, error)
- func (s *OpenAppendOutWebsocketStream) Write(arg *store.AppendMessage) error
- type OpenConsumerInWebsocketStream
- func (s *OpenConsumerInWebsocketStream) Done() error
- func (s *OpenConsumerInWebsocketStream) Flush() error
- func (s *OpenConsumerInWebsocketStream) Read() (*cherami.ControlFlow, error)
- func (s *OpenConsumerInWebsocketStream) SetResponseHeaders(headers map[string]string) error
- func (s *OpenConsumerInWebsocketStream) Write(arg *cherami.OutputHostCommand) error
- type OpenConsumerOutWebsocketStream
- func (s *OpenConsumerOutWebsocketStream) Done() error
- func (s *OpenConsumerOutWebsocketStream) Flush() error
- func (s *OpenConsumerOutWebsocketStream) Read() (*cherami.OutputHostCommand, error)
- func (s *OpenConsumerOutWebsocketStream) ResponseHeaders() (map[string]string, error)
- func (s *OpenConsumerOutWebsocketStream) Write(arg *cherami.ControlFlow) error
- type OpenPublisherInWebsocketStream
- func (s *OpenPublisherInWebsocketStream) Done() error
- func (s *OpenPublisherInWebsocketStream) Flush() error
- func (s *OpenPublisherInWebsocketStream) Read() (*cherami.PutMessage, error)
- func (s *OpenPublisherInWebsocketStream) SetResponseHeaders(headers map[string]string) error
- func (s *OpenPublisherInWebsocketStream) Write(arg *cherami.InputHostCommand) error
- type OpenPublisherOutWebsocketStream
- func (s *OpenPublisherOutWebsocketStream) Done() error
- func (s *OpenPublisherOutWebsocketStream) Flush() error
- func (s *OpenPublisherOutWebsocketStream) Read() (*cherami.InputHostCommand, error)
- func (s *OpenPublisherOutWebsocketStream) ResponseHeaders() (map[string]string, error)
- func (s *OpenPublisherOutWebsocketStream) Write(arg *cherami.PutMessage) error
- type OpenReadInWebsocketStream
- func (s *OpenReadInWebsocketStream) Done() error
- func (s *OpenReadInWebsocketStream) Flush() error
- func (s *OpenReadInWebsocketStream) Read() (*cherami.ControlFlow, error)
- func (s *OpenReadInWebsocketStream) SetResponseHeaders(headers map[string]string) error
- func (s *OpenReadInWebsocketStream) Write(arg *store.ReadMessageContent) error
- type OpenReadOutWebsocketStream
- func (s *OpenReadOutWebsocketStream) Done() error
- func (s *OpenReadOutWebsocketStream) Flush() error
- func (s *OpenReadOutWebsocketStream) Read() (*store.ReadMessageContent, error)
- func (s *OpenReadOutWebsocketStream) ResponseHeaders() (map[string]string, error)
- func (s *OpenReadOutWebsocketStream) Write(arg *cherami.ControlFlow) error
- type OpenReplicationReadStreamRequest
- type OpenReplicationRemoteReadStreamRequest
- type RingpopEventType
- type RingpopListenerEvent
- type RingpopMonitor
- type SCommon
- type Seconds
- type SequenceNumber
- type Service
- func (h *Service) GetClientFactory() ClientFactory
- func (h *Service) GetConfig() configure.CommonServiceConfig
- func (h *Service) GetDConfigClient() dconfig.Client
- func (h *Service) GetHostName() string
- func (h *Service) GetHostPort() string
- func (h *Service) GetHostUUID() string
- func (h *Service) GetLoadReporterDaemonFactory() LoadReporterDaemonFactory
- func (h *Service) GetMetricsReporter() metrics.Reporter
- func (h *Service) GetRingpopMonitor() RingpopMonitor
- func (h *Service) GetTChannel() *tchannel.Channel
- func (h *Service) GetWSConnector() WSConnector
- func (h *Service) IsLimitsEnabled() bool
- func (h *Service) Report(reporter LoadReporter)
- func (h *Service) SetClientFactory(cf ClientFactory)
- func (h *Service) Start(thriftServices []thrift.TChanServer)
- func (h *Service) Stop()
- func (h *Service) UpdateAdvertisedName(deploymentName string)
- type ShardedConcurrentMap
- func (cmap *ShardedConcurrentMap) Contains(key string) bool
- func (cmap *ShardedConcurrentMap) Get(key string) (interface{}, bool)
- func (cmap *ShardedConcurrentMap) Iter() MapIterator
- func (cmap *ShardedConcurrentMap) Put(key string, value interface{})
- func (cmap *ShardedConcurrentMap) PutIfNotExist(key string, value interface{}) bool
- func (cmap *ShardedConcurrentMap) Remove(key string)
- func (cmap *ShardedConcurrentMap) Size() int
- type TickerSource
- type TickerSourceFactory
- type TimeSource
- type Timer
- type TimerFactory
- type TokenBucket
- type TokenBucketFactory
- type UUIDResolver
- type UnixNanoTime
- type WSConnector
- type WSService
Constants ¶
const ( // SequenceBegin refers to the beginning of an extent SequenceBegin = 0 // SequenceEnd refers to the end of an extent SequenceEnd = math.MaxInt64 // CallerUserName is the name of thrift context header contains current user name CallerUserName = "user-name" // CallerHostName is the name of thrift context header contains current host name CallerHostName = "host-name" // CallerServiceName is the name of thrift context header contains current service name CallerServiceName = "cn" )
Extent sequence const
const ( // DLQMaxMergeAge is the maximum time that we expect a partition to exist DLQMaxMergeAge = common.UnixNanoTime(int64(time.Hour)) * 24 * 1 // one day // TenancyProd is the tenancy of production // Deployment name can be in the format of <tenancy>_<zone> TenancyProd = "prod" // InputHostForRemoteExtent is a special (and fake) input host ID for remote extent InputHostForRemoteExtent = "88888888-8888-8888-8888-888888888888" )
const ( // InputServiceName refers to the name of the cherami in service InputServiceName = "cherami-inputhost" // OutputServiceName refers to the name of the cherami out service OutputServiceName = "cherami-outputhost" // FrontendServiceName refers to the name of the cherami frontend service FrontendServiceName = "cherami-frontendhost" // ControllerServiceName refers to the name of the cherami controller service ControllerServiceName = "cherami-controllerhost" // StoreServiceName refers to the name of the cherami store service StoreServiceName = "cherami-storehost" // ReplicatorServiceName refers to the name of the cherami replicator service ReplicatorServiceName = "cherami-replicator" )
const ( // MaxHostOverallConn is the maximam overall connection limit for this host // TODO: Need to figure out the suitable values MaxHostOverallConn = 100000 // HostOverallConnLimit is the overall connection limit for this host HostOverallConnLimit = 10000 // MaxHostPerSecondConn is the maximam per second rate limit for this host // TODO: Need to figure out the suitable values MaxHostPerSecondConn = 10000 // HostPerSecondConnLimit is the per second rate limit for this host HostPerSecondConnLimit = 1000 //MaxHostPerConnMsgsLimitPerSecond is the maximam for per connection messages limit // TODO: Need to figure out the suitable values MaxHostPerConnMsgsLimitPerSecond = 800000 // HostPerConnMsgsLimitPerSecond is the per connection messages limit HostPerConnMsgsLimitPerSecond = 80000 //MaxHostPerExtentMsgsLimitPerSecond is the maximam for per extent messages limit // TODO: Need to figure out the suitable values MaxHostPerExtentMsgsLimitPerSecond = 200000 // HostPerExtentMsgsLimitPerSecond is the per extent messages limit HostPerExtentMsgsLimitPerSecond = 20000 // MaxHostMaxConnPerDestination is the maximam for max connections per destination // TODO: Need to figure out the suitable values MaxHostMaxConnPerDestination = 10000 // HostMaxConnPerDestination is the max connections per destination HostMaxConnPerDestination = 1000 )
some default values for the limits TODO: this will be moved behind a separate "limits" interface which is also dynamically tunable
const ( FlushThreshold int = 64 FlushTimeout time.Duration = 5 * time.Millisecond )
Flush stream thresholds; this is used by the "pumps" that wrap the websocket-stream and provide go-channel interface to read/write from the stream. the flush thresholds below control how often we do a "Flush" on the tchannel-stream. Currently configured for every 64 messages sent or every 5 milliseconds (whichever is sooner)
const MaxDuration time.Duration = 1<<62 - 1
MaxDuration is maximum time duration
const TagAckID = `ackID`
TagAckID is the logging tag for AckId
const TagAddr = `addr`
TagAddr is the logging tag for address
const TagCnsPth = `cnsPth`
TagCnsPth is the logging tag for Consumer group Path
const TagCnsm = `cnsmID`
TagCnsm is the logging tag for Consumer Group UUID
const TagCnsmID = `cnsmID`
TagCnsmID is the logging tag for the consumer ID
const TagCtrl = `ctrlID`
TagCtrl is the logging tag for Extent Controller UUID
const TagDLQID = `dlqID`
TagDLQID is the logging tag for a Dead Letter Queue destination UUID
const TagDbPath = "dbpath"
TagDbPath is the path to the db of the extent in manyrocks
const TagDeploymentName = `deployment`
TagDeploymentName is the logging tag for deployment name
const TagDplName = `deploymentName`
TagDplName is the logging tag for deployment name
const TagDst = `destID`
TagDst is the tag for Destination UUID
const TagDstPth = `dstPth`
TagDstPth is the logging tag for Destination Path
const TagErr = `err`
TagErr is the tag for error object message
const TagEvent = `event`
TagEvent is for "event" from Discovery and Failure Detection Daemon
const TagExt = `extnID`
TagExt is the logging tag for Extent UUID
const TagExtentCacheSize = `extentCacheSize`
TagExtentCacheSize is the logging tag for PathCache ExtentCache map size
const TagExtentStatus = `extStatus`
TagExtentStatus is for extent status
const TagFrnt = `frntID`
TagFrnt is the logging tag for Frontend UUID
const TagHostConnLimit = "hostconnlimit"
TagHostConnLimit is the log tag for hostconnection limit
const TagHostIP = `hostIP`
TagHostIP is the logging tag for host IP
const TagHostName = `hostName`
TagHostName is the logging tag for host name
const TagHostPort = "hostport"
TagHostPort is the log tag for hostport
const TagIn = `inhoID`
TagIn is the logging tag for Inputhost UUID
const TagInPubConnID = `inPubConnID`
TagInPubConnID is the logging tag for input pubconnection ID
const TagInPutAckID = `inPutAckID`
TagInPutAckID is the logging tag for PutMessageAck ID
const TagInReplicaHost = `inReplicaHost`
TagInReplicaHost is the logging tag for replica host on input
const TagModule = `module`
TagModule is the logging tag used to identify the module within a service
const TagMsgID = `msgID`
TagMsgID is the logging tag for MsgId
const TagOut = `outhID`
TagOut is the logging tag for Outputhost UUID
const TagReconfigureID = `reconfigID`
TagReconfigureID is the logging tag for reconfiguration identifiers
const TagReconfigureType = `reconfigType`
TagReconfigureType is the logging tag for reconfiguration type
const TagReplicator = "replicatorID"
TagReplicator is the logging tag for replicator host UUID
const TagRunnerName = "runnerName"
TagRunnerName is the log tag for runner name, value is basic, timers, dlqTimedout, etc.
const TagSeq = `seq`
TagSeq is for sequence number
const TagService = "service"
TagService is the log tag for the service
const TagSlowDownSeconds = `slowDownSecs`
TagSlowDownSeconds is the logging tag for slow down time on consconnection
const TagState = `state`
TagState is for "state" in event handlers
const TagStor = `storID`
TagStor is the logging tag for StoreHost UUID
const TagTbSleep = "tokenbucketduration"
TagTbSleep is the log tag for token bucket sleep duration
const TagTenancy = `tenancy`
TagTenancy is the logging tag for tenancy
const TagUnknowPth = `unknowPth`
TagUnknowPth is the logging tag for Unknow Path
const TagUpdateUUID = `updateUUID`
TagUpdateUUID is the logging tag for reconfiguration update UUIDs
const TagZoneName = `zoneName`
TagZoneName is the logging tag for zone name
const ( // UUIDStringLength is the length of an UUID represented as a hex string UUIDStringLength = 36 // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx )
Variables ¶
var ConsumerGroupRegex = PathRegex
ConsumerGroupRegex regex for consumer group path
var ErrInsufficientHosts = errors.New("Not enough hosts to serve the request")
ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrListenerAlreadyExist = errors.New("Listener already exist for the service")
ErrListenerAlreadyExist is thrown on a duplicate AddListener call from the same listener
var ( // ErrLoadReportThrottled is the error returned by LoadReporter when it runs out of tokens to serve the request ErrLoadReportThrottled = errors.New("Too many load reports from the host.") )
var ErrNoClient = &cherami.InternalServiceError{Message: "Unable to create client"}
ErrNoClient is returned when the host is already shutdown
var ErrUUIDLookupFailed = errors.New("Cannot find ip:port corresponding to uuid")
ErrUUIDLookupFailed is thrown when a uuid cannot be mapped to addr
var ErrUnknownService = errors.New("Service not tracked by RingpopMonitor")
ErrUnknownService is thrown for a service that is not tracked by this instance
var PathDLQRegex = regexp.MustCompile(`^/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*.dlq$`)
PathDLQRegex regex for dlq destination path
var PathRegex = regexp.MustCompile(`^/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*$`)
PathRegex regex for destination path
var PathRegexAllowUUID, _ = regexp.Compile(`^(/[\w.]*[a-zA-Z][\w.]*/[\w.]*[a-zA-Z][\w.]*|[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12})$`)
PathRegexAllowUUID For special destinations (e.g. Dead letter queues) we allow a string UUID as path
var ServiceToPort = map[string]string{
InputServiceName: "4240",
OutputServiceName: "4254",
StoreServiceName: "4253",
FrontendServiceName: "4922",
ControllerServiceName: "5425",
ReplicatorServiceName: "6280",
}
ServiceToPort is service name to ports mapping This map should be syced with the port nums in config file and use by command line. We need this because for command line, we can not generated the config path automatically.
var UUIDRegex, _ = regexp.Compile(`^[[:xdigit:]]{8}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{4}-[[:xdigit:]]{12}$`)
UUIDRegex regex for uuid
Functions ¶
func AdminNotificationTypePtr ¶
func AdminNotificationTypePtr(notificationType admin.NotificationType) *admin.NotificationType
AdminNotificationTypePtr makes a copy and returns the pointer to a MetadataNotificationType.
func AwaitWaitGroup ¶
AwaitWaitGroup calls Wait on the given wait Returns true if the Wait() call succeeded before the timeout Returns false if the Wait() did not return before the timeout
func BootstrapRingpop ¶
func BootstrapRingpop(rp *ringpop.Ringpop, ipaddr string, port int, cfg configure.CommonServiceConfig) error
BootstrapRingpop tries to bootstrap the given ringpop instance using the hosts list
func CQLTimestampToUnixNano ¶
CQLTimestampToUnixNano converts CQL timestamp to UnixNano
func CalculateRate ¶
func CalculateRate(last, curr SequenceNumber, lastTime, currTime UnixNanoTime) float64
CalculateRate does a simple rate calculation
func CheramiChecksumOptionPtr ¶
func CheramiChecksumOptionPtr(checksumOption cherami.ChecksumOption) *cherami.ChecksumOption
CheramiChecksumOptionPtr makes a copy and return the pointer too a CheramiChecksumOption.
func CheramiConsumerGroupExtentStatusPtr ¶
func CheramiConsumerGroupExtentStatusPtr(status metadata.ConsumerGroupExtentStatus) *metadata.ConsumerGroupExtentStatus
CheramiConsumerGroupExtentStatusPtr makes a copy and returns the pointer to a CheramiConsumerGroupExtentStatus.
func CheramiConsumerGroupStatusPtr ¶
func CheramiConsumerGroupStatusPtr(status cherami.ConsumerGroupStatus) *cherami.ConsumerGroupStatus
CheramiConsumerGroupStatusPtr makes a copy and returns the pointer to a CheramiConsumerGroupStatus.
func CheramiDestinationStatusPtr ¶
func CheramiDestinationStatusPtr(status cherami.DestinationStatus) *cherami.DestinationStatus
CheramiDestinationStatusPtr makes a copy and returns the pointer to a CheramiDestinationStatus.
func CheramiDestinationType ¶
func CheramiDestinationType(internalDestinationType shared.DestinationType) (cheramiDestinationType cherami.DestinationType, err error)
CheramiDestinationType converts from shared.DestinationType to cherami.DestinationType
func CheramiDestinationTypePtr ¶
func CheramiDestinationTypePtr(destType cherami.DestinationType) *cherami.DestinationType
CheramiDestinationTypePtr makes a copy and returns the pointer to a CheramiDestinationType.
func CheramiInputHostCommandTypePtr ¶
func CheramiInputHostCommandTypePtr(cmdType cherami.InputHostCommandType) *cherami.InputHostCommandType
CheramiInputHostCommandTypePtr makes a copy and returns the pointer to a CheramiInputHostCommandType.
func CheramiOutputHostCommandTypePtr ¶
func CheramiOutputHostCommandTypePtr(cmdType cherami.OutputHostCommandType) *cherami.OutputHostCommandType
CheramiOutputHostCommandTypePtr makes a copy and returns the pointer to a CheramiOutputHostCommandType.
func CheramiProtocolPtr ¶
CheramiProtocolPtr makes a copy and returns the pointer to a CheramiProtocol.
func CheramiStatusPtr ¶
CheramiStatusPtr makes a copy and returns the pointer to a CheramiStatus.
func ClassifyErrorByType ¶
func ClassifyErrorByType(in error) metrics.ErrorClass
ClassifyErrorByType gives the metrics error class for any cherami or common error
func ConstructAckID ¶
ConstructAckID is a helper routine to construct the ackID from the given args
func ConsumerGroupExtentMetricsPtr ¶
func ConsumerGroupExtentMetricsPtr(cgExtMetrics controller.ConsumerGroupExtentMetrics) *controller.ConsumerGroupExtentMetrics
ConsumerGroupExtentMetricsPtr makes a copy and returns the pointer to a ConsumerGroupExtentMetrics.
func ConsumerGroupMetricsPtr ¶
func ConsumerGroupMetricsPtr(cgMetrics controller.ConsumerGroupMetrics) *controller.ConsumerGroupMetrics
ConsumerGroupMetricsPtr makes a copy and returns the pointer to a ConsumerGroupMetrics.
func ContainsEmpty ¶ added in v1.26.0
ContainsEmpty scans a string slice for an empty string, returning true if one is found
func ContainsString ¶ added in v1.26.0
ContainsString scans a string slice for a matching string, returning true if one is found
func ConvertDownstreamErrors ¶
ConvertDownstreamErrors is a helper function to convert a error from metadata client or controller client to client-cherami.thrift error that can be returned to caller. It also classifies the error for metrics
func CreateCassandraKeyspace ¶
func CreateCassandraKeyspace(s *gocql.Session, keyspace string, replicas int, overwrite bool) (err error)
CreateCassandraKeyspace creates the keyspace using this session for given replica count
func CreateHyperbahnClient ¶
CreateHyperbahnClient returns a hyperbahn client
func CreateInputHostAdminClient ¶
func CreateInputHostAdminClient(ch *tchannel.Channel, hostPort string) (admin.TChanInputHostAdmin, error)
CreateInputHostAdminClient creates and returns tchannel client for the input host admin API
func CreateOutputHostAdminClient ¶
func CreateOutputHostAdminClient(ch *tchannel.Channel, hostPort string) (admin.TChanOutputHostAdmin, error)
CreateOutputHostAdminClient creates and returns tchannel client for the output host admin API
func CreateRingpop ¶
func CreateRingpop(service string, ch *tchannel.Channel, ipaddr string, port int) *(ringpop.Ringpop)
CreateRingpop instantiates the ringpop for the provided channel and host,
func DestinationExtentMetricsPtr ¶
func DestinationExtentMetricsPtr(dstExtMetrics controller.DestinationExtentMetrics) *controller.DestinationExtentMetrics
DestinationExtentMetricsPtr makes a copy and returns the pointer to a DestinationExtentMetrics.
func DestinationMetricsPtr ¶
func DestinationMetricsPtr(dstMetrics controller.DestinationMetrics) *controller.DestinationMetrics
DestinationMetricsPtr makes a copy and returns the pointer to a DestinationMetrics.
func DropCassandraKeyspace ¶
DropCassandraKeyspace drops the given keyspace, if it exists
func ExtrapolateDifference ¶
func ExtrapolateDifference(observedA, observedB SequenceNumber, observedARate, observedBRate float64, observedATime, observedBTime, extrapolatedTime UnixNanoTime, maxExtrapolationTime Seconds) (extrapolated int64)
ExtrapolateDifference calculates the extrapolated difference in two observed value with rates, at some arbitrary time. It is assumed that A > B, so if B is extrapolated to be greater than A, the difference will be presumed to be zero.
func FindNearestInt ¶ added in v1.26.0
FindNearestInt finds the integer that is closest to the given 'target'
func Float64Ptr ¶
Float64Ptr makes a copy and returns the pointer to an int64.
func FmtDplName ¶
FmtDplName formats a string to be used with TagDplName
func FmtExtentStatus ¶ added in v1.26.0
func FmtExtentStatus(status shared.ExtentStatus) string
FmtExtentStatus formats ExtentStatus to be used with TagExtentStatus
func FmtHostConnLimit ¶
FmtHostConnLimit formats an int to be used with TagHostConnLimit
func FmtHostName ¶
FmtHostName formats a string to be used with TagHostName
func FmtHostPort ¶
FmtHostPort formats a string to be used with TagHostPort
func FmtInPubConnID ¶
FmtInPubConnID formats an int to be used with TagInPubConnID
func FmtInPutAckID ¶
FmtInPutAckID formats a string to be used with TagInPutAckID
func FmtInReplicaHost ¶
FmtInReplicaHost formats a string to be used with TagInReplicaHost
func FmtReconfigureID ¶
FmtReconfigureID formats a string to be used with TagReconfigureID
func FmtReconfigureType ¶
func FmtReconfigureType(s admin.NotificationType) string
FmtReconfigureType formats admin.NotificationType to be used with TagNotificationType
func FmtService ¶
FmtService formats a string to be used with TagService
func FmtSlowDown ¶
FmtSlowDown formats an int to be used with TagSlowDown
func FmtTbSleep ¶
FmtTbSleep formats a time.Duration to be used with TagTbSleep
func FmtTenancy ¶
FmtTenancy formats a string to be used with TagTenancy
func FmtZoneName ¶
FmtZoneName formats a string to be used with TagZoneName
func GetConnectionKey ¶
func GetConnectionKey(host *cherami.HostAddress) string
GetConnectionKey is used to create a key used by connections for looking up connections
func GetDLQPathNameFromCGName ¶
GetDLQPathNameFromCGName function return the DLQ destination name based on the consumer group passed Usually pass the Consumer group name to get a DLQ path name DEVNOTE: DO NOT QUERY A DLQ DESTINATION BY THIS NAME. This name is for reporting purposes only. All destination APIs support passing the DLQ UUID as the path.
func GetDateTag ¶
func GetDateTag() string
GetDateTag returns the current date used for tagging daily metric
func GetDefaultLogger ¶
GetDefaultLogger is a utility routine to get the default logger
func GetDirectoryName ¶
GetDirectoryName function gives the directory name given a path used for destination or consumer groups
func GetEnvVariableFromHostPort ¶
GetEnvVariableFromHostPort gets the environment variable corresponding to this host port. XXX: this can be removed once we move to ringpop labels and exchange websocket port as part of the ringpop metadata
func GetLocalClusterInfo ¶
GetLocalClusterInfo gets the zone and tenancy from the given deployment
func GetOpenAppendStreamRequestHTTP ¶
func GetOpenAppendStreamRequestHTTP(httpHeader http.Header) (req *store.OpenAppendStreamRequest, err error)
GetOpenAppendStreamRequestHTTP extracts OpenAppendStreamRequest from http headers
func GetOpenAppendStreamRequestHeaders ¶
func GetOpenAppendStreamRequestHeaders(req *store.OpenAppendStreamRequest) (headers map[string]string)
GetOpenAppendStreamRequestHeaders converts an OpenAppendStreamRequest struct to headers to pass as tchannel headers to OpenAppendStream
func GetOpenAppendStreamRequestStruct ¶
func GetOpenAppendStreamRequestStruct(headers map[string]string) (req *store.OpenAppendStreamRequest, err error)
GetOpenAppendStreamRequestStruct extracts OpenAppendStreamRequest from tchannel headers
func GetOpenReadStreamRequestHTTP ¶
func GetOpenReadStreamRequestHTTP(httpHeader http.Header) (req *store.OpenReadStreamRequest, err error)
GetOpenReadStreamRequestHTTP extracts OpenReadStreamRequest from http headers
func GetOpenReadStreamRequestHTTPHeaders ¶
func GetOpenReadStreamRequestHTTPHeaders(req *store.OpenReadStreamRequest) http.Header
GetOpenReadStreamRequestHTTPHeaders converts an OpenReadStreamRequest struct to http headers for OpenReadStream
func GetOpenReadStreamRequestHeaders ¶
func GetOpenReadStreamRequestHeaders(req *store.OpenReadStreamRequest) (headers map[string]string)
GetOpenReadStreamRequestHeaders converts an OpenReadStreamRequest struct to headers to pass as tchannel headers to OpenReadStream
func GetOpenReadStreamRequestStruct ¶
func GetOpenReadStreamRequestStruct(headers map[string]string) (req *store.OpenReadStreamRequest, err error)
GetOpenReadStreamRequestStruct extracts OpenReadStreamRequest from tchannel headers
func GetOpenReplicationReadStreamRequestHTTPHeaders ¶
func GetOpenReplicationReadStreamRequestHTTPHeaders(req *OpenReplicationReadStreamRequest) http.Header
GetOpenReplicationReadStreamRequestHTTPHeaders converts an OpenReplicationReadStreamRequest struct to http headers for OpenReplicationReadStreamRequest
func GetOpenReplicationRemoteReadStreamRequestHTTPHeaders ¶
func GetOpenReplicationRemoteReadStreamRequestHTTPHeaders(req *OpenReplicationRemoteReadStreamRequest) http.Header
GetOpenReplicationRemoteReadStreamRequestHTTPHeaders converts an OpenReplicationRemoteReadStreamRequest struct to http headers for OpenReplicationRemoteReadStreamRequest
func GetRandInt64 ¶
GetRandInt64 is used to get a 64 bit random number between min and max
func GetTagsFromPath ¶
GetTagsFromPath function return the tags name for path based on directory path name passed Usually pass the Consumer group name or a destination path name to get a tag name
func InternalChecksumOptionPtr ¶
func InternalChecksumOptionPtr(checksumOption shared.ChecksumOption) *shared.ChecksumOption
InternalChecksumOptionPtr makes a copy and return the pointer too a internal shared ChecksumOption.
func InternalConsumerGroupStatusPtr ¶
func InternalConsumerGroupStatusPtr(status shared.ConsumerGroupStatus) *shared.ConsumerGroupStatus
InternalConsumerGroupStatusPtr makes a copy and returns the pointer to a internal shared ConsumerGroupStatus.
func InternalConsumerGroupTypePtr ¶
func InternalConsumerGroupTypePtr(cgType shared.ConsumerGroupType) *shared.ConsumerGroupType
InternalConsumerGroupTypePtr makes a copy and returns the pointer to a internal shared ConsumerGroupType.
func InternalDestinationStatusPtr ¶
func InternalDestinationStatusPtr(status shared.DestinationStatus) *shared.DestinationStatus
InternalDestinationStatusPtr makes a copy and returns the pointer to a internal shared DestinationStatus.
func InternalDestinationTypePtr ¶
func InternalDestinationTypePtr(destType shared.DestinationType) *shared.DestinationType
InternalDestinationTypePtr makes a copy and returns the pointer to a internal shared DestinationType.
func InternalExtentReplicaReplicationStatusTypePtr ¶
func InternalExtentReplicaReplicationStatusTypePtr(status shared.ExtentReplicaReplicationStatus) *shared.ExtentReplicaReplicationStatus
InternalExtentReplicaReplicationStatusTypePtr makes a copy and returns the pointer to a ExtentReplicaReplicationStatus
func IsDLQDestination ¶
func IsDLQDestination(dstDesc *shared.DestinationDescription) bool
IsDLQDestination checks whether a destination is dlq type
func IsDLQDestinationPath ¶
IsDLQDestinationPath checks whether a destination path is dlq type
func IsDevelopmentEnvironment ¶
IsDevelopmentEnvironment detects if we are running in a development environment
func IsRemoteZoneExtent ¶
IsRemoteZoneExtent returns whether the extent is a remote zone extent
func IsRetryableTChanErr ¶
IsRetryableTChanErr returns true if the given tchannel error is a retryable error.
func IsValidServiceName ¶
IsValidServiceName returns true if the given input is a valid service name, false otherwise
func LoadCassandraSchema ¶
LoadCassandraSchema loads the schema from the given .cql file on this keyspace using cqlsh
func MetadataConsumerGroupExtentStatusPtr ¶
func MetadataConsumerGroupExtentStatusPtr(status metadata.ConsumerGroupExtentStatus) *metadata.ConsumerGroupExtentStatus
MetadataConsumerGroupExtentStatusPtr makes a copy and returns the pointer to a MetadataConsumerGroupExtentStatus.
func MetadataExtentReplicaStatusPtr ¶
func MetadataExtentReplicaStatusPtr(status shared.ExtentReplicaStatus) *shared.ExtentReplicaStatus
MetadataExtentReplicaStatusPtr makes a copy and returns the pointer to a MetadataExtentReplicaStatus.
func MetadataExtentStatusPtr ¶
func MetadataExtentStatusPtr(status shared.ExtentStatus) *shared.ExtentStatus
MetadataExtentStatusPtr makes a copy and returns the pointer to a MetadataExtentStatus.
func NewCassandraCluster ¶
func NewCassandraCluster(clusterHosts string) *gocql.ClusterConfig
NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts
func NewMetricReporterWithHostname ¶
func NewMetricReporterWithHostname(cfg configure.CommonServiceConfig) metrics.Reporter
NewMetricReporterWithHostname create statsd/simple reporter based on config
func NewTestMetricsReporter ¶ added in v0.2.0
NewTestMetricsReporter creates a test reporter that allows registration of handler functions
func NodeMetricsPtr ¶
func NodeMetricsPtr(nodeMetrics controller.NodeMetrics) *controller.NodeMetrics
NodeMetricsPtr makes a copy and returns the pointer to a NodeMetrics.
func OverrideValueByPrefix ¶
func OverrideValueByPrefix(logFn func() bark.Logger, path string, overrides []string, defaultVal int64, valName string) int64
OverrideValueByPrefix takes a list of override rules in the form 'prefix=val' and a given string, and determines the most specific rule that applies to the given string. It then replaces the given default value with the override value. logFn is a logging closure that allows lazy instatiation of a logger interface to log error conditions and override status. valName is used for logging purposes, to differentiate multiple instantiations in the same context
As an example, you could override a parameter, like the number of desired extents, according to various destination paths. We could try to have 8 extents by default, and give destinations beginning with /test only 1, and give a particular destination specifically a higher amount. To achieve this, we could configure the overrides like this:
overrides := {`=8`, `/test=1`, `/JobPlatform/TripEvents$=16`}
func RWLockReadAndConditionalWrite ¶
RWLockReadAndConditionalWrite implements the RWLock Read+Read&Conditional-Write pattern. m is the RWMutex covering a shared resource readFn is a function that returns a true if a write on the shared resource is required. writeFn is a function that updates the shared resource. The result of the read/write can be returned by capturing return variables in your provided functions
func RandomBytes ¶
RandomBytes generates random bytes of given size
func RolePtr ¶
func RolePtr(role controller.Role) *controller.Role
RolePtr makes a copy and returns the pointer to a SKU.
func SKUPtr ¶
func SKUPtr(sku controller.SKU) *controller.SKU
SKUPtr makes a copy and returns the pointer to a SKU.
func ServiceLoop ¶
func ServiceLoop(port int, cfg configure.CommonAppConfig, service *Service)
ServiceLoop runs the http admin endpoints. This is a blocking call.
func SetupServerConfig ¶
func SetupServerConfig(configurator configure.Configure) configure.CommonAppConfig
SetupServerConfig reads on-disk config (in config/)
func ShortenGUIDString ¶
ShortenGUIDString takes a string with one or more GUIDs and elides them to make it more human readable. It turns "354754bd-b73e-4d20-8021-ab93a3d145c0:67af70c5-f45e-4b3d-9d20-6758195e2ff4:3:2" into "354754bd:67af70c5:3:2"
func SpinWaitOnCondition ¶
func SpinWaitOnCondition(condition ConditionFunc, timeout time.Duration) bool
SpinWaitOnCondition busy waits for a given condition to be true until the timeout Returns true if the condition was satisfied, false on timeout
func SplitHostPort ¶
SplitHostPort takes a x.x.x.x:yyyy string and split it into host and ports
func StoreExtentMetricsPtr ¶
func StoreExtentMetricsPtr(storeExtMetrics controller.StoreExtentMetrics) *controller.StoreExtentMetrics
StoreExtentMetricsPtr makes a copy and returns the pointer to a StoreExtentMetrics.
func StringSetEqual ¶ added in v1.26.0
StringSetEqual checks for set equality (i.e. non-ordered, discounting duplicates) for two string slices StringSetEqual([]string{`a`,`a`,`b`,`b`}, []string{`a`,`a`,`b`}) == TRUE !! DEVNOTE: This is O(N^2), so don't use it with large N; better if len(a) > len(b) if you have duplicates
func UUIDHashCode ¶
UUIDHashCode is a hash function for hashing string uuid if the uuid is malformed, then the hash function always returns 0 as the hash value
func UUIDToUint16 ¶
UUIDToUint16 uses the UUID and returns a uint16 out of it
func UnixNanoToCQLTimestamp ¶
UnixNanoToCQLTimestamp converts UnixNano to CQL timestamp
func WSStart ¶
WSStart register websocket handlers and spin up websocket server. This is not a blocking call
func WaitTimeout ¶
WaitTimeout waits for given func until timeout (return true if timeout)
Types ¶
type AckID ¶
type AckID struct { MutatedID CombinedID Address int64 }
AckID designates a consumer message to ack/nack
func AckIDFromString ¶
AckIDFromString deserializes a string into the object.
func (AckID) ConstructCombinedID ¶
func (a AckID) ConstructCombinedID(sessionID uint64, ackMgrID uint64, seqNum uint64) CombinedID
ConstructCombinedID constructs the combinedID from the session, ackmgr and the seq numbers based on the bit masks
func (AckID) ToString ¶
ToString serializes AckID object into a base64 encoded string First 64 bits of the AckID is as follows: 16 bit - Session ID (constructed from the uuid) 16 bit - Monotonically increasing number to identify all unique ack managers within a host 32 bit - Sequence Number within the AckManager which is used to update the data structure within
the ack manager
The reason for having the above fileds in the ackID is as follows: sessionID - to make sure ack is to the same outputhost (let's say to prevent a bad client) ackID - to uniquely identify the ack managers within this outputhost seqNum - to identify the data structure within the ackMgr Address - to make sure we validate the ack based on what we get from the store We need all the above to prevent collisions
type CliHelper ¶
type CliHelper interface { // GetDefaultOwnerEmail is used to get the default owner email GetDefaultOwnerEmail() string // SetDefaultOwnerEmail is used to set the default owner email to be used in the CLI SetDefaultOwnerEmail(string) // GetCanonicalZone is used to get the canonical zone name from the // given zone. This is useful in cases where we have short names for // zones. For example, if "zone1" and "z1" both map to "zone1", then // use "zone1" GetCanonicalZone(zone string) (string, error) // SetCanonicalZones is used to populate all valid zones that can be given from CLI SetCanonicalZones(map[string]string) }
CliHelper is the interface to help with the args passed to CLI
func NewCliHelper ¶
func NewCliHelper() CliHelper
NewCliHelper is used to create an uber specific CliHelper
type ClientFactory ¶
type ClientFactory interface { // GetAdminControllerClient gets the thrift version of the controller client GetAdminControllerClient() (admin.TChanControllerHostAdmin, error) // GetThriftStoreClient gets the thrift version of the store client GetThriftStoreClient(replica string, pathUUID string) (store.TChanBStore, error) // ReleaseThriftStoreClient releases the ref on this pathUUID ReleaseThriftStoreClient(pathUUID string) // GetThriftStoreClientUUID first resolves the store UUID to an address, before creating a thrift client GetThriftStoreClientUUID(storeUUID string, contextID string) (store.TChanBStore, string, error) // GetControllerClient gets the thrift client for making calls to Cherami Controller GetControllerClient() (controller.TChanController, error) // GetReplicatorClient gets the thrift client for making calls to local Replicator GetReplicatorClient() (replicator.TChanReplicator, error) }
ClientFactory is the thrift clients interface which is used to get thrift clients for this service. This make sure we don't allocate new thrift clients all the time and also makes sure the returned client is up and running by consulting ringpop monitor TODO: Add store, in, out, etc here so that we don't end up creating new thrift clients all the time
func NewClientFactory ¶
func NewClientFactory(rpm RingpopMonitor, ch *tchannel.Channel, log bark.Logger) ClientFactory
NewClientFactory just instantiates a thriftClientImpl object
type CombinedID ¶
type CombinedID uint64
CombinedID is the one which holds session, ackmgr and seqnum together
func (CombinedID) DeconstructCombinedID ¶
func (c CombinedID) DeconstructCombinedID() (uint16, uint16, uint32)
DeconstructCombinedID deconstructs the combinedID
type ConcurrentMap ¶
type ConcurrentMap interface { // Get returns the value for the given key Get(key string) (interface{}, bool) // Contains returns true if the key exist and false otherwise Contains(key string) bool // Put records the mapping from given key to value Put(key string, value interface{}) // PutIfNotExist records the key value mapping only // if the mapping does not already exist PutIfNotExist(key string, value interface{}) bool // Remove deletes the key from the map Remove(key string) // Iter returns an iterator to the map Iter() MapIterator // Size returns the number of items in the map Size() int }
ConcurrentMap is a generic interface for any implementation of a dictionary or a key value lookup table that is thread safe.
func NewShardedConcurrentMap ¶
func NewShardedConcurrentMap(initialCap int, hashfn HashFunc) ConcurrentMap
NewShardedConcurrentMap returns an instance of ShardedConcurrentMap
ShardedConcurrentMap is a thread safe map that maintains upto nShards number of maps internally to allow nShards writers to be acive at the same time. This map *does not* use re-entrant locks, so access to the map during iterator can cause a dead lock.
@param initialSz
The initial size for the map
@param hashfn
The hash function to use for sharding
type ConditionFunc ¶
type ConditionFunc func() bool
ConditionFunc represents an expression that evaluates to true on when some condition is satisfied and false otherwise
type CounterBank ¶
type CounterBank struct {
// contains filtered or unexported fields
}
CounterBank represents a set of counters that all belong to the same group ex - dst or extent counters A counterBank supports methods to inc/dec/get counter values. Each of these methods takes an index, that represents the offset of the counter within the counter bank. All operations supported by counterBank are atomic
func NewCounterBank ¶
func NewCounterBank(size int) *CounterBank
NewCounterBank returns a new instance of counterBank containing size number of counters.
func (*CounterBank) Add ¶
func (c *CounterBank) Add(idx int, delta int64) int64
Add adds the given value to the counter at the given offset.
func (*CounterBank) Decrement ¶
func (c *CounterBank) Decrement(idx int) int64
Decrement decrements the counter at the given offset index and returns the new value
func (*CounterBank) Get ¶
func (c *CounterBank) Get(idx int) int64
Get returns the counter value at the given offset index
func (*CounterBank) GetAndReset ¶
func (c *CounterBank) GetAndReset(idx int) int64
GetAndReset resets the counter value for given offset to zero and returns the old value atomically
func (*CounterBank) Increment ¶
func (c *CounterBank) Increment(idx int) int64
Increment increments the counter at the given offset index and returns the new value
type Daemon ¶
type Daemon interface { Start() Stop() }
Daemon is the base interfaces implemented by background tasks within cherami
type GeometricRollingAverage ¶
type GeometricRollingAverage float64
GeometricRollingAverage is the value of a geometrically diminishing rolling average
func (*GeometricRollingAverage) GetGeometricRollingAverage ¶
func (avg *GeometricRollingAverage) GetGeometricRollingAverage() float64
GetGeometricRollingAverage returns the result of the geometric rolling average
func (*GeometricRollingAverage) SetGeometricRollingAverage ¶
func (avg *GeometricRollingAverage) SetGeometricRollingAverage(val float64)
SetGeometricRollingAverage adds a value to the geometric rolling average
type HTTPHandler ¶
type HTTPHandler struct {
// contains filtered or unexported fields
}
HTTPHandler contains the http handlers for controller
func NewHTTPHandler ¶
func NewHTTPHandler(cfg configure.CommonAppConfig, service *Service) *HTTPHandler
NewHTTPHandler returns a new instance of http handler. This call must be followed by a call to httpHandler.Register().
func (*HTTPHandler) Register ¶
func (handler *HTTPHandler) Register(mux *http.ServeMux)
Register registers all the http handlers supported by controller. This method does not start a http server.
type Heartbeat ¶
type Heartbeat struct {
// contains filtered or unexported fields
}
Heartbeat is just a timestamp and an identifier
func NewHeartbeat ¶
NewHeartbeat creates a new Heartbeat object
func (*Heartbeat) CloseHeartbeat ¶
func (h *Heartbeat) CloseHeartbeat()
CloseHeartbeat removes the given heart from the Heartbeat system
type HostAckIDGenerator ¶
type HostAckIDGenerator interface { // GetNextAckID is the routine which gets the next ackID GetNextAckID() uint32 }
HostAckIDGenerator is the per host ackID generator for this host Right now, it is a monotonically increasing uint32
func NewHostAckIDGenerator ¶
func NewHostAckIDGenerator(startFrom uint32) HostAckIDGenerator
NewHostAckIDGenerator returns a HostAckIDGenerator object and starts from the given value
type HostHardwareInfo ¶
HostHardwareInfo is the type that contains hardware properties about a cherami host
type HostHardwareInfoReader ¶
type HostHardwareInfoReader interface { // Read reads and returns the hardware info // corresponding to the given hostname Read(hostname string) (*HostHardwareInfo, error) }
HostHardwareInfoReader is the interface for any implementation that vends hardware info related to a given hostname
func NewHostHardwareInfoReader ¶
func NewHostHardwareInfoReader(mClient m.TChanMetadataService) HostHardwareInfoReader
NewHostHardwareInfoReader creates and returns an implementation of hardwareInfoReader that uses Cassandra as the backing store
type HostIDHeartbeater ¶
type HostIDHeartbeater interface { Daemon }
HostIDHeartbeater keeps the host uuid to ip address mapping alive by periodically heartbeating to the registry. Currently, the uuid to addr mapping is stored within a cassandra table.
func NewHostIDHeartbeater ¶
func NewHostIDHeartbeater(mClient metadata.TChanMetadataService, hostID string, hostAddr string, hostName string, log bark.Logger) HostIDHeartbeater
NewHostIDHeartbeater creates and returns a new instance of HostIDHeartbeater
type HostInfo ¶
type HostInfo struct { UUID string Addr string // ip:port Name string Sku string Rack string Zone string }
HostInfo is a type that contains the info about a cherami host
type Item ¶
type Item struct { Value interface{} // The value of the item; arbitrary. Key int64 // The key of the item in the queue. // contains filtered or unexported fields }
An Item is something we manage in a key queue.
type LoadReporter ¶
type LoadReporter interface { ReportHostMetric(metrics controller.NodeMetrics) error ReportDestinationMetric(destinationUUID string, metrics controller.DestinationMetrics) error ReportDestinationExtentMetric(destinationUUID string, extentUUID string, metrics controller.DestinationExtentMetrics) error ReportConsumerGroupMetric(destinationUUID string, consumerGroupUUID string, metrics controller.ConsumerGroupMetrics) error ReportConsumerGroupExtentMetric(destinationUUID string, consumerGroupUUID string, extentUUID string, metrics controller.ConsumerGroupExtentMetrics) error ReportStoreExtentMetric(extentUUID string, metrics controller.StoreExtentMetrics) error }
LoadReporter is used by each component interested in reporting load to Controller
type LoadReporterDaemon ¶
type LoadReporterDaemon interface { Daemon }
LoadReporterDaemon is used for periodically reporting load to controller
type LoadReporterDaemonFactory ¶
type LoadReporterDaemonFactory interface {
CreateReporter(interval time.Duration, source LoadReporterSource, logger bark.Logger) LoadReporterDaemon
}
LoadReporterDaemonFactory is used to create a daemon task for reporting load to controller
func NewLoadReporterDaemonFactory ¶
func NewLoadReporterDaemonFactory(hostUUID string, sku controller.SKU, role controller.Role, clientFactory ClientFactory, tickerFactory TickerSourceFactory, tokenBucketFactory TokenBucketFactory, logger bark.Logger) LoadReporterDaemonFactory
NewLoadReporterDaemonFactory is used to create a factory for creating LoadReporterDaemon
type LoadReporterSource ¶
type LoadReporterSource interface {
Report(reporter LoadReporter)
}
LoadReporterSource is implemented by any component reporting load to controller
type MapEntry ¶
type MapEntry struct { // Key represents the key Key string // Value represents the value Value interface{} }
MapEntry represents a key-value entry within the map
type MapIterator ¶
type MapIterator interface { // Close closes the iterator // and releases any allocated resources Close() // Entries returns a channel of MapEntry // objects that can be used in a range loop Entries() <-chan *MapEntry }
MapIterator represents the interface for map iterators
type MinHeap ¶
type MinHeap []*Item
A MinHeap implements heap.Interface and holds Items.
func (*MinHeap) Pop ¶
func (mh *MinHeap) Pop() interface{}
Pop is the function needed by heap Interface
type MockLoadReporterDaemon ¶
MockLoadReporterDaemon is the mock of common.LoadReporterDaemon interface
func (*MockLoadReporterDaemon) Start ¶
func (m *MockLoadReporterDaemon) Start()
Start is the mock implementation for Start function on common.LoadReporterDaemon
func (*MockLoadReporterDaemon) Stop ¶
func (m *MockLoadReporterDaemon) Stop()
Stop is the mock implementation for Stop function on common.LoadReporterDaemon
type MockLoadReporterDaemonFactory ¶
MockLoadReporterDaemonFactory is the mock of common.LoadReporterDaemonFactory interface
func (*MockLoadReporterDaemonFactory) CreateReporter ¶
func (m *MockLoadReporterDaemonFactory) CreateReporter(interval time.Duration, source LoadReporterSource, logger bark.Logger) LoadReporterDaemon
CreateReporter is the mock implementation for CreateReporter function on common.LoadReporterDaemonFactory
type MockRingpopMonitor ¶
MockRingpopMonitor is an implementation of RingpopMonitor for UTs
func NewMockRingpopMonitor ¶
func NewMockRingpopMonitor() *MockRingpopMonitor
NewMockRingpopMonitor returns a new instance
func (*MockRingpopMonitor) Add ¶
func (rpm *MockRingpopMonitor) Add(service string, uuid string, addr string)
Add adds the given (uuid, addr) mapping
func (*MockRingpopMonitor) AddListener ¶
func (rpm *MockRingpopMonitor) AddListener(service string, name string, ch chan<- *RingpopListenerEvent) error
AddListener adds a listener for this service.
func (*MockRingpopMonitor) FindHostForAddr ¶
func (rpm *MockRingpopMonitor) FindHostForAddr(service string, addr string) (*HostInfo, error)
FindHostForAddr finds and returns the host for the given address
func (*MockRingpopMonitor) FindHostForKey ¶
func (rpm *MockRingpopMonitor) FindHostForKey(service string, key string) (*HostInfo, error)
FindHostForKey finds and returns the host responsible for handling the given key
func (*MockRingpopMonitor) FindRandomHost ¶ added in v0.2.0
func (rpm *MockRingpopMonitor) FindRandomHost(service string) (*HostInfo, error)
FindRandomHost finds and returns a random host responsible for handling the given key
func (*MockRingpopMonitor) GetBootstrappedChannel ¶
func (rpm *MockRingpopMonitor) GetBootstrappedChannel() chan struct{}
GetBootstrappedChannel returns a channel, which will be closed once ringpop is bootstrapped
func (*MockRingpopMonitor) GetHosts ¶
func (rpm *MockRingpopMonitor) GetHosts(service string) ([]*HostInfo, error)
GetHosts retrieves all the members for the given service
func (*MockRingpopMonitor) IsHostHealthy ¶
func (rpm *MockRingpopMonitor) IsHostHealthy(service string, uuid string) bool
IsHostHealthy returns true if the given host is healthy and false otherwise
func (*MockRingpopMonitor) Remove ¶
func (rpm *MockRingpopMonitor) Remove(service string, uuid string)
Remove removes the host identified by given uuid from rpm
func (*MockRingpopMonitor) RemoveListener ¶
func (rpm *MockRingpopMonitor) RemoveListener(service string, name string) error
RemoveListener removes a listener for this service.
func (*MockRingpopMonitor) ResolveUUID ¶
func (rpm *MockRingpopMonitor) ResolveUUID(service string, uuid string) (string, error)
ResolveUUID converts the given UUID into a host:port string Returns true on success and false on lookup failure
func (*MockRingpopMonitor) SetMetadata ¶
func (rpm *MockRingpopMonitor) SetMetadata(key string, data string) error
SetMetadata sets the given metadata on the rp instance
func (*MockRingpopMonitor) Start ¶
func (rpm *MockRingpopMonitor) Start()
Start starts the ringpop monitor
func (*MockRingpopMonitor) Stop ¶
func (rpm *MockRingpopMonitor) Stop()
Stop attempts to stop the RingpopMonitor routines
type MockService ¶
MockService is the mock of common.SCommon interface
func (*MockService) GetClientFactory ¶
func (m *MockService) GetClientFactory() ClientFactory
GetClientFactory is a mock of the corresponding common. routine
func (*MockService) GetConfig ¶
func (m *MockService) GetConfig() configure.CommonServiceConfig
GetConfig returns the AppConfig for this service
func (*MockService) GetDConfigClient ¶
func (m *MockService) GetDConfigClient() dconfig.Client
GetDConfigClient is a mock of the corresponding common. routine
func (*MockService) GetHostName ¶
func (m *MockService) GetHostName() string
GetHostName returns the name of host running the service
func (*MockService) GetHostPort ¶
func (m *MockService) GetHostPort() string
GetHostPort is a mock of the corresponding common. routine
func (*MockService) GetHostUUID ¶
func (m *MockService) GetHostUUID() string
GetHostUUID returns the uuid of host running the service
func (*MockService) GetLoadReporterDaemonFactory ¶
func (m *MockService) GetLoadReporterDaemonFactory() LoadReporterDaemonFactory
GetLoadReporterDaemonFactory is the factory interface for creating load reporters
func (*MockService) GetMetricsReporter ¶
func (m *MockService) GetMetricsReporter() metrics.Reporter
GetMetricsReporter is a mock of the corresponding common. routine
func (*MockService) GetRingpopMonitor ¶
func (m *MockService) GetRingpopMonitor() RingpopMonitor
GetRingpopMonitor is a mock of the corresponding common. routine
func (*MockService) GetTChannel ¶
func (m *MockService) GetTChannel() *tchannel.Channel
GetTChannel is a mock of the corresponding common. routine
func (*MockService) GetWSConnector ¶
func (m *MockService) GetWSConnector() WSConnector
GetWSConnector is a mock of the corresponding common. routine
func (*MockService) IsLimitsEnabled ¶
func (m *MockService) IsLimitsEnabled() bool
IsLimitsEnabled is the implementation of the corresponding routine
func (*MockService) Report ¶
func (m *MockService) Report(reporter LoadReporter)
Report is the implementation for reporting host specific load to controller
func (*MockService) SetClientFactory ¶
func (m *MockService) SetClientFactory(cf ClientFactory)
SetClientFactory is a mock of the corresponding common. routine
func (*MockService) Start ¶
func (m *MockService) Start(thriftService []thrift.TChanServer)
Start is a mock of the corresponding common. routine
func (*MockService) Stop ¶
func (m *MockService) Stop()
Stop is a mock of the corresponding common. routine
type MockTimeSource ¶
MockTimeSource is a time source for unit tests
func NewMockTimeSource ¶
func NewMockTimeSource() *MockTimeSource
NewMockTimeSource returns a new instance of a controllable time source
func (*MockTimeSource) Advance ¶
func (ts *MockTimeSource) Advance(d time.Duration)
Advance advances the clock by the specified duration
type OneShotTimer ¶
type OneShotTimer interface { // Chan returns the underlying channel // for receiving expired timer event Chan() <-chan time.Time // Reset resets the timer with the // new duration Reset(d time.Duration) bool // Stop stops the timer Stop() bool }
OneShotTimer is an interface around golang Timer to help with unit tests
type OpenAppendInWebsocketStream ¶
type OpenAppendInWebsocketStream struct {
// contains filtered or unexported fields
}
OpenAppendInWebsocketStream is a wrapper for websocket to work with OpenAppendStream (hopefully rename to OpenAppendStream)
func NewOpenAppendInWebsocketStream ¶
func NewOpenAppendInWebsocketStream(stream websocket.StreamServer) *OpenAppendInWebsocketStream
NewOpenAppendInWebsocketStream returns a new OpenAppendInWebsocketStream object
func (*OpenAppendInWebsocketStream) Done ¶
func (s *OpenAppendInWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenAppendInWebsocketStream) Flush ¶
func (s *OpenAppendInWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenAppendInWebsocketStream) Read ¶
func (s *OpenAppendInWebsocketStream) Read() (*store.AppendMessage, error)
Read returns the next argument, if any is available.
func (*OpenAppendInWebsocketStream) SetResponseHeaders ¶
func (s *OpenAppendInWebsocketStream) SetResponseHeaders(headers map[string]string) error
SetResponseHeaders sets the response headers.
func (*OpenAppendInWebsocketStream) Write ¶
func (s *OpenAppendInWebsocketStream) Write(arg *store.AppendMessageAck) error
Write writes a result to the response stream
type OpenAppendOutWebsocketStream ¶
type OpenAppendOutWebsocketStream struct {
// contains filtered or unexported fields
}
OpenAppendOutWebsocketStream is a wrapper for websocket to work with OpenAppendStream (hopefully rename to OpenAppendStream)
func NewOpenAppendOutWebsocketStream ¶
func NewOpenAppendOutWebsocketStream(stream websocket.StreamClient) *OpenAppendOutWebsocketStream
NewOpenAppendOutWebsocketStream returns a new OpenAppendOutWebsocketStream object
func (*OpenAppendOutWebsocketStream) Done ¶
func (s *OpenAppendOutWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenAppendOutWebsocketStream) Flush ¶
func (s *OpenAppendOutWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenAppendOutWebsocketStream) Read ¶
func (s *OpenAppendOutWebsocketStream) Read() (*store.AppendMessageAck, error)
Read returns the next argument, if any is available.
func (*OpenAppendOutWebsocketStream) ResponseHeaders ¶
func (s *OpenAppendOutWebsocketStream) ResponseHeaders() (map[string]string, error)
ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface
func (*OpenAppendOutWebsocketStream) Write ¶
func (s *OpenAppendOutWebsocketStream) Write(arg *store.AppendMessage) error
Write writes a result to the response stream
type OpenConsumerInWebsocketStream ¶
type OpenConsumerInWebsocketStream struct {
// contains filtered or unexported fields
}
OpenConsumerInWebsocketStream is a wrapper for websocket to work with OpenConsumerStream
func NewOpenConsumerInWebsocketStream ¶
func NewOpenConsumerInWebsocketStream(stream websocket.StreamServer) *OpenConsumerInWebsocketStream
NewOpenConsumerInWebsocketStream returns a new OpenConsumerInWebsocketStream object
func (*OpenConsumerInWebsocketStream) Done ¶
func (s *OpenConsumerInWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenConsumerInWebsocketStream) Flush ¶
func (s *OpenConsumerInWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenConsumerInWebsocketStream) Read ¶
func (s *OpenConsumerInWebsocketStream) Read() (*cherami.ControlFlow, error)
Read returns the next argument, if any is available.
func (*OpenConsumerInWebsocketStream) SetResponseHeaders ¶
func (s *OpenConsumerInWebsocketStream) SetResponseHeaders(headers map[string]string) error
SetResponseHeaders sets the response headers.
func (*OpenConsumerInWebsocketStream) Write ¶
func (s *OpenConsumerInWebsocketStream) Write(arg *cherami.OutputHostCommand) error
Write writes a result to the response stream
type OpenConsumerOutWebsocketStream ¶
type OpenConsumerOutWebsocketStream struct {
// contains filtered or unexported fields
}
OpenConsumerOutWebsocketStream is a wrapper for websocket to work with OpenConsumerStream
func NewOpenConsumerOutWebsocketStream ¶
func NewOpenConsumerOutWebsocketStream(stream websocket.StreamClient) *OpenConsumerOutWebsocketStream
NewOpenConsumerOutWebsocketStream returns a new OpenConsumerOutWebsocketStream object
func (*OpenConsumerOutWebsocketStream) Done ¶
func (s *OpenConsumerOutWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenConsumerOutWebsocketStream) Flush ¶
func (s *OpenConsumerOutWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenConsumerOutWebsocketStream) Read ¶
func (s *OpenConsumerOutWebsocketStream) Read() (*cherami.OutputHostCommand, error)
Read returns the next argument, if any is available.
func (*OpenConsumerOutWebsocketStream) ResponseHeaders ¶
func (s *OpenConsumerOutWebsocketStream) ResponseHeaders() (map[string]string, error)
ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface
func (*OpenConsumerOutWebsocketStream) Write ¶
func (s *OpenConsumerOutWebsocketStream) Write(arg *cherami.ControlFlow) error
Write writes a result to the response stream
type OpenPublisherInWebsocketStream ¶
type OpenPublisherInWebsocketStream struct {
// contains filtered or unexported fields
}
OpenPublisherInWebsocketStream is a wrapper for websocket to work with OpenPublisherStream
func NewOpenPublisherInWebsocketStream ¶
func NewOpenPublisherInWebsocketStream(stream websocket.StreamServer) *OpenPublisherInWebsocketStream
NewOpenPublisherInWebsocketStream returns a new OpenPublisherInWebsocketStream object
func (*OpenPublisherInWebsocketStream) Done ¶
func (s *OpenPublisherInWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenPublisherInWebsocketStream) Flush ¶
func (s *OpenPublisherInWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenPublisherInWebsocketStream) Read ¶
func (s *OpenPublisherInWebsocketStream) Read() (*cherami.PutMessage, error)
Read returns the next argument, if any is available.
func (*OpenPublisherInWebsocketStream) SetResponseHeaders ¶
func (s *OpenPublisherInWebsocketStream) SetResponseHeaders(headers map[string]string) error
SetResponseHeaders sets the response headers.
func (*OpenPublisherInWebsocketStream) Write ¶
func (s *OpenPublisherInWebsocketStream) Write(arg *cherami.InputHostCommand) error
Write writes a result to the response stream
type OpenPublisherOutWebsocketStream ¶
type OpenPublisherOutWebsocketStream struct {
// contains filtered or unexported fields
}
OpenPublisherOutWebsocketStream is a wrapper for websocket to work with OpenPublisherStream
func NewOpenPublisherOutWebsocketStream ¶
func NewOpenPublisherOutWebsocketStream(stream websocket.StreamClient) *OpenPublisherOutWebsocketStream
NewOpenPublisherOutWebsocketStream returns a new OpenPublisherOutWebsocketStream object
func (*OpenPublisherOutWebsocketStream) Done ¶
func (s *OpenPublisherOutWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenPublisherOutWebsocketStream) Flush ¶
func (s *OpenPublisherOutWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenPublisherOutWebsocketStream) Read ¶
func (s *OpenPublisherOutWebsocketStream) Read() (*cherami.InputHostCommand, error)
Read returns the next argument, if any is available.
func (*OpenPublisherOutWebsocketStream) ResponseHeaders ¶
func (s *OpenPublisherOutWebsocketStream) ResponseHeaders() (map[string]string, error)
ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface
func (*OpenPublisherOutWebsocketStream) Write ¶
func (s *OpenPublisherOutWebsocketStream) Write(arg *cherami.PutMessage) error
Write writes a result to the response stream
type OpenReadInWebsocketStream ¶
type OpenReadInWebsocketStream struct {
// contains filtered or unexported fields
}
OpenReadInWebsocketStream is a wrapper for websocket to work with OpenReadStream
func NewOpenReadInWebsocketStream ¶
func NewOpenReadInWebsocketStream(stream websocket.StreamServer) *OpenReadInWebsocketStream
NewOpenReadInWebsocketStream returns a new OpenReadInWebsocketStream object
func (*OpenReadInWebsocketStream) Done ¶
func (s *OpenReadInWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenReadInWebsocketStream) Flush ¶
func (s *OpenReadInWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenReadInWebsocketStream) Read ¶
func (s *OpenReadInWebsocketStream) Read() (*cherami.ControlFlow, error)
Read returns the next argument, if any is available.
func (*OpenReadInWebsocketStream) SetResponseHeaders ¶
func (s *OpenReadInWebsocketStream) SetResponseHeaders(headers map[string]string) error
SetResponseHeaders sets the response headers.
func (*OpenReadInWebsocketStream) Write ¶
func (s *OpenReadInWebsocketStream) Write(arg *store.ReadMessageContent) error
Write writes a result to the response stream
type OpenReadOutWebsocketStream ¶
type OpenReadOutWebsocketStream struct {
// contains filtered or unexported fields
}
OpenReadOutWebsocketStream is a wrapper for websocket to work with OpenReadStream
func NewOpenReadOutWebsocketStream ¶
func NewOpenReadOutWebsocketStream(stream websocket.StreamClient) *OpenReadOutWebsocketStream
NewOpenReadOutWebsocketStream returns a new OpenReadOutWebsocketStream object
func (*OpenReadOutWebsocketStream) Done ¶
func (s *OpenReadOutWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenReadOutWebsocketStream) Flush ¶
func (s *OpenReadOutWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenReadOutWebsocketStream) Read ¶
func (s *OpenReadOutWebsocketStream) Read() (*store.ReadMessageContent, error)
Read returns the next argument, if any is available.
func (*OpenReadOutWebsocketStream) ResponseHeaders ¶
func (s *OpenReadOutWebsocketStream) ResponseHeaders() (map[string]string, error)
ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface
func (*OpenReadOutWebsocketStream) Write ¶
func (s *OpenReadOutWebsocketStream) Write(arg *cherami.ControlFlow) error
Write writes a result to the response stream
type OpenReplicationReadStreamRequest ¶
type OpenReplicationReadStreamRequest struct {
store.OpenReadStreamRequest
}
OpenReplicationReadStreamRequest is the request type for OpenReplicationReadStream API
func GetOpenReplicationReadStreamRequestHTTP ¶
func GetOpenReplicationReadStreamRequestHTTP(httpHeader http.Header) (req *OpenReplicationReadStreamRequest, err error)
GetOpenReplicationReadStreamRequestHTTP extracts OpenReplicationReadStreamRequest from http headers
type OpenReplicationRemoteReadStreamRequest ¶
type OpenReplicationRemoteReadStreamRequest struct {
store.OpenReadStreamRequest
}
OpenReplicationRemoteReadStreamRequest is the request type for OpenReplicationRemoteReadStream API
func GetOpenReplicationRemoteReadStreamRequestHTTP ¶
func GetOpenReplicationRemoteReadStreamRequestHTTP(httpHeader http.Header) (req *OpenReplicationRemoteReadStreamRequest, err error)
GetOpenReplicationRemoteReadStreamRequestHTTP extracts OpenReplicationRemoteReadStreamRequest from http headers
type RingpopEventType ¶
type RingpopEventType int
RingpopEventType is a value type that identifies a RingpopListener event.
const ( // HostAddedEvent indicates that a new host joined the ring HostAddedEvent RingpopEventType = 1 << iota // HostRemovedEvent indicates that a host left the ring HostRemovedEvent )
type RingpopListenerEvent ¶
type RingpopListenerEvent struct { // Type returns the RingpopEventType Type RingpopEventType // Key returns the HostUUID that // this event is about Key string }
RingpopListenerEvent represents any event that a RingpopListener can get notified about.
type RingpopMonitor ¶
type RingpopMonitor interface { // Start starts the RingpopMonitor Start() // Stop stops the RingpopMonitor Stop() // GetBootstrappedChannel returns a channel, which will be closed once ringpop is bootstrapped GetBootstrappedChannel() chan struct{} // GetHosts retrieves all the members for the given service GetHosts(service string) ([]*HostInfo, error) // FindHostForAddr finds and returns the host for the given service:addr FindHostForAddr(service string, addr string) (*HostInfo, error) // FindHostForKey finds and returns the host responsible for handling the given (service, key) FindHostForKey(service string, key string) (*HostInfo, error) // FindRandomHost finds and returns a random host responsible for handling the given service FindRandomHost(service string) (*HostInfo, error) // IsHostHealthy returns true if the given (service, host) is healthy IsHostHealthy(service string, uuid string) bool // ResolveUUID resovles a host UUID to an IP address, if the host is found ResolveUUID(service string, uuid string) (string, error) // AddListener adds a listener for this service. // The listener will get notified on the given // channel, whenever there is host joining/leaving // the ring. // @service: The service to be listened on // @name: The name for identifying the listener // @notifyChannel: The channel on which the caller receives notifications AddListener(service string, name string, notifyChannel chan<- *RingpopListenerEvent) error // RemoveListener removes a listener for this service. RemoveListener(service string, name string) error // SetMetadata is used to set the given metadata on this ringpop instance SetMetadata(key string, data string) error }
RingpopMonitor monitors and maintains a list of healthy hosts for a set of ring pop services
func NewRingpopMonitor ¶
func NewRingpopMonitor(rp *ringpop.Ringpop, services []string, resolver UUIDResolver, hwInfoReader HostHardwareInfoReader, log bark.Logger) RingpopMonitor
NewRingpopMonitor returns a new instance of RingpopMonitor, rp:
Ringpop instance of the local node
services:
list of services we need to track
UUIDResolver:
Resolver instance that can map uuids to addrs and vice-versa
HostHardWareInfoReader:
HwInfoReader instance that can get the hosts' hardware spec
type SCommon ¶
type SCommon interface { // GetTChannel returns the tchannel for this service GetTChannel() *tchannel.Channel // Returns Ringpop monitor for this service GetRingpopMonitor() RingpopMonitor // GetHostPort returns the host port for this service GetHostPort() string // GetHostName returns the name of host running the service GetHostName() string // GetHostUUID returns the uuid of this host GetHostUUID() string // Start creates the channel, starts & boots ringpop on the given streaming server Start(thriftService []thrift.TChanServer) // Stop stops the service Stop() // GetConfig returns the AppConfig for this service GetConfig() configure.CommonServiceConfig // GetMetricsReporter() returns the root metric reporter for this service GetMetricsReporter() metrics.Reporter // GetDConfigClient() returns the dynamic config client for this service GetDConfigClient() dconfig.Client // GetClientFactory returns the thrift client interface for getting thrift clients for this service GetClientFactory() ClientFactory // SetClientFactory allowes change of client factory for getting thrift clients SetClientFactory(cf ClientFactory) // GetWSConnector returns websocket connector for establishing websocket connections GetWSConnector() WSConnector // GetLoadReporterDaemonFactory is the factory interface for creating load reporters GetLoadReporterDaemonFactory() LoadReporterDaemonFactory // Report is the implementation for reporting host specific load to controller Report(reporter LoadReporter) // IsLimitsEnabled is used to see if we need to enforce limits on this service IsLimitsEnabled() bool }
SCommon is the interface which must be implemented by all the services
type Seconds ¶
type Seconds float64
Seconds is time as seconds, either relative or absolute since the epoch
func DurationToSeconds ¶
DurationToSeconds converts a time.Duration to Seconds
type SequenceNumber ¶
type SequenceNumber int64
SequenceNumber is an int64 number represents the sequence of messages in Extent
func ExtrapolateValue ¶
func ExtrapolateValue(observed SequenceNumber, observedRate float64, observedTime, extrapolatedTime UnixNanoTime, maxExtrapolationTime Seconds) (extrapolated SequenceNumber)
ExtrapolateValue extrapolates a value based on an observed value and rate at a given time
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service contains the objects specific to this service
func NewService ¶
func NewService(serviceName string, uuid string, cfg configure.CommonServiceConfig, resolver UUIDResolver, hostHWInfoReader HostHardwareInfoReader, reporter metrics.Reporter, dClient dconfig.Client) *Service
NewService instantiates a ServiceInstance TODO: have a better name for Service. this is the object which holds all the common stuff shared by all the services.
func (*Service) GetClientFactory ¶
func (h *Service) GetClientFactory() ClientFactory
GetClientFactory returns the ClientFactory interface for this service
func (*Service) GetConfig ¶
func (h *Service) GetConfig() configure.CommonServiceConfig
GetConfig returns the AppConfig for this service
func (*Service) GetDConfigClient ¶
GetDConfigClient returns the dconfig client
func (*Service) GetHostName ¶
GetHostName returns the name of host running the service
func (*Service) GetHostPort ¶
GetHostPort returns the host port for this service
func (*Service) GetHostUUID ¶
GetHostUUID returns the uuid for this service
func (*Service) GetLoadReporterDaemonFactory ¶
func (h *Service) GetLoadReporterDaemonFactory() LoadReporterDaemonFactory
GetLoadReporterDaemonFactory is the factory interface for creating load reporters
func (*Service) GetMetricsReporter ¶
GetMetricsReporter returns the metrics metrics for this service
func (*Service) GetRingpopMonitor ¶
func (h *Service) GetRingpopMonitor() RingpopMonitor
GetRingpopMonitor returns the RingpopMonitor for this service
func (*Service) GetTChannel ¶
func (h *Service) GetTChannel() *tchannel.Channel
GetTChannel returns the tchannel for this service
func (*Service) GetWSConnector ¶
func (h *Service) GetWSConnector() WSConnector
GetWSConnector returns websocket connector for establishing websocket connections
func (*Service) IsLimitsEnabled ¶
IsLimitsEnabled returns whether limits are enabled
func (*Service) Report ¶
func (h *Service) Report(reporter LoadReporter)
Report is used for reporting Host specific load to controller
func (*Service) SetClientFactory ¶
func (h *Service) SetClientFactory(cf ClientFactory)
SetClientFactory allowes change of client factory for getting thrift clients
func (*Service) Start ¶
func (h *Service) Start(thriftServices []thrift.TChanServer)
Start starts a TChannel-Thrift service
func (*Service) Stop ¶
func (h *Service) Stop()
Stop destroys ringpop for that service, and closes the associated listening tchannel
func (*Service) UpdateAdvertisedName ¶
UpdateAdvertisedName is used to update the advertised name to be deployment specific
type ShardedConcurrentMap ¶
type ShardedConcurrentMap struct {
// contains filtered or unexported fields
}
ShardedConcurrentMap is an implementation of ConcurrentMap that internally uses multiple sharded maps to increase parallelism
func (*ShardedConcurrentMap) Contains ¶
func (cmap *ShardedConcurrentMap) Contains(key string) bool
Contains returns true if the key exist and false otherwise
func (*ShardedConcurrentMap) Get ¶
func (cmap *ShardedConcurrentMap) Get(key string) (interface{}, bool)
Get returns the value corresponding to the key, if it exist
func (*ShardedConcurrentMap) Iter ¶
func (cmap *ShardedConcurrentMap) Iter() MapIterator
Iter returns an iterator to the map. This map does not use re-entrant locks, so access or modification to the map during iteration can cause a dead lock.
func (*ShardedConcurrentMap) Put ¶
func (cmap *ShardedConcurrentMap) Put(key string, value interface{})
Put records the given key value mapping. Overwrites previous values
func (*ShardedConcurrentMap) PutIfNotExist ¶
func (cmap *ShardedConcurrentMap) PutIfNotExist(key string, value interface{}) bool
PutIfNotExist records the mapping, if there is no mapping for this key already Returns true if the mapping was recorded, false otherwise
func (*ShardedConcurrentMap) Remove ¶
func (cmap *ShardedConcurrentMap) Remove(key string)
Remove deletes the given key from the map
func (*ShardedConcurrentMap) Size ¶
func (cmap *ShardedConcurrentMap) Size() int
Size returns the number of items in the map
type TickerSource ¶
TickerSource is an interface created over time.Ticker so we can easily unit test any component which uses time.Ticker functionality.
type TickerSourceFactory ¶
type TickerSourceFactory interface {
CreateTicker(interval time.Duration) TickerSource
}
TickerSourceFactory is the interface mainly used for injecting mock implementations of Ticker for unit testing
func NewRealTimeTickerFactory ¶
func NewRealTimeTickerFactory() TickerSourceFactory
NewRealTimeTickerFactory creates and instance of TickerSourceFactory used by service code
type TimeSource ¶
TimeSource is an interface for any entity that provides the current time. Its primarily used to mock out timesources in unit test
func NewRealTimeSource ¶
func NewRealTimeSource() TimeSource
NewRealTimeSource returns a time source that servers real wall clock time using CLOCK_REALTIME
type Timer ¶
Timer is a wrapper for time.Timer which does a safe reset of the timer.
type TimerFactory ¶
type TimerFactory interface {
NewTimer(d time.Duration) OneShotTimer
}
TimerFactory vends OneShotTimers
func NewTimerFactory ¶
func NewTimerFactory() TimerFactory
NewTimerFactory creates and returns a new factory for OneShotTimers
type TokenBucket ¶
type TokenBucket interface { // TryConsume attempts to take count tokens from the // bucket. Returns true on success, false // otherwise along with the duration for the next refill TryConsume(count int) (bool, time.Duration) // Consume waits up to timeout duration to take count // tokens from the bucket. Returns true if count // tokens were acquired before timeout, false // otherwise Consume(count int, timeout time.Duration) bool }
TokenBucket is the interface for any implememtation of a token bucket rate limiter
func NewTokenBucket ¶
func NewTokenBucket(rps int, timeSource TimeSource) TokenBucket
NewTokenBucket creates and returns a new token bucket rate limiter that repelenishes the bucket every 100 milliseconds. Thread safe.
@param rps
Desired rate per second
Golang.org has an alternative implementation of the rate limiter. On benchmarking, golang's implementation was order of magnitude slower. In addition, it does a lot more than what we need. These are the benchmarks under different scenarios
BenchmarkTokenBucketParallel 50000000 40.7 ns/op BenchmarkGolangRateParallel 10000000 150 ns/op BenchmarkTokenBucketParallel-8 20000000 124 ns/op BenchmarkGolangRateParallel-8 10000000 208 ns/op BenchmarkTokenBucketParallel 50000000 37.8 ns/op BenchmarkGolangRateParallel 10000000 153 ns/op BenchmarkTokenBucketParallel-8 10000000 129 ns/op BenchmarkGolangRateParallel-8 10000000 208 ns/op
type TokenBucketFactory ¶
type TokenBucketFactory interface {
CreateTokenBucket(rps int, timeSource TimeSource) TokenBucket
}
TokenBucketFactory is an interface mainly used for injecting mock implementation of TokenBucket for unit testing
func NewTokenBucketFactory ¶
func NewTokenBucketFactory() TokenBucketFactory
NewTokenBucketFactory creates an instance of factory used for creating TokenBucket instances
type UUIDResolver ¶
type UUIDResolver interface { // Lookup returns the host addr corresponding to the uuid Lookup(uuid string) (string, error) // Reverse lookup returns the uuid corresponding to the host addr ReverseLookup(addr string) (string, error) // Clears the in-memory cache ClearCache() }
UUIDResolver maps UUIDs to IP addrs and vice-versa
func NewUUIDResolver ¶
func NewUUIDResolver(mClient metadata.TChanMetadataService) UUIDResolver
NewUUIDResolver returns an instance of UUIDResolver that can be used to resovle host uuids to ip:port addresses and vice-versa. The returned resolver uses Cassandra as the backend store for persisting the mapping. The resolver also maintains an in-memory cache for fast-lookups. Thread safe.
type UnixNanoTime ¶
type UnixNanoTime int64
UnixNanoTime is Unix time as nanoseconds since Jan 1st, 1970, 00:00 GMT
func (UnixNanoTime) ToSeconds ¶
func (u UnixNanoTime) ToSeconds() Seconds
ToSeconds turns a relative or absolute UnixNanoTime to float Seconds
func (UnixNanoTime) ToSecondsFmt ¶
func (u UnixNanoTime) ToSecondsFmt() string
ToSecondsFmt turns a relative or absolute UnixNanoTime to float Seconds, and returns 'never' if the input is zero
type WSConnector ¶
type WSConnector interface { OpenPublisherStream(hostPort string, requestHeader http.Header) (stream.BInOpenPublisherStreamOutCall, error) AcceptPublisherStream(w http.ResponseWriter, r *http.Request) (serverStream.BInOpenPublisherStreamInCall, error) OpenConsumerStream(hostPort string, requestHeader http.Header) (stream.BOutOpenConsumerStreamOutCall, error) AcceptConsumerStream(w http.ResponseWriter, r *http.Request) (serverStream.BOutOpenConsumerStreamInCall, error) OpenAppendStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenAppendStreamOutCall, error) AcceptAppendStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenAppendStreamInCall, error) OpenReadStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenReadStreamOutCall, error) AcceptReadStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenReadStreamInCall, error) OpenReplicationReadStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenReadStreamOutCall, error) AcceptReplicationReadStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenReadStreamInCall, error) OpenReplicationRemoteReadStream(hostPort string, requestHeader http.Header) (serverStream.BStoreOpenReadStreamOutCall, error) AcceptReplicationRemoteReadStream(w http.ResponseWriter, r *http.Request) (serverStream.BStoreOpenReadStreamInCall, error) }
WSConnector takes care of establishing connection via websocket stream
Source Files ¶
- ackid.go
- cassandra_helpers.go
- clientfactory.go
- concurrentmap.go
- constants.go
- convert.go
- counterbank.go
- daemon.go
- defs.go
- heartbeat.go
- hosthwinfo.go
- hostid_hb.go
- httphandlers.go
- knobs.go
- loadreporter.go
- log_tag.go
- metadata.go
- minHeap.go
- mockloadreporterdaemon.go
- mockloadreporterdaemonfactory.go
- mockrpm.go
- mockservice.go
- mocktime.go
- resolver.go
- rpm.go
- service.go
- servicetypes.go
- tb.go
- ticker_source.go
- time_source.go
- timer.go
- typeConv.go
- util.go
- wsconnector.go