Documentation ¶
Index ¶
- Constants
- func DecodeStream(input io.Reader, callback func(map[string][]*service.Service, error)) error
- type ChangeEvent
- type ListByName
- type Listener
- type Server
- func (server *Server) HasService(id string) bool
- func (j *Server) MarshalJSON() ([]byte, error)
- func (j *Server) MarshalJSONBuf(buf fflib.EncodingBuffer) error
- func (s *Server) SortedServices() []*service.Service
- func (j *Server) UnmarshalJSON(input []byte) error
- func (j *Server) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
- type ServerByName
- type ServicesByAge
- type ServicesByName
- type ServicesState
- func (state *ServicesState) AddListener(listener Listener)
- func (state *ServicesState) AddServiceEntry(newSvc service.Service)
- func (state *ServicesState) BroadcastServices(fn func() []service.Service, looper director.Looper)
- func (state *ServicesState) BroadcastTombstones(fn func() []service.Service, looper director.Looper)
- func (state *ServicesState) ByService() map[string][]*service.Service
- func (state *ServicesState) EachLocalService(fn func(hostname *string, serviceId *string, svc *service.Service))
- func (state *ServicesState) EachServer(fn func(hostname *string, server *Server))
- func (state *ServicesState) EachService(fn func(hostname *string, serviceId *string, svc *service.Service))
- func (state *ServicesState) EachServiceSorted(fn func(hostname *string, serviceId *string, svc *service.Service))
- func (state *ServicesState) Encode() []byte
- func (state *ServicesState) ExpireServer(hostname string)
- func (state *ServicesState) Format(list *memberlist.Memberlist) string
- func (state *ServicesState) GetListeners() []Listener
- func (state *ServicesState) GetLocalServiceByID(id string) (service.Service, error)
- func (state *ServicesState) HasServer(hostname string) bool
- func (state *ServicesState) IsNewService(svc *service.Service) bool
- func (j *ServicesState) MarshalJSON() ([]byte, error)
- func (j *ServicesState) MarshalJSONBuf(buf fflib.EncodingBuffer) error
- func (state *ServicesState) Merge(otherState *ServicesState)
- func (state *ServicesState) NotifyListeners(svc *service.Service, previousStatus int, changedTime time.Time)
- func (state *ServicesState) Print(list *memberlist.Memberlist)
- func (state *ServicesState) ProcessServiceMsgs(looper director.Looper)
- func (state *ServicesState) RemoveListener(name string) error
- func (state *ServicesState) SendServices(services []service.Service, looper director.Looper)
- func (state *ServicesState) ServiceChanged(svc *service.Service, previousStatus int, updated time.Time)
- func (state *ServicesState) SortedServers() []*Server
- func (state *ServicesState) TombstoneOthersServices() []service.Service
- func (state *ServicesState) TombstoneServices(hostname string, containerList []service.Service) []service.Service
- func (state *ServicesState) TrackLocalListeners(fn func() []Listener, looper director.Looper)
- func (state *ServicesState) TrackNewServices(fn func() []service.Service, looper director.Looper)
- func (j *ServicesState) UnmarshalJSON(input []byte) error
- func (j *ServicesState) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
- func (state *ServicesState) UpdateService(svc service.Service)
- type StateChangedEvent
- type UrlListener
Examples ¶
Constants ¶
const ( TOMBSTONE_LIFESPAN = 3 * time.Hour // How long we keep tombstones around TOMBSTONE_COUNT = 10 // Send tombstones at 1 per second 10 times ALIVE_COUNT = 5 // Send new services at 1 per second 5 times TOMBSTONE_SLEEP_INTERVAL = 2 * time.Second // Sleep between local service checks TOMBSTONE_RETRANSMIT = 1 * time.Second // Time between tombstone retranmission ALIVE_LIFESPAN = 1*time.Minute + 20*time.Second // Down if not heard from in 80 seconds DRAINING_LIFESPAN = 10 * time.Minute // Down if not heard from in 10 minutes ALIVE_SLEEP_INTERVAL = 1 * time.Second // Sleep between local service checks ALIVE_BROADCAST_INTERVAL = 1 * time.Minute // Broadcast Alive messages every minute )
const ( ClientTimeout = 3 * time.Second DefaultRetries = 5 )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ChangeEvent ¶
A ChangeEvent represents the time and hostname that was modified and signals a major state change event. It is passed to listeners over the listeners channel in the state object.
func (*ChangeEvent) MarshalJSON ¶ added in v1.3.0
func (j *ChangeEvent) MarshalJSON() ([]byte, error)
MarshalJSON marshal bytes to json - template
func (*ChangeEvent) MarshalJSONBuf ¶ added in v1.3.0
func (j *ChangeEvent) MarshalJSONBuf(buf fflib.EncodingBuffer) error
MarshalJSONBuf marshal buff to json - template
func (*ChangeEvent) UnmarshalJSON ¶ added in v1.3.0
func (j *ChangeEvent) UnmarshalJSON(input []byte) error
UnmarshalJSON unmarshal json - template of ffjson
func (*ChangeEvent) UnmarshalJSONFFLexer ¶ added in v1.3.0
func (j *ChangeEvent) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
UnmarshalJSONFFLexer fast json unmarshal - template ffjson
type ListByName ¶
type ListByName []*memberlist.Node
Memberlist --------------------------------
by Name
func (ListByName) Len ¶
func (a ListByName) Len() int
func (ListByName) Less ¶
func (a ListByName) Less(i, j int) bool
func (ListByName) Swap ¶
func (a ListByName) Swap(i, j int)
type Listener ¶ added in v1.1.0
type Listener interface { Chan() chan ChangeEvent // The event channel Name() string // The name of this listener Managed() bool // Is this managed by us? (e.g. auto-added/removed) }
A Listener receives update events from state changes.
type Server ¶
type Server struct { Name string Services map[string]*service.Service LastUpdated time.Time LastChanged time.Time }
Holds the state about one server in our cluster
func (*Server) HasService ¶
Shortcut for checking if the server has this service or not. Note: Not synchronized!
func (*Server) MarshalJSON ¶ added in v1.3.0
MarshalJSON marshal bytes to json - template
func (*Server) MarshalJSONBuf ¶ added in v1.3.0
func (j *Server) MarshalJSONBuf(buf fflib.EncodingBuffer) error
MarshalJSONBuf marshal buff to json - template
func (*Server) SortedServices ¶
func (*Server) UnmarshalJSON ¶ added in v1.3.0
UnmarshalJSON unmarshal json - template of ffjson
func (*Server) UnmarshalJSONFFLexer ¶ added in v1.3.0
UnmarshalJSONFFLexer fast json unmarshal - template ffjson
type ServerByName ¶
type ServerByName []*Server
Servers --------------------------------
func (ServerByName) Len ¶
func (s ServerByName) Len() int
func (ServerByName) Less ¶
func (s ServerByName) Less(i, j int) bool
func (ServerByName) Swap ¶
func (s ServerByName) Swap(i, j int)
type ServicesByAge ¶
Services -------------------------------
by Age
func (ServicesByAge) Len ¶
func (s ServicesByAge) Len() int
func (ServicesByAge) Less ¶
func (s ServicesByAge) Less(i, j int) bool
func (ServicesByAge) Swap ¶
func (s ServicesByAge) Swap(i, j int)
type ServicesByName ¶
by Name
func (ServicesByName) Len ¶
func (a ServicesByName) Len() int
func (ServicesByName) Less ¶
func (a ServicesByName) Less(i, j int) bool
func (ServicesByName) Swap ¶
func (a ServicesByName) Swap(i, j int)
type ServicesState ¶
type ServicesState struct { Servers map[string]*Server LastChanged time.Time ClusterName string Hostname string Broadcasts chan [][]byte `json:"-"` ServiceMsgs chan service.Service `json:"-"` sync.RWMutex // contains filtered or unexported fields }
Holds the state about all the servers in the cluster
func Decode ¶
func Decode(data []byte) (*ServicesState, error)
Take a byte slice and return a properly reconstituted state struct
func NewServicesState ¶
func NewServicesState() *ServicesState
Returns a pointer to a properly configured ServicesState
func (*ServicesState) AddListener ¶
func (state *ServicesState) AddListener(listener Listener)
Add an event listener channel to the list that will be notified on major state change events. Channels must be buffered by at least 1 or they will block. Channels must be ready to receive input.
func (*ServicesState) AddServiceEntry ¶
func (state *ServicesState) AddServiceEntry(newSvc service.Service)
Take a service and merge it into our state. Correctly handle timestamps so we only add things newer than what we already know about. Retransmits updates to cluster peers.
func (*ServicesState) BroadcastServices ¶
func (state *ServicesState) BroadcastServices(fn func() []service.Service, looper director.Looper)
BroadcastServices loops forever, transmitting info about our containers on the broadcast channel. Intended to run as a background goroutine.
func (*ServicesState) BroadcastTombstones ¶
func (state *ServicesState) BroadcastTombstones(fn func() []service.Service, looper director.Looper)
Example ¶
state := NewServicesState() state.Hostname = "something" looper := director.NewTimedLooper(1, 1*time.Nanosecond, nil) go func() { <-state.Broadcasts }() state.BroadcastTombstones(func() []service.Service { return []service.Service{} }, looper) // TODO go test seems broken. It should match this, but can't for some reason: // XXX it can't see output generated _by_ the test code itself // TombstoneServices(): New host or not running services, skipping.
Output:
func (*ServicesState) ByService ¶
func (state *ServicesState) ByService() map[string][]*service.Service
Group the services into a map by service name rather than by the hosts they run on.
Example (Withmatcher) ¶
state := NewServicesState() state.Servers[hostname] = NewServer(hostname) svcId1 := "deadbeef123" svcId2 := "deadbeef101" svcId3 := "deadbeef105" baseTime := time.Now().UTC().Round(time.Second) service1 := service.Service{ ID: svcId1, Name: "service1-deadabba999", Image: "img1", Hostname: hostname, Updated: baseTime, } service2 := service.Service{ ID: svcId2, Name: "service1-abba1231234", Image: "img1", Hostname: hostname, Updated: baseTime, } service3 := service.Service{ ID: svcId3, Name: "service3", Image: "img2", Hostname: hostname, Updated: baseTime, } state.AddServiceEntry(service1) state.AddServiceEntry(service2) state.AddServiceEntry(service3) json, _ := json.MarshalIndent(state.ByService(), "", " ") println(string(json))
Output:
Example (Withoutmatcher) ¶
state := NewServicesState() state.Servers[hostname] = NewServer(hostname) svcId1 := "deadbeef123" svcId2 := "deadbeef101" svcId3 := "deadbeef105" baseTime := time.Now().UTC().Round(time.Second) service1 := service.Service{ ID: svcId1, Name: "service1", Image: "img1", Hostname: hostname, Updated: baseTime, } service2 := service.Service{ ID: svcId2, Name: "service2", Image: "img1", Hostname: hostname, Updated: baseTime, } service3 := service.Service{ ID: svcId3, Name: "service3", Image: "img2", Hostname: hostname, Updated: baseTime, } state.AddServiceEntry(service1) state.AddServiceEntry(service2) state.AddServiceEntry(service3) json, _ := json.MarshalIndent(state.ByService(), "", " ") println(string(json))
Output:
func (*ServicesState) EachLocalService ¶
func (state *ServicesState) EachLocalService(fn func(hostname *string, serviceId *string, svc *service.Service))
func (*ServicesState) EachServer ¶
func (state *ServicesState) EachServer(fn func(hostname *string, server *Server))
func (*ServicesState) EachService ¶
func (state *ServicesState) EachService(fn func(hostname *string, serviceId *string, svc *service.Service))
func (*ServicesState) EachServiceSorted ¶
func (state *ServicesState) EachServiceSorted(fn func(hostname *string, serviceId *string, svc *service.Service))
func (*ServicesState) Encode ¶
func (state *ServicesState) Encode() []byte
Return a Marshaled/Encoded byte array that can be decoded with catalog.Decode()
func (*ServicesState) ExpireServer ¶
func (state *ServicesState) ExpireServer(hostname string)
A server has left the cluster, so tombstone all of its records
func (*ServicesState) Format ¶
func (state *ServicesState) Format(list *memberlist.Memberlist) string
Pretty-print(ish) a services state struct so a human can read it on the terminal. Makes for awesome web apps.
func (*ServicesState) GetListeners ¶ added in v1.3.0
func (state *ServicesState) GetListeners() []Listener
GetListeners returns a slice containing all the current listeners
func (*ServicesState) GetLocalServiceByID ¶
func (state *ServicesState) GetLocalServiceByID(id string) (service.Service, error)
GetLocalServiceByID returns a service for a given ID if it happens to exist on the current host. Returns an error otherwise.
func (*ServicesState) HasServer ¶
func (state *ServicesState) HasServer(hostname string) bool
Shortcut for checking if the Servers map has an entry for this hostname.
func (*ServicesState) IsNewService ¶
func (state *ServicesState) IsNewService(svc *service.Service) bool
Do we know about this service already? If we do, is it a tombstone?
func (*ServicesState) MarshalJSON ¶ added in v1.3.0
func (j *ServicesState) MarshalJSON() ([]byte, error)
MarshalJSON marshal bytes to json - template
func (*ServicesState) MarshalJSONBuf ¶ added in v1.3.0
func (j *ServicesState) MarshalJSONBuf(buf fflib.EncodingBuffer) error
MarshalJSONBuf marshal buff to json - template
func (*ServicesState) Merge ¶
func (state *ServicesState) Merge(otherState *ServicesState)
Merge a complete state struct into this one. Usually used on node startup and during anti-entropy operations.
func (*ServicesState) NotifyListeners ¶
func (state *ServicesState) NotifyListeners(svc *service.Service, previousStatus int, changedTime time.Time)
Tell all of our listeners that something changed for a host at a given timestamp. See AddListener() for information about how channels must be configured.
func (*ServicesState) Print ¶
func (state *ServicesState) Print(list *memberlist.Memberlist)
Print the formatted struct
func (*ServicesState) ProcessServiceMsgs ¶ added in v1.1.0
func (state *ServicesState) ProcessServiceMsgs(looper director.Looper)
ProcessServiceMsgs is to be run in a goroutine, and processes incoming service notices.
func (*ServicesState) RemoveListener ¶ added in v1.1.1
func (state *ServicesState) RemoveListener(name string) error
Remove an event listener channel by name. This will find the first listener in the list with the specified name and will remove it.
func (*ServicesState) SendServices ¶
func (state *ServicesState) SendServices(services []service.Service, looper director.Looper)
Actually transmit an encoded service record into the channel. Runs a background goroutine that continues the broadcast for 10 seconds so we have a pretty good idea that it was delivered.
func (*ServicesState) ServiceChanged ¶
func (state *ServicesState) ServiceChanged(svc *service.Service, previousStatus int, updated time.Time)
Tell the state that a particular service transitioned from one state to another.
func (*ServicesState) SortedServers ¶
func (state *ServicesState) SortedServers() []*Server
func (*ServicesState) TombstoneOthersServices ¶
func (state *ServicesState) TombstoneOthersServices() []service.Service
func (*ServicesState) TombstoneServices ¶
func (*ServicesState) TrackLocalListeners ¶ added in v1.3.0
func (state *ServicesState) TrackLocalListeners(fn func() []Listener, looper director.Looper)
TrackLocalListeners runs in the background and repeatedly calls a discovery function to return a list of event listeners. These will then be added to the listener list. Managed listeners no longer reported from discovery will be removed.
func (*ServicesState) TrackNewServices ¶
func (state *ServicesState) TrackNewServices(fn func() []service.Service, looper director.Looper)
TrackNewServices talks to the discovery mechanism and tracks any services we don't already know about.
func (*ServicesState) UnmarshalJSON ¶ added in v1.3.0
func (j *ServicesState) UnmarshalJSON(input []byte) error
UnmarshalJSON unmarshal json - template of ffjson
func (*ServicesState) UnmarshalJSONFFLexer ¶ added in v1.3.0
func (j *ServicesState) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
UnmarshalJSONFFLexer fast json unmarshal - template ffjson
func (*ServicesState) UpdateService ¶
func (state *ServicesState) UpdateService(svc service.Service)
UpdateService enqueues a state update for a given service
type StateChangedEvent ¶
type StateChangedEvent struct { State *ServicesState ChangeEvent ChangeEvent }
A StateChangedEvent is sent to UrlListeners when a significant event has changed the ServicesState.
type UrlListener ¶
type UrlListener struct { Url string Retries int Client *http.Client // contains filtered or unexported fields }
A UrlListener is an event listener that receives updates over an HTTP POST to an endpoint.
func NewUrlListener ¶
func NewUrlListener(listenurl string, managed bool) *UrlListener
func (*UrlListener) Chan ¶ added in v1.1.0
func (u *UrlListener) Chan() chan ChangeEvent
func (*UrlListener) Managed ¶ added in v1.3.0
func (u *UrlListener) Managed() bool
func (*UrlListener) Name ¶ added in v1.1.0
func (u *UrlListener) Name() string
func (*UrlListener) SetName ¶ added in v1.3.0
func (u *UrlListener) SetName(name string)
func (*UrlListener) Stop ¶ added in v1.3.1
func (u *UrlListener) Stop()
func (*UrlListener) Watch ¶
func (u *UrlListener) Watch(state *ServicesState)