queue

package
v0.0.0-...-38575d5 (not the latest version of this module)
Published: Dec 9, 2024 License: AGPL-3.0 Imports: 29 Imported by: 1

Documentation

Index

Constants

const ConsumerOffsetBucket = "queue_consumer_commit_offset"
const ReadComplete = EventType("ReadComplete")
const WriteComplete = EventType("WriteComplete")

Variables

This section is empty.

Functions

func GetDataPath

func GetDataPath(queueID string) string

func GetFileName

func GetFileName(queueID string, segmentID int64) string

func GetLastCompressFileNum

func GetLastCompressFileNum(queueID string) int64

func GetLastS3UploadFileNum

func GetLastS3UploadFileNum(queueID string) int64
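
Taken together, these helpers resolve where a queue keeps its segment files on disk and how far compression and S3 upload have progressed. A minimal sketch, written as if inside this package; the helper name, the queue ID "my_queue", and segment number 3 are illustrative only:

func inspectQueueFiles() {
	dataPath := GetDataPath("my_queue")                  // directory holding this queue's segment files
	segFile := GetFileName("my_queue", 3)                // on-disk path of segment #3
	lastCompressed := GetLastCompressFileNum("my_queue") // last segment number recorded as compressed
	lastUploaded := GetLastS3UploadFileNum("my_queue")   // last segment number recorded as uploaded to S3
	fmt.Println(dataPath, segFile, lastCompressed, lastUploaded)
}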

func Notify

func Notify(queue string, eventType EventType, fileNum int64)

func RegisterEventListener

func RegisterEventListener(handler EventHandler)
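
A hedged sketch of wiring the two together: register a handler for segment events, then emit one with Notify. The handler body, queue name, and segment number are illustrative assumptions, and imports are omitted:

// Register a listener that reacts to segment lifecycle events.
RegisterEventListener(func(event Event) error {
	if event.Type == WriteComplete {
		// e.g. schedule compression or S3 upload of segment event.FileNum
		fmt.Printf("queue %s finished writing segment %d\n", event.Queue, event.FileNum)
	}
	return nil
})

// Later, typically from the write path, publish the event.
Notify("my_queue", WriteComplete, 3)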

func RemoveFile

func RemoveFile(cfg *DiskQueueConfig, queueID string, segmentID int64)

func SmartGetFileName

func SmartGetFileName(cfg *DiskQueueConfig, queueID string, segmentID int64) (string, bool, bool)

If the local file is not found, try to download it from S3.

Types

type CompressConfig

type CompressConfig struct {
	Enabled bool `config:"enabled"`
	Level   int  `config:"level"`
}

type Consumer

type Consumer struct {
	ID string
	// contains filtered or unexported fields
}

NOTE: Consumer is not thread-safe

func (*Consumer) Close

func (d *Consumer) Close() error

func (*Consumer) CommitOffset

func (this *Consumer) CommitOffset(offset queue.Offset) error

func (*Consumer) FetchMessages

func (d *Consumer) FetchMessages(ctx *queue.Context, numOfMessages int) (messages []queue.Message, isTimeout bool, err error)

func (*Consumer) ResetOffset

func (d *Consumer) ResetOffset(segment, readPos int64) error
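
A sketch of a typical consume loop, assuming a Consumer already obtained via DiskBasedQueue.AcquireConsumer. The helper name, batch size, and error handling are illustrative; how the next offset is derived from the fetch context is not shown in this doc, so it is passed in rather than invented here:

func consumeOnce(consumer *Consumer, ctx *queue.Context, nextOffset queue.Offset) (bool, error) {
	// Fetch up to 100 messages in one call.
	messages, isTimeout, err := consumer.FetchMessages(ctx, 100)
	if err != nil {
		return isTimeout, err
	}
	for range messages {
		// handle each queue.Message here (processing is up to the caller)
	}
	// Persist the consumer position so it survives restarts.
	if len(messages) > 0 {
		if err := consumer.CommitOffset(nextOffset); err != nil {
			return isTimeout, err
		}
	}
	// isTimeout reports that the fetch returned because no more data arrived
	// in time; callers typically back off before retrying.
	return isTimeout, nil
}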

type Context

type Context struct {
	WriteFile    string `json:"write_file_path"`
	WriteFileNum int64  `json:"write_file_num"`
}

type DiskBasedQueue

type DiskBasedQueue struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DiskBasedQueue provides a filesystem-backed FIFO queue.

func NewDiskQueueByConfig

func NewDiskQueueByConfig(name, dataPath string, cfg *DiskQueueConfig) *DiskBasedQueue

NewDiskQueueByConfig instantiates a new DiskBasedQueue, retrieving metadata from the filesystem and starting the read-ahead goroutine.
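
A minimal sketch of standing up a queue directly; the queue name, data path, and every config value below are illustrative examples, not documented defaults:

// Illustrative construction of a disk-backed queue.
cfg := &DiskQueueConfig{
	MinMsgSize:       1,
	MaxMsgSize:       1 << 20, // cap each message at ~1 MB
	MaxBytesPerFile:  64 << 20,
	SyncEveryRecords: 1000,
	SyncTimeoutInMS:  1000,
}
q := NewDiskQueueByConfig("my_queue", "/path/to/data", cfg)
defer q.Close()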

func (*DiskBasedQueue) AcquireConsumer

func (d *DiskBasedQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (queue.ConsumerAPI, error)

func (*DiskBasedQueue) Close

func (d *DiskBasedQueue) Close() error

Close cleans up the queue and persists metadata

func (*DiskBasedQueue) Delete

func (d *DiskBasedQueue) Delete() error

func (*DiskBasedQueue) DeleteSegmentConsumerInReading

func (d *DiskBasedQueue) DeleteSegmentConsumerInReading(consumerID string)

func (*DiskBasedQueue) Depth

func (d *DiskBasedQueue) Depth() int64

func (*DiskBasedQueue) Destroy

func (d *DiskBasedQueue) Destroy() error

Destroy cleans up all data for the specified queue

func (*DiskBasedQueue) Empty

func (d *DiskBasedQueue) Empty() error

Empty destructively clears out any pending data in the queue by fast-forwarding read positions and removing intermediate files.

func (*DiskBasedQueue) GetFileName

func (d *DiskBasedQueue) GetFileName(segmentID int64) string

func (*DiskBasedQueue) LatestOffset

func (d *DiskBasedQueue) LatestOffset() queue.Offset

func (*DiskBasedQueue) Put

func (d *DiskBasedQueue) Put(data []byte) WriteResponse

Put writes a []byte to the queue

func (*DiskBasedQueue) ReadChan

func (d *DiskBasedQueue) ReadChan() <-chan []byte

ReadChan returns the receive-only []byte channel for reading data
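
A sketch of the write/read flow, assuming q is a *DiskBasedQueue created as shown above; the helper name, payload, and error handling are illustrative:

func writeAndRead(q *DiskBasedQueue) error {
	// Write one record and inspect the returned WriteResponse.
	resp := q.Put([]byte("hello"))
	if resp.Error != nil {
		return resp.Error // e.g. message size out of range or disk limits reached
	}
	fmt.Printf("wrote segment %d at position %d\n", resp.Segment, resp.Position)

	// Read records back through the receive-only channel.
	for data := range q.ReadChan() {
		fmt.Printf("read %d bytes\n", len(data))
	}
	return nil
}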

func (*DiskBasedQueue) ReadContext

func (d *DiskBasedQueue) ReadContext() Context

ReadContext returns the current write context of the queue (the current write file path and file number).

func (*DiskBasedQueue) UpdateSegmentConsumerInReading

func (d *DiskBasedQueue) UpdateSegmentConsumerInReading(consumerID string, segment int64)

type DiskCompress

type DiskCompress struct {
	DeleteAfterCompress       bool           `config:"delete_after_compress"`
	Message                   CompressConfig `config:"message"`
	Segment                   CompressConfig `config:"segment"`
	IdleThreshold             int64          `config:"idle_threshold"`
	NumOfFilesDecompressAhead int64          `config:"num_of_files_decompress_ahead"`
}

type DiskQueue

type DiskQueue struct {
	// contains filtered or unexported fields
}

func (*DiskQueue) AcquireConsumer

func (module *DiskQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.ConsumerAPI, error)

func (*DiskQueue) AcquireProducer

func (module *DiskQueue) AcquireProducer(cfg *queue.QueueConfig) (queue.ProducerAPI, error)

func (*DiskQueue) Close

func (module *DiskQueue) Close(k string) error

func (*DiskQueue) CommitOffset

func (module *DiskQueue) CommitOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (bool, error)

func (*DiskQueue) DeleteOffset

func (module *DiskQueue) DeleteOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) error

func (*DiskQueue) Depth

func (module *DiskQueue) Depth(k string) int64

func (*DiskQueue) Destroy

func (module *DiskQueue) Destroy(k string) error

func (*DiskQueue) GetEarlierOffsetByQueueID

func (module *DiskQueue) GetEarlierOffsetByQueueID(queueID string) (int, int64)

func (*DiskQueue) GetLatestOffsetByQueueID

func (module *DiskQueue) GetLatestOffsetByQueueID(queueID string) (int, int64)

func (*DiskQueue) GetOffset

func (module *DiskQueue) GetOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.Offset, error)

func (*DiskQueue) GetQueues

func (module *DiskQueue) GetQueues() []string

func (*DiskQueue) GetStorageSize

func (module *DiskQueue) GetStorageSize(k string) uint64

func (*DiskQueue) Init

func (module *DiskQueue) Init(name string) error

func (*DiskQueue) LatestOffset

func (module *DiskQueue) LatestOffset(k *queue.QueueConfig) queue.Offset

func (*DiskQueue) Name

func (module *DiskQueue) Name() string

func (*DiskQueue) Pop

func (module *DiskQueue) Pop(k string, timeoutDuration time.Duration) (data []byte, timeout bool)

func (*DiskQueue) Push

func (module *DiskQueue) Push(k string, v []byte) error
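
At the module level, Push and Pop give simple key-based access, where k is the queue ID. A sketch with illustrative values and a hypothetical helper name:

func pushAndPop(module *DiskQueue) error {
	// Push a payload onto queue "my_queue", then pop it back with a 5s timeout.
	if err := module.Push("my_queue", []byte("hello")); err != nil {
		return err
	}
	data, timedOut := module.Pop("my_queue", 5*time.Second)
	if timedOut {
		// nothing arrived before the timeout elapsed
		return nil
	}
	fmt.Printf("popped %d bytes\n", len(data))
	return nil
}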

func (*DiskQueue) ReadChan

func (module *DiskQueue) ReadChan(k string) <-chan []byte

func (*DiskQueue) ReleaseConsumer

func (this *DiskQueue) ReleaseConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, instance queue.ConsumerAPI) error

func (*DiskQueue) ReleaseProducer

func (this *DiskQueue) ReleaseProducer(k *queue.QueueConfig, producer queue.ProducerAPI) error

func (*DiskQueue) Setup

func (module *DiskQueue) Setup()

func (*DiskQueue) Start

func (module *DiskQueue) Start() error

func (*DiskQueue) Stop

func (module *DiskQueue) Stop() error

type DiskQueueConfig

type DiskQueueConfig struct {
	MinMsgSize       int32 `config:"min_msg_size"`
	MaxMsgSize       int32 `config:"max_msg_size"`
	MaxBytesPerFile  int64 `config:"max_bytes_per_file"`
	SyncEveryRecords int64 `config:"sync_every_records"`
	SyncTimeoutInMS  int   `config:"sync_timeout_in_ms"`
	NotifyChanBuffer int   `config:"notify_chan_buffer_size"`
	ReadChanBuffer   int   `config:"read_chan_buffer_size"`
	WriteChanBuffer  int   `config:"write_chan_buffer_size"`

	WriteTimeoutInMS  int64 `config:"write_timeout_in_ms" json:"write_timeout_in_ms,omitempty"`
	EOFRetryDelayInMs int64 `config:"eof_retry_delay_in_ms" json:"eof_retry_delay_in_ms,omitempty"`

	MaxUsedBytes      uint64 `config:"max_used_bytes"`
	WarningFreeBytes  uint64 `config:"warning_free_bytes"`
	ReservedFreeBytes uint64 `config:"reserved_free_bytes"`

	AutoSkipCorruptFile bool `config:"auto_skip_corrupted_file"`

	UploadToS3     bool `config:"upload_to_s3"`
	AlwaysDownload bool `config:"always_download"`

	PrepareFilesToRead bool `config:"prepare_files_to_read"`

	CompressAndCleanupDuringInit bool `config:"cleanup_files_on_init"`

	// default queue adaptor
	Default bool `config:"default"`
	Enabled bool `config:"enabled"`

	SkipZeroConsumers bool `config:"skip_zero_consumers"`

	Compress DiskCompress `config:"compress"`

	Retention RetentionConfig `config:"retention"`

	S3 config.S3BucketConfig `config:"s3"`
}
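
A hedged sketch of a fuller configuration, highlighting the disk-usage guards, compression, and retention knobs not covered in the earlier construction example; every value here is illustrative, not a default:

cfg := &DiskQueueConfig{
	Enabled:           true,
	Default:           true,
	MaxBytesPerFile:   64 << 20, // roll to a new segment after ~64 MB
	MaxUsedBytes:      10 << 30, // illustrative ceiling on total queue size
	WarningFreeBytes:  5 << 30,  // warn when free disk space drops this low
	ReservedFreeBytes: 1 << 30,  // keep at least this much disk space free
	Compress: DiskCompress{
		DeleteAfterCompress: true,
		Segment:             CompressConfig{Enabled: true, Level: 3},
	},
	Retention: RetentionConfig{MaxNumOfLocalFiles: 100},
	UploadToS3: false,
}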

type Event

type Event struct {
	Queue   string
	Type    EventType
	FileNum int64
}

type EventHandler

type EventHandler func(event Event) error

type EventType

type EventType string

type Message

type Message struct {
	Payload *bytebufferpool.ByteBuffer
	Context Context
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Produce

func (p *Producer) Produce(reqs *[]queue.ProduceRequest) (*[]queue.ProduceResponse, error)
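
A sketch of the produce path, assuming you hold a *Producer (the concrete type presumably behind DiskQueue.AcquireProducer); the fields of queue.ProduceRequest are defined in the core queue package and not shown here, so the batch is left abstract:

func produceBatch(p *Producer) error {
	// reqs is assumed to be populated elsewhere; consult the core queue
	// package for the exact fields of queue.ProduceRequest.
	var reqs []queue.ProduceRequest
	resps, err := p.Produce(&reqs)
	if err != nil {
		return err
	}
	_ = resps // inspect the per-request responses as needed
	return p.Close()
}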

type RetentionConfig

type RetentionConfig struct {
	MaxNumOfLocalFiles int64 `config:"max_num_of_local_files"`
}

type WriteResponse

type WriteResponse struct {
	Segment  int64
	Position int64
	Error    error
}
