Documentation ¶
Overview ¶
Package gossip implements functionality to build gossip agents and control their life cycle: start/stop, join/leave the gossip network, send messages, ...
Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- type Agent
- func (a *Agent) Broadcasts() *memberlist.TransmitLimitedQueue
- func (a *Agent) DeregisterProcessor(name string)
- func (a *Agent) GetAddrPort() (net.IP, uint16)
- func (a *Agent) Join(addrs []string) (int, error)
- func (a *Agent) Leave() error
- func (a *Agent) Memberlist() *memberlist.Memberlist
- func (a *Agent) RegisterMetrics(cs []prometheus.Collector)
- func (a *Agent) RegisterProcessor(name string, p Processor)
- func (a *Agent) Send(msg *Message)
- func (a *Agent) Shutdown() error
- func (a *Agent) Start()
- func (a *Agent) State() Status
- type AgentOptionF
- func SetAdvertiseAddr(addr string) AgentOptionF
- func SetBindAddr(addr string) AgentOptionF
- func SetBroadcastTimeout(timeout time.Duration) AgentOptionF
- func SetCache(size int) AgentOptionF
- func SetEnableCompression(enabled bool) AgentOptionF
- func SetLeaveOnTerm(leave bool) AgentOptionF
- func SetLeavePropagateDelay(delay time.Duration) AgentOptionF
- func SetMetricsServer(addr string) AgentOptionF
- func SetNodeName(name string) AgentOptionF
- func SetNotifier(n Notifier) AgentOptionF
- func SetProcessInterval(interval time.Duration) AgentOptionF
- func SetProcessors(p map[string]Processor) AgentOptionF
- func SetQEDClient(qed *client.HTTPClient) AgentOptionF
- func SetRole(role string) AgentOptionF
- func SetSnapshotStore(store SnapshotStore) AgentOptionF
- func SetStartJoin(addrs []string) AgentOptionF
- func SetTasksManager(tm TasksManager) AgentOptionF
- func SetTimeoutQueues(timeout time.Duration) AgentOptionF
- type BPlusTreeStore
- type BatchProcessor
- type Cache
- type Config
- type Filter
- type Message
- type MessageBus
- type MessageQueue
- type MessageType
- type Meta
- type Notifier
- type Peer
- type PeerList
- func (l *PeerList) Append(m *PeerList)
- func (l *PeerList) Delete(m *Peer)
- func (l *PeerList) Exclude(ex *PeerList) *PeerList
- func (l *PeerList) Filter(f Filter) *PeerList
- func (l *PeerList) Shuffle() *PeerList
- func (l PeerList) Size() int
- func (l *PeerList) Take(n int) *PeerList
- func (l *PeerList) Update(m *Peer)
- type PrinterFactory
- type Processor
- type Producer
- type RestSnapshotStore
- func (r *RestSnapshotStore) Count() (uint64, error)
- func (r *RestSnapshotStore) DeleteRange(start uint64, end uint64) error
- func (r *RestSnapshotStore) GetRange(start uint64, end uint64) ([]protocol.SignedSnapshot, error)
- func (r *RestSnapshotStore) GetSnapshot(version uint64) (*protocol.SignedSnapshot, error)
- func (r *RestSnapshotStore) PutBatch(b *protocol.BatchSnapshots) error
- func (r *RestSnapshotStore) PutSnapshot(version uint64, snapshot *protocol.SignedSnapshot) error
- type RestSnapshotStoreConfig
- type SimpleNotifier
- type SimpleNotifierConfig
- type SimpleTasksManager
- type SimpleTasksManagerConfig
- type SnapshotStore
- type Status
- type StoreItem
- type Subscriber
- type Subscribers
- type Task
- type TaskFactory
- type TasksManager
- type Topology
Constants ¶
const DefaultBindPort int = 7946
This is the default port that we use for the Agent communication
const (
MAXMESSAGEID = 1 << 8
)
Variables ¶
var ChTimedOut error = errors.New("Timeout sending data to channel")
var NoSubscribersFound error = errors.New("No subscribers found")
Functions ¶
This section is empty.
Types ¶
type Agent ¶
type Agent struct {
	// Self stores the peer information corresponding to
	// this agent instance. It is used to make routing
	// decisions.
	Self *Peer

	// A cached KV to be used by processors and tasks
	Cache Cache

	// In channel receives messages from the gossip
	// network to be processed by the agent
	In MessageBus

	// Out channel enqueues the messages to be forwarded to
	// other gossip agents
	Out MessageBus

	// Client to a running QED
	Qed *client.HTTPClient

	// Client to a notification service
	Notifier Notifier

	// Client to a snapshot store service
	SnapshotStore SnapshotStore

	// Client to a task manager service
	Tasks TasksManager
	// contains filtered or unexported fields
}
Agent exposes the necessary API to interact with the gossip network, the snapshot store, the QED log and the alerts store.
The agent API enables QED users to implement their own tools and services and integrate them with QED.
func NewAgent ¶
func NewAgent(options ...AgentOptionF) (*Agent, error)
NewAgent returns a configured and started agent, or an error if it cannot be created. On return, the agent is already connected to the gossip network, but it will not process any information. It will, however, enqueue requests as soon as it is created. When those queues are full, messages will start to be dropped silently.
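The variadic options parameter of NewAgent suggests the functional options pattern. The following is a minimal sketch of that pattern using stand-in types only: agentConfig, option, withNodeName, withRole and newAgentConfig are hypothetical names, not part of this package, and the underlying type of AgentOptionF is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// agentConfig is a hypothetical stand-in for the state an agent is built from.
type agentConfig struct {
	nodeName string
	role     string
}

// option has the shape of a functional option: a function that mutates
// the configuration and may reject invalid values.
type option func(*agentConfig) error

// withNodeName is a hypothetical option, comparable in spirit to SetNodeName.
func withNodeName(name string) option {
	return func(c *agentConfig) error {
		if name == "" {
			return errors.New("node name must not be empty")
		}
		c.nodeName = name
		return nil
	}
}

// withRole is a hypothetical option, comparable in spirit to SetRole.
func withRole(role string) option {
	return func(c *agentConfig) error {
		c.role = role
		return nil
	}
}

// newAgentConfig applies the options in order and stops at the first error.
func newAgentConfig(opts ...option) (*agentConfig, error) {
	c := &agentConfig{}
	for _, o := range opts {
		if err := o(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newAgentConfig(withNodeName("auditor-0"), withRole("auditor"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.nodeName, c.role)
}
```

With this pattern, callers only name the settings they care about, and new options can be added without changing the constructor signature.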
func NewAgentFromConfig ¶
Creates a new agent from a configuration object. It does not create external clients like QED, SnapshotStore or Notifier, nor a task manager.
func NewDefaultAgent ¶
func NewDefaultAgent(conf *Config, qed *client.HTTPClient, s SnapshotStore, t TasksManager, n Notifier) (*Agent, error)
Returns a new agent with all the APIs initialized and with a cache whose size in bytes is given by conf.CacheSize.
func (*Agent) Broadcasts ¶
func (a *Agent) Broadcasts() *memberlist.TransmitLimitedQueue
Returns the broadcast facility to manage broadcast messages directly.
func (*Agent) DeregisterProcessor ¶
Deregisters a processor by name. It will not fail if the processor does not exist.
func (*Agent) GetAddrPort ¶
Returns this agent's IP address and port.
func (*Agent) Leave ¶
Leave asks the agent to leave the gossip network gracefully, communicating to the other agents that this agent wants to leave.
func (*Agent) Memberlist ¶
func (a *Agent) Memberlist() *memberlist.Memberlist
Returns the memberlist object to manage the gossip API directly.
func (*Agent) RegisterMetrics ¶
func (a *Agent) RegisterMetrics(cs []prometheus.Collector)
Register a slice of collectors in the agent metrics server
func (*Agent) RegisterProcessor ¶
Register a new processor into the agent, to add some tasks per batch to be executed by the task manager.
func (*Agent) Send ¶
Sends a batch using the gossip network reliable transport to other nodes based on the routing policy applied
func (*Agent) Shutdown ¶
Shutdown forcefully shuts down the Agent instance, stopping all network activity and background maintenance associated with the instance.
This is not a graceful shutdown, and should be preceded by a call to Leave. Otherwise, other agents in the cluster will detect this agent's exit as an agent failure.
It is safe to call this method multiple times.
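The recommended order (Leave first, then Shutdown) can be sketched as follows. The stopper interface only lists the two Agent methods involved, with the signatures shown in the index; gracefulStop and fakeAgent are hypothetical helpers written for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// stopper lists the two Agent methods this sketch relies on.
type stopper interface {
	Leave() error
	Shutdown() error
}

// gracefulStop leaves the gossip network first and always shuts down
// afterwards. It reports the Leave error if there is one, otherwise
// the Shutdown error.
func gracefulStop(a stopper) error {
	leaveErr := a.Leave()
	shutdownErr := a.Shutdown()
	if leaveErr != nil {
		return fmt.Errorf("leave: %w", leaveErr)
	}
	if shutdownErr != nil {
		return fmt.Errorf("shutdown: %w", shutdownErr)
	}
	return nil
}

// fakeAgent is a hypothetical test double that records the call order.
type fakeAgent struct {
	calls    []string
	leaveErr error
}

func (f *fakeAgent) Leave() error {
	f.calls = append(f.calls, "leave")
	return f.leaveErr
}

func (f *fakeAgent) Shutdown() error {
	f.calls = append(f.calls, "shutdown")
	return nil
}

func main() {
	a := &fakeAgent{}
	err := gracefulStop(a)
	fmt.Println(err, a.calls)

	b := &fakeAgent{leaveErr: errors.New("network unreachable")}
	err = gracefulStop(b)
	fmt.Println(err, b.calls)
}
```

Shutting down even when Leave fails is a design choice of this sketch: it favors releasing local resources over retrying the graceful leave.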
type AgentOptionF ¶
func SetAdvertiseAddr ¶
func SetAdvertiseAddr(addr string) AgentOptionF
func SetBindAddr ¶
func SetBindAddr(addr string) AgentOptionF
func SetBroadcastTimeout ¶
func SetBroadcastTimeout(timeout time.Duration) AgentOptionF
func SetCache ¶
func SetCache(size int) AgentOptionF
Export the GOGC variable so that the GC collects memory adequately if the cache is too big.
func SetEnableCompression ¶
func SetEnableCompression(enabled bool) AgentOptionF
func SetLeaveOnTerm ¶
func SetLeaveOnTerm(leave bool) AgentOptionF
func SetLeavePropagateDelay ¶
func SetLeavePropagateDelay(delay time.Duration) AgentOptionF
func SetMetricsServer ¶
func SetMetricsServer(addr string) AgentOptionF
func SetNodeName ¶
func SetNodeName(name string) AgentOptionF
func SetNotifier ¶
func SetNotifier(n Notifier) AgentOptionF
func SetProcessInterval ¶
func SetProcessInterval(interval time.Duration) AgentOptionF
func SetProcessors ¶
func SetProcessors(p map[string]Processor) AgentOptionF
func SetQEDClient ¶
func SetQEDClient(qed *client.HTTPClient) AgentOptionF
func SetRole ¶
func SetRole(role string) AgentOptionF
func SetSnapshotStore ¶
func SetSnapshotStore(store SnapshotStore) AgentOptionF
func SetStartJoin ¶
func SetStartJoin(addrs []string) AgentOptionF
func SetTasksManager ¶
func SetTasksManager(tm TasksManager) AgentOptionF
func SetTimeoutQueues ¶
func SetTimeoutQueues(timeout time.Duration) AgentOptionF
type BPlusTreeStore ¶
type BPlusTreeStore struct {
// contains filtered or unexported fields
}
func (*BPlusTreeStore) Count ¶
func (s *BPlusTreeStore) Count() (uint64, error)
func (*BPlusTreeStore) DeleteRange ¶
func (s *BPlusTreeStore) DeleteRange(start, end uint64) error
func (BPlusTreeStore) GetRange ¶
func (s BPlusTreeStore) GetRange(start, end uint64) ([]protocol.Snapshot, error)
func (*BPlusTreeStore) PutSnapshot ¶
func (s *BPlusTreeStore) PutSnapshot(version uint64, snapshot protocol.Snapshot) error
type BatchProcessor ¶
type BatchProcessor struct {
// contains filtered or unexported fields
}
Reads the agent's In queue and generates a *protocol.BatchSnapshots queue. It also calls the task factories and enqueues the generated tasks in the agent's task manager.
func NewBatchProcessor ¶
func NewBatchProcessor(a *Agent, tf []TaskFactory) *BatchProcessor
func (*BatchProcessor) Metrics ¶
func (d *BatchProcessor) Metrics() []prometheus.Collector
func (*BatchProcessor) Stop ¶
func (d *BatchProcessor) Stop()
func (*BatchProcessor) Subscribe ¶
func (d *BatchProcessor) Subscribe(id int, ch <-chan *Message)
type Cache ¶
type Cache interface {
	Get(key []byte) (value []byte, err error)
	Set(key []byte, value []byte, expireSeconds int) (err error)
}
Defines the methods required for a cache implementation to be used by the agent and its processors
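As an illustration of what satisfying this interface involves, here is a minimal in-memory sketch. The mapCache type, the errNotFound error and the convention that expireSeconds <= 0 means "never expires" are all assumptions made for this example; the cache actually used by the agent may behave differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Cache repeats the interface documented above.
type Cache interface {
	Get(key []byte) (value []byte, err error)
	Set(key []byte, value []byte, expireSeconds int) (err error)
}

// errNotFound is a hypothetical error; the package's own cache may
// report misses differently.
var errNotFound = errors.New("entry not found")

type entry struct {
	value    []byte
	deadline time.Time // the zero value means the entry never expires
}

// mapCache is a hypothetical, unbounded in-memory Cache.
type mapCache struct {
	mu      sync.Mutex
	entries map[string]entry
}

func newMapCache() *mapCache {
	return &mapCache{entries: make(map[string]entry)}
}

// Get returns the stored value, or errNotFound if the key is missing
// or its entry has expired.
func (c *mapCache) Get(key []byte) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.entries[string(key)]
	if !ok {
		return nil, errNotFound
	}
	if !e.deadline.IsZero() && time.Now().After(e.deadline) {
		delete(c.entries, string(key))
		return nil, errNotFound
	}
	return e.value, nil
}

// Set stores a copy of value. Treating expireSeconds <= 0 as
// "no expiration" is an assumption of this sketch.
func (c *mapCache) Set(key, value []byte, expireSeconds int) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := entry{value: append([]byte(nil), value...)}
	if expireSeconds > 0 {
		e.deadline = time.Now().Add(time.Duration(expireSeconds) * time.Second)
	}
	c.entries[string(key)] = e
	return nil
}

func main() {
	var c Cache = newMapCache()
	_ = c.Set([]byte("version"), []byte("42"), 60)
	v, err := c.Get([]byte("version"))
	fmt.Println(string(v), err)
}
```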
type Config ¶
type Config struct {
	Log string `desc:"Set log level to info, error or debug"`

	// The name of this node. This must be unique in the cluster. If this
	// is not set, Auditor will set it to the hostname of the running machine.
	NodeName string `desc:"Set gossip name for this agent"`

	Role string `desc:"Set gossip role for this agent routing"`

	// BindAddr is the address that the Auditor agent's communication ports
	// will bind to. Auditor will use this address to bind to for both TCP
	// and UDP connections. If no port is present in the address, the default
	// port will be used.
	BindAddr string `desc:"Address ip:port to expose gossip protocol"`

	// AdvertiseAddr is the address agent will advertise to
	// other members of the cluster. Can be used for basic NAT traversal
	// where both the internal ip:port and external ip:port are known.
	AdvertiseAddr string `desc:"Address ip:port to advertise in gossip if our bind addr is not reachable from other agents"`

	// MetricsAddr is the address where the metrics server will expose its
	// API to enable metrics collectors to retrieve them
	MetricsAddr string `desc:"Address ip:port to expose metrics"`

	// LeaveOnTerm controls if the agent does a graceful leave when receiving
	// the TERM signal. Defaults false. This can be changed on reload.
	LeaveOnTerm bool `desc:"Controls if the agent does a graceful leave when receiving the TERM signal"`

	// StartJoin is a list of addresses to attempt to join when the
	// agent starts. If the agent is unable to communicate with any of these
	// addresses, then the agent will error and exit.
	StartJoin []string `desc:"Address list ip1:port1,ip2:port2... to join other agents and form a gossip network"`

	// EnableCompression specifies whether message compression is enabled
	// by `github.com/hashicorp/memberlist` when broadcasting events.
	EnableCompression bool `desc:"Specifies whether message compression is enabled when broadcasting events"`

	// BroadcastTimeout is the amount of time to wait for a broadcast
	// message to be sent to the cluster. Broadcast messages are used for
	// things like leave messages and force remove messages. If this is not
	// set, a timeout of 5 seconds will be set.
	BroadcastTimeout time.Duration `desc:"The amount of time to wait for a broadcast message to be sent to the cluster"`

	// LeavePropagateDelay is for our leave (node dead) message to propagate
	// through the cluster. In particular, we want to stay up long enough to
	// service any probes from other nodes before they learn about us
	// leaving and stop probing. Otherwise, we risk getting node failures as
	// we leave.
	LeavePropagateDelay time.Duration `desc:"Time for our leave (node dead) message to propagate through the cluster"`

	// MemberlistConfig is the memberlist configuration that Agent will
	// use to do the underlying membership management and gossip. Some
	// fields in the MemberlistConfig will be overwritten by Auditor no
	// matter what:
	//
	//   * Name - This will always be set to the same as the NodeName
	//     in this configuration.
	//
	//   * Events - Auditor uses a custom event delegate.
	//
	//   * Delegate - Auditor uses a custom delegate.
	//
	MemberlistConfig *memberlist.Config `flag:"-"`

	// Timeout enqueuing elements on a channel
	TimeoutQueues time.Duration `desc:"Timeout enqueuing elements on a channel"`

	// Interval to send out messages to other agents
	ProcessInterval time.Duration `desc:"Interval to send out messages to other agents"`

	// Maximum number of concurrent senders
	MaxSenders int `desc:"Maximum number of concurrent senders"`

	// Cache size in bytes to store agent temporal objects.
	// This cache will evict old objects by default
	CacheSize int `desc:"Cache size in bytes to store agent temporal objects"`
}
Config is the configuration for creating an Agent instance
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig contains the defaults for configurations.
type Message ¶
type Message struct {
	Kind    MessageType
	From    *Peer
	TTL     int
	Payload []byte
}
Gossip message code. Up to 255 different messages.
type MessageBus ¶
type MessageBus struct {
// contains filtered or unexported fields
}
Implements a subscriber / publisher model for gossip Messages
func (*MessageBus) Publish ¶
func (eb *MessageBus) Publish(msg *Message) error
Publish a message to all the subscribers of its MessageType.
If there is no subscriber the message will not be sent and will be lost.
All the subscribers will get all the messages. If a subscriber is busy it will block delivery to the next subscribers. Also publish will create a goroutine per message sent, and will not time out.
func (*MessageBus) Subscribe ¶
func (eb *MessageBus) Subscribe(t MessageType, s Subscriber, size int)
Subscribe adds a subscriber to its corresponding pool. Returns the subscription id needed to unsubscribe.
func (*MessageBus) Unsubscribe ¶
func (eb *MessageBus) Unsubscribe(t MessageType, id int)
Unsubscribe a subscriber by its id
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
Implements a message queue in which subscribers consume the Messages sent by producers.
There is a queue for each kind of message, and all the producers and subscribers will operate over the same chan *Message.
This pattern allows a pool of subscribers to consume messages from a pool of producers without blocking.
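The shared-channel pattern described above can be sketched with plain goroutines. This is not the MessageQueue implementation: message is a hypothetical stand-in for the package's Message type, and consume is a helper invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for the package's Message type.
type message struct {
	payload []byte
}

// consume starts `workers` consumers that all read from the same channel
// and returns how many messages each one handled once the channel is closed.
func consume(ch <-chan *message, workers int) []int {
	counts := make([]int, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(i)
	}
	wg.Wait()
	return counts
}

func main() {
	ch := make(chan *message, 8)

	// A single producer; several producers could write to ch in the same way.
	go func() {
		for i := 0; i < 100; i++ {
			ch <- &message{payload: []byte{byte(i)}}
		}
		close(ch) // closing the channel tells every consumer to stop
	}()

	total := 0
	for _, n := range consume(ch, 4) {
		total += n
	}
	fmt.Println("messages consumed:", total)
}
```

Because every consumer reads from the same channel, each message is delivered to exactly one of them, and a slow consumer does not hold back the others.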
func (*MessageQueue) Cancel ¶
func (mq *MessageQueue) Cancel(t MessageType)
Cancel signals all producers and consumers to stop by closing the internal channel.
func (*MessageQueue) Consumer ¶
func (mq *MessageQueue) Consumer(t MessageType, s Subscriber)
Register a consumer to the MessageType queue
func (*MessageQueue) Producer ¶
func (mq *MessageQueue) Producer(t MessageType, p Producer)
Register a producer to the MessageType queue
type MessageType ¶
type MessageType uint8
const (
BatchMessageType MessageType = iota // Contains a protocol.BatchSnapshots
)
type Notifier ¶
Notifies string messages to external services. The process of sending the notifications is asynchronous, so start and stop methods are needed to activate/deactivate the process.
type Peer ¶
Peer is a single member of the gossip cluster.
func ParsePeer ¶
func ParsePeer(node *memberlist.Node) *Peer
Builds a new peer from the memberlist.Node data
func (Peer) Node ¶
func (p Peer) Node() *memberlist.Node
Returns a memberlist node built from the peer data.
type PeerList ¶
type PeerList struct {
L []*Peer
}
Implements a list of peers which is able to filter, merge and take elements from the head
func NewPeerList ¶
func NewPeerList() *PeerList
type PrinterFactory ¶
type PrinterFactory struct { }
PrinterFactory creates tasks that print BatchSnapshots for testing purposes. It is intended to be used with the BatchSnapshot processor.
func (PrinterFactory) Metrics ¶
func (p PrinterFactory) Metrics() []prometheus.Collector
type Processor ¶
type Processor interface {
	Start()
	Stop()
	Metrics() []prometheus.Collector
}
A processor's mission is to translate between the gossip network's []byte type and whatever makes semantic sense.
Also it should enqueue tasks in the agent task manager.
type Producer ¶
type Producer interface {
Produce(ch chan<- *Message)
}
A producer fills the chan *Message with the gossip messages to be consumed by a subscriber.
type RestSnapshotStore ¶
type RestSnapshotStore struct {
// contains filtered or unexported fields
}
Implements access to a snapshot store in an HTTP REST service. The process of sending the notifications is asynchronous, so a start and stop method is
func NewRestSnapshotStore ¶
func NewRestSnapshotStore(endpoint []string, dialTimeout, readTimeout time.Duration) *RestSnapshotStore
Returns a new RestSnapshotStore client
func NewRestSnapshotStoreFromConfig ¶
func NewRestSnapshotStoreFromConfig(c *RestSnapshotStoreConfig) *RestSnapshotStore
func (*RestSnapshotStore) Count ¶
func (r *RestSnapshotStore) Count() (uint64, error)
func (*RestSnapshotStore) DeleteRange ¶
func (r *RestSnapshotStore) DeleteRange(start uint64, end uint64) error
func (*RestSnapshotStore) GetRange ¶
func (r *RestSnapshotStore) GetRange(start uint64, end uint64) ([]protocol.SignedSnapshot, error)
func (*RestSnapshotStore) GetSnapshot ¶
func (r *RestSnapshotStore) GetSnapshot(version uint64) (*protocol.SignedSnapshot, error)
func (*RestSnapshotStore) PutBatch ¶
func (r *RestSnapshotStore) PutBatch(b *protocol.BatchSnapshots) error
Stores a batch in the store.
func (*RestSnapshotStore) PutSnapshot ¶
func (r *RestSnapshotStore) PutSnapshot(version uint64, snapshot *protocol.SignedSnapshot) error
type RestSnapshotStoreConfig ¶
type RestSnapshotStoreConfig struct {
	Endpoint    []string      `desc:"REST snapshot store service endpoint list http://ip1:port1/path1,http://ip2:port2/path2... "`
	DialTimeout time.Duration `desc:"Timeout dialing the REST snapshot store service"`
	ReadTimeout time.Duration `desc:"Timeout reading the REST snapshot store service response"`
}
RestSnapshotStore configuration object used to parse CLI options and to build the RestSnapshotStore instance.
func DefaultRestSnapshotStoreConfig ¶
func DefaultRestSnapshotStoreConfig() *RestSnapshotStoreConfig
type SimpleNotifier ¶
type SimpleNotifier struct {
// contains filtered or unexported fields
}
Implements the default notification service client using an HTTP API:
This notifier posts the msg contents to the specified endpoint.
func NewSimpleNotifier ¶
func NewSimpleNotifier(endpoint []string, size int, dialTimeout, readTimeout time.Duration) *SimpleNotifier
Returns a new default notifier client configured to post messages to the endpoint provided. To use the default timeouts of 200ms set them to 0:
queueTimeout is the time to wait for the queue to accept a new message
dialTimeout is the time to wait for dial to the notifications server
readTimeout is the time to wait for the notifications server response
func NewSimpleNotifierFromConfig ¶
func NewSimpleNotifierFromConfig(c *SimpleNotifierConfig) *SimpleNotifier
Returns a SimpleNotifier pointer configured with configuration c.
func (*SimpleNotifier) Alert ¶
func (n *SimpleNotifier) Alert(msg string) error
Alert enqueues a message into the notifications queue to be sent. It will block if the notifications queue is full.
func (*SimpleNotifier) Start ¶
func (n *SimpleNotifier) Start()
Starts a process which sends notifications to a random URL selected from the configured list of URLs.
func (*SimpleNotifier) Stop ¶
func (n *SimpleNotifier) Stop()
Makes the notifications process end.
type SimpleNotifierConfig ¶
type SimpleNotifierConfig struct {
	Endpoint    []string      `desc:"Notification service endpoint list http://ip1:port1/path1,http://ip2:port2/path2... "`
	QueueSize   int           `desc:"Notifications queue size"`
	DialTimeout time.Duration `desc:"Timeout dialing the notification service"`
	ReadTimeout time.Duration `desc:"Timeout reading the notification service response"`
}
SimpleNotifier configuration object used to parse cli options and to build the SimpleNotifier instance
func DefaultSimpleNotifierConfig ¶
func DefaultSimpleNotifierConfig() *SimpleNotifierConfig
Returns the default configuration for the SimpleNotifier
type SimpleTasksManager ¶
type SimpleTasksManager struct {
// contains filtered or unexported fields
}
Simple implementation of a task manager used by the QED provided agents
func NewSimpleTasksManager ¶
func NewSimpleTasksManager(i time.Duration, max int) *SimpleTasksManager
NewSimpleTasksManager returns a new TasksManager and its task channel. The execution loop will try to execute up to maxTasks tasks each interval. The channel also has a capacity of maxTasks.
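One way to picture an interval-driven loop that runs at most maxTasks tasks per tick is sketched below. It is not the SimpleTasksManager implementation: runBatch is a hypothetical helper, and executing tasks sequentially is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Task repeats the type documented on this page.
type Task func() error

// runBatch executes up to max tasks that are already queued and returns
// how many ran and how many failed. It never blocks on an empty channel.
func runBatch(tasks <-chan Task, max int) (ran, failed int) {
	for ran < max {
		select {
		case t := <-tasks:
			if err := t(); err != nil {
				failed++
			}
			ran++
		default:
			// Nothing left in the queue for this interval.
			return ran, failed
		}
	}
	return ran, failed
}

func main() {
	const maxTasks = 2
	tasks := make(chan Task, maxTasks) // capacity equals maxTasks

	tasks <- func() error { return nil }
	tasks <- func() error { return errors.New("boom") }

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	<-ticker.C // a single iteration of the interval-driven loop
	ran, failed := runBatch(tasks, maxTasks)
	fmt.Println("ran:", ran, "failed:", failed)
}
```

Bounding both the channel capacity and the per-interval batch keeps memory use and the amount of work per tick predictable.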
func NewSimpleTasksManagerFromConfig ¶
func NewSimpleTasksManagerFromConfig(c *SimpleTasksManagerConfig) *SimpleTasksManager
func (*SimpleTasksManager) Add ¶
func (t *SimpleTasksManager) Add(task Task) error
Adds a task to the task manager queue, using the configured timeout. If the channel is full, it will block until a task is read.
func (*SimpleTasksManager) Len ¶
func (t *SimpleTasksManager) Len() int
Len returns the number of pending tasks enqueued in the tasks channel
func (*SimpleTasksManager) Start ¶
func (t *SimpleTasksManager) Start()
Start activates the task dispatcher to execute enqueued tasks
func (*SimpleTasksManager) Stop ¶
func (t *SimpleTasksManager) Stop()
Stop disables the task dispatcher. It does not wait for the task queue to empty, nor does it close the task channel.
type SimpleTasksManagerConfig ¶
type SimpleTasksManagerConfig struct {
	Interval time.Duration `desc:"Interval to execute enqueued tasks"`
	MaxTasks int           `desc:"Maximum number of concurrent tasks"`
}
SimpleTasksManager configuration object used to parse CLI options and to build the SimpleTasksManager instance.
func DefaultSimpleTasksManagerConfig ¶
func DefaultSimpleTasksManagerConfig() *SimpleTasksManagerConfig
Returns the default configuration for the SimpleTasksManager
type SnapshotStore ¶
type SnapshotStore interface {
	PutBatch(b *protocol.BatchSnapshots) error
	PutSnapshot(version uint64, snapshot *protocol.SignedSnapshot) error
	GetRange(start, end uint64) ([]protocol.SignedSnapshot, error)
	GetSnapshot(version uint64) (*protocol.SignedSnapshot, error)
	DeleteRange(start, end uint64) error
	Count() (uint64, error)
}
type Subscriber ¶
A Subscriber to agent gossip message queues is like:
func (my *BatchProcessor) Subscribe(c chan *Message) MessageType {
	my.inCh = c
	return BatchMessageType
}
and receives a channel to read published messages from the selected MessageType
type Subscribers ¶
type Subscribers []chan *Message
type Task ¶
type Task func() error
Tasks are executed by the TasksManager loop, which calls their Do method. All tasks must already have their own context when they are enqueued into the task manager.
For example:
// complies with the Processor interface
func TaskFactory(ctx context.Context) Task {
	a := ctx.Value("agent").(gossip.Agent)
	b := ctx.Value("batch").(*protocol.BatchSnapshots)
	return func() error {
		fmt.Println(a.Send(b))
		return nil
	}
}
type TaskFactory ¶
type TaskFactory interface {
	New(context.Context) Task
	Metrics() []prometheus.Collector
}
A task factory builds tasks with the provided context information.
The context contains the agent API and the context added by the message processor.
The implementer of the task factory must know the details of the data included by the processor in the context.
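The coupling between a processor and a task factory through the context can be sketched as follows. Everything here is illustrative: printFactory, ctxKey, batchKey and newBatchContext are hypothetical names, the batch is modelled as a []string instead of a *protocol.BatchSnapshots, and the Metrics method of TaskFactory is omitted. The keys the package's processors actually use are not documented on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task repeats the type documented on this page.
type Task func() error

// ctxKey is an unexported key type, which avoids collisions between
// packages that store values in the same context.
type ctxKey string

// batchKey is a hypothetical key chosen for this sketch.
const batchKey ctxKey = "batch"

// newBatchContext plays the role of the processor: it stores the batch
// in the context that is later handed to the factory.
func newBatchContext(batch []string) context.Context {
	return context.WithValue(context.Background(), batchKey, batch)
}

// printFactory is a hypothetical factory whose New method has the same
// shape as TaskFactory.New.
type printFactory struct{}

func (printFactory) New(ctx context.Context) Task {
	// The comma-ok assertion avoids a panic when the value is missing
	// or has an unexpected type.
	batch, ok := ctx.Value(batchKey).([]string)
	return func() error {
		if !ok {
			return errors.New("no batch found in context")
		}
		fmt.Println("processing", len(batch), "snapshots")
		return nil
	}
}

func main() {
	task := printFactory{}.New(newBatchContext([]string{"s1", "s2"}))
	if err := task(); err != nil {
		fmt.Println("task failed:", err)
	}
}
```

The factory and the processor must agree on both the key and the concrete type of the stored value, which is the knowledge the implementer is expected to have.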
type TasksManager ¶
TasksManager executes enqueued tasks. It is in charge of applying limits to task execution, such as timeouts. It only has an API to stop and start the task execution loop.
type Topology ¶
Holds the gossip network information as this node sees it. This information can be used to route messages to other nodes.
func (*Topology) Each ¶
Returns a peer list of each kind, with n elements of each kind. Each list is built by excluding all the nodes in the list l, shuffling the result, and taking n elements from the head of the list.
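The exclude, shuffle and take steps can be sketched on a plain slice of peer names. This is only an illustration of the selection logic for a single kind of peer; pick is a hypothetical helper, and the real method operates on PeerList values.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pick returns up to n names from peers that do not appear in excluded.
// It shuffles the remaining candidates and takes elements from the head.
func pick(peers, excluded []string, n int) []string {
	// Exclude: build a set of names to skip.
	skip := make(map[string]bool, len(excluded))
	for _, e := range excluded {
		skip[e] = true
	}
	candidates := make([]string, 0, len(peers))
	for _, p := range peers {
		if !skip[p] {
			candidates = append(candidates, p)
		}
	}

	// Shuffle: randomize the order so the choice is spread across peers.
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})

	// Take: clamp n to the number of candidates and return the head.
	if n < 0 {
		n = 0
	}
	if n > len(candidates) {
		n = len(candidates)
	}
	return candidates[:n]
}

func main() {
	peers := []string{"auditor-0", "auditor-1", "monitor-0", "publisher-0"}
	fmt.Println(pick(peers, []string{"auditor-0"}, 2))
}
```

The printed names vary between runs because of the shuffle; only the length of the result and the absence of excluded names are fixed.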