generator

package module
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 10 Imported by: 1

README

Conduit Connector Generator

The generator connector is one of Conduit builtin plugins. It generates sample records using its source connector. It has no destination and trying to use that will result in an error.

How to build it

Run make.

Testing

Run make test to run all the unit tests.

Configuration

[!IMPORTANT] Parameters starting with collections.* are used to configure the format and operations for a specific collection. The * in the parameter name should be replaced with the collection name.

Below is a list of all available configuration parameters:

Name Type Default Description

burst.generateTime

duration

1s

The amount of time the generator is generating records in a burst. Has an effect only if burst.sleepTime is set.

burst.sleepTime

duration

The time the generator "sleeps" between bursts.

collections.*.format.options.*

string

The options for the raw and structured format types. It accepts pairs of field names and field types, where the type can be one of: int, string, time, bool, duration.

collections.*.format.options.path

string

Path to the input file (only applicable if the format type is file).

collections.*.format.type

string

The format of the generated payload data (raw, structured, file).

collections.*.operations

string

create

Comma separated list of record operations to generate. Allowed values are "create", "update", "delete", "snapshot".

format.options.*

string

The options for the raw and structured format types. It accepts pairs of field names and field types, where the type can be one of: int, string, time, bool, duration.

format.options.path

string

Path to the input file (only applicable if the format type is file).

format.type

string

The format of the generated payload data (raw, structured, file).

operations

string

create

Comma separated list of record operations to generate. Allowed values are "create", "update", "delete", "snapshot".

rate

float

The maximum rate in records per second, at which records are generated (0 means no rate limit).

recordCount

int

Number of records to be generated (0 means infinite).

Examples
Bursts

The following configuration generates 100 records in bursts of 10 records each, with a 1 second sleep time between bursts.

[!NOTE] The generator currently has no concept of resuming work. For instance, below we have configured it to generate 100 records, but if we restart the pipeline (by stopping and starting the pipeline or by restarting Conduit), then it will start generating the 100 records from scratch.

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example
        type: source
        plugin: generator
        settings:
          # global settings
          rate: 10
          recordCount: 100
          burst.generateTime: 1s
          burst.sleepTime: 1s
          # default collection
          format.type: structured
          format.options.id: int
          format.options.name: string
          operations: create
Collections

The following configuration generates records forever with a steady rate of 1000 records per second. Records are generated in the users and orders collections. The generated records have a different format, depending on the collection they belong to.

version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example
        type: source
        plugin: generator
        settings:
          # global settings
          rate: 1000
          # collection "users"
          collections.users.format.type: structured
          collections.users.format.options.id: int
          collections.users.format.options.name: string
          collections.users.operations: create
          # collection "orders"
          collections.orders.format.type: raw
          collections.orders.format.options.id: int
          collections.orders.format.options.product: string
          collections.orders.operations: create,update,delete

Documentation

Index

Constants

View Source
const (
	FormatTypeRaw        = "raw"
	FormatTypeStructured = "structured"
	FormatTypeFile       = "file"
)
View Source
const (
	ConfigBurstGenerateTime            = "burst.generateTime"
	ConfigBurstSleepTime               = "burst.sleepTime"
	ConfigCollectionsFormatOptions     = "collections.*.format.options.*"
	ConfigCollectionsFormatOptionsPath = "collections.*.format.options.path"
	ConfigCollectionsFormatType        = "collections.*.format.type"
	ConfigCollectionsOperations        = "collections.*.operations"
	ConfigFormatOptions                = "format.options.*"
	ConfigFormatOptionsPath            = "format.options.path"
	ConfigFormatType                   = "format.type"
	ConfigOperations                   = "operations"
	ConfigRate                         = "rate"
	ConfigReadTime                     = "readTime"
	ConfigRecordCount                  = "recordCount"
)

Variables

View Source
var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        NewSource,
	NewDestination:   nil,
}

Functions

func NewSource

func NewSource() sdk.Source

func Specification

func Specification() sdk.Specification

Specification returns the Plugin's Specification.

Types

type BurstConfig added in v0.6.0

type BurstConfig struct {
	// The time the generator "sleeps" between bursts.
	SleepTime time.Duration `json:"sleepTime"`
	// The amount of time the generator is generating records in a burst. Has an
	// effect only if `burst.sleepTime` is set.
	GenerateTime time.Duration `json:"generateTime" default:"1s"`
}

type CollectionConfig added in v0.6.0

type CollectionConfig struct {
	// Comma separated list of record operations to generate. Allowed values are
	// "create", "update", "delete", "snapshot".
	Operations []string     `json:"operations" default:"create" validate:"required"`
	Format     FormatConfig `json:"format"`
}

func (CollectionConfig) SdkOperations added in v0.6.0

func (c CollectionConfig) SdkOperations() []opencdc.Operation

func (CollectionConfig) Validate added in v0.6.0

func (c CollectionConfig) Validate() error

type Config

type Config struct {
	Burst BurstConfig `json:"burst"`
	// Number of records to be generated (0 means infinite).
	RecordCount int `json:"recordCount" validate:"gt=-1"`
	// The time it takes to 'read' a record.
	// Deprecated: use `rate` instead.
	ReadTime time.Duration `json:"readTime"`
	// The maximum rate in records per second, at which records are generated (0
	// means no rate limit).
	Rate float64 `json:"rate"`

	// Configuration for default collection (i.e. records without a collection).
	// Kept for backwards compatibility.
	CollectionConfig
	Collections map[string]CollectionConfig `json:"collections"`
}

func (Config) GetCollectionConfigs added in v0.6.0

func (c Config) GetCollectionConfigs() map[string]CollectionConfig

func (Config) Parameters added in v0.6.0

func (Config) Parameters() map[string]config.Parameter

func (Config) RateLimit added in v0.6.0

func (c Config) RateLimit() rate.Limit

func (Config) Validate added in v0.6.0

func (c Config) Validate() error

type FormatConfig added in v0.6.0

type FormatConfig struct {
	// The format of the generated payload data (raw, structured, file).
	Type string `json:"type" validate:"inclusion=raw|structured|file"`
	// The options for the `raw` and `structured` format types. It accepts pairs
	// of field names and field types, where the type can be one of: `int`, `string`, `time`, `bool`, `duration`.
	Options map[string]string `json:"options"`
	// Path to the input file (only applicable if the format type is `file`).
	FileOptionsPath string `json:"options.path"`
}

func (FormatConfig) Validate added in v0.6.0

func (c FormatConfig) Validate() error

type Source

type Source struct {
	sdk.UnimplementedSource
	// contains filtered or unexported fields
}

Source connector

func (*Source) Ack

func (s *Source) Ack(ctx context.Context, position opencdc.Position) error

func (*Source) Configure

func (s *Source) Configure(ctx context.Context, cfg config.Config) error

func (*Source) Open

func (s *Source) Open(_ context.Context, _ opencdc.Position) error

func (*Source) Parameters added in v0.3.0

func (s *Source) Parameters() config.Parameters

func (*Source) Read

func (s *Source) Read(ctx context.Context) (opencdc.Record, error)

func (*Source) Teardown

func (s *Source) Teardown(_ context.Context) error

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL