client

package
v0.0.0-...-d3ccc4f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2018 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
// AllowForwardRequestHeaderKey names a request header. When a request
// carries this header, the syncmaster forwards the request to the
// current leader rather than answering with a 503.
const AllowForwardRequestHeaderKey = "X-Allow-Forward-To-Leader"
View Source
// ClientIDHeaderKey names the request header that carries the ID of
// the party making the request.
const ClientIDHeaderKey = "X-ArangoSync-Client-ID"

Variables

View Source
// Predefined StatusError values, one per HTTP status code used by this package.
var (

	// NotFoundError indicates that an object does not exist.
	NotFoundError = StatusError{StatusCode: http.StatusNotFound, /* contains filtered or unexported fields */}
	// ServiceUnavailableError indicates that the service is not available right now; please retry later.
	ServiceUnavailableError = StatusError{StatusCode: http.StatusServiceUnavailable, /* contains filtered or unexported fields */}
	// BadRequestError indicates invalid arguments.
	BadRequestError = StatusError{StatusCode: http.StatusBadRequest, /* contains filtered or unexported fields */}
	// PreconditionFailedError indicates that the state of the system is such that the request cannot be executed.
	PreconditionFailedError = StatusError{StatusCode: http.StatusPreconditionFailed, /* contains filtered or unexported fields */}
	// InternalServerError indicates an unspecified error inside the server, perhaps a bug.
	InternalServerError = StatusError{StatusCode: http.StatusInternalServerError, /* contains filtered or unexported fields */}
	// UnauthorizedError indicates that the request does not have the correct authorization.
	UnauthorizedError = StatusError{StatusCode: http.StatusUnauthorized, /* contains filtered or unexported fields */}
	// RequestTimeoutError indicates that the request is taking longer than we're prepared to wait.
	RequestTimeoutError = StatusError{StatusCode: http.StatusRequestTimeout, /* contains filtered or unexported fields */}
)
View Source
// ValidSyncStatusValues lists every possible sync status value.
var ValidSyncStatusValues = []SyncStatus{
	SyncStatusInactive,
	SyncStatusInitializing,
	SyncStatusInitialSync,
	SyncStatusRunning,
	SyncStatusCancelling,
	SyncStatusFailed,
}

Functions

func DefaultHTTPClient

func DefaultHTTPClient(tlsConfig *tls.Config) *http.Client

DefaultHTTPClient creates a new HTTP client configured for accessing a starter.

func IsBadRequest

func IsBadRequest(err error) bool

IsBadRequest returns true if the given error is caused by a BadRequestError.

func IsCanceled

func IsCanceled(err error) bool

IsCanceled returns true if the given error is caused by a context.Canceled.

func IsInternalServer

func IsInternalServer(err error) bool

IsInternalServer returns true if the given error is caused by an InternalServerError.

func IsNotFound

func IsNotFound(err error) bool

IsNotFound returns true if the given error is caused by a NotFoundError.

func IsPreconditionFailed

func IsPreconditionFailed(err error) bool

IsPreconditionFailed returns true if the given error is caused by a PreconditionFailedError.

func IsRedirectTo

func IsRedirectTo(err error) (string, bool)

IsRedirectTo returns true when the given error is caused by a RedirectToError. If so, it also returns the redirect location.

func IsRequestTimeout

func IsRequestTimeout(err error) bool

IsRequestTimeout returns true if the given error is caused by a RequestTimeoutError.

func IsServiceUnavailable

func IsServiceUnavailable(err error) bool

IsServiceUnavailable returns true if the given error is caused by a ServiceUnavailableError.

func IsSignificantDelayDiff

func IsSignificantDelayDiff(d1, d2 time.Duration) bool

IsSignificantDelayDiff returns true if there is a significant difference between the given delays.

func IsStatusError

func IsStatusError(err error) (int, bool)

IsStatusError returns the status code and true if the given error is caused by a StatusError.

func IsStatusErrorWithCode

func IsStatusErrorWithCode(err error, code int) bool

IsStatusErrorWithCode returns true if the given error is caused by a StatusError with given code.

func IsUnauthorized

func IsUnauthorized(err error) bool

IsUnauthorized returns true if the given error is caused by an UnauthorizedError.

func ParseResponseError

func ParseResponseError(r *http.Response, body []byte) error

ParseResponseError returns an error from the given response. It tries to parse the body as an ErrorResponse; if the given body is nil, the body is read from the response.

Types

type API

// API is the API of a sync master/worker.
type API interface {
	// Close closes this client.
	Close() error
	// Version returns the version of the sync master/worker.
	Version(ctx context.Context) (VersionInfo, error)
	// Role returns the role of the sync master/worker.
	Role(ctx context.Context) (Role, error)
	// Health performs a quick health check.
	// Returns an error when anything is wrong. If so, check Status.
	Health(ctx context.Context) error
	// Master returns the master API (only valid when Role returns master).
	Master() MasterAPI
	// Worker returns the worker API (only valid when Role returns worker).
	Worker() WorkerAPI

	// SetClientID sets the ID of the client that is making requests.
	SetClientID(id string)
	// SetShared marks the client as shared.
	// Closing a shared client will not close all idle connections.
	SetShared()
	// SynchronizeMasterEndpoints ensures that the client is using all known master
	// endpoints.
	// Do not use for connections to workers.
	// Returns true when endpoints have changed.
	SynchronizeMasterEndpoints(ctx context.Context) (bool, error)
	// Endpoint returns the currently used endpoint for this client.
	Endpoint() Endpoint
}

API of a sync master/worker

func NewArangoSyncClient

func NewArangoSyncClient(endpoints []string, authConf AuthenticationConfig, tlsConfig *tls.Config) (API, error)

NewArangoSyncClient creates a new client implementation.

type AuthProxy

// AuthProxy is a helper that implements
// github.com/arangodb-helper/go-certificates#TLSAuthentication.
type AuthProxy struct {
	// Embedded TLS authentication settings.
	TLSAuthentication
}

AuthProxy is a helper that implements github.com/arangodb-helper/go-certificates#TLSAuthentication.

func (AuthProxy) CACertificate

func (a AuthProxy) CACertificate() string

func (AuthProxy) ClientCertificate

func (a AuthProxy) ClientCertificate() string

func (AuthProxy) ClientKey

func (a AuthProxy) ClientKey() string

type Authentication

// Authentication contains all possible authentication methods for a client.
//
// NOTE(review): the package documentation gives the order of methods as
// JWTSecret, ClientToken, ClientCertificate, yet this struct has no
// ClientToken field and does carry Username/Password — confirm which
// description is current.
type Authentication struct {
	// Embedded TLS authentication settings.
	TLSAuthentication
	JWTSecret string
	Username  string
	Password  string
}

Authentication contains all possible authentication methods for a client. Order of authentication methods: (1) JWTSecret, (2) ClientToken, (3) ClientCertificate.

func NewAuthentication

func NewAuthentication(tlsAuth TLSAuthentication, jwtSecret string) Authentication

NewAuthentication creates a new Authentication from given arguments.

func (Authentication) String

func (a Authentication) String() string

String returns a string used to uniquely identify the authentication settings.

type AuthenticationConfig

// AuthenticationConfig is the authentication argument accepted by
// NewArangoSyncClient.
//
// NOTE(review): how the fields are prioritised when more than one is set
// is not visible here — confirm against the client implementation.
type AuthenticationConfig struct {
	JWTSecret   string
	BearerToken string
	// NOTE(review): spelled UserName here but Username in Authentication.
	UserName    string
	Password    string
}

type CancelSynchronizationRequest

// CancelSynchronizationRequest is the input of MasterAPI.CancelSynchronization.
type CancelSynchronizationRequest struct {
	// WaitTimeout is the amount of time the cancel function will wait
	// until the synchronization has reached an `inactive` state.
	// If this value is zero, the cancel function will only switch to the canceling state
	// but not wait until the `inactive` state is reached.
	WaitTimeout time.Duration `json:"wait_timeout,omitempty"`
	// Force is set if you want to end the synchronization even if the source
	// master cannot be reached.
	Force bool `json:"force,omitempty"`
	// ForceTimeout is the amount of time the syncmaster tries to contact
	// the source master to notify it about cancelling the synchronization.
	// This field is only used when Force is true.
	ForceTimeout time.Duration `json:"force_timeout,omitempty"`
}

type CancelSynchronizationResponse

// CancelSynchronizationResponse is the result of MasterAPI.CancelSynchronization.
type CancelSynchronizationResponse struct {
	// Aborted is set when synchronization has been cancelled (state is now inactive)
	// but the source sync master was not notified.
	// This is only possible when the Force flag is set on the request.
	Aborted bool `json:"aborted,omitempty"`
	// Source is the endpoint of the sync master on the remote cluster that we used
	// to be synchronizing from.
	Source Endpoint `json:"source,omitempty"`
	// ClusterID is the ID of the local synchronization cluster.
	ClusterID string `json:"cluster_id,omitempty"`
}

type ChannelPrefixInfo

// ChannelPrefixInfo is a JSON document holding a single channel name prefix.
// Presumably the response body behind InternalMasterAPI.ChannelPrefix — TODO confirm.
type ChannelPrefixInfo struct {
	Prefix string `json:"prefix"`
}

type ClientCache

// ClientCache provides GetClient, which returns a client used to access a
// source with given authentication. All of its state is unexported.
type ClientCache struct {
	// contains filtered or unexported fields
}

func (*ClientCache) GetClient

func (cc *ClientCache) GetClient(log zerolog.Logger, source Endpoint, auth Authentication, insecureSkipVerify bool) (API, error)

GetClient returns a client used to access the source with given authentication.

type CommitDirectMQMessageRequest

// CommitDirectMQMessageRequest is the JSON request body for a
// CommitDirectMQMessage request.
type CommitDirectMQMessageRequest struct {
	// Offset of a message; presumably the offset up to and including which
	// messages are removed (see InternalDirectMQAPI.CommitDirectMQMessage) — TODO confirm.
	Offset int64 `json:"offset"`
}

CommitDirectMQMessageRequest is the JSON request body for CommitDirectMQMessage request.

type ConfigureWorkerRequest

// ConfigureWorkerRequest is the JSON body for the ConfigureWorker request.
type ConfigureWorkerRequest struct {
	Endpoint string `json:"endpoint"` // Endpoint of the worker
}

ConfigureWorkerRequest is the JSON body for the ConfigureWorker request.

type DirectMQMessage

// DirectMQMessage is a direct MQ message.
type DirectMQMessage struct {
	// Offset of the message.
	Offset  int64           `json:"offset"`
	// Message is the payload, kept as raw (undecoded) JSON.
	Message json.RawMessage `json:"message"`
}

DirectMQMessage is a direct MQ message.

type DirectMQToken

// DirectMQToken provides a token together with its TTL.
type DirectMQToken struct {
	// Token used to authenticate with the server.
	Token string `json:"token"`
	// How long the token will be valid.
	// Afterwards a new token has to be fetched.
	TokenTTL time.Duration `json:"token-ttl"`
}

DirectMQToken provides a token with its TTL

type DirectMQTokenRequest

// DirectMQTokenRequest is the JSON request body for the Renew/Clone
// direct MQ token requests.
type DirectMQTokenRequest struct {
	// Token used to authenticate with the server.
	Token string `json:"token"`
}

DirectMQTokenRequest is the JSON request body for Renew/Clone direct MQ token request.

type DirectMQTopicEndpoint

// DirectMQTopicEndpoint provides information about an endpoint for
// direct MQ messages.
type DirectMQTopicEndpoint struct {
	// Endpoint of the server that can provide messages for a specific topic.
	Endpoint Endpoint `json:"endpoint"`
	// CA certificate used to sign the TLS connection of the server.
	// This is used for verifying the server.
	CACertificate string `json:"caCertificate"`
	// Token used to authenticate with the server.
	Token string `json:"token"`
	// How long the token will be valid.
	// Afterwards a new token has to be fetched.
	TokenTTL time.Duration `json:"token-ttl"`
}

DirectMQTopicEndpoint provides information about an endpoint for Direct MQ messages.

type Endpoint

// Endpoint is a list of URLs that are considered to be of the same service.
type Endpoint []string

Endpoint is a list of URLs that are considered to be of the same service.

func (Endpoint) Clone

func (ep Endpoint) Clone() Endpoint

Clone returns a deep clone of the given endpoint

func (Endpoint) Contains

func (ep Endpoint) Contains(x string) bool

Contains returns true when x is an element of ep.

func (Endpoint) Equals

func (ep Endpoint) Equals(other Endpoint) bool

Equals returns true when ep and other contain the same elements (perhaps in different order).

func (Endpoint) Intersection

func (ep Endpoint) Intersection(other Endpoint) Endpoint

Intersection returns the endpoint containing all elements included in both ep and other.

func (Endpoint) IsEmpty

func (ep Endpoint) IsEmpty() bool

IsEmpty returns true when ep has no elements.

func (Endpoint) Merge

func (ep Endpoint) Merge(args ...string) Endpoint

Merge adds the given URLs to the endpoint, avoiding duplicates, and returns the result.

func (Endpoint) URLs

func (ep Endpoint) URLs() ([]url.URL, error)

URLs returns all endpoints as parsed URLs.

func (Endpoint) Validate

func (ep Endpoint) Validate() error

Validate checks all URLs, returning the first error found.

type EndpointsResponse

// EndpointsResponse is a JSON document wrapping an Endpoint.
// Presumably the response body behind the MasterAPI endpoint queries — TODO confirm.
type EndpointsResponse struct {
	Endpoints Endpoint `json:"endpoints"`
}

type ErrorResponse

// ErrorResponse is the structure ParseResponseError tries to parse a
// response body into.
type ErrorResponse struct {
	// Error has no JSON tag, so encoding/json uses the field name as key.
	Error string
}

type GetDirectMQMessagesResponse

// GetDirectMQMessagesResponse is the JSON body for the GetDirectMQMessages
// response.
type GetDirectMQMessagesResponse struct {
	// Messages is omitted from the JSON when empty.
	Messages []DirectMQMessage `json:"messages,omitempty"`
}

GetDirectMQMessagesResponse is the JSON body for GetDirectMQMessages response.

type InternalDirectMQAPI

// InternalDirectMQAPI contains the internal API of the sync master/worker
// with respect to direct MQ messages.
type InternalDirectMQAPI interface {
	// GetDirectMQMessages returns messages for a given MQ channel.
	GetDirectMQMessages(ctx context.Context, channelName string) ([]DirectMQMessage, error)
	// CommitDirectMQMessage removes all messages from the given channel up to and including the given offset.
	CommitDirectMQMessage(ctx context.Context, channelName string, offset int64) error
}

InternalDirectMQAPI contains the internal API of the sync master/worker wrt direct MQ messages.

type InternalMasterAPI

// InternalMasterAPI contains the internal API of the sync master.
type InternalMasterAPI interface {

	// Load configuration data from the master
	ConfigureWorker(ctx context.Context, endpoint string) (WorkerConfiguration, error)
	// Return all registered workers
	RegisteredWorkers(ctx context.Context) ([]WorkerRegistration, error)
	// Return info about a specific worker
	RegisteredWorker(ctx context.Context, id string) (WorkerRegistration, error)
	// Register (or update registration of) a worker
	RegisterWorker(ctx context.Context, endpoint, token, hostID string) (WorkerRegistrationResponse, error)
	// Remove the registration of a worker
	UnregisterWorker(ctx context.Context, id string) error
	// Get info about a specific task
	Task(ctx context.Context, id string) (TaskInfo, error)
	// Get all known tasks
	Tasks(ctx context.Context) ([]TaskInfo, error)
	// Get all known tasks for a given channel
	TasksByChannel(ctx context.Context, channelName string) ([]TaskInfo, error)
	// Notify the master that a task with given ID has completed.
	TaskCompleted(ctx context.Context, taskID string, info TaskCompletedRequest) error
	// Create tasks to start synchronization of a shard in the given db+col.
	SynchronizeShard(ctx context.Context, dbName, colName string, shardIndex int) error
	// Stop tasks to synchronize a shard in the given db+col.
	CancelSynchronizeShard(ctx context.Context, dbName, colName string, shardIndex int) error
	// Report status of the synchronization of a shard back to the master.
	SynchronizeShardStatus(ctx context.Context, entries []SynchronizationShardStatusRequestEntry) error
	// IsChannelRelevant checks if a MQ channel is still relevant
	IsChannelRelevant(ctx context.Context, channelName string) (bool, error)

	// Worker & Master -> Master
	// GetDirectMQTopicEndpoint returns an endpoint that the caller can use to fetch direct MQ messages
	// from.
	// This method requires a directMQ token or client cert for authentication.
	GetDirectMQTopicEndpoint(ctx context.Context, channelName string) (DirectMQTopicEndpoint, error)
	// RenewDirectMQToken renews a given direct MQ token.
	// This method requires a directMQ token for authentication.
	RenewDirectMQToken(ctx context.Context, token string) (DirectMQToken, error)
	// CloneDirectMQToken creates a clone of a given direct MQ token.
	// When the given token is revoked, the newly cloned token is also revoked.
	// This method requires a directMQ token for authentication.
	CloneDirectMQToken(ctx context.Context, token string) (DirectMQToken, error)
	// Add entire direct MQ API
	InternalDirectMQAPI

	// Start a task that sends inventory data to a receiving remote cluster.
	OutgoingSynchronization(ctx context.Context, input OutgoingSynchronizationRequest) (OutgoingSynchronizationResponse, error)
	// Cancel sending synchronization data to the remote cluster with given ID.
	CancelOutgoingSynchronization(ctx context.Context, remoteID string) error
	// Create tasks to send synchronization data of a shard in the given db+col to a remote cluster.
	OutgoingSynchronizeShard(ctx context.Context, remoteID, dbName, colName string, shardIndex int, input OutgoingSynchronizeShardRequest) error
	// Stop tasks to send synchronization data of a shard in the given db+col to a remote cluster.
	CancelOutgoingSynchronizeShard(ctx context.Context, remoteID, dbName, colName string, shardIndex int) error
	// Report status of the synchronization of a shard back to the master.
	OutgoingSynchronizeShardStatus(ctx context.Context, entries []SynchronizationShardStatusRequestEntry) error
	// Reset a failed shard synchronization.
	OutgoingResetShardSynchronization(ctx context.Context, remoteID, dbName, colName string, shardIndex int, newControlChannel, newDataChannel string) error

	// Get a prefix for names of channels that contain messages
	// going to this master.
	ChannelPrefix(ctx context.Context) (string, error)
	// Get the local message queue configuration.
	GetMessageQueueConfig(ctx context.Context) (MessageQueueConfig, error)
}

InternalMasterAPI contains the internal API of the sync master.

type InternalWorkerAPI

// InternalWorkerAPI contains the internal API of the sync worker.
type InternalWorkerAPI interface {
	// StartTask is called by the master to instruct the worker
	// to run a task with given instructions.
	StartTask(ctx context.Context, data StartTaskRequest) error
	// StopTask is called by the master to instruct the worker
	// to stop all work on the given task.
	StopTask(ctx context.Context, taskID string) error
	// SetDirectMQTopicToken configures the token used to access messages of a given channel.
	SetDirectMQTopicToken(ctx context.Context, channelName, token string, tokenTTL time.Duration) error
	// Add entire direct MQ API
	InternalDirectMQAPI
}

InternalWorkerAPI contains the internal API of the sync worker.

type IsChannelRelevantResponse

// IsChannelRelevantResponse is the JSON response for a
// MasterAPI.IsChannelRelevant call.
type IsChannelRelevantResponse struct {
	IsRelevant bool `json:"isRelevant"`
}

IsChannelRelevantResponse is the JSON response for a MasterAPI.IsChannelRelevant call

type MasterAPI

// MasterAPI contains the API of the sync master.
type MasterAPI interface {
	// Gets the current status of synchronization towards the local cluster.
	Status(ctx context.Context) (SyncInfo, error)
	// Configure the master to synchronize the local cluster from a given remote cluster.
	Synchronize(ctx context.Context, input SynchronizationRequest) error
	// Configure the master to stop & completely cancel the current synchronization of the
	// local cluster from a remote cluster.
	// Errors:
	// - RequestTimeoutError when input.WaitTimeout is non-zero and the inactive state is not reached in time.
	CancelSynchronization(ctx context.Context, input CancelSynchronizationRequest) (CancelSynchronizationResponse, error)
	// Reset a failed shard synchronization.
	ResetShardSynchronization(ctx context.Context, dbName, colName string, shardIndex int) error
	// Update the maximum allowed time between messages in a task channel.
	SetMessageTimeout(ctx context.Context, timeout time.Duration) error
	// Return a list of all known master endpoints of this datacenter.
	// The resulting endpoints are usable from inside and outside the datacenter.
	GetEndpoints(ctx context.Context) (Endpoint, error)
	// Return a list of master endpoints of the leader (syncmaster) of this datacenter.
	// Length of returned list will be 1 or the call will fail because no master is available.
	// In the very rare occasion that the leadership is changing during this call, a list
	// of length 0 can be returned.
	// The resulting endpoint is usable only within the same datacenter.
	GetLeaderEndpoint(ctx context.Context) (Endpoint, error)
	// Return a list of known masters in this datacenter.
	Masters(ctx context.Context) ([]MasterInfo, error)

	// Add the entire internal master API.
	InternalMasterAPI
}

MasterAPI contains API of sync master

type MasterInfo

// MasterInfo contains information about a single master.
type MasterInfo struct {
	// Unique identifier of the master
	ID string `json:"id"`
	// Internal endpoint of the master
	Endpoint string `json:"endpoint"`
	// Is this master the current leader
	Leader bool `json:"leader"`
}

MasterInfo contains information about a single master.

type MastersResponse

// MastersResponse is a JSON document wrapping a list of MasterInfo.
// Presumably the response body behind MasterAPI.Masters — TODO confirm.
type MastersResponse struct {
	Masters []MasterInfo `json:"masters"`
}

type MessageQueueConfig

// MessageQueueConfig contains all deployment configuration info for an MQ.
// It describes the local MQ, and is also used to carry the MQ configuration
// of a remote cluster (see OutgoingSynchronizationRequest and
// OutgoingSynchronizationResponse).
type MessageQueueConfig struct {
	Type           string            `json:"type"`
	Endpoints      []string          `json:"endpoints"`
	Authentication TLSAuthentication `json:"authentication"`
}

MessageQueueConfig contains all deployment configuration info for the local MQ.

func (MessageQueueConfig) Clone

Clone returns a deep copy of the given config

type OutgoingSyncInfo

// OutgoingSyncInfo holds JSON info returned as part of `GET /_api/sync`
// regarding a specific target for outgoing synchronization data.
type OutgoingSyncInfo struct {
	ID       string          `json:"id"`               // ID of sync master to which data is being sent
	Endpoint Endpoint        `json:"endpoint"`         // Endpoint of sync masters to which data is being sent
	Status   SyncStatus      `json:"status"`           // Overall status for this outgoing target
	Shards   []ShardSyncInfo `json:"shards,omitempty"` // Status of outgoing synchronization per shard for this target
}

OutgoingSyncInfo holds JSON info returned as part of `GET /_api/sync` regarding a specific target for outgoing synchronization data.

type OutgoingSynchronizationRequest

// OutgoingSynchronizationRequest holds the master->master request data for
// configuring an outgoing inventory stream.
type OutgoingSynchronizationRequest struct {
	// ID of remote cluster
	ID string `json:"id"`
	// Endpoints of sync masters of the remote (target) cluster
	Target   Endpoint `json:"target"`
	// Channels names the MQ topics used by this stream.
	Channels struct {
		// Name of MQ topic to send inventory data to.
		Inventory string `json:"inventory"`
	} `json:"channels"`
	// MQ configuration of the remote (target) cluster
	MessageQueueConfig MessageQueueConfig `json:"mq-config"`
}

OutgoingSynchronizationRequest holds the master->master request data for configuring an outgoing inventory stream.

func (OutgoingSynchronizationRequest) Clone

Clone returns a deep copy of the given request.

type OutgoingSynchronizationResponse

// OutgoingSynchronizationResponse holds the answer to a master->master
// request for configuring an outgoing synchronization.
type OutgoingSynchronizationResponse struct {
	// MQ configuration of the remote (source) cluster
	MessageQueueConfig MessageQueueConfig `json:"mq-config"`
}

OutgoingSynchronizationResponse holds the answer to a master->master request for configuring an outgoing synchronization.

type OutgoingSynchronizeShardRequest

// OutgoingSynchronizeShardRequest holds the master->master request data for
// configuring an outgoing shard synchronization stream.
type OutgoingSynchronizeShardRequest struct {
	// Channels names the MQ topics used by this stream.
	Channels struct {
		// Name of MQ topic to receive control messages on.
		Control string `json:"control"`
		// Name of MQ topic to send data messages to.
		Data string `json:"data"`
	} `json:"channels"`
}

OutgoingSynchronizeShardRequest holds the master->master request data for configuring an outgoing shard synchronization stream.

type RedirectToError

// RedirectToError is an error (it has an Error method) that carries a
// redirect location. Use IsRedirectTo to detect it and obtain the location.
type RedirectToError struct {
	// Location is the redirect location.
	Location string
}

func (RedirectToError) Error

func (e RedirectToError) Error() string

type Role

// Role is the role reported by a sync master/worker.
type Role string

// RoleMaster is the role value "master".
const RoleMaster = Role("master")

// RoleWorker is the role value "worker".
const RoleWorker = Role("worker")

func (Role) IsMaster

func (r Role) IsMaster() bool

func (Role) IsWorker

func (r Role) IsWorker() bool

type RoleInfo

// RoleInfo is a JSON document wrapping a Role.
// Presumably the response body behind API.Role — TODO confirm.
type RoleInfo struct {
	Role Role `json:"role"`
}

type SetDirectMQTopicTokenRequest

// SetDirectMQTopicTokenRequest is the JSON request body for the
// SetDirectMQTopicToken request.
type SetDirectMQTopicTokenRequest struct {
	// Token used to authenticate with the server.
	Token string `json:"token"`
	// How long the token will be valid.
	// Afterwards a new token has to be fetched.
	TokenTTL time.Duration `json:"token-ttl"`
}

SetDirectMQTopicTokenRequest is the JSON request body for SetDirectMQTopicToken request.

type SetMessageTimeoutRequest

// SetMessageTimeoutRequest is a JSON document holding a message timeout.
// Presumably the request body behind MasterAPI.SetMessageTimeout — TODO confirm.
type SetMessageTimeoutRequest struct {
	MessageTimeout time.Duration `json:"messageTimeout"`
}

type ShardSyncInfo

// ShardSyncInfo holds JSON info returned as part of `GET /_api/sync`
// regarding a specific shard.
type ShardSyncInfo struct {
	Database              string        `json:"database"`                 // Database containing the collection - shard
	Collection            string        `json:"collection"`               // Collection containing the shard
	ShardIndex            int           `json:"shardIndex"`               // Index of the shard (0..)
	Status                SyncStatus    `json:"status"`                   // Status of this shard
	StatusMessage         string        `json:"status_message,omitempty"` // Human readable message about the status of this shard
	Delay                 time.Duration `json:"delay,omitempty"`          // Delay between other datacenter and us.
	LastMessage           time.Time     `json:"last_message"`             // Time of last message received by the task handling this shard
	LastDataChange        time.Time     `json:"last_data_change"`         // Time of last message that resulted in a data change, received by the task handling this shard
	LastShardMasterChange time.Time     `json:"last_shard_master_change"` // Time of when we last had a change in the status of the shard master
	ShardMasterKnown      bool          `json:"shard_master_known"`       // Is the shard master known?
}

ShardSyncInfo holds JSON info returned as part of `GET /_api/sync` regarding a specific shard.

type StartTaskRequest

// StartTaskRequest is the data passed to InternalWorkerAPI.StartTask.
type StartTaskRequest struct {
	// ID is presumably the ID of the task to start — TODO confirm.
	ID string `json:"id"`
	// Embedded task data, declared in the tasks package.
	tasks.TaskData
	// MQ configuration of the remote cluster
	RemoteMessageQueueConfig MessageQueueConfig `json:"remote-mq-config"`
}

type StatusAPI

// StatusAPI describes the API provided to task workers used to send status
// updates to the master.
type StatusAPI interface {
	// SendIncomingStatus queues a given incoming synchronization status entry for sending.
	SendIncomingStatus(entry SynchronizationShardStatusRequestEntry)
	// SendOutgoingStatus queues a given outgoing synchronization status entry for sending.
	SendOutgoingStatus(entry SynchronizationShardStatusRequestEntry)
}

StatusAPI describes the API provided to task workers used to send status updates to the master.

type StatusError

// StatusError is an error (it has an Error method) that carries a status
// code. The predefined values of this package use net/http status codes.
type StatusError struct {
	// StatusCode is the status code of the error.
	StatusCode int
	// contains filtered or unexported fields
}

func (StatusError) Error

func (e StatusError) Error() string

type SyncInfo

// SyncInfo holds the JSON info returned from `GET /_api/sync`.
type SyncInfo struct {
	Source         Endpoint           `json:"source"`                   // Endpoint of sync master on remote cluster
	Status         SyncStatus         `json:"status"`                   // Overall status of (incoming) synchronization
	Shards         []ShardSyncInfo    `json:"shards,omitempty"`         // Status of incoming synchronization per shard
	Outgoing       []OutgoingSyncInfo `json:"outgoing,omitempty"`       // Status of outgoing synchronization
	MessageTimeout time.Duration      `json:"messageTimeout,omitempty"` // Maximum time between messages in a task channel
}

SyncInfo holds the JSON info returned from `GET /_api/sync`

type SyncStatus

// SyncStatus is the status of a synchronization, expressed as a string.
type SyncStatus string

const (
	// SyncStatusInactive: no synchronization is taking place.
	SyncStatusInactive = SyncStatus("inactive")
	// SyncStatusInitializing: synchronization tasks are being set up.
	SyncStatusInitializing = SyncStatus("initializing")
	// SyncStatusInitialSync: initial synchronization of collections is ongoing.
	SyncStatusInitialSync = SyncStatus("initial-sync")
	// SyncStatusRunning: all collections have been initially synchronized
	// and normal transaction synchronization is active.
	SyncStatusRunning = SyncStatus("running")
	// SyncStatusCancelling: the synchronization process is being cancelled.
	SyncStatusCancelling = SyncStatus("cancelling")
	// SyncStatusFailed: the synchronization process has encountered an
	// unrecoverable failure.
	SyncStatusFailed = SyncStatus("failed")
)

func (SyncStatus) Equals

func (s SyncStatus) Equals(other SyncStatus) bool

Equals returns true when the other status is equal to the given status (both normalized).

func (SyncStatus) IsActive

func (s SyncStatus) IsActive() bool

IsActive returns true if the given status indicates an active state. That is: initializing, initial-sync or running.

func (SyncStatus) IsInactiveOrEmpty

func (s SyncStatus) IsInactiveOrEmpty() bool

IsInactiveOrEmpty returns true if the given status equals inactive or is empty.

func (SyncStatus) IsInitialSyncOrRunning

func (s SyncStatus) IsInitialSyncOrRunning() bool

IsInitialSyncOrRunning returns true if the given status equals initial-sync or running.

func (SyncStatus) Normalize

func (s SyncStatus) Normalize() SyncStatus

Normalize converts an empty status to inactive.

type SynchronizationRequest

// SynchronizationRequest is the input of MasterAPI.Synchronize.
type SynchronizationRequest struct {
	// Endpoint of sync master of the source cluster
	Source Endpoint `json:"source"`
	// Authentication of the master
	Authentication TLSAuthentication `json:"authentication"`
}

func (SynchronizationRequest) Clone

Clone returns a deep copy of the given request.

func (SynchronizationRequest) IsSame

IsSame returns true if both requests contain the same values. The source is considered the same if the intersection of the existing and the given source is not empty. We consider an intersection because: (1) servers can be down, resulting in a temporarily missing endpoint; (2) the customer can specify only 1 of all servers.

func (SynchronizationRequest) Validate

func (r SynchronizationRequest) Validate() error

Validate checks the values of the given request and returns an error in case of improper values. Returns nil on success.

type SynchronizationShardStatus

// SynchronizationShardStatus is the status part of a
// SynchronizationShardStatusRequestEntry.
type SynchronizationShardStatus struct {
	// Current status
	Status SyncStatus `json:"status"`
	// Human readable status message
	StatusMessage string `json:"status_message,omitempty"`
	// Delay between us and other data center.
	Delay time.Duration `json:"delay"`
	// Time of last message received by the task handling this shard
	LastMessage time.Time `json:"last_message"`
	// Time of last message that resulted in a data change, received by the task handling this shard
	LastDataChange time.Time `json:"last_data_change"`
	// Time of when we last had a change in the status of the shard master
	LastShardMasterChange time.Time `json:"last_shard_master_change"`
	// Is the shard master known?
	ShardMasterKnown bool `json:"shard_master_known"`
}

func (SynchronizationShardStatus) IsSame

IsSame returns true when the Status & StatusMessage of both statuses are equal and the Delay is very close.

type SynchronizationShardStatusRequest

// SynchronizationShardStatusRequest is the request body of an
// (Outgoing)SynchronizationShardStatus request.
type SynchronizationShardStatusRequest struct {
	Entries []SynchronizationShardStatusRequestEntry `json:"entries"`
}

SynchronizationShardStatusRequest is the request body of a (Outgoing)SynchronizationShardStatus request.

type SynchronizationShardStatusRequestEntry

// SynchronizationShardStatusRequestEntry is a single entry in a
// SynchronizationShardStatusRequest.
type SynchronizationShardStatusRequestEntry struct {
	RemoteID   string                     `json:"remoteID"`
	Database   string                     `json:"database"`
	Collection string                     `json:"collection"`
	ShardIndex int                        `json:"shardIndex"`
	Status     SynchronizationShardStatus `json:"status"`
}

SynchronizationShardStatusRequestEntry is a single entry in a SynchronizationShardStatusRequest

type TLSAuthentication

// TLSAuthentication contains configuration for using client certificates and
// TLS verification of the server. It is an alias of tasks.TLSAuthentication.
type TLSAuthentication = tasks.TLSAuthentication

TLSAuthentication contains configuration for using client certificates and TLS verification of the server.

type TaskAssignment

// TaskAssignment contains information of the assignment of a task to a
// worker. It is serialized as JSON into the agency.
type TaskAssignment struct {
	// ID of worker the task is assigned to
	WorkerID string `json:"worker_id"`
	// When the assignment was made
	CreatedAt time.Time `json:"created_at"`
	// How many assignments have been made
	Counter int `json:"counter,omitempty"`
}

TaskAssignment contains information of the assignment of a task to a worker. It is serialized as JSON into the agency.

type TaskCompletedRequest

// TaskCompletedRequest holds the info for a TaskCompleted request.
type TaskCompletedRequest struct {
	// Error is omitted from the JSON when false.
	// Presumably set when the task ended with an error — TODO confirm.
	Error bool `json:"error,omitempty"`
}

TaskCompletedRequest holds the info for a TaskCompleted request.

type TaskInfo

// TaskInfo contains all information known about a task.
type TaskInfo struct {
	ID         string         `json:"id"`
	Task       tasks.TaskData `json:"task"`
	Assignment TaskAssignment `json:"assignment"`
}

TaskInfo contains all information known about a task.

func (TaskInfo) IsAssigned

func (i TaskInfo) IsAssigned() bool

IsAssigned returns true when the task in given info is assigned to a worker, false otherwise.

func (TaskInfo) NeedsCleanup

func (i TaskInfo) NeedsCleanup() bool

NeedsCleanup returns true when the entry is subject to cleanup.

type TasksResponse

// TasksResponse is the JSON response for the MasterAPI.Tasks method.
type TasksResponse struct {
	// Tasks is omitted from the JSON when empty.
	Tasks []TaskInfo `json:"tasks,omitempty"`
}

TasksResponse is the JSON response for MasterAPI.Tasks method.

type VersionInfo

// VersionInfo is the result of API.Version: the version of the sync
// master/worker.
type VersionInfo struct {
	Version string `json:"version"`
	Build   string `json:"build"`
}

type WorkerAPI

// WorkerAPI contains the API of the sync worker. It currently consists of
// the internal worker API only.
type WorkerAPI interface {
	InternalWorkerAPI
}

WorkerAPI contains API of sync worker

type WorkerConfiguration

// WorkerConfiguration contains configuration data passed from the master to
// the worker.
type WorkerConfiguration struct {
	// Cluster holds the cluster-related settings.
	Cluster struct {
		Endpoints       []string `json:"endpoints"`
		JWTSecret       string   `json:"jwtSecret,omitempty"`
		MaxDocumentSize int      `json:"maxDocumentSize,omitempty"`
		// Minimum replication factor of new/modified collections
		MinReplicationFactor int `json:"min-replication-factor,omitempty"`
		// Maximum replication factor of new/modified collections
		MaxReplicationFactor int `json:"max-replication-factor,omitempty"`
	} `json:"cluster"`
	// HTTPServer holds a certificate and key.
	// NOTE(review): presumably for the worker's own HTTP server — confirm.
	HTTPServer struct {
		Certificate string `json:"certificate"`
		Key         string `json:"key"`
	} `json:"httpServer"`
	// MessageQueue holds the message queue settings.
	MessageQueue struct {
		MessageQueueConfig // MQ configuration of local MQ
	} `json:"mq"`
}

WorkerConfiguration contains configuration data passed from the master to the worker.

func (*WorkerConfiguration) SetDefaults

func (c *WorkerConfiguration) SetDefaults()

SetDefaults fills empty values with defaults

func (WorkerConfiguration) Validate

func (c WorkerConfiguration) Validate() error

Validate the given configuration. Return an error on validation errors, nil when all ok.

type WorkerRegistration

// WorkerRegistration describes a worker registered with the master.
type WorkerRegistration struct {
	// ID of the worker assigned to it by the master
	ID string `json:"id"`
	// Endpoint of the worker
	Endpoint string `json:"endpoint"`
	// Expiration time of the last registration of the worker
	ExpiresAt time.Time `json:"expiresAt"`
	// ID of the worker when communicating with ArangoDB servers.
	ServerID int64 `json:"serverID"`
	// ID of the host the worker process is running on
	HostID string `json:"host,omitempty"`
}

func (WorkerRegistration) IsExpired

func (wr WorkerRegistration) IsExpired() bool

IsExpired returns true when the given worker is expired.

func (WorkerRegistration) Validate

func (wr WorkerRegistration) Validate() error

Validate the given registration. Return nil if ok, error otherwise.

type WorkerRegistrationRequest

// WorkerRegistrationRequest is the JSON document carrying the endpoint,
// token and host ID of a worker that registers itself with the master
// (compare InternalMasterAPI.RegisterWorker, which takes the same values).
//
// The HostID tag used to be malformed (`json:host,omitempty"`, missing the
// opening quote). encoding/json ignores malformed tags, so the field was
// emitted under the key "HostID" and never omitted when empty. It now uses
// the key "host", matching WorkerRegistration.
//
// NOTE(review): this changes the wire key from "HostID" to "host"; peers
// built from the old declaration will not see the host ID until upgraded.
type WorkerRegistrationRequest struct {
	// Endpoint of the worker.
	Endpoint string `json:"endpoint"`
	// Token is omitted from the JSON when empty.
	Token    string `json:"token,omitempty"`
	// HostID is the ID of the host; omitted from the JSON when empty.
	HostID   string `json:"host,omitempty"`
}

type WorkerRegistrationResponse

// WorkerRegistrationResponse is the result of InternalMasterAPI.RegisterWorker:
// the worker's registration plus the message timeout.
type WorkerRegistrationResponse struct {
	WorkerRegistration
	// Maximum time between messages in a task channel.
	MessageTimeout time.Duration `json:"messageTimeout,omitempty"`
}

type WorkerRegistrations

// WorkerRegistrations is a JSON document wrapping a list of WorkerRegistration.
// Presumably the response body behind InternalMasterAPI.RegisteredWorkers — TODO confirm.
type WorkerRegistrations struct {
	Workers []WorkerRegistration `json:"workers"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL