Documentation
¶
Index ¶
- Variables
- func NewMetrics() *metrics
- func NewWorkerPool(n int, f WorkerFunc) *workerPool
- func TickerProducer(ctx context.Context, d time.Duration, f func(time.Time, chan Event)) chan Event
- type Dispatcher
- type Event
- type EventBundler
- type LatencyMetrics
- type Metrics
- type Producer
- type Result
- type Results
- type Worker
- type WorkerFunc
- type WorkerPool
- type WorkerState
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var (
EventCount = stats.Int64("evbundler/event_count", "Number of processed events", stats.UnitDimensionless)
)
The following Event measures are supported for use in custom views.
View Source
var (
KeyEventErr, _ = tag.NewKey("event_err")
)
View Source
var (
// KeyWorkerState is the tag key "worker.state". Its value is the worker's
// state in upper case (i.e. DEAD, ACTIVE, WAIT_EVENT, PROCESS_EVENT).
// The error returned by tag.NewKey is discarded.
// See also `type WorkerState`.
KeyWorkerState, _ = tag.NewKey("worker.state")
)
View Source
var (
WorkerCount = stats.Int64("evbundler/worker_count", "Number of total workers", stats.UnitDimensionless)
)
The following Worker measures are supported for use in custom views.
Functions ¶
func NewMetrics ¶
func NewMetrics() *metrics
func NewWorkerPool ¶
func NewWorkerPool(n int, f WorkerFunc) *workerPool
func TickerProducer ¶
Example ¶
package main import ( "context" "fmt" "log" "math/rand" "net/http" "net/http/httptest" "net/url" "strings" "time" "github.com/theoden9014/evbundler" "github.com/theoden9014/evbundler/dispatcher" "github.com/theoden9014/evbundler/event" ) func main() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) countupCh := make(chan int, 10) go func() { var counter int for { select { case <-countupCh: counter++ default: if counter >= 100 { fmt.Print("receive 100 requests\n") cancel() return } } } }() sv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { countupCh <- 1 })) addr := sv.Listener.Addr().String() defer sv.Close() u, err := url.Parse("http://" + addr) if err != nil { log.Fatal(err) } charset := "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" // generate http events with random path. evCh := evbundler.TickerProducer(ctx, 1*time.Millisecond, func(t time.Time, evCh chan evbundler.Event) { var sb strings.Builder r := rand.New(rand.NewSource(t.UnixNano())) for i := 0; i < 10; i++ { sb.WriteByte(charset[r.Intn(len(charset))]) } evCh <- event.HTTPEvent{ URL: u, Method: "GET", Path: sb.String(), Body: nil, } }) wp := evbundler.NewWorkerPool(10, nil) disp := dispatcher.NewGoChannel(wp) _ = disp.Dispatch(ctx, evCh) }
Output: receive 100 requests
Types ¶
type EventBundler ¶
type EventBundler struct {
// contains filtered or unexported fields
}
EventBundler gathers several receive channels into a single channel.
func (*EventBundler) Out ¶
func (ep *EventBundler) Out() <-chan Event
Out returns the bundled channel.
func (*EventBundler) Start ¶
func (ep *EventBundler) Start(ctx context.Context)
Start forwards events from the receive channels to the bundled channel, running a separate goroutine for each receive channel.
type LatencyMetrics ¶
type LatencyMetrics struct { Total time.Duration Mean time.Duration P50 time.Duration P90 time.Duration P95 time.Duration P99 time.Duration Max time.Duration // contains filtered or unexported fields }
func (*LatencyMetrics) Add ¶
func (l *LatencyMetrics) Add(latency time.Duration)
func (*LatencyMetrics) MarshalJSON ¶
func (l *LatencyMetrics) MarshalJSON() ([]byte, error)
type Result ¶
type WorkerPool ¶
type WorkerState ¶
type WorkerState int
const ( StateDead WorkerState = iota StateActive StateWaiting StateProcess )
func (WorkerState) String ¶
func (ws WorkerState) String() string
Source Files
¶
Click to show internal directories.
Click to hide internal directories.