Documentation
¶
Index ¶
- func GetRetryWait(retriedCount uint16) time.Duration
- type DBOrTx
- type Handler
- type Message
- type RetryWait
- type SqlMQ
- type StdMessage
- type StdTable
- func (table *StdTable) CleanMessages(db *sql.DB) (int64, error)
- func (table *StdTable) EarliestMessage(tx *sql.Tx) (Message, error)
- func (table *StdTable) MarkGivenUp(db DBOrTx, message Message) error
- func (table *StdTable) MarkRetry(db DBOrTx, message Message, retryAfter time.Duration) error
- func (table *StdTable) MarkSuccess(tx *sql.Tx, message Message) error
- func (table *StdTable) Name() string
- func (table *StdTable) ProduceMessage(db DBOrTx, message Message) error
- func (table *StdTable) SetQueues(queues []string)
- type Table
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Handler ¶
// Handler is the callback that handles one message.
// The results are named to document their roles: err is nil on success;
// on failure retryAfter tells when (or whether) to try again and canCommit
// tells whether tx may still be committed despite the error.
type Handler func(ctx context.Context, tx *sql.Tx, msg Message) (
	retryAfter time.Duration, canCommit bool, err error,
)
On successful handling, a nil error should be returned; retryAfter and canCommit are ignored. On failed handling, a non-nil error should be returned, and retryAfter means: 1. if retryAfter is positive, try again after that time period; 2. if retryAfter is zero, try again immediately; 3. if retryAfter is negative, give up on this message and don't try again. canCommit indicates whether, when an error is returned, the transaction can be committed or must be rolled back. If canCommit is false, the transaction is rolled back, and another statement is executed to update the retry time.
type SqlMQ ¶
// SqlMQ is the message queue object: it bundles the database handle, the
// Table used to store messages, a logger, and the settings that tune consuming.
// DB and Table must not be nil.
type SqlMQ struct {
	DB     *sql.DB
	Table  Table
	Logger *logger.Logger
	// The maximum number of messages to be consumed concurrently.
	// If ConsumeConcurrency <= 0, the default value 10 is used.
	ConsumeConcurrency int
	// How long to wait before trying to fetch a message again when no message
	// is available for consuming.
	// If IdleWait <= 0, the default value one minute is used.
	IdleWait time.Duration
	// How long to wait before trying to fetch a message again after an error
	// was encountered when fetching a message.
	// If ErrorWait <= 0, the default value one minute is used.
	ErrorWait time.Duration
	// Transaction timeout for message fetching and handling.
	// If TxTimeout <= 0, the default value one minute is used.
	TxTimeout time.Duration
	// The time interval at which successfully consumed messages are cleaned.
	CleanInterval time.Duration
	// contains filtered or unexported fields
}
Example ¶
// Package-level example: registers a handler for the "test" queue, produces
// three messages and consumes them in the background.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"time"

	"gitee.com/go-better/dev/debug/logger"
	_ "github.com/lib/pq"
)

// Shared fixtures used by the examples.
var testDB = getDB()
var testMQ = recreateSqlMQ()

// main registers testHandler on queue "test", produces one message per
// scenario, then lets Consume run in a goroutine for five seconds.
func main() {
	testMQ.Debug(true)
	if err := testMQ.Register("test", testHandler); err != nil {
		panic(err)
	}
	produce(testMQ, "success")
	produce(testMQ, "retry")
	produce(testMQ, "given up")
	go testMQ.Consume()
	// Presumably long enough for the two one-second retries below to run
	// before the program exits.
	time.Sleep(5 * time.Second)
}

// testHandler decodes the JSON payload of msg into a string, prints it with
// the tried count, and picks its result from the payload:
//   - "success": succeeds at once;
//   - "retry": fails on the first two attempts, asking for a retry after one
//     second (first with canCommit false, then with canCommit true), and
//     succeeds afterwards;
//   - anything else: fails with a negative retryAfter, i.e. gives up.
func testHandler(ctx context.Context, tx *sql.Tx, msg Message) (time.Duration, bool, error) {
	m := msg.(*StdMessage)
	var data string
	if err := json.Unmarshal(m.Data.([]byte), &data); err != nil {
		return 0, true, err
	}
	m.Data = data
	fmt.Println(data, m.TriedCount)
	switch m.Data {
	case "success":
		return 0, true, nil
	case "retry":
		switch m.TriedCount {
		case 0:
			return time.Second, false, errors.New("error happened")
		case 1:
			return time.Second, true, errors.New("error happened")
		default:
			return 0, true, nil
		}
	default:
		return -1, false, errors.New("given up")
	}
}

// getDB opens the local postgres database used by the examples and panics
// if sql.Open fails.
func getDB() *sql.DB {
	db, err := sql.Open("postgres", "postgres://postgres:postgres@localhost/postgres?sslmode=disable")
	if err != nil {
		panic(err)
	}
	return db
}

// recreateSqlMQ drops the "sqlmq" table if it exists and then builds a new
// SqlMQ with getSqlMQ, so the example starts without leftover messages.
func recreateSqlMQ() *SqlMQ {
	if _, err := testDB.Exec("DROP TABLE IF EXISTS sqlmq"); err != nil {
		panic(err)
	}
	return getSqlMQ()
}

// getSqlMQ builds a SqlMQ on testDB whose logger writes to ".log.json".
// The table is named "sqlmq" and keeps consumed messages for one hour.
func getSqlMQ() *SqlMQ {
	logFile, err := os.Create(".log.json")
	if err != nil {
		panic(err)
	}
	return &SqlMQ{
		DB:            testDB,
		Table:         NewStdTable(testDB, "sqlmq", time.Hour),
		Logger:        logger.New(logFile),
		CleanInterval: time.Hour,
	}
}

// produce puts data on queue "test" without a transaction (nil tx) and
// panics if Produce returns an error.
func produce(mq *SqlMQ, data string) {
	if err := mq.Produce(nil, &StdMessage{Queue: "test", Data: data}); err != nil {
		panic(err)
	}
}
Output: success 0 retry 0 given up 0 retry 1 retry 2
Example (Consume) ¶
// Calls the unexported consume three times under different conditions and
// prints the duration it returns each time.
// NOTE(review): the two arguments look like (idle wait, error wait) judging
// by the expected output - confirm against consume's definition.
var mq = recreateSqlMQ()
if err := mq.Register("test3", noopHandler); err != nil {
	panic(err)
}
// 1. Force the transaction to time out.
mq.TxTimeout = time.Nanosecond // smallest useful value, so that the transaction times out
fmt.Println(mq.consume(2*time.Minute, 3*time.Minute))
// 2. Default timeout, with no message in the freshly recreated table.
mq.TxTimeout = 0
fmt.Println(mq.consume(2*time.Minute, 3*time.Minute))
// 3. One message exists, but its RetryAt is an hour in the future.
mq.Produce(nil, &StdMessage{Queue: "test3", RetryAt: time.Now().Add(time.Hour)})
fmt.Println(mq.consume(2*time.Minute, 3*time.Minute))
Output: 3m0s 2m0s 2m0s
Example (Handle) ¶
// Calls the unexported handle with a message for queue "test" on a SqlMQ
// that has had no handler registered, and prints what handle returns.
var mq = getSqlMQ()
tx, cancel, err := mq.beginTx()
if err != nil {
	panic(err)
}
fmt.Println(mq.handle(context.Background(), cancel, tx, &StdMessage{Queue: "test"}))
Output: 1m0s unknown queue: test
Example (MarkFail) ¶
// Redirects the logger into an in-memory buffer, calls the unexported
// markFail with an empty message, and checks that the log output contains
// the "affected 0 rows" message.
var mq = getSqlMQ()
var buf bytes.Buffer
mq.Logger = logger.New(&buf)
mq.markFail(mq.DB, &StdMessage{}, -1, false)
fmt.Println(bytes.Contains(buf.Bytes(), []byte(`"msg":"affected 0 rows"`)))
Output: true
Example (Validate) ¶
// Prints the result of validate on a zero SqlMQ, then after setting DB,
// then after also setting Table.
var mq SqlMQ
fmt.Println(mq.validate())
mq.DB = testMQ.DB
fmt.Println(mq.validate())
mq.Table = testMQ.Table
fmt.Println(mq.validate())
Output: SqlMQ.DB must not be nil SqlMQ.Table must not be nil <nil>
func (*SqlMQ) Consume ¶
func (mq *SqlMQ) Consume()
Example ¶
// Starts Consume in a background goroutine and sleeps briefly; the example
// expects no output.
var mq = getSqlMQ()
go mq.Consume()
time.Sleep(10 * time.Millisecond)
Output:
Example (Panic) ¶
// Calls Consume on a zero SqlMQ. The deferred function recovers the panic
// value, asserts that it is a string, and prints whether it ends with the
// "SqlMQ.DB must not be nil" message.
defer func() {
	fmt.Println(strings.HasSuffix(recover().(string), "SqlMQ.DB must not be nil"))
}()
var mq SqlMQ
mq.Consume()
Output: true
func (*SqlMQ) NotifyConsumeAt ¶
NotifyConsumeAt notifies mq that there are messages to be consumed at a given time.
func (*SqlMQ) Produce ¶
Produce a message. tx can be nil.
Example ¶
// Three Produce calls, each printing the returned error.

// 1. Produce to "test2" before any handler is registered for that queue.
fmt.Println(testMQ.Produce(nil, &StdMessage{Queue: "test2"}))
if err := testMQ.Register("test2", noopHandler); err != nil {
	panic(err)
}
// 2. Produce inside a caller-managed transaction: roll back if Produce
// failed, otherwise commit and report the commit error.
tx, err := testDB.Begin()
if err != nil {
	panic(err)
}
err = testMQ.Produce(tx, &StdMessage{Queue: "test2"})
if err != nil {
	tx.Rollback()
} else {
	err = tx.Commit()
}
fmt.Println(err)
// 3. Produce a message whose Data is a channel; the expected output is a
// JSON "unsupported type" error.
fmt.Println(testMQ.Produce(tx, &StdMessage{Queue: "test2", Data: make(chan int)}))
Output: unknown queue: test2 <nil> json: unsupported type: chan int
type StdMessage ¶
// StdMessage is a message with an id, a queue name, a payload, and retry
// bookkeeping fields.
type StdMessage struct {
	Id int64
	// Queue is the queue name.
	Queue string
	// Data is the payload; it can be of any type.
	Data      interface{}
	Status    string
	CreatedAt time.Time
	// TriedCount is how many times handling has already been tried.
	TriedCount uint16
	// RetryAt is the time of the next retry.
	RetryAt time.Time
}
func (*StdMessage) ConsumeAt ¶
func (msg *StdMessage) ConsumeAt() time.Time
func (*StdMessage) QueueName ¶
func (msg *StdMessage) QueueName() string
type StdTable ¶
// StdTable's fields are all unexported in this listing.
type StdTable struct {
	// contains filtered or unexported fields
}
StdTable is a standard `sqlmq.Table` implementation.
func NewStdTable ¶
NewStdTable creates a standard `sqlmq.Table` instance. db: the database used to create the table and index. name: the database table name. keep: how long to keep a successfully consumed message before deleting it.
Example ¶
table := NewStdTable(testDB, "test_name", -1) fmt.Println(table.Name(), table.keep)
Output: test_name 24h0m0s
func (*StdTable) EarliestMessage ¶
func (*StdTable) MarkSuccess ¶
func (*StdTable) ProduceMessage ¶
If ProduceMessage runs successfully, the message id is set in message (which is a *StdMessage).
type Table ¶
// Table is the set of storage operations on messages: selecting the next
// message to consume, recording the outcome of handling it, producing new
// messages, and cleaning up consumed ones.
type Table interface {
	// SetQueues sets the queues for EarliestMessage.
	// This method must be concurrency safe.
	SetQueues(queues []string)
	// EarliestMessage gets the earliest message in the queues given to SetQueues
	// that has not been marked by MarkSuccess.
	// "Earliest" means the smallest "ConsumeAt".
	// If there is no such message, it returns a nil interface.
	// The tx must be used to exclusively lock (SELECT FOR UPDATE) the returned message.
	// Don't commit or roll back the tx.
	EarliestMessage(tx *sql.Tx) (Message, error)
	// MarkSuccess marks a message as consumed successfully.
	// The tx must be used to update the message. Don't commit or roll back the tx.
	MarkSuccess(tx *sql.Tx, msg Message) error
	// MarkRetry marks that a message should be retried after a time period.
	MarkRetry(db DBOrTx, msg Message, retryAfter time.Duration) error
	// MarkGivenUp marks a message as given up.
	MarkGivenUp(db DBOrTx, msg Message) error
	// ProduceMessage produces a message.
	ProduceMessage(db DBOrTx, msg Message) error
	// CleanMessages cleans successfully consumed messages; an implementation may
	// keep them for a duration after they are consumed, for debugging.
	// It returns the number of cleaned messages.
	CleanMessages(db *sql.DB) (int64, error)
}