Documentation ¶
Index ¶
- Constants
- func GetMigrationList() (ml *migration.List)
- func GetPublisher(db *sql.Connection, code string) (p *model.Pub, err error)
- func Publish(db *sql.Connection, p PublishParam) (version int, err error)
- func PublishList(db *sql.Connection, list []PublishParam) (successCount int, err error)
- type BatchPublisher
- func (bp *BatchPublisher) Add(pp PublishParam) (commitCount int, err error)
- func (bp *BatchPublisher) Close() (errList []error)
- func (bp *BatchPublisher) Flush() (commitCount int, err error)
- func (bp *BatchPublisher) GetBatchSize() (batchSize uint)
- func (bp *BatchPublisher) GetList() (mList []*model.Data)
- func (bp *BatchPublisher) SetBatchSize(size uint) (actualBatchSize uint)
- func (bp *BatchPublisher) SetPostCommit(f func(commitCount int) error)
- func (bp *BatchPublisher) SetPreCommit(f func() error)
- type BatchPublisherParam
- type Event
- type PublishParam
- type SubBatchHandler
- type SubDataListener
- type Subscriber
- func (s *Subscriber) Close() (err error)
- func (s *Subscriber) GetStats() (total, success, retry, fail int)
- func (s *Subscriber) Listen(cp *sql.ConnParam, sdl SubDataListener) (err error)
- func (s *Subscriber) Populate() (err error)
- func (s *Subscriber) PopulateAndRun(sbh SubBatchHandler, batchSize, batchLimit int) (err error)
- func (s *Subscriber) Run(sbh SubBatchHandler, batchSize, batchLimit int) (err error)
- func (s *Subscriber) RunBatch(sbh SubBatchHandler, batchSize, batchLimit int) (err error)
- func (s *Subscriber) SetErrorHandler(f func(err error))
- func (s *Subscriber) SetMaxGoRoutines(max uint)
Constants ¶
const ( // Error constants ECode070101 = e.Code0701 + "01" ECode070102 = e.Code0701 + "02" ECode070103 = e.Code0701 + "03" ECode070104 = e.Code0701 + "04" ECode070105 = e.Code0701 + "05" )
const ( // Error constants ECode070D01 = e.Code070D + "01" ECode070D02 = e.Code070D + "02" ECode070D03 = e.Code070D + "03" ECode070D04 = e.Code070D + "04" ECode070D05 = e.Code070D + "05" )
const ( // Error constants ECode070201 = e.Code0702 + "01" ECode070202 = e.Code0702 + "02" )
const ( // Error constants ECode070A01 = e.Code070A + "01" ECode070A02 = e.Code070A + "02" ECode070A03 = e.Code070A + "03" ECode070A04 = e.Code070A + "04" ECode070A05 = e.Code070A + "05" ECode070A06 = e.Code070A + "06" ECode070A07 = e.Code070A + "07" ECode070A08 = e.Code070A + "08" ECode070A09 = e.Code070A + "09" ECode070A0A = e.Code070A + "0A" ECode070A0B = e.Code070A + "0B" ECode070A0C = e.Code070A + "0C" )
const ( // Error constants ECode070B01 = e.Code070B + "01" ECode070B02 = e.Code070B + "02" ECode070B03 = e.Code070B + "03" ECode070B04 = e.Code070B + "04" ECode070B05 = e.Code070B + "05" )
const ( // Error constants ECode070301 = e.Code0703 + "01" ECode070302 = e.Code0703 + "02" ECode070303 = e.Code0703 + "03" ECode070304 = e.Code0703 + "04" ECode070305 = e.Code0703 + "05" ECode070306 = e.Code0703 + "06" ECode070307 = e.Code0703 + "07" ECode070308 = e.Code0703 + "08" ECode070309 = e.Code0703 + "09" ECode07030A = e.Code0703 + "0A" ECode07030B = e.Code0703 + "0B" ECode07030C = e.Code0703 + "0C" )
const ( // Error constants ECode070401 = e.Code0704 + "01" ECode070402 = e.Code0704 + "02" )
const ( // Error constants ECode070C01 = e.Code070C + "01" )
const (
MIGRATION_CODE = "pubsub"
)
Variables ¶
This section is empty.
Functions ¶
func GetMigrationList ¶
GetMigrationList returns this package's migration list.
func GetPublisher ¶
GetPublisher returns the pub record if it exists
func Publish ¶
func Publish(db *sql.Connection, p PublishParam) (version int, err error)
Publish upserts a new pub data record for the specified data type/id. If it already exists, it will update the deleted field, the JSON value and increment the version.
func PublishList ¶
func PublishList(db *sql.Connection, list []PublishParam) (successCount int, err error)
PublishList upserts the list of new pub data records. If any already exist, it will update the deleted field, the JSON value and increment the version for each such record. If duplicate records are in the list, it is not guaranteed which one will be saved. This would only have an impact if the deleted value or the JSON value are different between the duplicate records.
Types ¶
type BatchPublisher ¶
type BatchPublisher struct {
// contains filtered or unexported fields
}
BatchPublisher helper to publish records in batches. Initialize and set the batch size. Then call Add, and records will be automatically committed when the batch size is reached. Call Flush to commit any pending records and Close when finished.
func NewBatchPublisher ¶
func NewBatchPublisher(db *sql.Connection, p *BatchPublisherParam) (bp *BatchPublisher)
NewBatchPublisher creates a new batch publisher to upsert pub data records in batches.
func (*BatchPublisher) Add ¶
func (bp *BatchPublisher) Add(pp PublishParam) (commitCount int, err error)
Add adds the record to the pending publish list. If the size of the list exceeds the batch size, then it will automatically commit the pending records.
func (*BatchPublisher) Close ¶
func (bp *BatchPublisher) Close() (errList []error)
Close closes any open database statements. This should be called when finished with the batch publishing.
func (*BatchPublisher) Flush ¶
func (bp *BatchPublisher) Flush() (commitCount int, err error)
Flush commits any pending records.
func (*BatchPublisher) GetBatchSize ¶
func (bp *BatchPublisher) GetBatchSize() (batchSize uint)
GetBatchSize gets the currently set batch size.
func (*BatchPublisher) GetList ¶
func (bp *BatchPublisher) GetList() (mList []*model.Data)
GetList gets the current list of records.
func (*BatchPublisher) SetBatchSize ¶
func (bp *BatchPublisher) SetBatchSize(size uint) (actualBatchSize uint)
SetBatchSize sets the batch size. The actual batch size may be reduced if it exceeds the maximum allowed rows per insert based on the number of columns in the bulk insert.
func (*BatchPublisher) SetPostCommit ¶
func (bp *BatchPublisher) SetPostCommit(f func(commitCount int) error)
SetPostCommit sets the post commit func called right after each commit. Set to nil to disable
func (*BatchPublisher) SetPreCommit ¶
func (bp *BatchPublisher) SetPreCommit(f func() error)
SetPreCommit sets the pre commit func called right before each commit. Set to nil to disable
type BatchPublisherParam ¶
type BatchPublisherParam struct { BatchSize uint // The size of each batch PreCommit func() error // Called right before a commit of records PostCommit func(committedCount int) error // Called right after a commit of records }
BatchPublisherParam params sent to NewBatchPublisher
type Event ¶
type Event struct { PubID int `json:"pubId"` Type string `json:"dataType"` ID string `json:"dataId"` Deleted bool `json:"deleted"` Version int `json:"version"` PreviousHash string `json:"-"` NewHash string `json:"-"` NewJSON []byte `json:"-"` // contains filtered or unexported fields }
Event the expected JSON from a skyrin_dps_notify call
func (*Event) Error ¶
Error sets an error for the event. If an error is set for an event, it will be automatically saved with the event when the status is updated
func (*Event) GetEventJSON ¶
func (ev *Event) GetEventJSON(db *sql.Connection) (b []byte, err error)
GetEventJSON retrieves the new JSON from the event record
type PublishParam ¶
type PublishParam struct { PublishID int // The publisher id Type string // The data type ID string // The data id Deleted bool // Indicates if the item was deleted or not JSON []byte // Optional JSON bytes representing the object }
PublishParam parameters for the Publish func
type SubBatchHandler ¶
type SubBatchHandler interface { // Push is called for each batch of sub data to send to a subscriber. The subscriber should load and process // the data as needed. If an error is returned, all the pending records will be marked with this error and // their retries will be incremented. If any record has reached its retry limit, it will be marked as failed // and will not be pushed again until it is changed or it is manually reset to pending. Push([]*Event) (err error) }
SubBatchHandler defines the logic to send the publish events for the specific subscriber.
type SubDataListener ¶
type SubDataListener interface { // Send should send the publish event for the subscriber, returning the hash of the // object sent and optionally a JSON representation of the object. If it failed, it // should return an error. Send(ev *Event) (hash string, jsonBytes []byte, err error) }
SubDataListener defines the logic to send the publish event for a listening subscriber
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber use NewSubscriber to initialize and either listen for pub data or process new/updated pub data in the skyrin_dps_pub table
func NewSubscriber ¶
func NewSubscriber(db *sql.Connection, code string) (s *Subscriber, err error)
NewSubscriber initializes the subscriber and processes any pending sub data records
func (*Subscriber) Close ¶
func (s *Subscriber) Close() (err error)
Close stops listening and cleans up
func (*Subscriber) GetStats ¶
func (s *Subscriber) GetStats() (total, success, retry, fail int)
GetStats returns info on how many records have been processed: total = total number of records; success = total number of successfully sent records; retry = total number of failed records that have been marked to retry; fail = total number of failed records (retry limit has been exceeded and they will not automatically be retried again).
func (*Subscriber) Listen ¶
func (s *Subscriber) Listen(cp *sql.ConnParam, sdl SubDataListener) (err error)
Listen is used to listen for change events to records in the skyrin_dps_data table. If an insert or update occurs, the event will be triggered with a JSON string. The subscriber will check if the pubId matches a linked publisher. If it does, it will proceed to process that record.
func (*Subscriber) Populate ¶
func (s *Subscriber) Populate() (err error)
Populate creates missing and updates existing sub data records
func (*Subscriber) PopulateAndRun ¶
func (s *Subscriber) PopulateAndRun(sbh SubBatchHandler, batchSize, batchLimit int) (err error)
PopulateAndRun helper that calls Populate then Run
func (*Subscriber) Run ¶
func (s *Subscriber) Run(sbh SubBatchHandler, batchSize, batchLimit int) (err error)
Run runs the batch process for the specified number of records (batchLimit)
func (*Subscriber) RunBatch ¶
func (s *Subscriber) RunBatch(sbh SubBatchHandler, batchSize, batchLimit int) (err error)
RunBatch runs the batch process for the specified number of records (batchLimit)
func (*Subscriber) SetErrorHandler ¶
func (s *Subscriber) SetErrorHandler(f func(err error))
SetErrorHandler sets the error handler. If set, it will be called when a record is marked as failed, i.e. when a pubsub fails after the subscriber's configured number of retries.
func (*Subscriber) SetMaxGoRoutines ¶
func (s *Subscriber) SetMaxGoRoutines(max uint)
SetMaxGoRoutines sets the maximum number of goroutines to use while processing the subscriber data.