outputhost

package
v0.0.0-...-f872574 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2019 License: MIT Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// ThrottleDown is the message to stop throttling the connections
	ThrottleDown = iota
	// ThrottleUp is the messages to start throttling connections
	ThrottleUp
	// NumNotifyTypes is the overall notification types
	NumNotifyTypes
)
View Source
const SmartRetryDisableString = `smartRetryDisable`

SmartRetryDisableString can be added to a destination or CG owner email to request smart retry to be disabled Note that Google allows something like this: gbailey+smartRetryDisable@uber.com The above is still a valid email and will be delivered to gbailey@uber.com

Variables

View Source
var ErrCgAlreadyUnloaded = &cherami.InternalServiceError{Message: "Consumer Group already unloaded"}

ErrCgAlreadyUnloaded is returned when the cg is already unloaded

View Source
var ErrCgUnloaded = &cherami.InternalServiceError{Message: "ConsumerGroup already unloaded"}

ErrCgUnloaded is returned when the cgCache is already unloaded

View Source
var ErrConfigCast = &cherami.InternalServiceError{Message: "Unable to cast to OutputCgConfig"}

ErrConfigCast is returned when we are unable to cast to the CgConfig type

View Source
var ErrHostShutdown = &cherami.InternalServiceError{Message: "OutputHost already shutdown"}

ErrHostShutdown is returned when the host is already shutdown

View Source
var ErrLimit = &cherami.InternalServiceError{Message: "Outputhost cons connection limit exceeded"}

ErrLimit is returned when the overall limit for the CG is violated

Functions

func NewSaramaLoggerFromBark

func NewSaramaLoggerFromBark(l bark.Logger, module string) sarama.StdLogger

NewSaramaLoggerFromBark provides a logger suitable for Sarama from a given bark logger

func OpenKafkaStream

func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMessageConverter, logger bark.Logger) stream.BStoreOpenReadStreamOutCall

OpenKafkaStream opens a store call simulated from the given sarama message stream

Types

type AckID

type AckID string

AckID is an acknowledgement ID; not the same as common.AckID, which decomposes this string Capitalized because otherwise the type name conflicts horribly with local variables

type Committer

type Committer interface {
	// SetCommitLevel indicates that work up to and including the message specified by the sequence number and address
	// has been acknowledged. Not guaranteed to be persisted until a successful call to Flush()
	SetCommitLevel(l CommitterLevel)

	// SetReadLevel indicates that a particular message has been read. Metadata not guaranteed to be communicated/persisted
	// until a successful call to Flush()
	SetReadLevel(l CommitterLevel)

	// SetFinalLevel indicates that a particular level is the last that can possibly be read. Metadata not guaranteed
	// to be communicated/persisted until a successful call to Flush()
	SetFinalLevel(l CommitterLevel)

	// UnlockAndFlush copies accumulated commit/read state, unlocks the provided lock, and then commits
	// the levels to durable storage, e.g. Kafka offset storage or Cherami-Cassandra AckLevel storage
	UnlockAndFlush(l sync.Locker) error

	// GetCommitLevel receives the last value given to Commit()
	GetCommitLevel() (l CommitterLevel)

	// GetReadLevel receives the last value given to Read()
	GetReadLevel() (l CommitterLevel)
}

Committer is an interface that wraps the internals of how offsets/acklevels are committed for a given queueing system (e.g. Kafka or Cherami)

type CommitterLevel

type CommitterLevel struct {
	// contains filtered or unexported fields
}

CommitterLevel binds a logical and store address together for committing

type KafkaCommitter

type KafkaCommitter struct {
	*sc.OffsetStash

	KafkaOffsetMetadata
	// contains filtered or unexported fields
}

KafkaCommitter is commits ackLevels to Cassandra through the TChanMetadataClient interface

func NewKafkaCommitter

func NewKafkaCommitter(
	outputHostUUID string,
	cgUUID string,
	logger bark.Logger,
	client **sc.Consumer) *KafkaCommitter

NewKafkaCommitter instantiates a kafkaCommitter

func (*KafkaCommitter) GetCommitLevel

func (c *KafkaCommitter) GetCommitLevel() (l CommitterLevel)

GetCommitLevel returns the next commit level that will be flushed

func (*KafkaCommitter) GetReadLevel

func (c *KafkaCommitter) GetReadLevel() (l CommitterLevel)

GetReadLevel returns the next readlevel that will be flushed

func (*KafkaCommitter) SetCommitLevel

func (c *KafkaCommitter) SetCommitLevel(l CommitterLevel)

SetCommitLevel just updates the next Cherami ack level that will be flushed

func (*KafkaCommitter) SetFinalLevel

func (c *KafkaCommitter) SetFinalLevel(l CommitterLevel)

SetFinalLevel just updates the last possible read level

func (*KafkaCommitter) SetReadLevel

func (c *KafkaCommitter) SetReadLevel(l CommitterLevel)

SetReadLevel just updates the next Cherami read level that will be flushed

func (*KafkaCommitter) UnlockAndFlush

func (c *KafkaCommitter) UnlockAndFlush(l sync.Locker) error

UnlockAndFlush pushes our commit and read levels to Cherami metadata, using SetAckOffset

type KafkaGroupMetadata

type KafkaGroupMetadata struct {
	// Version is the version of this structure
	Version uint

	// CGUUID is the internal Cherami consumer group UUID that committed this offset
	CGUUID string

	// OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
	OutputHostUUID string
}

KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for Kafka groups joined by Cherami

type KafkaMessageConverter

type KafkaMessageConverter func(kMsg *s.ConsumerMessage) (cMsg *store.ReadMessageContent)

KafkaMessageConverter defines interface to convert a Kafka message to Cherami message

func GetDefaultKafkaMessageConverter

func GetDefaultKafkaMessageConverter(logger bark.Logger) KafkaMessageConverter

GetDefaultKafkaMessageConverter returns the default kafka message converter

type KafkaMessageConverterConfig

type KafkaMessageConverterConfig struct {
	// Destination and ConsumerGroup are not needed currently, but may in the future
	// Destination *cherami.DestinationDescription
	// ConsumerGroup *cherami.ConsumerGroupDescription
	KafkaTopics  []string // optional: already contained in destination-description
	KafkaCluster string   // optional: already contained in destination-description
}

KafkaMessageConverterConfig is used to config customized converter

type KafkaMessageConverterFactory

type KafkaMessageConverterFactory interface {
	GetConverter(cfg *KafkaMessageConverterConfig, log bark.Logger) KafkaMessageConverter
}

KafkaMessageConverterFactory provides converter from Kafka message to Cherami message In the future, it may provide implementations for BStoreOpenReadStreamOutCall

type KafkaOffsetMetadata

type KafkaOffsetMetadata struct {
	// Version is the version of this structure
	Version uint

	// CGUUID is the internal Cherami consumer group UUID that committed this offset
	CGUUID string

	// OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
	OutputHostUUID string

	// OutputHostStartTime is the time that the output host started
	OutputHostStartTime string

	// CommitterStartTime is the time that this committer was started
	CommitterStartTime string
}

KafkaOffsetMetadata is a structure used for JSON encoding/decoding of the metadata stored for Kafka offsets committed by Cherami

type KafkaTopicPartitionAddresser

type KafkaTopicPartitionAddresser struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

KafkaTopicPartitionAddresser translates topic/partition/offset to/from store address

func (*KafkaTopicPartitionAddresser) GetStoreAddress

func (k *KafkaTopicPartitionAddresser) GetStoreAddress(tp *TopicPartition, offset int64, logFn func() bark.Logger) (address storeHostAddress)

GetStoreAddress converts a given topic, partition, and offset into a store address

func (*KafkaTopicPartitionAddresser) GetTopicPartitionOffset

func (k *KafkaTopicPartitionAddresser) GetTopicPartitionOffset(address storeHostAddress, logFn func() bark.Logger) (tp *TopicPartition, offset int64)

GetTopicPartitionOffset recovers the original topic, partition, and offset from a store address

type Notifier

type Notifier interface {
	// Register is the rotuine to register this connection on the notifier
	Register(id int, notifyCh chan NotifyMsg)

	// Unregister is the routine to unregister this connection from the notifier
	Unregister(id int)

	// Notify is the routine to send a notification on the appropriate channel
	Notify(id int, notifyMsg NotifyMsg)
}

Notifier is the interface to notify connections about nacks and potentially slowing them down, if needed. Any one who wants to get notified should Register with the notifier by specifying a unique ID and a channel to receive the notification.

type NotifyMsg

type NotifyMsg struct {
	// contains filtered or unexported fields
}

NotifyMsg is the message sent on the notify channel

type NotifyType

type NotifyType int

NotifyType is the notify channel type

type OutOptions

type OutOptions struct {
	//CacheIdleTimeout
	CacheIdleTimeout time.Duration
	//KStreamFactory
	KStreamFactory KafkaMessageConverterFactory
}

OutOptions is the options used during instantiating a new host

type OutputCgConfig

type OutputCgConfig struct {
	// MessageCacheSize is used to configure the per CG cache size.
	// This is a string slice, where each entry is a tuple with the
	// destination/CG_name=value.
	// For example, we can ideally have two CGs for the same destination
	// with different size config as follows:
	// "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100"
	MessageCacheSize []string `name:"messagecachesize" default:"/=10000"`
}

OutputCgConfig is the per cg config used by the cassandra config manager

type OutputHost

type OutputHost struct {
	common.SCommon
	// contains filtered or unexported fields
}

OutputHost is the main server class for OutputHosts

func NewOutputHost

func NewOutputHost(
	serviceName string,
	sVice common.SCommon,
	metadataClient metadata.TChanMetadataService,
	frontendClient ccherami.TChanBFrontend,
	opts *OutOptions,
	kafkaCfg configure.CommonKafkaConfig,
) (*OutputHost, []thrift.TChanServer)

NewOutputHost is the constructor for BOut

func (*OutputHost) AckMessages

func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error

AckMessages is the implementation of the thrift handler for the BOut service

func (*OutputHost) ConsumerGroupsUpdated

func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)

ConsumerGroupsUpdated is the API exposed to ExtentController to communicate any changes to current extent view

func (*OutputHost) ListLoadedConsumerGroups

func (h *OutputHost) ListLoadedConsumerGroups(ctx thrift.Context) (result *admin.ListConsumerGroupsResult_, err error)

ListLoadedConsumerGroups is the API used to unload consumer groups to clear the cache

func (*OutputHost) LoadUconfig

func (h *OutputHost) LoadUconfig()

LoadUconfig load the dynamic config values for key

func (*OutputHost) OpenConsumerStream

func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error

OpenConsumerStream is the implementation of the thrift handler for the Out service TODO: find remote "host" from the context as a tag (pass to newConsConnection)

func (*OutputHost) OpenConsumerStreamHandler

func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)

OpenConsumerStreamHandler is websocket handler for opening consumer stream

func (*OutputHost) OpenStreamingConsumerStream

func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error

OpenStreamingConsumerStream is unimplemented

func (*OutputHost) ReadCgState

ReadCgState is the API used to get the cg state

func (*OutputHost) ReceiveMessageBatch

ReceiveMessageBatch is a thrift handler. It consumes messages from this output host within thrift context deadline. This is long-poll able.

func (*OutputHost) RegisterWSHandler

func (h *OutputHost) RegisterWSHandler() *http.ServeMux

RegisterWSHandler is the implementation of WSService interface

func (*OutputHost) Report

func (h *OutputHost) Report(reporter common.LoadReporter)

Report is the implementation for reporting host specific load to controller

func (*OutputHost) SetConsumedMessages

func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error

SetConsumedMessages is unimplemented

func (*OutputHost) SetFrontendClient

func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)

SetFrontendClient is used to set the frontend client after we start the output

func (*OutputHost) Shutdown

func (h *OutputHost) Shutdown()

Shutdown shutsdown all the OutputHost cleanly

func (*OutputHost) Start

func (h *OutputHost) Start(thriftService []thrift.TChanServer)

Start starts the outputhost service

func (*OutputHost) Stop

func (h *OutputHost) Stop()

Stop stops the service

func (*OutputHost) UnloadConsumerGroups

func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)

UnloadConsumerGroups is the API used to unload consumer groups to clear the cache

func (*OutputHost) UtilGetPickedStore

func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)

UtilGetPickedStore is used by the integration test to figure out which store host we are currently connected to XXX: This should not be used anywhere else other than the test.

type Throttler

type Throttler interface {
	// SetBackoffCoefficient is the routine to set the backoff coefficient for the throttler
	SetBackoffCoefficient(backoffCoefficient float64)

	// SetMaxThrottleDuration is used to set the max duration to return for the throttler
	SetMaxThrottleDuration(max time.Duration)

	// SetMinThrottleDuration is used to set the min duration to return for the throttler
	SetMinThrottleDuration(min time.Duration)

	// GetCurrentSleepDuration returns the current duration for this throttler
	GetCurrentSleepDuration() time.Duration

	// GetNextSleepDuration returns the sleep duration based on the backoff coefficient
	GetNextSleepDuration() time.Duration

	// ResetSleepDuration resets the current sleep duration for this throttler
	ResetSleepDuration() time.Duration

	// IsCurrentSleepDurationAtMaximum tells whether the current sleep duration is maxed out
	IsCurrentSleepDurationAtMaximum() bool
}

Throttler is the interface is to get the duration to sleep based on a simple backoff formulae sleepDuration = time.Duration(currentSleepDuration * backoffCoefficient) Note: this is lock free and is *not* thread safe. This is intentional because the user is expected to be a single goroutine. If we need this to be thread safe we need to add some synchronization.

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int32
}

TopicPartition represents a Kafka topic/partition pair

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL