gossip

package
v0.3.0-docs.0...-f96dd80
Warning

This package is not in the latest version of its module.

Published: Aug 9, 2019 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package gossip implements functionality to build gossip agents and control their life cycle: start/stop, join/leave the gossip network, send messages, ...

Copyright 2018-2019 Banco Bilbao Vizcaya Argentaria, S.A.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


Constants

const DefaultBindPort int = 7946

DefaultBindPort is the default port used for Agent communication.

const (
	MAXMESSAGEID = 1 << 8
)

Variables

var ChTimedOut error = errors.New("Timeout sending data to channel")
var NoSubscribersFound error = errors.New("No subscribers found")

Functions

This section is empty.

Types

type Agent

type Agent struct {

	// Self stores the peer information corresponding to
	// this agent instance. It is used to make routing
	// decisions.
	Self *Peer

	// A cached KV to be used by processors and tasks
	Cache Cache

	// In channel receives messages from the gossip
	// network to be processed by the agent
	In MessageBus

	// Out channel enqueues the messages to be forwarded to
	// other gossip agents
	Out MessageBus

	// Client to a running QED
	Qed *client.HTTPClient

	//Client to a notification service
	Notifier Notifier

	// Client to a snapshot store service
	SnapshotStore SnapshotStore

	//Client to a task manager service
	Tasks TasksManager
	// contains filtered or unexported fields
}

Agent exposes the necessary API to interact with the gossip network, the snapshot store, the QED log and the alerts store.

The agent API enables QED users to implement their own tools and services and integrate them with QED.

func NewAgent

func NewAgent(options ...AgentOptionF) (*Agent, error)

NewAgent returns a configured and started agent, or an error if it cannot be created. On return, the agent is already connected to the gossip network but does not yet process any information. It does, however, enqueue requests as soon as it is created; once those queues are full, messages are dropped silently.
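
The variadic options follow the functional-options pattern. The following is a minimal sketch of that pattern only: the agent struct and the setNodeName and setRole helpers are hypothetical local stand-ins that mirror the shape of AgentOptionF, not the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// agent is a hypothetical stand-in for gossip.Agent.
type agent struct {
	name string
	role string
}

// option mirrors the shape of AgentOptionF: func(*Agent) error.
type option func(*agent) error

// setNodeName is a stand-in for an option such as SetNodeName.
// Rejecting empty names is an assumption of this sketch.
func setNodeName(name string) option {
	return func(a *agent) error {
		if name == "" {
			return errors.New("empty node name")
		}
		a.name = name
		return nil
	}
}

// setRole is a stand-in for an option such as SetRole.
func setRole(role string) option {
	return func(a *agent) error {
		a.role = role
		return nil
	}
}

// newAgent applies each option in order and stops at the first error.
func newAgent(options ...option) (*agent, error) {
	a := &agent{}
	for _, opt := range options {
		if err := opt(a); err != nil {
			return nil, err
		}
	}
	return a, nil
}

func main() {
	a, err := newAgent(setNodeName("auditor-0"), setRole("auditor"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(a.name, a.role)
}
```

Because every option returns an error, a constructor built this way can refuse to create a half-configured agent.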

func NewAgentFromConfig

func NewAgentFromConfig(conf *Config) (agent *Agent, err error)

Creates a new agent from a configuration object. It does not create external clients like QED, SnapshotStore or Notifier, nor a task manager.

func NewDefaultAgent

func NewDefaultAgent(conf *Config, qed *client.HTTPClient, s SnapshotStore, t TasksManager, n Notifier) (*Agent, error)

Returns a new agent with all the APIs initialized and with a cache whose size in bytes comes from the configuration.

func (*Agent) Broadcasts

func (a *Agent) Broadcasts() *memberlist.TransmitLimitedQueue

Returns the broadcast facility to manage broadcast messages directly.

func (*Agent) DeregisterProcessor

func (a *Agent) DeregisterProcessor(name string)

Deregisters a processor by name. It will not fail if the processor does not exist.

func (*Agent) GetAddrPort

func (a *Agent) GetAddrPort() (net.IP, uint16)

Returns this agent's IP address and port.

func (*Agent) Join

func (a *Agent) Join(addrs []string) (int, error)

Join asks the Agent instance to join the nodes at the given addresses.

func (*Agent) Leave

func (a *Agent) Leave() error

Leave asks the agent to leave the gossip network gracefully, telling the other agents that this agent wants to leave.

func (*Agent) Memberlist

func (a *Agent) Memberlist() *memberlist.Memberlist

Returns the memberlist object to manage the gossip api directly

func (*Agent) RegisterMetrics

func (a *Agent) RegisterMetrics(cs []prometheus.Collector)

Register a slice of collectors in the agent metrics server

func (*Agent) RegisterProcessor

func (a *Agent) RegisterProcessor(name string, p Processor)

Register a new processor into the agent, to add some tasks per batch to be executed by the task manager.

func (*Agent) Send

func (a *Agent) Send(msg *Message)

Sends a batch using the gossip network reliable transport to other nodes based on the routing policy applied

func (*Agent) Shutdown

func (a *Agent) Shutdown() error

Shutdown forcefully shuts down the Agent instance, stopping all network activity and background maintenance associated with the instance.

This is not a graceful shutdown, and should be preceded by a call to Leave. Otherwise, other agents in the cluster will detect this agent's exit as an agent failure.

It is safe to call this method multiple times.

func (*Agent) Start

func (a *Agent) Start()

Enables the processing engines of the agent

func (*Agent) State

func (a *Agent) State() Status

Returns this agent status. This can be used to check if we should stop doing something based on the state of the agent in the gossip network.

type AgentOptionF

type AgentOptionF func(*Agent) error

func SetAdvertiseAddr

func SetAdvertiseAddr(addr string) AgentOptionF

func SetBindAddr

func SetBindAddr(addr string) AgentOptionF

func SetBroadcastTimeout

func SetBroadcastTimeout(timeout time.Duration) AgentOptionF

func SetCache

func SetCache(size int) AgentOptionF

Export the GOGC environment variable so that the GC collects memory adequately if the cache is too big.

func SetEnableCompression

func SetEnableCompression(enabled bool) AgentOptionF

func SetLeaveOnTerm

func SetLeaveOnTerm(leave bool) AgentOptionF

func SetLeavePropagateDelay

func SetLeavePropagateDelay(delay time.Duration) AgentOptionF

func SetMetricsServer

func SetMetricsServer(addr string) AgentOptionF

func SetNodeName

func SetNodeName(name string) AgentOptionF

func SetNotifier

func SetNotifier(n Notifier) AgentOptionF

func SetProcessInterval

func SetProcessInterval(interval time.Duration) AgentOptionF

func SetProcessors

func SetProcessors(p map[string]Processor) AgentOptionF

func SetQEDClient

func SetQEDClient(qed *client.HTTPClient) AgentOptionF

func SetRole

func SetRole(role string) AgentOptionF

func SetSnapshotStore

func SetSnapshotStore(store SnapshotStore) AgentOptionF

func SetStartJoin

func SetStartJoin(addrs []string) AgentOptionF

func SetTasksManager

func SetTasksManager(tm TasksManager) AgentOptionF

func SetTimeoutQueues

func SetTimeoutQueues(timeout time.Duration) AgentOptionF

type BPlusTreeStore

type BPlusTreeStore struct {
	// contains filtered or unexported fields
}

func (*BPlusTreeStore) Count

func (s *BPlusTreeStore) Count() (uint64, error)

func (*BPlusTreeStore) DeleteRange

func (s *BPlusTreeStore) DeleteRange(start, end uint64) error

func (BPlusTreeStore) GetRange

func (s BPlusTreeStore) GetRange(start, end uint64) ([]protocol.Snapshot, error)

func (*BPlusTreeStore) PutSnapshot

func (s *BPlusTreeStore) PutSnapshot(version uint64, snapshot protocol.Snapshot) error

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

Reads the agent's In queue and generates a *protocol.BatchSnapshots queue. It also calls the task factories and enqueues the generated tasks in the agent's task manager.

func NewBatchProcessor

func NewBatchProcessor(a *Agent, tf []TaskFactory) *BatchProcessor

func (*BatchProcessor) Metrics

func (d *BatchProcessor) Metrics() []prometheus.Collector

func (*BatchProcessor) Stop

func (d *BatchProcessor) Stop()

func (*BatchProcessor) Subscribe

func (d *BatchProcessor) Subscribe(id int, ch <-chan *Message)

type Cache

type Cache interface {
	Get(key []byte) (value []byte, err error)
	Set(key []byte, value []byte, expireSeconds int) (err error)
}

Defines the methods required for a cache implementation to be used by the agent and its processors
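
Any type with these two methods can be plugged in as the agent cache. The following is a minimal sketch of a map-backed implementation. The names mapCache and errNotFound are hypothetical, and treating expireSeconds <= 0 as "never expires" is an assumption of this sketch, not documented behaviour of the package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNotFound is a hypothetical error; the package may report misses differently.
var errNotFound = errors.New("entry not found")

type entry struct {
	value   []byte
	expires time.Time // zero value means the entry never expires
}

// mapCache is a toy cache with the same method set as the Cache interface.
type mapCache struct {
	mu sync.Mutex
	m  map[string]entry
}

func newMapCache() *mapCache {
	return &mapCache{m: make(map[string]entry)}
}

// Get returns the stored value, or errNotFound if the key is missing or expired.
func (c *mapCache) Get(key []byte) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.m[string(key)]
	if !ok || (!e.expires.IsZero() && time.Now().After(e.expires)) {
		delete(c.m, string(key))
		return nil, errNotFound
	}
	return e.value, nil
}

// Set stores a copy of value; expireSeconds <= 0 is assumed to mean "no expiry".
func (c *mapCache) Set(key, value []byte, expireSeconds int) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := entry{value: append([]byte(nil), value...)}
	if expireSeconds > 0 {
		e.expires = time.Now().Add(time.Duration(expireSeconds) * time.Second)
	}
	c.m[string(key)] = e
	return nil
}

func main() {
	c := newMapCache()
	_ = c.Set([]byte("k"), []byte("v"), 0)
	v, err := c.Get([]byte("k"))
	fmt.Println(string(v), err)
}
```

Unlike this sketch, a cache sized in bytes (as CacheSize suggests) would also need an eviction policy.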

type Config

type Config struct {
	Log string `desc:"Set log level to info, error or debug"`

	// The name of this node. This must be unique in the cluster. If this
	// is not set, Auditor will set it to the hostname of the running machine.
	NodeName string `desc:"Set gossip name for this agent"`

	Role string `desc:"Set gossip role for this agent routing"`

	// BindAddr is the address that the Auditor agent's communication ports
	// will bind to. Auditor will use this address to bind to for both TCP
	// and UDP connections. If no port is present in the address, the default
	// port will be used.
	BindAddr string `desc:"Address ip:port to expose gossip protocol"`

	// AdvertiseAddr is the address agent will advertise to
	// other members of the cluster. Can be used for basic NAT traversal
	// where both the internal ip:port and external ip:port are known.
	AdvertiseAddr string `desc:"Address ip:port to advertise in gossip if our bind addr is not reachable from other agents"`

	// MetricsAddr is the address where the metrics server will expose its
	// API so that metrics collectors can retrieve them
	MetricsAddr string `desc:"Address ip:port to expose metrics"`

	// LeaveOnTerm controls if the agent does a graceful leave when receiving
	// the TERM signal. Defaults false. This can be changed on reload.
	LeaveOnTerm bool `desc:"Controls if the agent does a graceful leave when receiving the TERM signal"`

	// StartJoin is a list of addresses to attempt to join when the
	// agent starts. If the agent is unable to communicate with any of these
	// addresses, then the agent will error and exit.
	StartJoin []string `desc:"Address list ip1:port1,ip2:port2... to join other agents and form a gossip network"`

	// EnableCompression specifies whether message compression is enabled
	// by `github.com/hashicorp/memberlist` when broadcasting events.
	EnableCompression bool `desc:"Specifies whether message compression is enabled when broadcasting events"`

	// BroadcastTimeout is the amount of time to wait for a broadcast
	// message to be sent to the cluster. Broadcast messages are used for
	// things like leave messages and force remove messages. If this is not
	// set, a timeout of 5 seconds will be set.
	BroadcastTimeout time.Duration `desc:"The amount of time to wait for a broadcast message to be sent to the cluster"`

	// LeavePropagateDelay is for our leave (node dead) message to propagate
	// through the cluster. In particular, we want to stay up long enough to
	// service any probes from other nodes before they learn about us
	// leaving and stop probing. Otherwise, we risk getting node failures as
	// we leave.
	LeavePropagateDelay time.Duration `desc:"Time for our leave (node dead) message to propagate through the cluster"`

	// MemberlistConfig is the memberlist configuration that Agent will
	// use to do the underlying membership management and gossip. Some
	// fields in the MemberlistConfig will be overwritten by Auditor no
	// matter what:
	//
	//   * Name - This will always be set to the same as the NodeName
	//     in this configuration.
	//
	//   * Events - Auditor uses a custom event delegate.
	//
	//   * Delegate - Auditor uses a custom delegate.
	//
	MemberlistConfig *memberlist.Config `flag:"-"`

	// Timeout enqueuing elements on a channel
	TimeoutQueues time.Duration `desc:"Timeout enqueuing elements on a channel"`

	// Interval to send out messages to other agents
	ProcessInterval time.Duration `desc:"Interval to send out messages to other agents"`

	// Maximum number of concurrent senders
	MaxSenders int `desc:"Maximum number of concurrent senders"`

	// Cache size in bytes to store agent temporal objects.
	// This cache will evict old objects by default
	CacheSize int `desc:"Cache size in bytes to store agent temporal objects"`
}

Config is the configuration for creating an Agent instance

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig contains the defaults for configurations.

func (*Config) AddrParts

func (c *Config) AddrParts(address string) (string, int, error)

AddrParts returns the parts of the BindAddr that should be used to configure the Node.
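
Splitting an address with an optional port can be sketched with the standard library as follows. The function addrParts below is a hypothetical stand-alone version, and falling back to 7946 (the value of DefaultBindPort) when the port is missing is an assumption based on the BindAddr comment; the package's own parsing may differ.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// defaultBindPort copies the value of DefaultBindPort for this sketch.
const defaultBindPort = 7946

// addrParts splits "host:port". If the address cannot be split as given,
// it retries after appending the default port, which covers bare hosts.
func addrParts(address string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(address)
	if err != nil {
		withPort := net.JoinHostPort(address, strconv.Itoa(defaultBindPort))
		host, portStr, err = net.SplitHostPort(withPort)
		if err != nil {
			return "", 0, err
		}
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, err
	}
	return host, port, nil
}

func main() {
	host, port, err := addrParts("127.0.0.1")
	fmt.Println(host, port, err)
}
```

The retry treats any unsplittable input as a bare host, so this sketch does not validate that the host itself is well formed.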

type Filter

type Filter func(m *Peer) bool

A filter function reports whether a peer must be selected or not.

type Message

type Message struct {
	Kind    MessageType
	From    *Peer
	TTL     int
	Payload []byte
}

Gossip message code. Up to 255 different messages.

func (*Message) Decode

func (m *Message) Decode(buf []byte) error

func (*Message) Encode

func (m *Message) Encode() ([]byte, error)

type MessageBus

type MessageBus struct {
	// contains filtered or unexported fields
}

Implements a subscriber / publisher model for gossip Messages

func (*MessageBus) Publish

func (eb *MessageBus) Publish(msg *Message) error

Publish a message to all the subscribers of its MessageType.

If there is no subscriber the message will not be sent and will be lost.

All the subscribers will get all the messages. If a subscriber is busy it will block delivery to the next subscribers. Also publish will create a goroutine per message sent, and will not time out.
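
The delivery semantics described above can be sketched as follows. This is a simplified model, not the package's implementation: the bus type, its string payloads and errNoSubscribers are hypothetical stand-ins for MessageBus, *Message and NoSubscribersFound.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoSubscribers stands in for the package's NoSubscribersFound error.
var errNoSubscribers = errors.New("no subscribers found")

// bus keeps one pool of subscriber channels per message kind.
type bus struct {
	mu   sync.RWMutex
	subs map[uint8][]chan string
}

func newBus() *bus {
	return &bus{subs: make(map[uint8][]chan string)}
}

// subscribe registers a buffered channel for the given kind and returns it.
func (b *bus) subscribe(kind uint8, size int) <-chan string {
	ch := make(chan string, size)
	b.mu.Lock()
	b.subs[kind] = append(b.subs[kind], ch)
	b.mu.Unlock()
	return ch
}

// publish delivers msg to every subscriber of kind from a single goroutine.
// A busy subscriber delays delivery to the ones after it, and there is no timeout.
func (b *bus) publish(kind uint8, msg string) error {
	b.mu.RLock()
	chans := append([]chan string(nil), b.subs[kind]...)
	b.mu.RUnlock()
	if len(chans) == 0 {
		return errNoSubscribers // the message is lost
	}
	go func() {
		for _, ch := range chans {
			ch <- msg
		}
	}()
	return nil
}

func main() {
	b := newBus()
	first := b.subscribe(0, 1)
	second := b.subscribe(0, 1)
	if err := b.publish(0, "batch"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(<-first, <-second)
}
```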

func (*MessageBus) Subscribe

func (eb *MessageBus) Subscribe(t MessageType, s Subscriber, size int)

Subscribe adds a subscriber to its corresponding pool. The subscription id needed to unsubscribe is delivered to the subscriber through its Subscribe(id, ch) method.

func (*MessageBus) Unsubscribe

func (eb *MessageBus) Unsubscribe(t MessageType, id int)

Unsubscribe a subscriber by its id

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

Implements a message queue in which subscribers consume the Messages created by producers.

There is a queue for each kind of message, and all the producers and subscribers will operate over the same chan *Message.

This pattern allows a pool of subscribers to consume messages from a pool of producers without blocking.
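
A shared channel serving a pool of producers and a pool of consumers can be sketched as follows. The run function and its int payloads are hypothetical; the sketch only illustrates the pattern, with closing the channel playing the role that Cancel plays for a MessageQueue.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// run starts the given number of producers and consumers over one shared
// channel and returns how many items were consumed in total.
func run(producers, perProducer, consumers int) int {
	ch := make(chan int, 8)
	var prod, cons sync.WaitGroup
	var total int64

	for i := 0; i < producers; i++ {
		prod.Add(1)
		go func() {
			defer prod.Done()
			for j := 0; j < perProducer; j++ {
				ch <- 1
			}
		}()
	}
	for i := 0; i < consumers; i++ {
		cons.Add(1)
		go func() {
			defer cons.Done()
			// Each item is taken by exactly one consumer.
			for v := range ch {
				atomic.AddInt64(&total, int64(v))
			}
		}()
	}

	prod.Wait() // all producers are done
	close(ch)   // signals the consumers to stop once the channel drains
	cons.Wait()
	return int(total)
}

func main() {
	fmt.Println(run(3, 10, 2))
}
```

In contrast with the publish/subscribe bus, each message here is consumed once rather than copied to every subscriber.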

func (*MessageQueue) Cancel

func (mq *MessageQueue) Cancel(t MessageType)

Cancel signals all producers and consumers to stop by closing the internal channel.

func (*MessageQueue) Consumer

func (mq *MessageQueue) Consumer(t MessageType, s Subscriber)

Register a consumer to the MessageType queue

func (*MessageQueue) Producer

func (mq *MessageQueue) Producer(t MessageType, p Producer)

Register a producer to the MessageType queue

type MessageType

type MessageType uint8
const (
	BatchMessageType MessageType = iota // Contains a protocol.BatchSnapshots
)

type Meta

type Meta struct {
	Role string
}

Agent metadata

func (*Meta) Decode

func (a *Meta) Decode(buf []byte) error

func (*Meta) Encode

func (a *Meta) Encode() ([]byte, error)

type Notifier

type Notifier interface {
	Alert(msg string) error
	Start()
	Stop()
}

Notifier sends string messages to external services. Sending notifications is asynchronous, so Start and Stop methods are needed to activate and deactivate the process.
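
A minimal sketch of an asynchronous notifier with this method set follows. The memNotifier type is hypothetical and keeps alerts in memory instead of posting them to a service; having Stop drain the queue before returning is a design choice of this sketch, not documented behaviour.

```go
package main

import "fmt"

// memNotifier has the same method set as the Notifier interface
// but only records alerts in memory.
type memNotifier struct {
	queue chan string
	done  chan struct{}
	sent  []string
}

func newMemNotifier(size int) *memNotifier {
	return &memNotifier{
		queue: make(chan string, size),
		done:  make(chan struct{}),
	}
}

// Alert enqueues msg; it blocks while the queue is full.
func (n *memNotifier) Alert(msg string) error {
	n.queue <- msg
	return nil
}

// Start launches the goroutine that delivers queued alerts.
func (n *memNotifier) Start() {
	go func() {
		defer close(n.done)
		for msg := range n.queue {
			n.sent = append(n.sent, msg)
		}
	}()
}

// Stop closes the queue and waits until pending alerts are delivered.
// Calling Alert after Stop would panic in this sketch.
func (n *memNotifier) Stop() {
	close(n.queue)
	<-n.done
}

func main() {
	n := newMemNotifier(2)
	n.Start()
	_ = n.Alert("snapshot mismatch")
	n.Stop()
	fmt.Println(n.sent)
}
```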

type Peer

type Peer struct {
	Name   string
	Addr   net.IP
	Port   uint16
	Meta   Meta
	Status Status
}

Peer is a single member of the gossip cluster.

func NewPeer

func NewPeer(name, addr string, port uint16, role string) *Peer

Returns a new peer from the parameters configuration

func ParsePeer

func ParsePeer(node *memberlist.Node) *Peer

Builds a new peer from the memberlist.Node data

func (Peer) Node

func (p Peer) Node() *memberlist.Node

Returns a memberlist node built from the peer data.

type PeerList

type PeerList struct {
	L []*Peer
}

Implements a list of peers which is able to filter, merge and take elements from the head

func NewPeerList

func NewPeerList() *PeerList

func (*PeerList) Append

func (l *PeerList) Append(m *PeerList)

Appends a peer list to the current list

func (*PeerList) Delete

func (l *PeerList) Delete(m *Peer)

Deletes a peer from the list by its name

func (*PeerList) Exclude

func (l *PeerList) Exclude(ex *PeerList) *PeerList

Returns a list with all the peers from the exclusion list removed

func (*PeerList) Filter

func (l *PeerList) Filter(f Filter) *PeerList

Returns a filtered peer list, containing only the peers the filter selected.

func (*PeerList) Shuffle

func (l *PeerList) Shuffle() *PeerList

Returns the list randomly shuffled

func (PeerList) Size

func (l PeerList) Size() int

func (*PeerList) Take

func (l *PeerList) Take(n int) *PeerList

Returns a new list with n peers, starting at the head of the list.
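
Filtering and taking from the head compose naturally. The following is a sketch using local stand-in types (peer, peerList and their lower-case methods are hypothetical and only mirror the documented shapes); clamping n to the list length is an assumption of this sketch.

```go
package main

import "fmt"

// peer and peerList are simplified stand-ins for Peer and PeerList.
type peer struct {
	Name string
	Role string
}

type peerList struct {
	L []*peer
}

// filter returns a new list with the peers for which f returns true.
func (l *peerList) filter(f func(*peer) bool) *peerList {
	out := &peerList{}
	for _, p := range l.L {
		if f(p) {
			out.L = append(out.L, p)
		}
	}
	return out
}

// take returns a new list with at most n peers from the head of the list.
func (l *peerList) take(n int) *peerList {
	if n < 0 {
		n = 0
	}
	if n > len(l.L) {
		n = len(l.L)
	}
	return &peerList{L: append([]*peer(nil), l.L[:n]...)}
}

func main() {
	all := &peerList{L: []*peer{
		{Name: "a1", Role: "auditor"},
		{Name: "m1", Role: "monitor"},
		{Name: "a2", Role: "auditor"},
	}}
	auditors := all.filter(func(p *peer) bool { return p.Role == "auditor" })
	for _, p := range auditors.take(1).L {
		fmt.Println(p.Name)
	}
}
```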

func (*PeerList) Update

func (l *PeerList) Update(m *Peer)

Updates a peer data by its name

type PrinterFactory

type PrinterFactory struct {
}

PrinterFactory creates tasks that print BatchSnapshots for testing purposes. It is intended to be used with the BatchSnapshot processor.

func (PrinterFactory) Metrics

func (p PrinterFactory) Metrics() []prometheus.Collector

func (PrinterFactory) New

func (p PrinterFactory) New(ctx context.Context) Task

type Processor

type Processor interface {
	Start()
	Stop()
	Metrics() []prometheus.Collector
}

A processor's mission is to translate between the gossip network []byte type and whatever has semantic sense.

Also it should enqueue tasks in the agent task manager.

type Producer

type Producer interface {
	Produce(ch chan<- *Message)
}

A producer fills the chan *Message with the gossip messages to be consumed by a subscriber.

type RestSnapshotStore

type RestSnapshotStore struct {
	// contains filtered or unexported fields
}

Implements access to a snapshot store exposed as an HTTP REST service. The process of sending the notifications is asynchronous, so a start and stop method is

func NewRestSnapshotStore

func NewRestSnapshotStore(endpoint []string, dialTimeout, readTimeout time.Duration) *RestSnapshotStore

Returns a new RestSnapshotStore client

func NewRestSnapshotStoreFromConfig

func NewRestSnapshotStoreFromConfig(c *RestSnapshotStoreConfig) *RestSnapshotStore

func (*RestSnapshotStore) Count

func (r *RestSnapshotStore) Count() (uint64, error)

func (*RestSnapshotStore) DeleteRange

func (r *RestSnapshotStore) DeleteRange(start uint64, end uint64) error

func (*RestSnapshotStore) GetRange

func (r *RestSnapshotStore) GetRange(start uint64, end uint64) ([]protocol.SignedSnapshot, error)

func (*RestSnapshotStore) GetSnapshot

func (r *RestSnapshotStore) GetSnapshot(version uint64) (*protocol.SignedSnapshot, error)

func (*RestSnapshotStore) PutBatch

Stores a batch in the store.

func (*RestSnapshotStore) PutSnapshot

func (r *RestSnapshotStore) PutSnapshot(version uint64, snapshot *protocol.SignedSnapshot) error

type RestSnapshotStoreConfig

type RestSnapshotStoreConfig struct {
	Endpoint    []string      `desc:"REST snapshot store service endpoint list http://ip1:port1/path1,http://ip2:port2/path2... "`
	DialTimeout time.Duration `desc:"Timeout dialing the REST snapshot store service"`
	ReadTimeout time.Duration `desc:"Timeout reading the REST snapshot store service response"`
}

RestSnapshotStore configuration object used to parse cli options and to build the RestSnapshotStore instance

func DefaultRestSnapshotStoreConfig

func DefaultRestSnapshotStoreConfig() *RestSnapshotStoreConfig

type SimpleNotifier

type SimpleNotifier struct {
	// contains filtered or unexported fields
}

Implements the default notification service client using an HTTP API:

This notifier posts the msg contents to the specified endpoint.

func NewSimpleNotifier

func NewSimpleNotifier(endpoint []string, size int, dialTimeout, readTimeout time.Duration) *SimpleNotifier

Returns a new default notifier client configured to post messages to the endpoint provided. To use the default timeouts of 200ms, set them to 0:

queueTimeout is the time to wait for the queue to accept a new message
dialTimeout is the time to wait for dial to the notifications server
readTimeout is the time to wait for the notifications server response

func NewSimpleNotifierFromConfig

func NewSimpleNotifierFromConfig(c *SimpleNotifierConfig) *SimpleNotifier

Returns a SimpleNotifier pointer configured with configuration c.

func (*SimpleNotifier) Alert

func (n *SimpleNotifier) Alert(msg string) error

Alert enqueues a message into the notifications queue to be sent. It will block if the notifications queue is full.

func (*SimpleNotifier) Start

func (n *SimpleNotifier) Start()

Starts a process which sends notifications to a random URL selected from the configured list of URLs.

func (*SimpleNotifier) Stop

func (n *SimpleNotifier) Stop()

Stops the notifications process.

type SimpleNotifierConfig

type SimpleNotifierConfig struct {
	Endpoint    []string      `desc:"Notification service endpoint list http://ip1:port1/path1,http://ip2:port2/path2... "`
	QueueSize   int           `desc:"Notifications queue size"`
	DialTimeout time.Duration `desc:"Timeout dialing the notification service"`
	ReadTimeout time.Duration `desc:"Timeout reading the notification service response"`
}

SimpleNotifier configuration object used to parse cli options and to build the SimpleNotifier instance

func DefaultSimpleNotifierConfig

func DefaultSimpleNotifierConfig() *SimpleNotifierConfig

Returns the default configuration for the SimpleNotifier

type SimpleTasksManager

type SimpleTasksManager struct {
	// contains filtered or unexported fields
}

Simple implementation of a task manager used by the QED provided agents

func NewSimpleTasksManager

func NewSimpleTasksManager(i time.Duration, max int) *SimpleTasksManager

NewSimpleTasksManager returns a new SimpleTasksManager. The execution loop will try to execute up to max tasks each interval i. The task channel also has capacity max.
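
The "up to a maximum number of tasks per interval" loop can be sketched as follows. The runBatch helper is hypothetical, and running the tasks sequentially is a simplification of this sketch; the package's manager may run them concurrently.

```go
package main

import (
	"fmt"
	"time"
)

// task mirrors the shape of Task: func() error.
type task func() error

// runBatch executes up to max pending tasks without blocking on an empty
// channel and returns how many tasks ran.
func runBatch(ch <-chan task, max int) int {
	ran := 0
	for ran < max {
		select {
		case t := <-ch:
			_ = t() // a real manager would likely log or count errors
			ran++
		default:
			return ran // nothing left to run in this interval
		}
	}
	return ran
}

func main() {
	const maxTasks = 2
	ch := make(chan task, maxTasks) // the channel capacity equals the per-interval limit
	ch <- func() error { fmt.Println("task 1"); return nil }
	ch <- func() error { fmt.Println("task 2"); return nil }

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	<-ticker.C // one iteration of the execution loop
	fmt.Println("ran", runBatch(ch, maxTasks))
}
```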

func NewSimpleTasksManagerFromConfig

func NewSimpleTasksManagerFromConfig(c *SimpleTasksManagerConfig) *SimpleTasksManager

func (*SimpleTasksManager) Add

func (t *SimpleTasksManager) Add(task Task) error

Adds a task to the task manager queue, with the configured timeout. It will block until the task is read if the channel is full.
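
Enqueuing with a timeout is usually written as a select over the send and a timer. The following is a sketch under that assumption: addWithTimeout and errTimedOut are hypothetical names, with errTimedOut standing in for the package's ChTimedOut error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// task mirrors the shape of Task: func() error.
type task func() error

// errTimedOut stands in for the package's ChTimedOut error.
var errTimedOut = errors.New("timeout sending data to channel")

// addWithTimeout tries to enqueue t and gives up after timeout.
func addWithTimeout(ch chan<- task, t task, timeout time.Duration) error {
	select {
	case ch <- t:
		return nil
	case <-time.After(timeout):
		return errTimedOut
	}
}

func main() {
	ch := make(chan task, 1)
	noop := func() error { return nil }
	fmt.Println(addWithTimeout(ch, noop, 50*time.Millisecond)) // the queue has room
	fmt.Println(addWithTimeout(ch, noop, 50*time.Millisecond)) // the queue is full and nobody reads
}
```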

func (*SimpleTasksManager) Len

func (t *SimpleTasksManager) Len() int

Len returns the number of pending tasks enqueued in the tasks channel

func (*SimpleTasksManager) Start

func (t *SimpleTasksManager) Start()

Start activates the task dispatcher to execute enqueued tasks

func (*SimpleTasksManager) Stop

func (t *SimpleTasksManager) Stop()

Stop disables the task dispatcher. It does not wait to empty the task queue, nor does it close the task channel.

type SimpleTasksManagerConfig

type SimpleTasksManagerConfig struct {
	Interval time.Duration `desc:"Interval to execute enqueued tasks"`
	MaxTasks int           `desc:"Maximum number of concurrent tasks"`
}

SimpleTasksManager configuration object used to parse cli options and to build the SimpleTasksManager instance

func DefaultSimpleTasksManagerConfig

func DefaultSimpleTasksManagerConfig() *SimpleTasksManagerConfig

Returns the default configuration for the SimpleTasksManager

type SnapshotStore

type SnapshotStore interface {
	PutBatch(b *protocol.BatchSnapshots) error
	PutSnapshot(version uint64, snapshot *protocol.SignedSnapshot) error
	GetRange(start, end uint64) ([]protocol.SignedSnapshot, error)
	GetSnapshot(version uint64) (*protocol.SignedSnapshot, error)
	DeleteRange(start, end uint64) error
	Count() (uint64, error)
}

type Status

type Status int32

Status is the state of the Agent instance.

const (
	AgentStatusAlive Status = iota
	AgentStatusLeaving
	AgentStatusLeft
	AgentStatusShutdown
	AgentStatusFailed
)

func (Status) String

func (s Status) String() string
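
A String method over iota constants is commonly written as a switch. The following is a sketch with a local status type; the exact strings returned by the package are not documented here, so the ones below are assumptions.

```go
package main

import "fmt"

// status is a local stand-in for Status.
type status int32

const (
	alive status = iota
	leaving
	left
	shutdown
	failed
)

// String returns a human-readable name; the names are assumptions of this sketch.
func (s status) String() string {
	switch s {
	case alive:
		return "alive"
	case leaving:
		return "leaving"
	case left:
		return "left"
	case shutdown:
		return "shutdown"
	case failed:
		return "failed"
	default:
		return fmt.Sprintf("unknown(%d)", int32(s))
	}
}

func main() {
	fmt.Println(alive, shutdown, status(42))
}
```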

type StoreItem

type StoreItem struct {
	Key, Value []byte
}

func (StoreItem) Less

func (p StoreItem) Less(b btree.Item) bool

type Subscriber

type Subscriber interface {
	Subscribe(id int, ch <-chan *Message)
}

A Subscriber to agent gossip message queues looks like:

func (my *BatchProcessor) Subscribe(id int, ch <-chan *Message) {
	// id and inCh are illustrative field names, not the package's own
	my.id = id
	my.inCh = ch
}

It receives its subscription id and a channel from which to read the messages published for the selected MessageType.

type Subscribers

type Subscribers []chan *Message

type Task

type Task func() error

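Tasks are executed by the TasksManager loop, which calls them directly (a Task is a plain func() error). All tasks must already carry their own context when they are enqueued into the task manager.

For example, a function with the same shape as TaskFactory.New:

func NewTask(ctx context.Context) gossip.Task {
	// Assumes the processor stored these values in the context under these keys.
	a := ctx.Value("agent").(*gossip.Agent)
	b := ctx.Value("batch").(*protocol.BatchSnapshots)
	return func() error {
		// Agent.Send takes a *Message and returns nothing, so this
		// sketch only prints the batch along with the agent's name.
		fmt.Println(a.Self.Name, b)
		return nil
	}
}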

type TaskFactory

type TaskFactory interface {
	New(context.Context) Task
	Metrics() []prometheus.Collector
}

A task factory builds tasks with the provided context information.

The context contains the agent API and the context added by the message processor.

The implementor of the task factory must know the details of the data included by the processor in the context.
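
How a factory and a processor cooperate through the context can be sketched as follows. Everything here is a local stand-in: the batchKey key, the string batch and the recordFactory type are hypothetical, and the real processor decides which keys and types it stores.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task mirrors the shape of Task: func() error.
type task func() error

// ctxKey avoids collisions with context keys defined by other packages.
type ctxKey string

// batchKey is a hypothetical key agreed between processor and factory.
const batchKey ctxKey = "batch"

// newCtx plays the role of the processor: it stores the batch in the context.
// An empty batch is left out so that the missing-data path can be shown.
func newCtx(batch string) context.Context {
	ctx := context.Background()
	if batch == "" {
		return ctx
	}
	return context.WithValue(ctx, batchKey, batch)
}

// recordFactory builds tasks that record the batch found in the context.
type recordFactory struct {
	seen []string
}

// New has the same shape as TaskFactory.New.
func (f *recordFactory) New(ctx context.Context) task {
	batch, ok := ctx.Value(batchKey).(string)
	return func() error {
		if !ok {
			return errors.New("no batch in context")
		}
		f.seen = append(f.seen, batch)
		return nil
	}
}

func main() {
	f := &recordFactory{}
	t := f.New(newCtx("batch-1"))
	fmt.Println(t(), f.seen)
}
```

The checked type assertion turns a disagreement between processor and factory into an error from the task instead of a panic.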

type TasksManager

type TasksManager interface {
	Start()
	Stop()
	Add(t Task) error
	Len() int
}

TasksManager executes enqueued tasks. It is in charge of applying limits to task execution, such as timeouts. It only has an API to stop and start the task execution loop.

type Topology

type Topology struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Holds the gossip network information as this node sees it. This information can be used to route messages to other nodes.

func NewTopology

func NewTopology() *Topology

Returns a new empty topology

func (*Topology) Delete

func (t *Topology) Delete(p *Peer) error

Deletes a peer from the topology

func (*Topology) Each

func (t *Topology) Each(n int, l *PeerList) *PeerList

Returns a peer list with n elements of each kind. Each per-kind list is built by excluding all the nodes in the list l, shuffling the result, and taking n elements from the head of the list.
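
The exclude, shuffle and take steps can be sketched as follows. The each function below is a hypothetical stand-alone version that models the topology as a map from kind to peer names, which is an assumption of this sketch rather than the package's internal layout.

```go
package main

import (
	"fmt"
	"math/rand"
)

// each returns up to n peer names of every kind, skipping excluded names.
func each(kinds map[string][]string, n int, exclude map[string]bool) []string {
	if n < 0 {
		n = 0
	}
	var out []string
	for _, peers := range kinds {
		// 1. Exclude the unwanted peers.
		var candidates []string
		for _, p := range peers {
			if !exclude[p] {
				candidates = append(candidates, p)
			}
		}
		// 2. Shuffle so that repeated calls spread the load.
		rand.Shuffle(len(candidates), func(i, j int) {
			candidates[i], candidates[j] = candidates[j], candidates[i]
		})
		// 3. Take n elements from the head.
		if len(candidates) > n {
			candidates = candidates[:n]
		}
		out = append(out, candidates...)
	}
	return out
}

func main() {
	kinds := map[string][]string{
		"auditor": {"a1", "a2", "a3"},
		"monitor": {"m1"},
	}
	fmt.Println(each(kinds, 2, map[string]bool{"a1": true}))
}
```

The order of the result varies between runs, both because of the shuffle and because Go map iteration order is unspecified.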

func (*Topology) Get

func (t *Topology) Get(kind string) *PeerList

Returns a list of peers of a given kind

func (*Topology) Update

func (t *Topology) Update(p *Peer) error

Updates the topology with the peer information
