Documentation ¶
Index ¶
- Constants
- Variables
- type CollectionSnapshot
- type DataEvent
- type EventHandler
- type InitialLoadOptions
- type Message
- type MessageHandler
- type MessageType
- type Options
- type Pipeline
- type PipelineOpt
- func WithPipelineChunkSize(size int) PipelineOpt
- func WithPipelineCollections(cols []string) PipelineOpt
- func WithPipelineInitialLoad(enabled bool, mode string, omittedCount uint64) PipelineOpt
- func WithPipelineRemoteLastSeq(lastSeq uint64) PipelineOpt
- func WithPipelineRequest(pr PipelineRequest) PipelineOpt
- func WithPipelineSnapshotLastSeq(lastSeq uint64) PipelineOpt
- func WithPipelineSnapshotRequest(sr SnapshotRequest) PipelineOpt
- func WithPipelineStateStore(ss StateStore) PipelineOpt
- func WithPipelineSubscriberID(id string) PipelineOpt
- func WithPipelineSubscription(s Subscription) PipelineOpt
- type PipelineRequest
- type PipelineRequestImpl
- func (pr *PipelineRequestImpl) Awake(sid string, pid uint64) (*pipeline_pb.AwakeReply, error)
- func (pr *PipelineRequestImpl) Pull(sid string, pid uint64, startAt uint64, offset uint64, count int64) (*pipeline_pb.PullEventsReply, error)
- func (pr *PipelineRequestImpl) Suspend(sid string, pid uint64, seq uint64) (*pipeline_pb.SuspendReply, error)
- type PipelineState
- type PipelineStateDummy
- type PipelineStatus
- type RequestHandler
- type Runner
- type Snapshot
- type SnapshotEvent
- type SnapshotOpt
- type SnapshotRequest
- type SnapshotRequestImpl
- func (sr *SnapshotRequestImpl) Close(id string, pid uint64) (*pipeline_pb.ReleaseSnapshotReply, error)
- func (sr *SnapshotRequestImpl) Create(id string, pid uint64) (*pipeline_pb.CreateSnapshotReply, error)
- func (sr *SnapshotRequestImpl) Pull(id string, sid string, pid uint64, col string, lastKey []byte, offset uint64, ...) (*pipeline_pb.PullSnapshotReply, error)
- type StateStore
- type StateStoreDummy
- type Subscriber
- func (sub *Subscriber) AddAllPipelines() error
- func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error
- func (sub *Subscriber) Connect(host string, options *core.Options) error
- func (sub *Subscriber) Disconnect()
- func (sub *Subscriber) GetCollectionInfo(collection string) []string
- func (sub *Subscriber) GetEndpoint() (*core.Endpoint, error)
- func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline
- func (sub *Subscriber) GetPipelineCount() (uint64, error)
- func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, ...) error
- func (sub *Subscriber) ReleasePipeline(pipelineID uint64)
- func (sub *Subscriber) SetEventHandler(cb MessageHandler)
- func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)
- func (sub *Subscriber) Start()
- func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error
- func (sub *Subscriber) SubscribeToPipelines(pipelines []uint64) error
- type Subscription
- type SubscriptionImpl
- type SubscriptionOpt
Constants ¶
View Source
const ( SubscriberType_Transmitter subscriber_manager_pb.SubscriberType = subscriber_manager_pb.SubscriberType_TRANSMITTER SubscriberType_Exporter subscriber_manager_pb.SubscriberType = subscriber_manager_pb.SubscriberType_EXPORTER )
Variables ¶
View Source
var (
ErrUnknownEventType = errors.New("pipeline: unknown event type")
)
View Source
var PipelineStatusNames = map[PipelineStatus]string{
0: "Open",
1: "Half Open",
2: "Close",
}
Functions ¶
This section is empty.
Types ¶
type CollectionSnapshot ¶ added in v0.0.13
type CollectionSnapshot struct {
// contains filtered or unexported fields
}
type DataEvent ¶ added in v0.0.40
type DataEvent struct {
	PipelineID uint64
	Sequence   uint64
	RawData    []byte
	// Payload *gravity_sdk_types_projection.Projection
	Payload *gravity_sdk_types_record.Record
}
type EventHandler ¶ added in v0.0.40
type EventHandler struct {
// contains filtered or unexported fields
}
func NewEventHandler ¶ added in v0.0.40
func NewEventHandler(subscriber *Subscriber) *EventHandler
func (*EventHandler) ProcessEvent ¶ added in v0.0.40
func (eh *EventHandler) ProcessEvent(data []byte) error
type InitialLoadOptions ¶ added in v0.0.10
type Message ¶
type Message struct { Pipeline *Pipeline Subscription Subscription Type MessageType Payload interface{} Callback func(*Message) }
func NewMessage ¶ added in v0.0.40
func NewMessage(pipeline *Pipeline, sub Subscription, msgType MessageType, payload interface{}) *Message
type MessageHandler ¶
type MessageHandler func(*Message)
type MessageType ¶ added in v0.0.40
type MessageType int32
const ( MESSAGE_TYPE_EVENT MessageType = iota MESSAGE_TYPE_SNAPSHOT )
type Options ¶
type Options struct { Endpoint string Domain string Key *keyring.KeyInfo WorkerCount int BufferSize int ChunkSize int Verbose bool StateStore StateStore InitialLoad InitialLoadOptions }
func NewOptions ¶
func NewOptions() *Options
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline(id uint64, lastSeq uint64, opts ...PipelineOpt) *Pipeline
func (*Pipeline) Initialize ¶ added in v0.0.40
func (*Pipeline) SetUpdatedSequence ¶ added in v0.0.40
func (*Pipeline) UpdateLastSequence ¶
type PipelineOpt ¶ added in v1.0.6
type PipelineOpt func(*Pipeline)
func WithPipelineChunkSize ¶ added in v1.0.6
func WithPipelineChunkSize(size int) PipelineOpt
func WithPipelineCollections ¶ added in v1.0.6
func WithPipelineCollections(cols []string) PipelineOpt
func WithPipelineInitialLoad ¶ added in v1.0.6
func WithPipelineInitialLoad(enabled bool, mode string, omittedCount uint64) PipelineOpt
func WithPipelineRemoteLastSeq ¶ added in v1.0.6
func WithPipelineRemoteLastSeq(lastSeq uint64) PipelineOpt
func WithPipelineRequest ¶ added in v1.0.6
func WithPipelineRequest(pr PipelineRequest) PipelineOpt
func WithPipelineSnapshotLastSeq ¶ added in v1.0.6
func WithPipelineSnapshotLastSeq(lastSeq uint64) PipelineOpt
func WithPipelineSnapshotRequest ¶ added in v1.0.6
func WithPipelineSnapshotRequest(sr SnapshotRequest) PipelineOpt
func WithPipelineStateStore ¶ added in v1.0.6
func WithPipelineStateStore(ss StateStore) PipelineOpt
func WithPipelineSubscriberID ¶ added in v1.0.6
func WithPipelineSubscriberID(id string) PipelineOpt
func WithPipelineSubscription ¶ added in v1.0.6
func WithPipelineSubscription(s Subscription) PipelineOpt
type PipelineRequest ¶ added in v1.0.6
type PipelineRequest interface { Pull(sid string, pid uint64, startAt uint64, offset uint64, count int64) (*pipeline_pb.PullEventsReply, error) Suspend(sid string, pid uint64, seq uint64) (*pipeline_pb.SuspendReply, error) Awake(sid string, pid uint64) (*pipeline_pb.AwakeReply, error) }
func NewPipelineRequestImpl ¶ added in v1.0.6
func NewPipelineRequestImpl(request RequestHandler) PipelineRequest
type PipelineRequestImpl ¶ added in v1.0.6
type PipelineRequestImpl struct {
// contains filtered or unexported fields
}
func (*PipelineRequestImpl) Awake ¶ added in v1.0.6
func (pr *PipelineRequestImpl) Awake(sid string, pid uint64) (*pipeline_pb.AwakeReply, error)
func (*PipelineRequestImpl) Pull ¶ added in v1.0.6
func (pr *PipelineRequestImpl) Pull(sid string, pid uint64, startAt uint64, offset uint64, count int64) (*pipeline_pb.PullEventsReply, error)
func (*PipelineRequestImpl) Suspend ¶ added in v1.0.6
func (pr *PipelineRequestImpl) Suspend(sid string, pid uint64, seq uint64) (*pipeline_pb.SuspendReply, error)
type PipelineState ¶
type PipelineStateDummy ¶ added in v1.0.6
type PipelineStateDummy struct {
// contains filtered or unexported fields
}
func (*PipelineStateDummy) Flush ¶ added in v1.0.6
func (psd *PipelineStateDummy) Flush() error
func (*PipelineStateDummy) GetLastSequence ¶ added in v1.0.6
func (psd *PipelineStateDummy) GetLastSequence() uint64
func (*PipelineStateDummy) UpdateLastSequence ¶ added in v1.0.6
func (psd *PipelineStateDummy) UpdateLastSequence(lastSeq uint64) error
type PipelineStatus ¶ added in v0.0.40
type PipelineStatus int32
const ( PIPELINE_STATUS_OPEN PipelineStatus = iota PIPELINE_STATUS_HALF_OPEN PIPELINE_STATUS_CLOSE )
type RequestHandler ¶ added in v1.0.6
type Runner ¶ added in v0.0.40
type Runner struct {
// contains filtered or unexported fields
}
func (*Runner) AddPipeline ¶ added in v0.0.40
type Snapshot ¶ added in v0.0.10
type Snapshot struct {
// contains filtered or unexported fields
}
func NewSnapshot ¶ added in v0.0.10
func NewSnapshot(opts ...SnapshotOpt) *Snapshot
type SnapshotEvent ¶ added in v0.0.40
type SnapshotEvent struct { PipelineID uint64 Collection string RawData []byte Payload *gravity_sdk_types_snapshot_record.SnapshotRecord }
type SnapshotOpt ¶ added in v1.0.6
type SnapshotOpt func(*Snapshot)
func WithSnapshotChunkSize ¶ added in v1.0.6
func WithSnapshotChunkSize(size int) SnapshotOpt
func WithSnapshotCollections ¶ added in v1.0.6
func WithSnapshotCollections(cols []string) SnapshotOpt
func WithSnapshotPipelineID ¶ added in v1.0.6
func WithSnapshotPipelineID(id uint64) SnapshotOpt
func WithSnapshotRequest ¶ added in v1.0.6
func WithSnapshotRequest(sr SnapshotRequest) SnapshotOpt
func WithSnapshotSubscriberID ¶ added in v1.0.6
func WithSnapshotSubscriberID(id string) SnapshotOpt
type SnapshotRequest ¶ added in v1.0.6
type SnapshotRequest interface { Pull(id string, sid string, pid uint64, col string, lastKey []byte, offset uint64, count int64) (*pipeline_pb.PullSnapshotReply, error) Create(id string, pid uint64) (*pipeline_pb.CreateSnapshotReply, error) Close(id string, pid uint64) (*pipeline_pb.ReleaseSnapshotReply, error) }
func NewSnapshotRequestImpl ¶ added in v1.0.6
func NewSnapshotRequestImpl(request RequestHandler) SnapshotRequest
type SnapshotRequestImpl ¶ added in v1.0.6
type SnapshotRequestImpl struct {
// contains filtered or unexported fields
}
func (*SnapshotRequestImpl) Close ¶ added in v1.0.6
func (sr *SnapshotRequestImpl) Close(id string, pid uint64) (*pipeline_pb.ReleaseSnapshotReply, error)
func (*SnapshotRequestImpl) Create ¶ added in v1.0.6
func (sr *SnapshotRequestImpl) Create(id string, pid uint64) (*pipeline_pb.CreateSnapshotReply, error)
func (*SnapshotRequestImpl) Pull ¶ added in v1.0.6
func (sr *SnapshotRequestImpl) Pull(id string, sid string, pid uint64, col string, lastKey []byte, offset uint64, count int64) (*pipeline_pb.PullSnapshotReply, error)
type StateStore ¶
type StateStore interface { GetPipelineState(uint64) (PipelineState, error) GetPipelines() []uint64 }
type StateStoreDummy ¶ added in v1.0.6
type StateStoreDummy struct { }
func (*StateStoreDummy) GetPipelineState ¶ added in v1.0.6
func (ssd *StateStoreDummy) GetPipelineState(id uint64) (PipelineState, error)
func (*StateStoreDummy) GetPipelines ¶ added in v1.0.6
func (ssd *StateStoreDummy) GetPipelines() []uint64
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(options *Options) *Subscriber
func NewSubscriberWithClient ¶
func NewSubscriberWithClient(client *core.Client, options *Options) *Subscriber
func (*Subscriber) AddAllPipelines ¶
func (sub *Subscriber) AddAllPipelines() error
func (*Subscriber) AddPipeline ¶
func (sub *Subscriber) AddPipeline(pipeline *Pipeline) error
func (*Subscriber) Connect ¶
func (sub *Subscriber) Connect(host string, options *core.Options) error
func (*Subscriber) Disconnect ¶
func (sub *Subscriber) Disconnect()
func (*Subscriber) GetCollectionInfo ¶
func (sub *Subscriber) GetCollectionInfo(collection string) []string
func (*Subscriber) GetEndpoint ¶ added in v0.0.18
func (sub *Subscriber) GetEndpoint() (*core.Endpoint, error)
func (*Subscriber) GetPipeline ¶ added in v0.0.13
func (sub *Subscriber) GetPipeline(pipelineID uint64) *Pipeline
func (*Subscriber) GetPipelineCount ¶
func (sub *Subscriber) GetPipelineCount() (uint64, error)
func (*Subscriber) Register ¶
func (sub *Subscriber) Register(subscriberType subscriber_manager_pb.SubscriberType, component string, subscriberID string, name string) error
func (*Subscriber) ReleasePipeline ¶ added in v0.0.13
func (sub *Subscriber) ReleasePipeline(pipelineID uint64)
func (sub *Subscriber) AwakePipeline(pipelineID uint64) {
if sub.scheduler == nil { return } sub.scheduler.Awake(pipelineID) }
func (*Subscriber) SetEventHandler ¶ added in v0.0.13
func (sub *Subscriber) SetEventHandler(cb MessageHandler)
func (*Subscriber) SetSnapshotHandler ¶ added in v0.0.13
func (sub *Subscriber) SetSnapshotHandler(cb MessageHandler)
func (*Subscriber) Start ¶ added in v0.0.13
func (sub *Subscriber) Start()
func (*Subscriber) SubscribeToCollections ¶
func (sub *Subscriber) SubscribeToCollections(colMap map[string][]string) error
func (*Subscriber) SubscribeToPipelines ¶ added in v0.0.20
func (sub *Subscriber) SubscribeToPipelines(pipelines []uint64) error
type Subscription ¶
type SubscriptionImpl ¶ added in v1.0.6
type SubscriptionImpl struct {
// contains filtered or unexported fields
}
func NewSubscriptionImpl ¶ added in v1.0.6
func NewSubscriptionImpl(opts ...SubscriptionOpt) *SubscriptionImpl
func (*SubscriptionImpl) Push ¶ added in v1.0.6
func (s *SubscriptionImpl) Push(msg *Message)
func (*SubscriptionImpl) Start ¶ added in v1.0.6
func (s *SubscriptionImpl) Start()
func (*SubscriptionImpl) Unsubscribe ¶ added in v1.0.6
func (s *SubscriptionImpl) Unsubscribe() error
type SubscriptionOpt ¶ added in v1.0.6
type SubscriptionOpt func(s *SubscriptionImpl)
func WithSubscriptionBufferSize ¶ added in v1.0.6
func WithSubscriptionBufferSize(size int) SubscriptionOpt
func WithSubscriptionEventHandler ¶ added in v1.0.6
func WithSubscriptionEventHandler(fn MessageHandler) SubscriptionOpt
func WithSubscriptionSnapshotHandler ¶ added in v1.0.6
func WithSubscriptionSnapshotHandler(fn MessageHandler) SubscriptionOpt
func WithSubscriptionWorkerCount ¶ added in v1.0.6
func WithSubscriptionWorkerCount(count int) SubscriptionOpt
Source Files ¶
Click to show internal directories.
Click to hide internal directories.