Documentation ¶
Index ¶
- Constants
- Variables
- type ClosableQueue
- type Config
- type Message
- func (m *Message) Attempt() int64
- func (m *Message) Bytes() ([]byte, error)
- func (m *Message) Commit(discard bool) error
- func (m *Message) Get(propertyName string) string
- func (m *Message) ID() uint64
- func (m *Message) Open() (io.ReadSeekCloser, error)
- func (m *Message) Properties() map[string][]byte
- func (m *Message) Size() int64
- type Queue
- type Stats
Examples ¶
Constants ¶
const (
	DefaultBucket     = "messages"
	DefaultStorageDir = "queue-data"
	DefaultInlineSize = 8192
)
Variables ¶
var (
ErrEmpty = errors.New("queue is empty")
)
Functions ¶
This section is empty.
Types ¶
type ClosableQueue ¶
type ClosableQueue struct {
*Queue
}
func Default ¶
func Default(dir string) (*ClosableQueue, error)
Default is an alias for Open with all default settings: it uses dir/data as the storage directory and dir/index.db as the metadata storage. The queue must be closed to avoid a resource leak.
Example ¶
package main

import (
	"bytes"
	"context"
	"fmt"
	"os"

	"github.com/reddec/pqueue"
)

func main() {
	_ = os.RemoveAll("./data") // remove old queue

	// error handling omitted for convenience
	q, _ := pqueue.Default("./data")

	id, _ := q.Put(bytes.NewBufferString("hello world"), nil)
	fmt.Println("id:", id)

	msg, _ := q.Get(context.TODO())
	defer msg.Commit(true)

	fmt.Println("got id:", msg.ID())
	data, _ := msg.Bytes() // this will read all payload into memory; for streaming use Open()
	fmt.Println(string(data))
}

Output:
id: 1
got id: 1
hello world
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func (*Message) Commit ¶
func (m *Message) Commit(discard bool) error
Commit completes the handling of a message taken from the queue.
If the discard flag is set, the message is removed from the queue completely; otherwise the message is released and becomes available to the next Get operation.
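The two outcomes of Commit can be illustrated with a toy in-memory model. This is only a sketch: `toyQueue`, `lock`, and `commit` are hypothetical names that are not part of pqueue, and the real queue keeps its state on disk. The toy appends a released ID to the end of the ready list; where the real queue places a released message is not stated here.

```go
package main

import "fmt"

// toyQueue is a hypothetical in-memory stand-in used only to illustrate
// commit semantics; it is not the pqueue implementation.
type toyQueue struct {
	ready  []uint64        // IDs available to the next lock
	locked map[uint64]bool // IDs handed out but not yet committed
}

func newToyQueue(ids ...uint64) *toyQueue {
	return &toyQueue{ready: ids, locked: map[uint64]bool{}}
}

// lock hands out the oldest ready ID and marks it as locked.
func (q *toyQueue) lock() (uint64, bool) {
	if len(q.ready) == 0 {
		return 0, false
	}
	id := q.ready[0]
	q.ready = q.ready[1:]
	q.locked[id] = true
	return id, true
}

// commit unlocks id. With discard=true the ID is dropped for good;
// with discard=false it goes back to the ready list (assumption: at the end).
func (q *toyQueue) commit(id uint64, discard bool) {
	if !q.locked[id] {
		return
	}
	delete(q.locked, id)
	if !discard {
		q.ready = append(q.ready, id)
	}
}

func main() {
	q := newToyQueue(1, 2)

	id, _ := q.lock()    // takes 1
	q.commit(id, false)  // released: 1 is available again
	fmt.Println(q.ready) // [2 1]

	id, _ = q.lock()     // takes 2
	q.commit(id, true)   // discarded: 2 is gone
	fmt.Println(q.ready) // [1]
}
```

In the toy, a message that is locked but never committed stays in `locked` forever, which is the situation the "MUST be committed" rule is meant to prevent.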
func (*Message) Open ¶
func (m *Message) Open() (io.ReadSeekCloser, error)
Open returns a stream over the payload. It can be called several times to obtain multiple parallel streams; each stream must be closed individually.
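What "parallel streams" means in practice is that each stream keeps its own read offset. The sketch below shows this with the standard library only; `openStream` and `payloadStream` are hypothetical stand-ins for Message.Open and its result, backed by an in-memory byte slice rather than the real queue storage.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// payloadStream adapts *bytes.Reader to io.ReadSeekCloser by adding a
// no-op Close. It is a hypothetical type, not part of pqueue.
type payloadStream struct {
	*bytes.Reader
}

func (payloadStream) Close() error { return nil }

// openStream is a hypothetical stand-in for Message.Open: every call
// returns a new stream with its own offset over the same payload.
func openStream(payload []byte) io.ReadSeekCloser {
	return payloadStream{bytes.NewReader(payload)}
}

func main() {
	payload := []byte("hello world")

	a := openStream(payload)
	defer a.Close() // each stream is closed on its own
	b := openStream(payload)
	defer b.Close()

	// Move only stream a; stream b keeps its own position.
	if _, err := a.Seek(6, io.SeekStart); err != nil {
		panic(err)
	}

	restA, _ := io.ReadAll(a)
	allB, _ := io.ReadAll(b)
	fmt.Println(string(restA)) // world
	fmt.Println(string(allB))  // hello world
}
```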
func (*Message) Properties ¶
func (m *Message) Properties() map[string][]byte
Properties returns the properties the user provided to Put. Modifications to the returned map are not saved and will not be visible to the next Get/Try operation.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is designed to be universal and to process messages whose payload is bigger than RAM: it stores the data as a separate file and keeps only a reference to that file in the queue. Small messages are inlined in the queue.
Important: to re-use a queue, the same storage location as before must be specified; otherwise links to payload files will be broken.
func (*Queue) Clear ¶
Clear removes all queue items and their linked files. This may take a while for a big queue. Running Clear while linked files are open may cause platform-dependent behaviour.
func (*Queue) Get ¶
Get returns a message from the queue, blocking until a message is available or the context is canceled. The returned message MUST be committed.
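The "block until available or canceled" behaviour follows a common Go pattern, sketched here with a channel in place of the real queue. `waitForItem` and `getWithTimeout` are hypothetical helpers, not pqueue functions; which error the real Get returns on cancellation is not stated in this documentation, so the sketch simply returns `ctx.Err()`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForItem blocks until an item arrives on items or ctx is done,
// whichever happens first. Hypothetical helper for illustration only.
func waitForItem(ctx context.Context, items <-chan string) (string, error) {
	select {
	case item := <-items:
		return item, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// getWithTimeout bounds the wait with a deadline so that a caller never
// blocks forever on an empty queue.
func getWithTimeout(items <-chan string, d time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return waitForItem(ctx, items)
}

func main() {
	items := make(chan string, 1)

	// Nothing queued: the call gives up once the deadline passes.
	_, err := getWithTimeout(items, 50*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true

	// An item is waiting: the call returns it without hitting the deadline.
	items <- "hello"
	item, err := getWithTimeout(items, time.Second)
	fmt.Println(item, err) // hello <nil>
}
```

With the real queue the same idea applies by passing a context created with `context.WithTimeout` or `context.WithCancel` to Get instead of `context.TODO()`.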
func (*Queue) Put ¶
Put adds an item to the queue. If the data stream is smaller than or equal to the inline size, it is stored in the queue metadata; otherwise it is stored as a linked file.
Properties (may be nil) are always stored in the queue. Do not put too much information into properties, because the whole queue item is read into memory before processing.
Returns the unique ID of the message.
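One way to make the inline-or-file decision for a stream of unknown length is to read at most one byte more than the inline limit. The sketch below illustrates that idea with the standard library; it is not the pqueue implementation, and `stored`, `store`, `storeBytes`, and the `payload-*` file name pattern are all hypothetical. The constant mirrors DefaultInlineSize.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strings"
)

const inlineSize = 8192 // mirrors DefaultInlineSize

// stored describes where a payload ended up (hypothetical type).
type stored struct {
	inline []byte // payload kept inline, when it fits into inlineSize
	path   string // linked file, when the payload exceeds inlineSize
}

// remove deletes the linked file, if there is one.
func (s stored) remove() error {
	if s.path == "" {
		return nil
	}
	return os.Remove(s.path)
}

// store reads up to inlineSize+1 bytes. If the stream ends within
// inlineSize bytes the payload stays inline; otherwise the bytes read
// so far plus the rest of the stream are written to a file in dir.
func store(r io.Reader, dir string) (stored, error) {
	head, err := io.ReadAll(io.LimitReader(r, inlineSize+1))
	if err != nil {
		return stored{}, err
	}
	if len(head) <= inlineSize {
		return stored{inline: head}, nil
	}
	f, err := os.CreateTemp(dir, "payload-*")
	if err != nil {
		return stored{}, err
	}
	defer f.Close()
	if _, err := io.Copy(f, io.MultiReader(bytes.NewReader(head), r)); err != nil {
		return stored{}, err
	}
	return stored{path: f.Name()}, nil
}

// storeBytes is a convenience wrapper for payloads already in memory.
func storeBytes(data []byte, dir string) (stored, error) {
	return store(bytes.NewReader(data), dir)
}

func main() {
	dir, err := os.MkdirTemp("", "toy-queue-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	small, err := store(strings.NewReader("hello world"), dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(small.inline), small.path == "") // 11 true

	big, err := storeBytes(make([]byte, inlineSize+1), dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(big.inline), big.path != "") // 0 true
}
```

Because only the first inlineSize+1 bytes are buffered, a payload larger than RAM can still be streamed to the file without being held in memory.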
type Stats ¶
type Stats struct {
	Added    int64 // total amount of successfully added items to the queue (can only grow)
	Returned int64 // total amount of successfully committed items with discard=false (can only grow)
	Removed  int64 // total amount of successfully committed items with discard=true (can only grow)
	Size     int64 // size of the queue
	Locked   int64 // number of locked messages
}
Stats of queue operation.