Overview ¶
Package changes implements a backward-compatible Change API, as defined in older versions of Pydio.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BackgroundLogger ¶ added in v1.0.1
BackgroundLogger creates a logger that carries the service name.
Types ¶
type BatchInsert ¶ added in v1.0.1
type BatchInsert struct {
// contains filtered or unexported fields
}
BatchInsert buffers changes and stores them with a bulk insert query.
func NewBatchInsert ¶ added in v1.0.1
func NewBatchInsert(dao DAO, timeout time.Duration, maxSize int) *BatchInsert
NewBatchInsert creates a new BatchInsert and starts watching for incoming changes.
func (*BatchInsert) Flush ¶ added in v1.0.1
func (b *BatchInsert) Flush()
Flush empties the queue and stores its content in the DAO using the BulkPut() method.
func (*BatchInsert) Put ¶ added in v1.0.1
func (b *BatchInsert) Put(c *tree.SyncChange)
Put adds a change to the internal queue.
func (*BatchInsert) Start ¶ added in v1.0.1
func (b *BatchInsert) Start()
Start should be run in a goroutine. It receives changes and triggers a flush when necessary.
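The buffering behaviour described for BatchInsert can be illustrated with a minimal, self-contained sketch. It assumes a flush policy of "buffer full or timer fired", which the `timeout` and `maxSize` parameters suggest but the documentation does not spell out. The `change`, `batcher`, `stop` and `bulkPut` names are hypothetical stand-ins and do not reproduce the package's internals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// change is a hypothetical stand-in for *tree.SyncChange.
type change struct{ Seq uint64 }

// batcher is an illustrative buffer; it is not the package's BatchInsert.
type batcher struct {
	mu      sync.Mutex
	pending []change
	in      chan change
	done    chan struct{}
	timeout time.Duration
	maxSize int
	bulkPut func([]change) // stand-in for DAO.BulkPut
}

// newBatcher builds a batcher and starts its receive loop.
func newBatcher(timeout time.Duration, maxSize int, bulkPut func([]change)) *batcher {
	b := &batcher{
		in:      make(chan change),
		done:    make(chan struct{}),
		timeout: timeout,
		maxSize: maxSize,
		bulkPut: bulkPut,
	}
	go b.start()
	return b
}

// put hands one change to the receive loop.
func (b *batcher) put(c change) { b.in <- c }

// flush swaps out the pending slice and stores it in one bulk call.
func (b *batcher) flush() {
	b.mu.Lock()
	batch := b.pending
	b.pending = nil
	b.mu.Unlock()
	if len(batch) > 0 {
		b.bulkPut(batch)
	}
}

// start receives changes, flushing when the buffer is full or the ticker fires.
func (b *batcher) start() {
	ticker := time.NewTicker(b.timeout)
	defer ticker.Stop()
	for {
		select {
		case c, ok := <-b.in:
			if !ok { // input closed: store the remainder and stop
				b.flush()
				close(b.done)
				return
			}
			b.mu.Lock()
			b.pending = append(b.pending, c)
			full := len(b.pending) >= b.maxSize
			b.mu.Unlock()
			if full {
				b.flush()
			}
		case <-ticker.C:
			b.flush()
		}
	}
}

// stop closes the input and waits for the final flush.
func (b *batcher) stop() {
	close(b.in)
	<-b.done
}

// runDemo sends five changes with maxSize 2 and returns the batch sizes.
func runDemo() []int {
	var sizes []int
	b := newBatcher(time.Hour, 2, func(batch []change) {
		sizes = append(sizes, len(batch))
	})
	for i := uint64(1); i <= 5; i++ {
		b.put(change{Seq: i})
	}
	b.stop()
	return sizes
}

func main() {
	fmt.Println(runDemo()) // [2 2 1]
}
```

With a one-hour timeout the ticker never fires in this demo, so the batches are driven only by `maxSize` and the final flush on shutdown.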
type ChangeBuffer ¶
ChangeBuffer contains a full set of changes affecting a single node.
func (*ChangeBuffer) Append ¶
func (b *ChangeBuffer) Append(c *tree.SyncChange)
Append adds the change to the end of the buffer.
func (*ChangeBuffer) Range ¶
func (b *ChangeBuffer) Range() <-chan *list.Element
Range iterates over the list, delivering each *list.Element on the returned channel.
type ChangeChan ¶
type ChangeChan <-chan *tree.SyncChange
ChangeChan is a naive implementation of ChangeStreamer that performs no pre-processing.
func (ChangeChan) Changes ¶
func (ch ChangeChan) Changes() <-chan *tree.SyncChange
Changes produces a read-only stream of *tree.SyncChange instances. It performs no pre-processing.
type ChangeOperation ¶
type ChangeOperation interface {
	GetSeq() uint64
	GetNodeId() string
	OpType() OpCode
	GetSource() string
	GetTarget() string
}
ChangeOperation describes the nature of a change.
type ChangeStreamer ¶
type ChangeStreamer interface {
Changes() <-chan *tree.SyncChange
}
ChangeStreamer is used to avoid sending bidirectional channels to the optimizer. It types the channel as <-chan and also provides a place to hook in close logic & pre-processing.
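The idea of typing the channel as `<-chan` behind a small interface can be shown with a self-contained sketch. The lower-case `change`, `streamer` and `changeChan` types mirror the shapes documented above but are local stand-ins, not the package's own types.

```go
package main

import "fmt"

// change is a hypothetical stand-in for *tree.SyncChange.
type change struct{ Seq uint64 }

// streamer mirrors the shape of ChangeStreamer.
type streamer interface {
	Changes() <-chan *change
}

// changeChan mirrors ChangeChan: a receive-only channel with a method.
type changeChan <-chan *change

// Changes returns the underlying channel without any pre-processing.
func (ch changeChan) Changes() <-chan *change { return ch }

// drain reads every change from the streamer until the channel is closed.
func drain(s streamer) []uint64 {
	var seqs []uint64
	for c := range s.Changes() {
		seqs = append(seqs, c.Seq)
	}
	return seqs
}

// demo fills a bidirectional channel, then exposes it as receive-only.
func demo() []uint64 {
	src := make(chan *change, 3)
	for i := uint64(1); i <= 3; i++ {
		src <- &change{Seq: i}
	}
	close(src)
	// The conversion drops the send direction, so drain cannot write to src.
	return drain(changeChan(src))
}

func main() {
	fmt.Println(demo()) // [1 2 3]
}
```

The producer keeps the bidirectional channel and stays the only party able to send or close, while consumers see only the receive side.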
type DAO ¶
type DAO interface {
	dao.DAO
	Put(*tree.SyncChange) error
	BulkPut([]*tree.SyncChange) error
	Get(uint64, string) (chan *tree.SyncChange, error)
	LastSeq() (uint64, error)
	HasNodeById(id string) (bool, error)
}
DAO extends dao.DAO for the changes service.
type StreamConsumer ¶
type StreamConsumer interface {
Send(*tree.SyncChange) error
}
StreamConsumer can receive a *tree.SyncChange.
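Because StreamConsumer has a single method, it is easy to satisfy in tests. The sketch below shows two hypothetical implementations against a local `consumer` interface of the same shape: a slice-backed recorder and a function adapter in the style of `http.HandlerFunc`. None of these helper names come from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// change is a hypothetical stand-in for *tree.SyncChange.
type change struct{ Seq uint64 }

// consumer mirrors the shape of StreamConsumer.
type consumer interface {
	Send(*change) error
}

// consumerFunc adapts a plain function to the consumer interface.
type consumerFunc func(*change) error

// Send calls the wrapped function.
func (f consumerFunc) Send(c *change) error { return f(c) }

// sliceConsumer records every change it receives.
type sliceConsumer struct{ got []*change }

// Send appends the change, rejecting nil values.
func (s *sliceConsumer) Send(c *change) error {
	if c == nil {
		return errors.New("nil change")
	}
	s.got = append(s.got, c)
	return nil
}

// sendAll pushes changes to a consumer and stops at the first error.
func sendAll(c consumer, changes ...*change) error {
	for _, ch := range changes {
		if err := c.Send(ch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	sc := &sliceConsumer{}
	err := sendAll(sc, &change{Seq: 1}, &change{Seq: 2})
	fmt.Println(len(sc.got), err) // 2 <nil>

	count := 0
	counter := consumerFunc(func(*change) error { count++; return nil })
	_ = sendAll(counter, &change{Seq: 3})
	fmt.Println(count) // 1
}
```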
type StreamOptimizer ¶
type StreamOptimizer struct {
// contains filtered or unexported fields
}
StreamOptimizer applies optimizations to the stream of changes.
func NewOptimizer ¶
func NewOptimizer(ctx context.Context, c ChangeStreamer) (o *StreamOptimizer)
NewOptimizer produces a new StreamOptimizer.
func (StreamOptimizer) Output ¶
func (o StreamOptimizer) Output(ctx context.Context, c StreamConsumer) (err error)
Output sends the optimized stream to a consumer.