lwes

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2024 License: BSD-3-Clause Imports: 16 Imported by: 1

README

Light Weight Event System (LWES)

Click here for more information about lwes. For more information about using lwes from erlang read on.

Example Usage:

	lwe := lwes.NewLwesEvent("MonDemand::PerfMsg")
	lwe.Set("id", "0db302ef-4ba1-4d6b-86e3-92793d4b0c9e")
	lwe.Set("caller_label", "broker")
	lwe.Set("ctxt_num", uint16(3))
	lwe.Set("ctxt_k0", "platform_hash")
	lwe.Set("ctxt_v0", "7e319737-a81c-4817-bdc6-8f596e5caa46")
	lwe.Set("ctxt_k1", "bidder_count")
	lwe.Set("ctxt_v1", "28")
	lwe.Set("ctxt_k2", "total_count")
	lwe.Set("ctxt_v2", "28")
	lwe.Set("num", uint16(1))
	lwe.Set("label0", "adunit:538494050:call:1:ssrtb")
	lwe.Set("start0", int64(1494880081332))
	lwe.Set("end0", int64(1494880081487))

    bs, err := lwes.Marshal(lwe)
    // check if no error then bs is the bytes of encoded binary

Documentation

Overview

Package lwes provides lwes related functionalities.

http://www.lwes.org/ (Light Weight Event System)
https://github.com/lwes/lwes (Light Weight Event System C library)
https://github.com/lwes/lwes-erlang (Light Weight Event System Erlang library, currently the most complete implementation)

Index

Examples

Constants

View Source
const (
	/* maximum datagram size for UDP is 65535 (0xffff) minus
	    transport layer overhead (20 bytes for IP header, and 8 bytes for UDP header), so this value
	   should be 65535 - 28 = 65507
	*/
	MAX_MSG_SIZE       = 65535 - 20 - 8
	DEFAULT_QUEUE_SIZE = 100 * 1000

	MAX_PACKET_SIZE = 64 * 1024 // max size of single UDP packet is 64KB - headersize
	SO_RCVBUF_SIZE  = 16 * 1024 * 1024
)
View Source
const (
	LWES_TYPE_U_INT_16  byte = 1   /*!< 2 byte unsigned integer type */
	LWES_TYPE_INT_16         = 2   /*!< 2 byte signed integer type type */
	LWES_TYPE_U_INT_32       = 3   /*!< 4 byte unsigned integer type */
	LWES_TYPE_INT_32         = 4   /*!< 4 byte signed integer type */
	LWES_TYPE_STRING         = 5   /*!< variable bytes string type */
	LWES_TYPE_IP_ADDR        = 6   /*!< 4 byte ipv4 address type, in the Little Endian order */
	LWES_TYPE_INT_64         = 7   /*!< 8 byte signed integer type */
	LWES_TYPE_U_INT_64       = 8   /*!< 8 byte unsigned integer type */
	LWES_TYPE_BOOLEAN        = 9   /*!< 1 byte boolean type */
	LWES_TYPE_UNDEFINED      = 255 /*!< undefined type */

	// Extended types
	LWES_TYPE_BYTE        = 10
	LWES_TYPE_FLOAT       = 11
	LWES_TYPE_DOUBLE      = 12
	LWES_TYPE_LONG_STRING = 13

	// the array and sparse array types are not supported yet
	LWES_TYPE_U_INT_16_ARRAY = 129
	LWES_TYPE_INT_16_ARRAY   = 130
	LWES_TYPE_U_INT_32_ARRAY = 131
	LWES_TYPE_INT_32_ARRAY   = 132
	LWES_TYPE_STRING_ARRAY   = 133
	LWES_TYPE_IP_ADDR_ARRAY  = 134
	LWES_TYPE_INT_64_ARRAY   = 135
	LWES_TYPE_U_INT_64_ARRAY = 136
	LWES_TYPE_BOOLEAN_ARRAY  = 137
	LWES_TYPE_BYTE_ARRAY     = 138
	LWES_TYPE_FLOAT_ARRAY    = 139
	LWES_TYPE_DOUBLE_ARRAY   = 140

	// the nullable array; can be very sparse
	LWES_TYPE_N_U_INT_16_ARRAY = 141
	LWES_TYPE_N_INT_16_ARRAY   = 142
	LWES_TYPE_N_U_INT_32_ARRAY = 143
	LWES_TYPE_N_INT_32_ARRAY   = 144
	LWES_TYPE_N_STRING_ARRAY   = 145
	// there is no sparse IP_ADDR_ARRAY
	LWES_TYPE_N_INT_64_ARRAY   = 147
	LWES_TYPE_N_U_INT_64_ARRAY = 148
	LWES_TYPE_N_BOOLEAN_ARRAY  = 149
	LWES_TYPE_N_BYTE_ARRAY     = 150
	LWES_TYPE_N_FLOAT_ARRAY    = 151
	LWES_TYPE_N_DOUBLE_ARRAY   = 152
)

the original types are from https://github.com/lwes/lwes/blob/master/src/lwes_types.c; extended types are from https://github.com/lwes/lwes-erlang/blob/master/include/lwes.hrl

Variables

This section is empty.

Functions

func Marshal

func Marshal(v encoding.BinaryMarshaler) ([]byte, error)

the helper to marshal a BinaryMarshaler to bytes (should this be in "encoding" ?)

func NewFixedBuffer

func NewFixedBuffer(pool *sync.Pool, size int) *readBuf

func Unmarshal

func Unmarshal(data []byte, v encoding.BinaryUnmarshaler) error

the helper to unmarshal bytes into a BinaryMarshaler (should this be in "encoding" ?)

Types

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

func Open

func Open(cfg EmitterConfig) *Emitter

each transport is lwes::ip:port:<ttl> form

func (*Emitter) Close

func (em *Emitter) Close()

func (*Emitter) Emit

func (em *Emitter) Emit(lwe encoding.BinaryMarshaler) error

type EmitterConfig

type EmitterConfig struct {
	Servers []struct {
		// contains filtered or unexported fields
	}
	// contains filtered or unexported fields
}

func (*EmitterConfig) ParseFromString

func (sc *EmitterConfig) ParseFromString(param string) (err error)

each transport param is lwes::ip:port:<ttl> form

type LwesEvent

type LwesEvent struct {
	Name  string                 // the event name
	Attrs map[string]interface{} // the attrs in a map
	// contains filtered or unexported fields
}

func NewLwesEvent

func NewLwesEvent(name string) *LwesEvent

for emitting lwes event, start with NewLwesEvent and following by multiple .Set key value pairs then .MarshalBinary to get the bytes

Example
lwe := lwes.NewLwesEvent("MonDemand::PerfMsg")
lwe.Set("id", "0db302ef-4ba1-4d6b-86e3-92793d4b0c9e")
lwe.Set("caller_label", "broker")

timelines := []struct {
	label      string
	start, end int64
}{
	{"adunit:538494050:call:1:ssrtb", 1494880081332, 1494880081487},
}
lwe.Set("num", uint16(len(timelines)))
for idx, tl := range timelines {
	lwe.Set(fmt.Sprint("label", idx), tl.label)
	lwe.Set(fmt.Sprint("start", idx), tl.start)
	lwe.Set(fmt.Sprint("end", idx), tl.end)
}

context := map[string]string{
	"platform_hash": "7e319737-a81c-4817-bdc6-8f596e5caa46",
	"bidder_count":  "28",
	"total_count":   "28",
}
if len(context) != 0 {
	lwe.Set("ctxt_num", uint16(len(context)))

	// if need stable order over the keys, need extra string slice
	// otherwise just range over the map is ok;
	for idx, key := range []string{"platform_hash", "bidder_count", "total_count"} {
		value := context[key]
		lwe.Set(fmt.Sprint("ctxt_k", idx), key)
		lwe.Set(fmt.Sprint("ctxt_v", idx), value)
	}
} // omit ctxt if no context at all

buf, _ := lwes.Marshal(lwe)
if lwe.Size() != len(buf) || len(buf) != 0x13b {
	fmt.Fprintf(os.Stderr, "length not matching: %d:%d\n", lwe.Size(), len(buf))
}

// fmt.Println(hex.Dump(buf))

lwe1 := new(lwes.LwesEvent)
lwes.Unmarshal(buf, lwe1)
lwe1.FPrint(os.Stdout)
Output:

MonDemand::PerfMsg[13]
{
	id = 0db302ef-4ba1-4d6b-86e3-92793d4b0c9e;
	caller_label = broker;
	num = 1;
	label0 = adunit:538494050:call:1:ssrtb;
	start0 = 1494880081332;
	end0 = 1494880081487;
	ctxt_num = 3;
	ctxt_k0 = platform_hash;
	ctxt_v0 = 7e319737-a81c-4817-bdc6-8f596e5caa46;
	ctxt_k1 = bidder_count;
	ctxt_v1 = 28;
	ctxt_k2 = total_count;
	ctxt_v2 = 28;
}

func (*LwesEvent) Enumerate

func (lwe *LwesEvent) Enumerate(callback func(key string, value interface{}) bool)

Enumerate all key/value pairs in the original order

Example

Example printing the lwes event

// find out all 2 consecutive hexdigit from the hexdump output

data := `
00000000                    12 4d  6f 6e 44 65 6d 61 6e 64  |.......MonDemand|
00000020  3a 3a 50 65 72 66 4d 73  67 00 0d 07 63 74 78 74  |::PerfMsg...ctxt|
00000030  5f 76 32 05 00 02 32 38  07 63 74 78 74 5f 6b 32  |_v2...28.ctxt_k2|
00000040  05 00 0b 74 6f 74 61 6c  5f 63 6f 75 6e 74 07 63  |...total_count.c|
00000050  74 78 74 5f 76 31 05 00  02 32 38 07 63 74 78 74  |txt_v1...28.ctxt|
00000060  5f 6b 31 05 00 0c 62 69  64 64 65 72 5f 63 6f 75  |_k1...bidder_cou|
00000070  6e 74 07 63 74 78 74 5f  76 30 05 00 24 37 65 33  |nt.ctxt_v0..$7e3|
00000080  31 39 37 33 37 2d 61 38  31 63 2d 34 38 31 37 2d  |19737-a81c-4817-|
00000090  62 64 63 36 2d 38 66 35  39 36 65 35 63 61 61 34  |bdc6-8f596e5caa4|
000000a0  36 07 63 74 78 74 5f 6b  30 05 00 0d 70 6c 61 74  |6.ctxt_k0...plat|
000000b0  66 6f 72 6d 5f 68 61 73  68 08 63 74 78 74 5f 6e  |form_hash.ctxt_n|
000000c0  75 6d 01 00 03 04 65 6e  64 30 07 00 00 01 5c 0d  |um....end0....\.|
000000d0  cb d6 4f 06 73 74 61 72  74 30 07 00 00 01 5c 0d  |..O.start0....\.|
000000e0  cb d5 b4 06 6c 61 62 65  6c 30 05 00 1d 61 64 75  |....label0...adu|
000000f0  6e 69 74 3a 35 33 38 34  39 34 30 35 30 3a 63 61  |nit:538494050:ca|
00000100  6c 6c 3a 31 3a 73 73 72  74 62 03 6e 75 6d 01 00  |ll:1:ssrtb.num..|
00000110  01 0c 63 61 6c 6c 65 72  5f 6c 61 62 65 6c 05 00  |..caller_label..|
00000120  06 62 72 6f 6b 65 72 02  69 64 05 00 24 30 64 62  |.broker.id..$0db|
00000130  33 30 32 65 66 2d 34 62  61 31 2d 34 64 36 62 2d  |302ef-4ba1-4d6b-|
00000140  38 36 65 33 2d 39 32 37  39 33 64 34 62 30 63 39  |86e3-92793d4b0c9|
00000150  65 0b 52 65 63 65 69 70  74 54 69 6d 65 07 00 00  |e.ReceiptTime...|
00000160  01 5c 0d cb d6 71 08 53  65 6e 64 65 72 49 50 06  |.\...q.SenderIP.|
00000170  46 7f 01 0a 0a 53 65 6e  64 65 72 50 6f 72 74 01  |F....SenderPort.|
00000180  b7 50`

allhexin := strings.Replace(strings.Join(regexp.MustCompile(` \b[[:xdigit:]]{2}\b`).FindAllString(data, -1), ""), " ", "", -1)
raw, _ := hex.DecodeString(allhexin)

lwe := new(lwes.LwesEvent)
// lwe, _ := lwes.Decode(raw)
lwes.Unmarshal(raw, lwe)

fmt.Printf("%s[%d]\n", lwe.Name, len(lwe.Attrs))
fmt.Println("{")
lwe.Enumerate(func(key string, value interface{}) bool {
	switch key {
	// stop enumerating on no interest fields
	case "ReceiptTime", "SenderIP", "SenderPort":
		return false
	default:
		fmt.Printf("\t%s = %v;\n", key, value)
		return true
	}
})
fmt.Println("}")
Output:

MonDemand::PerfMsg[16]
{
	ctxt_v2 = 28;
	ctxt_k2 = total_count;
	ctxt_v1 = 28;
	ctxt_k1 = bidder_count;
	ctxt_v0 = 7e319737-a81c-4817-bdc6-8f596e5caa46;
	ctxt_k0 = platform_hash;
	ctxt_num = 3;
	end0 = 1494880081487;
	start0 = 1494880081332;
	label0 = adunit:538494050:call:1:ssrtb;
	num = 1;
	caller_label = broker;
	id = 0db302ef-4ba1-4d6b-86e3-92793d4b0c9e;
}

func (*LwesEvent) FPrint

func (lwe *LwesEvent) FPrint(w io.Writer)

this print all key/value pairs in the original order mainly for debug printing

func (*LwesEvent) MarshalBinary

func (lwe *LwesEvent) MarshalBinary() (buf []byte, err error)

MarshalBinary implements the encoding.BinaryMarshaler interface.

func (*LwesEvent) Set

func (lwe *LwesEvent) Set(key string, value interface{})

use NewLwesEvent and Set for events to be encoded

func (*LwesEvent) Size

func (lwe *LwesEvent) Size() int

calculate the bytes size needed for constructing a new lwes event it's used in MarshalBinary for how many bytes need to allocate

func (*LwesEvent) UnmarshalBinary

func (lwe *LwesEvent) UnmarshalBinary(data []byte) error

UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.

type Server

type Server interface {
	Serve()          // start serving
	IsServing() bool // check if the server is still in serving mode
	Stop()           // stop the server
	Wait()           // wait till the server is stopped
	DataChan() <-chan *readBuf

	// Addr returns server's network address.
	Addr() net.Addr

	WaitLwesMode(num_workers int) <-chan *LwesEvent
	EnableMetricsReport(time.Duration, func(string, interface{}))
}

Server is the interface for servers that receive inbound span submissions from client.

func Listen

func Listen(multi_addrport string) (Server, error)

listen on the multicast "addr:port" form return a server with the Server interface methods

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL