Documentation ¶
Index ¶
- Constants
- Variables
- func Init()
- func IsEnableRateLimit() bool
- func SetEnableEventSchemasFeature(b bool) bool
- func SetEnableRateLimit(b bool) bool
- func SetEnableSuppressUserFeature(b bool) bool
- type DSStats
- type GatewayAdmin
- type GatewayRPCHandler
- func (g *GatewayRPCHandler) GetDSFailedJobs(arg string, result *string) (err error)
- func (g *GatewayRPCHandler) GetDSJobCount(arg string, result *string) (err error)
- func (g *GatewayRPCHandler) GetDSList(_ string, result *string) (err error)
- func (g *GatewayRPCHandler) GetDSStats(dsName string, result *string) (err error)
- func (g *GatewayRPCHandler) GetJobByID(arg string, result *string) (err error)
- func (g *GatewayRPCHandler) GetJobIDStatus(arg string, result *string) (err error)
- type HandleT
- func (gateway *HandleT) GetWebhookSourceDefName(writeKey string) (name string, ok bool)
- func (gateway *HandleT) IncrementAckCount(count uint64)
- func (gateway *HandleT) IncrementRecvCount(count uint64)
- func (*HandleT) MaxReqSize() int
- func (gateway *HandleT) ProcessWebRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, ...) string
- func (gateway *HandleT) SetReadonlyDBs(...)
- func (gateway *HandleT) Setup(application app.Interface, backendConfig backendconfig.BackendConfig, ...) error
- func (gateway *HandleT) Shutdown() error
- func (gateway *HandleT) StartAdminHandler(ctx context.Context) error
- func (gateway *HandleT) StartWebHandler(ctx context.Context) error
- func (gateway *HandleT) TrackRequestMetrics(errorMessage string)
- func (gateway *HandleT) UpdateSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]string)
- type ImportRequestHandler
- type RegularRequestHandler
- type RequestHandler
- type SourceEvents
- type SqlRunner
Constants ¶
const (
DELIMITER = string("<<>>")
)
Variables ¶
var (
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
Diagnostics diagnostics.DiagnosticsI
)
var BatchEvent = []byte(`
{
"batch": [
]
}
`)
var CustomVal string
CustomVal is used as a key in the jobsDB customval column
Functions ¶
func IsEnableRateLimit ¶ added in v0.1.10
func IsEnableRateLimit() bool
IsEnableRateLimit reports whether rate limiting is enabled on the gateway
func SetEnableEventSchemasFeature ¶ added in v0.1.10
SetEnableEventSchemasFeature overrides the enableEventSchemasFeature configuration and returns the previous value
func SetEnableRateLimit ¶ added in v0.1.10
SetEnableRateLimit overrides the enableRateLimit configuration and returns the previous value
func SetEnableSuppressUserFeature ¶ added in v0.1.10
SetEnableSuppressUserFeature overrides the enableSuppressUserFeature configuration and returns the previous value
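Because each setter returns the value it replaced, a caller (a test, for instance) can override a flag temporarily and restore it afterwards. The following is a minimal sketch of that pattern. `enableRateLimit`, `setEnableRateLimit`, and `withRateLimit` are local stand-ins written for illustration; they are not part of this package.

```go
package main

import "fmt"

// enableRateLimit is a hypothetical stand-in for the package-level setting.
var enableRateLimit = true

// setEnableRateLimit mirrors the documented contract:
// store the new value and return the previous one.
func setEnableRateLimit(b bool) bool {
	prev := enableRateLimit
	enableRateLimit = b
	return prev
}

// withRateLimit runs fn with the flag set to b, then restores the prior value.
func withRateLimit(b bool, fn func()) {
	prev := setEnableRateLimit(b)
	defer setEnableRateLimit(prev)
	fn()
}

func main() {
	withRateLimit(false, func() {
		fmt.Println("inside:", enableRateLimit)
	})
	fmt.Println("after:", enableRateLimit)
}
```

With the real package, the same shape would presumably read `prev := gateway.SetEnableRateLimit(false)` followed by a deferred `gateway.SetEnableRateLimit(prev)`.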
Types ¶
type DSStats ¶ added in v0.1.10
type DSStats struct {
SourceNums []SourceEvents
NumUsers int
AvgBatchSize float64
TableSize int64
NumRows int
}
type GatewayAdmin ¶ added in v0.1.10
type GatewayAdmin struct {
// contains filtered or unexported fields
}
func (*GatewayAdmin) Status ¶ added in v0.1.10
func (g *GatewayAdmin) Status() interface{}
Status function is used for debug purposes by the admin interface
type GatewayRPCHandler ¶ added in v0.1.10
type GatewayRPCHandler struct {
// contains filtered or unexported fields
}
func (*GatewayRPCHandler) GetDSFailedJobs ¶ added in v0.1.10
func (g *GatewayRPCHandler) GetDSFailedJobs(arg string, result *string) (err error)
func (*GatewayRPCHandler) GetDSJobCount ¶ added in v0.1.10
func (g *GatewayRPCHandler) GetDSJobCount(arg string, result *string) (err error)
func (*GatewayRPCHandler) GetDSList ¶ added in v0.1.10
func (g *GatewayRPCHandler) GetDSList(_ string, result *string) (err error)
func (*GatewayRPCHandler) GetDSStats ¶ added in v0.1.10
func (g *GatewayRPCHandler) GetDSStats(dsName string, result *string) (err error)
GetDSStats TODO:
- first_event, last_event min--maxid to event: available in dsrange
- Average batch size ⇒ num_events we want per ds
- writeKey, count(*): we want source name to count per ds
- Num Distinct users per ds
- Avg Event size = Table_size / (avg Batch size * Total rows); is Table_size correct measure?
- add job status group by
EventsBySource
================================================================================
│───────│───────────│───────────────────────────────│
│ COUNT │ NAME      │ ID                            │
│───────│───────────│───────────────────────────────│
│     7 │ test-dev  │ "1jEZBT9aChBgbVkfKBjtLau8XAM" │
│     1 │ and-raid  │ "1lBkol38t4m5Xz3zZAeSZ0P26QU" │
│───────│───────────│───────────────────────────────│
================================================================================
NumUsers : 2
AvgBatchSize : 1
TableSize : 65536
NumRows : 8
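The average event size formula from the TODO note can be checked against the sample figures shown above. This is only a sketch: `dsStats` is a local copy of the relevant DSStats fields, `avgEventSize` is a hypothetical helper, and it assumes TableSize is a byte count, which the note itself flags as an open question.

```go
package main

import "fmt"

// dsStats is a local copy of the DSStats fields the formula needs.
type dsStats struct {
	AvgBatchSize float64
	TableSize    int64
	NumRows      int
}

// avgEventSize applies the formula from the TODO note:
// TableSize / (AvgBatchSize * NumRows).
// It returns 0 when the dataset holds no events, to avoid dividing by zero.
func avgEventSize(s dsStats) float64 {
	events := s.AvgBatchSize * float64(s.NumRows)
	if events == 0 {
		return 0
	}
	return float64(s.TableSize) / events
}

func main() {
	// Figures taken from the sample output: 65536 / (1 * 8).
	s := dsStats{AvgBatchSize: 1, TableSize: 65536, NumRows: 8}
	fmt.Println(avgEventSize(s)) // prints 8192
}
```

For a table this small the result is probably dominated by page and index overhead rather than payload size, which may be the concern behind the question in the note.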
func (*GatewayRPCHandler) GetJobByID ¶ added in v0.1.10
func (g *GatewayRPCHandler) GetJobByID(arg string, result *string) (err error)
func (*GatewayRPCHandler) GetJobIDStatus ¶ added in v0.1.10
func (g *GatewayRPCHandler) GetJobIDStatus(arg string, result *string) (err error)
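Every GatewayRPCHandler method has the shape `func(arg string, result *string) error`, which matches what Go's standard `net/rpc` package requires of an exported method. The sketch below shows how a handler of that shape can be served and called. It assumes `net/rpc` is the transport, which these docs do not state. `DemoRPCHandler`, the service name "Gateway", and the dataset names are made up for illustration.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"strings"
)

// DemoRPCHandler is a hypothetical stand-in for GatewayRPCHandler.
type DemoRPCHandler struct {
	datasets []string
}

// GetDSList has the same signature shape as the documented method:
// the argument is ignored and the result is written through the pointer.
func (h *DemoRPCHandler) GetDSList(_ string, result *string) error {
	*result = strings.Join(h.datasets, "\n")
	return nil
}

// callGetDSList serves h over an in-memory pipe and performs one call.
func callGetDSList(h *DemoRPCHandler) (string, error) {
	server := rpc.NewServer()
	if err := server.RegisterName("Gateway", h); err != nil {
		return "", err
	}

	clientConn, serverConn := net.Pipe()
	go server.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	err := client.Call("Gateway.GetDSList", "unused", &reply)
	return reply, err
}

func main() {
	h := &DemoRPCHandler{datasets: []string{"example_ds_1", "example_ds_2"}}
	out, err := callGetDSList(h)
	if err != nil {
		fmt.Println("rpc error:", err)
		return
	}
	fmt.Println(out)
}
```

Returning the result as a preformatted string, as the documented methods do, keeps the client trivial at the cost of leaving any parsing to the caller.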
type HandleT ¶
type HandleT struct {
// contains filtered or unexported fields
}
HandleT is the struct returned by the Setup call
func (*HandleT) GetWebhookSourceDefName ¶ added in v0.1.10
GetWebhookSourceDefName returns the webhook source definition name by write key
func (*HandleT) IncrementAckCount ¶ added in v0.1.10
IncrementAckCount increments the acknowledged count for gateway requests
func (*HandleT) IncrementRecvCount ¶ added in v0.1.10
IncrementRecvCount increments the received count for gateway requests
func (*HandleT) MaxReqSize ¶ added in v0.1.10
MaxReqSize is the maximum request body size, in bytes, accepted by gateway web handlers
func (*HandleT) ProcessWebRequest ¶ added in v0.1.10
func (gateway *HandleT) ProcessWebRequest(w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string
ProcessWebRequest is an interface wrapper for the webhook handler
func (*HandleT) SetReadonlyDBs ¶ added in v0.1.10
func (gateway *HandleT) SetReadonlyDBs(readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB jobsdb.ReadonlyJobsDB)
func (*HandleT) Setup ¶
func (gateway *HandleT) Setup(
application app.Interface,
backendConfig backendconfig.BackendConfig,
jobsDB jobsdb.JobsDB,
rateLimiter ratelimiter.RateLimiter,
versionHandler func(w http.ResponseWriter, r *http.Request),
rsourcesService rsources.JobService,
) error
Setup initializes this module:
- Monitors backend config for changes.
- Starts the web request batching goroutine, which batches incoming messages.
- Starts the web request batch DB writer goroutine, which writes incoming batches to JobsDB.
- Starts the debugging goroutine, which prints gateway stats.
This function will block until backend config is initially received.
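The batching and writer goroutines described above form a two-stage pipeline. The sketch below illustrates that general shape with plain channels. It is not the gateway's implementation: `batchRequests`, `runPipeline`, the string payloads, the batch size, and the flush interval are all assumptions, and an in-memory slice stands in for JobsDB.

```go
package main

import (
	"fmt"
	"time"
)

// batchRequests groups items from in into slices of at most maxSize and
// also flushes any partial batch on every tick of flushEvery.
// It closes out once in is closed and the last batch has been sent.
func batchRequests(in <-chan string, out chan<- []string, maxSize int, flushEvery time.Duration) {
	defer close(out)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	var batch []string
	flush := func() {
		if len(batch) > 0 {
			out <- batch
			batch = nil
		}
	}
	for {
		select {
		case req, ok := <-in:
			if !ok {
				flush()
				return
			}
			batch = append(batch, req)
			if len(batch) >= maxSize {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// runPipeline feeds reqs through the batcher and a writer goroutine that
// appends each batch to a slice (the stand-in for a JobsDB write).
func runPipeline(reqs []string, maxSize int, flushEvery time.Duration) [][]string {
	in := make(chan string)
	batches := make(chan []string)
	done := make(chan struct{})
	var stored [][]string

	go batchRequests(in, batches, maxSize, flushEvery)
	go func() {
		defer close(done)
		for b := range batches {
			stored = append(stored, b)
		}
	}()

	for _, r := range reqs {
		in <- r
	}
	close(in)
	<-done // the writer has drained every batch
	return stored
}

func main() {
	stored := runPipeline([]string{"a", "b", "c", "d", "e"}, 2, time.Second)
	fmt.Println(len(stored), "batches:", stored)
}
```

Batching by both size and time bounds the latency of a lone request while still amortizing the cost of a write under load.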
func (*HandleT) StartAdminHandler ¶ added in v0.1.10
StartAdminHandler starts the handler for admin operations
func (*HandleT) StartWebHandler ¶ added in v0.1.10
StartWebHandler starts all gateway web handlers, listening on gateway port. Supports CORS from all origins. This function will block.
func (*HandleT) TrackRequestMetrics ¶ added in v0.1.10
TrackRequestMetrics lets callers record request success/failure telemetry
type ImportRequestHandler ¶ added in v0.1.10
type ImportRequestHandler struct{}
ImportRequestHandler is an empty struct to capture import specific request handling functionality
func (*ImportRequestHandler) ProcessRequest ¶ added in v0.1.10
func (irh *ImportRequestHandler) ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, _ string, payload []byte, writeKey string) string
ProcessRequest on ImportRequestHandler splits the payload by user, enqueues each part in the webrequestQ, and waits for all responses before returning
type RegularRequestHandler ¶ added in v0.1.10
type RegularRequestHandler struct{}
RegularRequestHandler is an empty struct to capture non-import specific request handling functionality
func (*RegularRequestHandler) ProcessRequest ¶ added in v0.1.10
func (rrh *RegularRequestHandler) ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string
ProcessRequest enqueues a webRequest and waits for the response before returning
type RequestHandler ¶ added in v0.1.10
type RequestHandler interface {
ProcessRequest(gateway *HandleT, w *http.ResponseWriter, r *http.Request, reqType string, payload []byte, writeKey string) string
}
RequestHandler is an interface that abstracts server-side import request processing from the handling of all other calls
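The split between the two handlers can be illustrated with a much simplified interface. In the sketch below the regular handler treats the payload as one request, while the import handler first groups the batch by user. Everything here is a stand-in: `requestHandler`, `splitByUser`, and the `userId` field are assumptions for illustration, and the real handlers enqueue requests and wait for responses rather than returning a count.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// requestHandler is a simplified stand-in for the RequestHandler interface.
// processRequest returns how many requests the payload would be queued as.
type requestHandler interface {
	processRequest(payload []byte) (int, error)
}

// regularHandler forwards the payload as a single request.
type regularHandler struct{}

func (regularHandler) processRequest(payload []byte) (int, error) {
	if !json.Valid(payload) {
		return 0, errors.New("invalid JSON payload")
	}
	return 1, nil
}

// importHandler splits the payload by user: one request per distinct user.
type importHandler struct{}

func (importHandler) processRequest(payload []byte) (int, error) {
	groups, err := splitByUser(payload)
	if err != nil {
		return 0, err
	}
	return len(groups), nil
}

// splitByUser groups the events of a {"batch": [...]} payload by their
// "userId" field (an assumed field name).
func splitByUser(payload []byte) (map[string][]json.RawMessage, error) {
	var body struct {
		Batch []json.RawMessage `json:"batch"`
	}
	if err := json.Unmarshal(payload, &body); err != nil {
		return nil, err
	}
	groups := make(map[string][]json.RawMessage)
	for _, ev := range body.Batch {
		var id struct {
			UserID string `json:"userId"`
		}
		if err := json.Unmarshal(ev, &id); err != nil {
			return nil, err
		}
		groups[id.UserID] = append(groups[id.UserID], ev)
	}
	return groups, nil
}

func main() {
	payload := []byte(`{"batch":[{"userId":"u1","event":"a"},{"userId":"u2","event":"b"},{"userId":"u1","event":"c"}]}`)
	for _, h := range []requestHandler{regularHandler{}, importHandler{}} {
		n, err := h.processRequest(payload)
		fmt.Printf("%T queued %d request(s), err=%v\n", h, n, err)
	}
}
```

Keeping both behaviours behind one interface lets the web handler pick a strategy per endpoint without branching inside the request path.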