Documentation ¶
Index ¶
- Constants
- Variables
- type AckID
- type Notifier
- type NotifyMsg
- type NotifyType
- type OutOptions
- type OutputCgConfig
- type OutputHost
- func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error
- func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)
- func (h *OutputHost) LoadUconfig()
- func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error
- func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)
- func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error
- func (h *OutputHost) ReceiveMessageBatch(ctx thrift.Context, request *cherami.ReceiveMessageBatchRequest) (*cherami.ReceiveMessageBatchResult_, error)
- func (h *OutputHost) RegisterWSHandler() *http.ServeMux
- func (h *OutputHost) Report(reporter common.LoadReporter)
- func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error
- func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)
- func (h *OutputHost) Shutdown()
- func (h *OutputHost) Start(thriftService []thrift.TChanServer)
- func (h *OutputHost) Stop()
- func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)
- func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)
- type Throttler
Constants ¶
const ( // ThrottleDown is the message to stop throttling the connections ThrottleDown = iota // ThrottleUp is the messages to start throttling connections ThrottleUp // NumNotifyTypes is the overall notification types NumNotifyTypes )
const SmartRetryDisableString = `smartRetryDisable`
SmartRetryDisableString can be added to a destination or CG owner email to request smart retry to be disabled Note that Google allows something like this: gbailey+smartRetryDisable@uber.com The above is still a valid email and will be delivered to gbailey@uber.com
Variables ¶
var ErrCgAlreadyUnloaded = &cherami.InternalServiceError{Message: "Consumer Group already unloaded"}
ErrCgAlreadyUnloaded is returned when the cg is already unloaded
var ErrCgUnloaded = &cherami.InternalServiceError{Message: "ConsumerGroup already unloaded"}
ErrCgUnloaded is returned when the cgCache is already unloaded
var ErrConfigCast = &cherami.InternalServiceError{Message: "Unable to cast to OutputCgConfig"}
ErrConfigCast is returned when we are unable to cast to the CgConfig type
var ErrHostShutdown = &cherami.InternalServiceError{Message: "OutputHost already shutdown"}
ErrHostShutdown is returned when the host is already shutdown
var ErrLimit = &cherami.InternalServiceError{Message: "Outputhost cons connection limit exceeded"}
ErrLimit is returned when the overall limit for the CG is violated
Functions ¶
This section is empty.
Types ¶
type AckID ¶
type AckID string
AckID is an acknowledgement ID; not the same as common.AckID, which decomposes this string Capitalized because otherwise the type name conflicts horribly with local variables
type Notifier ¶
type Notifier interface { // Register is the rotuine to register this connection on the notifier Register(id int, notifyCh chan NotifyMsg) // Unregister is the routine to unregister this connection from the notifier Unregister(id int) // Notify is the routine to send a notification on the appropriate channel Notify(id int, notifyMsg NotifyMsg) }
Notifier is the interface to notify connections about nacks and potentially slowing them down, if needed. Any one who wants to get notified should Register with the notifier by specifying a unique ID and a channel to receive the notification.
type NotifyMsg ¶
type NotifyMsg struct {
// contains filtered or unexported fields
}
NotifyMsg is the message sent on the notify channel
type OutOptions ¶
OutOptions is the options used during instantiating a new host
type OutputCgConfig ¶ added in v0.2.0
type OutputCgConfig struct { // MessageCacheSize is used to configure the per CG cache size. // This is a string slice, where each entry is a tuple with the // destination/CG_name=value. // For example, we can ideally have two CGs for the same destination // with different size config as follows: // "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100" MessageCacheSize []string `name:"messagecachesize" default:"/=10000"` }
OutputCgConfig is the per cg config used by the cassandra config manager
type OutputHost ¶
OutputHost is the main server class for OutputHosts
func NewOutputHost ¶
func NewOutputHost(serviceName string, sVice common.SCommon, metadataClient metadata.TChanMetadataService, frontendClient ccherami.TChanBFrontend, opts *OutOptions) (*OutputHost, []thrift.TChanServer)
NewOutputHost is the constructor for BOut
func (*OutputHost) AckMessages ¶
func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error
AckMessages is the implementation of the thrift handler for the BOut service
func (*OutputHost) ConsumerGroupsUpdated ¶
func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)
ConsumerGroupsUpdated is the API exposed to ExtentController to communicate any changes to current extent view
func (*OutputHost) LoadUconfig ¶
func (h *OutputHost) LoadUconfig()
LoadUconfig load the dynamic config values for key
func (*OutputHost) OpenConsumerStream ¶
func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error
OpenConsumerStream is the implementation of the thrift handler for the Out service TODO: find remote "host" from the context as a tag (pass to newConsConnection)
func (*OutputHost) OpenConsumerStreamHandler ¶
func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)
OpenConsumerStreamHandler is websocket handler for opening consumer stream
func (*OutputHost) OpenStreamingConsumerStream ¶
func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error
OpenStreamingConsumerStream is unimplemented
func (*OutputHost) ReceiveMessageBatch ¶
func (h *OutputHost) ReceiveMessageBatch(ctx thrift.Context, request *cherami.ReceiveMessageBatchRequest) (*cherami.ReceiveMessageBatchResult_, error)
ReceiveMessageBatch is a thrift handler. It consumes messages from this output host within thrift context deadline. This is long-poll able.
func (*OutputHost) RegisterWSHandler ¶
func (h *OutputHost) RegisterWSHandler() *http.ServeMux
RegisterWSHandler is the implementation of WSService interface
func (*OutputHost) Report ¶
func (h *OutputHost) Report(reporter common.LoadReporter)
Report is the implementation for reporting host specific load to controller
func (*OutputHost) SetConsumedMessages ¶
func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error
SetConsumedMessages is unimplemented
func (*OutputHost) SetFrontendClient ¶
func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)
SetFrontendClient is used to set the frontend client after we start the output
func (*OutputHost) Shutdown ¶
func (h *OutputHost) Shutdown()
Shutdown shutsdown all the OutputHost cleanly
func (*OutputHost) Start ¶
func (h *OutputHost) Start(thriftService []thrift.TChanServer)
Start starts the outputhost service
func (*OutputHost) UnloadConsumerGroups ¶
func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)
UnloadConsumerGroups is the API used to unload consumer groups to clear the cache
func (*OutputHost) UtilGetPickedStore ¶
func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)
UtilGetPickedStore is used by the integration test to figure out which store host we are currently connected to XXX: This should not be used anywhere else other than the test.
type Throttler ¶
type Throttler interface { // SetBackoffCoefficient is the routine to set the backoff coefficient for the throttler SetBackoffCoefficient(backoffCoefficient float64) // SetMaxThrottleDuration is used to set the max duration to return for the throttler SetMaxThrottleDuration(max time.Duration) // SetMinThrottleDuration is used to set the min duration to return for the throttler SetMinThrottleDuration(min time.Duration) // GetCurrentSleepDuration returns the current duration for this throttler GetCurrentSleepDuration() time.Duration // GetNextSleepDuration returns the sleep duration based on the backoff coefficient GetNextSleepDuration() time.Duration // ResetSleepDuration resets the current sleep duration for this throttler ResetSleepDuration() time.Duration // IsCurrentSleepDurationAtMaximum tells whether the current sleep duration is maxed out IsCurrentSleepDurationAtMaximum() bool }
Throttler is the interface is to get the duration to sleep based on a simple backoff formulae sleepDuration = time.Duration(currentSleepDuration * backoffCoefficient) Note: this is lock free and is *not* thread safe. This is intentional because the user is expected to be a single goroutine. If we need this to be thread safe we need to add some synchronization.