worker

package
v0.4.0
Published: Oct 19, 2018 License: MIT Imports: 23 Imported by: 4

README

Cadence Worker (In Development)

Cadence Worker is a new role for the Cadence service. It hosts the components responsible for background processing on the Cadence cluster.

Replicator

Replicator is a background worker responsible for consuming replication tasks generated by remote Cadence clusters and passing them to the processor so they can be applied to the local Cadence cluster.

It uses Kafka as the replication task buffer and relies on the kafka-client library (https://github.com/uber-go/kafka-client/) for consuming messages from Kafka.

Quickstart for localhost development

  1. Set up Kafka by following the instructions in the Kafka Quickstart.
  2. Create the Kafka topic for the active cluster:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic active
  3. Create the Kafka topic for the standby cluster:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic standby
  4. Start the Cadence development server for the active zone:
./cadence-server --zone active start

The Cadence cluster is now running, with the replicator consuming messages from the Kafka topic standby.

Create replication task using CLI

The Kafka CLI can be used to generate a replication task for testing purposes:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic standby

Replication task message:

{taskType: 0}

Documentation

Constants

This section is empty.

Variables

var (
	// ErrEmptyDomainReplicationTask is the error to indicate empty replication task
	ErrEmptyDomainReplicationTask = errors.New("empty domain replication task")
	// ErrInvalidDomainOperation is the error to indicate empty domain operation attribute
	ErrInvalidDomainOperation = errors.New("invalid domain operation attribute")
	// ErrInvalidDomainID is the error to indicate empty domain ID attribute
	ErrInvalidDomainID = errors.New("invalid domain ID attribute")
	// ErrInvalidDomainInfo is the error to indicate empty info attribute
	ErrInvalidDomainInfo = errors.New("invalid domain info attribute")
	// ErrInvalidDomainConfig is the error to indicate empty config attribute
	ErrInvalidDomainConfig = errors.New("invalid domain config attribute")
	// ErrInvalidDomainReplicationConfig is the error to indicate empty replication config attribute
	ErrInvalidDomainReplicationConfig = errors.New("invalid domain replication config attribute")
	// ErrInvalidDomainConfigVersion is the error to indicate empty config version attribute
	ErrInvalidDomainConfigVersion = errors.New("invalid domain config version attribute")
	// ErrInvalidDomainFailoverVersion is the error to indicate empty failover version attribute
	ErrInvalidDomainFailoverVersion = errors.New("invalid domain failover version attribute")
	// ErrInvalidDomainStatus is the error to indicate invalid domain status
	ErrInvalidDomainStatus = errors.New("invalid domain status attribute")
)
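
The errors above are package-level sentinel values, so a caller can compare a returned error against them directly. The following is a minimal sketch of that pattern, not the package's actual validation code: domainTask and validate are hypothetical names, and the two error variables are local copies declared only so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two of the sentinel errors listed above, declared here
// only so that the sketch is self-contained.
var (
	ErrEmptyDomainReplicationTask = errors.New("empty domain replication task")
	ErrInvalidDomainID            = errors.New("invalid domain ID attribute")
)

// domainTask is a hypothetical, simplified task. It is not
// replicator.DomainTaskAttributes.
type domainTask struct {
	ID *string
}

// validate returns a sentinel error for the first missing attribute it finds.
// This is an assumed validation order, shown for illustration only.
func validate(task *domainTask) error {
	if task == nil {
		return ErrEmptyDomainReplicationTask
	}
	if task.ID == nil {
		return ErrInvalidDomainID
	}
	return nil
}

func main() {
	err := validate(&domainTask{})
	// Sentinel errors are plain values, so they can be compared with ==.
	if err == ErrInvalidDomainID {
		fmt.Println("rejected:", err)
	}
}
```
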
var (
	// ErrEmptyReplicationTask is the error to indicate empty replication task
	ErrEmptyReplicationTask = &shared.BadRequestError{Message: "empty replication task"}
	// ErrUnknownReplicationTask is the error to indicate unknown replication task type
	ErrUnknownReplicationTask = &shared.BadRequestError{Message: "unknown replication task"}
	// ErrDeserializeReplicationTask is the error to indicate failure to deserialize replication task
	ErrDeserializeReplicationTask = &shared.BadRequestError{Message: "Failed to deserialize replication task"}
)

Functions

func NewService

func NewService(params *service.BootstrapParams) common.Daemon

NewService builds a new cadence-worker service

Types

type Config

type Config struct {
	// Replicator settings
	PersistenceMaxQPS          dynamicconfig.IntPropertyFn
	ReplicatorConcurrency      int
	ReplicatorBufferRetryCount int
	ReplicationTaskMaxRetry    int
}

Config contains all the service config for worker

func NewConfig

func NewConfig(dc *dynamicconfig.Collection) *Config

NewConfig builds the new Config for cadence-worker service
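
Config mixes one function-valued setting (PersistenceMaxQPS) with plain int fields. A function-valued setting can be re-evaluated on every read, while an int is fixed once the struct is built. The sketch below illustrates that difference with stand-in types only: intPropertyFn, config, and newConfig are hypothetical, the real dynamicconfig.IntPropertyFn signature may differ, and the numeric values are arbitrary.

```go
package main

import "fmt"

// intPropertyFn is a hypothetical stand-in for dynamicconfig.IntPropertyFn.
type intPropertyFn func() int

// config mirrors part of the shape of Config using stand-in types.
type config struct {
	PersistenceMaxQPS     intPropertyFn
	ReplicatorConcurrency int
}

// newConfig builds a config whose QPS setting reads from source on each call.
func newConfig(source *int) *config {
	return &config{
		PersistenceMaxQPS:     func() int { return *source },
		ReplicatorConcurrency: 4, // arbitrary value, fixed at construction
	}
}

func main() {
	qps := 100 // arbitrary value
	cfg := newConfig(&qps)
	fmt.Println(cfg.PersistenceMaxQPS())
	qps = 200
	// The function-valued field observes the change without rebuilding cfg.
	fmt.Println(cfg.PersistenceMaxQPS())
}
```
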

type DomainReplicator

type DomainReplicator interface {
	HandleReceivingTask(task *replicator.DomainTaskAttributes) error
}

DomainReplicator is the interface which can replicate the domain

func NewDomainReplicator

func NewDomainReplicator(metadataManagerV2 persistence.MetadataManager, logger bark.Logger) DomainReplicator

NewDomainReplicator creates a new instance of the domain replicator
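
Because DomainReplicator is a single-method interface, code that depends on it can be exercised with a small test double instead of a persistence-backed instance. The following sketch assumes stand-in types: domainTaskAttributes, recordingReplicator, and dispatch are hypothetical and do not exist in this package.

```go
package main

import "fmt"

// domainTaskAttributes is a hypothetical stand-in for
// replicator.DomainTaskAttributes.
type domainTaskAttributes struct {
	ID string
}

// domainReplicator mirrors the single-method shape of DomainReplicator.
type domainReplicator interface {
	HandleReceivingTask(task *domainTaskAttributes) error
}

// recordingReplicator is a test double that records the IDs it has handled.
type recordingReplicator struct {
	handled []string
}

func (r *recordingReplicator) HandleReceivingTask(task *domainTaskAttributes) error {
	if task == nil {
		return fmt.Errorf("nil task")
	}
	r.handled = append(r.handled, task.ID)
	return nil
}

// dispatch passes each task to the replicator and stops at the first error.
func dispatch(r domainReplicator, tasks []*domainTaskAttributes) error {
	for _, t := range tasks {
		if err := r.HandleReceivingTask(t); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recordingReplicator{}
	err := dispatch(r, []*domainTaskAttributes{{ID: "a"}, {ID: "b"}})
	fmt.Println(err, r.handled)
}
```
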

type Replicator

type Replicator struct {
	// contains filtered or unexported fields
}

Replicator is the processor for replication tasks

func NewReplicator

func NewReplicator(clusterMetadata cluster.Metadata, metadataManagerV2 persistence.MetadataManager,
	historyClient history.Client, config *Config, client messaging.Client, logger bark.Logger,
	metricsClient metrics.Client) *Replicator

NewReplicator creates a new replicator for processing replication tasks

func (*Replicator) Start

func (r *Replicator) Start() error

Start is called to start replicator

func (*Replicator) Stop

func (r *Replicator) Stop()

Stop is called to stop replicator
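
Start returns an error while Stop does not, so one reasonable calling pattern is to defer Stop only after Start has succeeded. This is a sketch of that pattern under stated assumptions, not how the service itself drives the replicator: lifecycle, fakeReplicator, and run are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// lifecycle captures the Start/Stop method set that *Replicator exposes.
type lifecycle interface {
	Start() error
	Stop()
}

// fakeReplicator is a hypothetical stand-in used to exercise the pattern.
type fakeReplicator struct {
	failStart bool
	started   bool
	stopped   bool
}

func (f *fakeReplicator) Start() error {
	if f.failStart {
		return errors.New("start failed")
	}
	f.started = true
	return nil
}

func (f *fakeReplicator) Stop() { f.stopped = true }

// run starts l, executes work, and stops l afterwards. Stop is deferred only
// once Start has succeeded, so a failed Start never triggers a Stop.
func run(l lifecycle, work func()) error {
	if err := l.Start(); err != nil {
		return err
	}
	defer l.Stop()
	work()
	return nil
}

func main() {
	r := &fakeReplicator{}
	err := run(r, func() { fmt.Println("processing") })
	fmt.Println(err, r.started, r.stopped)
}
```
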

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service represents the cadence-worker service. This service hosts all background processing that needs to happen for a Cadence cluster. It runs the replicator, which is responsible for applying replication tasks generated by remote clusters.

func (*Service) Start

func (s *Service) Start()

Start is called to start the service

func (*Service) Stop

func (s *Service) Stop()

Stop is called to stop the service
