receiver

package
v1.6.65 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Receive buffer sizes, timeouts and related tuning constants of the receiver.
const (
	RECV_BUFSIZE_2K           = 1 << 11       // 2k for UDP
	RECV_BUFSIZE_8K           = 1 << 13       // 8k
	RECV_BUFSIZE_64K          = 1 << 16       // 64k
	RECV_BUFSIZE_256K         = 1 << 18       // 256k
	RECV_BUFSIZE_320K         = 1<<18 + 1<<16 // 320k
	RECV_BUFSIZE_512K         = 1 << 19       // 512k
	RECV_BUFSIZE_MAX          = 1 << 24       // 16M; the maximum size of a PCAP packet can be greater than 8M
	RECV_TIMEOUT              = 30 * time.Second
	QUEUE_CACHE_FLUSH_TIMEOUT = 3
	DROP_DETECT_WINDOW_SIZE   = 1024
	QUEUE_BATCH_NUM           = 16
	LOG_INTERVAL              = 60
	RECORD_STATUS_TIMEOUT     = 30 // Record trident's activity information every 30 seconds; the platformData module reports to trisolaris every minute
	SOCKET_READ_ERROR         = "maybe trident restart."
	ONE_HOUR                  = 3600
)
View Source
// NOTE(review): presumably the operation code of the trident adapter status
// command handled by Receiver.HandleSimpleCommand — confirm against the caller.
const (
	TRIDENT_ADAPTER_STATUS_CMD = 40
)

Variables

This section is empty.

Functions

func ReadN

func ReadN(r *bufio.Reader, buffer []byte) error

固定读取buffer长度的数据

func RegisterTridentStatusCommand

func RegisterTridentStatusCommand() *cobra.Command

客户端注册命令

func ReleaseRecvBuffer

func ReleaseRecvBuffer(b *RecvBuffer)

func ValidateFlowVersion

func ValidateFlowVersion(t datatype.MessageType, version uint32) error

由于引用了app,导致递归引用,不能在datatype中定义类函数,故放到这里

Types

type AdapterStatus

// AdapterStatus holds per-message-type status records for UDP and TCP,
// each with its own array of locks indexed up to datatype.MESSAGE_TYPE_MAX.
type AdapterStatus struct {
	UDPMetrisStatus []*Status // Activity information of trident telemetry data, collected periodically and reported to trisolaris
	TCPMetrisStatus []*Status
	UDPStatusLocks  [datatype.MESSAGE_TYPE_MAX]sync.Mutex
	TCPStatusLocks  [datatype.MESSAGE_TYPE_MAX]sync.RWMutex
	UDPStatusFlow   [datatype.MESSAGE_TYPE_MAX]map[uint16]*Status
	TCPStatusFlow   [datatype.MESSAGE_TYPE_MAX]map[uint16]*Status // vtapID is non-zero; keyed by vtapID: telemetry data, l4 flow log data, l7-http-dns flow log data
	UDPStatusOthers [datatype.MESSAGE_TYPE_MAX]map[string]*Status
	TCPStatusOthers [datatype.MESSAGE_TYPE_MAX]map[string]*Status // vtapID is 0; keyed by IP: pcap data, system log data, statd statistics data
	// contains filtered or unexported fields
}

func (*AdapterStatus) GetStatus

func (s *AdapterStatus) GetStatus(msgType datatype.MessageType) string

func (*AdapterStatus) Update

func (s *AdapterStatus) Update(now uint32, msgType datatype.MessageType, vtapID, orgId uint16, ip net.IP, seq uint64, timestamp uint32, serverType ServerType)

type Handler

// Handler exposes no exported fields in this documentation view.
type Handler struct {
	// contains filtered or unexported fields
}

type QueueCache

// QueueCache embeds sync.Mutex, so Lock and Unlock are available directly on
// the value; its remaining fields are not exported.
type QueueCache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type Receiver

// Receiver embeds cache.DropDetection and exposes its UDP address and
// connection, its TCP address and listener, and its read-buffer settings.
type Receiver struct {
	cache.DropDetection

	UDPAddress      *net.UDPAddr
	UDPConn         *net.UDPConn
	UDPReadBuffer   int
	TCPReadBuffer   int
	// NOTE(review): TCPReaderBuffer is distinct from TCPReadBuffer; presumably one
	// is the socket read buffer and the other the buffered-reader size — confirm.
	TCPReaderBuffer int
	TCPListener     net.Listener
	TCPAddress      string
	// contains filtered or unexported fields
}

func NewReceiver

func NewReceiver(
	listenPort, UDPReadBuffer, TCPReadBuffer, TCPReaderBuffer int,
) *Receiver

func (*Receiver) Close

func (r *Receiver) Close() error

func (*Receiver) Closed

func (r *Receiver) Closed() bool

func (*Receiver) GetCounter

func (r *Receiver) GetCounter() interface{}

func (*Receiver) GetTridentStatus

func (r *Receiver) GetTridentStatus(orgId uint16) []*Status

用来上报trisolaris, agent最后的活跃时间

func (*Receiver) HandleSimpleCommand

func (r *Receiver) HandleSimpleCommand(op uint16, arg string) string

func (*Receiver) ProcessTCPServer

func (r *Receiver) ProcessTCPServer()

func (*Receiver) ProcessUDPServer

func (r *Receiver) ProcessUDPServer()

func (*Receiver) RegistHandler

func (r *Receiver) RegistHandler(msgType datatype.MessageType, outQueues queue.MultiQueueWriter, nQueues int) error

注册处理函数,收到msgType的数据,放到outQueues中

func (*Receiver) SetServerType

func (r *Receiver) SetServerType(serverType ServerType)

func (*Receiver) Start

func (r *Receiver) Start()

type ReceiverCounter

// ReceiverCounter groups the receiver's counters; every field carries a
// `statsd` struct tag giving its metric name.
type ReceiverCounter struct {
	Invalid         uint64 `statsd:"invalid"`
	Unregistered    uint64 `statsd:"unregistered"`
	RxPackets       uint64 `statsd:"rx_packets"`
	MaxDelay        int64  `statsd:"max_delay"`
	MinDelay        int64  `statsd:"min_delay"`
	UDPDropped      uint64 `statsd:"udp_dropped"`
	UDPDisorder     uint64 `statsd:"udp_disorder"`      // Number of out-of-order packets
	UDPDisorderSize uint64 `statsd:"udp_disorder_size"` // Maximum out-of-order range
	NewBufferCount  uint64 `statsd:"new_buffer_count"`  // If the received data is large, memory must be allocated; this records how many times that happened.
}

type RecvBuffer

// RecvBuffer carries a byte buffer together with Begin/End positions and
// identifying metadata about the sender.
type RecvBuffer struct {
	Begin      int // Start position
	End        int
	Buffer     []byte
	IP         net.IP // IP of the message sender
	VtapID     uint16
	TeamID     uint32
	OrgID      uint16
	SocketType ServerType
}

func AcquireRecvBuffer

func AcquireRecvBuffer(length int, socketType ServerType) (*RecvBuffer, bool)

func (*RecvBuffer) AddReferenceCount

func (r *RecvBuffer) AddReferenceCount()

实现空接口,仅用于队列调试打印

func (*RecvBuffer) String

func (r *RecvBuffer) String() string

func (*RecvBuffer) SubReferenceCount

func (r *RecvBuffer) SubReferenceCount() bool

type ServerType

// ServerType is a byte-based enumeration.
type ServerType byte
// Values of ServerType, numbered from zero via iota: UDP, TCP, BOTH.
const (
	UDP ServerType = iota
	TCP
	BOTH
)

func (ServerType) String

func (s ServerType) String() string

type Status

// Status exposes a VTAP ID and the local timestamp of the last received data;
// its other fields are not exported.
type Status struct {
	VTAPID uint16

	LastLocalTimestamp uint32 // Local time at which data was last received
	// contains filtered or unexported fields
}

func NewStatus

func NewStatus(now uint32, msgType datatype.MessageType, vtapID, orgId uint16, ip net.IP, seq uint64, timestamp uint32, serverType ServerType) *Status

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL