Documentation ¶
Overview ¶
Package iot comments. TODO: package comments for this pub/sub system.
Index ¶
- Constants
- Variables
- func BucketClear(dest *StatsWithTime)
- func BucketCopy(src *tokens.KnotFreeContactStats, dest *tokens.KnotFreeContactStats)
- func BucketSubtract(src *StatsWithTime, dest *StatsWithTime)
- func ConnectGuruToSuperAide(guru *Executive, aide *Executive)
- func HasError(p packets.Interface) *packets.Disconnect
- func IsWholeMqttPacket(data []byte) (bool, int)
- func MQTTHandlePacket(cc *mqttContact, control libmqtt.Packet)
- func NewWithInt64Comparator() *redblacktree.Tree
- func PostClusterStats(stats *ClusterStats, addr string) error
- func PostUpstreamNames(guruList []string, addressList []string, addr string) error
- func PushDownFromTop(looker *LookupTableStruct, p packets.Interface) error
- func PushPacketUpFromBottom(ssi ContactInterface, p packets.Interface) error
- func SocketSetup(tcpConn *net.TCPConn) error
- func SpecialPrint(p *packets.PacketCommon, fn func())
- func Text2Packet(text string) (packets.Interface, error)
- func WebSocketLoop(wsConn *websocket.Conn, config *ContactStructConfig)
- type BillingAccumulator
- func (ba *BillingAccumulator) Add(stats *tokens.KnotFreeContactStats, now uint32)
- func (ba *BillingAccumulator) AreUnderMax(now uint32) (bool, string)
- func (ba *BillingAccumulator) GetConnections(now uint32) float32
- func (ba *BillingAccumulator) GetInput(now uint32) float32
- func (ba *BillingAccumulator) GetOutput(now uint32) float32
- func (ba *BillingAccumulator) GetStats(now uint32, dest *tokens.KnotFreeContactStats)
- func (ba *BillingAccumulator) GetSubscriptions(now uint32) float32
- type ByteChan
- type ByteCountingReader
- type ByteCountingWriter
- type ClusterExecutive
- type ClusterStats
- type ContactInterface
- type ContactStruct
- func (ss *ContactStruct) Close(err error)
- func (ss *ContactStruct) GetClosed() bool
- func (ss *ContactStruct) GetConfig() *ContactStructConfig
- func (ss *ContactStruct) GetExpires() uint32
- func (ss *ContactStruct) GetKey() HalfHash
- func (ss *ContactStruct) GetRates(now uint32) (int, int, int)
- func (ss *ContactStruct) GetSequence() uint64
- func (ss *ContactStruct) GetToken() *tokens.KnotFreeTokenPayload
- func (ss *ContactStruct) Heartbeat(now uint32)
- func (ss *ContactStruct) IncOutput(amt int)
- func (ss *ContactStruct) Read(p []byte) (int, error)
- func (ss *ContactStruct) ReadByte() (byte, error)
- func (ss *ContactStruct) SetExpires(when uint32)
- func (ss *ContactStruct) SetReader(r io.Reader)
- func (ss *ContactStruct) SetToken(t *tokens.KnotFreeTokenPayload)
- func (ss *ContactStruct) SetWriter(w io.Writer)
- func (ss *ContactStruct) String() string
- func (ss *ContactStruct) Write(p []byte) (int, error)
- func (ss *ContactStruct) WriteByte(c byte) error
- func (ss *ContactStruct) WriteDownstream(cmd packets.Interface) error
- func (ss *ContactStruct) WriteUpstream(cmd packets.Interface) error
- type ContactStructConfig
- func (config *ContactStructConfig) AccessContactsList(fn func(config *ContactStructConfig, listOfCi *list.List))
- func (config *ContactStructConfig) GetCe() *ClusterExecutive
- func (config *ContactStructConfig) GetContactsListCopy() []ContactInterface
- func (config *ContactStructConfig) GetLookup() *LookupTableStruct
- func (config *ContactStructConfig) IsGuru() bool
- func (config *ContactStructConfig) Len() int
- type DevNull
- type Executive
- func MakeHTTPExecutive(ex *Executive, serverName string) *Executive
- func MakeMqttExecutive(ex *Executive, serverName string) *Executive
- func MakeTCPExecutive(ex *Executive, serverName string) *Executive
- func MakeTextExecutive(ex *Executive, serverName string) *Executive
- func NewExecutive(sizeEstimate int, aname string, timegetter func() uint32, isGuru bool, ...) *Executive
- func (ex *Executive) DialContactToAnyAide(isTCP bool, ce *ClusterExecutive)
- func (ex *Executive) GetExecutiveStats() *ExecutiveStats
- func (ex *Executive) GetHTTPAddress() string
- func (ex *Executive) GetMQTTAddress() string
- func (ex *Executive) GetSubsCount() (int, float64)
- func (ex *Executive) GetTCPAddress() string
- func (ex *Executive) GetTextAddress() string
- func (ex *Executive) Heartbeat(now uint32)
- func (ex *Executive) WaitForActions()
- type ExecutiveLimits
- type ExecutiveStats
- type ExpansionDesired
- type HalfHash
- type HashType
- func (h *HashType) FromHashType(src *HashType)
- func (h *HashType) GetBytes(b []byte)
- func (h *HashType) GetFractionalBits(n int) int
- func (h *HashType) GetHalfHash() HalfHash
- func (h *HashType) GetUint64() uint64
- func (h *HashType) HashBytes(s []byte)
- func (h *HashType) HashString(s string)
- func (h *HashType) InitFromBytes(addressBytes []byte)
- func (h *HashType) Random()
- func (h *HashType) String() string
- type LookReply
- type LookupTableStruct
- func (me *LookupTableStruct) FlushMarkerAndWait()
- func (me *LookupTableStruct) GetAllSubsCount() (int, float64)
- func (me *LookupTableStruct) Heartbeat(now uint32)
- func (me *LookupTableStruct) PushUp(p packets.Interface, h HashType) error
- func (me *LookupTableStruct) SetUpstreamNames(names []string, addresses []string)
- type StatsWithTime
- type UpstreamNamesArg
- type WatchedTopic
- func (wt *WatchedTopic) DeleteOption(key string)
- func (wt *WatchedTopic) GetOption(key string) ([]byte, bool)
- func (wt *WatchedTopic) IsBilling() (*BillingAccumulator, bool)
- func (wt *WatchedTopic) Iterator() *subIterator
- func (wt *WatchedTopic) OptionSize() int
- func (wt *WatchedTopic) SetOption(key string, val interface{})
Constants ¶
const HashTypeLen = 24
HashTypeLen is the length of a HashType in bytes; it is now 24 bytes long.
Variables ¶
var ( // TopicsAdded is TopicsAdded = promauto.NewCounter(prometheus.CounterOpts{ Name: "look_topics_added", Help: "The total number new topics/subscriptions] added", }) // API1GetStats is API1GetStats = promauto.NewCounter( prometheus.CounterOpts{ Name: "api1_getstats", Help: "http requests.", }, ) // IotHTTP404 is used in TCPUtil.go IotHTTP404 = promauto.NewCounter( prometheus.CounterOpts{ Name: "iot_http_404", Help: "http 404.", }, ) // API1PostGurus is API1PostGurus = promauto.NewCounter( prometheus.CounterOpts{ Name: "api2_post_gurus", Help: "http post /api2/set.", }, ) // API1PostGurusFail is searchable API1PostGurusFail = promauto.NewCounter( prometheus.CounterOpts{ Name: "api1_post_gurus_fail", Help: "http post /api2/set.", }, ) // TCPNameResolverFail1 is TCPNameResolverFail1 = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_name_resolver_fail1", Help: "failed to resolve address of guru.", }, ) // TCPNameResolverFail2 is TCPNameResolverFail2 = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_name_resolver_fail2", Help: "dial timeout looking for gurus.", }, ) // TCPNameResolverConnected is TCPNameResolverConnected = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_name_resolver_connected", Help: "normal connect happened.", }, ) //TCPServerDidntStart is TCPServerDidntStart = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_server_fail1", Help: "packet server listen fail.", }, ) //TCPServerAcceptError is TCPServerAcceptError = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_server_fail2", Help: "packet server acceptor fail.", }, ) //TCPServerConnAccept is TCPServerConnAccept = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_server_conn_accept", Help: "normal packer server connection.", }, ) //TCPServerNewConnection is TCPServerNewConnection = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_server_new_conn", Help: "normal packer server connection.", }, ) //TCPServerPacketReadError is TCPServerPacketReadError 
= promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_server_packet_read_error", Help: "packets.ReadPacket error.", }, ) //TCPServerIotPushEror is TCPServerIotPushEror = promauto.NewCounter( prometheus.CounterOpts{ Name: "tcp_server_packet_push_error", Help: "Push error.", }, ) )
var ( // HTTPServe404 is HTTPServe404 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_http_404", Help: "Number of 404 main.ServeHTTP.", }, ) // ForwardsCount3100 is ForwardsCount3100 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_3100_forwards", Help: "Number forwards main.startPublicServer3100.", }, ) // ForwardsCount9090 is for main.go ForwardsCount9090 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_9090_forwards", Help: "http forwards main.startPublicServer9090.", }, ) // ForwardsCount8000 is ForwardsCount8000 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_8000_forwards", Help: "tcp count main.startPublicServer9090.", }, ) // ForwardsDialFail8000 is ForwardsDialFail8000 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_8000_dialfail", Help: "tcp dialfail main.startPublicServer9090.", }, ) // ForwardsConnectedl8000 is ForwardsConnectedl8000 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_8000_connected", Help: "tcp conected main.startPublicServer9090.", }, ) // ForwardsAcceptl8000 is ForwardsAcceptl8000 = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_8000_accepted", Help: "tcp accepted main.startPublicServer9090.", }, ) // BadTokenRequests is BadTokenRequests = promauto.NewCounter( prometheus.CounterOpts{ Name: "main_bad_token_requests", Help: "Token requests with flaws.", }, ) )
var DEBUG = false
DEBUG because I don't know a better way. todo: look into conditional inclusion
var GuruNameToConfigMap map[string]*Executive
GuruNameToConfigMap for ease of unit test.
var TestLimits = ExecutiveLimits{}
TestLimits is for tests of autoscaling. These are executive limits and not to be confused with token limits. In real life the connections limit is likely to be 10k and the subscriptions limit 1e6.
Functions ¶
func BucketCopy ¶
func BucketCopy(src *tokens.KnotFreeContactStats, dest *tokens.KnotFreeContactStats)
BucketCopy is
func BucketSubtract ¶
func BucketSubtract(src *StatsWithTime, dest *StatsWithTime)
BucketSubtract is
func ConnectGuruToSuperAide ¶
ConnectGuruToSuperAide for testing a cluster with a supercluster we need channels from guru to aide.
func HasError ¶
func HasError(p packets.Interface) *packets.Disconnect
HasError literally means does this packet have an "error" option returns a Disconnect if the p has an error
func IsWholeMqttPacket ¶
IsWholeMqttPacket returns true if the data is an mqtt packet and returns the length used.
func MQTTHandlePacket ¶
MQTTHandlePacket is for when the packet was parsed elsewhere (like in the websocket).
func NewWithInt64Comparator ¶
func NewWithInt64Comparator() *redblacktree.Tree
NewWithInt64Comparator for HalfHash
func PostClusterStats ¶
func PostClusterStats(stats *ClusterStats, addr string) error
PostClusterStats makes http client
func PostUpstreamNames ¶
PostUpstreamNames does SetUpstreamNames the hard way. We are not going over the internet; inside a ns it should be well under 1000 ms.
func PushDownFromTop ¶
func PushDownFromTop(looker *LookupTableStruct, p packets.Interface) error
PushDownFromTop to deal with an incoming message going down. Typically called by an upperChannel receiving a packet via its tcp that it dialed. todo: upgrade and consolidate the address logic.
func PushPacketUpFromBottom ¶
func PushPacketUpFromBottom(ssi ContactInterface, p packets.Interface) error
PushPacketUpFromBottom to deal with an incoming message on a bottom contact heading up.
func SpecialPrint ¶
func SpecialPrint(p *packets.PacketCommon, fn func())
func Text2Packet ¶
Text2Packet turns badjson into a packet
func WebSocketLoop ¶
func WebSocketLoop(wsConn *websocket.Conn, config *ContactStructConfig)
WebSocketLoop loops
Types ¶
type BillingAccumulator ¶
type BillingAccumulator struct {
// contains filtered or unexported fields
}
BillingAccumulator is terse
func (*BillingAccumulator) Add ¶
func (ba *BillingAccumulator) Add(stats *tokens.KnotFreeContactStats, now uint32)
Add accumulates the stats into the BillingAccumulator
func (*BillingAccumulator) AreUnderMax ¶
func (ba *BillingAccumulator) AreUnderMax(now uint32) (bool, string)
AreUnderMax returns if the stats are under the limits and, if not true, returns a message about it.
func (*BillingAccumulator) GetConnections ¶
func (ba *BillingAccumulator) GetConnections(now uint32) float32
GetConnections is
func (*BillingAccumulator) GetInput ¶
func (ba *BillingAccumulator) GetInput(now uint32) float32
GetInput - we sum up some previous buckets and divide. Do we need to sync with Add? No, because all access to this goes through a q in lookup.
func (*BillingAccumulator) GetOutput ¶
func (ba *BillingAccumulator) GetOutput(now uint32) float32
GetOutput is
func (*BillingAccumulator) GetStats ¶
func (ba *BillingAccumulator) GetStats(now uint32, dest *tokens.KnotFreeContactStats)
GetStats calcs them all at once into dest. dest should be zeroed before calling.
func (*BillingAccumulator) GetSubscriptions ¶
func (ba *BillingAccumulator) GetSubscriptions(now uint32) float32
GetSubscriptions is
type ByteCountingReader ¶
type ByteCountingReader struct {
// contains filtered or unexported fields
}
ByteCountingReader keeps track of how much was read.
type ByteCountingWriter ¶
type ByteCountingWriter struct {
// contains filtered or unexported fields
}
ByteCountingWriter keeps track of how much was written.
type ClusterExecutive ¶
type ClusterExecutive struct { Aides []*Executive Gurus []*Executive PublicKeyTemp *[32]byte //curve25519.PublicKey // temporary to this run not ed25519 PrivateKeyTemp *[32]byte //curve25519.PrivateKey // contains filtered or unexported fields }
ClusterExecutive is a list of Executive used for testing.
func MakeSimplestCluster ¶
func MakeSimplestCluster(timegetter func() uint32, isTCP bool, aideCount int, suffix string) *ClusterExecutive
MakeSimplestCluster is just for testing as k8s doesn't work like this.
func MakeTCPMain ¶
func MakeTCPMain(name string, limits *ExecutiveLimits, token string, isGuru bool) *ClusterExecutive
MakeTCPMain is called by main(s) and it news a table and contacts list and starts tcp acceptors.
func (*ClusterExecutive) GetNextAddress ¶
func (ce *ClusterExecutive) GetNextAddress() string
GetNextAddress hands out localhost addresses starting at 9000
func (*ClusterExecutive) GetSubsCount ¶
func (ce *ClusterExecutive) GetSubsCount() int
GetSubsCount returns count of all the subscriptions in all the lookup tables. this is really only good for test.
func (*ClusterExecutive) Heartbeat ¶
func (ce *ClusterExecutive) Heartbeat(now uint32)
Heartbeat everyone when testing
func (*ClusterExecutive) Operate ¶
func (ce *ClusterExecutive) Operate()
Operate where we pretend to be an Operator and resize the cluster. This is really only for test. Only works in non-tcp mode. Does not call heartbeat or advance the time.
func (*ClusterExecutive) WaitForActions ¶
func (ce *ClusterExecutive) WaitForActions()
WaitForActions is a utility for unit tests. we must wait for things to happen during tests we pretend to get service from the operator.
type ClusterStats ¶
type ClusterStats struct { When uint32 // unix time Stats []*ExecutiveStats }
ClusterStats is ExecutiveStats from everyone in the cluster. maybe slightly delayed
type ContactInterface ¶
type ContactInterface interface { Close(err error) GetClosed() bool GetKey() HalfHash GetExpires() uint32 SetExpires(when uint32) GetToken() *tokens.KnotFreeTokenPayload SetToken(*tokens.KnotFreeTokenPayload) GetConfig() *ContactStructConfig WriteDownstream(cmd packets.Interface) error WriteUpstream(cmd packets.Interface) error // called by LookupTableStruct.PushUp String() string // used as a default channel name in test Heartbeat(uint32) // periodic service ~= 10 sec Read(p []byte) (int, error) Write(p []byte) (int, error) GetRates(now uint32) (int, int, int) SetReader(r io.Reader) SetWriter(w io.Writer) }
ContactInterface is usually supplied by a tcp connection
type ContactStruct ¶
type ContactStruct struct {
// contains filtered or unexported fields
}
ContactStruct is our idea of channel or socket which is downstream from us.
func AddContactStruct ¶
func AddContactStruct(ss *ContactStruct, ssi ContactInterface, config *ContactStructConfig) *ContactStruct
AddContactStruct initializes a contact, and puts the new ss on the global list. It also increments the sequence number in SockStructConfig. note that you must pass the same object twice, once as a ContactStruct and once as the Interface
func (*ContactStruct) Close ¶
func (ss *ContactStruct) Close(err error)
Close closes the conn and the rest of the work too. doesn't send error or disconnect. needs to be overridden
func (*ContactStruct) GetClosed ¶
func (ss *ContactStruct) GetClosed() bool
GetClosed because the contact is still referenced by looker after closed.
func (*ContactStruct) GetConfig ¶
func (ss *ContactStruct) GetConfig() *ContactStructConfig
GetConfig is because we're passing around an interface
func (*ContactStruct) GetExpires ¶
func (ss *ContactStruct) GetExpires() uint32
GetExpires returns when the cc should expire
func (*ContactStruct) GetKey ¶
func (ss *ContactStruct) GetKey() HalfHash
GetKey is because we're passing around an interface
func (*ContactStruct) GetRates ¶
func (ss *ContactStruct) GetRates(now uint32) (int, int, int)
GetRates to peek into in, out, dt := cc.GetRates(now)
func (*ContactStruct) GetToken ¶
func (ss *ContactStruct) GetToken() *tokens.KnotFreeTokenPayload
GetToken returns the verified and decoded payload or else nil
func (*ContactStruct) Heartbeat ¶
func (ss *ContactStruct) Heartbeat(now uint32)
Heartbeat is periodic service ~= 10 sec. It's going to forward stats to the billing channel.
func (*ContactStruct) IncOutput ¶
func (ss *ContactStruct) IncOutput(amt int)
IncOutput exists so that tests can fake bytes written.
func (*ContactStruct) ReadByte ¶
func (ss *ContactStruct) ReadByte() (byte, error)
ReadByte implements BufferedReader for libmqtt
func (*ContactStruct) SetExpires ¶
func (ss *ContactStruct) SetExpires(when uint32)
SetExpires sets when the ss will expire in unix time
func (*ContactStruct) SetReader ¶
func (ss *ContactStruct) SetReader(r io.Reader)
SetReader allows test to monkey with the flow
func (*ContactStruct) SetToken ¶
func (ss *ContactStruct) SetToken(t *tokens.KnotFreeTokenPayload)
SetToken sets the verified and decoded token payload for this contact.
func (*ContactStruct) SetWriter ¶
func (ss *ContactStruct) SetWriter(w io.Writer)
SetWriter is used by helpersof_test.go
func (*ContactStruct) String ¶
func (ss *ContactStruct) String() string
func (*ContactStruct) WriteByte ¶
func (ss *ContactStruct) WriteByte(c byte) error
WriteByte implements BufferedWriter for libmqtt
func (*ContactStruct) WriteDownstream ¶
func (ss *ContactStruct) WriteDownstream(cmd packets.Interface) error
WriteDownstream is often overridden in *test* we force plain contacts on the bottom of the guru's they just need to write.
func (*ContactStruct) WriteUpstream ¶
func (ss *ContactStruct) WriteUpstream(cmd packets.Interface) error
WriteUpstream will be overridden. It is used by an upper contact. See tcpUpperContact
type ContactStructConfig ¶
type ContactStructConfig struct { Name string // for debug // contains filtered or unexported fields }
ContactStructConfig could be just a stack frame but I'd like to return it. This could be an interface that implements range and len or and the callbacks. Instead we have function pointers. TODO: revisit.
func NewContactStructConfig ¶
func NewContactStructConfig(looker *LookupTableStruct) *ContactStructConfig
NewContactStructConfig is
func (*ContactStructConfig) AccessContactsList ¶
func (config *ContactStructConfig) AccessContactsList(fn func(config *ContactStructConfig, listOfCi *list.List))
AccessContactsList so we can disconnect them in test and stuff. be sure to always lock. Don't call close or recurse in here or it will deadlock.
func (*ContactStructConfig) GetCe ¶
func (config *ContactStructConfig) GetCe() *ClusterExecutive
GetCe is a getter
func (*ContactStructConfig) GetContactsListCopy ¶
func (config *ContactStructConfig) GetContactsListCopy() []ContactInterface
GetContactsListCopy copies the list.
func (*ContactStructConfig) GetLookup ¶
func (config *ContactStructConfig) GetLookup() *LookupTableStruct
GetLookup is a getter
func (*ContactStructConfig) IsGuru ¶
func (config *ContactStructConfig) IsGuru() bool
IsGuru exposes config.lookup.isGuru
func (*ContactStructConfig) Len ¶
func (config *ContactStructConfig) Len() int
Len returns the count of the contacts.
type Executive ¶
type Executive struct { Looker *LookupTableStruct Config *ContactStructConfig Name string Limits *ExecutiveLimits ClusterStats *ClusterStats // All the stats ClusterStatsString string // serialization of ClusterStats IAmBadError error // if something happened to simply ruin us and we're quitting. // contains filtered or unexported fields }
func MakeHTTPExecutive ¶
MakeHTTPExecutive sets up a http server for serving api1 and api2
func MakeMqttExecutive ¶
MakeMqttExecutive is a thing like a server, not the exec
func MakeTCPExecutive ¶
MakeTCPExecutive is a thing like a server, not the exec
func MakeTextExecutive ¶
MakeTextExecutive is a thing like a server, not the exec
func NewExecutive ¶
func NewExecutive(sizeEstimate int, aname string, timegetter func() uint32, isGuru bool, ce *ClusterExecutive) *Executive
NewExecutive A wrapper to hold and operate
func (*Executive) DialContactToAnyAide ¶
func (ex *Executive) DialContactToAnyAide(isTCP bool, ce *ClusterExecutive)
DialContactToAnyAide is a utility to wait until we have a reference to an aide address and then get a tcp conn and keep it up and retry and keep it up forever. In test there is a ClusterExecutive struct that has references to all the names and addresses In k8s there is an operator that is periodically sending
func (*Executive) GetExecutiveStats ¶
func (ex *Executive) GetExecutiveStats() *ExecutiveStats
GetExecutiveStats is fractions relative to the limits.
func (*Executive) GetHTTPAddress ¶
GetHTTPAddress is a getter
func (*Executive) GetMQTTAddress ¶
GetMQTTAddress is a getter
func (*Executive) GetSubsCount ¶
GetSubsCount returns a count of how many names it's remembering. it also returns a fraction of buffer usage where 0.0 is empty and 1.0 is full.
func (*Executive) GetTCPAddress ¶
GetTCPAddress is a getter
func (*Executive) GetTextAddress ¶
GetTextAddress is a getter
func (*Executive) WaitForActions ¶
func (ex *Executive) WaitForActions()
WaitForActions needs to be properly implemented. Then we inject tracer packets with wait groups into q's and then wait for that.
type ExecutiveLimits ¶
type ExecutiveLimits struct {
tokens.KnotFreeContactStats // in out su co
}
ExecutiveLimits will be how we tell if the ex is 'full'
type ExecutiveStats ¶
type ExecutiveStats struct { // four float32 : in out su co tokens.KnotFreeContactStats Buffers float32 `json:"buf"` Name string `json:"name"` HTTPAddress string `json:"http"` TCPAddress string `json:"tcp"` IsGuru bool `json:"guru"` Limits *ExecutiveLimits `json:"limits"` }
ExecutiveStats is fractions relative to the limits. a fraction: 1.0 is 100% maxed out. 0 is idle.
func GetServerStats ¶
func GetServerStats(addr string) (*ExecutiveStats, error)
GetServerStats asks nicely over http
func (*ExecutiveStats) DeepCopy ¶
func (in *ExecutiveStats) DeepCopy() *ExecutiveStats
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecutiveStats.
func (*ExecutiveStats) DeepCopyInto ¶
func (in *ExecutiveStats) DeepCopyInto(out *ExecutiveStats)
DeepCopyInto the slow way
type ExpansionDesired ¶
type ExpansionDesired struct { ChangeAides int // +1 for grow, 0 for same, -1 for shrink RemoveAide string // the name of the aide to delete ChangeGurus int // +1 for grow, 0 for same, -1 for shrink }
ExpansionDesired is
func CalcExpansionDesired ¶
func CalcExpansionDesired(aides []*ExecutiveStats, gurus []*ExecutiveStats) ExpansionDesired
CalcExpansionDesired is used locally in tests and used by the operator to manage the cluster.
type HashType ¶
type HashType [3]uint64
HashType is for the hash table that Lookup uses.
func (*HashType) FromHashType ¶
FromHashType init an existing hash from another - basically a copy
func (*HashType) GetFractionalBits ¶
GetFractionalBits returns a slice of n bits. Values of n greater than 64 are not implemented.
func (*HashType) GetHalfHash ¶
GetHalfHash is for cases when we can do with 'just' 64 bits.
func (*HashType) HashBytes ¶
HashBytes will initialize an existing hash from a string. The string will get hashed to provide the bits so we'll wish this was faster. It doesn't have to be crypto safe but it does need to be evenly distributed. allocates. wanted to use highwayhash.New128 but was scared of 128 bits.
func (*HashType) HashString ¶
HashString will hash the string and init the HashType
func (*HashType) InitFromBytes ¶
InitFromBytes because I need to convert from [] to HashType should return error? rename
type LookupTableStruct ¶
type LookupTableStruct struct {
// contains filtered or unexported fields
}
LookupTableStruct is good for message routing and address lookup.
func NewLookupTable ¶
func NewLookupTable(projectedTopicCount int, aname string, isGuru bool, getTime func() uint32) *LookupTableStruct
NewLookupTable makes a LookupTableStruct, usually a singleton. In the tests we call here and then use the result to init a server. Starts 16 go routines that are hung on their 32 deep q's
func (*LookupTableStruct) FlushMarkerAndWait ¶
func (me *LookupTableStruct) FlushMarkerAndWait()
FlushMarkerAndWait puts a command into the head of *all* the q's and waits for *all* of them to arrive. This way we can wait. for testing.
func (*LookupTableStruct) GetAllSubsCount ¶
func (me *LookupTableStruct) GetAllSubsCount() (int, float64)
GetAllSubsCount returns the count of subscriptions and the average depth of the channels.
func (*LookupTableStruct) Heartbeat ¶
func (me *LookupTableStruct) Heartbeat(now uint32)
Heartbeat is every 10 sec. now is unix seconds.
func (*LookupTableStruct) PushUp ¶
func (me *LookupTableStruct) PushUp(p packets.Interface, h HashType) error
PushUp is to send msg up to guruness. has a q per contact. this is called directly by the pub/sub/look commands. getting an error here is kinda fatal.
func (*LookupTableStruct) SetUpstreamNames ¶
func (me *LookupTableStruct) SetUpstreamNames(names []string, addresses []string)
SetUpstreamNames is called by a cluster exec of some kind when changing the guru count. We will update upstreamRouterStruct names are like: guru-0f3bca46d414d506ecce3de9762df6c3 addresses are like: 10.244.0.149:8384
type StatsWithTime ¶
type StatsWithTime struct { tokens.KnotFreeContactStats Start uint32 `json:"st"` }
StatsWithTime is
type UpstreamNamesArg ¶
UpstreamNamesArg just has the one job
type WatchedTopic ¶
type WatchedTopic struct {
// contains filtered or unexported fields
}
WatchedTopic is what we'll be collecting a lot of. What if *everyone* is watching this topic? Then the watchers.thetree is huge. These normally time out. See the heartbeat.
func (*WatchedTopic) DeleteOption ¶
func (wt *WatchedTopic) DeleteOption(key string)
DeleteOption removes the option that goes with the key, if any. It returns nothing.
func (*WatchedTopic) GetOption ¶
func (wt *WatchedTopic) GetOption(key string) ([]byte, bool)
GetOption returns the value,true to go with the key or nil,false
func (*WatchedTopic) IsBilling ¶
func (wt *WatchedTopic) IsBilling() (*BillingAccumulator, bool)
IsBilling returns the BillingAccumulator,true if this topic has one, or nil,false
func (*WatchedTopic) Iterator ¶
func (wt *WatchedTopic) Iterator() *subIterator
func (*WatchedTopic) OptionSize ¶
func (wt *WatchedTopic) OptionSize() int
utility routines for watchedTopic options OptionSize returns key count which is same as value count
func (*WatchedTopic) SetOption ¶
func (wt *WatchedTopic) SetOption(key string, val interface{})
SetOption adds the key,value