Documentation ¶
Overview ¶
Package changestreams provides the functionality for reading the Cloud Spanner change streams.
Example ¶
package main import ( "context" "fmt" "log" "github.com/cloudspannerecosystem/spanner-change-streams-tail/changestreams" ) func main() { ctx := context.Background() reader, err := changestreams.NewReader(ctx, "myproject", "myinstance", "mydb", "mystream") if err != nil { log.Fatalf("failed to create a reader: %v", err) } defer reader.Close() if err := reader.Read(ctx, func(result *changestreams.ReadResult) error { for _, cr := range result.ChangeRecords { for _, dcr := range cr.DataChangeRecords { fmt.Printf("[%s] %s %s\n", dcr.CommitTimestamp, dcr.ModType, dcr.TableName) } } return nil }); err != nil { log.Fatalf("failed to read: %v", err) } }
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChangeRecord ¶
type ChangeRecord struct { DataChangeRecords []*DataChangeRecord `spanner:"data_change_record" json:"data_change_record"` HeartbeatRecords []*HeartbeatRecord `spanner:"heartbeat_record" json:"heartbeat_record"` ChildPartitionsRecords []*ChildPartitionsRecord `spanner:"child_partitions_record" json:"child_partitions_record"` }
ChangeRecord is the single unit of the records from the change stream.
type ChildPartition ¶
type ChildPartition struct { Token string `spanner:"token" json:"token"` ParentPartitionTokens []string `spanner:"parent_partition_tokens" json:"parent_partition_tokens"` }
ChildPartition contains the child partition token.
type ChildPartitionsRecord ¶
type ChildPartitionsRecord struct { StartTimestamp time.Time `spanner:"start_timestamp" json:"start_timestamp"` RecordSequence string `spanner:"record_sequence" json:"record_sequence"` ChildPartitions []*ChildPartition `spanner:"child_partitions" json:"child_partitions"` }
ChildPartitionsRecord contains the child partitions of the stream.
type ColumnType ¶
type ColumnType struct { Name string `spanner:"name" json:"name"` Type spanner.NullJSON `spanner:"type" json:"type"` IsPrimaryKey bool `spanner:"is_primary_key" json:"is_primary_key"` OrdinalPosition int64 `spanner:"ordinal_position" json:"ordinal_position"` }
ColumnType is the metadata of the column.
type Config ¶
type Config struct { // If StartTimestamp is a zero value of time.Time, reader reads from the current timestamp. StartTimestamp time.Time // If EndTimestamp is a zero value of time.Time, reader reads until it is cancelled. EndTimestamp time.Time HeartbeatInterval time.Duration SpannerClientConfig spanner.ClientConfig SpannerClientOptions []option.ClientOption }
Config is the configuration for the reader.
type DataChangeRecord ¶
type DataChangeRecord struct { CommitTimestamp time.Time `spanner:"commit_timestamp" json:"commit_timestamp"` RecordSequence string `spanner:"record_sequence" json:"record_sequence"` ServerTransactionID string `spanner:"server_transaction_id" json:"server_transaction_id"` IsLastRecordInTransactionInPartition bool `spanner:"is_last_record_in_transaction_in_partition" json:"is_last_record_in_transaction_in_partition"` TableName string `spanner:"table_name" json:"table_name"` ColumnTypes []*ColumnType `spanner:"column_types" json:"column_types"` Mods []*Mod `spanner:"mods" json:"mods"` ModType string `spanner:"mod_type" json:"mod_type"` ValueCaptureType string `spanner:"value_capture_type" json:"value_capture_type"` NumberOfRecordsInTransaction int64 `spanner:"number_of_records_in_transaction" json:"number_of_records_in_transaction"` NumberOfPartitionsInTransaction int64 `spanner:"number_of_partitions_in_transaction" json:"number_of_partitions_in_transaction"` TransactionTag string `spanner:"transaction_tag" json:"transaction_tag"` IsSystemTransaction bool `spanner:"is_system_transaction" json:"is_system_transaction"` }
DataChangeRecord contains a set of changes to the table.
type HeartbeatRecord ¶
HeartbeatRecord is the heartbeat record returned from Cloud Spanner.
type Mod ¶
type Mod struct { Keys spanner.NullJSON `spanner:"keys" json:"keys"` NewValues spanner.NullJSON `spanner:"new_values" json:"new_values"` OldValues spanner.NullJSON `spanner:"old_values" json:"old_values"` }
Mod is the changes that were made on the table.
type ReadResult ¶
type ReadResult struct { PartitionToken string `json:"partition_token"` ChangeRecords []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"` }
ReadResult is the result of the read change records from the partition.
type Reader ¶ added in v0.2.0
type Reader struct {
// contains filtered or unexported fields
}
Reader is the change stream reader.
func NewReader ¶ added in v0.2.0
func NewReader(ctx context.Context, projectID, instanceID, databaseID, streamID string) (*Reader, error)
NewReader creates a new reader.
func NewReaderWithConfig ¶ added in v0.2.0
func NewReaderWithConfig(ctx context.Context, projectID, instanceID, databaseID, streamID string, config Config) (*Reader, error)
NewReaderWithConfig creates a new reader with a given configuration.