Documentation ¶
Index ¶
- Constants
- Variables
- func Execute(ctx ruleql.Context, expr string) interface{}
- func GetAndAdd(n *uint64, diff uint64) uint64
- func GetValue(data ruleql.Context, path string) interface{}
- func IsExist(fileAddr string) bool
- func NewBatchSink(name string, options *SinkBatchOptions, doSinkFn ProcessFn) (*batchSink, error)
- func StringToDate(s string) (time.Time, error)
- func ToBool(i interface{}) bool
- func ToBoolE(i interface{}) (bool, error)
- func ToBoolSlice(i interface{}) []bool
- func ToBoolSliceE(i interface{}) ([]bool, error)
- func ToDuration(i interface{}) time.Duration
- func ToDurationE(i interface{}) (d time.Duration, err error)
- func ToDurationSlice(i interface{}) []time.Duration
- func ToDurationSliceE(i interface{}) ([]time.Duration, error)
- func ToFloat32(i interface{}) float32
- func ToFloat32E(i interface{}) (float32, error)
- func ToFloat32Slice(i interface{}) []float32
- func ToFloat32SliceE(i interface{}) ([]float32, error)
- func ToFloat64(i interface{}) float64
- func ToFloat64E(i interface{}) (float64, error)
- func ToFloat64Slice(i interface{}) []float64
- func ToFloat64SliceE(i interface{}) ([]float64, error)
- func ToInt(i interface{}) int
- func ToInt16(i interface{}) int16
- func ToInt16E(i interface{}) (int16, error)
- func ToInt32(i interface{}) int32
- func ToInt32E(i interface{}) (int32, error)
- func ToInt32Slice(i interface{}) []int32
- func ToInt32SliceE(i interface{}) ([]int32, error)
- func ToInt64(i interface{}) int64
- func ToInt64E(i interface{}) (int64, error)
- func ToInt8(i interface{}) int8
- func ToInt8E(i interface{}) (int8, error)
- func ToIntE(i interface{}) (int, error)
- func ToIntSlice(i interface{}) []int
- func ToIntSliceE(i interface{}) ([]int, error)
- func ToSlice(i interface{}) []interface{}
- func ToSliceE(i interface{}) ([]interface{}, error)
- func ToString(i interface{}) string
- func ToStringE(i interface{}) (string, error)
- func ToStringMap(i interface{}) map[string]interface{}
- func ToStringMapBool(i interface{}) map[string]bool
- func ToStringMapBoolE(i interface{}) (map[string]bool, error)
- func ToStringMapE(i interface{}) (map[string]interface{}, error)
- func ToStringMapInt(i interface{}) map[string]int
- func ToStringMapInt64(i interface{}) map[string]int64
- func ToStringMapInt64E(i interface{}) (map[string]int64, error)
- func ToStringMapIntE(i interface{}) (map[string]int, error)
- func ToStringMapString(i interface{}) map[string]string
- func ToStringMapStringE(i interface{}) (map[string]string, error)
- func ToStringMapStringSlice(i interface{}) map[string][]string
- func ToStringMapStringSliceE(i interface{}) (map[string][]string, error)
- func ToStringSlice(i interface{}) []string
- func ToStringSliceE(i interface{}) ([]string, error)
- func ToStringSliceX(i interface{}) ([]string, error)
- func ToTime(i interface{}) time.Time
- func ToTimeE(i interface{}) (tim time.Time, err error)
- func ToUint(i interface{}) uint
- func ToUint16(i interface{}) uint16
- func ToUint16E(i interface{}) (uint16, error)
- func ToUint32(i interface{}) uint32
- func ToUint32E(i interface{}) (uint32, error)
- func ToUint64(i interface{}) uint64
- func ToUint64E(i interface{}) (uint64, error)
- func ToUint8(i interface{}) uint8
- func ToUint8E(i interface{}) (uint8, error)
- func ToUintE(i interface{}) (uint, error)
- type BatchBuilder
- type BatchSink
- type BlockingQueue
- type BlockingQueueIterator
- type Callback
- type CallbackFn
- type CallbackMessage
- type Context
- type JsonContext
- type MessageBatchQueue
- type Node
- type ProcessFn
- type SinkBatchOptions
Constants ¶
const ( // MaxMessageSize limit message size for transfer MaxMessageSize = 5 * 1024 * 1024 // MaxBatchSize will be the largest size for a batch sent from this particular producer. // This is used as a baseline to allocate a new buffer that can hold the entire batch // without needing costly re-allocations. MaxBatchSize = 128 * 1024 // DefaultMaxMessagesPerBatch init default num of entries in per batch. DefaultMaxMessagesPerBatch = 1000 )
const ( StatusIdle = iota StatusInProgress )
Variables ¶
var ( UNDEFINED_RESULT = ruleql.UNDEFINED_RESULT NewMessage = func(idx int) types.PublishMessage { message := types.NewMessage() topic := "/sys/xxxxx/deviceId/thing/event/property/post" message.SetTopic(topic) var i int64 = 1579420383073 var j int64 = int64(idx * 1000) msgCtx.SetValue("params.Cpu.time", big.NewInt(i+j)) msgCtx.SetValue("params.Cpu.value", rand.Float32()) msgCtx.SetValue("params.Mem.time", big.NewInt(i+j)) msgCtx.SetValue("params.Mem.value", rand.Float32()) message.SetData([]byte(msgCtx.String())) return types.PublishMessage(message) } )
var (
DefaultSequenceID = uint64(0)
)
Functions ¶
func NewBatchSink ¶
func NewBatchSink(name string, options *SinkBatchOptions, doSinkFn ProcessFn) (*batchSink, error)
func StringToDate ¶
StringToDate attempts to parse a string into a time.Time type using a predefined list of formats. If no suitable format is found, an error is returned.
func ToBoolSlice ¶
func ToBoolSlice(i interface{}) []bool
ToBoolSlice casts an interface to a []bool type.
func ToBoolSliceE ¶
ToBoolSliceE casts an interface to a []bool type.
func ToDuration ¶
ToDuration casts an interface to a time.Duration type.
func ToDurationE ¶
ToDurationE casts an interface to a time.Duration type.
func ToDurationSlice ¶
ToDurationSlice casts an interface to a []time.Duration type.
func ToDurationSliceE ¶
ToDurationSliceE casts an interface to a []time.Duration type.
func ToFloat32 ¶
func ToFloat32(i interface{}) float32
ToFloat32 casts an interface to a float32 type.
func ToFloat32E ¶
ToFloat32E casts an interface to a float32 type.
func ToFloat32Slice ¶
func ToFloat32Slice(i interface{}) []float32
ToStringSlice casts an interface to a []string type.
func ToFloat32SliceE ¶
ToStringSliceE casts an interface to a []string type.
func ToFloat64 ¶
func ToFloat64(i interface{}) float64
ToFloat64 casts an interface to a float64 type.
func ToFloat64E ¶
ToFloat64E casts an interface to a float64 type.
func ToFloat64Slice ¶
func ToFloat64Slice(i interface{}) []float64
ToStringSlice casts an interface to a []string type.
func ToFloat64SliceE ¶
ToStringSliceE casts an interface to a []string type.
func ToInt32Slice ¶
func ToInt32Slice(i interface{}) []int32
ToStringSlice casts an interface to a []string type.
func ToInt32SliceE ¶
ToStringSliceE casts an interface to a []string type.
func ToIntSlice ¶
func ToIntSlice(i interface{}) []int
ToIntSlice casts an interface to a []int type.
func ToIntSliceE ¶
ToIntSliceE casts an interface to a []int type.
func ToSlice ¶
func ToSlice(i interface{}) []interface{}
ToSlice casts an interface to a []interface{} type.
func ToSliceE ¶
func ToSliceE(i interface{}) ([]interface{}, error)
ToSliceE casts an interface to a []interface{} type.
func ToStringMap ¶
func ToStringMap(i interface{}) map[string]interface{}
ToStringMap casts an interface to a map[string]interface{} type.
func ToStringMapBool ¶
ToStringMapBool casts an interface to a map[string]bool type.
func ToStringMapBoolE ¶
ToStringMapBoolE casts an interface to a map[string]bool type.
func ToStringMapE ¶
ToStringMapE casts an interface to a map[string]interface{} type.
func ToStringMapInt ¶
ToStringMapInt casts an interface to a map[string]int type.
func ToStringMapInt64 ¶
ToStringMapInt64 casts an interface to a map[string]int64 type.
func ToStringMapInt64E ¶
ToStringMapInt64E casts an interface to a map[string]int64{} type.
func ToStringMapIntE ¶
ToStringMapIntE casts an interface to a map[string]int{} type.
func ToStringMapString ¶
ToStringMapString casts an interface to a map[string]string type.
func ToStringMapStringE ¶
ToStringMapStringE casts an interface to a map[string]string type.
func ToStringMapStringSlice ¶
ToStringMapStringSlice casts an interface to a map[string][]string type.
func ToStringMapStringSliceE ¶
ToStringMapStringSliceE casts an interface to a map[string][]string type.
func ToStringSlice ¶
func ToStringSlice(i interface{}) []string
ToStringSlice casts an interface to a []string type.
func ToStringSliceE ¶
ToStringSliceE casts an interface to a []string type.
func ToStringSliceX ¶
ToStringSliceE casts an interface to a []string type.
Types ¶
type BatchBuilder ¶
type BatchBuilder struct {
// contains filtered or unexported fields
}
BatchBuilder wraps the objects needed to build a batch.
func NewBatchBuilder ¶
func NewBatchBuilder(maxMessages uint) *BatchBuilder
NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func (*BatchBuilder) Add ¶
func (bb *BatchBuilder) Add(payload types.Message) (isFull bool)
Add will add single message to batch.
func (*BatchBuilder) Flush ¶
func (bb *BatchBuilder) Flush() (batchData []types.Message, sequenceID uint64)
Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
func (*BatchBuilder) IsFull ¶
func (bb *BatchBuilder) IsFull() bool
IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
type BlockingQueue ¶
type BlockingQueue interface { // Put enqueue one item, block if the queue is full Put(item interface{}) // Take dequeue one item, block until it's available Take() interface{} // Poll dequeue one item, return nil if queue is empty Poll() interface{} // Peek return the first item without dequeing, return nil if queue is empty Peek() interface{} // PeekLast return last item in queue without dequeing, return nil if queue is empty PeekLast() interface{} // Size return the current size of the queue Size() int // Iterator return an iterator for the queue Iterator() BlockingQueueIterator }
BlockingQueue is a interface of block queue
func NewBlockingQueue ¶
func NewBlockingQueue(maxSize int) BlockingQueue
NewBlockingQueue init block queue and returns a BlockingQueue
type BlockingQueueIterator ¶
type BlockingQueueIterator interface { HasNext() bool Next() interface{} }
BlockingQueueIterator abstract a interface of block queue iterator.
type CallbackFn ¶
type CallbackMessage ¶
type Context ¶
func NewMessageContext ¶
func NewMessageContext(message types.PublishMessage) Context
NewJSONContext new context from json
type JsonContext ¶
type JsonContext struct {
// contains filtered or unexported fields
}
func NewJSONContext ¶
func NewJSONContext(jsonRaw string) *JsonContext
NewJSONContext new context from json
func (*JsonContext) SetValue ¶
func (c *JsonContext) SetValue(xpath string, vaule interface{})
Value get value from context
func (*JsonContext) Value ¶
func (c *JsonContext) Value(path string) interface{}
Value get value from context
type MessageBatchQueue ¶
type MessageBatchQueue struct {
// contains filtered or unexported fields
}
func (*MessageBatchQueue) Close ¶
func (this *MessageBatchQueue) Close()
func (*MessageBatchQueue) Flush ¶
func (this *MessageBatchQueue) Flush()
func (*MessageBatchQueue) Push ¶
func (this *MessageBatchQueue) Push(msg types.Message)
type SinkBatchOptions ¶
type SinkBatchOptions struct { // BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000) MaxBatching int // MaxPendingMessages set the max size of the queue. MaxPendingMessages uint // BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms) BatchingMaxPublishDelay time.Duration }