Documentation ¶
Index ¶
- Constants
- Variables
- func NewSaramaLoggerFromBark(l bark.Logger, module string) sarama.StdLogger
- func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMessageConverter, ...) stream.BStoreOpenReadStreamOutCall
- type AckID
- type Committer
- type CommitterLevel
- type KafkaCommitter
- func (c *KafkaCommitter) GetCommitLevel() (l CommitterLevel)
- func (c *KafkaCommitter) GetReadLevel() (l CommitterLevel)
- func (c *KafkaCommitter) SetCommitLevel(l CommitterLevel)
- func (c *KafkaCommitter) SetFinalLevel(l CommitterLevel)
- func (c *KafkaCommitter) SetReadLevel(l CommitterLevel)
- func (c *KafkaCommitter) UnlockAndFlush(l sync.Locker) error
- type KafkaGroupMetadata
- type KafkaMessageConverter
- type KafkaMessageConverterConfig
- type KafkaMessageConverterFactory
- type KafkaOffsetMetadata
- type KafkaTopicPartitionAddresser
- type Notifier
- type NotifyMsg
- type NotifyType
- type OutOptions
- type OutputCgConfig
- type OutputHost
- func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error
- func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)
- func (h *OutputHost) ListLoadedConsumerGroups(ctx thrift.Context) (result *admin.ListConsumerGroupsResult_, err error)
- func (h *OutputHost) LoadUconfig()
- func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error
- func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)
- func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error
- func (h *OutputHost) ReadCgState(ctx thrift.Context, req *admin.ReadConsumerGroupStateRequest) (result *admin.ReadConsumerGroupStateResult_, err error)
- func (h *OutputHost) ReceiveMessageBatch(ctx thrift.Context, request *cherami.ReceiveMessageBatchRequest) (*cherami.ReceiveMessageBatchResult_, error)
- func (h *OutputHost) RegisterWSHandler() *http.ServeMux
- func (h *OutputHost) Report(reporter common.LoadReporter)
- func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error
- func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)
- func (h *OutputHost) Shutdown()
- func (h *OutputHost) Start(thriftService []thrift.TChanServer)
- func (h *OutputHost) Stop()
- func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)
- func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)
- type Throttler
- type TopicPartition
Constants ¶
const ( // ThrottleDown is the message to stop throttling the connections ThrottleDown = iota // ThrottleUp is the messages to start throttling connections ThrottleUp // NumNotifyTypes is the overall notification types NumNotifyTypes )
const SmartRetryDisableString = `smartRetryDisable`
SmartRetryDisableString can be added to a destination or CG owner email to request smart retry to be disabled Note that Google allows something like this: gbailey+smartRetryDisable@uber.com The above is still a valid email and will be delivered to gbailey@uber.com
Variables ¶
var ErrCgAlreadyUnloaded = &cherami.InternalServiceError{Message: "Consumer Group already unloaded"}
ErrCgAlreadyUnloaded is returned when the cg is already unloaded
var ErrCgUnloaded = &cherami.InternalServiceError{Message: "ConsumerGroup already unloaded"}
ErrCgUnloaded is returned when the cgCache is already unloaded
var ErrConfigCast = &cherami.InternalServiceError{Message: "Unable to cast to OutputCgConfig"}
ErrConfigCast is returned when we are unable to cast to the CgConfig type
var ErrHostShutdown = &cherami.InternalServiceError{Message: "OutputHost already shutdown"}
ErrHostShutdown is returned when the host is already shutdown
var ErrLimit = &cherami.InternalServiceError{Message: "Outputhost cons connection limit exceeded"}
ErrLimit is returned when the overall limit for the CG is violated
Functions ¶
func NewSaramaLoggerFromBark ¶
NewSaramaLoggerFromBark provides a logger suitable for Sarama from a given bark logger
func OpenKafkaStream ¶
func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMessageConverter, logger bark.Logger) stream.BStoreOpenReadStreamOutCall
OpenKafkaStream opens a store call simulated from the given sarama message stream
Types ¶
type AckID ¶
type AckID string
AckID is an acknowledgement ID; not the same as common.AckID, which decomposes this string Capitalized because otherwise the type name conflicts horribly with local variables
type Committer ¶
type Committer interface { // SetCommitLevel indicates that work up to and including the message specified by the sequence number and address // has been acknowledged. Not guaranteed to be persisted until a successful call to Flush() SetCommitLevel(l CommitterLevel) // SetReadLevel indicates that a particular message has been read. Metadata not guaranteed to be communicated/persisted // until a successful call to Flush() SetReadLevel(l CommitterLevel) // SetFinalLevel indicates that a particular level is the last that can possibly be read. Metadata not guaranteed // to be communicated/persisted until a successful call to Flush() SetFinalLevel(l CommitterLevel) // UnlockAndFlush copies accumulated commit/read state, unlocks the provided lock, and then commits // the levels to durable storage, e.g. Kafka offset storage or Cherami-Cassandra AckLevel storage UnlockAndFlush(l sync.Locker) error // GetCommitLevel receives the last value given to Commit() GetCommitLevel() (l CommitterLevel) // GetReadLevel receives the last value given to Read() GetReadLevel() (l CommitterLevel) }
Committer is an interface that wraps the internals of how offsets/acklevels are committed for a given queueing system (e.g. Kafka or Cherami)
type CommitterLevel ¶
type CommitterLevel struct {
// contains filtered or unexported fields
}
CommitterLevel binds a logical and store address together for committing
type KafkaCommitter ¶
type KafkaCommitter struct { *sc.OffsetStash KafkaOffsetMetadata // contains filtered or unexported fields }
KafkaCommitter is commits ackLevels to Cassandra through the TChanMetadataClient interface
func NewKafkaCommitter ¶
func NewKafkaCommitter( outputHostUUID string, cgUUID string, logger bark.Logger, client **sc.Consumer) *KafkaCommitter
NewKafkaCommitter instantiates a kafkaCommitter
func (*KafkaCommitter) GetCommitLevel ¶
func (c *KafkaCommitter) GetCommitLevel() (l CommitterLevel)
GetCommitLevel returns the next commit level that will be flushed
func (*KafkaCommitter) GetReadLevel ¶
func (c *KafkaCommitter) GetReadLevel() (l CommitterLevel)
GetReadLevel returns the next readlevel that will be flushed
func (*KafkaCommitter) SetCommitLevel ¶
func (c *KafkaCommitter) SetCommitLevel(l CommitterLevel)
SetCommitLevel just updates the next Cherami ack level that will be flushed
func (*KafkaCommitter) SetFinalLevel ¶
func (c *KafkaCommitter) SetFinalLevel(l CommitterLevel)
SetFinalLevel just updates the last possible read level
func (*KafkaCommitter) SetReadLevel ¶
func (c *KafkaCommitter) SetReadLevel(l CommitterLevel)
SetReadLevel just updates the next Cherami read level that will be flushed
func (*KafkaCommitter) UnlockAndFlush ¶
func (c *KafkaCommitter) UnlockAndFlush(l sync.Locker) error
UnlockAndFlush pushes our commit and read levels to Cherami metadata, using SetAckOffset
type KafkaGroupMetadata ¶
type KafkaGroupMetadata struct { // Version is the version of this structure Version uint // CGUUID is the internal Cherami consumer group UUID that committed this offset CGUUID string // OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset OutputHostUUID string }
KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for Kafka groups joined by Cherami
type KafkaMessageConverter ¶
type KafkaMessageConverter func(kMsg *s.ConsumerMessage) (cMsg *store.ReadMessageContent)
KafkaMessageConverter defines interface to convert a Kafka message to Cherami message
func GetDefaultKafkaMessageConverter ¶
func GetDefaultKafkaMessageConverter(logger bark.Logger) KafkaMessageConverter
GetDefaultKafkaMessageConverter returns the default kafka message converter
type KafkaMessageConverterConfig ¶
type KafkaMessageConverterConfig struct { // Destination and ConsumerGroup are not needed currently, but may in the future // Destination *cherami.DestinationDescription // ConsumerGroup *cherami.ConsumerGroupDescription KafkaTopics []string // optional: already contained in destination-description KafkaCluster string // optional: already contained in destination-description }
KafkaMessageConverterConfig is used to config customized converter
type KafkaMessageConverterFactory ¶
type KafkaMessageConverterFactory interface {
GetConverter(cfg *KafkaMessageConverterConfig, log bark.Logger) KafkaMessageConverter
}
KafkaMessageConverterFactory provides converter from Kafka message to Cherami message In the future, it may provide implementations for BStoreOpenReadStreamOutCall
type KafkaOffsetMetadata ¶
type KafkaOffsetMetadata struct { // Version is the version of this structure Version uint // CGUUID is the internal Cherami consumer group UUID that committed this offset CGUUID string // OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset OutputHostUUID string // OutputHostStartTime is the time that the output host started OutputHostStartTime string // CommitterStartTime is the time that this committer was started CommitterStartTime string }
KafkaOffsetMetadata is a structure used for JSON encoding/decoding of the metadata stored for Kafka offsets committed by Cherami
type KafkaTopicPartitionAddresser ¶
KafkaTopicPartitionAddresser translates topic/partition/offset to/from store address
func (*KafkaTopicPartitionAddresser) GetStoreAddress ¶
func (k *KafkaTopicPartitionAddresser) GetStoreAddress(tp *TopicPartition, offset int64, logFn func() bark.Logger) (address storeHostAddress)
GetStoreAddress converts a given topic, partition, and offset into a store address
func (*KafkaTopicPartitionAddresser) GetTopicPartitionOffset ¶
func (k *KafkaTopicPartitionAddresser) GetTopicPartitionOffset(address storeHostAddress, logFn func() bark.Logger) (tp *TopicPartition, offset int64)
GetTopicPartitionOffset recovers the original topic, partition, and offset from a store address
type Notifier ¶
type Notifier interface { // Register is the rotuine to register this connection on the notifier Register(id int, notifyCh chan NotifyMsg) // Unregister is the routine to unregister this connection from the notifier Unregister(id int) // Notify is the routine to send a notification on the appropriate channel Notify(id int, notifyMsg NotifyMsg) }
Notifier is the interface to notify connections about nacks and potentially slowing them down, if needed. Any one who wants to get notified should Register with the notifier by specifying a unique ID and a channel to receive the notification.
type NotifyMsg ¶
type NotifyMsg struct {
// contains filtered or unexported fields
}
NotifyMsg is the message sent on the notify channel
type OutOptions ¶
type OutOptions struct { //CacheIdleTimeout CacheIdleTimeout time.Duration //KStreamFactory KStreamFactory KafkaMessageConverterFactory }
OutOptions is the options used during instantiating a new host
type OutputCgConfig ¶
type OutputCgConfig struct { // MessageCacheSize is used to configure the per CG cache size. // This is a string slice, where each entry is a tuple with the // destination/CG_name=value. // For example, we can ideally have two CGs for the same destination // with different size config as follows: // "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100" MessageCacheSize []string `name:"messagecachesize" default:"/=10000"` }
OutputCgConfig is the per cg config used by the cassandra config manager
type OutputHost ¶
OutputHost is the main server class for OutputHosts
func NewOutputHost ¶
func NewOutputHost( serviceName string, sVice common.SCommon, metadataClient metadata.TChanMetadataService, frontendClient ccherami.TChanBFrontend, opts *OutOptions, kafkaCfg configure.CommonKafkaConfig, ) (*OutputHost, []thrift.TChanServer)
NewOutputHost is the constructor for BOut
func (*OutputHost) AckMessages ¶
func (h *OutputHost) AckMessages(ctx thrift.Context, ackRequest *cherami.AckMessagesRequest) error
AckMessages is the implementation of the thrift handler for the BOut service
func (*OutputHost) ConsumerGroupsUpdated ¶
func (h *OutputHost) ConsumerGroupsUpdated(ctx thrift.Context, request *admin.ConsumerGroupsUpdatedRequest) (err error)
ConsumerGroupsUpdated is the API exposed to ExtentController to communicate any changes to current extent view
func (*OutputHost) ListLoadedConsumerGroups ¶
func (h *OutputHost) ListLoadedConsumerGroups(ctx thrift.Context) (result *admin.ListConsumerGroupsResult_, err error)
ListLoadedConsumerGroups is the API used to unload consumer groups to clear the cache
func (*OutputHost) LoadUconfig ¶
func (h *OutputHost) LoadUconfig()
LoadUconfig load the dynamic config values for key
func (*OutputHost) OpenConsumerStream ¶
func (h *OutputHost) OpenConsumerStream(ctx thrift.Context, call stream.BOutOpenConsumerStreamInCall) error
OpenConsumerStream is the implementation of the thrift handler for the Out service TODO: find remote "host" from the context as a tag (pass to newConsConnection)
func (*OutputHost) OpenConsumerStreamHandler ¶
func (h *OutputHost) OpenConsumerStreamHandler(w http.ResponseWriter, r *http.Request)
OpenConsumerStreamHandler is websocket handler for opening consumer stream
func (*OutputHost) OpenStreamingConsumerStream ¶
func (h *OutputHost) OpenStreamingConsumerStream(ctx thrift.Context, call stream.BOutOpenStreamingConsumerStreamInCall) error
OpenStreamingConsumerStream is unimplemented
func (*OutputHost) ReadCgState ¶
func (h *OutputHost) ReadCgState(ctx thrift.Context, req *admin.ReadConsumerGroupStateRequest) (result *admin.ReadConsumerGroupStateResult_, err error)
ReadCgState is the API used to get the cg state
func (*OutputHost) ReceiveMessageBatch ¶
func (h *OutputHost) ReceiveMessageBatch(ctx thrift.Context, request *cherami.ReceiveMessageBatchRequest) (*cherami.ReceiveMessageBatchResult_, error)
ReceiveMessageBatch is a thrift handler. It consumes messages from this output host within thrift context deadline. This is long-poll able.
func (*OutputHost) RegisterWSHandler ¶
func (h *OutputHost) RegisterWSHandler() *http.ServeMux
RegisterWSHandler is the implementation of WSService interface
func (*OutputHost) Report ¶
func (h *OutputHost) Report(reporter common.LoadReporter)
Report is the implementation for reporting host specific load to controller
func (*OutputHost) SetConsumedMessages ¶
func (h *OutputHost) SetConsumedMessages(ctx thrift.Context, request *cherami.SetConsumedMessagesRequest) error
SetConsumedMessages is unimplemented
func (*OutputHost) SetFrontendClient ¶
func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend)
SetFrontendClient is used to set the frontend client after we start the output
func (*OutputHost) Shutdown ¶
func (h *OutputHost) Shutdown()
Shutdown shutsdown all the OutputHost cleanly
func (*OutputHost) Start ¶
func (h *OutputHost) Start(thriftService []thrift.TChanServer)
Start starts the outputhost service
func (*OutputHost) UnloadConsumerGroups ¶
func (h *OutputHost) UnloadConsumerGroups(ctx thrift.Context, request *admin.UnloadConsumerGroupsRequest) (err error)
UnloadConsumerGroups is the API used to unload consumer groups to clear the cache
func (*OutputHost) UtilGetPickedStore ¶
func (h *OutputHost) UtilGetPickedStore(cgName string, path string) (connStore string)
UtilGetPickedStore is used by the integration test to figure out which store host we are currently connected to XXX: This should not be used anywhere else other than the test.
type Throttler ¶
type Throttler interface { // SetBackoffCoefficient is the routine to set the backoff coefficient for the throttler SetBackoffCoefficient(backoffCoefficient float64) // SetMaxThrottleDuration is used to set the max duration to return for the throttler SetMaxThrottleDuration(max time.Duration) // SetMinThrottleDuration is used to set the min duration to return for the throttler SetMinThrottleDuration(min time.Duration) // GetCurrentSleepDuration returns the current duration for this throttler GetCurrentSleepDuration() time.Duration // GetNextSleepDuration returns the sleep duration based on the backoff coefficient GetNextSleepDuration() time.Duration // ResetSleepDuration resets the current sleep duration for this throttler ResetSleepDuration() time.Duration // IsCurrentSleepDurationAtMaximum tells whether the current sleep duration is maxed out IsCurrentSleepDurationAtMaximum() bool }
Throttler is the interface is to get the duration to sleep based on a simple backoff formulae sleepDuration = time.Duration(currentSleepDuration * backoffCoefficient) Note: this is lock free and is *not* thread safe. This is intentional because the user is expected to be a single goroutine. If we need this to be thread safe we need to add some synchronization.
type TopicPartition ¶
TopicPartition represents a Kafka topic/partition pair