Documentation
¶
Index ¶
Examples ¶
Constants ¶
const ( ModType_INSERT = "INSERT" ModType_UPDATE = "UPDATE" ModType_DELETE = "DELETE" )
const (
RootPartitionToken = "Parent0"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChildPartition ¶
type ChildPartitionsRecord ¶
type ChildPartitionsRecord struct { StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"` RecordSequence string `spanner:"record_sequence" json:"record_sequence"` ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"` }
type ColumnType ¶
type ColumnType struct { Name string `json:"name"` Type Type `json:"type"` IsPrimaryKey bool `json:"is_primary_key,omitempty"` OrdinalPosition int64 `json:"ordinal_position"` }
ColumnType is the metadata of the column.
type Consumer ¶ added in v1.0.0
type Consumer interface {
Consume(change *DataChangeRecord) error
}
Consumer is the interface to consume the DataChangeRecord.
Consume might be called from multiple goroutines and must be re-entrant safe.
type ConsumerFunc ¶ added in v1.0.0
type ConsumerFunc func(*DataChangeRecord) error
ConsumerFunc type is an adapter to allow the use of ordinary functions as Consumer.
func (ConsumerFunc) Consume ¶ added in v1.0.0
func (f ConsumerFunc) Consume(change *DataChangeRecord) error
Consume calls f(change).
type DataChangeRecord ¶
type DataChangeRecord struct { CommitTimestamp time.Time `json:"commit_timestamp"` RecordSequence string `json:"record_sequence"` ServerTransactionID string `json:"server_transaction_id"` IsLastRecordInTransactionInPartition bool `json:"is_last_record_in_transaction_in_partition"` TableName string `json:"table_name"` ColumnTypes []*ColumnType `json:"column_types"` Mods []*Mod `json:"mods"` ModType ModType `json:"mod_type"` ValueCaptureType string `json:"value_capture_type"` NumberOfRecordsInTransaction int64 `json:"number_of_records_in_transaction"` NumberOfPartitionsInTransaction int64 `json:"number_of_partitions_in_transaction"` TransactionTag string `json:"transaction_tag"` IsSystemTransaction bool `json:"is_system_transaction"` }
DataChangeRecord is the change set of the table.
type HeartbeatRecord ¶
type Mod ¶
type Mod struct { Keys map[string]interface{} `json:"keys,omitempty"` NewValues map[string]interface{} `json:"new_values,omitempty"` OldValues map[string]interface{} `json:"old_values,omitempty"` }
Mod contains the keys and the values of the changed records.
type Option ¶
type Option interface {
Apply(*config)
}
func WithEndTimestamp ¶
WithEndTimestamp set the end timestamp option for read change streams.
The value must be within the retention period of the change stream and must be after the start timestamp. If not set, read latest changes until canceled.
func WithHeartbeatInterval ¶ added in v1.0.0
WithHeartbeatInterval set the heartbeat interval for read change streams.
Default value is 10 seconds.
func WithSpannerRequestPriotiry ¶ added in v1.0.0
func WithSpannerRequestPriotiry(priority spannerpb.RequestOptions_Priority) Option
WithSpannerRequestPriotiry set the request priority option for read change streams.
Default value is unspecified, equivalent to high.
func WithStartTimestamp ¶ added in v1.0.0
WithStartTimestamp set the start timestamp option for read change streams.
The value must be within the retention period of the change stream and before the current time. Default value is current timestamp.
type PartitionMetadata ¶ added in v1.0.0
type PartitionMetadata struct { PartitionToken string `spanner:"PartitionToken" json:"partition_token"` ParentTokens []string `spanner:"ParentTokens" json:"parent_tokens"` StartTimestamp time.Time `spanner:"StartTimestamp" json:"start_timestamp"` EndTimestamp time.Time `spanner:"EndTimestamp" json:"end_timestamp"` HeartbeatMillis int64 `spanner:"HeartbeatMillis" json:"heartbeat_millis"` State State `spanner:"State" json:"state"` Watermark time.Time `spanner:"Watermark" json:"watermark"` CreatedAt time.Time `spanner:"CreatedAt" json:"created_at"` ScheduledAt *time.Time `spanner:"ScheduledAt" json:"scheduled_at,omitempty"` RunningAt *time.Time `spanner:"RunningAt" json:"running_at,omitempty"` FinishedAt *time.Time `spanner:"FinishedAt" json:"finished_at,omitempty"` }
PartitionMetadata contains partition tokens and timestamps that have already been read from the stream partition.
func (*PartitionMetadata) IsRootPartition ¶ added in v1.0.0
func (p *PartitionMetadata) IsRootPartition() bool
IsRootPartition returns true if this is root partition.
type PartitionStorage ¶ added in v1.0.0
type PartitionStorage interface { GetUnfinishedMinWatermarkPartition(ctx context.Context) (*PartitionMetadata, error) GetInterruptedPartitions(ctx context.Context) ([]*PartitionMetadata, error) InitializeRootPartition(ctx context.Context, startTimestamp time.Time, endTimestamp time.Time, heartbeatInterval time.Duration) error GetSchedulablePartitions(ctx context.Context, minWatermark time.Time) ([]*PartitionMetadata, error) AddChildPartitions(ctx context.Context, parentPartition *PartitionMetadata, childPartitionsRecord *ChildPartitionsRecord) error UpdateToScheduled(ctx context.Context, partitions []*PartitionMetadata) error UpdateToRunning(ctx context.Context, partition *PartitionMetadata) error UpdateToFinished(ctx context.Context, partition *PartitionMetadata) error UpdateWatermark(ctx context.Context, partition *PartitionMetadata, watermark time.Time) error }
PartitionStorage is an interface for storing and reading PartitionMetadata.
type Subscriber ¶ added in v1.0.0
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber subscribes change stream.
func NewSubscriber ¶ added in v1.0.0
func NewSubscriber( client *spanner.Client, streamName string, partitionStorage PartitionStorage, options ...Option, ) *Subscriber
NewSubscriber creates a new subscriber of change streams.
Example ¶
package main import ( "context" "encoding/json" "errors" "fmt" "os" "os/signal" "sync" "cloud.google.com/go/spanner" "github.com/toga4/spream" "github.com/toga4/spream/partitionstorage" ) func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer stop() database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "foo-instance", "foo-database") spannerClient, err := spanner.NewClient(ctx, database) if err != nil { panic(err) } defer spannerClient.Close() changeStreamName := "FooStream" subscriber := spream.NewSubscriber(spannerClient, changeStreamName, partitionstorage.NewInmemory()) fmt.Fprintf(os.Stderr, "Reading the stream...\n") var mu sync.Mutex if err := subscriber.SubscribeFunc(ctx, func(change *spream.DataChangeRecord) error { mu.Lock() defer mu.Unlock() return json.NewEncoder(os.Stdout).Encode(change) }); err != nil && !errors.Is(ctx.Err(), context.Canceled) { panic(err) } }
Output:
Example (WithOptions) ¶
package main import ( "context" "encoding/json" "errors" "fmt" "io" "os" "os/signal" "sync" "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" "github.com/toga4/spream" "github.com/toga4/spream/partitionstorage" ) func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer stop() database := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "foo-project", "foo-instance", "foo-database") spannerClient, err := spanner.NewClient(ctx, database) if err != nil { panic(err) } defer spannerClient.Close() partitionMetadataTableName := "PartitionMetadata_FooStream" partitionStorage := partitionstorage.NewSpanner(spannerClient, partitionMetadataTableName) if err := partitionStorage.CreateTableIfNotExists(ctx); err != nil { panic(err) } changeStreamName := "FooStream" subscriber := spream.NewSubscriber( spannerClient, changeStreamName, partitionStorage, spream.WithStartTimestamp(time.Now().Add(-time.Hour)), // Start subscribing from 1 hour ago. spream.WithEndTimestamp(time.Now().Add(5*time.Minute)), // Stop subscribing after 5 minutes. spream.WithHeartbeatInterval(3*time.Second), spream.WithSpannerRequestPriotiry(spannerpb.RequestOptions_PRIORITY_MEDIUM), ) logger := &Logger{out: os.Stdout} if err := subscriber.Subscribe(ctx, logger); err != nil && !errors.Is(ctx.Err(), context.Canceled) { panic(err) } } type Logger struct { out io.Writer mu sync.Mutex } func (l *Logger) Consume(change *spream.DataChangeRecord) error { l.mu.Lock() defer l.mu.Unlock() return json.NewEncoder(l.out).Encode(change) }
Output:
func (*Subscriber) Subscribe ¶ added in v1.0.0
func (s *Subscriber) Subscribe(ctx context.Context, consumer Consumer) error
Subscribe starts subscribing to the change stream.
func (*Subscriber) SubscribeFunc ¶ added in v1.0.0
func (s *Subscriber) SubscribeFunc(ctx context.Context, f ConsumerFunc) error
SubscribeFunc is an adapter to allow the use of ordinary functions as Consumer.
function might be called from multiple goroutines and must be re-entrant safe.
type Type ¶
type Type struct { Code TypeCode `json:"code"` ArrayElementType TypeCode `json:"array_element_type,omitempty"` }
Type is the type of the column.
type TypeCode ¶
type TypeCode string
const ( TypeCode_NONE TypeCode = "" TypeCode_BOOL TypeCode = "BOOL" TypeCode_INT64 TypeCode = "INT64" TypeCode_FLOAT64 TypeCode = "FLOAT64" TypeCode_TIMESTAMP TypeCode = "TIMESTAMP" TypeCode_DATE TypeCode = "DATE" TypeCode_STRING TypeCode = "STRING" TypeCode_BYTES TypeCode = "BYTES" TypeCode_NUMERIC TypeCode = "NUMERIC" TypeCode_JSON TypeCode = "JSON" TypeCode_ARRAY TypeCode = "ARRAY" )