netpoll

package
v2.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package netpoll provides a portable event-driven interface for network I/O.

The underlying facility of event notification is OS-specific: epoll on Linux and kqueue on BSD/Darwin.

With the help of the netpoll package, you can easily build your own high-performance event-driven network applications based on epoll/kqueue.

The Poller represents the event notification facility whose backend is epoll or kqueue. The OpenPoller function creates a new Poller instance:

// Create the poller and make sure it is closed when the surrounding function returns.
poller, err := netpoll.OpenPoller()
if err != nil {
	// handle error
}

defer poller.Close()

// Connect to the TCP server listening on 127.0.0.1:9090.
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:9090")
if err != nil {
	// handle error
}
c, err := net.DialTCP("tcp", nil, addr)
if err != nil {
	// handle error
}

// Obtain a file handle for the connection so that its descriptor can be
// registered with the poller.
f, err := c.File()
if err != nil {
	// handle error
}

// closeClient releases both the connection and the file obtained from it.
closeClient := func() {
	c.Close()
	f.Close()
}
defer closeClient()

The PollAttachment consists of a file descriptor and its callback function. PollAttachment is used to register a file descriptor to Poller. The callback function is called when an event occurs on the file descriptor:

// pa binds the client's file descriptor to the callback that handles its I/O events.
pa := netpoll.PollAttachment{
	FD: int(f.Fd()),
	Callback: func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error {
		// On an error event, close the client and report ErrEngineShutdown.
		// NOTE(review): presumably this error makes the polling loop exit — confirm
		// against the Polling implementation.
		if netpoll.IsErrorEvent(event, flags) {
			closeClient()
			return errors.ErrEngineShutdown
		}

		if netpoll.IsReadEvent(event) {
			buf := make([]byte, 64)
			// Read data from the connection.
			_, err := c.Read(buf)
			if err != nil {
				closeClient()
				return errors.ErrEngineShutdown
			}
			// Process the data...
		}

		if netpoll.IsWriteEvent(event) {
			// Write data to the connection.
			_, err := c.Write([]byte("hello"))
			if err != nil {
				closeClient()
				return errors.ErrEngineShutdown
			}
		}

		return nil
	}}

// Register the descriptor for both readable and writable events; the second
// argument (edgeTriggered) is false.
if err := poller.AddReadWrite(&pa, false); err != nil {
	// handle error
}

The Poller.Polling function starts the event loop monitoring file descriptors and waiting for I/O events to occur:

// Polling blocks the current goroutine; every event it reports is forwarded
// unchanged to the attachment's callback.
poller.Polling(func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error {
	return pa.Callback(fd, event, flags)
})

Or

poller.Polling()

if you've enabled the build tag `poll_opt`.

Example
package main

import (
	"context"
	"fmt"
	"net"
	"os"
	"os/signal"
	"time"

	"github.com/panjf2000/gnet/v2/pkg/errors"
	"github.com/panjf2000/gnet/v2/pkg/netpoll"
)

// main runs a self-contained demo of the netpoll package.
//
// It starts a TCP server on 127.0.0.1:9090 in a background goroutine, dials
// it, registers the client's file descriptor with a Poller for read and write
// events, and then blocks in Poller.Polling while client and server exchange
// greetings. The server goroutine stops once an interrupt signal is received.
func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:9090")
	if err != nil {
		panic(fmt.Sprintf("Error listening: %v", err))
	}

	defer ln.Close()

	// Only os.Interrupt is registered: os.Kill (SIGKILL) can never be trapped,
	// so listing it would have no effect.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Server side: accept a single connection and answer every message.
	go func() {
		c, err := ln.Accept()
		if err != nil {
			panic(fmt.Sprintf("Error accepting connection: %v", err))
		}

		defer c.Close()

		buf := make([]byte, 64)

		for {
			// Non-blocking check for the shutdown signal between exchanges.
			select {
			case <-ctx.Done():
				// Stop trapping signals before leaving the goroutine.
				cancel()
				fmt.Printf("Signal received: %v\n", ctx.Err())
				return
			default:
			}

			n, err := c.Read(buf)
			if err != nil {
				panic(fmt.Sprintf("Error reading data from client: %v", err))
			}
			// Print only the bytes actually read, not the whole 64-byte buffer.
			fmt.Printf("Received data from client: %s\n", buf[:n])

			_, err = c.Write([]byte("Hello, client!"))
			if err != nil {
				panic(fmt.Sprintf("Error writing data to client: %v", err))
			}
			fmt.Println("Sent data to client")

			time.Sleep(200 * time.Millisecond)
		}
	}()

	// Wait for the server to start running.
	time.Sleep(500 * time.Millisecond)

	poller, err := netpoll.OpenPoller()
	if err != nil {
		panic(fmt.Sprintf("Error opening poller: %v", err))
	}

	defer poller.Close()

	addr, err := net.ResolveTCPAddr("tcp", ln.Addr().String())
	if err != nil {
		panic(fmt.Sprintf("Error resolving TCP address: %v", err))
	}
	c, err := net.DialTCP("tcp", nil, addr)
	if err != nil {
		panic(fmt.Sprintf("Error dialing TCP address: %v", err))
	}

	// The file's descriptor is what gets registered with the poller below.
	f, err := c.File()
	if err != nil {
		panic(fmt.Sprintf("Error getting file from connection: %v", err))
	}

	// closeClient releases both the connection and the file obtained from it.
	// It may run twice (from the callback and from the deferred call); errors
	// returned by Close are ignored.
	closeClient := func() {
		c.Close()
		f.Close()
	}
	defer closeClient()

	// sendData gates writes so that the client sends one message per reply
	// instead of writing on every write-readiness event.
	sendData := true

	pa := netpoll.PollAttachment{
		FD: int(f.Fd()),
		Callback: func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error { //nolint:revive
			if netpoll.IsErrorEvent(event, flags) {
				closeClient()
				return errors.ErrEngineShutdown
			}

			if netpoll.IsReadEvent(event) {
				sendData = true
				buf := make([]byte, 64)
				n, err := c.Read(buf)
				if err != nil {
					closeClient()
					fmt.Println("Error reading data from server:", err)
					return errors.ErrEngineShutdown
				}
				// Print only the bytes actually read.
				fmt.Printf("Received data from server: %s\n", buf[:n])
				// Process the data...
			}

			if netpoll.IsWriteEvent(event) && sendData {
				sendData = false
				// Write data to the connection...
				_, err := c.Write([]byte("Hello, server!"))
				if err != nil {
					closeClient()
					fmt.Println("Error writing data to server:", err)
					return errors.ErrEngineShutdown
				}
				fmt.Println("Sent data to server")
			}

			return nil
		},
	}

	// Register for both readable and writable events; edgeTriggered is false.
	if err := poller.AddReadWrite(&pa, false); err != nil {
		panic(fmt.Sprintf("Error adding file descriptor to poller: %v", err))
	}

	// Polling blocks until it returns; each event is forwarded to pa.Callback.
	err = poller.Polling(func(fd int, event netpoll.IOEvent, flags netpoll.IOFlags) error {
		return pa.Callback(fd, event, flags)
	})

	fmt.Printf("Poller exited with error: %v", err)
}
Output:

Index

Examples

Constants

View Source
const (
	// InitPollEventsCap is the initial capacity of the poller's event list.
	InitPollEventsCap = 128
	// MaxPollEventsCap is the maximum number of events that the poller can process.
	MaxPollEventsCap = 1024
	// MinPollEventsCap is the minimum number of events that the poller can process.
	MinPollEventsCap = 32
	// MaxAsyncTasksAtOneTime is the maximum number of asynchronous tasks that the event-loop will process at one time.
	MaxAsyncTasksAtOneTime = 256
	// ReadEvents represents the readable events polled by epoll (EPOLLIN and EPOLLPRI).
	ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
	// WriteEvents represents the writable events polled by epoll (EPOLLOUT).
	WriteEvents = unix.EPOLLOUT
	// ReadWriteEvents represents both readable and writable events.
	ReadWriteEvents = ReadEvents | WriteEvents
	// ErrEvents represents the exceptional events that occurred (EPOLLERR and EPOLLHUP).
	ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
)

Variables

This section is empty.

Functions

func IsErrorEvent

func IsErrorEvent(event IOEvent, _ IOFlags) bool

IsErrorEvent checks if the event is an error event.

func IsReadEvent

func IsReadEvent(event IOEvent) bool

IsReadEvent checks if the event is a read event.

func IsWriteEvent

func IsWriteEvent(event IOEvent) bool

IsWriteEvent checks if the event is a write event.

Types

type IOEvent

type IOEvent = uint32

IOEvent is the integer type of I/O events on Linux.

type IOFlags

type IOFlags = uint16

IOFlags represents the flags of IO events.

type PollAttachment

type PollAttachment struct {
	// FD is the file descriptor to register with the poller.
	FD       int
	// Callback is called when an event occurs on FD.
	Callback PollEventHandler
}

PollAttachment is the user data which is about to be stored in "void *ptr" of epoll_data or "void *udata" of kevent.

type PollEventHandler

type PollEventHandler func(int, IOEvent, IOFlags) error

PollEventHandler is the callback for I/O events notified by the poller.

type Poller

type Poller struct {
	// contains filtered or unexported fields
}

Poller represents a poller which is in charge of monitoring file-descriptors.

func OpenPoller

func OpenPoller() (poller *Poller, err error)

OpenPoller instantiates a poller.

func (*Poller) AddRead

func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error

AddRead registers the given file descriptor with readable event to the poller.

func (*Poller) AddReadWrite

func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error

AddReadWrite registers the given file descriptor with readable and writable events to the poller.

func (*Poller) AddWrite

func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error

AddWrite registers the given file descriptor with writable event to the poller.

func (*Poller) Close

func (p *Poller) Close() error

Close closes the poller.

func (*Poller) Delete

func (p *Poller) Delete(fd int) error

Delete removes the given file descriptor from the poller.

func (*Poller) ModRead

func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error

ModRead modifies the given file descriptor with readable event in the poller.

func (*Poller) ModReadWrite

func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error

ModReadWrite modifies the given file descriptor with readable and writable events in the poller.

func (*Poller) Polling

func (p *Poller) Polling(callback PollEventHandler) error

Polling blocks the current goroutine, monitoring the registered file descriptors and waiting for network I/O. When I/O occurs on any of the file descriptors, the provided callback function is invoked.

func (*Poller) Trigger

func (p *Poller) Trigger(priority queue.EventPriority, fn queue.Func, param any) (err error)

Trigger enqueues a task and wakes up the poller to process pending tasks. By default, any incoming task will be enqueued into urgentAsyncTaskQueue before the threshold of high-priority events is reached. Once that threshold is reached, any tasks other than high-priority tasks will be shunted to asyncTaskQueue.

Note that asyncTaskQueue is a low-priority queue whose size may grow large and whose tasks may backlog.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL