syncproto

package
v0.0.0-...-21cfbab Latest
Warning

This package is not in the latest version of its module.

Published: Apr 10, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Package syncproto defines the structs used in the Felix/Typha protocol.

Overview

Felix connects to Typha over a TCP socket, then Felix initiates the (synchronous) handshake consisting of a ClientHello then a ServerHello message.

Once the handshake is complete, Typha sends a series of KV pairs to Felix, amounting to a complete snapshot of the datastore. It may send more than one KV message, each containing one or more KV pairs.

Once a complete snapshot has been sent, Typha sends a SyncStatus message with its current sync status. This is typically "InSync" but it may be another status, such as "Resync" if Typha itself is resyncing with the datastore.

At any point after the handshake, Typha may send a Ping message, which Felix should respond to as quickly as possible with a Pong (if Typha doesn't receive a timely response it may terminate the connection).

After the initial snapshot is sent, Typha sends KVs and SyncStatus messages as new updates are received from the datastore.

+-------+                +-------+
| Felix |                | Typha |
+-------+                +-------+
|                        |
| connect                |
|----------------------->|
|                        | -------------------------------\
|                        |-| accept, wait for ClientHello |
|                        | |------------------------------|
|                        |
| ClientHello            |
|----------------------->|
|                        |
|            ServerHello |
|<-----------------------|
|                        |
|                        | -------------------------------------------------------\
|--------------------------| if compression enabled, restart client's gob decoder |
|                        | | send ACK when ready to receive compressed data       |
| ACK                    | |------------------------------------------------------|
|----------------------->|
|                        | -------------------------------------------------------\
|                        |-| if compression enabled, restart server's gob encoder |
|                        | | with compressed stream                               |
|                        | |------------------------------------------------------|
|                        |
|                        | ------------------------------------\
|                        |-| start KV send & pinger goroutines |
|                        | |-----------------------------------|
|                        |
|                KVs * n |
|<-----------------------|
|                        |
|                   Ping |
|<-----------------------|
|                        |
| Pong                   |
|----------------------->|
|                        |
|                KVs * n |
|<-----------------------|
|                        |
|     SyncStatus(InSync) |
|<-----------------------|
|                        |
|                KVs * n |
|<-----------------------|
|                        |

Wire format

The protocol uses gob to encode messages. Each message is wrapped in an Envelope struct to simplify decoding.

Key/value pairs are encoded as SerializedUpdate objects. These contain the KV pair along with the Syncer metadata about the update (such as its revision and update type). The key and value are encoded using the libcalico-go "default" encoding, as used when storing data in, for example, etcd. That is, the gob struct contains string and []byte fields to hold the key and value, respectively. Doing this has some advantages:

(1) It avoids any subtle incompatibility between our datamodel and gob.

(2) It removes the need to register all our datatypes with the gob en/decoder.

(3) It re-uses known-good serialization code with known semantics around
    data-model upgrade.  I.e. since that serialization is based on the JSON
    marshaller, we know how it treats added/removed fields.

(4) It allows us to do the serialization of each KV pair once and send it to
    all listening clients.  (Doing this in gob is not easy because the gob
    connection is stateful.)

Upgrading the datamodel

Some care needs to be taken when upgrading Felix and Typha to ensure that datamodel changes are correctly handled.

Because Typha parses resources from the datamodel and then serializes them again:

  • Typha can only pass through resources (and fields) that were present in the version of libcalico-go that it was compiled against.

  • Similarly, Felix can only parse resources and fields that were present in the version of libcalico-go that it was compiled against.

  • It is important that even synthesized resources (for example, those that are generated by the Kubernetes datastore driver) are serializable, even if we never normally write them to a key/value based datastore such as etcd.

In the common case, where a new field is added to the datamodel:

  • If a new Felix connects to an old Typha then Typha will strip the new field at parse-time and pass the object through to Felix. Hence Felix will behave as if the field wasn't present. As long as the field was added in a back-compatible way, Felix should default to its old behaviour and the overall outcome will be that new Felix will behave as if it was an old Felix.

  • If an old Felix connects to a new Typha, then Typha will pass through the new field to Felix but Felix will strip it out when it parses the update.

Where a whole new resource is added:

  • If a new Felix connects to an old Typha then Typha will ignore the new resource so it is important that Felix is engineered to allow for missing resources in that case.

  • If an old Felix connects to a new Typha then Typha will send the resource but the old Felix will fail to parse it. In that case, the Typha client code used by Felix drops the KV pair and logs an error.

In more complicated cases, it's important to think through how the above cases play out. For example, removing one synthesized resource type and adding another to take its place may no longer work as intended, since the new one will get stripped out when a mixed-version Typha/Felix connection occurs.

If such a change does need to be made, we could treat it as a Typha protocol upgrade as described below.

Upgrading the Typha protocol

In general, even fairly large changes to the protocol can be managed by suitable signalling in the handshake. The most important limitation to be aware of is that neither server nor client should send a new message type to the other without verifying that the other supports that message type. In concrete terms, that means adding fields to the handshake to advertise support for particular features, and having the other side check that the new field is set before it sends the new message types. This works because gob leaves any field that is missing from the incoming stream at its zero value, so if the peer doesn't say "SupportsFeatureX: true" in its Hello message, you'll see "SupportsFeatureX: false" at the other side. If you send a message type that the peer doesn't understand, the peer's decoder will return an error.
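The zero-value behaviour that makes feature flags safe can be checked with encoding/gob directly. In this sketch, oldHello and newHello are hypothetical Hello versions and decodeAsNew is an invented helper; gob matches struct fields by name, so a field absent from the stream is simply left unset.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// oldHello is what a peer built before the feature existed would send.
type oldHello struct {
	Hostname string
	Version  string
}

// newHello adds a hypothetical feature flag to the handshake.
type newHello struct {
	Hostname         string
	Version          string
	SupportsFeatureX bool
}

// decodeAsNew gob-encodes v and decodes the bytes into a newHello.
func decodeAsNew(v interface{}) (newHello, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return newHello{}, err
	}
	var h newHello
	err := gob.NewDecoder(&buf).Decode(&h)
	return h, err
}

func main() {
	fromOld, err := decodeAsNew(oldHello{Hostname: "node1", Version: "old"})
	if err != nil {
		panic(err)
	}
	fromNew, err := decodeAsNew(newHello{Hostname: "node2", Version: "new", SupportsFeatureX: true})
	if err != nil {
		panic(err)
	}
	// Only send the new message type to peers that advertised support.
	fmt.Println(fromOld.SupportsFeatureX, fromNew.SupportsFeatureX) // prints: false true
}
```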

It's also possible to switch from one protocol to another mid-stream. We do this to enable compression. The main gotcha is to ensure that no new format data is sent until after the other side has acknowledged that it has drained the old format data and prepared the new format decoder. Otherwise the old format decoder may eagerly read data in the new format into its buffer and get confused.

Index

Constants

const DefaultPort = 5473

Variables

var (
	// AllSyncerTypes contains each of the SyncerType constants. We use an array rather than a slice for a
	// compile-time length check.
	AllSyncerTypes = [NumSyncerTypes]SyncerType{
		SyncerTypeFelix,
		SyncerTypeBGP,
		SyncerTypeTunnelIPAllocation,
		SyncerTypeNodeStatus,
	}
)
var ErrBadKey = errors.New("Unable to parse key.")

Functions

This section is empty.

Types

type CompressionAlgorithm

type CompressionAlgorithm string
const (
	CompressionSnappy CompressionAlgorithm = "snappy"
)

type Envelope

type Envelope struct {
	Message interface{}
}

func (Envelope) String

func (e Envelope) String() string

logrus only looks for a String() method on the top-level object, so this method calls through to the wrapped message's String().

type MsgACK

type MsgACK struct {
}

MsgACK is a general-purpose ACK message, currently used during the initial handshake to acknowledge the switch to compressed mode.

type MsgClientHello

type MsgClientHello struct {
	Hostname string
	Info     string
	Version  string

	// SyncerType is the requested syncer type.  Added in v3.3; if the client doesn't provide a value, it is
	// assumed to be SyncerTypeFelix.
	SyncerType SyncerType

	SupportsDecoderRestart         bool
	SupportedCompressionAlgorithms []CompressionAlgorithm

	ClientConnID uint64
}

MsgClientHello is the first message sent by the client after it opens the connection. It begins the handshake. It includes a request to use a particular kind of syncer and tells the server what features are supported.

type MsgDecoderRestart

type MsgDecoderRestart struct {
	Message              string
	CompressionAlgorithm CompressionAlgorithm
}

MsgDecoderRestart is sent (currently only from server to client) to tell the recipient to restart its decoder with new parameters.

type MsgKVs

type MsgKVs struct {
	KVs []SerializedUpdate
}

func (MsgKVs) String

func (m MsgKVs) String() string

type MsgPing

type MsgPing struct {
	Timestamp time.Time
}

type MsgPong

type MsgPong struct {
	PingTimestamp time.Time
	PongTimestamp time.Time
}

type MsgServerHello

type MsgServerHello struct {
	Version string

	// SyncerType is the active syncer type; if not specified, the server is an older Typha instance that
	// only supports SyncerTypeFelix.
	SyncerType SyncerType

	// SupportsNodeResourceUpdates tells the client whether this Typha supports node resource updates.
	SupportsNodeResourceUpdates bool

	ServerConnID uint64
}

MsgServerHello is the server's response to MsgClientHello.

type MsgSyncStatus

type MsgSyncStatus struct {
	SyncStatus api.SyncStatus
}

type SerializedUpdate

type SerializedUpdate struct {
	Key               string
	Value             []byte
	Revision          interface{}
	V3ResourceVersion string
	TTL               time.Duration
	UpdateType        api.UpdateType
}

func SerializeUpdate

func SerializeUpdate(u api.Update) (su SerializedUpdate, err error)

func (SerializedUpdate) String

func (s SerializedUpdate) String() string

func (SerializedUpdate) ToUpdate

func (s SerializedUpdate) ToUpdate() (api.Update, error)

func (SerializedUpdate) WouldBeNoOp

func (s SerializedUpdate) WouldBeNoOp(previous SerializedUpdate) bool

WouldBeNoOp returns true if this update would be a no-op given that previous has already been sent.

type SyncerType

type SyncerType string
const (
	SyncerTypeFelix              SyncerType = "felix"
	SyncerTypeBGP                SyncerType = "bgp"
	SyncerTypeTunnelIPAllocation SyncerType = "tunnel-ip-allocation"
	SyncerTypeNodeStatus         SyncerType = "node-status"

	// NumSyncerTypes is the number of SyncerType constants above.  For iota to pick up the correct value, all
	// new consts must be added to this block and NumSyncerTypes must be the last entry.
	NumSyncerTypes = iota
)
