Documentation ¶
Overview ¶
Package netpoll provides a portable event-driven interface for network I/O.
The underlying facility of event notification is OS-specific:
- epoll on Linux - https://man7.org/linux/man-pages/man7/epoll.7.html
- kqueue on *BSD/Darwin - https://man.freebsd.org/cgi/man.cgi?kqueue
With the help of the netpoll package, you can easily build your own high-performance event-driven network applications based on epoll/kqueue.
The Poller represents the event notification facility whose backend is epoll or kqueue. The OpenPoller function creates a new Poller instance:
	poller, err := netpoll.OpenPoller()
	if err != nil {
		// handle error
	}
	defer poller.Close()

	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:9090")
	if err != nil {
		// handle error
	}
	c, err := net.DialTCP("tcp", nil, addr)
	if err != nil {
		// handle error
	}
	f, err := c.File()
	if err != nil {
		// handle error
	}

	closeClient := func() {
		c.Close()
		f.Close()
	}
	defer closeClient()
A PollAttachment consists of a file descriptor and its callback function. It is used to register a file descriptor with the Poller. The callback function is called when an event occurs on the file descriptor:
	pa := netpoll.PollAttachment{
		FD: int(f.Fd()),
		Callback: func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error {
			if netpoll.IsErrorEvent(event, flags) {
				closeClient()
				return errors.ErrEngineShutdown
			}

			if netpoll.IsReadEvent(event) {
				buf := make([]byte, 64)
				// Read data from the connection.
				_, err := c.Read(buf)
				if err != nil {
					closeClient()
					return errors.ErrEngineShutdown
				}
				// Process the data...
			}

			if netpoll.IsWriteEvent(event) {
				// Write data to the connection.
				_, err := c.Write([]byte("hello"))
				if err != nil {
					closeClient()
					return errors.ErrEngineShutdown
				}
			}

			return nil
		},
	}

	if err := poller.AddReadWrite(&pa, false); err != nil {
		// handle error
	}
The Poller.Polling function starts the event loop monitoring file descriptors and waiting for I/O events to occur:
	poller.Polling(func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error {
		return pa.Callback(fd, event, flags)
	})
Or, if you have enabled the build tag `poll_opt`:
	poller.Polling()
Example ¶
	package main

	import (
		"context"
		"fmt"
		"net"
		"os"
		"os/signal"
		"time"

		"github.com/panjf2000/gnet/v2/pkg/errors"
		"github.com/panjf2000/gnet/v2/pkg/netpoll"
	)

	func main() {
		ln, err := net.Listen("tcp", "127.0.0.1:9090")
		if err != nil {
			panic(fmt.Sprintf("Error listening: %v", err))
		}
		defer ln.Close()

		ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
		defer cancel()

		go func() {
			c, err := ln.Accept()
			if err != nil {
				panic(fmt.Sprintf("Error accepting connection: %v", err))
			}
			defer c.Close()

			buf := make([]byte, 64)

			for {
				select {
				case <-ctx.Done():
					cancel()
					fmt.Printf("Signal received: %v\n", ctx.Err())
					return
				default:
				}

				_, err := c.Read(buf)
				if err != nil {
					panic(fmt.Sprintf("Error reading data from client: %v", err))
				}
				fmt.Printf("Received data from client: %s\n", buf)

				_, err = c.Write([]byte("Hello, client!"))
				if err != nil {
					panic(fmt.Sprintf("Error writing data to client: %v", err))
				}
				fmt.Println("Sent data to client")

				time.Sleep(200 * time.Millisecond)
			}
		}()

		// Wait for the server to start running.
		time.Sleep(500 * time.Millisecond)

		poller, err := netpoll.OpenPoller()
		if err != nil {
			panic(fmt.Sprintf("Error opening poller: %v", err))
		}
		defer poller.Close()

		addr, err := net.ResolveTCPAddr("tcp", ln.Addr().String())
		if err != nil {
			panic(fmt.Sprintf("Error resolving TCP address: %v", err))
		}
		c, err := net.DialTCP("tcp", nil, addr)
		if err != nil {
			panic(fmt.Sprintf("Error dialing TCP address: %v", err))
		}
		f, err := c.File()
		if err != nil {
			panic(fmt.Sprintf("Error getting file from connection: %v", err))
		}

		closeClient := func() {
			c.Close()
			f.Close()
		}
		defer closeClient()

		sendData := true

		pa := netpoll.PollAttachment{
			FD: int(f.Fd()),
			Callback: func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error { //nolint:revive
				if netpoll.IsErrorEvent(event, flags) {
					closeClient()
					return errors.ErrEngineShutdown
				}

				if netpoll.IsReadEvent(event) {
					sendData = true
					buf := make([]byte, 64)
					_, err := c.Read(buf)
					if err != nil {
						closeClient()
						fmt.Println("Error reading data from server:", err)
						return errors.ErrEngineShutdown
					}
					fmt.Printf("Received data from server: %s\n", buf)
					// Process the data...
				}

				if netpoll.IsWriteEvent(event) && sendData {
					sendData = false
					// Write data to the connection...
					_, err := c.Write([]byte("Hello, server!"))
					if err != nil {
						closeClient()
						fmt.Println("Error writing data to server:", err)
						return errors.ErrEngineShutdown
					}
					fmt.Println("Sent data to server")
				}

				return nil
			},
		}

		if err := poller.AddReadWrite(&pa, false); err != nil {
			panic(fmt.Sprintf("Error adding file descriptor to poller: %v", err))
		}

		err = poller.Polling(func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error {
			return pa.Callback(fd, event, flags)
		})

		fmt.Printf("Poller exited with error: %v", err)
	}
Index ¶
- Constants
- func IsErrorEvent(event IOEvent, _ IOFlags) bool
- func IsReadEvent(event IOEvent) bool
- func IsWriteEvent(event IOEvent) bool
- type IOEvent
- type IOFlags
- type PollAttachment
- type PollEventHandler
- type Poller
- func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error
- func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error
- func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error
- func (p *Poller) Close() error
- func (p *Poller) Delete(fd int) error
- func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error
- func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error
- func (p *Poller) Polling(callback PollEventHandler) error
- func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error)
Constants ¶
	const (
		// InitPollEventsCap represents the initial capacity of poller event-list.
		InitPollEventsCap = 128
		// MaxPollEventsCap is the maximum limitation of events that the poller can process.
		MaxPollEventsCap = 1024
		// MinPollEventsCap is the minimum limitation of events that the poller can process.
		MinPollEventsCap = 32
		// MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time.
		MaxAsyncTasksAtOneTime = 256
		// ReadEvents represents readable events that are polled by epoll.
		ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
		// WriteEvents represents writeable events that are polled by epoll.
		WriteEvents = unix.EPOLLOUT
		// ReadWriteEvents represents both readable and writeable events.
		ReadWriteEvents = ReadEvents | WriteEvents
		// ErrEvents represents exceptional events that occurred.
		ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
	)
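The capacity constants above bound the size of the poller's event list. How the list moves between those bounds is not described here; the following is only a minimal sketch of one plausible policy (double when a poll fills the list, halve when less than half was used). The function nextCap and its policy are hypothetical and are not part of the netpoll API.

```go
package main

import "fmt"

// Values copied from the Constants block of this package.
const (
	initPollEventsCap = 128
	maxPollEventsCap  = 1024
	minPollEventsCap  = 32
)

// nextCap returns the event-list capacity to use for the next poll, given
// the current capacity and the number of events the last poll returned.
// Hypothetical policy (an assumption, not the package's documented behavior):
// double when the list was full, halve when less than half was used, and
// never leave the range [minPollEventsCap, maxPollEventsCap].
func nextCap(current, ready int) int {
	switch {
	case ready >= current && current*2 <= maxPollEventsCap:
		return current * 2
	case ready < current/2 && current/2 >= minPollEventsCap:
		return current / 2
	default:
		return current
	}
}

func main() {
	c := initPollEventsCap
	for _, ready := range []int{128, 256, 512, 1024, 10, 10} {
		c = nextCap(c, ready)
		fmt.Println("ready:", ready, "next capacity:", c)
	}
}
```

Under this assumed policy a sustained burst grows the list up to MaxPollEventsCap, and a quiet period shrinks it back without dropping below MinPollEventsCap.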
Functions ¶
func IsErrorEvent ¶
func IsErrorEvent(event IOEvent, _ IOFlags) bool
IsErrorEvent checks if the event is an error event.
func IsReadEvent ¶
func IsReadEvent(event IOEvent) bool
IsReadEvent checks if the event is a read event.
func IsWriteEvent ¶
func IsWriteEvent(event IOEvent) bool
IsWriteEvent checks if the event is a write event.
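On the epoll backend, an event value is a bitmask, so a single notification can be a read event and an error event at the same time. The sketch below illustrates that kind of classification with plain bit tests. The lowercase names are hypothetical stand-ins that mirror the Linux epoll bit values; the real helpers may be implemented differently, and on kqueue the event representation is different altogether.

```go
package main

import "fmt"

// Illustrative stand-ins for the Linux epoll bit values
// (EPOLLIN, EPOLLPRI, EPOLLOUT, EPOLLERR, EPOLLHUP).
const (
	epollIn  uint32 = 0x001
	epollPri uint32 = 0x002
	epollOut uint32 = 0x004
	epollErr uint32 = 0x008
	epollHup uint32 = 0x010

	// Groupings that mirror ReadEvents, WriteEvents and ErrEvents.
	readEvents  = epollIn | epollPri
	writeEvents = epollOut
	errEvents   = epollErr | epollHup
)

// isReadEvent reports whether any readable bit is set in ev.
func isReadEvent(ev uint32) bool { return ev&readEvents != 0 }

// isWriteEvent reports whether the writable bit is set in ev.
func isWriteEvent(ev uint32) bool { return ev&writeEvents != 0 }

// isErrorEvent reports whether any exceptional bit is set in ev.
func isErrorEvent(ev uint32) bool { return ev&errEvents != 0 }

func main() {
	// A peer hang-up that still has unread data carries both bits.
	ev := epollIn | epollHup
	fmt.Println(isReadEvent(ev), isWriteEvent(ev), isErrorEvent(ev)) // true false true
}
```

Because several bits can be set at once, a callback that wants to drain pending data before closing may check the read condition before the error condition; the example earlier on this page checks the error condition first.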
Types ¶
type PollAttachment ¶
type PollAttachment struct {
	FD       int
	Callback PollEventHandler
}
PollAttachment is the user data which is about to be stored in "void *ptr" of epoll_data or "void *udata" of kevent.
type PollEventHandler ¶
type PollEventHandler func(int, IOEvent, IOFlags) error
PollEventHandler is the callback for I/O events notified by the poller.
type Poller ¶
type Poller struct {
// contains filtered or unexported fields
}
Poller represents a poller which is in charge of monitoring file-descriptors.
func (*Poller) AddRead ¶
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error
AddRead registers the given file descriptor with the poller for readable events.
func (*Poller) AddReadWrite ¶
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error
AddReadWrite registers the given file descriptor with the poller for readable and writable events.
func (*Poller) AddWrite ¶
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error
AddWrite registers the given file descriptor with the poller for writable events.
func (*Poller) ModRead ¶
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error
ModRead modifies the poller's registration of the given file descriptor to monitor readable events.
func (*Poller) ModReadWrite ¶
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error
ModReadWrite modifies the poller's registration of the given file descriptor to monitor readable and writable events.
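The Add* methods create a registration, while the Mod* methods change the interest set of a file descriptor that is already registered. The sketch below models that distinction with an in-memory map. It follows epoll_ctl semantics, where adding an already registered descriptor or modifying an unregistered one is an error; kqueue behaves differently. The registry type, its methods, and the error values are hypothetical and exist only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// interest is a hypothetical bitmask of the events a descriptor is watched for.
type interest uint8

const (
	readable interest = 1 << iota
	writable
)

var (
	errExists   = errors.New("fd already registered") // models EEXIST
	errNotFound = errors.New("fd not registered")     // models ENOENT
)

// registry is a hypothetical in-memory stand-in for the kernel's interest list.
type registry map[int]interest

// add creates a new registration and fails if fd is already present.
func (r registry) add(fd int, want interest) error {
	if _, ok := r[fd]; ok {
		return errExists
	}
	r[fd] = want
	return nil
}

// mod replaces the interest set of an existing registration.
func (r registry) mod(fd int, want interest) error {
	if _, ok := r[fd]; !ok {
		return errNotFound
	}
	r[fd] = want
	return nil
}

func main() {
	r := registry{}
	fmt.Println(r.add(3, readable))          // register for reads
	fmt.Println(r.mod(3, readable|writable)) // also watch for writability
	fmt.Println(r.mod(3, readable))          // back to reads only
	fmt.Println(r.add(3, readable))          // second add is rejected
	fmt.Println(r.mod(4, readable))          // unknown fd is rejected
}
```

A common pattern with such an interface is to register a connection for reads, switch to read-write interest only while outbound data is pending, and switch back once it has been flushed.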
func (*Poller) Polling ¶
func (p *Poller) Polling(callback PollEventHandler) error
Polling blocks the current goroutine, monitoring the registered file descriptors and waiting for network I/O. When I/O occurs on any of the file descriptors, the provided callback function is invoked.
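The callback contract can be pictured as a dispatch step inside the event loop: each ready descriptor is handed to the callback, and the callback's return value decides whether the loop carries on. The following is a simplified model under one loud assumption: any non-nil error ends the loop. Which errors actually terminate Polling is not stated on this page (the example above returns errors.ErrEngineShutdown to stop it), so the types, the dispatch function, and errShutdown below are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// readyEvent is a hypothetical record of one notification from the kernel.
type readyEvent struct {
	fd    int
	event uint32
}

// handler mirrors the shape of a poll callback, minus the flags argument.
type handler func(fd int, event uint32) error

// errShutdown is a hypothetical sentinel a handler returns to stop the loop.
var errShutdown = errors.New("shutdown requested")

// dispatch hands each ready event to h in order. It stops at the first
// non-nil error (an assumption of this sketch) and reports how many events
// were handled successfully before that.
func dispatch(batch []readyEvent, h handler) (handled int, err error) {
	for _, ev := range batch {
		if err := h(ev.fd, ev.event); err != nil {
			return handled, err
		}
		handled++
	}
	return handled, nil
}

func main() {
	batch := []readyEvent{{fd: 5, event: 1}, {fd: 6, event: 1}, {fd: 7, event: 1}}
	n, err := dispatch(batch, func(fd int, _ uint32) error {
		if fd == 6 {
			return errShutdown // ask the loop to stop at fd 6
		}
		return nil
	})
	fmt.Println(n, err)
}
```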
func (*Poller) Trigger ¶
func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error)
Trigger enqueues a task and wakes up the poller to process pending tasks. By default, any incoming task is enqueued into urgentAsyncTaskQueue until the threshold of high-priority events is reached. Once that happens, any tasks other than high-priority tasks are shunted to asyncTaskQueue.
Note that asyncTaskQueue is a low-priority queue whose size may grow large, and tasks in it may back up.
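The routing rule described for Trigger can be sketched with two slices standing in for urgentAsyncTaskQueue and asyncTaskQueue. This is a simplified model, not the package's implementation: the scheduler type, the two-level priority enum, and the threshold value are hypothetical, and the real queues and priority levels live in the queue package.

```go
package main

import "fmt"

// priority is a hypothetical two-level stand-in for queue.EventPriority.
type priority int

const (
	highPriority priority = iota
	lowPriority
)

// task is a hypothetical unit of work submitted to the poller.
type task struct {
	name string
	prio priority
}

// scheduler is a hypothetical stand-in for the poller's two task queues.
type scheduler struct {
	urgent    []task // models urgentAsyncTaskQueue
	normal    []task // models asyncTaskQueue
	threshold int    // hypothetical threshold of high-priority events
}

// trigger routes t to the urgent queue by default. Once the urgent queue has
// reached the threshold, only high-priority tasks still go there; everything
// else is shunted to the normal queue.
func (s *scheduler) trigger(t task) {
	if t.prio != highPriority && len(s.urgent) >= s.threshold {
		s.normal = append(s.normal, t)
		return
	}
	s.urgent = append(s.urgent, t)
}

func main() {
	s := &scheduler{threshold: 2}
	for _, t := range []task{
		{"a", lowPriority},
		{"b", lowPriority},
		{"c", lowPriority},  // threshold reached: goes to the normal queue
		{"d", highPriority}, // high priority: still goes to the urgent queue
	} {
		s.trigger(t)
	}
	fmt.Println(len(s.urgent), len(s.normal)) // 3 1
}
```

The model also shows why the low-priority queue can back up: under sustained load every non-urgent task lands there, and nothing in the routing step bounds its length.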