taskdb

package
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2022 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for lookups that find nothing.
// NOTE(review): the functions that return these are not visible here;
// presumably GetPayload returns ErrPayloadNotFound — confirm against the
// implementations.
var (
	// ErrPayloadNotFound is the sentinel error with the message "payload not found".
	ErrPayloadNotFound = errors.New("payload not found")
	// ErrStateNotFound is the sentinel error with the message "state not found".
	ErrStateNotFound   = errors.New("state not found")
)
View Source
var (
	// ErrAttachEnded is the sentinel error with the message "attach ended".
	// NOTE(review): presumably returned by an AttachIterator once attaching
	// is over — confirm against the implementations.
	ErrAttachEnded = errors.New("attach ended")
)

Functions

This section is empty.

Types

type AttachIterator

// AttachIterator yields task states in batches. TaskDB.Attach returns one.
type AttachIterator interface {
	// Next returns the next batch of task states as a slice. It returns nil
	// when there are no more, and is safe to call multiple times once empty.
	// NOTE(review): the conditions under which the error is non-nil are not
	// documented here — confirm (ErrAttachEnded looks related).
	Next() ([]*TaskDBTaskState, error)
}

type BadgerTaskStateWithID added in v0.0.7

// BadgerTaskStateWithID groups a task state with a raw byte ID and a payload.
// NOTE(review): the name suggests ID is the Badger key of the stored record —
// confirm against the code that builds this struct.
type BadgerTaskStateWithID struct {
	// State is the task state record.
	State   *TaskDBTaskState
	// ID is a raw byte identifier (presumably the storage key — TODO confirm).
	ID      []byte
	// Payload is the task payload as a string.
	Payload string
}

type DrainIterator

// DrainIterator yields task payloads in batches. TaskDB.Drain returns one.
type DrainIterator interface {
	// Next returns the next batch of task payloads, which can be joined to
	// their current states and sent to other partitions. It returns nil when
	// there are no more, and is safe to call multiple times once empty.
	Next() ([]*DrainTask, error)
}

type DrainTask

// DrainTask is one element of the batches returned by DrainIterator.Next:
// a payload together with its topic and priority.
type DrainTask struct {
	// Topic is the name of the topic the task belongs to.
	Topic    string
	// Priority is the task's priority.
	// NOTE(review): ordering (higher vs. lower first) is not visible here.
	Priority int32
	// Payload is the task payload as a string.
	Payload  string
}

type FakeAttachIterator added in v0.0.2

// FakeAttachIterator is a field-less type whose Next method has the same
// signature as AttachIterator.Next.
// NOTE(review): presumably a stub iterator for tests or for stores with
// nothing to attach — confirm.
type FakeAttachIterator struct{}

func (*FakeAttachIterator) Next added in v0.0.2

func (fai *FakeAttachIterator) Next() ([]*TaskDBTaskState, error)

type KVAttachIterator added in v0.0.9

// KVAttachIterator has a Next method with the same signature as
// AttachIterator.Next.
// NOTE(review): presumably the iterator handed out by KVTaskDB.Attach — confirm.
type KVAttachIterator struct {
	// contains filtered or unexported fields
}

func (*KVAttachIterator) Next added in v0.0.9

func (ai *KVAttachIterator) Next() ([]*TaskDBTaskState, error)

type KVDrainIterator added in v0.0.9

// KVDrainIterator has a Next method with the same signature as
// DrainIterator.Next.
// NOTE(review): presumably the iterator handed out by KVTaskDB.Drain — confirm.
type KVDrainIterator struct {
	// contains filtered or unexported fields
}

func (*KVDrainIterator) Next added in v0.0.9

func (di *KVDrainIterator) Next() ([]*DrainTask, error)

type KVTaskDB added in v0.0.9

// KVTaskDB provides the full TaskDB method set (Attach, PutPayload, PutState,
// GetPayload, Delete, Drain, Close) and is created with NewKVTaskDB, which
// takes a partition name.
// NOTE(review): the backing key-value store is not visible here; the
// BadgerTaskStateWithID type suggests Badger — confirm.
type KVTaskDB struct {
	// contains filtered or unexported fields
}

func NewKVTaskDB added in v0.0.9

func NewKVTaskDB(partition string) (*KVTaskDB, error)

func (*KVTaskDB) Attach added in v0.0.9

func (tdb *KVTaskDB) Attach() AttachIterator

func (*KVTaskDB) Close added in v0.0.12

func (tdb *KVTaskDB) Close() error

func (*KVTaskDB) Delete added in v0.0.9

func (tdb *KVTaskDB) Delete(topicName, taskID string) WriteResult

func (*KVTaskDB) Drain added in v0.0.9

func (tdb *KVTaskDB) Drain() DrainIterator

func (*KVTaskDB) GetPayload added in v0.0.9

func (tdb *KVTaskDB) GetPayload(topicName, taskID string) (payload string, err error)

func (*KVTaskDB) PutPayload added in v0.0.9

func (tdb *KVTaskDB) PutPayload(topicName, taskID string, payload string) WriteResult

func (*KVTaskDB) PutState added in v0.0.9

func (tdb *KVTaskDB) PutState(state *TaskDBTaskState) WriteResult

type KVWriteResult added in v0.0.9

// KVWriteResult has a Get method with the same signature as WriteResult.Get.
// NOTE(review): presumably the WriteResult returned by KVTaskDB writes — confirm.
type KVWriteResult struct {
	// contains filtered or unexported fields
}

func (KVWriteResult) Get added in v0.0.9

func (wr KVWriteResult) Get() error

type MemoryDrainIterator added in v0.0.2

// MemoryDrainIterator has a Next method with the same signature as
// DrainIterator.Next.
// NOTE(review): presumably the iterator handed out by MemoryTaskDB.Drain — confirm.
type MemoryDrainIterator struct {
	// contains filtered or unexported fields
}

func (*MemoryDrainIterator) Next added in v0.0.2

func (di *MemoryDrainIterator) Next() ([]*DrainTask, error)

type MemoryTaskDB added in v0.0.2

// MemoryTaskDB provides the full TaskDB method set (Attach, PutPayload,
// PutState, GetPayload, Delete, Drain, Close) and is created with the
// package-level NewMemoryTaskDB.
// NOTE(review): there is also a NewMemoryTaskDB method on *MemoryTaskDB that
// duplicates the constructor's name and signature, and Close uses a value
// receiver while every other method uses a pointer receiver. Both look
// accidental — confirm before relying on either.
type MemoryTaskDB struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskDB added in v0.0.2

func NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) Attach added in v0.0.2

func (mtdb *MemoryTaskDB) Attach() AttachIterator

func (MemoryTaskDB) Close added in v0.0.12

func (MemoryTaskDB) Close() error

func (*MemoryTaskDB) Delete added in v0.0.2

func (mtdb *MemoryTaskDB) Delete(topicName, taskID string) WriteResult

func (*MemoryTaskDB) Drain added in v0.0.2

func (mtdb *MemoryTaskDB) Drain() DrainIterator

func (*MemoryTaskDB) GetPayload added in v0.0.2

func (mtdb *MemoryTaskDB) GetPayload(topicName, taskID string) (string, error)

func (*MemoryTaskDB) NewMemoryTaskDB added in v0.0.2

func (mtdb *MemoryTaskDB) NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) PutPayload added in v0.0.2

func (mtdb *MemoryTaskDB) PutPayload(topicName, taskID string, payload string) WriteResult

func (*MemoryTaskDB) PutState added in v0.0.2

func (mtdb *MemoryTaskDB) PutState(state *TaskDBTaskState) WriteResult

type MemoryWriteResult added in v0.0.2

// MemoryWriteResult is a field-less type whose Get method has the same
// signature as WriteResult.Get.
// NOTE(review): presumably the WriteResult returned by MemoryTaskDB writes — confirm.
type MemoryWriteResult struct{}

func (MemoryWriteResult) Get added in v0.0.2

func (mwr MemoryWriteResult) Get() error

type TaskDB

// TaskDB is the storage interface for task payloads and task states.
// KVTaskDB and MemoryTaskDB in this package both provide its method set.
type TaskDB interface {
	// Attach acquires the TaskDB lock as needed, then returns an
	// AttachIterator.
	Attach() AttachIterator

	// PutPayload inserts a task into the task table and inserts its first
	// state.
	PutPayload(topicName, taskID string, payload string) WriteResult

	// PutState records a new task state.
	PutState(state *TaskDBTaskState) WriteResult

	// GetPayload retrieves the payload for a given task.
	GetPayload(topicName, taskID string) (string, error)

	// Delete deletes all task states for a topic, and removes the topic from
	// the task table. If no more tasks exist then the task will be removed
	// from the task table.
	// NOTE(review): "topic" and "task" appear to be swapped in the original
	// wording above; it has been kept as written — confirm the intended
	// semantics against the implementations.
	Delete(topicName, taskID string) WriteResult

	// Drain returns an iterator that will read all payloads from the DB, so
	// they can be drained into other partitions. It automatically closes the
	// TaskDB.
	Drain() DrainIterator

	// Close closes the TaskDB without draining.
	Close() error
}

type TaskDBTaskState

// TaskDBTaskState is the state record for a single task. It is the argument
// to TaskDB.PutState and the element type yielded by AttachIterator.Next.
// NewTaskDBTaskState takes one argument for every field.
type TaskDBTaskState struct {
	// Topic is the name of the topic the task belongs to.
	Topic     string
	// Partition is the name of the partition.
	// NOTE(review): presumably the partition that owns the task — confirm.
	Partition string

	// ID is the task ID.
	ID               string
	// State is the task's state (see TaskState).
	State            TaskState
	// Version is a version number for this record.
	// NOTE(review): how it is incremented or compared is not visible here.
	Version          int32
	// DeliveryAttempts counts delivery attempts (assumed from the name — TODO confirm).
	DeliveryAttempts int32
	// CreatedAt is a creation timestamp.
	// NOTE(review): whether it refers to the task or this state record is not visible here.
	CreatedAt        time.Time
	// Priority is the task's priority.
	Priority         int32
}

func NewTaskDBTaskState

func NewTaskDBTaskState(partition, topicName, taskID string, state TaskState, version, deliveryAttempts, priority int32, createdAt time.Time) *TaskDBTaskState

type TaskState

// TaskState identifies the state of a task. It has a String method.
type TaskState int32
// Known TaskState values.
// NOTE(review): these names use ALL_CAPS rather than Go's MixedCaps
// convention; they are exported, so renaming would break callers.
const (
	// TASK_STATE_ENQUEUED has the value 0, so it is also the zero value of TaskState.
	TASK_STATE_ENQUEUED TaskState = 0
	// TASK_STATE_DELAYED has the value 1.
	TASK_STATE_DELAYED  TaskState = 1
	// TASK_STATE_INFLIGHT has the value 2.
	TASK_STATE_INFLIGHT TaskState = 2
)

func (TaskState) String

func (ts TaskState) String() string

type WriteResult

// WriteResult is the handle returned by the TaskDB write methods (PutPayload,
// PutState, Delete); call Get to obtain the outcome of the write.
type WriteResult interface {
	// Get waits for the write to be committed to the TaskDB.
	//
	// Implementations should wait on a buffered channel for the batch to be
	// committed. A TaskDB that does not batch should still write to the
	// buffered channel immediately, so that function calls are non-blocking
	// and return immediately.
	Get() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL