Documentation ¶
Overview ¶
Package lwes provides lwes related functionalities.
http://www.lwes.org/ (Light Weight Event System) https://github.com/lwes/lwes (Light Weight Event System C library) https://github.com/lwes/lwes-erlang (Light Weight Event System Erlang library, currently the most complete implementation)
Index ¶
- Constants
- func Marshal(v encoding.BinaryMarshaler) ([]byte, error)
- func NewFixedBuffer(pool *sync.Pool, size int) *readBuf
- func Unmarshal(data []byte, v encoding.BinaryUnmarshaler) error
- type Emitter
- type EmitterConfig
- type LwesEvent
- func (lwe *LwesEvent) Enumerate(callback func(key string, value interface{}) bool)
- func (lwe *LwesEvent) FPrint(w io.Writer)
- func (lwe *LwesEvent) MarshalBinary() (buf []byte, err error)
- func (lwe *LwesEvent) Set(key string, value interface{})
- func (lwe *LwesEvent) Size() int
- func (lwe *LwesEvent) UnmarshalBinary(data []byte) error
- type Server
Examples ¶
Constants ¶
const ( /* maximum datagram size for UDP is 65535 (0xffff) minus transport layer overhead (20 bytes for IP header, and 8 bytes for UDP header), so this value should be 65535 - 28 = 65507 */ MAX_MSG_SIZE = 65535 - 20 - 8 DEFAULT_QUEUE_SIZE = 100 * 1000 MAX_PACKET_SIZE = 64 * 1024 // max size of single UDP packet is 64KB - headersize SO_RCVBUF_SIZE = 16 * 1024 * 1024 )
const ( LWES_TYPE_U_INT_16 byte = 1 /*!< 2 byte unsigned integer type */ LWES_TYPE_INT_16 = 2 /*!< 2 byte signed integer type type */ LWES_TYPE_U_INT_32 = 3 /*!< 4 byte unsigned integer type */ LWES_TYPE_INT_32 = 4 /*!< 4 byte signed integer type */ LWES_TYPE_STRING = 5 /*!< variable bytes string type */ LWES_TYPE_IP_ADDR = 6 /*!< 4 byte ipv4 address type, in the Little Endian order */ LWES_TYPE_INT_64 = 7 /*!< 8 byte signed integer type */ LWES_TYPE_U_INT_64 = 8 /*!< 8 byte unsigned integer type */ LWES_TYPE_BOOLEAN = 9 /*!< 1 byte boolean type */ LWES_TYPE_UNDEFINED = 255 /*!< undefined type */ // Extended types LWES_TYPE_BYTE = 10 LWES_TYPE_FLOAT = 11 LWES_TYPE_DOUBLE = 12 LWES_TYPE_LONG_STRING = 13 // the array and sparse array types are not supported yet LWES_TYPE_U_INT_16_ARRAY = 129 LWES_TYPE_INT_16_ARRAY = 130 LWES_TYPE_U_INT_32_ARRAY = 131 LWES_TYPE_INT_32_ARRAY = 132 LWES_TYPE_STRING_ARRAY = 133 LWES_TYPE_IP_ADDR_ARRAY = 134 LWES_TYPE_INT_64_ARRAY = 135 LWES_TYPE_U_INT_64_ARRAY = 136 LWES_TYPE_BOOLEAN_ARRAY = 137 LWES_TYPE_BYTE_ARRAY = 138 LWES_TYPE_FLOAT_ARRAY = 139 LWES_TYPE_DOUBLE_ARRAY = 140 // the nullable array; can be very sparse LWES_TYPE_N_U_INT_16_ARRAY = 141 LWES_TYPE_N_INT_16_ARRAY = 142 LWES_TYPE_N_U_INT_32_ARRAY = 143 LWES_TYPE_N_INT_32_ARRAY = 144 LWES_TYPE_N_STRING_ARRAY = 145 // there is no sparse IP_ADDR_ARRAY LWES_TYPE_N_INT_64_ARRAY = 147 LWES_TYPE_N_U_INT_64_ARRAY = 148 LWES_TYPE_N_BOOLEAN_ARRAY = 149 LWES_TYPE_N_BYTE_ARRAY = 150 LWES_TYPE_N_FLOAT_ARRAY = 151 LWES_TYPE_N_DOUBLE_ARRAY = 152 )
the original types are from https://github.com/lwes/lwes/blob/master/src/lwes_types.c; extended types are from https://github.com/lwes/lwes-erlang/blob/master/include/lwes.hrl
Variables ¶
This section is empty.
Functions ¶
func Marshal ¶
func Marshal(v encoding.BinaryMarshaler) ([]byte, error)
the helper to marshal a BinaryMarshaler to bytes (should this be in "encoding" ?)
func NewFixedBuffer ¶
Types ¶
type EmitterConfig ¶
type EmitterConfig struct { Servers []struct { // contains filtered or unexported fields } // contains filtered or unexported fields }
func (*EmitterConfig) ParseFromString ¶
func (sc *EmitterConfig) ParseFromString(param string) (err error)
each transport param is lwes::ip:port:<ttl> form
type LwesEvent ¶
type LwesEvent struct { Name string // the event name Attrs map[string]interface{} // the attrs in a map // contains filtered or unexported fields }
func NewLwesEvent ¶
for emitting lwes event, start with NewLwesEvent and following by multiple .Set key value pairs then .MarshalBinary to get the bytes
Example ¶
lwe := lwes.NewLwesEvent("MonDemand::PerfMsg") lwe.Set("id", "0db302ef-4ba1-4d6b-86e3-92793d4b0c9e") lwe.Set("caller_label", "broker") timelines := []struct { label string start, end int64 }{ {"adunit:538494050:call:1:ssrtb", 1494880081332, 1494880081487}, } lwe.Set("num", uint16(len(timelines))) for idx, tl := range timelines { lwe.Set(fmt.Sprint("label", idx), tl.label) lwe.Set(fmt.Sprint("start", idx), tl.start) lwe.Set(fmt.Sprint("end", idx), tl.end) } context := map[string]string{ "platform_hash": "7e319737-a81c-4817-bdc6-8f596e5caa46", "bidder_count": "28", "total_count": "28", } if len(context) != 0 { lwe.Set("ctxt_num", uint16(len(context))) // if need stable order over the keys, need extra string slice // otherwise just range over the map is ok; for idx, key := range []string{"platform_hash", "bidder_count", "total_count"} { value := context[key] lwe.Set(fmt.Sprint("ctxt_k", idx), key) lwe.Set(fmt.Sprint("ctxt_v", idx), value) } } // omit ctxt if no context at all buf, _ := lwes.Marshal(lwe) if lwe.Size() != len(buf) || len(buf) != 0x13b { fmt.Fprintf(os.Stderr, "length not matching: %d:%d\n", lwe.Size(), len(buf)) } // fmt.Println(hex.Dump(buf)) lwe1 := new(lwes.LwesEvent) lwes.Unmarshal(buf, lwe1) lwe1.FPrint(os.Stdout)
Output: MonDemand::PerfMsg[13] { id = 0db302ef-4ba1-4d6b-86e3-92793d4b0c9e; caller_label = broker; num = 1; label0 = adunit:538494050:call:1:ssrtb; start0 = 1494880081332; end0 = 1494880081487; ctxt_num = 3; ctxt_k0 = platform_hash; ctxt_v0 = 7e319737-a81c-4817-bdc6-8f596e5caa46; ctxt_k1 = bidder_count; ctxt_v1 = 28; ctxt_k2 = total_count; ctxt_v2 = 28; }
func (*LwesEvent) Enumerate ¶
Enumerate all key/value pairs in the original order
Example ¶
Example printing the lwes event
// find out all 2 consecutive hexdigit from the hexdump output data := ` 00000000 12 4d 6f 6e 44 65 6d 61 6e 64 |.......MonDemand| 00000020 3a 3a 50 65 72 66 4d 73 67 00 0d 07 63 74 78 74 |::PerfMsg...ctxt| 00000030 5f 76 32 05 00 02 32 38 07 63 74 78 74 5f 6b 32 |_v2...28.ctxt_k2| 00000040 05 00 0b 74 6f 74 61 6c 5f 63 6f 75 6e 74 07 63 |...total_count.c| 00000050 74 78 74 5f 76 31 05 00 02 32 38 07 63 74 78 74 |txt_v1...28.ctxt| 00000060 5f 6b 31 05 00 0c 62 69 64 64 65 72 5f 63 6f 75 |_k1...bidder_cou| 00000070 6e 74 07 63 74 78 74 5f 76 30 05 00 24 37 65 33 |nt.ctxt_v0..$7e3| 00000080 31 39 37 33 37 2d 61 38 31 63 2d 34 38 31 37 2d |19737-a81c-4817-| 00000090 62 64 63 36 2d 38 66 35 39 36 65 35 63 61 61 34 |bdc6-8f596e5caa4| 000000a0 36 07 63 74 78 74 5f 6b 30 05 00 0d 70 6c 61 74 |6.ctxt_k0...plat| 000000b0 66 6f 72 6d 5f 68 61 73 68 08 63 74 78 74 5f 6e |form_hash.ctxt_n| 000000c0 75 6d 01 00 03 04 65 6e 64 30 07 00 00 01 5c 0d |um....end0....\.| 000000d0 cb d6 4f 06 73 74 61 72 74 30 07 00 00 01 5c 0d |..O.start0....\.| 000000e0 cb d5 b4 06 6c 61 62 65 6c 30 05 00 1d 61 64 75 |....label0...adu| 000000f0 6e 69 74 3a 35 33 38 34 39 34 30 35 30 3a 63 61 |nit:538494050:ca| 00000100 6c 6c 3a 31 3a 73 73 72 74 62 03 6e 75 6d 01 00 |ll:1:ssrtb.num..| 00000110 01 0c 63 61 6c 6c 65 72 5f 6c 61 62 65 6c 05 00 |..caller_label..| 00000120 06 62 72 6f 6b 65 72 02 69 64 05 00 24 30 64 62 |.broker.id..$0db| 00000130 33 30 32 65 66 2d 34 62 61 31 2d 34 64 36 62 2d |302ef-4ba1-4d6b-| 00000140 38 36 65 33 2d 39 32 37 39 33 64 34 62 30 63 39 |86e3-92793d4b0c9| 00000150 65 0b 52 65 63 65 69 70 74 54 69 6d 65 07 00 00 |e.ReceiptTime...| 00000160 01 5c 0d cb d6 71 08 53 65 6e 64 65 72 49 50 06 |.\...q.SenderIP.| 00000170 46 7f 01 0a 0a 53 65 6e 64 65 72 50 6f 72 74 01 |F....SenderPort.| 00000180 b7 50` allhexin := strings.Replace(strings.Join(regexp.MustCompile(` \b[[:xdigit:]]{2}\b`).FindAllString(data, -1), ""), " ", "", -1) raw, _ := hex.DecodeString(allhexin) lwe := new(lwes.LwesEvent) // lwe, _ := lwes.Decode(raw) lwes.Unmarshal(raw, lwe) fmt.Printf("%s[%d]\n", lwe.Name, len(lwe.Attrs)) fmt.Println("{") lwe.Enumerate(func(key string, value interface{}) bool { switch key { // stop enumerating on no interest fields case "ReceiptTime", "SenderIP", "SenderPort": return false default: fmt.Printf("\t%s = %v;\n", key, value) return true } }) fmt.Println("}")
Output: MonDemand::PerfMsg[16] { ctxt_v2 = 28; ctxt_k2 = total_count; ctxt_v1 = 28; ctxt_k1 = bidder_count; ctxt_v0 = 7e319737-a81c-4817-bdc6-8f596e5caa46; ctxt_k0 = platform_hash; ctxt_num = 3; end0 = 1494880081487; start0 = 1494880081332; label0 = adunit:538494050:call:1:ssrtb; num = 1; caller_label = broker; id = 0db302ef-4ba1-4d6b-86e3-92793d4b0c9e; }
func (*LwesEvent) FPrint ¶
this print all key/value pairs in the original order mainly for debug printing
func (*LwesEvent) MarshalBinary ¶
MarshalBinary implements the encoding.BinaryMarshaler interface.
func (*LwesEvent) Size ¶
calculate the bytes size needed for constructing a new lwes event it's used in MarshalBinary for how many bytes need to allocate
func (*LwesEvent) UnmarshalBinary ¶
UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.
type Server ¶
type Server interface { Serve() // start serving IsServing() bool // check if the server is still in serving mode Stop() // stop the server Wait() // wait till the server is stopped DataChan() <-chan *readBuf // Addr returns server's network address. Addr() net.Addr WaitLwesMode(num_workers int) <-chan *LwesEvent EnableMetricsReport(time.Duration, func(string, interface{})) }
Server is the interface for servers that receive inbound span submissions from client.