Documentation ¶
Overview ¶
Example (Primitives) ¶
ctx := context.Background() c1 := make(chan *msg.Message, 10) c2 := make(chan *msg.Message, 10) c3 := make(chan *msg.Message, 10) t1, t2, t3 := mem.Topic{C: c1}, mem.Topic{C: c2}, mem.Topic{C: c3} srv1, srv2 := mem.NewServer(c1, 1), mem.NewServer(c2, 1) // split csv into separate messages for analysis go func() { splitFunc := msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { lines, err := csv.NewReader(m.Body).ReadAll() if err != nil { return err } for _, row := range lines { for _, col := range row { w := t2.NewWriter(ctx) w.Write([]byte(col)) w.Close() } } return nil }) srv1.Serve(ctx, splitFunc) }() // perform some analysis on each message go func() { analyzeFunc := msg.ReceiverFunc(func(ctx context.Context, m *msg.Message) error { body, err := msg.DumpBody(m) if err != nil { return err } w := t3.NewWriter(ctx) w.Attributes().Set("Length", fmt.Sprintf("%d", len(body))) w.Attributes().Set("StartsWith", string(body[0:1])) if len(body)%2 == 0 { w.Attributes().Set("Even", "true") w.Attributes().Set("Odd", "false") } else { w.Attributes().Set("Even", "false") w.Attributes().Set("Odd", "true") } w.Write(body) return w.Close() }) srv2.Serve(ctx, analyzeFunc) }() messages := [][]byte{ []byte("foo,bar,baz"), []byte("one,two,three,four"), } for _, m := range messages { w := t1.NewWriter(context.Background()) w.Write(m) w.Close() } calls, expectedCalls := 0, 7 for m := range c3 { orderedKeys := make([]string, 0) for k := range m.Attributes { orderedKeys = append(orderedKeys, k) } sort.Strings(orderedKeys) for _, k := range orderedKeys { fmt.Printf("%s:%v ", k, m.Attributes.Get(k)) } fmt.Printf("%v\n", m.Body) calls++ if calls == expectedCalls { close(c3) } }
Output: Even:false Length:3 Odd:true Startswith:f foo Even:false Length:3 Odd:true Startswith:b bar Even:false Length:3 Odd:true Startswith:b baz Even:false Length:3 Odd:true Startswith:o one Even:false Length:3 Odd:true Startswith:t two Even:false Length:5 Odd:true Startswith:t three Even:true Length:4 Odd:false Startswith:f four
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MessageWriter ¶
type MessageWriter struct { msg.MessageWriter // contains filtered or unexported fields }
MessageWriter is used to publish a single Message to a channel. Once all of the data has been written and closed, it may not be used again.
func (*MessageWriter) Attributes ¶
func (w *MessageWriter) Attributes() *msg.Attributes
Attributes returns the attributes of the MessageWriter.
func (*MessageWriter) Close ¶
func (w *MessageWriter) Close() error
Close publishes a Message to a channel. If the MessageWriter is already closed it will return an error.
type Server ¶
type Server struct { C chan *msg.Message // Concurrency is the maximum number of Messages that can be processed // concurrently by the Server. Concurrency int // contains filtered or unexported fields }
Server subscribes to a channel and listens for Messages.
func (*Server) Serve ¶
Serve always returns a non-nil error. After Shutdown, the returned error is ErrServerClosed
func (*Server) Shutdown ¶
Shutdown attempts to gracefully shut down the Server without interrupting any messages in flight. When Shutdown is signalled, the Server stops polling for new Messages and then it waits for all of the active goroutines to complete.
If the provided context expires before the shutdown is complete, then any remaining goroutines will be killed and the context's error is returned.