gtm

package
v0.9.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2019 License: Apache-2.0 Imports: 12 Imported by: 6

README

gtm

gtm (go tail mongo) is a utility written in Go which tails the MongoDB oplog and sends create, update, delete events to your code. It can be used to send emails to new users, index documents, write time series data, or something else.

Requirements
  • Go
  • mgo, the mongodb driver for Go
  • mongodb
    • Pass the --master argument to mongod to ensure an oplog is created, OR
    • Set up a replica set, which also creates an oplog
Installation
go get github.com/rwynn/gtm
Usage
package main

import "gopkg.in/mgo.v2"
import "gopkg.in/mgo.v2/bson"
import "github.com/rwynn/gtm"
import "fmt"

// main dials the MongoDB server on localhost, starts tailing its oplog with
// gtm.Start and then loops forever, printing every error and every operation
// it receives.
func main() {
	// get a mgo session
	session, err := mgo.Dial("localhost")
	if err != nil {
		panic(err)
	}
	defer session.Close()
	session.SetMode(mgo.Monotonic, true)
	// nil options get initialized to gtm.DefaultOptions()
	ctx := gtm.Start(session, nil)
	// ctx.OpC is a channel to read ops from
	// ctx.ErrC is a channel to read errors from
	// ctx.Stop() stops all go routines started by gtm.Start
	for {
		// loop forever receiving events
		select {
		case err = <-ctx.ErrC:
			// handle errors
			fmt.Println(err)
		case op := <-ctx.OpC:
			// op will be an insert, delete, update, or drop to mongo
			// you can check which by calling
			// op.IsInsert(), op.IsDelete(), op.IsUpdate(), or op.IsDrop()
			// op.Data will get you the full document for inserts and updates
			//
			// The raw string below deliberately spans several lines, so the
			// printed message contains the embedded newlines and tabs.
			msg := fmt.Sprintf(`Got op <%v> for object <%v> 
			in database <%v>
			and collection <%v>
			and data <%v>
			and timestamp <%v>`,
				op.Operation, op.Id, op.GetDatabase(),
				op.GetCollection(), op.Data, op.Timestamp)
			fmt.Println(msg) // or do something more interesting
		}
	}
}
Configuration
// NewUsers is an op filter that accepts only insert operations whose
// namespace is "users.users"; every other op is rejected.
func NewUsers(op *gtm.Op) bool {
	// Check the namespace first so IsInsert is only consulted for matching ops.
	if op.Namespace != "users.users" {
		return false
	}
	return op.IsInsert()
}

// if you want to listen only for certain events on certain collections
// pass a filter function in options
ctx := gtm.Start(session, &gtm.Options{
	Filter:              NewUsers, 	   // only receive inserts in the user collection
})
// more options are available for tuning
ctx := gtm.Start(session, &gtm.Options{
	After:               nil,     	    // if nil defaults to LastOpTimestamp
	OpLogDatabaseName:   nil,     	    // defaults to "local"
	OpLogCollectionName: nil,     	    // defaults to a collection prefixed "oplog."
	CursorTimeout:       nil,     	    // defaults to 100s
	ChannelSize:         0,       	    // defaults to 20
	BufferSize:          25,            // defaults to 50. used to batch fetch documents on bursts of activity
	BufferDuration:      0,             // defaults to 750 ms. after this timeout the batch is force fetched
	WorkerCount:         8,             // defaults to 1. number of go routines batch fetching concurrently
	Ordering:            gtm.Document,  // defaults to gtm.Oplog. ordering guarantee of events on the output channel
	UpdateDataAsDelta:   false,         // set to true to only receive delta information in the Data field on updates (info straight from oplog)
	DirectReadNs: []string{"db.users"}, // set to a slice of namespaces to read data directly from bypassing the oplog
	DirectReadLimit:     200,           // defaults to 100. the maximum number of documents to return in each direct read query
})
Direct Reads

If, in addition to tailing the oplog, you would like to also read entire collections you can set the DirectReadNs field to a slice of MongoDB namespaces. Documents from these collections will be read directly and output on the ctx.OpC channel.

You can wait till all the collections have been fully read by using the DirectReadWg wait group on the ctx.

go func() {
	ctx.DirectReadWg.Wait()
	fmt.Println("direct reads are done")
}()
Advanced

You may want to distribute event handling between a set of worker processes on different machines. To do this you can leverage the github.com/rwynn/gtm/consistent package.

Create a TOML document containing a list of all the event handlers.

Workers = [ "Tom", "Dick", "Harry" ] 

Create a consistent filter to distribute the work between Tom, Dick, and Harry.

name := flag.String("name", "", "the name of this worker")
flag.Parse()
filter, filterErr := consistent.ConsistentHashFilterFromFile(*name, "/path/to/toml")
if filterErr != nil {
	panic(filterErr)
}

// there is also a method **consistent.ConsistentHashFilterFromDocument** which allows
// you to pass a Mongo document representing the config if you would like to avoid
// copying the same config file to multiple servers

Pass the filter into the options when calling gtm.Start

ctx := gtm.Start(session, &gtm.Options{Filter: filter})

(Optional) If you have your own filter, you can combine it with the consistent filter using the gtm utility function ChainOpFilters

func ChainOpFilters(filters ...OpFilter) OpFilter

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DirectRead

func DirectRead(ctx *OpCtx, session *mgo.Session, idx int, ns string, options *Options) (err error)

func FetchDocuments

func FetchDocuments(ctx *OpCtx, session *mgo.Session, filter OpFilter, buf *OpBuf, inOp OpChan) error

func GetOpLogQuery

func GetOpLogQuery(session *mgo.Session, after bson.MongoTimestamp, options *Options) *mgo.Query

func LastOpTimestamp

func LastOpTimestamp(session *mgo.Session, options *Options) bson.MongoTimestamp

func OpLogCollection

func OpLogCollection(session *mgo.Session, options *Options) *mgo.Collection

func OpLogCollectionName

func OpLogCollectionName(session *mgo.Session, options *Options) string

func ParseTimestamp

func ParseTimestamp(timestamp bson.MongoTimestamp) (int32, int32)

func TailOps

func TailOps(ctx *OpCtx, session *mgo.Session, channels []OpChan, options *Options) error

func UpdateIsReplace

func UpdateIsReplace(entry map[string]interface{}) bool

Types

type Op

// Op describes a single MongoDB operation as delivered on an OpChan.
//
// Id, Operation and Namespace identify the affected document, the kind of
// operation and where it happened; the README examples use namespaces of the
// form "database.collection" (see GetDatabase and GetCollection). Source
// records the QuerySource the op was produced by. Timestamp and the embedded
// protocol.KafkaMsgMeta carry the `json:"-"` tag.
type Op struct {
	Id        interface{} `json:"_id"`
	Operation string      `json:"operation"`
	Namespace string      `json:"namespace"`
	// Data is the change field in the oplog.
	Data map[string]interface{} `json:"data"`
	// Row is the data of the affected row (document).
	Row                   map[string]interface{} `json:"row"`
	Timestamp             bson.MongoTimestamp    `json:"-"`
	Source                QuerySource            `json:"source"`
	protocol.KafkaMsgMeta `json:"-"`
}

func (*Op) GetCollection

func (this *Op) GetCollection() string

func (*Op) GetDatabase

func (this *Op) GetDatabase() string

func (*Op) IsCommand

func (this *Op) IsCommand() bool

func (*Op) IsDelete

func (this *Op) IsDelete() bool

func (*Op) IsDrop

func (this *Op) IsDrop() bool

func (*Op) IsDropCollection

func (this *Op) IsDropCollection() (string, bool)

func (*Op) IsDropDatabase

func (this *Op) IsDropDatabase() (string, bool)

func (*Op) IsInsert

func (this *Op) IsInsert() bool

func (*Op) IsSourceDirect

func (this *Op) IsSourceDirect() bool

func (*Op) IsSourceOplog

func (this *Op) IsSourceOplog() bool

func (*Op) IsUpdate

func (this *Op) IsUpdate() bool

func (*Op) MarshalJSON

func (this *Op) MarshalJSON() ([]byte, error)

func (*Op) ParseLogEntry

func (this *Op) ParseLogEntry(entry *OpLog, options *Options) (include bool)

func (*Op) ParseNamespace

func (this *Op) ParseNamespace() []string

type OpBuf

// OpBuf holds a batch of ops in Entries (see Append, IsFull and Flush).
//
// NOTE(review): UseBufferDuration, BufferSize and BufferDuration share their
// names with fields of Options and are presumably copied from there;
// FlushTicker presumably drives the time-based flush — confirm against the
// implementation.
type OpBuf struct {
	UseBufferDuration bool
	Entries           []*Op
	BufferSize        int
	BufferDuration    time.Duration
	FlushTicker       *time.Ticker
}

func (*OpBuf) Append

func (this *OpBuf) Append(op *Op)

func (*OpBuf) Flush

func (this *OpBuf) Flush(session *mgo.Session, ctx *OpCtx)

func (*OpBuf) IsFull

func (this *OpBuf) IsFull() bool

type OpChan

// OpChan is a channel of *Op values; OpCtx.OpC has this type.
type OpChan chan *Op

func Tail

func Tail(session *mgo.Session, options *Options) (OpChan, chan error)

type OpCtx

// OpCtx is the handle returned by Start.
//
// OpC is the channel to read ops from and ErrC is the channel to read errors
// from. DirectReadWg is a wait group that can be waited on until all
// collections configured for direct reads have been fully read.
type OpCtx struct {
	OpC          OpChan
	ErrC         chan error
	DirectReadWg *sync.WaitGroup
	// contains filtered or unexported fields
}

func Start

func Start(session *mgo.Session, options *Options) *OpCtx

func (*OpCtx) Pause

func (ctx *OpCtx) Pause()

func (*OpCtx) Resume

func (ctx *OpCtx) Resume()

func (*OpCtx) Since

func (ctx *OpCtx) Since(ts bson.MongoTimestamp)

func (*OpCtx) Stop

func (ctx *OpCtx) Stop()

type OpFilter

// OpFilter is a predicate over ops. Passing one as Options.Filter restricts
// the events that are received to those for which it returns true.
type OpFilter func(*Op) bool

func ChainOpFilters

func ChainOpFilters(filters ...OpFilter) OpFilter

func OpFilterForOrdering

func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter

type OpLog

// OpLog is the typed form of one oplog entry consumed by Op.ParseLogEntry.
//
// NOTE(review): the bare string tags ("ts", "h", "v", "op", "ns", "o", "o2")
// are presumably the document keys used by the mgo bson decoder — confirm
// before changing them to the `bson:"..."` form.
type OpLog struct {
	Timestamp    bson.MongoTimestamp "ts"
	HistoryID    int64               "h"
	MongoVersion int                 "v"
	Operation    string              "op"
	Namespace    string              "ns"
	Doc          *bson.Raw           "o"
	Update       *bson.Raw           "o2"
}

type OpLogEntry

// OpLogEntry is a generic string-keyed map.
// NOTE(review): presumably an untyped oplog document — confirm against its users.
type OpLogEntry map[string]interface{}

type Options

// Options tunes how gtm tails the oplog. Passing nil options to Start gets
// them initialized to DefaultOptions().
//
// Fields described in the README:
//
//   - After: if nil, defaults to LastOpTimestamp.
//   - Filter: pass a filter function to listen only for certain events on
//     certain collections.
//   - OpLogDatabaseName: if nil, defaults to "local".
//   - OpLogCollectionName: if nil, defaults to a collection prefixed "oplog.".
//   - CursorTimeout: if nil, defaults to 100s.
//   - ChannelSize: defaults to 20.
//   - BufferSize: defaults to 50; used to batch fetch documents on bursts of
//     activity.
//   - BufferDuration: defaults to 750 ms; after this timeout the batch is
//     force fetched.
//   - WorkerCount: defaults to 1; number of goroutines batch fetching
//     concurrently.
//   - Ordering: defaults to Oplog; ordering guarantee of events on the output
//     channel.
//   - UpdateDataAsDelta: set to true to only receive delta information in the
//     Data field on updates (info straight from the oplog).
//   - DirectReadNs: namespaces to read data from directly, bypassing the oplog.
//   - DirectReadLimit: defaults to 100; the maximum number of documents to
//     return in each direct read query.
//
// NOTE(review): PipelineName, SourceName, UseBufferDuration,
// DirectReadersPerCol, DirectReadFilter and DirectReadBatchSize are not
// described in the README — confirm their meaning against SetDefaults and Fill.
type Options struct {
	PipelineName        string
	SourceName          string
	After               TimestampGenerator
	Filter              OpFilter
	OpLogDatabaseName   *string
	OpLogCollectionName *string
	CursorTimeout       *string
	ChannelSize         int
	UseBufferDuration   bool
	BufferSize          int
	BufferDuration      time.Duration
	Ordering            OrderingGuarantee
	WorkerCount         int
	UpdateDataAsDelta   bool
	DirectReadNs        []string
	DirectReadersPerCol int
	DirectReadLimit     int
	DirectReadFilter    OpFilter
	DirectReadBatchSize int
}

func DefaultOptions

func DefaultOptions() *Options

func (*Options) Fill

func (this *Options) Fill(session *mgo.Session, sourceHost string)

func (*Options) SetDefaults

func (this *Options) SetDefaults()

type OrderingGuarantee

// OrderingGuarantee selects the ordering guarantee of events on the output
// channel (see Options.Ordering).
type OrderingGuarantee int
// Available ordering guarantees. Oplog is the zero value.
const (
	Oplog     OrderingGuarantee = iota // ops sent in oplog order (strong ordering)
	Namespace                          // ops sent in oplog order within a namespace
	Document                           // ops sent in oplog order for a single document
)

type QuerySource

// QuerySource is the type of Op.Source and identifies where an op came from.
type QuerySource int
// Known query sources. OplogQuerySource is the zero value.
// NOTE(review): presumably OplogQuerySource marks ops tailed from the oplog
// and DirectQuerySource marks ops produced by direct reads (compare
// Op.IsSourceOplog and Op.IsSourceDirect) — confirm against the implementation.
const (
	OplogQuerySource QuerySource = iota
	DirectQuerySource
)

type TimestampGenerator

// TimestampGenerator produces a bson.MongoTimestamp from a session and the
// options. It is the type of Options.After, for which LastOpTimestamp is the
// documented default.
// NOTE(review): presumably tailing starts after the returned timestamp — confirm.
type TimestampGenerator func(*mgo.Session, *Options) bson.MongoTimestamp

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL