outputhost

package
v0.2.0

Note: This package is not in the latest version of its module.
Published: Feb 6, 2017 License: MIT Imports: 30 Imported by: 0

Documentation

Constants

const (
	// ThrottleDown is the message to stop throttling the connections
	ThrottleDown = iota
	// ThrottleUp is the message to start throttling the connections
	ThrottleUp
	// NumNotifyTypes is the total number of notification types
	NumNotifyTypes
)

const SmartRetryDisableString = `smartRetryDisable`

SmartRetryDisableString can be added to a destination or CG owner email to request that smart retry be disabled. Note that Google allows something like this: gbailey+smartRetryDisable@uber.com. The above is still a valid email address, and mail sent to it will be delivered to gbailey@uber.com.
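As a rough illustration of how such a marker could be detected, the sketch below checks an owner email for the constant's value. The helper name smartRetryDisabled and the plain substring check are assumptions made for this example; the documentation above does not say how the package itself performs the check.

```go
package main

import (
	"fmt"
	"strings"
)

// SmartRetryDisableString mirrors the constant documented above.
const SmartRetryDisableString = `smartRetryDisable`

// smartRetryDisabled is a hypothetical helper (not part of the package):
// it reports whether the owner email carries the opt-out marker.
func smartRetryDisabled(ownerEmail string) bool {
	return strings.Contains(ownerEmail, SmartRetryDisableString)
}

func main() {
	fmt.Println(smartRetryDisabled("gbailey+smartRetryDisable@uber.com"))
	fmt.Println(smartRetryDisabled("gbailey@uber.com"))
}
```

A substring check keeps the sketch short; a stricter variant might only look at the part between "+" and "@".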

Variables

var ErrCgAlreadyUnloaded = &cherami.InternalServiceError{Message: "Consumer Group already unloaded"}

ErrCgAlreadyUnloaded is returned when the cg is already unloaded

var ErrCgUnloaded = &cherami.InternalServiceError{Message: "ConsumerGroup already unloaded"}

ErrCgUnloaded is returned when the cgCache is already unloaded

var ErrConfigCast = &cherami.InternalServiceError{Message: "Unable to cast to OutputCgConfig"}

ErrConfigCast is returned when we are unable to cast to the CgConfig type

var ErrHostShutdown = &cherami.InternalServiceError{Message: "OutputHost already shutdown"}

ErrHostShutdown is returned when the host is already shutdown

var ErrLimit = &cherami.InternalServiceError{Message: "Outputhost cons connection limit exceeded"}

ErrLimit is returned when the overall connection limit for the CG is exceeded

Functions

This section is empty.

Types

type AckID

type AckID string

AckID is an acknowledgement ID. It is not the same as common.AckID, which decomposes this string. The name is capitalized because otherwise the type name conflicts horribly with local variables.

type Notifier

type Notifier interface {
	// Register is the routine to register this connection on the notifier
	Register(id int, notifyCh chan NotifyMsg)

	// Unregister is the routine to unregister this connection from the notifier
	Unregister(id int)

	// Notify is the routine to send a notification on the appropriate channel
	Notify(id int, notifyMsg NotifyMsg)
}

Notifier is the interface to notify connections about nacks and to potentially slow them down, if needed. Anyone who wants to get notified should Register with the notifier by specifying a unique ID and a channel on which to receive the notification.
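The register/notify flow can be sketched with a small map-backed implementation. Everything here is an assumption for illustration: mapNotifier and newMapNotifier are hypothetical names, the notifyType field stands in for the unexported fields of NotifyMsg, and the non-blocking send is one possible design, not necessarily what the package does.

```go
package main

import (
	"fmt"
	"sync"
)

// NotifyType and the constants mirror the documented declarations.
type NotifyType int

const (
	ThrottleDown = iota
	ThrottleUp
	NumNotifyTypes
)

// NotifyMsg is a stand-in: the real struct's fields are unexported,
// so the field below is an assumption.
type NotifyMsg struct {
	notifyType NotifyType
}

// mapNotifier is a hypothetical Notifier backed by a mutex-guarded map.
type mapNotifier struct {
	mu    sync.RWMutex
	conns map[int]chan NotifyMsg
}

func newMapNotifier() *mapNotifier {
	return &mapNotifier{conns: make(map[int]chan NotifyMsg)}
}

// Register associates the unique ID with the channel to notify.
func (n *mapNotifier) Register(id int, notifyCh chan NotifyMsg) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.conns[id] = notifyCh
}

// Unregister removes the ID; later notifications for it are ignored.
func (n *mapNotifier) Unregister(id int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.conns, id)
}

// Notify does a non-blocking send so a slow receiver cannot stall the caller.
func (n *mapNotifier) Notify(id int, notifyMsg NotifyMsg) {
	n.mu.RLock()
	defer n.mu.RUnlock()
	ch, ok := n.conns[id]
	if !ok {
		return
	}
	select {
	case ch <- notifyMsg:
	default: // channel full: drop the notification
	}
}

func main() {
	n := newMapNotifier()
	ch := make(chan NotifyMsg, 1)
	n.Register(1, ch)
	n.Notify(1, NotifyMsg{notifyType: ThrottleUp})
	msg := <-ch
	fmt.Println(msg.notifyType == ThrottleUp)
	n.Unregister(1)
}
```

Dropping a notification when the channel is full trades delivery guarantees for a caller that never blocks; a blocking send would make the opposite trade.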

type NotifyMsg

type NotifyMsg struct {
	// contains filtered or unexported fields
}

NotifyMsg is the message sent on the notify channel

type NotifyType

type NotifyType int

NotifyType is the notify channel type

type OutOptions

type OutOptions struct {
	//CacheIdleTimeout
	CacheIdleTimeout time.Duration
}

OutOptions holds the options used when instantiating a new host

type OutputCgConfig added in v0.2.0

type OutputCgConfig struct {
	// MessageCacheSize is used to configure the per CG cache size.
	// This is a string slice, where each entry is a tuple with the
	// destination/CG_name=value.
	// For example, we can ideally have two CGs for the same destination
	// with different size config as follows:
	// "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100"
	MessageCacheSize []string `name:"messagecachesize" default:"/=10000"`
}

OutputCgConfig is the per cg config used by the cassandra config manager
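To make the entry format concrete, the following sketch parses MessageCacheSize entries of the form destination/CG_name=value into a map. The helper parseCacheSizes is hypothetical and not part of the package; splitting on the last "=" and trimming whitespace are assumptions about how such entries might reasonably be read.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCacheSizes is a hypothetical helper that turns entries of the form
// "destination/CG_name=value" into a map keyed by "destination/CG_name".
func parseCacheSizes(entries []string) (map[string]int, error) {
	sizes := make(map[string]int, len(entries))
	for _, entry := range entries {
		// Split on the last '=' so the path portion is kept intact.
		idx := strings.LastIndex(entry, "=")
		if idx < 0 {
			return nil, fmt.Errorf("malformed entry %q: missing '='", entry)
		}
		size, err := strconv.Atoi(strings.TrimSpace(entry[idx+1:]))
		if err != nil {
			return nil, fmt.Errorf("malformed entry %q: %v", entry, err)
		}
		sizes[strings.TrimSpace(entry[:idx])] = size
	}
	return sizes, nil
}

func main() {
	sizes, err := parseCacheSizes([]string{
		"/test/destination//test/cg_1=50",
		"/test/destination//test/cg_2=100",
		"/=10000", // the documented default entry
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(sizes["/test/destination//test/cg_1"], sizes["/"])
}
```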

type OutputHost

type OutputHost struct {
	common.SCommon
	// contains filtered or unexported fields
}

OutputHost is the main server class for OutputHosts

func NewOutputHost

func NewOutputHost(serviceName string, sVice common.SCommon, metadataClient metadata.TChanMetadataService, frontendClient ccherami.TChanBFrontend, opts *OutOptions) (*OutputHost, []thrift.TChanServer)

NewOutputHost is the constructor for BOut

func (*OutputHost) AckMessages

func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error

AckMessages is the implementation of the thrift handler for the BOut service

func (*OutputHost) ConsumerGroupsUpdated

func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)

ConsumerGroupsUpdated is the API exposed to ExtentController to communicate any changes to the current extent view

func (*OutputHost) LoadUconfig

func (h *OutputHost) LoadUconfig()

LoadUconfig loads the dynamic config values for key

func (*OutputHost) OpenConsumerStream

func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error

OpenConsumerStream is the implementation of the thrift handler for the Out service. TODO: find remote "host" from the context as a tag (pass to newConsConnection)

func (*OutputHost) OpenConsumerStreamHandler

func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)

OpenConsumerStreamHandler is the websocket handler for opening a consumer stream

func (*OutputHost) OpenStreamingConsumerStream

func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error

OpenStreamingConsumerStream is unimplemented

func (*OutputHost) ReceiveMessageBatch

ReceiveMessageBatch is a thrift handler. It consumes messages from this output host within the thrift context deadline. This call is long-pollable.

func (*OutputHost) RegisterWSHandler

func (h *OutputHost) RegisterWSHandler() *http.ServeMux

RegisterWSHandler is the implementation of WSService interface

func (*OutputHost) Report

func (h *OutputHost) Report(reporter common.LoadReporter)

Report is the implementation for reporting host-specific load to the controller

func (*OutputHost) SetConsumedMessages

func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error

SetConsumedMessages is unimplemented

func (*OutputHost) SetFrontendClient

func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)

SetFrontendClient is used to set the frontend client after we start the output

func (*OutputHost) Shutdown

func (h *OutputHost) Shutdown()

Shutdown shuts down the OutputHost cleanly

func (*OutputHost) Start

func (h *OutputHost) Start(thriftService []thrift.TChanServer)

Start starts the outputhost service

func (*OutputHost) Stop

func (h *OutputHost) Stop()

Stop stops the service

func (*OutputHost) UnloadConsumerGroups

func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)

UnloadConsumerGroups is the API used to unload consumer groups to clear the cache

func (*OutputHost) UtilGetPickedStore

func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)

UtilGetPickedStore is used by the integration test to figure out which store host we are currently connected to. XXX: This should not be used anywhere other than the test.

type Throttler

type Throttler interface {
	// SetBackoffCoefficient is the routine to set the backoff coefficient for the throttler
	SetBackoffCoefficient(backoffCoefficient float64)

	// SetMaxThrottleDuration is used to set the max duration to return for the throttler
	SetMaxThrottleDuration(max time.Duration)

	// SetMinThrottleDuration is used to set the min duration to return for the throttler
	SetMinThrottleDuration(min time.Duration)

	// GetCurrentSleepDuration returns the current duration for this throttler
	GetCurrentSleepDuration() time.Duration

	// GetNextSleepDuration returns the sleep duration based on the backoff coefficient
	GetNextSleepDuration() time.Duration

	// ResetSleepDuration resets the current sleep duration for this throttler
	ResetSleepDuration() time.Duration

	// IsCurrentSleepDurationAtMaximum tells whether the current sleep duration is maxed out
	IsCurrentSleepDurationAtMaximum() bool
}

Throttler is the interface used to get the duration to sleep based on a simple backoff formula: sleepDuration = time.Duration(currentSleepDuration * backoffCoefficient). Note: this is lock free and is *not* thread safe. This is intentional because the user is expected to be a single goroutine. If we need this to be thread safe, we need to add some synchronization.
