Documentation ¶
Index ¶
- Variables
- type AuthProvider
- type BypassAuthProvider
- type Client
- func NewClient(serviceName string, host string, port int, options *ClientOptions) (Client, error)
- func NewClientWithFEClient(feClient cherami.TChanBFrontend, options *ClientOptions) (Client, error)
- func NewHyperbahnClient(serviceName string, bootstrapFile string, options *ClientOptions) (Client, error)
- type ClientOptions
- type Consumer
- type CreateConsumerRequest
- type CreatePublisherRequest
- type CreateTaskExecutorRequest
- type CreateTaskSchedulerRequest
- type Delivery
- type OpenConsumerOutWebsocketStream
- func (s *OpenConsumerOutWebsocketStream) Done() error
- func (s *OpenConsumerOutWebsocketStream) Flush() error
- func (s *OpenConsumerOutWebsocketStream) Read() (*cherami.OutputHostCommand, error)
- func (s *OpenConsumerOutWebsocketStream) ResponseHeaders() (map[string]string, error)
- func (s *OpenConsumerOutWebsocketStream) Write(arg *cherami.ControlFlow) error
- type OpenPublisherOutWebsocketStream
- func (s *OpenPublisherOutWebsocketStream) Done() error
- func (s *OpenPublisherOutWebsocketStream) Flush() error
- func (s *OpenPublisherOutWebsocketStream) Read() (*cherami.InputHostCommand, error)
- func (s *OpenPublisherOutWebsocketStream) ResponseHeaders() (map[string]string, error)
- func (s *OpenPublisherOutWebsocketStream) Write(arg *cherami.PutMessage) error
- type Publisher
- type PublisherMessage
- type PublisherReceipt
- type PublisherType
- type ScheduleTaskRequest
- type Task
- type TaskExecutor
- type TaskFunc
- type TaskScheduler
- type WSConnector
Constants ¶
This section is empty.
Variables ¶
var ErrMessageTimedout = errors.New("Timed out.")
ErrMessageTimedout is returned by Publish when no ack is received within the timeout interval.
Functions ¶
This section is empty.
Types ¶
type AuthProvider ¶ added in v1.20.0
type AuthProvider interface { // CreateSecurityContext creates security context CreateSecurityContext(ctx thrift.Context) (thrift.Context, error) }
AuthProvider provides authentication information on the client side
type BypassAuthProvider ¶ added in v1.20.0
type BypassAuthProvider struct{}
BypassAuthProvider is a dummy implementation for AuthProvider
func NewBypassAuthProvider ¶ added in v1.20.0
func NewBypassAuthProvider() *BypassAuthProvider
NewBypassAuthProvider creates a dummy AuthProvider instance
func (*BypassAuthProvider) CreateSecurityContext ¶ added in v1.20.0
CreateSecurityContext creates security context
type Client ¶
type Client interface { Close() CreateConsumerGroup(request *cherami.CreateConsumerGroupRequest) (*cherami.ConsumerGroupDescription, error) CreateDestination(request *cherami.CreateDestinationRequest) (*cherami.DestinationDescription, error) CreateConsumer(request *CreateConsumerRequest) Consumer CreatePublisher(request *CreatePublisherRequest) Publisher DeleteConsumerGroup(request *cherami.DeleteConsumerGroupRequest) error DeleteDestination(request *cherami.DeleteDestinationRequest) error ListConsumerGroups(request *cherami.ListConsumerGroupRequest) (*cherami.ListConsumerGroupResult_, error) ListDestinations(request *cherami.ListDestinationsRequest) (*cherami.ListDestinationsResult_, error) ReadConsumerGroup(request *cherami.ReadConsumerGroupRequest) (*cherami.ConsumerGroupDescription, error) ReadDestination(request *cherami.ReadDestinationRequest) (*cherami.DestinationDescription, error) UpdateConsumerGroup(request *cherami.UpdateConsumerGroupRequest) (*cherami.ConsumerGroupDescription, error) UpdateDestination(request *cherami.UpdateDestinationRequest) (*cherami.DestinationDescription, error) GetQueueDepthInfo(request *cherami.GetQueueDepthInfoRequest) (*cherami.GetQueueDepthInfoResult_, error) MergeDLQForConsumerGroup(request *cherami.MergeDLQForConsumerGroupRequest) error PurgeDLQForConsumerGroup(request *cherami.PurgeDLQForConsumerGroupRequest) error ReadPublisherOptions(path string) (*cherami.ReadPublisherOptionsResult_, error) ReadConsumerGroupHosts(path string, consumerGroupName string) (*cherami.ReadConsumerGroupHostsResult_, error) }
Client exposes API for destination and consumer group CRUD and capability to publish and consume messages
func NewClient ¶
NewClient returns the singleton Cherami client used for communicating with the service at given port
func NewClientWithFEClient ¶ added in v1.20.0
func NewClientWithFEClient(feClient cherami.TChanBFrontend, options *ClientOptions) (Client, error)
NewClientWithFEClient is used by Frontend to create a Cherami client for itself. It is used by non-streaming publish/consume APIs. ** Internal Cherami Use Only **
func NewHyperbahnClient ¶
func NewHyperbahnClient(serviceName string, bootstrapFile string, options *ClientOptions) (Client, error)
NewHyperbahnClient returns the singleton Cherami client used for communicating with the service via Hyperbahn or Muttley. Streaming methods (for LOG/Consistent destinations) will not work.
type ClientOptions ¶
type ClientOptions struct { Timeout time.Duration // DeploymentStr specifies which deployment(staging,prod,dev,etc) the client should connect to // If the string is empty, client will connect to prod // If the string is 'prod', client will connect to prod // If the string is 'staging' or 'staging2', client will connect to staging or staging2 // If the string is 'dev', client will connect to dev server DeploymentStr string // MetricsReporter is the reporter object MetricsReporter metrics.Reporter // Logger is the logger object Logger bark.Logger // Interval for polling input/output host updates. Normally client doesn't need to explicitly set this option // because the default setting should work fine. This is only useful in testing or other edge scenarios ReconfigurationPollingInterval time.Duration // AuthProvider provides the authentication information in client side AuthProvider AuthProvider }
ClientOptions used by Cherami client
type Consumer ¶
type Consumer interface { // Open will connect to Cherami nodes and start delivering messages to // a provided Delivery channel for registered consumer group. // // It is ADVISED that deliveryCh's buffer size should be bigger than the // total PrefetchCount in CreateConsumerRequest of the consumers writing // to this channel. Open(deliveryCh chan Delivery) (chan Delivery, error) // Close closes all the connections to Cherami nodes for this consumer Close() // Pause consuming messages Pause() // Resume consuming messages Resume() // AckDelivery can be used by application to Ack a message so it is not delivered to any other consumer AckDelivery(deliveryToken string) error // NackDelivery can be used by application to Nack a message so it can be delivered to another consumer immediately // without waiting for the timeout to expire NackDelivery(deliveryToken string) error }
Consumer is used by an application to receive messages from Cherami service
type CreateConsumerRequest ¶
type CreateConsumerRequest struct { // Path to destination consumer wants to consume messages from Path string // ConsumerGroupName registered with Cherami for a particular destination ConsumerGroupName string // Name of consumer (worker) connecting to Cherami ConsumerName string // Number of messages to buffer locally. Clients which process messages very fast may want to specify larger value // for PrefetchCount for faster throughput. On the flip side larger values for PrefetchCount will result in // more messages being buffered locally causing high memory foot print PrefetchCount int // Options used for making API calls to Cherami services // This option is now deprecated. If you need to specify any option, you can specify it when you call NewClient() Options *ClientOptions }
CreateConsumerRequest struct is used to call Client.CreateConsumer to create an object used by application to consume messages
type CreatePublisherRequest ¶
type CreatePublisherRequest struct { Path string MaxInflightMessagesPerConnection int // PublisherType represents the mode in which // publishing should be done i.e. either through // websocket streaming or through tchannel batch API // Defaults to websocket streaming. Choose non-streaming // batch API for low throughput publishing. PublisherType PublisherType }
CreatePublisherRequest struct used to call Client.CreatePublisher to create an object used by application to publish messages
type CreateTaskExecutorRequest ¶
type CreateTaskExecutorRequest struct { // Concurrency is the number of concurrent workers to execute tasks Concurrency int // Path to destination which tasks dequeued from Path string // ConsumerGroupName registered with Cherami for a particular destination ConsumerGroupName string // ConsumerName is name of consumer (worker) connecting to Cherami ConsumerName string // PrefetchCount is number of messages to buffer locally PrefetchCount int // Timeout is timeout setting used when ack/nack back to Cherami Timeout time.Duration }
CreateTaskExecutorRequest is passed to NewTaskExecutor to create a task executor
type CreateTaskSchedulerRequest ¶
type CreateTaskSchedulerRequest struct { // Path to destination which tasks enqueue into Path string // MaxInflightMessagesPerConnection is number of messages pending confirmation per connection MaxInflightMessagesPerConnection int }
CreateTaskSchedulerRequest is passed to NewTaskScheduler to create a task scheduler
type Delivery ¶
type Delivery interface { // Returns the message returned by Cherami GetMessage() *cherami.ConsumerMessage // Returns a delivery token which can be used to Ack/Nack delivery using the Consumer API // Consumer has 2 options to Ack/Nack a delivery: // 1) Simply call the Ack/Nack API on the delivery after processing the message // 2) If the consumer wants to forward the message to downstream component for processing then they can get the // DeliveryToken by calling this function and pass it along. Later the downstream component can call the // API on the Consumer with this token to Ack/Nack the message. GetDeliveryToken() string // Acks this delivery Ack() error // Nacks this delivery Nack() error // VerifyChecksum verifies checksum of the message if exist // Consumer needs to perform this verification and decide what to do based on returned result VerifyChecksum() bool }
Delivery is the container which has the actual message returned by Cherami
type OpenConsumerOutWebsocketStream ¶
type OpenConsumerOutWebsocketStream struct {
// contains filtered or unexported fields
}
OpenConsumerOutWebsocketStream is a wrapper for websocket to work with OpenConsumerStream
func (*OpenConsumerOutWebsocketStream) Done ¶
func (s *OpenConsumerOutWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenConsumerOutWebsocketStream) Flush ¶
func (s *OpenConsumerOutWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenConsumerOutWebsocketStream) Read ¶
func (s *OpenConsumerOutWebsocketStream) Read() (*cherami.OutputHostCommand, error)
Read returns the next argument, if any is available.
func (*OpenConsumerOutWebsocketStream) ResponseHeaders ¶
func (s *OpenConsumerOutWebsocketStream) ResponseHeaders() (map[string]string, error)
ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface
func (*OpenConsumerOutWebsocketStream) Write ¶
func (s *OpenConsumerOutWebsocketStream) Write(arg *cherami.ControlFlow) error
Write writes a result to the response stream
type OpenPublisherOutWebsocketStream ¶
type OpenPublisherOutWebsocketStream struct {
// contains filtered or unexported fields
}
OpenPublisherOutWebsocketStream is a wrapper for websocket to work with OpenPublisherStream
func (*OpenPublisherOutWebsocketStream) Done ¶
func (s *OpenPublisherOutWebsocketStream) Done() error
Done closes the request stream and should be called after all arguments have been written.
func (*OpenPublisherOutWebsocketStream) Flush ¶
func (s *OpenPublisherOutWebsocketStream) Flush() error
Flush flushes all written arguments.
func (*OpenPublisherOutWebsocketStream) Read ¶
func (s *OpenPublisherOutWebsocketStream) Read() (*cherami.InputHostCommand, error)
Read returns the next argument, if any is available.
func (*OpenPublisherOutWebsocketStream) ResponseHeaders ¶
func (s *OpenPublisherOutWebsocketStream) ResponseHeaders() (map[string]string, error)
ResponseHeaders is defined to conform to the tchannel-stream .*OutCall interface
func (*OpenPublisherOutWebsocketStream) Write ¶
func (s *OpenPublisherOutWebsocketStream) Write(arg *cherami.PutMessage) error
Write writes a result to the response stream
type Publisher ¶
type Publisher interface { Open() error Close() // Pause publishing. All publishing will fail until Resume() is called. // Note: Pause/Resume APIs only work for streaming publishers(i.e. publish type is PublisherTypeStreaming) // For non-streaming publishers, Pause/Resume APIs are no-op. Pause() // Resume publishing. Resume() Publish(message *PublisherMessage) *PublisherReceipt PublishAsync(message *PublisherMessage, done chan<- *PublisherReceipt) (string, error) }
Publisher is used by an application to publish messages to Cherami service
func NewPublisher ¶
NewPublisher constructs a new Publisher object. Deprecated: NewPublisher is deprecated, please use NewPublisherWithReporter.
type PublisherMessage ¶
type PublisherMessage struct { Data []byte Delay time.Duration // UserContext is user specified context to pass through UserContext map[string]string }
PublisherMessage is a struct that wraps the message payload and a delay time duration.
type PublisherReceipt ¶
type PublisherReceipt struct { // ID is the message id passed with message when published ID string // Receipt is a token that contains info where the message is stored Receipt string // Error is the error if any that associates with the publishing of this message Error error // UserContext is user specified context to pass through UserContext map[string]string }
PublisherReceipt is a token given to the publisher as proof that the message has been durably stored.
type PublisherType ¶
type PublisherType int
PublisherType represents the type of publisher viz. streaming/non-streaming
const ( // PublisherTypeStreaming indicates a publisher that uses websocket streaming PublisherTypeStreaming PublisherType = iota // PublisherTypeNonStreaming indicates a publisher that uses tchannel batch api PublisherTypeNonStreaming )
type ScheduleTaskRequest ¶
type ScheduleTaskRequest struct { // TaskType is the unique type name which is used to register task handler with task executor TaskType string // TaskID is the unique identifier of this specific task TaskID string // TaskValue can be anything that represents the task TaskValue interface{} // Context is key value pairs context associated with the task Context map[string]string // Delay is the time duration before task can be executed Delay time.Duration }
ScheduleTaskRequest is used to call TaskScheduler.ScheduleTask to schedule a new task
type Task ¶
type Task interface { // GetType returns the unique type name that can be used to identify corresponding task handler GetType() string // GetID returns the unique identifier of this specific task GetID() string // GetValue deserializes task value into given struct that matches the type used to publish the task GetValue(instance interface{}) error // GetContext returns key value pairs context associated with the task when published GetContext() map[string]string }
Task represents the task queued in Cherami
type TaskExecutor ¶
type TaskExecutor interface { // Register registers task handler with its *unique* task type Register(taskType string, taskFunc TaskFunc) // Start starts dequeuing tasks and executing them Start() error // Stop stops dequeuing/execution of tasks // There's no guarantee to drain scheduled tasks when Stop is invoked Stop() }
TaskExecutor is used to pull tasks from Cherami and execute their task handlers accordingly
func NewTaskExecutor ¶
func NewTaskExecutor(client Client, request *CreateTaskExecutorRequest) TaskExecutor
NewTaskExecutor creates a task executor
type TaskScheduler ¶
type TaskScheduler interface { // Open gets TaskScheduler for scheduling tasks Open() error // Close make sure resources are released Close() // ScheduleTask enqueues a task ScheduleTask(request *ScheduleTaskRequest) error }
TaskScheduler is used to put tasks into Cherami
func NewTaskScheduler ¶
func NewTaskScheduler(client Client, request *CreateTaskSchedulerRequest) TaskScheduler
NewTaskScheduler creates a task scheduler
type WSConnector ¶
type WSConnector interface { OpenPublisherStream(hostPort string, requestHeader http.Header) (stream.BInOpenPublisherStreamOutCall, error) OpenConsumerStream(hostPort string, requestHeader http.Header) (stream.BOutOpenConsumerStreamOutCall, error) }
WSConnector takes care of establishing connection via websocket stream