Documentation ¶
Index ¶
- Variables
- func Logger(ctx context.Context) *zerolog.Logger
- func ParseConfig(ctx context.Context, cfg map[string]string, target any, ...) error
- type ErrorRecord
- type FilterRecord
- type ProcessedRecord
- type Processor
- type ProcessorFunc
- type Reference
- type ReferenceResolver
- type SingleRecord
- type Specification
- type UnimplementedProcessor
- func (UnimplementedProcessor) Configure(context.Context, map[string]string) error
- func (UnimplementedProcessor) Open(context.Context) error
- func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord
- func (UnimplementedProcessor) Specification() (Specification, error)
- func (UnimplementedProcessor) Teardown(context.Context) error
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrUnimplemented is returned in functions of plugins that don't implement
	// a certain method.
	ErrUnimplemented = errors.New("the processor plugin does not implement " +
		"this action, please check the source code of the processor and make sure " +
		"all required processor methods are implemented")
	ErrFilterRecord = errors.New("filter out this record")
)
Functions ¶
func Logger ¶
func Logger(ctx context.Context) *zerolog.Logger
Logger returns the logger for the current context. Please provide the context that is passed to any of the processor's methods (Configure, Open, Process, Teardown).
func ParseConfig ¶
func ParseConfig(
	ctx context.Context,
	cfg map[string]string,
	target any,
	params config.Parameters,
) error
ParseConfig sanitizes the configuration, applies defaults, validates it and copies the values into the target object. It combines the functionality provided by github.com/conduitio/conduit-commons/config.Config into a single convenient function. It is intended to be used in the Configure method of a processor to parse the configuration map.
The function does the following:
- Removes leading and trailing spaces from all keys and values in the configuration.
- Applies the default values defined in the parameter specifications to the configuration.
- Validates the configuration by checking for unrecognized parameters, type validations, and value validations.
- Copies configuration values into the target object. The target object must be a pointer to a struct.
Example ¶
cfg := map[string]string{
	"foo": "bar ", // will be sanitized
	// "bar" is missing, will be set to the default value
	"nested.baz": "1m",
}

params := config.Parameters{
	"foo": config.Parameter{Type: config.ParameterTypeString},
	"bar": config.Parameter{
		Type:    config.ParameterTypeInt,
		Default: "42",
	},
	"nested.baz": config.Parameter{Type: config.ParameterTypeDuration},
}

var target struct {
	Foo    string `json:"foo"`
	Bar    int    `json:"bar"`
	Nested struct {
		Baz time.Duration `json:"baz"`
	} `json:"nested"`
}

err := ParseConfig(context.Background(), cfg, &target, params)
if err != nil {
	panic(err)
}

fmt.Printf("%+v", target)

Output:
{Foo:bar Bar:42 Nested:{Baz:1m0s}}
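To make the first three steps listed above more concrete, here is a minimal standard-library sketch of sanitizing, applying defaults and validating a configuration map. It is not the SDK's implementation: the `param` type and the `prepare` function are hypothetical stand-ins, only integer validation is modelled, and the final step (copying values into a struct) is left out.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// param is a hypothetical, simplified stand-in for config.Parameter.
type param struct {
	Default string
	IsInt   bool
}

// prepare sanitizes cfg, applies defaults and validates it against params.
func prepare(cfg map[string]string, params map[string]param) (map[string]string, error) {
	out := make(map[string]string, len(params))
	// Step 1: trim leading and trailing spaces from keys and values.
	for k, v := range cfg {
		out[strings.TrimSpace(k)] = strings.TrimSpace(v)
	}
	// Step 2: fill in defaults for parameters that were not provided.
	for name, p := range params {
		if _, ok := out[name]; !ok && p.Default != "" {
			out[name] = p.Default
		}
	}
	// Step 3: reject unrecognized keys and values of the wrong type.
	for k, v := range out {
		p, ok := params[k]
		if !ok {
			return nil, fmt.Errorf("unrecognized parameter %q", k)
		}
		if p.IsInt {
			if _, err := strconv.Atoi(v); err != nil {
				return nil, fmt.Errorf("parameter %q: %q is not an integer", k, v)
			}
		}
	}
	return out, nil
}

func main() {
	params := map[string]param{
		"foo": {},
		"bar": {Default: "42", IsInt: true},
	}
	got, err := prepare(map[string]string{" foo": "bar "}, params)
	fmt.Println(got, err)

	_, err = prepare(map[string]string{"unknown": "x"}, params)
	fmt.Println(err)
}
```

Running the validation after the defaults are applied means a malformed default is caught the same way as a malformed user value.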
Types ¶
type ErrorRecord ¶
type ErrorRecord struct {
	// Error is the error cause.
	Error error
}
ErrorRecord is a record that failed to be processed and will be nacked.
type FilterRecord ¶
type FilterRecord struct{}
FilterRecord is a record that will be acked and filtered out of the pipeline.
type ProcessedRecord ¶
type ProcessedRecord interface {
// contains filtered or unexported methods
}
ProcessedRecord is a record returned by the processor.
type Processor ¶
type Processor interface {
	// Specification contains the metadata of this processor like name, version,
	// description and a list of parameters expected in the configuration.
	Specification() (Specification, error)

	// Configure is the first function to be called in a processor. It provides the
	// processor with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Configure should not open connections or any other resources. It should solely
	// focus on parsing and validating the configuration itself.
	Configure(context.Context, map[string]string) error

	// Open is called after Configure to signal the processor it can prepare to
	// start processing records. If needed, the processor should open connections and
	// start background jobs in this function.
	Open(context.Context) error

	// Process takes a number of records and processes them right away.
	// It should return a slice of ProcessedRecord that matches the length of
	// the input slice. If an error occurred while processing a specific record
	// it should be reflected in the ProcessedRecord with the same index as the
	// input record that caused the error.
	// Process should be idempotent, as it may be called multiple times with the
	// same records (e.g. after a restart when records were not flushed).
	Process(context.Context, []opencdc.Record) []ProcessedRecord

	// Teardown signals to the processor that the pipeline is shutting down and
	// there will be no more calls to any other function. After Teardown returns,
	// the processor will be discarded.
	Teardown(context.Context) error
	// contains filtered or unexported methods
}
Processor receives records, manipulates them and returns back the processed records.
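The contract described for Process (one result per input, with errors reported at the index of the record that caused them) can be sketched without the SDK. In this simplified model, strings stand in for records, and the `result` type and `process` function are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// result is a hypothetical stand-in for ProcessedRecord: either Value is
// set, or Err explains why the input at the same index failed.
type result struct {
	Value int
	Err   error
}

// process doubles every numeric input. The output always has the same
// length as the input, and a failure is recorded at the failing index
// instead of aborting the whole batch.
func process(inputs []string) []result {
	out := make([]result, len(inputs))
	for i, in := range inputs {
		n, err := strconv.Atoi(in)
		if err != nil {
			out[i] = result{Err: fmt.Errorf("record %d: %w", i, err)}
			continue
		}
		out[i] = result{Value: n * 2}
	}
	return out
}

func main() {
	for i, r := range process([]string{"1", "x", "3"}) {
		fmt.Println(i, r.Value, r.Err)
	}
}
```

Because `process` depends only on its input, calling it again with the same batch yields the same results, which is the idempotency property the interface comment asks for.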
type ProcessorFunc ¶
type ProcessorFunc struct {
	UnimplementedProcessor
	// contains filtered or unexported fields
}
ProcessorFunc is an adapter allowing use of a function as a Processor.
func NewProcessorFunc ¶
func NewProcessorFunc(specs Specification, f func(context.Context, opencdc.Record) (opencdc.Record, error)) ProcessorFunc
NewProcessorFunc creates a ProcessorFunc from a function and specifications. This is useful for creating simple processors without needing to implement the full Processor interface.
func (ProcessorFunc) Process ¶
func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) []ProcessedRecord
func (ProcessorFunc) Specification ¶
func (f ProcessorFunc) Specification() (Specification, error)
type Reference ¶
Reference is an interface that represents a reference to a field in a record. It can be used to get and set the value of the field dynamically using input provided by the user.
type ReferenceResolver ¶
ReferenceResolver is a type that knows how to resolve a reference to a field in a record. It is used to specify the target of a processor's output.
Example (Nested) ¶
rec := opencdc.Record{
	Key: opencdc.StructuredData{
		"foo": map[string]any{
			"bar": "baz",
		},
	},
}

resolver, err := NewReferenceResolver(".Key.foo.bar")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the field now ...")
err = ref.Set("qux")
if err != nil {
	panic(err)
}

fmt.Println("new value:", rec.Key)

Output:
ref value: baz
setting the field now ...
new value: map[foo:map[bar:qux]]
Example (SetNonExistingField) ¶
rec := opencdc.Record{} // empty record

resolver, err := NewReferenceResolver(".Payload.After.foo.bar")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the field now ...")
err = ref.Set("hello")
if err != nil {
	panic(err)
}

fmt.Println("new value:", rec.Payload.After)

Output:
ref value: <nil>
setting the field now ...
new value: map[foo:map[bar:hello]]
Example (Simple) ¶
rec := opencdc.Record{
	Position: []byte("my position"),
}

resolver, err := NewReferenceResolver(".Position")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the position is not allowed, let's try it")
err = ref.Set("foo")
fmt.Println(err)

Output:
ref value: my position
setting the position is not allowed, let's try it
cannot set .Position: cannot set immutable reference
func NewReferenceResolver ¶
func NewReferenceResolver(input string) (ReferenceResolver, error)
NewReferenceResolver creates a new reference resolver from the input string. The input string is a reference to a field in a record. It can be a simple field name or a path to a nested field. The returned resolver can be used to resolve a reference to the specified field in a record and manipulate that field (get or set the value).
Examples of valid references include:
- .Position
- .Operation
- .Key
- .Metadata.foo (to access a simple metadata value)
- .Metadata["f.o.123"] (to access a metadata value via a key containing non-alpha-numeric characters)
- .Payload.Before.foo (to access a nested field in payload before)
- .Payload.After["1"]["2"] (to access nested fields in payload after containing non-alpha-numeric characters)
func (ReferenceResolver) Resolve ¶
func (r ReferenceResolver) Resolve(rec *opencdc.Record) (Reference, error)
Resolve resolves the reference to a field in the record. If the reference cannot be resolved an error is returned. If the reference is valid but the field does not exist in the record, the field will be created. The returned reference can be used to set the value of the field.
type SingleRecord ¶
SingleRecord is a single processed record that will continue down the pipeline.
type Specification ¶
type Specification struct {
	// Name is the name of the processor.
	Name string `json:"name"`
	// Summary is a brief description of the processor and what it does.
	Summary string `json:"summary"`
	// Description is a more long form area appropriate for README-like text
	// that the author can provide for documentation about the specified
	// Parameters.
	Description string `json:"description"`
	// Version string. Should be a semver prepended with `v`, e.g. `v1.54.3`.
	Version string `json:"version"`
	// Author declares the entity that created or maintains this processor.
	Author string `json:"author"`
	// Parameters describe how to configure the processor.
	Parameters config.Parameters `json:"parameters"`
}
Specification is returned by a processor when its Specification method is called. It contains metadata about the processor and describes the configuration parameters the processor accepts.
type UnimplementedProcessor ¶
type UnimplementedProcessor struct{}
UnimplementedProcessor should be embedded to have forward compatible implementations.
func (UnimplementedProcessor) Configure ¶
func (UnimplementedProcessor) Configure(context.Context, map[string]string) error
Configure is optional and can be overridden in the actual implementation.
func (UnimplementedProcessor) Open ¶
func (UnimplementedProcessor) Open(context.Context) error
Open is optional and can be overridden in the actual implementation.
func (UnimplementedProcessor) Process ¶
func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord
Process needs to be overridden in the actual implementation.
func (UnimplementedProcessor) Specification ¶
func (UnimplementedProcessor) Specification() (Specification, error)
Specification needs to be overridden in the actual implementation.
func (UnimplementedProcessor) Teardown ¶
func (UnimplementedProcessor) Teardown(context.Context) error
Directories ¶
Path | Synopsis |
---|---|
conduit | Package conduit provides the functionality for Conduit to set up and run built-in processors. |
proto | |
wasm | Package wasm provides the functionality for communicating with Conduit as a standalone plugin. |