oplog

package
v0.1.4
Warning

This package is not in the latest version of its module.

Published: Jan 5, 2021 License: MPL-2.0 Imports: 28 Imported by: 0

README

oplog

oplog is a package for writing operation log (oplog) entries for the purpose of replication and verification of the data stored in the Boundary RDBMS.

Usage


// you must init the ticket in its own transaction.  You only need
// to init a ticket once in the database.  It doesn't need to happen for 
// every connection.  Once it's persistent in the database, you can simply Get it.
initTx := db.Begin()
ticketer := &GormTicketer{Tx: initTx}
err = ticketer.InitTicket("users")
// if there's no error, then commit the initialized ticket
initTx.Commit()

userCreate := oplog_test.TestUser{
  TestUser: oplog_test.TestUser{
    Name: loginName,
  },
}
tx := db.Begin()
// write the user to the database
tx.Create(&userCreate)

// the ticketer must use the same transaction that writes to the database tables
ticketer = &GormTicketer{Tx: tx}

// get a ticket for writing users to the oplog
ticket, err := ticketer.GetTicket("users")

// create an entry for the oplog with the entry's metadata (likely the entry's Scope)
newLogEntry, err := NewEntry(
  "test-users",
  // Metadata is a map[string][]string
  Metadata{
    "deployment": []string{"amex"},
    "project":    []string{"central-info-systems"},
  },
  cipherer, // wrapping.Wrapper
  ticketer,
)

// write an entry with N messages (variadic parameter) in the order they were sent to the database
err = newLogEntry.WriteEntryWith(
    context.Background(),
    &GormWriter{Tx: tx},
    ticket,
    &Message{Message: &userCreate, TypeName: "user", OpType: OpType_OP_TYPE_CREATE},
)
if err != nil {
    // if there's an error writing the oplog then roll EVERYTHING back
    tx.Rollback()
} else {
    // no err means you can commit all the things.
    tx.Commit()
}

TBD/TODO

We need to discuss and decide how Boundary is going to handle the following oplog things:

  • SQL migrations: you'll find the package's SQL migrations under ./migrations/postgres. We need to decide how Boundary will manage migrations across the system, and we will likely need to reference this package's migrations somehow.

oplog entry

  Example Oplog Entry for the Target Aggregate      
┌────────────────────────────────────────────────┐  
│                        FIFO with []byte buffer │  
│                                                │  
│┌─Msg 4────┐┌─Msg 3────┐┌─Msg 2────┐            │  
││          ││          ││          │            │  
││  Tags    ││  Host    ││ HostSet  │            │  
││┌────────┐││┌────────┐││┌────────┐│            │  
│││        ││││        ││││        ││            │  
│││        ││││        ││││        ││            │  
│││  Tag   ││││  Host  ││││HostSet ││            │  
│││protobuf││││protobuf││││protobuf││            │  
│││        ││││        ││││        ││            │  
│││        ││││        ││││        ││            │  
││└────────┘││└────────┘││└────────┘│            │  
││┌────────┐││┌────────┐││┌────────┐│            │  
│││        ││││        ││││        ││            │  
│││typeName││││typeName││││typeName││            │  
│││  Tag   ││││  Host  ││││HostSet ││            │  
││└────────┘││└────────┘││└────────┘│            │  
││┌────────┐││┌────────┐││┌────────┐│            │  
│││        ││││        ││││        ││            │  
│││ OpType ││││ OpType ││││ OpType ││            │  
│││ Create ││││ Create ││││ Create ││            │  
││└────────┘││└────────┘││└────────┘│            │  
│└──────────┘└──────────┘└──────────┘            │  
└────────────────────────────────────────────────┘  

oplog tables

                  oplog tables:                   
      as the diagram shows, we can split the      
      oplog_entries into multiple tables if       
             needed for performance.              
                                                  
┌────────────────┐                                
│┌───────────────┴┐                               
││┌───────────────┴┐            ┌────────────────┐
│││ oplog_entries  │            │  oplog_ticket  │
││├────────────────┤            ├────────────────┤
│││id              │╲           │id              │
│││aggregate_name  │──┼───────┼ │aggregate_name  │
└┤│data            │╱           │version         │
 └┤                │            │                │
  └────────────────┘            └────────────────┘
          ┼                                       
          │                                       
          │                                       
          ┼                                       
         ╱│╲                                      
 ┌────────────────┐                               
 │ oplog_metadata │                               
 ├────────────────┤                               
 │id              │                               
 │entry_id        │                               
 │key             │                               
 │value           │                               
 └────────────────┘                               

oplog optimistic locking using tickets

    Alice's                        Database                                               
  transaction                                                          Bob's transaction  
                                                                                          
     │                                 │                                      │           
     │─────────BEGIN──────────────────▶│                                      │           
     │                                 │◀───────────────BEGIN─────────────────┤           
     │                     ┌───────────┴───────────┐                          │           
     │                     │   ticket version:1    │                          │           
     │                     └───────────┬───────────┘                          │           
     │       Select oplog-ticket for   │                                      │           
     ├──────────────"Target"──────────▶│        Select oplog-ticket for       │           
     │                                 │◀──────────────"Target"───────────────┤           
     │                                 │                                      │           
     │      Write to Tables in         │                                      │           
     ├───────Target Aggregate─────────▶│                                      │           
     │                                 │         Write to Tables in           │           
     │                                 │◀─────────Target Aggregate────────────┤           
     │                                 │                                      │           
     │                                 │                                      │           
     │      Update Ticket              │                                      │           
     ├────Version = 2 where───────────▶│                                      │           
     │       Version = 1     ┌─────────┴─────────┐                            │           
     │                       │ ticket version: 2 │                            │           
     │                       └─────────┬─────────┘                            │           
     │                                 │              Update Ticket           │           
     │                                 │◀───────────Version = 2 where─────────┤           
     │                                 │               Version = 1       ┌────┴──────────┐
     │                                 │                                 │ update failed │
     ├────────────Commit──────────────▶│                                 └────┬──────────┘
     │                                 │                                      │           
     │                                 │                                      │           
     │                                 │                                      │           
     │                                 │◀────────────Rollback─────────────────┤           
     │                                 │                                      │           
     │                                 │                                      │           

Documentation

Overview

oplog is a package for writing operation log (oplog) entries for the purpose of replication and verification of the data stored in the Boundary RDBMS. Please see README.md in this directory for more information.

Index

Constants

const DefaultAggregateName = "global"
const Version = "v1"

Version of oplog entries (among other things, it's used to manage upgrade compatibility when replicating)

Variables

var (
	OpType_name = map[int32]string{
		0: "OP_TYPE_UNSPECIFIED",
		1: "OP_TYPE_CREATE",
		2: "OP_TYPE_UPDATE",
		3: "OP_TYPE_DELETE",
	}
	OpType_value = map[string]int32{
		"OP_TYPE_UNSPECIFIED": 0,
		"OP_TYPE_CREATE":      1,
		"OP_TYPE_UPDATE":      2,
		"OP_TYPE_DELETE":      3,
	}
)

Enum value maps for OpType.

var (
	ErrTicketNotFound        = errors.New("ticket not found")
	ErrTicketAlreadyRedeemed = errors.New("ticket already redeemed")
	ErrTicketRedeeming       = errors.New("error trying to redeem ticket")
)
var File_controller_storage_oplog_v1_any_operation_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type AnyOperation

type AnyOperation struct {

	// type_name defines type of operation.
	TypeName string `protobuf:"bytes,1,opt,name=type_name,json=typeName,proto3" json:"type_name,omitempty"`
	// value holds the bytes of a marshaled protobuf message.
	Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// operation_type defines the type of database operation.
	OperationType OpType `` /* 141-byte string literal not displayed */
	// field_mask is the mask of fields to update.
	FieldMask *field_mask.FieldMask `protobuf:"bytes,4,opt,name=field_mask,json=fieldMask,proto3" json:"field_mask,omitempty"`
	// null_mask is the mask of fields to set to null.
	NullMask *field_mask.FieldMask `protobuf:"bytes,5,opt,name=null_mask,json=nullMask,proto3" json:"null_mask,omitempty"`
	// contains filtered or unexported fields
}

AnyOperation provides a message for anything and the type of operation it represents.

func (*AnyOperation) Descriptor deprecated

func (*AnyOperation) Descriptor() ([]byte, []int)

Deprecated: Use AnyOperation.ProtoReflect.Descriptor instead.

func (*AnyOperation) GetFieldMask

func (x *AnyOperation) GetFieldMask() *field_mask.FieldMask

func (*AnyOperation) GetNullMask

func (x *AnyOperation) GetNullMask() *field_mask.FieldMask

func (*AnyOperation) GetOperationType

func (x *AnyOperation) GetOperationType() OpType

func (*AnyOperation) GetTypeName

func (x *AnyOperation) GetTypeName() string

func (*AnyOperation) GetValue

func (x *AnyOperation) GetValue() []byte

func (*AnyOperation) ProtoMessage

func (*AnyOperation) ProtoMessage()

func (*AnyOperation) ProtoReflect

func (x *AnyOperation) ProtoReflect() protoreflect.Message

func (*AnyOperation) Reset

func (x *AnyOperation) Reset()

func (*AnyOperation) String

func (x *AnyOperation) String() string

type Entry

type Entry struct {
	*store.Entry
	Cipherer wrapping.Wrapper `sql:"-"`
	Ticketer Ticketer         `sql:"-"`
}

Entry represents an oplog entry

func NewEntry

func NewEntry(aggregateName string, metadata Metadata, cipherer wrapping.Wrapper, ticketer Ticketer) (*Entry, error)

NewEntry creates a new Entry

func (*Entry) DecryptData

func (e *Entry) DecryptData(ctx context.Context) error

DecryptData will decrypt the entry's data using its Cipherer (wrapping.Wrapper)

func (*Entry) EncryptData

func (e *Entry) EncryptData(ctx context.Context) error

EncryptData the entry's data using its Cipherer (wrapping.Wrapper)

func (*Entry) Replay

func (e *Entry) Replay(ctx context.Context, tx Writer, types *TypeCatalog, tableSuffix string) error

Replay provides the ability to replay an entry. You must initialize any new tables ending with the tableSuffix before calling Replay; otherwise you'll get a "table doesn't exist" error.

func (*Entry) UnmarshalData

func (e *Entry) UnmarshalData(types *TypeCatalog) ([]Message, error)

UnmarshalData unmarshals the entry's data attribute from []byte (treated as a FIFO QueueBuffer) into a []Message.

func (*Entry) Write

func (e *Entry) Write(ctx context.Context, tx Writer, ticket *store.Ticket) error

Write writes the entry as is, with whatever it has for e.Data marshaled into a FIFO QueueBuffer.

If Cipherer != nil, then the data is authentication encrypted.

func (*Entry) WriteEntryWith

func (e *Entry) WriteEntryWith(ctx context.Context, tx Writer, ticket *store.Ticket, msgs ...*Message) error

WriteEntryWith writes the entry with the given messages marshaled into the entry data as a FIFO QueueBuffer. If Cipherer != nil, then the data is authentication encrypted.

type GormTicketer

type GormTicketer struct {
	// contains filtered or unexported fields
}

GormTicketer uses a gorm DB connection for ticket storage

func NewGormTicketer

func NewGormTicketer(tx *gorm.DB, opt ...Option) (*GormTicketer, error)

NewGormTicketer creates a new ticketer that uses gorm for storage

func (*GormTicketer) GetTicket

func (ticketer *GormTicketer) GetTicket(aggregateName string) (*store.Ticket, error)

GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction that you're using to write to the database tables. Names allow us to shard tickets around domain root names

func (*GormTicketer) Redeem

func (ticketer *GormTicketer) Redeem(t *store.Ticket) error

Redeem will attempt to redeem the ticket. If the ticket version has already been used, then an error is returned

type GormWriter

type GormWriter struct {
	Tx *gorm.DB
}

GormWriter uses a gorm DB connection for writing

func (*GormWriter) Create

func (w *GormWriter) Create(i interface{}) error

Create an object in storage

func (*GormWriter) Delete

func (w *GormWriter) Delete(i interface{}) error

Delete an object in storage

func (*GormWriter) Update

func (w *GormWriter) Update(i interface{}, fieldMaskPaths, setToNullPaths []string) error

Update the entry using the fieldMaskPaths and setToNullPaths, which are Paths from field_mask.proto. fieldMaskPaths and setToNullPaths cannot intersect, and they cannot both be zero length.
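The two constraints on the path arguments can be checked before issuing an update. This is a sketch under assumptions: `validateUpdatePaths` is a hypothetical helper written for illustration, not a function exported by this package, and it compares paths as exact strings.

```go
package main

import (
	"errors"
	"fmt"
)

// validateUpdatePaths is a hypothetical helper that enforces the two rules
// stated for Update: the path sets must not both be empty, and no path may
// appear in both sets.
func validateUpdatePaths(fieldMaskPaths, setToNullPaths []string) error {
	if len(fieldMaskPaths) == 0 && len(setToNullPaths) == 0 {
		return errors.New("fieldMaskPaths and setToNullPaths cannot both be empty")
	}
	seen := make(map[string]struct{}, len(fieldMaskPaths))
	for _, p := range fieldMaskPaths {
		seen[p] = struct{}{}
	}
	for _, p := range setToNullPaths {
		if _, ok := seen[p]; ok {
			return fmt.Errorf("path %q appears in both fieldMaskPaths and setToNullPaths", p)
		}
	}
	return nil
}

func main() {
	// Disjoint, non-empty paths are accepted.
	fmt.Println(validateUpdatePaths([]string{"name"}, []string{"email"}))
	// The same path in both sets is rejected.
	fmt.Println(validateUpdatePaths([]string{"name"}, []string{"name"}))
	// Two empty sets are rejected.
	fmt.Println(validateUpdatePaths(nil, nil))
}
```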

type Message

type Message struct {
	proto.Message
	TypeName       string
	OpType         OpType
	FieldMaskPaths []string
	SetToNullPaths []string
}

Message wraps a proto.Message and adds an operation type (Create, Update, Delete)

type Metadata

type Metadata map[string][]string

Metadata provides meta information about the Entry

type OpType

type OpType int32

OpType provides the type of database operation the Any message represents (create, update, delete)

const (
	// OP_TYPE_UNSPECIFIED defines an unspecified operation.
	OpType_OP_TYPE_UNSPECIFIED OpType = 0
	// OP_TYPE_CREATE defines a create operation.
	OpType_OP_TYPE_CREATE OpType = 1
	// OP_TYPE_UPDATE defines an update operation.
	OpType_OP_TYPE_UPDATE OpType = 2
	// OP_TYPE_DELETE defines a delete operation.
	OpType_OP_TYPE_DELETE OpType = 3
)

func (OpType) Descriptor

func (OpType) Descriptor() protoreflect.EnumDescriptor

func (OpType) Enum

func (x OpType) Enum() *OpType

func (OpType) EnumDescriptor deprecated

func (OpType) EnumDescriptor() ([]byte, []int)

Deprecated: Use OpType.Descriptor instead.

func (OpType) Number

func (x OpType) Number() protoreflect.EnumNumber

func (OpType) String

func (x OpType) String() string

func (OpType) Type

func (OpType) Type() protoreflect.EnumType

type Option

type Option func(Options)

Option - how Options are passed as arguments

func WithAggregateNames

func WithAggregateNames(enabled bool) Option

WithAggregateNames enables/disables the use of multiple aggregate names for Ticketers

func WithFieldMaskPaths

func WithFieldMaskPaths(fieldMaskPaths []string) Option

WithFieldMaskPaths represents an optional set of symbolic field paths (for example: "f.a", "f.b.d") used to specify a subset of fields that should be updated. (see google.golang.org/genproto/protobuf/field_mask)

func WithSetToNullPaths

func WithSetToNullPaths(setToNullPaths []string) Option

WithSetToNullPaths represents an optional set of symbolic field paths (for example: "f.a", "f.b.d") used to specify a subset of fields that should be set to null. (see google.golang.org/genproto/protobuf/field_mask)

type Options

type Options map[string]interface{}

Options - how options are represented

func GetOpts

func GetOpts(opt ...Option) Options

GetOpts - iterate the inbound Options and return the resulting Options
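The Option, Options, and GetOpts signatures above follow the functional-options pattern with a map as the accumulator. A minimal sketch of how such a pattern can work is shown below; the lower-case names, the map key, and the function bodies are assumptions made for illustration and are not taken from the package source.

```go
package main

import "fmt"

// options and option mirror the shapes of Options and Option:
// a map of settings and a function that mutates that map.
type options map[string]interface{}
type option func(options)

// optionFieldMaskPaths is a hypothetical map key chosen for this sketch.
const optionFieldMaskPaths = "optionFieldMaskPaths"

// withFieldMaskPaths returns an option that records the given paths.
func withFieldMaskPaths(paths []string) option {
	return func(o options) {
		o[optionFieldMaskPaths] = paths
	}
}

// getOpts applies every option, in order, to a fresh options map.
func getOpts(opt ...option) options {
	opts := options{}
	for _, o := range opt {
		o(opts)
	}
	return opts
}

func main() {
	opts := getOpts(withFieldMaskPaths([]string{"f.a", "f.b.d"}))
	// Values come back as interface{}, so callers type-assert them.
	paths, _ := opts[optionFieldMaskPaths].([]string)
	fmt.Println(paths)
}
```

Because the accumulator is a map of interface{} values, callers have to type-assert on read; an absent option simply yields the zero value of the asserted type.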

type Queue

type Queue struct {
	// Buffer for the queue
	bytes.Buffer

	// Catalog provides a TypeCatalog for the types added to the queue
	Catalog *TypeCatalog
	// contains filtered or unexported fields
}

Queue provides a FIFO queue

func (*Queue) Add

func (q *Queue) Add(m proto.Message, typeName string, t OpType, opt ...Option) error

Add message to queue. typeName defines the type of message added to the queue and allows the msg to be removed using a TypeCatalog with a corresponding typeName entry. OpType defines the msg's operation (create, update, delete). If OpType == OpType_OP_TYPE_UPDATE, the WithFieldMaskPaths() and WithSetToNullPaths() options are supported.

func (*Queue) Remove

func (q *Queue) Remove() (proto.Message, OpType, []string, []string, error)

Remove removes a pb message from the queue and returns EOF if the queue is empty. It also returns the OpType for the msg, and if it's OpType_OP_TYPE_UPDATE, then it will also return the fieldMask and setToNullPaths for the update operation.

type ReplayableMessage

type ReplayableMessage interface {
	TableName() string
	SetTableName(name string)
}

ReplayableMessage defines an interface for messages that can be replayed from the oplog entries. We need to be able to replay into different table names.
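A type satisfies this interface by exposing a table name that can be overridden. The following sketch assumes a hypothetical `user` type and default table name, and assumes that a replay target is formed by appending a suffix to the original table name; how Replay actually combines the name and tableSuffix is not shown in this documentation.

```go
package main

import "fmt"

// ReplayableMessage is copied from the interface definition above.
type ReplayableMessage interface {
	TableName() string
	SetTableName(name string)
}

// defaultUserTable is a hypothetical default table for this sketch.
const defaultUserTable = "users"

// user is a hypothetical message type that remembers an overridden table.
type user struct {
	Name  string
	table string
}

// TableName returns the override if one was set, else the default.
func (u *user) TableName() string {
	if u.table != "" {
		return u.table
	}
	return defaultUserTable
}

// SetTableName overrides the table the message is written to.
func (u *user) SetTableName(name string) { u.table = name }

// withSuffix redirects msg to its current table name plus tableSuffix.
// This is an assumed way of deriving the replay table name.
func withSuffix(msg ReplayableMessage, tableSuffix string) {
	msg.SetTableName(msg.TableName() + tableSuffix)
}

func main() {
	u := &user{Name: "alice"}
	withSuffix(u, "_replay")
	fmt.Println(u.TableName())
}
```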

type Ticketer

type Ticketer interface {
	// GetTicket returns a ticket for the specified name.  You MUST GetTicket in the same transaction
	// that you're using to write to the database tables. Names allow us to shard tickets around domain root names.
	// Before getting a ticket you must insert it with its name into the oplog_ticket table.  This is done via a
	// db migrations script.  Requiring this insert as part of migrations ensures that the tickets are initialized in
	// a separate transaction from when a client calls GetTicket(aggregateName), which is critical for the optimistic
	// locking pattern to work properly.
	GetTicket(aggregateName string) (*store.Ticket, error)

	// Redeem ticket will attempt to redeem the ticket and ensure it's serialized with other tickets using the same
	// aggregate name
	Redeem(ticket *store.Ticket) error
}

Ticketer provides an interface to storage for Tickets, so you can easily substitute your own ticketer

type Type

type Type struct {
	// Interface of the type
	Interface interface{}
	// Name for the interface
	Name string
}

Type provides the ability to associate an interface with a Type.Name, which decouples the interface from its reflection type string, so you can refactor the type name without breaking the catalog

type TypeCatalog

type TypeCatalog map[string]reflect.Type

TypeCatalog is an abstraction for dealing with oplog data and their underlying types

func NewTypeCatalog

func NewTypeCatalog(withTypes ...Type) (*TypeCatalog, error)

NewTypeCatalog creates a catalog with the types you pass in

func (TypeCatalog) Get

func (t TypeCatalog) Get(typeName string) (interface{}, error)

Get retrieves the interface via a name

func (*TypeCatalog) GetTypeName

func (t *TypeCatalog) GetTypeName(i interface{}) (string, error)

GetTypeName returns the interface's name from the catalog

func (TypeCatalog) Set

func (t TypeCatalog) Set(i interface{}, typeName string) error

Set creates an entry in the catalog for the interface
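The catalog idea, a map from a stable name to a reflect.Type, can be sketched with the standard library alone. The `catalog` type and its method bodies below are assumptions written for illustration; in particular, returning a freshly allocated pointer from `get` is a design choice of this sketch, not a statement about TypeCatalog.Get.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// catalog is a hypothetical name-to-type registry shaped like TypeCatalog.
type catalog map[string]reflect.Type

// set registers the dynamic type of i under typeName.
func (c catalog) set(i interface{}, typeName string) error {
	if i == nil {
		return errors.New("interface is nil")
	}
	if typeName == "" {
		return errors.New("type name is empty")
	}
	c[typeName] = reflect.TypeOf(i)
	return nil
}

// get allocates a new zero value of the registered type and returns a
// pointer to it, so the caller can unmarshal into it.
func (c catalog) get(typeName string) (interface{}, error) {
	t, ok := c[typeName]
	if !ok {
		return nil, fmt.Errorf("type name %q not found", typeName)
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return reflect.New(t).Interface(), nil
}

// user is a hypothetical type registered in the catalog.
type user struct{ Name string }

func main() {
	c := catalog{}
	if err := c.set(&user{}, "user"); err != nil {
		panic(err)
	}
	v, err := c.get("user")
	if err != nil {
		panic(err)
	}
	// The Go type can be renamed later without changing the "user" key.
	fmt.Printf("%T\n", v)
}
```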

type Writer

type Writer interface {
	// Create the entry
	Create(interface{}) error

	// Update the entry using the fieldMaskPaths and setToNullPaths, which are
	// Paths from field_mask.proto.  fieldMaskPaths is required.  setToNullPaths
	// is optional.  fieldMaskPaths and setToNullPaths cannot intersect and both
	// cannot be zero len.
	Update(entry interface{}, fieldMaskPaths, setToNullPaths []string) error

	// Delete the entry
	Delete(interface{}) error
	// contains filtered or unexported methods
}

Writer interface for Entries

Directories

Path        Synopsis
oplog_test  Package oplog_test provides some gorm helper funcs for testing oplog database integrations
store       Package store provides storage types/behavior for the oplog
