Documentation ¶
Index ¶
- type Endpoint
- func (e *Endpoint) AverageLatency() time.Duration
- func (e *Endpoint) EstDelTime() time.Time
- func (e *Endpoint) EventChan() chan<- transports.Event
- func (e *Endpoint) Init()
- func (e *Endpoint) IsActive() bool
- func (e *Endpoint) IsAlive() bool
- func (e *Endpoint) IsClosing() bool
- func (e *Endpoint) IsFailed() bool
- func (e *Endpoint) IsIdle() bool
- func (e *Endpoint) IsPinging() bool
- func (e *Endpoint) IsWarming() bool
- func (e *Endpoint) LineCount() int64
- func (e *Endpoint) Next() *Endpoint
- func (e *Endpoint) NumPending() int
- func (e *Endpoint) Pool() *addresspool.Pool
- func (e *Endpoint) Prev() *Endpoint
- func (e *Endpoint) PullBackPending() []*payload.Payload
- func (e *Endpoint) ReduceLatency()
- func (e *Endpoint) ReloadConfig(config *config.Network, finishOnFail bool) bool
- func (e *Endpoint) SendPing() error
- func (e *Endpoint) Server() string
- type Observer
- type Sink
- func (f *Sink) APINavigatable() admin.APINavigatable
- func (f *Sink) AddEndpoint(server string, addressPool *addresspool.Pool, finishOnFail bool) *Endpoint
- func (f *Sink) AddEndpointAfter(server string, addressPool *addresspool.Pool, finishOnFail bool, ...) *Endpoint
- func (f *Sink) CanQueue() bool
- func (f *Sink) ClearTimeout(timeout *Timeout)
- func (f *Sink) Count() int
- func (f *Sink) EventChan() <-chan transports.Event
- func (f *Sink) FindEndpoint(server string) *Endpoint
- func (f *Sink) ForceFailure(endpoint *Endpoint)
- func (f *Sink) Front() *Endpoint
- func (f *Sink) MoveEndpointAfter(endpoint *Endpoint, after *Endpoint)
- func (s *Sink) ProcessEvent(event transports.Event, observer Observer)
- func (f *Sink) ProcessTimeouts()
- func (f *Sink) QueuePayload(payload *payload.Payload) (*Endpoint, error)
- func (f *Sink) RegisterTimeout(timeout *Timeout, duration time.Duration, timeoutFunc TimeoutFunc)
- func (f *Sink) ReloadConfig(config *config.Network)
- func (f *Sink) Shutdown()
- func (f *Sink) ShutdownEndpoint(server string) bool
- func (f *Sink) TimeoutChan() <-chan time.Time
- type Timeout
- type TimeoutFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Endpoint ¶
type Endpoint struct { // Support scheduled task for this endpoint Timeout // contains filtered or unexported fields }
Endpoint structure represents a single remote endpoint
func (*Endpoint) AverageLatency ¶
AverageLatency returns the endpoint's average latency
func (*Endpoint) EstDelTime ¶
EstDelTime returns the expected time this endpoint will have delivered all of its events
func (*Endpoint) EventChan ¶
func (e *Endpoint) EventChan() chan<- transports.Event
EventChan returns the event channel transports should send events through
func (*Endpoint) Init ¶
func (e *Endpoint) Init()
Init prepares the internal Element structures for InternalList and prepares the pending payload structures
func (*Endpoint) IsPinging ¶
IsPinging returns true if the endpoint is still awaiting for a PONG response to a previous Ping request
func (*Endpoint) IsWarming ¶
IsWarming returns whether the endpoint is warming up or not (slow-start)
func (*Endpoint) NumPending ¶
NumPending returns the number of pending payloads on this endpoint
func (*Endpoint) Pool ¶
func (e *Endpoint) Pool() *addresspool.Pool
Pool returns the associated address pool This implements part of the transports.Proxy interface for callbacks
func (*Endpoint) PullBackPending ¶
PullBackPending returns all queued payloads back to the publisher Called when a failure happens
func (*Endpoint) ReduceLatency ¶
func (e *Endpoint) ReduceLatency()
ReduceLatency artificially reduces the recorded latency of the endpoint. It is used to ensure that really bad endpoints do not get ignored forever, as if events are never sent to it, the latency is never recalculated
func (*Endpoint) ReloadConfig ¶
ReloadConfig submits a new configuration to the transport, and returns true if the transports requested that it be restarted in order for the configuration to take effect
type Observer ¶
type Observer interface { // OnAck is called when an acknowledgement response is received // The payload is given and the second argument is true if this ack is the // first ack for this payload OnAck(*Endpoint, *payload.Payload, bool, int) // OnFail is called when the endpoint fails OnFail(*Endpoint) // OnFinished is called when an endpoint finishes and is removed // Returning false prevents the endpoint from being recreated, which it will // be if it still exists in the configuration OnFinish(*Endpoint) bool // OnPong is called when a pong response is received from the endpoint OnPong(*Endpoint) // OnStarted is called when an endpoint starts up and is ready OnStarted(*Endpoint) }
Observer is the interface implemented by the observer of the sink and will receive callbacks on status changes it needs to action
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
Sink structure contains the control channels that each endpoint will utilise. The newEndpoint method attaches new endpoints to this
func (*Sink) APINavigatable ¶
func (f *Sink) APINavigatable() admin.APINavigatable
APINavigatable returns an APINavigatable that exposes status information for this sink It should be called BEFORE adding any endpoints as existing endpoints will not automatically become monitored
func (*Sink) AddEndpoint ¶
func (f *Sink) AddEndpoint(server string, addressPool *addresspool.Pool, finishOnFail bool) *Endpoint
AddEndpoint initialises a new endpoint for a given server entry and adds it to the back of the list of endpoints
func (*Sink) AddEndpointAfter ¶
func (f *Sink) AddEndpointAfter(server string, addressPool *addresspool.Pool, finishOnFail bool, after *Endpoint) *Endpoint
AddEndpointAfter initialises a new endpoint for a given server entry and adds it in the list to the position after the given endpoint. If the given endpoint is nil it is added at the front
func (*Sink) ClearTimeout ¶
ClearTimeout removes a timeout structure
func (*Sink) EventChan ¶
func (f *Sink) EventChan() <-chan transports.Event
EventChan returns the event channel Status events and messages from endpoints pass through here for processing
func (*Sink) FindEndpoint ¶
FindEndpoint returns the endpoint associated with the given server entry, or nil if no endpoint is associated
func (*Sink) ForceFailure ¶
ForceFailure forces an endpoint to fail
func (*Sink) MoveEndpointAfter ¶
MoveEndpointAfter ensures the endpoint specified appears directly after the requested endpoint, or at the beginning if nil
func (*Sink) ProcessEvent ¶
func (s *Sink) ProcessEvent(event transports.Event, observer Observer)
ProcessEvent performs the necessary processing of events
func (*Sink) ProcessTimeouts ¶
func (f *Sink) ProcessTimeouts()
ProcessTimeouts processes all pending timeouts
func (*Sink) QueuePayload ¶
QueuePayload locates the best endpoint to send the events to, and attempts to queue the events on that endpoint. Returns the best endpoint and any error that occurred sending the events.
func (*Sink) RegisterTimeout ¶
func (f *Sink) RegisterTimeout(timeout *Timeout, duration time.Duration, timeoutFunc TimeoutFunc)
RegisterTimeout registers a timeout structure with a timeout and timeout callback
func (*Sink) ReloadConfig ¶
ReloadConfig loads in a new configuration, endpoints will be shutdown if they are no longer in the configuration
func (*Sink) Shutdown ¶
func (f *Sink) Shutdown()
Shutdown signals all associated endpoints to begin shutting down
func (*Sink) ShutdownEndpoint ¶
ShutdownEndpoint requests the endpoint associated with the given server entry to shutdown, returning false if the endpoint could not be shutdown
func (*Sink) TimeoutChan ¶
TimeoutChan returns a channel which will receive the current time when the next endpoint hits its registered timeout
type Timeout ¶
type Timeout struct {
// contains filtered or unexported fields
}
Timeout holds timeout information for use with the sink timeout channel It can be embedded into any other structure that requires timeout support
func (*Timeout) InitTimeout ¶
func (t *Timeout) InitTimeout()
InitTimeout initialises the timeout structure
type TimeoutFunc ¶
type TimeoutFunc func()
TimeoutFunc describes a callback that can be registered with the sink timer channel