Documentation
¶
Index ¶
- func AssertEqual(t *testing.T, want, have any)
- func AssertInRange[T constraints.Ordered](t *testing.T, val, lower, upper T)
- func AssertNoErr(t *testing.T, err error, format string, args ...any)
- func PanicOnErr(err error, format string, args ...any)
- type Message
- type Queue
- type SqliteStore
- func (s *SqliteStore) AppendContext(ctx context.Context, topic Topic, msgs ...[]byte) error
- func (s *SqliteStore) Close()
- func (s *SqliteStore) CommitContext(ctx context.Context, clientID string, topic Topic, idx int) error
- func (s *SqliteStore) FetchNextContext(ctx context.Context, clientID string, topic Topic, limit int) ([]Message, error)
- type Store
- type Topic
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AssertEqual ¶
func AssertInRange ¶
func AssertInRange[T constraints.Ordered](t *testing.T, val, lower, upper T)
func PanicOnErr ¶
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Example ¶
store, err := NewSqliteStore(":memory:") PanicOnErr(err, "init sqlite store") defer store.Close() queue := NewQueue(store) var topic Topic = "topic_1" clientID := "client_1" //consume wg := sync.WaitGroup{} wg.Add(1) ctx, cancel := context.WithCancel(context.Background()) go func(ctx context.Context) { defer wg.Done() for { select { case <-ctx.Done(): return default: msgs, err := queue.ReadContext(ctx, clientID, topic, 2, 50*time.Millisecond) PanicOnErr(err, "read") if len(msgs) == 0 { continue } for _, m := range msgs { fmt.Println(m.Index, string(m.Data)) } err = queue.CommitContext(ctx, clientID, topic, msgs[len(msgs)-1].Index) } } }(ctx) //produce chunkSize := 5 chunks := 10 for i := 0; i < chunks; i++ { msgs := make([][]byte, chunkSize) for j := 0; j < chunkSize; j++ { msgs[j] = []byte(fmt.Sprintf("message_%03d", i*chunkSize+j)) } queue.WriteContext(ctx, topic, msgs...) <-time.After(100 * time.Millisecond) } cancel() wg.Wait()
Output:
func (*Queue) CommitContext ¶
func (*Queue) ReadContext ¶
type SqliteStore ¶
func NewSqliteStore ¶
func NewSqliteStore(dsn string) (*SqliteStore, error)
func (*SqliteStore) AppendContext ¶
func (*SqliteStore) Close ¶
func (s *SqliteStore) Close()
func (*SqliteStore) CommitContext ¶
func (*SqliteStore) FetchNextContext ¶
Click to show internal directories.
Click to hide internal directories.