Documentation
¶
Index ¶
- func CounterTypeValuesString() string
- type ARN
- type App
- func (app *App) Handler(ctx context.Context, event *KinesisTimeWindowEvent) (*TimeWindowEventResponse, error)
- func (app *App) Run(ctx context.Context, streamName string, tumblingWindow time.Duration) error
- func (app *App) SetIgnorePutRecord(f bool)
- func (app *App) SetOutput(w io.Writer)
- func (app *App) SetVersion(version string)
- type BatchItemFailure
- type Config
- type CounterConfig
- type CounterState
- type CounterStates
- type CounterType
- func (i CounterType) IsACounterType() bool
- func (i CounterType) MarshalJSON() ([]byte, error)
- func (i CounterType) MarshalYAML() (interface{}, error)
- func (t *CounterType) Set(str string) error
- func (i CounterType) String() string
- func (i *CounterType) UnmarshalJSON(data []byte) error
- func (i *CounterType) UnmarshalYAML(unmarshal func(interface{}) error) error
- type FirehoseClient
- type IntermediateRecord
- type KinesisClient
- type KinesisTimeWindow
- type KinesisTimeWindowEvent
- type TimeWindowEventResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CounterTypeValuesString ¶
func CounterTypeValuesString() string
Types ¶
type ARN ¶
func (*ARN) IsAmbiguous ¶
func (*ARN) IsKinesisDataStream ¶
func (*ARN) MarshalText ¶
func (*ARN) StreamName ¶
func (*ARN) UnmarshalText ¶
type App ¶
type App struct {
// contains filtered or unexported fields
}
func NewWithClient ¶
func NewWithClient(cfg *Config, kinesisClient KinesisClient, firehoseClient FirehoseClient) *App
func (*App) Handler ¶
func (app *App) Handler(ctx context.Context, event *KinesisTimeWindowEvent) (*TimeWindowEventResponse, error)
func (*App) SetIgnorePutRecord ¶
func (*App) SetVersion ¶
type BatchItemFailure ¶
type BatchItemFailure struct {
ItemIdentifier string `json:"itemIdentifier"`
}
type Config ¶
type Config struct { RequiredVersion string `yaml:"required_version"` Counters []*CounterConfig `yaml:"counters"` // contains filtered or unexported fields }
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
func (*Config) ValidateVersion ¶
type CounterConfig ¶
type CounterConfig struct { ID string `yaml:"id,omitempty"` InputStreamARN *ARN `yaml:"input_stream_arn,omitempty"` OutputStreamARN *ARN `yaml:"output_stream_arn,omitempty"` AggregateStreamArn *ARN `yaml:"aggregate_stream_arn,omitempty"` TargetColumn string `yaml:"target_column,omitempty"` TargetExpr string `yaml:"target_expr,omitempty"` CounterType CounterType `yaml:"counter_type,omitempty"` SipHashKeyHex string `yaml:"siphash_key_hex"` JQExpr string `yaml:"jq_expr"` // contains filtered or unexported fields }
func (*CounterConfig) Restrict ¶
func (cfg *CounterConfig) Restrict() error
type CounterState ¶
type CounterState struct { CounterType CounterType `json:"counter_type"` RowCount int64 `json:"row_count,omitempty"` Base64HLLPP string `json:"base64_hllpp,omitempty"` }
type CounterStates ¶
type CounterStates map[string]map[string]*CounterState
func (CounterStates) MergeInto ¶
func (s CounterStates) MergeInto(other CounterStates)
type CounterType ¶
type CounterType int
const ( Count CounterType = iota + 1 ApproxCountDistinct )
func CounterTypeString ¶
func CounterTypeString(s string) (CounterType, error)
CounterTypeString retrieves an enum value from the enum constants string name. Throws an error if the param is not part of the enum.
func CounterTypeValues ¶
func CounterTypeValues() []CounterType
CounterTypeValues returns all values of the enum
func (CounterType) IsACounterType ¶
func (i CounterType) IsACounterType() bool
IsACounterType returns "true" if the value is listed in the enum definition. "false" otherwise
func (CounterType) MarshalJSON ¶
func (i CounterType) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface for CounterType
func (CounterType) MarshalYAML ¶
func (i CounterType) MarshalYAML() (interface{}, error)
MarshalYAML implements a YAML Marshaler for CounterType
func (*CounterType) Set ¶
func (t *CounterType) Set(str string) error
func (CounterType) String ¶
func (i CounterType) String() string
func (*CounterType) UnmarshalJSON ¶
func (i *CounterType) UnmarshalJSON(data []byte) error
UnmarshalJSON implements the json.Unmarshaler interface for CounterType
func (*CounterType) UnmarshalYAML ¶
func (i *CounterType) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements a YAML Unmarshaler for CounterType
type FirehoseClient ¶
type FirehoseClient interface {
PutRecord(ctx context.Context, params *firehose.PutRecordInput, optFns ...func(*firehose.Options)) (*firehose.PutRecordOutput, error)
}
type IntermediateRecord ¶
type IntermediateRecord struct { EventSourceARN string `json:"event_source_arn,omitempty"` ShardID string `json:"shard_id,omitempty"` CounterID string `json:"counter_id,omitempty"` CounterType CounterType `json:"counter_type,omitempty"` CounterVersion string `json:"counter_version,omitempty"` Window *KinesisTimeWindow `json:"window,omitempty"` State *CounterState `json:"counter_state,omitempty"` }
type KinesisClient ¶
type KinesisClient interface { GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error) }
type KinesisTimeWindow ¶
func (*KinesisTimeWindow) String ¶ added in v0.1.2
func (w *KinesisTimeWindow) String() string
type KinesisTimeWindowEvent ¶
type KinesisTimeWindowEvent struct { Records []events.KinesisEventRecord `json:"Records"` Window *KinesisTimeWindow `json:"window"` State CounterStates `json:"state"` ShardID string `json:"shardId"` EventSourceArn string `json:"eventSourceARN"` IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow"` IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly"` }
type TimeWindowEventResponse ¶
type TimeWindowEventResponse struct { State CounterStates `json:"state"` BatchItemFailures []BatchItemFailure `json:"batchItemFailures"` // contains filtered or unexported fields }
func (*TimeWindowEventResponse) AddBatchItemFailures ¶
func (resp *TimeWindowEventResponse) AddBatchItemFailures(items ...BatchItemFailure)
func (*TimeWindowEventResponse) MergeInto ¶
func (resp *TimeWindowEventResponse) MergeInto(other *TimeWindowEventResponse)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.