Documentation ¶
Index ¶
- Constants
- func GetDataPath(queueID string) string
- func GetFileName(queueID string, segmentID int64) string
- func GetLastCompressFileNum(queueID string) int64
- func GetLastS3UploadFileNum(queueID string) int64
- func Notify(queue string, eventType EventType, fileNum int64)
- func RegisterEventListener(handler EventHandler)
- func RemoveFile(cfg *DiskQueueConfig, queueID string, segmentID int64)
- func SmartGetFileName(cfg *DiskQueueConfig, queueID string, segmentID int64) (string, bool, bool)
- type CompressConfig
- type Consumer
- type Context
- type DiskBasedQueue
- func (d *DiskBasedQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, ...) (queue.ConsumerAPI, error)
- func (d *DiskBasedQueue) Close() error
- func (d *DiskBasedQueue) Delete() error
- func (d *DiskBasedQueue) DeleteSegmentConsumerInReading(consumerID string)
- func (d *DiskBasedQueue) Depth() int64
- func (d *DiskBasedQueue) Destroy() error
- func (d *DiskBasedQueue) Empty() error
- func (d *DiskBasedQueue) GetFileName(segmentID int64) string
- func (d *DiskBasedQueue) LatestOffset() queue.Offset
- func (d *DiskBasedQueue) Put(data []byte) WriteResponse
- func (d *DiskBasedQueue) ReadChan() <-chan []byte
- func (d *DiskBasedQueue) ReadContext() Context
- func (d *DiskBasedQueue) UpdateSegmentConsumerInReading(consumerID string, segment int64)
- type DiskCompress
- type DiskQueue
- func (module *DiskQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.ConsumerAPI, error)
- func (module *DiskQueue) AcquireProducer(cfg *queue.QueueConfig) (queue.ProducerAPI, error)
- func (module *DiskQueue) Close(k string) error
- func (module *DiskQueue) CommitOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (bool, error)
- func (module *DiskQueue) DeleteOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) error
- func (module *DiskQueue) Depth(k string) int64
- func (module *DiskQueue) Destroy(k string) error
- func (module *DiskQueue) GetEarlierOffsetByQueueID(queueID string) (int, int64)
- func (module *DiskQueue) GetLatestOffsetByQueueID(queueID string) (int, int64)
- func (module *DiskQueue) GetOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.Offset, error)
- func (module *DiskQueue) GetQueues() []string
- func (module *DiskQueue) GetStorageSize(k string) uint64
- func (module *DiskQueue) Init(name string) error
- func (module *DiskQueue) LatestOffset(k *queue.QueueConfig) queue.Offset
- func (module *DiskQueue) Name() string
- func (module *DiskQueue) Pop(k string, timeoutDuration time.Duration) (data []byte, timeout bool)
- func (module *DiskQueue) Push(k string, v []byte) error
- func (module *DiskQueue) ReadChan(k string) <-chan []byte
- func (this *DiskQueue) ReleaseConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, ...) error
- func (this *DiskQueue) ReleaseProducer(k *queue.QueueConfig, producer queue.ProducerAPI) error
- func (module *DiskQueue) Setup()
- func (module *DiskQueue) Start() error
- func (module *DiskQueue) Stop() error
- type DiskQueueConfig
- type Event
- type EventHandler
- type EventType
- type Message
- type Producer
- type RetentionConfig
- type WriteResponse
Constants ¶
View Source
const ConsumerOffsetBucket = "queue_consumer_commit_offset"
View Source
const ReadComplete = EventType("ReadComplete")
View Source
const WriteComplete = EventType("WriteComplete")
Variables ¶
This section is empty.
Functions ¶
func GetDataPath ¶
func GetFileName ¶
func GetLastCompressFileNum ¶
func GetLastS3UploadFileNum ¶
func RegisterEventListener ¶
func RegisterEventListener(handler EventHandler)
func RemoveFile ¶
func RemoveFile(cfg *DiskQueueConfig, queueID string, segmentID int64)
func SmartGetFileName ¶
If the local file is not found, SmartGetFileName tries to download it from S3.
Types ¶
type CompressConfig ¶
type Consumer ¶
type Consumer struct { ID string // contains filtered or unexported fields }
NOTE: Consumer is not thread-safe
func (*Consumer) FetchMessages ¶
func (*Consumer) ResetOffset ¶
type DiskBasedQueue ¶
DiskBasedQueue provides a filesystem-backed FIFO queue.
func NewDiskQueueByConfig ¶
func NewDiskQueueByConfig(name, dataPath string, cfg *DiskQueueConfig) *DiskBasedQueue
NewDiskQueueByConfig instantiates a new instance of DiskBasedQueue, retrieving metadata from the filesystem and starting the read-ahead goroutine.
func (*DiskBasedQueue) AcquireConsumer ¶
func (d *DiskBasedQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (queue.ConsumerAPI, error)
func (*DiskBasedQueue) Close ¶
func (d *DiskBasedQueue) Close() error
Close cleans up the queue and persists metadata
func (*DiskBasedQueue) Delete ¶
func (d *DiskBasedQueue) Delete() error
func (*DiskBasedQueue) DeleteSegmentConsumerInReading ¶
func (d *DiskBasedQueue) DeleteSegmentConsumerInReading(consumerID string)
func (*DiskBasedQueue) Depth ¶
func (d *DiskBasedQueue) Depth() int64
func (*DiskBasedQueue) Destroy ¶
func (d *DiskBasedQueue) Destroy() error
Destroy cleans up all data for the specified queue
func (*DiskBasedQueue) Empty ¶
func (d *DiskBasedQueue) Empty() error
Empty destructively clears out any pending data in the queue by fast-forwarding read positions and removing intermediate files.
func (*DiskBasedQueue) GetFileName ¶
func (d *DiskBasedQueue) GetFileName(segmentID int64) string
func (*DiskBasedQueue) LatestOffset ¶
func (d *DiskBasedQueue) LatestOffset() queue.Offset
func (*DiskBasedQueue) Put ¶
func (d *DiskBasedQueue) Put(data []byte) WriteResponse
Put writes a []byte to the queue
func (*DiskBasedQueue) ReadChan ¶
func (d *DiskBasedQueue) ReadChan() <-chan []byte
ReadChan returns the receive-only []byte channel for reading data
func (*DiskBasedQueue) ReadContext ¶
func (d *DiskBasedQueue) ReadContext() Context
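ReadContext returns the queue's read Context. (The published comment here, "Depth returns the depth of the queue", appears to have been copied from Depth and does not describe this method.)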
func (*DiskBasedQueue) UpdateSegmentConsumerInReading ¶
func (d *DiskBasedQueue) UpdateSegmentConsumerInReading(consumerID string, segment int64)
type DiskCompress ¶
type DiskCompress struct { DeleteAfterCompress bool `config:"delete_after_compress"` Message CompressConfig `config:"message"` Segment CompressConfig `config:"segment"` IdleThreshold int64 `config:"idle_threshold"` NumOfFilesDecompressAhead int64 `config:"num_of_files_decompress_ahead"` }
type DiskQueue ¶
type DiskQueue struct {
// contains filtered or unexported fields
}
func (*DiskQueue) AcquireConsumer ¶
func (module *DiskQueue) AcquireConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.ConsumerAPI, error)
func (*DiskQueue) AcquireProducer ¶
func (module *DiskQueue) AcquireProducer(cfg *queue.QueueConfig) (queue.ProducerAPI, error)
func (*DiskQueue) CommitOffset ¶
func (module *DiskQueue) CommitOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig, offset queue.Offset) (bool, error)
func (*DiskQueue) DeleteOffset ¶
func (module *DiskQueue) DeleteOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) error
func (*DiskQueue) GetEarlierOffsetByQueueID ¶
func (*DiskQueue) GetLatestOffsetByQueueID ¶
func (*DiskQueue) GetOffset ¶
func (module *DiskQueue) GetOffset(k *queue.QueueConfig, consumer *queue.ConsumerConfig) (queue.Offset, error)
func (*DiskQueue) GetStorageSize ¶
func (*DiskQueue) LatestOffset ¶
func (module *DiskQueue) LatestOffset(k *queue.QueueConfig) queue.Offset
func (*DiskQueue) ReleaseConsumer ¶
func (this *DiskQueue) ReleaseConsumer(qconfig *queue.QueueConfig, consumer *queue.ConsumerConfig, instance queue.ConsumerAPI) error
func (*DiskQueue) ReleaseProducer ¶
func (this *DiskQueue) ReleaseProducer(k *queue.QueueConfig, producer queue.ProducerAPI) error
type DiskQueueConfig ¶
type DiskQueueConfig struct { MinMsgSize int32 `config:"min_msg_size"` MaxMsgSize int32 `config:"max_msg_size"` MaxBytesPerFile int64 `config:"max_bytes_per_file"` SyncEveryRecords int64 `config:"sync_every_records"` SyncTimeoutInMS int `config:"sync_timeout_in_ms"` NotifyChanBuffer int `config:"notify_chan_buffer_size"` ReadChanBuffer int `config:"read_chan_buffer_size"` WriteChanBuffer int `config:"write_chan_buffer_size"` WriteTimeoutInMS int64 `config:"write_timeout_in_ms" json:"write_timeout_in_ms,omitempty"` EOFRetryDelayInMs int64 `config:"eof_retry_delay_in_ms" json:"eof_retry_delay_in_ms,omitempty"` MaxUsedBytes uint64 `config:"max_used_bytes"` WarningFreeBytes uint64 `config:"warning_free_bytes"` ReservedFreeBytes uint64 `config:"reserved_free_bytes"` AutoSkipCorruptFile bool `config:"auto_skip_corrupted_file"` UploadToS3 bool `config:"upload_to_s3"` AlwaysDownload bool `config:"always_download"` PrepareFilesToRead bool `config:"prepare_files_to_read"` CompressAndCleanupDuringInit bool `config:"cleanup_files_on_init"` //default queue adaptor Default bool `config:"default"` Enabled bool `config:"enabled"` SkipZeroConsumers bool `config:"skip_zero_consumers"` Compress DiskCompress `config:"compress"` Retention RetentionConfig `config:"retention"` S3 config.S3BucketConfig `config:"s3"` }
type EventHandler ¶
type Message ¶
type Message struct { Payload *bytebufferpool.ByteBuffer Context Context }
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) Produce ¶
func (p *Producer) Produce(reqs *[]queue.ProduceRequest) (*[]queue.ProduceResponse, error)
type RetentionConfig ¶
type RetentionConfig struct {
MaxNumOfLocalFiles int64 `config:"max_num_of_local_files"`
}
type WriteResponse ¶
Click to show internal directories.
Click to hide internal directories.