Documentation ¶
Index ¶
- Variables
- type Client
- func (c *Client) CAS(ctx context.Context, key string, ...) error
- func (c *Client) Delete(_ context.Context, _ string) error
- func (c *Client) Get(ctx context.Context, key string) (interface{}, error)
- func (c *Client) List(ctx context.Context, prefix string) ([]string, error)
- func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool)
- func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
- type DNSProvider
- type HTTPStatusHandler
- type KV
- func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, ...) error
- func (m *KV) Collect(ch chan<- prometheus.Metric)
- func (m *KV) Describe(ch chan<- *prometheus.Desc)
- func (m *KV) Get(key string, codec codec.Codec) (interface{}, error)
- func (m *KV) GetBroadcasts(overhead, limit int) [][]byte
- func (m *KV) GetCodec(codecID string) codec.Codec
- func (m *KV) GetListeningPort() int
- func (m *KV) JoinMembers(members []string) (int, error)
- func (m *KV) List(prefix string) []string
- func (m *KV) LocalState(_ bool) []byte
- func (m *KV) MergeRemoteState(data []byte, _ bool)
- func (m *KV) NodeMeta(_ int) []byte
- func (m *KV) NotifyMsg(msg []byte)
- func (m *KV) WatchKey(ctx context.Context, key string, codec codec.Codec, f func(interface{}) bool)
- func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, ...)
- type KVConfig
- type KVInitService
- type KeyValuePair
- func (*KeyValuePair) Descriptor() ([]byte, []int)
- func (this *KeyValuePair) Equal(that interface{}) bool
- func (m *KeyValuePair) GetCodec() string
- func (m *KeyValuePair) GetKey() string
- func (m *KeyValuePair) GetValue() []byte
- func (this *KeyValuePair) GoString() string
- func (m *KeyValuePair) Marshal() (dAtA []byte, err error)
- func (m *KeyValuePair) MarshalTo(dAtA []byte) (int, error)
- func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*KeyValuePair) ProtoMessage()
- func (m *KeyValuePair) Reset()
- func (m *KeyValuePair) Size() (n int)
- func (this *KeyValuePair) String() string
- func (m *KeyValuePair) Unmarshal(dAtA []byte) error
- func (m *KeyValuePair) XXX_DiscardUnknown()
- func (m *KeyValuePair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *KeyValuePair) XXX_Merge(src proto.Message)
- func (m *KeyValuePair) XXX_Size() int
- func (m *KeyValuePair) XXX_Unmarshal(b []byte) error
- type KeyValueStore
- func (*KeyValueStore) Descriptor() ([]byte, []int)
- func (this *KeyValueStore) Equal(that interface{}) bool
- func (m *KeyValueStore) GetPairs() []*KeyValuePair
- func (this *KeyValueStore) GoString() string
- func (m *KeyValueStore) Marshal() (dAtA []byte, err error)
- func (m *KeyValueStore) MarshalTo(dAtA []byte) (int, error)
- func (m *KeyValueStore) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*KeyValueStore) ProtoMessage()
- func (m *KeyValueStore) Reset()
- func (m *KeyValueStore) Size() (n int)
- func (this *KeyValueStore) String() string
- func (m *KeyValueStore) Unmarshal(dAtA []byte) error
- func (m *KeyValueStore) XXX_DiscardUnknown()
- func (m *KeyValueStore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *KeyValueStore) XXX_Merge(src proto.Message)
- func (m *KeyValueStore) XXX_Size() int
- func (m *KeyValueStore) XXX_Unmarshal(b []byte) error
- type Mergeable
- type Message
- type StatusPageData
- type TCPTransport
- func (t *TCPTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error)
- func (t *TCPTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)
- func (t *TCPTransport) GetAutoBindPort() int
- func (t *TCPTransport) PacketCh() <-chan *memberlist.Packet
- func (t *TCPTransport) Shutdown() error
- func (t *TCPTransport) StreamCh() <-chan net.Conn
- func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error)
- type TCPTransportConfig
- type ValueDesc
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthKv = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowKv = fmt.Errorf("proto: integer overflow") )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements kv.Client interface, by using memberlist.KV
func NewClient ¶
NewClient creates new client instance. Supplied codec must already be registered in KV.
func (*Client) CAS ¶
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error
CAS is part of kv.Client interface
type DNSProvider ¶
type DNSProvider interface { // Resolve stores a list of provided addresses or their DNS records if requested. // Implementations may have specific ways of interpreting addresses. Resolve(ctx context.Context, addrs []string) error // Addresses returns the latest addresses present in the DNSProvider. Addresses() []string }
DNSProvider supports storing or resolving a list of addresses.
type HTTPStatusHandler ¶
type HTTPStatusHandler struct {
// contains filtered or unexported fields
}
HTTPStatusHandler is a http.Handler with status information about memberlist.
func NewHTTPStatusHandler ¶
func NewHTTPStatusHandler(kvs *KVInitService, tpl *template.Template) HTTPStatusHandler
NewHTTPStatusHandler creates a new HTTPStatusHandler that will render the provided template using the data from StatusPageData.
func (HTTPStatusHandler) ServeHTTP ¶
func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
type KV ¶
type KV struct { services.NamedService // contains filtered or unexported fields }
KV implements a Key-Value store on top of the memberlist library. The KV store has an API similar to kv.Client, except that methods also need an explicit codec for each operation. KV is a Service. It needs to be started first, and is only usable once it enters the Running state. If joining of the cluster is configured, it is done in the Running state, and if the join fails and the Abort flag is set, the service fails.
func NewKV ¶
func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV
NewKV creates a new gossip-based KV service. Note that the service needs to be started; until then, it doesn't initialize the gossiping part. Only after the service is in the Running state is it really gossiping. Starting the service will also trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, an error is returned and the service enters the Failed state.
func (*KV) CAS ¶
func (m *KV) CAS(ctx context.Context, key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) error
CAS implements Compare-And-Set/Swap operation.
CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately.
This method combines Compare-And-Swap with Merge: it calls 'f' function to get a new state, and then merges this new state into current state, to find out what the change was. Resulting updated current state is then CAS-ed to KV store, and change is broadcast to cluster peers. Merge function is called with CAS flag on, so that it can detect removals. If Merge doesn't result in any change (returns nil), then operation fails and is retried again. After too many failed retries, this method returns error.
func (*KV) Collect ¶
func (m *KV) Collect(ch chan<- prometheus.Metric)
Collect returns extra metrics via supplied channel
func (*KV) Describe ¶
func (m *KV) Describe(ch chan<- *prometheus.Desc)
Describe returns prometheus descriptions via supplied channel
func (*KV) Get ¶
Get returns current value associated with given key. No communication with other nodes in the cluster is done here.
func (*KV) GetBroadcasts ¶
GetBroadcasts is a method from the Memberlist Delegate interface. It returns all pending broadcasts (within the size limit).
func (*KV) GetListeningPort ¶
GetListeningPort returns port used for listening for memberlist communication. Useful when BindPort is set to 0. This call is only valid after KV service has been started.
func (*KV) JoinMembers ¶
JoinMembers joins the cluster with the given members (see https://godoc.org/github.com/hashicorp/memberlist#Memberlist.Join). This call is only valid after the KV service has been started and while it is still running.
func (*KV) List ¶
List returns all known keys under a given prefix. No communication with other nodes in the cluster is done here.
func (*KV) LocalState ¶
LocalState is a method from the Memberlist Delegate interface.
This is "pull" part of push/pull sync (either periodic, or when new node joins the cluster). Here we dump our entire state -- all keys and their values. There is no limit on message size here, as Memberlist uses 'stream' operations for transferring this state.
func (*KV) MergeRemoteState ¶
MergeRemoteState is a method from the Memberlist Delegate interface.
This is 'push' part of push/pull sync. We merge incoming KV store (all keys and values) with ours.
Data is full state of remote KV store, as generated by LocalState method (run on another node).
func (*KV) NotifyMsg ¶
NotifyMsg is a method from the Memberlist Delegate interface. It is called when a single message is received, i.e. what our broadcastNewValue has sent.
func (*KV) WatchKey ¶
WatchKey watches for value changes for given key. When value changes, 'f' function is called with the latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call.
Watching ends when 'f' returns false, context is done, or this client is shut down.
func (*KV) WatchPrefix ¶
func (m *KV) WatchPrefix(ctx context.Context, prefix string, codec codec.Codec, f func(string, interface{}) bool)
WatchPrefix watches for any change of values stored under keys with given prefix. When change occurs, function 'f' is called with key and current value. Each change of the key results in one notification. If there are too many pending notifications ('f' is slow), some notifications may be lost.
Watching ends when 'f' returns false, context is done, or this client is shut down.
type KVConfig ¶
type KVConfig struct { // Memberlist options. NodeName string `yaml:"node_name" category:"advanced"` RandomizeNodeName bool `yaml:"randomize_node_name" category:"advanced"` StreamTimeout time.Duration `yaml:"stream_timeout" category:"advanced"` RetransmitMult int `yaml:"retransmit_factor" category:"advanced"` PushPullInterval time.Duration `yaml:"pull_push_interval" category:"advanced"` GossipInterval time.Duration `yaml:"gossip_interval" category:"advanced"` GossipNodes int `yaml:"gossip_nodes" category:"advanced"` GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` EnableCompression bool `yaml:"compression_enabled" category:"advanced"` NotifyInterval time.Duration `yaml:"notify_interval" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` AdvertisePort int `yaml:"advertise_port"` ClusterLabel string `yaml:"cluster_label" category:"advanced"` ClusterLabelVerificationDisabled bool `yaml:"cluster_label_verification_disabled" category:"advanced"` // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"` MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"` MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"` AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"` RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. 
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` TCPTransport TCPTransportConfig `yaml:",inline"` MetricsNamespace string `yaml:"-"` // Codecs to register. Codecs need to be registered before joining other members. Codecs []codec.Codec `yaml:"-"` }
KVConfig is a config for memberlist.KV
func (*KVConfig) RegisterFlags ¶
type KVInitService ¶
KVInitService initializes a memberlist.KV on first call to GetMemberlistKV, and starts it. On stop, KV is stopped too. If KV fails, error is reported from the service.
func NewKVInitService ¶
func NewKVInitService(cfg *KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KVInitService
func (*KVInitService) GetMemberlistKV ¶
func (kvs *KVInitService) GetMemberlistKV() (*KV, error)
GetMemberlistKV will initialize Memberlist.KV on first call, and add it to service failure watcher.
func (*KVInitService) ServeHTTP ¶
func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request)
type KeyValuePair ¶
type KeyValuePair struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // ID of the codec used to write the value Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"` }
Single Key-Value pair. Key must be non-empty.
func (*KeyValuePair) Descriptor ¶
func (*KeyValuePair) Descriptor() ([]byte, []int)
func (*KeyValuePair) Equal ¶
func (this *KeyValuePair) Equal(that interface{}) bool
func (*KeyValuePair) GetCodec ¶
func (m *KeyValuePair) GetCodec() string
func (*KeyValuePair) GetKey ¶
func (m *KeyValuePair) GetKey() string
func (*KeyValuePair) GetValue ¶
func (m *KeyValuePair) GetValue() []byte
func (*KeyValuePair) GoString ¶
func (this *KeyValuePair) GoString() string
func (*KeyValuePair) Marshal ¶
func (m *KeyValuePair) Marshal() (dAtA []byte, err error)
func (*KeyValuePair) MarshalToSizedBuffer ¶
func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*KeyValuePair) ProtoMessage ¶
func (*KeyValuePair) ProtoMessage()
func (*KeyValuePair) Reset ¶
func (m *KeyValuePair) Reset()
func (*KeyValuePair) Size ¶
func (m *KeyValuePair) Size() (n int)
func (*KeyValuePair) String ¶
func (this *KeyValuePair) String() string
func (*KeyValuePair) Unmarshal ¶
func (m *KeyValuePair) Unmarshal(dAtA []byte) error
func (*KeyValuePair) XXX_DiscardUnknown ¶
func (m *KeyValuePair) XXX_DiscardUnknown()
func (*KeyValuePair) XXX_Marshal ¶
func (m *KeyValuePair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*KeyValuePair) XXX_Merge ¶
func (m *KeyValuePair) XXX_Merge(src proto.Message)
func (*KeyValuePair) XXX_Size ¶
func (m *KeyValuePair) XXX_Size() int
func (*KeyValuePair) XXX_Unmarshal ¶
func (m *KeyValuePair) XXX_Unmarshal(b []byte) error
type KeyValueStore ¶
type KeyValueStore struct {
Pairs []*KeyValuePair `protobuf:"bytes,1,rep,name=pairs,proto3" json:"pairs,omitempty"`
}
KV Store is just a series of key-value pairs.
func (*KeyValueStore) Descriptor ¶
func (*KeyValueStore) Descriptor() ([]byte, []int)
func (*KeyValueStore) Equal ¶
func (this *KeyValueStore) Equal(that interface{}) bool
func (*KeyValueStore) GetPairs ¶
func (m *KeyValueStore) GetPairs() []*KeyValuePair
func (*KeyValueStore) GoString ¶
func (this *KeyValueStore) GoString() string
func (*KeyValueStore) Marshal ¶
func (m *KeyValueStore) Marshal() (dAtA []byte, err error)
func (*KeyValueStore) MarshalToSizedBuffer ¶
func (m *KeyValueStore) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*KeyValueStore) ProtoMessage ¶
func (*KeyValueStore) ProtoMessage()
func (*KeyValueStore) Reset ¶
func (m *KeyValueStore) Reset()
func (*KeyValueStore) Size ¶
func (m *KeyValueStore) Size() (n int)
func (*KeyValueStore) String ¶
func (this *KeyValueStore) String() string
func (*KeyValueStore) Unmarshal ¶
func (m *KeyValueStore) Unmarshal(dAtA []byte) error
func (*KeyValueStore) XXX_DiscardUnknown ¶
func (m *KeyValueStore) XXX_DiscardUnknown()
func (*KeyValueStore) XXX_Marshal ¶
func (m *KeyValueStore) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*KeyValueStore) XXX_Merge ¶
func (m *KeyValueStore) XXX_Merge(src proto.Message)
func (*KeyValueStore) XXX_Size ¶
func (m *KeyValueStore) XXX_Size() int
func (*KeyValueStore) XXX_Unmarshal ¶
func (m *KeyValueStore) XXX_Unmarshal(b []byte) error
type Mergeable ¶
type Mergeable interface { // Merge with other value in place. Returns change, that can be sent to other clients. // If merge doesn't result in any change, returns nil. // Error can be returned if merging with given 'other' value is not possible. // Implementors of this method are permitted to modify the other parameter, as the // memberlist-based KV store will not use the same "other" parameter in multiple Merge calls. // // In order for state merging to work correctly, Merge function must have some properties. When talking about the // result of the merge in the following text, we don't mean the return value ("change"), but the // end-state of receiver. That means Result of A.Merge(B) is end-state of A. // // Memberlist-based KV store will keep the result even if Merge returned no change. Implementations should // be careful about not changing logical value when returning empty change. // // Idempotency: // Result of applying the same state "B" to state "A" (A.Merge(B)) multiple times has the same effect as // applying it only once. Only first Merge will return non-empty change. // // Commutativity: // A.Merge(B) has the same result as B.Merge(A) (result being the end state of A or B respectively) // // Associativity: // calling B.Merge(C), and then A.Merge(B) has the same result as // calling A.Merge(B) first, and then calling A.Merge(C), // that is, A will end up in the same state in both cases. // // LocalCAS flag is used when doing Merge as part of local CAS operation on KV store. It can be used to detect // missing entries, and generate tombstones. (This breaks commutativity and associativity [!] so it can *only* be // used when doing CAS operation) Merge(other Mergeable, localCAS bool) (change Mergeable, error error) // MergeContent describes the content of this mergeable value. Used by memberlist client to decide if // one change-value can invalidate some other value, that was received previously. 
// Invalidation can happen only if output of MergeContent is a superset of some other MergeContent. MergeContent() []string // RemoveTombstones remove tombstones older than given limit from this mergeable. // If limit is zero time, remove all tombstones. Memberlist client calls this method with zero limit each // time when client is accessing value from the store. It can be used to hide tombstones from the clients. // Returns the total number of tombstones present and the number of removed tombstones by this invocation. RemoveTombstones(limit time.Time) (total, removed int) // Clone returns a deep copy of the state. Clone() Mergeable }
Mergeable is an interface that values used in gossiping KV Client must implement. It allows merging of different states, obtained via gossiping or CAS function.
type Message ¶
type Message struct { ID int // Unique local ID of the message. Time time.Time // Time when message was sent or received. Size int // Message size Pair KeyValuePair // Following values are computed on the receiving node, based on local state. Version uint // For sent message, which version the message reflects. For received message, version after applying the message. Changes []string // List of changes in this message (as computed by *this* node). }
Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message. Fields are exported for templating to work.
type StatusPageData ¶
type StatusPageData struct { Now time.Time Memberlist *memberlist.Memberlist SortedMembers []*memberlist.Node Store map[string]ValueDesc MessageHistoryBufferBytes int SentMessages []Message ReceivedMessages []Message }
StatusPageData represents the data passed to the template rendered by HTTPStatusHandler
type TCPTransport ¶
type TCPTransport struct {
// contains filtered or unexported fields
}
TCPTransport is a memberlist.Transport implementation that uses TCP for both packet and stream operations ("packet" and "stream" are terms used by memberlist). It uses a new TCP connection for each operation. There is no connection reuse.
func NewTCPTransport ¶
func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer prometheus.Registerer) (*TCPTransport, error)
NewTCPTransport returns a new tcp-based transport with the given configuration. On success all the network listeners will be created and listening.
func (*TCPTransport) DialTimeout ¶
DialTimeout is used to create a connection that allows memberlist to perform two-way communication with a peer.
func (*TCPTransport) FinalAdvertiseAddr ¶
FinalAdvertiseAddr is given the user's configured values (which might be empty) and returns the desired IP and port to advertise to the rest of the cluster. (Copied from memberlist's net_transport.go.)
func (*TCPTransport) GetAutoBindPort ¶
func (t *TCPTransport) GetAutoBindPort() int
GetAutoBindPort returns the bind port that was automatically given by the kernel, if a bind port of 0 was given.
func (*TCPTransport) PacketCh ¶
func (t *TCPTransport) PacketCh() <-chan *memberlist.Packet
PacketCh returns a channel that can be read to receive incoming packets from other peers.
func (*TCPTransport) Shutdown ¶
func (t *TCPTransport) Shutdown() error
Shutdown is called when memberlist is shutting down; this gives the transport a chance to clean up any listeners. This will avoid log spam about errors when we shut down.
func (*TCPTransport) StreamCh ¶
func (t *TCPTransport) StreamCh() <-chan net.Conn
StreamCh returns a channel that can be read to handle incoming stream connections from other peers.
type TCPTransportConfig ¶
type TCPTransportConfig struct { // BindAddrs is a list of IP addresses to bind to. BindAddrs flagext.StringSlice `yaml:"bind_addr"` // BindPort is the port to listen on, for each address above. BindPort int `yaml:"bind_port"` // Timeout used when making connections to other nodes to send packet. // Zero = no timeout PacketDialTimeout time.Duration `yaml:"packet_dial_timeout" category:"advanced"` // Timeout for writing packet data. Zero = no timeout. PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` // Maximum number of concurrent writes to other nodes. MaxConcurrentWrites int `yaml:"max_concurrent_writes" category:"advanced"` // Timeout for acquiring one of the concurrent write slots. AcquireWriterTimeout time.Duration `yaml:"acquire_writer_timeout" category:"advanced"` // Transport logs lots of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-" category:"advanced"` // Where to put custom metrics. nil = don't register. MetricsNamespace string `yaml:"-"` TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` }
TCPTransportConfig is a configuration structure for creating new TCPTransport.
func (*TCPTransportConfig) RegisterFlags ¶
func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet)
func (*TCPTransportConfig) RegisterFlagsWithPrefix ¶
func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)
RegisterFlagsWithPrefix registers flags with prefix.
type ValueDesc ¶
type ValueDesc struct { // Version (local only) is used to keep track of what we're gossiping about, and invalidate old messages. Version uint // ID of codec used to write this value. Only used when sending full state. CodecID string // contains filtered or unexported fields }
ValueDesc stores the value along with its codec and local version.