subscriber

package
Version: v0.0.17 (not the latest version of the module; see the warning below)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2021 | License: MIT | Imports: 17 | Imported by: 12

Documentation

Index

Constants

Variables

View Source
// SchedulerTaskState_name maps each SchedulerTaskState value to its display
// name: 0 is "Idle", 1 is "Busy", and 2 is "Suspend". The keys line up with
// the SchedulerTaskState_Idle, SchedulerTaskState_Busy, and
// SchedulerTaskState_Suspend constants, which are assigned 0, 1, and 2 by iota.
var SchedulerTaskState_name = map[SchedulerTaskState]string{
	0: "Idle",
	1: "Busy",
	2: "Suspend",
}

Functions

This section is empty.

Types

type CollectionSnapshot added in v0.0.13

// CollectionSnapshot has no exported fields shown in this documentation, and
// no constructor or method for it is listed on this page.
// NOTE(review): presumably holds per-collection snapshot state — confirm
// against the package source.
type CollectionSnapshot struct {
	// contains filtered or unexported fields
}

type InitialLoadOptions added in v0.0.10

// InitialLoadOptions is the type of the Options.InitialLoad field.
//
// It has two fields: Enabled (bool) and OmittedCount (uint64).
// NOTE(review): presumably Enabled switches the initial load on or off, and
// OmittedCount relates to how many records are left out of it — neither
// meaning is visible here; confirm against the package source.
type InitialLoadOptions struct {
	Enabled      bool
	OmittedCount uint64
}

type Message

// Message is the value handed to a MessageHandler. It carries pointers to a
// Pipeline and a Subscription, an event payload (Event), and snapshot info
// (Snapshot), and it has an Ack method.
// NOTE(review): presumably only one of Event or Snapshot is populated,
// depending on whether the message goes to the event handler or the snapshot
// handler — confirm against the package source.
type Message struct {
	Pipeline     *Pipeline
	Subscription *Subscription
	Event        *gravity_sdk_types_event.EventPayload
	Snapshot     *gravity_sdk_types_event.SnapshotInfo
}

func (*Message) Ack

func (msg *Message) Ack()

type MessageHandler

// MessageHandler is a callback that receives a *Message and returns nothing.
// Subscriber.SetEventHandler and Subscriber.SetSnapshotHandler each accept a
// MessageHandler.
type MessageHandler func(*Message)

type Options

// Options is the configuration passed to NewSubscriber and
// NewSubscriberWithClient; NewOptions returns a new *Options.
//
// NOTE(review): the field meanings below are inferred from names and types
// only and should be confirmed against the package source:
//   - WorkerCount, BufferSize, ChunkSize: integer sizing parameters
//     (presumably worker-pool size, buffer capacity, and batch size).
//   - Verbose: presumably enables extra logging.
//   - StateStore: the StateStore implementation to use.
//   - InitialLoad: the InitialLoadOptions settings.
type Options struct {
	WorkerCount int
	BufferSize  int
	ChunkSize   int
	Verbose     bool
	StateStore  StateStore
	InitialLoad InitialLoadOptions
}

func NewOptions

func NewOptions() *Options

type Pipeline

// Pipeline has no exported fields shown in this documentation. NewPipeline
// returns one given a *Subscriber, an id, and a lastSeq value, and all
// interaction goes through its methods (Awake, Idle, Pull, Ready,
// SaveLastSequence, Suspend, UpdateLastSequence).
type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(subscriber *Subscriber, id uint64, lastSeq uint64) *Pipeline

func (*Pipeline) Awake added in v0.0.13

func (pipeline *Pipeline) Awake()

func (*Pipeline) Idle added in v0.0.13

func (pipeline *Pipeline) Idle()

func (*Pipeline) Pull added in v0.0.10

func (pipeline *Pipeline) Pull() error

func (*Pipeline) Ready added in v0.0.13

func (pipeline *Pipeline) Ready()

func (*Pipeline) SaveLastSequence added in v0.0.13

func (pipeline *Pipeline) SaveLastSequence() error

func (*Pipeline) Suspend added in v0.0.13

func (pipeline *Pipeline) Suspend() error

func (*Pipeline) UpdateLastSequence

func (pipeline *Pipeline) UpdateLastSequence(sequence uint64)

type PipelineState

// PipelineState is the interface returned by StateStore.GetPipelineState. It
// exposes a getter and an updater for a uint64 "last sequence" value.
// NOTE(review): presumably implementations persist the sequence so a
// pipeline can resume from it — confirm against the package source.
type PipelineState interface {
	// GetLastSequence returns the last sequence value as a uint64.
	GetLastSequence() uint64
	// UpdateLastSequence takes a uint64 sequence value and returns an error.
	UpdateLastSequence(uint64) error
}

type Scheduler added in v0.0.13

// Scheduler has no exported fields shown in this documentation. NewScheduler
// returns one given a taskCount and a workerCount; its methods take either a
// *Pipeline (AddPipeline) or a pipeline ID (Awake, Idle, GetTaskState).
type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler added in v0.0.13

func NewScheduler(taskCount int, workerCount int) *Scheduler

func (*Scheduler) AddPipeline added in v0.0.13

func (scheduler *Scheduler) AddPipeline(pipeline *Pipeline)

func (*Scheduler) Awake added in v0.0.13

func (scheduler *Scheduler) Awake(pipelineID uint64)

func (*Scheduler) GetTaskState added in v0.0.13

func (scheduler *Scheduler) GetTaskState(pipelineID uint64) *TaskState

func (*Scheduler) Idle added in v0.0.13

func (scheduler *Scheduler) Idle(pipelineID uint64)

func (*Scheduler) Initialize added in v0.0.13

func (scheduler *Scheduler) Initialize()

type SchedulerTaskState added in v0.0.13

// SchedulerTaskState is an int32-based enumeration. TaskState.SetState takes
// a value of this type, and SchedulerTaskState_name maps each value to a name.
type SchedulerTaskState int32
// The SchedulerTaskState values, numbered by iota starting at zero:
// SchedulerTaskState_Idle is 0, SchedulerTaskState_Busy is 1, and
// SchedulerTaskState_Suspend is 2.
const (
	SchedulerTaskState_Idle SchedulerTaskState = iota
	SchedulerTaskState_Busy
	SchedulerTaskState_Suspend
)

type Snapshot added in v0.0.10

// Snapshot has no exported fields shown in this documentation. NewSnapshot
// returns one for a given *Pipeline; its methods are Close, Create, Pull, and
// UpdateLastKey.
type Snapshot struct {
	// contains filtered or unexported fields
}

func NewSnapshot added in v0.0.10

func NewSnapshot(pipeline *Pipeline) *Snapshot

func (*Snapshot) Close added in v0.0.13

func (snapshot *Snapshot) Close() error

func (*Snapshot) Create added in v0.0.10

func (snapshot *Snapshot) Create() error

func (*Snapshot) Pull added in v0.0.10

func (snapshot *Snapshot) Pull() (int64, error)

func (*Snapshot) UpdateLastKey added in v0.0.13

func (snapshot *Snapshot) UpdateLastKey(collection string, key []byte) error

type StateStore

// StateStore is the interface held in Options.StateStore. It hands out
// PipelineState values and lists pipelines as uint64 values.
type StateStore interface {
	// GetPipelineState takes a uint64 and returns a PipelineState or an error.
	// NOTE(review): the argument is presumably a pipeline ID — confirm.
	GetPipelineState(uint64) (PipelineState, error)
	// GetPipelines returns a slice of uint64 values.
	// NOTE(review): presumably the IDs of the pipelines the store knows — confirm.
	GetPipelines() []uint64
}

type Subscriber

// Subscriber has no exported fields shown in this documentation. It is
// returned by NewSubscriber (from *Options) and NewSubscriberWithClient
// (from a *core.Client and *Options), and is driven through its methods, such
// as Connect, Register, SubscribeToCollections, AddPipeline, Start, and
// Disconnect.
type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(options *Options) *Subscriber

func NewSubscriberWithClient

func NewSubscriberWithClient(client *core.Client, options *Options) *Subscriber

func (*Subscriber) AddAllPipelines

func (sub *Subscriber) AddAllPipelines() error

func (*Subscriber) AddPipeline

func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error

func (*Subscriber) AwakePipeline added in v0.0.13

func (sub *Subscriber) AwakePipeline(pipelineID uint64)

func (*Subscriber) Connect

func (sub *Subscriber) Connect(host string, options *core.Options) error

func (*Subscriber) Disconnect

func (sub *Subscriber) Disconnect()

func (*Subscriber) GetCollectionInfo

func (sub *Subscriber) GetCollectionInfo(collection string) []string

func (*Subscriber) GetPipeline added in v0.0.13

func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline

func (*Subscriber) GetPipelineCount

func (sub *Subscriber) GetPipelineCount() (uint64, error)

func (*Subscriber) Register

func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, subscriberID string, name string) error

func (*Subscriber) ReleasePipeline added in v0.0.13

func (sub *Subscriber) ReleasePipeline(pipelineID uint64)

func (*Subscriber) SetEventHandler added in v0.0.13

func (sub *Subscriber) SetEventHandler(cb MessageHandler)

func (*Subscriber) SetSnapshotHandler added in v0.0.13

func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)

func (*Subscriber) Start added in v0.0.13

func (sub *Subscriber) Start()

func (*Subscriber) SubscribeToCollections

func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error

type Subscription

// Subscription has no exported fields shown in this documentation.
// NewSubscription returns one given a *Subscriber and a bufferSize; its only
// listed method is Unsubscribe, which returns an error.
type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(subscriber *Subscriber, bufferSize int) *Subscription

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

type TaskState added in v0.0.13

// TaskState has no exported fields shown in this documentation. NewTaskState
// returns one, Scheduler.GetTaskState returns one for a pipeline ID, and
// SetState assigns it a SchedulerTaskState.
type TaskState struct {
	// contains filtered or unexported fields
}

func NewTaskState added in v0.0.13

func NewTaskState() *TaskState

func (*TaskState) SetState added in v0.0.13

func (ts *TaskState) SetState(state SchedulerTaskState)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL