queue

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2021 License: MIT Imports: 11 Imported by: 5

Documentation

Overview

Package queue is a generated GoMock package.

Package queue is a generated GoMock package.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrClosed                   = errors.New("queue has been closed")
	ErrDropExceedsMaxPacketSize = errors.New("maximum packet size exceeded")
	ErrDropQueueFull            = errors.New("the message queue is full")
	ErrDropExpired              = errors.New("the message is expired")
)

Functions

func Drop

func Drop(onMsgDropped OnMsgDropped, l *zap.Logger, clientID string, msg *gmqtt.Message, err error)

Drop wraps the logging for drop event.

func ElemExpiry

func ElemExpiry(now time.Time, elem *Elem) bool

ElemExpiry reports whether the elem is expired.

Types

type Elem

type Elem struct {
	// At represents the entry time.
	At time.Time
	// Expiry represents the expiry time.
	// Empty means never expire.
	Expiry time.Time
	MessageWithID
}

Elem represents an element stored in the queue.

func (*Elem) Decode

func (e *Elem) Decode(b []byte) (err error)

func (*Elem) Encode

func (e *Elem) Encode() []byte

Encode encodes the elem structure into bytes. Format: 8 byte timestamp | 1 byte identifier | data

type InitOptions

type InitOptions struct {
	// CleanStart is the cleanStart field in the connect packet.
	CleanStart bool
	// Version is the client MQTT protocol version.
	Version packets.Version
	// ReadBytesLimit indicates the maximum publish size that is allowed to be read.
	ReadBytesLimit uint32
}

InitOptions is used to pass some required client information to the queue.Init()

type InternalError

type InternalError struct {
	// Err is the error returned by the backend storage.
	Err error
}

InternalError wraps the error of the backend storage.

func (*InternalError) Error

func (i *InternalError) Error() string

type MessageWithID

type MessageWithID interface {
	ID() packets.PacketID
	SetID(id packets.PacketID)
}

type MockMessageWithID

type MockMessageWithID struct {
	// contains filtered or unexported fields
}

MockMessageWithID is a mock of MessageWithID interface

func NewMockMessageWithID

func NewMockMessageWithID(ctrl *gomock.Controller) *MockMessageWithID

NewMockMessageWithID creates a new mock instance

func (*MockMessageWithID) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockMessageWithID) ID

ID mocks base method

func (*MockMessageWithID) SetID

func (m *MockMessageWithID) SetID(id packets.PacketID)

SetID mocks base method

type MockMessageWithIDMockRecorder

type MockMessageWithIDMockRecorder struct {
	// contains filtered or unexported fields
}

MockMessageWithIDMockRecorder is the mock recorder for MockMessageWithID

func (*MockMessageWithIDMockRecorder) ID

ID indicates an expected call of ID

func (*MockMessageWithIDMockRecorder) SetID

func (mr *MockMessageWithIDMockRecorder) SetID(id interface{}) *gomock.Call

SetID indicates an expected call of SetID

type MockStore

type MockStore struct {
	// contains filtered or unexported fields
}

MockStore is a mock of Store interface

func NewMockStore

func NewMockStore(ctrl *gomock.Controller) *MockStore

NewMockStore creates a new mock instance

func (*MockStore) Add

func (m *MockStore) Add(elem *Elem) error

Add mocks base method

func (*MockStore) Clean

func (m *MockStore) Clean() error

Clean mocks base method

func (*MockStore) Close

func (m *MockStore) Close() error

Close mocks base method

func (*MockStore) EXPECT

func (m *MockStore) EXPECT() *MockStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockStore) Init

func (m *MockStore) Init(opts *InitOptions) error

Init mocks base method

func (*MockStore) Read

func (m *MockStore) Read(pids []packets.PacketID) ([]*Elem, error)

Read mocks base method

func (*MockStore) ReadInflight

func (m *MockStore) ReadInflight(maxSize uint) ([]*Elem, error)

ReadInflight mocks base method

func (*MockStore) Remove

func (m *MockStore) Remove(pid packets.PacketID) error

Remove mocks base method

func (*MockStore) Replace

func (m *MockStore) Replace(elem *Elem) (bool, error)

Replace mocks base method

type MockStoreMockRecorder

type MockStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStoreMockRecorder is the mock recorder for MockStore

func (*MockStoreMockRecorder) Add

func (mr *MockStoreMockRecorder) Add(elem interface{}) *gomock.Call

Add indicates an expected call of Add

func (*MockStoreMockRecorder) Clean

func (mr *MockStoreMockRecorder) Clean() *gomock.Call

Clean indicates an expected call of Clean

func (*MockStoreMockRecorder) Close

func (mr *MockStoreMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockStoreMockRecorder) Init

func (mr *MockStoreMockRecorder) Init(opts interface{}) *gomock.Call

Init indicates an expected call of Init

func (*MockStoreMockRecorder) Read

func (mr *MockStoreMockRecorder) Read(pids interface{}) *gomock.Call

Read indicates an expected call of Read

func (*MockStoreMockRecorder) ReadInflight

func (mr *MockStoreMockRecorder) ReadInflight(maxSize interface{}) *gomock.Call

ReadInflight indicates an expected call of ReadInflight

func (*MockStoreMockRecorder) Remove

func (mr *MockStoreMockRecorder) Remove(pid interface{}) *gomock.Call

Remove indicates an expected call of Remove

func (*MockStoreMockRecorder) Replace

func (mr *MockStoreMockRecorder) Replace(elem interface{}) *gomock.Call

Replace indicates an expected call of Replace

type OnMsgDropped

type OnMsgDropped = func(ctx context.Context, clientID string, msg *gmqtt.Message, err error)

OnMsgDropped is the same as server.OnMsgDropped. It is used to avoid an import cycle.

type Publish

type Publish struct {
	*gmqtt.Message
}

func (*Publish) Decode

func (p *Publish) Decode(b *bytes.Buffer) (err error)

func (*Publish) Encode

func (p *Publish) Encode(b *bytes.Buffer)

Encode encodes the publish structure into bytes and writes it to the buffer.

func (*Publish) ID

func (p *Publish) ID() packets.PacketID

func (*Publish) SetID

func (p *Publish) SetID(id packets.PacketID)

type Pubrel

type Pubrel struct {
	PacketID packets.PacketID
}

func (*Pubrel) Decode

func (p *Pubrel) Decode(b *bytes.Buffer) (err error)

func (*Pubrel) Encode

func (p *Pubrel) Encode(b *bytes.Buffer)

Encode encodes the pubrel structure into bytes.

func (*Pubrel) ID

func (p *Pubrel) ID() packets.PacketID

func (*Pubrel) SetID

func (p *Pubrel) SetID(id packets.PacketID)

type Store

type Store interface {
	// Close will be called when the client disconnects.
	// This method must unblock the Read method.
	Close() error
	// Init will be called when the client connects.
	// If opts.CleanStart is set to true, the implementation should remove any associated data in the backend store.
	// If it is set to false, the implementation should be able to retrieve the associated data from the backend store.
	// opts.Version indicates the protocol version of the connected client; it is mainly used to calculate the publish packet size.
	Init(opts *InitOptions) error
	Clean() error
	// Add inserts an elem into the queue.
	// When the length of the queue reaches the maximum setting, the implementation should drop non-inflight messages according to the following priorities:
	// 1. the current elem, if there are no more non-inflight messages.
	// 2. expired messages
	// 3. qos0 messages
	// 4. the front message
	// See queue.mem for more details.
	Add(elem *Elem) error
	// Replace replaces the PUBLISH with the PUBREL with the same packet id.
	Replace(elem *Elem) (replaced bool, err error)

	// Read reads a batch of new (non-inflight) messages from the store. The qos0 messages will be removed after being read.
	// The size of the batch will be less than or equal to the size of the given packet id list.
	// While reading, the implementation must remove, and must not return, any:
	// 1. expired messages
	// 2. publish messages which exceed the InitOptions.ReadBytesLimit
	// The caller must call ReadInflight first to read all inflight messages before calling this method.
	// Calling this method will block until there are new messages that can be read or the store has been closed.
	// If the store has been closed, returns nil, ErrClosed.
	Read(pids []packets.PacketID) ([]*Elem, error)

	// ReadInflight reads at most maxSize inflight messages.
	// The caller must call this method to read all inflight messages before calling the Read method.
	// Returning a zero-length elems slice means all inflight messages have been read.
	ReadInflight(maxSize uint) (elems []*Elem, err error)

	// Remove removes the elem for a given id.
	Remove(pid packets.PacketID) error
}

Store represents a queue store for one client.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL