utils

package
v0.0.0-...-e67e6ae Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2022 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxMessageSize limits the size of a message for transfer.
	MaxMessageSize = 5 * 1024 * 1024
	// MaxBatchSize will be the largest size for a batch sent from this particular producer.
	// This is used as a baseline to allocate a new buffer that can hold the entire batch
	// without needing costly re-allocations.
	MaxBatchSize = 128 * 1024
	// DefaultMaxMessagesPerBatch is the default number of entries per batch.
	DefaultMaxMessagesPerBatch = 1000
)
View Source
const (
	StatusIdle = iota
	StatusInProgress
)

Variables

View Source
var (
	UNDEFINED_RESULT = ruleql.UNDEFINED_RESULT

	NewMessage = func(idx int) types.PublishMessage {
		message := types.NewMessage()
		topic := "/sys/xxxxx/deviceId/thing/event/property/post"
		message.SetTopic(topic)
		var i int64 = 1579420383073
		var j int64 = int64(idx * 1000)
		msgCtx.SetValue("params.Cpu.time", big.NewInt(i+j))
		msgCtx.SetValue("params.Cpu.value", rand.Float32())
		msgCtx.SetValue("params.Mem.time", big.NewInt(i+j))
		msgCtx.SetValue("params.Mem.value", rand.Float32())
		message.SetData([]byte(msgCtx.String()))
		return types.PublishMessage(message)
	}
)
View Source
var (
	DefaultSequenceID = uint64(0)
)

Functions

func Execute

func Execute(ctx ruleql.Context, expr string) interface{}

func GetAndAdd

func GetAndAdd(n *uint64, diff uint64) uint64

GetAndAdd performs an atomic read and update.

func GetValue

func GetValue(data ruleql.Context, path string) interface{}

func IsExist

func IsExist(fileAddr string) bool

IsExist reports whether the file exists.

func NewBatchSink

func NewBatchSink(name string, options *SinkBatchOptions, doSinkFn ProcessFn) (*batchSink, error)

func StringToDate

func StringToDate(s string) (time.Time, error)

StringToDate attempts to parse a string into a time.Time type using a predefined list of formats. If no suitable format is found, an error is returned.

func ToBool

func ToBool(i interface{}) bool

ToBool casts an interface to a bool type.

func ToBoolE

func ToBoolE(i interface{}) (bool, error)

ToBoolE casts an interface to a bool type.

func ToBoolSlice

func ToBoolSlice(i interface{}) []bool

ToBoolSlice casts an interface to a []bool type.

func ToBoolSliceE

func ToBoolSliceE(i interface{}) ([]bool, error)

ToBoolSliceE casts an interface to a []bool type.

func ToDuration

func ToDuration(i interface{}) time.Duration

ToDuration casts an interface to a time.Duration type.

func ToDurationE

func ToDurationE(i interface{}) (d time.Duration, err error)

ToDurationE casts an interface to a time.Duration type.

func ToDurationSlice

func ToDurationSlice(i interface{}) []time.Duration

ToDurationSlice casts an interface to a []time.Duration type.

func ToDurationSliceE

func ToDurationSliceE(i interface{}) ([]time.Duration, error)

ToDurationSliceE casts an interface to a []time.Duration type.

func ToFloat32

func ToFloat32(i interface{}) float32

ToFloat32 casts an interface to a float32 type.

func ToFloat32E

func ToFloat32E(i interface{}) (float32, error)

ToFloat32E casts an interface to a float32 type.

func ToFloat32Slice

func ToFloat32Slice(i interface{}) []float32

ToFloat32Slice casts an interface to a []float32 type.

func ToFloat32SliceE

func ToFloat32SliceE(i interface{}) ([]float32, error)

ToFloat32SliceE casts an interface to a []float32 type.

func ToFloat64

func ToFloat64(i interface{}) float64

ToFloat64 casts an interface to a float64 type.

func ToFloat64E

func ToFloat64E(i interface{}) (float64, error)

ToFloat64E casts an interface to a float64 type.

func ToFloat64Slice

func ToFloat64Slice(i interface{}) []float64

ToFloat64Slice casts an interface to a []float64 type.

func ToFloat64SliceE

func ToFloat64SliceE(i interface{}) ([]float64, error)

ToFloat64SliceE casts an interface to a []float64 type.

func ToInt

func ToInt(i interface{}) int

ToInt casts an interface to an int type.

func ToInt16

func ToInt16(i interface{}) int16

ToInt16 casts an interface to an int16 type.

func ToInt16E

func ToInt16E(i interface{}) (int16, error)

ToInt16E casts an interface to an int16 type.

func ToInt32

func ToInt32(i interface{}) int32

ToInt32 casts an interface to an int32 type.

func ToInt32E

func ToInt32E(i interface{}) (int32, error)

ToInt32E casts an interface to an int32 type.

func ToInt32Slice

func ToInt32Slice(i interface{}) []int32

ToInt32Slice casts an interface to a []int32 type.

func ToInt32SliceE

func ToInt32SliceE(i interface{}) ([]int32, error)

ToInt32SliceE casts an interface to a []int32 type.

func ToInt64

func ToInt64(i interface{}) int64

ToInt64 casts an interface to an int64 type.

func ToInt64E

func ToInt64E(i interface{}) (int64, error)

ToInt64E casts an interface to an int64 type.

func ToInt8

func ToInt8(i interface{}) int8

ToInt8 casts an interface to an int8 type.

func ToInt8E

func ToInt8E(i interface{}) (int8, error)

ToInt8E casts an interface to an int8 type.

func ToIntE

func ToIntE(i interface{}) (int, error)

ToIntE casts an interface to an int type.

func ToIntSlice

func ToIntSlice(i interface{}) []int

ToIntSlice casts an interface to a []int type.

func ToIntSliceE

func ToIntSliceE(i interface{}) ([]int, error)

ToIntSliceE casts an interface to a []int type.

func ToSlice

func ToSlice(i interface{}) []interface{}

ToSlice casts an interface to a []interface{} type.

func ToSliceE

func ToSliceE(i interface{}) ([]interface{}, error)

ToSliceE casts an interface to a []interface{} type.

func ToString

func ToString(i interface{}) string

ToString casts an interface to a string type.

func ToStringE

func ToStringE(i interface{}) (string, error)

ToStringE casts an interface to a string type.

func ToStringMap

func ToStringMap(i interface{}) map[string]interface{}

ToStringMap casts an interface to a map[string]interface{} type.

func ToStringMapBool

func ToStringMapBool(i interface{}) map[string]bool

ToStringMapBool casts an interface to a map[string]bool type.

func ToStringMapBoolE

func ToStringMapBoolE(i interface{}) (map[string]bool, error)

ToStringMapBoolE casts an interface to a map[string]bool type.

func ToStringMapE

func ToStringMapE(i interface{}) (map[string]interface{}, error)

ToStringMapE casts an interface to a map[string]interface{} type.

func ToStringMapInt

func ToStringMapInt(i interface{}) map[string]int

ToStringMapInt casts an interface to a map[string]int type.

func ToStringMapInt64

func ToStringMapInt64(i interface{}) map[string]int64

ToStringMapInt64 casts an interface to a map[string]int64 type.

func ToStringMapInt64E

func ToStringMapInt64E(i interface{}) (map[string]int64, error)

ToStringMapInt64E casts an interface to a map[string]int64 type.

func ToStringMapIntE

func ToStringMapIntE(i interface{}) (map[string]int, error)

ToStringMapIntE casts an interface to a map[string]int type.

func ToStringMapString

func ToStringMapString(i interface{}) map[string]string

ToStringMapString casts an interface to a map[string]string type.

func ToStringMapStringE

func ToStringMapStringE(i interface{}) (map[string]string, error)

ToStringMapStringE casts an interface to a map[string]string type.

func ToStringMapStringSlice

func ToStringMapStringSlice(i interface{}) map[string][]string

ToStringMapStringSlice casts an interface to a map[string][]string type.

func ToStringMapStringSliceE

func ToStringMapStringSliceE(i interface{}) (map[string][]string, error)

ToStringMapStringSliceE casts an interface to a map[string][]string type.

func ToStringSlice

func ToStringSlice(i interface{}) []string

ToStringSlice casts an interface to a []string type.

func ToStringSliceE

func ToStringSliceE(i interface{}) ([]string, error)

ToStringSliceE casts an interface to a []string type.

func ToStringSliceX

func ToStringSliceX(i interface{}) ([]string, error)

ToStringSliceX casts an interface to a []string type.

func ToTime

func ToTime(i interface{}) time.Time

ToTime casts an interface to a time.Time type.

func ToTimeE

func ToTimeE(i interface{}) (tim time.Time, err error)

ToTimeE casts an interface to a time.Time type.

func ToUint

func ToUint(i interface{}) uint

ToUint casts an interface to a uint type.

func ToUint16

func ToUint16(i interface{}) uint16

ToUint16 casts an interface to a uint16 type.

func ToUint16E

func ToUint16E(i interface{}) (uint16, error)

ToUint16E casts an interface to a uint16 type.

func ToUint32

func ToUint32(i interface{}) uint32

ToUint32 casts an interface to a uint32 type.

func ToUint32E

func ToUint32E(i interface{}) (uint32, error)

ToUint32E casts an interface to a uint32 type.

func ToUint64

func ToUint64(i interface{}) uint64

ToUint64 casts an interface to a uint64 type.

func ToUint64E

func ToUint64E(i interface{}) (uint64, error)

ToUint64E casts an interface to a uint64 type.

func ToUint8

func ToUint8(i interface{}) uint8

ToUint8 casts an interface to a uint8 type.

func ToUint8E

func ToUint8E(i interface{}) (uint8, error)

ToUint8E casts an interface to a uint8 type.

func ToUintE

func ToUintE(i interface{}) (uint, error)

ToUintE casts an interface to a uint type.

Types

type BatchBuilder

type BatchBuilder struct {
	// contains filtered or unexported fields
}

BatchBuilder wraps the objects needed to build a batch.

func NewBatchBuilder

func NewBatchBuilder(maxMessages uint) *BatchBuilder

NewBatchBuilder initializes a batch builder, a new container for batched messages, and returns a pointer to it.

func (*BatchBuilder) Add

func (bb *BatchBuilder) Add(payload types.Message) (isFull bool)

Add will add single message to batch.

func (*BatchBuilder) Flush

func (bb *BatchBuilder) Flush() (batchData []types.Message, sequenceID uint64)

Flush all the messages buffered in the client and wait until all messages have been successfully persisted.

func (*BatchBuilder) IsFull

func (bb *BatchBuilder) IsFull() bool

IsFull checks whether the size of the current batch exceeds the maximum size allowed for a batch.

type BatchSink

type BatchSink interface {
	Send(ctx context.Context, msg types.Message) error
	Flush(ctx context.Context) error
	Close()
}

type BlockingQueue

type BlockingQueue interface {
	// Put enqueue one item, block if the queue is full
	Put(item interface{})

	// Take dequeue one item, block until it's available
	Take() interface{}

	// Poll dequeue one item, return nil if queue is empty
	Poll() interface{}

	// Peek returns the first item without dequeuing it; returns nil if the queue is empty
	Peek() interface{}

	// PeekLast returns the last item in the queue without dequeuing it; returns nil if the queue is empty
	PeekLast() interface{}

	// Size return the current size of the queue
	Size() int

	// Iterator return an iterator for the queue
	Iterator() BlockingQueueIterator
}

BlockingQueue is an interface for a blocking queue.

func NewBlockingQueue

func NewBlockingQueue(maxSize int) BlockingQueue

NewBlockingQueue initializes a blocking queue and returns it as a BlockingQueue.

type BlockingQueueIterator

type BlockingQueueIterator interface {
	HasNext() bool
	Next() interface{}
}

BlockingQueueIterator is an interface for iterating over a blocking queue.

type Callback

type Callback func(msgs []types.Message) (err error)

type CallbackFn

type CallbackFn func(sequenceID uint64, e error)

type CallbackMessage

type CallbackMessage struct {
	Message types.Message
	Callback
}

type Context

type Context = ruleql.Context

func NewMessageContext

func NewMessageContext(message types.PublishMessage) Context

NewMessageContext creates a new context from a publish message.

type JsonContext

type JsonContext struct {
	// contains filtered or unexported fields
}

func NewJSONContext

func NewJSONContext(jsonRaw string) *JsonContext

NewJSONContext creates a new context from a JSON string.

func (*JsonContext) SetValue

func (c *JsonContext) SetValue(xpath string, vaule interface{})

SetValue sets the value at the given path in the context.

func (*JsonContext) String

func (c *JsonContext) String() string

String returns the context as a string.

func (*JsonContext) Value

func (c *JsonContext) Value(path string) interface{}

Value gets the value at the given path from the context.

type MessageBatchQueue

type MessageBatchQueue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(ctx context.Context, id string, queueSize, batchSize int, interval time.Duration, callback Callback) *MessageBatchQueue

func (*MessageBatchQueue) Close

func (this *MessageBatchQueue) Close()

func (*MessageBatchQueue) Flush

func (this *MessageBatchQueue) Flush()

func (*MessageBatchQueue) Push

func (this *MessageBatchQueue) Push(msg types.Message)

type Node

type Node = ruleql.Node

type ProcessFn

type ProcessFn func(msgs []types.Message) (err error)

type SinkBatchOptions

type SinkBatchOptions struct {
	// MaxBatching sets the maximum number of messages permitted in a batch. (default: 1000)
	MaxBatching int

	// MaxPendingMessages set the max size of the queue.
	MaxPendingMessages uint

	// BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms)
	BatchingMaxPublishDelay time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL