Documentation ¶
Index ¶
- Constants
- func Contains(a []int, x int) bool
- func Encode(data [][]byte) []byte
- func EncodePayload(key []byte, offset int64, data []byte) []byte
- func EncodeV2(partition int, data [][]byte) []byte
- type ChannelObject
- type ControlMsg
- type ControlSignal
- type Distributor
- type DistributorType
- type Dmux
- type DmuxConf
- type DmuxOptionalParams
- type Duration
- type Hasher
- type ResizeMeta
- type ResponseMsg
- type Sideline
- type Sink
- type Source
Constants ¶
const ( //Resize is signal used to trigger resize of a Running Channel Resize ControlSignal = 1 //Stop is signenal used to Stop Dmux Stop ControlSignal = 2 //Sucess response code for ControlSignal Action Sucess uint8 = 1 //Failed response code for ControlSignal Action Failed uint8 = 2 )
const SidelineMessage string = "sidelining the message"
Variables ¶
This section is empty.
Functions ¶
func Encode ¶
Encode function is used to convert 2d byte[] to 1d byte[] This function uses the following encoding scheme first 4 bytes = batch Size now for every batch first 4 bytes = data Size followed by data byte[]
func EncodePayload ¶
Encode function is used to convert payload byte[] to 1d byte[] along with the key and offset This function uses the following encoding scheme first 4 bytes = data Size next 8 bytes = offset next 4 bytes = key length next n byte = key followed by data[]
Types ¶
type ChannelObject ¶
type ControlMsg ¶
type ControlMsg struct {
// contains filtered or unexported fields
}
ControlMsg is the struct passed to Dmux control Channel to enable it perform Admin operations such as Resize
type ControlSignal ¶
type ControlSignal uint8
ControlSignal type defined to build constants used in passing ControlSignals to DefaultDmux
type Distributor ¶
type Distributor interface { //Distribute method take incoming data interface and number of outbound channels //to return the index of channel to be selected for Distribution of this message Distribute(data interface{}, size int) int }
Distributor interface abstracts the Logic to distribute the load from Source to Sink. Client can choose to use HashDistributor or RoundRobinDistributor or write their own distribution Logic
func GetDistribution ¶
func GetDistribution(distributorType DistributorType, h Hasher) Distributor
GetDistribution returns correct Distributor based on distributorType
func GetHashDistribution ¶
func GetHashDistribution(h Hasher) Distributor
GetHashDistribution returns hashDistributor implementation of Distributor interface to provide Consistent Hash based routing in Dmux from Source to Sink. This needs Client to implement Hasher and pass Hasher in this method arg
func GetRoundRobinDistribution ¶
func GetRoundRobinDistribution() Distributor
GetRoundRobinDistribution return roundRobinDistributor
type DistributorType ¶
type DistributorType string
DistributorType based on this distribution is determined
const ( //HashDistributor will distribute based on the key hash HashDistributor DistributorType = "Hash" //RoundRobinDistributor will distribute on round robin fashion RoundRobinDistributor DistributorType = "RoundRobin" )
type Dmux ¶
type Dmux struct {
// contains filtered or unexported fields
}
Dmux struct which enables Size based Dmultiplexing for Source to Sink connections. TODO restrict size to be powers of 2 for better optimization in modulo
func GetDmux ¶
func GetDmux(conf DmuxConf, d Distributor) *Dmux
GetDmux is public method used to Get instance of a Dmux struct
func (*Dmux) ConnectWithSideline ¶
func (d *Dmux) ConnectWithSideline(source Source, sink Sink, sidelineImpl sideline_module.CheckMessageSideline, optionalParams DmuxOptionalParams)
Connect method holds Dmux logic used to Connect Source to Sink With Sideline
type DmuxConf ¶
type DmuxConf struct { Size int `json:"size"` SourceQSize int `json:"source_queue_size"` SinkQSize int `json:"sink_queue_size"` DistributorType DistributorType `json:"distributor_type"` BatchSize int `json:"batch_size"` Version int `json:"version"` Sideline Sideline `json:"sideline"` }
DmuxConf holds configuration parameters for Dmux
type DmuxOptionalParams ¶
type DmuxOptionalParams struct {
EnableDebugLog bool
}
type Duration ¶
Duration type embeds time.Duration, this was added to fix JSON parsing of time to valid go duration
func (Duration) MarshalJSON ¶
MarshalJSON implements encoding/json to serialize this type to json string
func (*Duration) UnmarshalJSON ¶
UnmarshalJSON implements encoding/json to deserialize this to Duration
type Hasher ¶
type Hasher interface { //ComputeHash method has to be implemented by Client to define how to distribute // when using HashDistributor ComputeHash(data interface{}) int }
Hasher interface that can be implemented to define data interface and compute and return the righ hash int for it
type ResizeMeta ¶
type ResizeMeta struct {
// contains filtered or unexported fields
}
ResizeMeta is the struct used to define resize value which is used when Dmux is resizing
type ResponseMsg ¶
type ResponseMsg struct {
// contains filtered or unexported fields
}
ResponseMsg is used for running Dmux instnace to response to client
type Sideline ¶
type Sideline struct { Retries int `json:"retries"` SidelineResponseCodes []int `json:"sidelineResponseCodes"` ConsumerGroupName string `json:"consumerGroupName"` ClusterName string `json:"clusterName"` ConnectionType string `json:"type"` SidelineMeta interface{} `json:"sidelineMeta"` }
Sideline holds config parameters for sideline
type Sink ¶
type Sink interface { // Clone method is expected to return instance of Sink. If Sink is Stateless // this can return selfRefrence back. If Sink is Stateful, its good idea to // create new instnace of Sink. Clone() Sink // Consume method gets The interface. //TODO currently this method does not return error, need to solve for error // handling Consume(msg interface{}, retries int, sidelineResponseCodes []int) error //BatchConsume method is invoked in batch_size is configured BatchConsume(msg []interface{}, version int) }
Sink is interface that implements OutputSink of Dmux operation
type Source ¶
type Source interface { //Generate method takes output channel to which it writes data. The //implementation can write to to this using multiple goroutines //This method is not expected to return, its run in a separate goroutine Generate(out chan<- interface{}) //Method used to trigger GracefulStop of Source Stop() // GetKey Method to get key for a message GetKey(msg interface{}) []byte // GetPartition Method to get partition for a message GetPartition(msg interface{}) int32 // GetValue Method to get value for a message GetValue(msg interface{}) []byte // GetOffset Method to get offsets GetOffset(msg interface{}) int64 }
Source is interface that implements input Source to the Dmux