Documentation ¶
Index ¶
- Constants
- func ReadN(r *bufio.Reader, buffer []byte) error
- func RegisterTridentStatusCommand() *cobra.Command
- func ReleaseRecvBuffer(b *RecvBuffer)
- func ValidateFlowVersion(t datatype.MessageType, version uint32) error
- type AdapterStatus
- type Handler
- type QueueCache
- type Receiver
- func (r *Receiver) Close() error
- func (r *Receiver) Closed() bool
- func (r *Receiver) GetCounter() interface{}
- func (r *Receiver) GetTridentStatus() []*Status
- func (r *Receiver) HandleSimpleCommand(op uint16, arg string) string
- func (r *Receiver) ProcessTCPServer()
- func (r *Receiver) ProcessUDPServer()
- func (r *Receiver) RegistHandler(msgType datatype.MessageType, outQueues queue.MultiQueueWriter, nQueues int) error
- func (r *Receiver) SetServerType(serverType ServerType)
- func (r *Receiver) Start()
- type ReceiverCounter
- type RecvBuffer
- type ServerType
- type Status
Constants ¶
View Source
const ( RECV_BUFSIZE_2K = 1 << 11 // 2k for UDP RECV_BUFSIZE_8K = 1 << 13 // 8k RECV_BUFSIZE_64K = 1 << 16 // 64k RECV_BUFSIZE_256K = 1 << 18 // 256k RECV_BUFSIZE_512K = 1 << 19 // 512k RECV_BUFSIZE_MAX = 1 << 24 // 16M, the maximum size of the PCAP packet will be greater than 8M RECV_TIMEOUT = 30 * time.Second QUEUE_CACHE_FLUSH_TIMEOUT = 3 DROP_DETECT_WINDOW_SIZE = 1024 QUEUE_BATCH_NUM = 16 LOG_INTERVAL = 60 RECORD_STATUS_TIMEOUT = 30 // 每30秒记录下trident的活跃信息,platformData模块每分钟会上报trisolaris SOCKET_READ_ERROR = "maybe trident restart." ONE_HOUR = 3600 )
View Source
const (
TRIDENT_ADAPTER_STATUS_CMD = 40
)
Variables ¶
This section is empty.
Functions ¶
func ReleaseRecvBuffer ¶
func ReleaseRecvBuffer(b *RecvBuffer)
func ValidateFlowVersion ¶
func ValidateFlowVersion(t datatype.MessageType, version uint32) error
This function references app, so defining it as a method in datatype would cause a circular import; it is therefore placed here.
Types ¶
type AdapterStatus ¶
type AdapterStatus struct { UDPMetrisStatus []*Status // 定期获取trident遥测数据的活跃信息,上报trisolaris TCPMetrisStatus []*Status UDPStatusLocks [datatype.MESSAGE_TYPE_MAX]sync.Mutex TCPStatusLocks [datatype.MESSAGE_TYPE_MAX]sync.RWMutex UDPStatusFlow [datatype.MESSAGE_TYPE_MAX]map[uint16]*Status TCPStatusFlow [datatype.MESSAGE_TYPE_MAX]map[uint16]*Status // vtapID非0, 使用vtapID作为key: 遥测数据,l4流日志数据,l7-http-dns流日志数据 UDPStatusOthers [datatype.MESSAGE_TYPE_MAX]map[string]*Status TCPStatusOthers [datatype.MESSAGE_TYPE_MAX]map[string]*Status // vtapID为0, 使用IP作为key: pcap数据,系统日志数据,statd统计数据 // contains filtered or unexported fields }
func (*AdapterStatus) GetStatus ¶
func (s *AdapterStatus) GetStatus(msgType datatype.MessageType) string
func (*AdapterStatus) Update ¶
func (s *AdapterStatus) Update(now uint32, msgType datatype.MessageType, vtapID uint16, ip net.IP, seq uint64, timestamp uint32, serverType ServerType)
type QueueCache ¶
type Receiver ¶
type Receiver struct { cache.DropDetection UDPAddress *net.UDPAddr UDPConn *net.UDPConn UDPReadBuffer int TCPReadBuffer int TCPReaderBuffer int TCPListener net.Listener TCPAddress string // contains filtered or unexported fields }
func NewReceiver ¶
func (*Receiver) GetCounter ¶
func (r *Receiver) GetCounter() interface{}
func (*Receiver) GetTridentStatus ¶
Used to report each trident's last active time to trisolaris.
func (*Receiver) HandleSimpleCommand ¶
func (*Receiver) ProcessTCPServer ¶
func (r *Receiver) ProcessTCPServer()
func (*Receiver) ProcessUDPServer ¶
func (r *Receiver) ProcessUDPServer()
func (*Receiver) RegistHandler ¶
func (r *Receiver) RegistHandler(msgType datatype.MessageType, outQueues queue.MultiQueueWriter, nQueues int) error
Registers a handler: data received with message type msgType is put into outQueues.
func (*Receiver) SetServerType ¶
func (r *Receiver) SetServerType(serverType ServerType)
type ReceiverCounter ¶
type ReceiverCounter struct { Invalid uint64 `statsd:"invalid"` Unregistered uint64 `statsd:"unregistered"` RxPackets uint64 `statsd:"rx_packets"` MaxDelay int64 `statsd:"max_delay"` MinDelay int64 `statsd:"min_delay"` UDPDropped uint64 `statsd:"udp_dropped"` UDPDisorder uint64 `statsd:"udp_disorder"` // 乱序个数 UDPDisorderSize uint64 `statsd:"udp_disorder_size"` // 乱序最大范围 NewBufferCount uint64 `statsd:"new_buffer_count"` // If the received data is large, you need to alloc memory, record the times. }
type RecvBuffer ¶
type RecvBuffer struct { Begin int // 开始位置 End int Buffer []byte IP net.IP // 保存消息的发送方IP VtapID uint16 SocketType ServerType }
func AcquireRecvBuffer ¶
func AcquireRecvBuffer(length int, socketType ServerType) (*RecvBuffer, bool)
func (*RecvBuffer) String ¶
func (r *RecvBuffer) String() string
func (*RecvBuffer) SubReferenceCount ¶
func (r *RecvBuffer) SubReferenceCount() bool
type ServerType ¶
type ServerType byte
const ( UDP ServerType = iota TCP BOTH )
func (ServerType) String ¶
func (s ServerType) String() string
Click to show internal directories.
Click to hide internal directories.