Documentation
¶
Index ¶
- Constants
- Variables
- func GetTopic(options GetTopicOptions) string
- type AddSessionFeedbackArgs
- type AddSessionPropertiesArgs
- type AddTrackPropertiesArgs
- type BalancerWrapper
- type ConfigOverride
- type ErrorGroupDataSyncArgs
- type ErrorObjectDataSyncArgs
- type GetTopicOptions
- type IdentifySessionArgs
- type InitializeSessionArgs
- type LogRowMessage
- func (m *LogRowMessage) GetFailures() int
- func (m *LogRowMessage) GetKafkaMessage() *kafka.Message
- func (m *LogRowMessage) GetMaxRetries() int
- func (m *LogRowMessage) GetType() PayloadType
- func (m *LogRowMessage) SetFailures(value int)
- func (m *LogRowMessage) SetKafkaMessage(value *kafka.Message)
- func (m *LogRowMessage) SetMaxRetries(value int)
- type Message
- func (m *Message) GetFailures() int
- func (m *Message) GetKafkaMessage() *kafka.Message
- func (m *Message) GetMaxRetries() int
- func (m *Message) GetType() PayloadType
- func (m *Message) SetFailures(value int)
- func (m *Message) SetKafkaMessage(value *kafka.Message)
- func (m *Message) SetMaxRetries(value int)
- type MessageQueue
- type MockMessageQueue
- type Mode
- type OTeLMetricHistogramRow
- func (m *OTeLMetricHistogramRow) GetFailures() int
- func (m *OTeLMetricHistogramRow) GetKafkaMessage() *kafka.Message
- func (m *OTeLMetricHistogramRow) GetMaxRetries() int
- func (m *OTeLMetricHistogramRow) GetType() PayloadType
- func (m *OTeLMetricHistogramRow) SetFailures(value int)
- func (m *OTeLMetricHistogramRow) SetKafkaMessage(value *kafka.Message)
- func (m *OTeLMetricHistogramRow) SetMaxRetries(value int)
- type OTeLMetricSumRow
- func (m *OTeLMetricSumRow) GetFailures() int
- func (m *OTeLMetricSumRow) GetKafkaMessage() *kafka.Message
- func (m *OTeLMetricSumRow) GetMaxRetries() int
- func (m *OTeLMetricSumRow) GetType() PayloadType
- func (m *OTeLMetricSumRow) SetFailures(value int)
- func (m *OTeLMetricSumRow) SetKafkaMessage(value *kafka.Message)
- func (m *OTeLMetricSumRow) SetMaxRetries(value int)
- type OTeLMetricSummaryRow
- func (m *OTeLMetricSummaryRow) GetFailures() int
- func (m *OTeLMetricSummaryRow) GetKafkaMessage() *kafka.Message
- func (m *OTeLMetricSummaryRow) GetMaxRetries() int
- func (m *OTeLMetricSummaryRow) GetType() PayloadType
- func (m *OTeLMetricSummaryRow) SetFailures(value int)
- func (m *OTeLMetricSummaryRow) SetKafkaMessage(value *kafka.Message)
- func (m *OTeLMetricSummaryRow) SetMaxRetries(value int)
- type PayloadType
- type PushBackendPayloadArgs
- type PushCompressedPayloadArgs
- type PushLogsArgs
- type PushMetricsArgs
- type PushPayloadArgs
- type PushTracesArgs
- type Queue
- func (p *Queue) Commit(ctx context.Context, msg *kafka.Message)
- func (p *Queue) LogStats()
- func (p *Queue) Receive(ctx context.Context) (msg RetryableMessage)
- func (p *Queue) Rewind(ctx context.Context, dur time.Duration) error
- func (p *Queue) Stop(ctx context.Context)
- func (p *Queue) Submit(ctx context.Context, partitionKey string, messages ...RetryableMessage) error
- type RetryableMessage
- type SessionDataSyncArgs
- type SessionEventRowMessage
- func (m *SessionEventRowMessage) GetFailures() int
- func (m *SessionEventRowMessage) GetKafkaMessage() *kafka.Message
- func (m *SessionEventRowMessage) GetMaxRetries() int
- func (m *SessionEventRowMessage) GetType() PayloadType
- func (m *SessionEventRowMessage) SetFailures(value int)
- func (m *SessionEventRowMessage) SetKafkaMessage(value *kafka.Message)
- func (m *SessionEventRowMessage) SetMaxRetries(value int)
- type TopicType
- type TraceRowMessage
- func (m *TraceRowMessage) GetFailures() int
- func (m *TraceRowMessage) GetKafkaMessage() *kafka.Message
- func (m *TraceRowMessage) GetMaxRetries() int
- func (m *TraceRowMessage) GetType() PayloadType
- func (m *TraceRowMessage) SetFailures(value int)
- func (m *TraceRowMessage) SetKafkaMessage(value *kafka.Message)
- func (m *TraceRowMessage) SetMaxRetries(value int)
Constants ¶
View Source
const (
	TaskRetries = 0
	MaxMessageSizeBytes = 256 * 1024 * 1024 // 256 MiB
)
View Source
const ConsumerGroupName = "group-default"
View Source
const KafkaOperationTimeout = 25 * time.Second
KafkaOperationTimeout is the timeout for all Kafka send/receive operations.
Variables ¶
Functions ¶
func GetTopic ¶
func GetTopic(options GetTopicOptions) string
Types ¶
type AddSessionFeedbackArgs ¶
type AddSessionPropertiesArgs ¶
type AddSessionPropertiesArgs struct { SessionSecureID string PropertiesObject interface{} }
type AddTrackPropertiesArgs ¶
type AddTrackPropertiesArgs struct { SessionSecureID string PropertiesObject interface{} }
type BalancerWrapper ¶
type BalancerWrapper struct {
// contains filtered or unexported fields
}
func (*BalancerWrapper) AssignGroups ¶
func (b *BalancerWrapper) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments
func (*BalancerWrapper) ProtocolName ¶
func (b *BalancerWrapper) ProtocolName() string
func (*BalancerWrapper) UserData ¶
func (b *BalancerWrapper) UserData() ([]byte, error)
type ConfigOverride ¶
type ErrorGroupDataSyncArgs ¶
type ErrorGroupDataSyncArgs struct {
ErrorGroupID int
}
type ErrorObjectDataSyncArgs ¶
type ErrorObjectDataSyncArgs struct {
ErrorObjectID int
}
type GetTopicOptions ¶
type GetTopicOptions struct {
Type TopicType
}
type IdentifySessionArgs ¶
type InitializeSessionArgs ¶
type InitializeSessionArgs struct { SessionSecureID string CreatedAt time.Time ProjectVerboseID string EnableStrictPrivacy bool PrivacySetting *string EnableRecordingNetworkContents bool ClientVersion string FirstloadVersion string ClientConfig string Environment string AppVersion *string Fingerprint string UserAgent string AcceptLanguage string IP string ClientID string NetworkRecordingDomains []string DisableSessionRecording *bool ServiceName string }
type LogRowMessage ¶
type LogRowMessage struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` *clickhouse.LogRow }
func (*LogRowMessage) GetFailures ¶
func (m *LogRowMessage) GetFailures() int
func (*LogRowMessage) GetKafkaMessage ¶
func (m *LogRowMessage) GetKafkaMessage() *kafka.Message
func (*LogRowMessage) GetMaxRetries ¶
func (m *LogRowMessage) GetMaxRetries() int
func (*LogRowMessage) GetType ¶
func (m *LogRowMessage) GetType() PayloadType
func (*LogRowMessage) SetFailures ¶
func (m *LogRowMessage) SetFailures(value int)
func (*LogRowMessage) SetKafkaMessage ¶
func (m *LogRowMessage) SetKafkaMessage(value *kafka.Message)
func (*LogRowMessage) SetMaxRetries ¶
func (m *LogRowMessage) SetMaxRetries(value int)
type Message ¶
type Message struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` PushPayload *PushPayloadArgs `json:",omitempty"` InitializeSession *InitializeSessionArgs `json:",omitempty"` IdentifySession *IdentifySessionArgs `json:",omitempty"` AddTrackProperties *AddTrackPropertiesArgs `json:",omitempty"` AddSessionProperties *AddSessionPropertiesArgs `json:",omitempty"` PushBackendPayload *PushBackendPayloadArgs `json:",omitempty"` PushMetrics *PushMetricsArgs `json:",omitempty"` AddSessionFeedback *AddSessionFeedbackArgs `json:",omitempty"` PushLogs *PushLogsArgs `json:",omitempty"` PushTraces *PushTracesArgs `json:",omitempty"` SessionDataSync *SessionDataSyncArgs `json:",omitempty"` ErrorGroupDataSync *ErrorGroupDataSyncArgs `json:",omitempty"` ErrorObjectDataSync *ErrorObjectDataSyncArgs `json:",omitempty"` PushCompressedPayload *PushCompressedPayloadArgs `json:",omitempty"` }
func (*Message) GetFailures ¶
func (*Message) GetKafkaMessage ¶
func (m *Message) GetKafkaMessage() *kafka.Message
func (*Message) GetMaxRetries ¶
func (*Message) GetType ¶
func (m *Message) GetType() PayloadType
func (*Message) SetFailures ¶
func (*Message) SetKafkaMessage ¶
func (m *Message) SetKafkaMessage(value *kafka.Message)
func (*Message) SetMaxRetries ¶
type MessageQueue ¶
type MessageQueue interface { Stop(context.Context) Receive(context.Context) RetryableMessage Submit(context.Context, string, ...RetryableMessage) error LogStats() }
type MockMessageQueue ¶
type MockMessageQueue struct{}
func (*MockMessageQueue) LogStats ¶
func (k *MockMessageQueue) LogStats()
func (*MockMessageQueue) Receive ¶
func (k *MockMessageQueue) Receive(context.Context) RetryableMessage
func (*MockMessageQueue) Stop ¶
func (k *MockMessageQueue) Stop(context.Context)
func (*MockMessageQueue) Submit ¶
func (k *MockMessageQueue) Submit(context.Context, string, ...RetryableMessage) error
type OTeLMetricHistogramRow ¶
type OTeLMetricHistogramRow struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` *clickhouse.MetricHistogramRow }
func (*OTeLMetricHistogramRow) GetFailures ¶
func (m *OTeLMetricHistogramRow) GetFailures() int
func (*OTeLMetricHistogramRow) GetKafkaMessage ¶
func (m *OTeLMetricHistogramRow) GetKafkaMessage() *kafka.Message
func (*OTeLMetricHistogramRow) GetMaxRetries ¶
func (m *OTeLMetricHistogramRow) GetMaxRetries() int
func (*OTeLMetricHistogramRow) GetType ¶
func (m *OTeLMetricHistogramRow) GetType() PayloadType
func (*OTeLMetricHistogramRow) SetFailures ¶
func (m *OTeLMetricHistogramRow) SetFailures(value int)
func (*OTeLMetricHistogramRow) SetKafkaMessage ¶
func (m *OTeLMetricHistogramRow) SetKafkaMessage(value *kafka.Message)
func (*OTeLMetricHistogramRow) SetMaxRetries ¶
func (m *OTeLMetricHistogramRow) SetMaxRetries(value int)
type OTeLMetricSumRow ¶
type OTeLMetricSumRow struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` *clickhouse.MetricSumRow }
func (*OTeLMetricSumRow) GetFailures ¶
func (m *OTeLMetricSumRow) GetFailures() int
func (*OTeLMetricSumRow) GetKafkaMessage ¶
func (m *OTeLMetricSumRow) GetKafkaMessage() *kafka.Message
func (*OTeLMetricSumRow) GetMaxRetries ¶
func (m *OTeLMetricSumRow) GetMaxRetries() int
func (*OTeLMetricSumRow) GetType ¶
func (m *OTeLMetricSumRow) GetType() PayloadType
func (*OTeLMetricSumRow) SetFailures ¶
func (m *OTeLMetricSumRow) SetFailures(value int)
func (*OTeLMetricSumRow) SetKafkaMessage ¶
func (m *OTeLMetricSumRow) SetKafkaMessage(value *kafka.Message)
func (*OTeLMetricSumRow) SetMaxRetries ¶
func (m *OTeLMetricSumRow) SetMaxRetries(value int)
type OTeLMetricSummaryRow ¶
type OTeLMetricSummaryRow struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` *clickhouse.MetricSummaryRow }
func (*OTeLMetricSummaryRow) GetFailures ¶
func (m *OTeLMetricSummaryRow) GetFailures() int
func (*OTeLMetricSummaryRow) GetKafkaMessage ¶
func (m *OTeLMetricSummaryRow) GetKafkaMessage() *kafka.Message
func (*OTeLMetricSummaryRow) GetMaxRetries ¶
func (m *OTeLMetricSummaryRow) GetMaxRetries() int
func (*OTeLMetricSummaryRow) GetType ¶
func (m *OTeLMetricSummaryRow) GetType() PayloadType
func (*OTeLMetricSummaryRow) SetFailures ¶
func (m *OTeLMetricSummaryRow) SetFailures(value int)
func (*OTeLMetricSummaryRow) SetKafkaMessage ¶
func (m *OTeLMetricSummaryRow) SetKafkaMessage(value *kafka.Message)
func (*OTeLMetricSummaryRow) SetMaxRetries ¶
func (m *OTeLMetricSummaryRow) SetMaxRetries(value int)
type PayloadType ¶
type PayloadType = int
const (
	PushPayload PayloadType = iota
	InitializeSession PayloadType = iota
	IdentifySession PayloadType = iota
	AddTrackProperties PayloadType = iota // Deprecated: track events are now processed in pushPayload
	AddSessionProperties PayloadType = iota
	PushBackendPayload PayloadType = iota
	PushMetrics PayloadType = iota // Deprecated: use OTeL native metrics
	MarkBackendSetup PayloadType = iota // Deprecated: setup events are written from other payload processing
	AddSessionFeedback PayloadType = iota
	PushLogs PayloadType = iota // Deprecated: use a LogRowMessage with payload type PushLogsFlattened
	PushTraces PayloadType = iota
	HubSpotCreateContactForAdmin PayloadType = iota // Deprecated: noop
	HubSpotCreateCompanyForWorkspace PayloadType = iota // Deprecated: noop
	HubSpotUpdateContactProperty PayloadType = iota // Deprecated: noop
	HubSpotUpdateCompanyProperty PayloadType = iota // Deprecated: noop
	HubSpotCreateContactCompanyAssociation PayloadType = iota // Deprecated: noop
	SessionDataSync PayloadType = iota
	ErrorGroupDataSync PayloadType = iota
	ErrorObjectDataSync PayloadType = iota
	PushCompressedPayload PayloadType = iota
	PushLogsFlattened PayloadType = iota
	PushTracesFlattened PayloadType = iota
	PushSessionEvents PayloadType = iota
	PushOTeLMetricSum PayloadType = iota
	PushOTeLMetricHistogram PayloadType = iota
	PushOTeLMetricSummary PayloadType = iota
	HealthCheck PayloadType = math.MaxInt
)
type PushBackendPayloadArgs ¶
type PushBackendPayloadArgs struct { ProjectVerboseID *string SessionSecureID *string Errors []*customModels.BackendErrorObjectInput }
type PushLogsArgs ¶
type PushLogsArgs struct {
LogRow *clickhouse.LogRow
}
type PushMetricsArgs ¶
type PushMetricsArgs struct { ProjectVerboseID *string SessionSecureID *string Metrics []*customModels.MetricInput }
type PushPayloadArgs ¶
type PushPayloadArgs struct { SessionSecureID string PayloadID *int Events customModels.ReplayEventsInput `json:"events"` Messages string `json:"messages"` Resources string `json:"resources"` WebSocketEvents *string `json:"web_socket_events"` Errors []*customModels.ErrorObjectInput `json:"errors"` IsBeacon *bool `json:"is_beacon"` HasSessionUnloaded *bool `json:"has_session_unloaded"` HighlightLogs *string `json:"highlight_logs"` }
type PushTracesArgs ¶
type PushTracesArgs struct {
TraceRow *clickhouse.TraceRow
}
type Queue ¶
type Queue struct {
	Topic string
	ConsumerGroup string
	MessageSizeBytes int64
	Client *kafka.Client
	// contains filtered or unexported fields
}
type RetryableMessage ¶
type RetryableMessage interface { GetType() PayloadType GetFailures() int SetFailures(value int) GetMaxRetries() int SetMaxRetries(value int) GetKafkaMessage() *kafka.Message SetKafkaMessage(value *kafka.Message) }
type SessionDataSyncArgs ¶
type SessionDataSyncArgs struct {
SessionID int
}
type SessionEventRowMessage ¶
type SessionEventRowMessage struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` *clickhouse.SessionEventRow }
func (*SessionEventRowMessage) GetFailures ¶
func (m *SessionEventRowMessage) GetFailures() int
func (*SessionEventRowMessage) GetKafkaMessage ¶
func (m *SessionEventRowMessage) GetKafkaMessage() *kafka.Message
func (*SessionEventRowMessage) GetMaxRetries ¶
func (m *SessionEventRowMessage) GetMaxRetries() int
func (*SessionEventRowMessage) GetType ¶
func (m *SessionEventRowMessage) GetType() PayloadType
func (*SessionEventRowMessage) SetFailures ¶
func (m *SessionEventRowMessage) SetFailures(value int)
func (*SessionEventRowMessage) SetKafkaMessage ¶
func (m *SessionEventRowMessage) SetKafkaMessage(value *kafka.Message)
func (*SessionEventRowMessage) SetMaxRetries ¶
func (m *SessionEventRowMessage) SetMaxRetries(value int)
type TopicType ¶
type TopicType string
const ( TopicTypeDefault TopicType = "default" TopicTypeBatched TopicType = "batched" TopicTypeDataSync TopicType = "datasync" TopicTypeTraces TopicType = "traces" TopicTypeMetricSum TopicType = "metric_sum" TopicTypeMetricHistogram TopicType = "metric_histogram" TopicTypeMetricSummary TopicType = "metric_summary" )
type TraceRowMessage ¶
type TraceRowMessage struct { Type PayloadType Failures int MaxRetries int KafkaMessage *kafka.Message `json:",omitempty"` *clickhouse.ClickhouseTraceRow }
func (*TraceRowMessage) GetFailures ¶
func (m *TraceRowMessage) GetFailures() int
func (*TraceRowMessage) GetKafkaMessage ¶
func (m *TraceRowMessage) GetKafkaMessage() *kafka.Message
func (*TraceRowMessage) GetMaxRetries ¶
func (m *TraceRowMessage) GetMaxRetries() int
func (*TraceRowMessage) GetType ¶
func (m *TraceRowMessage) GetType() PayloadType
func (*TraceRowMessage) SetFailures ¶
func (m *TraceRowMessage) SetFailures(value int)
func (*TraceRowMessage) SetKafkaMessage ¶
func (m *TraceRowMessage) SetKafkaMessage(value *kafka.Message)
func (*TraceRowMessage) SetMaxRetries ¶
func (m *TraceRowMessage) SetMaxRetries(value int)
Click to show internal directories.
Click to hide internal directories.