Documentation ¶
Overview ¶
Package channels implements some channels functions and utilities.
Index ¶
- func AbortOnErr(errs <-chan error, cancelParent func()) error
- func CombineFactories[I, T, O any](fa func() Operator[I, T], fb func() Operator[T, O]) func() Operator[I, O]
- func EndScope(errs ErrorPipe, err Error) error
- func ErrorScope(cancel func()) (ErrorPipe, Error)
- func ToFirst[T any](in <-chan T, predicate func(T) bool, cancelParent func()) (t T, ok bool)
- func ToFirstErr(errs chan error, cancelParent func()) <-chan error
- func ToSlice[T any](in <-chan T) []T
- func ToSliceParallel[T any](in <-chan T) <-chan []T
- func ToSlices[I, L any](i <-chan I, l <-chan L) ([]I, []L)
- type Clock
- type Error
- type ErrorPipe
- type ErrorSink
- type ErrorSource
- type FanInOperator
- type FanOutOperator
- type Operator
- func At[T any](index int, cancelParent func()) Operator[T, T]
- func AtWithDefault[T any](index int, deflt T, cancelParent func()) Operator[T, T]
- func Audit[T, D any](emit <-chan D) Operator[T, T]
- func Buffer[T, D any](emit <-chan D) Operator[T, []T]
- func BufferChan[T any](count int) Operator[T, T]
- func BufferCount[T any](count int) Operator[T, []T]
- func BufferTime[T any](tickerFactory func(time.Duration) Ticker, duration time.Duration) Operator[T, []T]
- func BufferTimeOrCount[T any](tickerFactory func(time.Duration) Ticker, duration time.Duration, maxCount int) Operator[T, []T]
- func BufferToggle[T, I1, I2 any](openings <-chan I1, closings <-chan I2) Operator[T, []T]
- func CollectErrs(cancel func()) Operator[Error, error]
- func Combine[I, T, O any](a Operator[I, T], b Operator[T, O]) Operator[I, O]
- func Concat[T any]() Operator[<-chan T, T]
- func ConcatMap[I, O any](project func(in I) <-chan O) Operator[I, O]
- func Count[D any]() Operator[D, int]
- func DefaultIfEmpty[T any](deflt T) Operator[T, T]
- func Distinct[T comparable]() Operator[T, T]
- func DistinctKey[T any, K comparable](keyer func(T any) K) Operator[T, T]
- func DistinctUntilChanged[T comparable]() Operator[T, T]
- func DistinctUntilChangedEqualer[T interface{ ... }]() Operator[T, T]
- func DistinctUntilChangedFunc[T any](equals func(T, T) bool) Operator[T, T]
- func DistinctUntilKeyChanged[T any, K comparable](key func(T) K) Operator[T, T]
- func Every[T any](predicate func(T) bool, cancelParent func()) Operator[T, bool]
- func ExhaustMap[I, O any](project func(in I) <-chan O) Operator[I, O]
- func Filter[T any](predicate func(T) bool) Operator[T, T]
- func FilterCancel[T any](predicate func(T) (emit, last bool), cancelParent func()) Operator[T, T]
- func FindIndex[T any](predicate func(T) bool, cancelParent func()) Operator[T, int]
- func First[T any](predicate func(T) bool, cancelParent func()) Operator[T, T]
- func IgnoreElements[D any]() Operator[D, D]
- func IsEmpty[D any](cancelParent func()) Operator[D, bool]
- func Last[T any](predicate func(T) bool) Operator[T, T]
- func Map[I, O any](project func(in I) O) Operator[I, O]
- func MapCancel[I, O any](project func(in I) (projected O, ok bool), cancelParent func()) Operator[I, O]
- func MapFilter[I, O any](project func(in I) (projected O, emit bool)) Operator[I, O]
- func MapFilterCancel[I, O any](project func(in I) (projected O, emit, ok bool), cancelParent func()) Operator[I, O]
- func MapFilterCancelTeardown[I, O any](project func(in I) (projected O, emit, ok bool), ...) Operator[I, O]
- func Max[T constraints.Ordered]() Operator[T, T]
- func Merge[T any]() Operator[<-chan T, T]
- func MergeMap[I, O any](maxParallelism int, project func(in I) <-chan O) Operator[I, O]
- func Min[T constraints.Ordered]() Operator[T, T]
- func PairWise[T any]() Operator[T, [2]T]
- func ParallelMap[I, O any](maxParallelism int, project func(in I) O) Operator[I, O]
- func ParallelMapCancel[I, O any](maxParallelism int, project func(context.Context, I) (o O, ok bool), ...) Operator[I, O]
- func ParallelMapStable[I, O any](maxParallelism, maxWindow int, project func(in I) O) Operator[I, O]
- func Reduce[I, O any](project func(accum O, in I) O, seed O) Operator[I, O]
- func ReduceAcc[I, O, A any](project func(accum A, in I) (newAccum A, out O), seed A) Operator[I, O]
- func ReduceFunc[I, O any](project func(accum O, in I) O, seed func(in I) O) Operator[I, O]
- func ReduceFuncAcc[I, O, A any](project func(accum A, in I) (A, O), seed func(in I) (A, O)) Operator[I, O]
- func Sample[T, D any](emit <-chan D) Operator[T, T]
- func Scan[I, O any](project func(accum O, cur I) O, seed O) Operator[I, O]
- func ScanAccum[I, O, A any](project func(accum A, cur I) (nextAccum A, o O), seed A) Operator[I, O]
- func ScanPreamble[I, O, A any](project func(accum A, in I) (A, O), seedFn func(in I) (A, O)) Operator[I, O]
- func Skip[T any](count int) Operator[T, T]
- func SkipLast[T any](count int) Operator[T, T]
- func StartWith[T any](initial T) Operator[T, T]
- func SwitchMap[I, O any](ctx context.Context, project func(ctx context.Context, in I) <-chan O) Operator[I, O]
- func Take[T any](count int, cancelParent func()) Operator[T, T]
- func TakeUntil[T, D any](emit <-chan D, cancelParent func()) Operator[T, T]
- func Tap[T any](observer func(T)) Operator[T, T]
- func Teardown[T any](deferred func(last T, emitted bool) (additional T, emitAdditional bool)) Operator[T, T]
- func Timeout[T any](c Clock, maxDuration time.Duration, cancelParent func()) Operator[T, T]
- func Window[T, D any](emit <-chan D) Operator[T, <-chan T]
- func WindowCount[T any](count int) Operator[T, <-chan T]
- func WithLatestFrom[I, L any](other <-chan L, cancelOther func()) Operator[I, Pair[I, L]]
- type Pair
- type ParallelOperator
- type RealClock
- type RealTicker
- type SinkOperator
- type SourceOperator
- func FromFileLines(path string, errs ErrorSink) SourceOperator[string]
- func FromFunc[T any](generator func(index int) (t T, ok bool)) SourceOperator[T]
- func FromRange(start, end int) SourceOperator[int]
- func FromReaderLines(r io.Reader, errs chan<- error) SourceOperator[string]
- func FromSlice[T any](s []T) SourceOperator[T]
- func FromTicker(ctx context.Context, tickerFactory func(time.Duration) Ticker, ...) SourceOperator[time.Time]
- type SplitOperator
- type Ticker
- type Triplet
- type ZipOperator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AbortOnErr ¶
AbortOnErr returns the first non-nil error encountered (cancelling the parent) and discards the others. If no error is encountered, it returns nil.
func CombineFactories ¶
func CombineFactories[I, T, O any](fa func() Operator[I, T], fb func() Operator[T, O]) func() Operator[I, O]
CombineFactories is like Combine, but it works on operator constructors instead of operators.
func ErrorScope ¶
func ErrorScope(cancel func()) (ErrorPipe, Error)
ErrorScope is a helper to simplify error handling.
func ToFirstErr ¶
ToFirstErr emits the first non-nil error.
func ToSliceParallel ¶
func ToSliceParallel[T any](in <-chan T) <-chan []T
ToSliceParallel is like ToSlice, but emits the collected slice when the input is closed.
Types ¶
type ErrorSource ¶
type ErrorSource = <-chan Error
type FanInOperator ¶
type FanInOperator[I, O any] func(...<-chan I) <-chan O
type FanOutOperator ¶
type FanOutOperator[I, O any] func(<-chan I) []<-chan O
type Operator ¶
type Operator[I, O any] func(<-chan I) <-chan O
func AtWithDefault ¶
AtWithDefault is like At but if the input concludes too early a default value is emitted.
func Audit ¶
func Audit[T, D any](emit <-chan D) Operator[T, T]
Audit can be used to observe the last emitted value for the input emitter. When emit emits, the most recent value for the input will be forwarded to the output.
Note that this might cause repeated items to be emitted, for example if emit fires twice between input emissions. If this behavior is not desired, please use Sample instead.
func Buffer ¶
func Buffer[T, D any](emit <-chan D) Operator[T, []T]
Buffer buffers elements from the input until the emitter emits, emitting a slice of the values seen between two emissions (or the start and the first emission).
Emitted slices preserve the order of receiving.
If the emitter emits when the buffer is empty, nothing is emitted.
If the emitter or the input are closed when there are still elements in the buffer, those elements are emitted immediately and the output is closed.
func BufferChan ¶
BufferChan returns a buffered channel that copies the input emissions. If a non-positive count is passed an arbitrary value is used.
func BufferCount ¶
BufferCount collects elements from the input and emits them in batches of length equal to count. If there are still elements in the buffer when the input is closed, one last, shorter, batch is emitted. Any count value smaller than 1 is treated as 1.
func BufferTime ¶
func BufferTime[T any](tickerFactory func(time.Duration) Ticker, duration time.Duration) Operator[T, []T]
BufferTime is like Buffer, but it uses a ticker to decide when to emit. If tickerFactory is nil, the real time is used.
func BufferTimeOrCount ¶
func BufferTimeOrCount[T any](tickerFactory func(time.Duration) Ticker, duration time.Duration, maxCount int) Operator[T, []T]
BufferTimeOrCount buffers input for at most maxCount elements and at most duration time. If tickerFactory is nil, the real time is used.
func BufferToggle ¶
func BufferToggle[T, I1, I2 any](openings <-chan I1, closings <-chan I2) Operator[T, []T]
BufferToggle buffers elements between an emission of openings and an emission of closings. Two consecutive emissions of openings or closings are ignored. If the buffer is open when any of the inputs ends the leftovers are emitted without an emission of closings. Elements between an emission of closings and an emission of openings are ignored.
func CollectErrs ¶
func CollectErrs(cancel func()) Operator[Error, error]
CollectErrs processes all the given errors and returns a collection of them, or nil. If a cancel func is passed it will be called on the first non-nil error.
func Combine ¶
func Combine[I, T, O any](a Operator[I, T], b Operator[T, O]) Operator[I, O]
Combine returns an operator that is the combination of the two input ones. When using Combine keep in mind that every operator usually spawns at least one goroutine, so very long combination chains might end up being expensive to run.
Also consider that re-using operators might cause unexpected behaviors as some operators preserve state, so for these cases consider CombineFactories or re-calling Combine with freshly built operators every time the combined result is needed.
func Concat ¶
func Concat[T any]() Operator[<-chan T, T]
Concat emits all values from all inner inputs, one after the other, exhausting the previous ones before consuming the next.
func ConcatMap ¶
func ConcatMap[I, O any](project func(in I) <-chan O) Operator[I, O]
ConcatMap calls Map(project) and then concats the resulting emitters to a single one.
func DefaultIfEmpty ¶
func DefaultIfEmpty[T any](deflt T) Operator[T, T]
DefaultIfEmpty emits the default value if and only if the input emitter did not emit before being closed.
func Distinct ¶
func Distinct[T comparable]() Operator[T, T]
Distinct forwards the input to the output for values that have not been seen yet. Note: this needs to keep a set of all seen values in memory, so it might use a lot of memory.
func DistinctKey ¶
func DistinctKey[T any, K comparable](keyer func(T any) K) Operator[T, T]
DistinctKey is like Distinct but it accepts a keyer function to compare elements.
func DistinctUntilChanged ¶
func DistinctUntilChanged[T comparable]() Operator[T, T]
DistinctUntilChanged is like DistinctUntilChangedFunc but it works on comparable items.
func DistinctUntilChangedEqualer ¶
func DistinctUntilChangedEqualer[T interface{ Equal(T) bool }]() Operator[T, T]
DistinctUntilChangedEqualer is like DistinctUntilChangedFunc, but it compares items that have a Equal(T)bool method on them.
func DistinctUntilChangedFunc ¶
DistinctUntilChangedFunc forwards the input to the output, with the exception of identical consecutive values, which are discarded. In other words this behaves like slices.Compact, but for channels.
func DistinctUntilKeyChanged ¶
func DistinctUntilKeyChanged[T any, K comparable](key func(T) K) Operator[T, T]
DistinctUntilKeyChanged is like DistinctUntilChangedFunc but uses a key function to match consecutive values.
func Every ¶
Every returns whether all values emitted by the input match the predicate. If no value is emitted, it returns true. Every cancels and discard the parent as soon as the first non-matching value is encountered.
func ExhaustMap ¶
func ExhaustMap[I, O any](project func(in I) <-chan O) Operator[I, O]
ExhaustMap project inputs to inner emitters and forwards all inner emissions to the output.
If the input emits while an inner emitter is being consumed, the value is discarded. In summary: this operator prioritizes inputs from the projected emitters over the input, making the input lossy.
This is like SwitchMap, but the inner emitters are prioritized.
func Filter ¶
Filter conditionally forwards inputs to the output. Values that the predicate returns true for are forwarded.
func FilterCancel ¶
FilterCancel is like filter, but it can report when the last emission happens, which will cause the output channel to be closed and the input to be drained.
func FindIndex ¶
FindIndex emits the index for the first input that matches the predicate and it ends.
func First ¶
First emits the first item that the predicate returns true for and exits. If predicate is nil, the first element from the input is forwarded to the output.
func IgnoreElements ¶
func IgnoreElements[D any]() Operator[D, D]
IgnoreElements never emits, and it concludes when the input ends.
func Last ¶
Last only emits the last element from the input that the predicate retuned true for (if any) when the input is closed. If the predicate is nil, the last element is emitted.
func MapCancel ¶
func MapCancel[I, O any](project func(in I) (projected O, ok bool), cancelParent func()) Operator[I, O]
MapCancel projects inputs to outputs until the first not-ok value is reached, it then cancels the parent and drains the input. The not-ok value returned is discarded.
func MapFilter ¶
MapFilter conditionally projects inputs to outputs. If the project func returns false as a second return value, that emission is skipped.
func MapFilterCancel ¶
func MapFilterCancel[I, O any](project func(in I) (projected O, emit, ok bool), cancelParent func()) Operator[I, O]
MapFilterCancel conditionally projects inputs to outputs until the first non-ok value is reached, it then cancels the parent and drains the input. It is like a combination of MapFilter and MapCancel.
func MapFilterCancelTeardown ¶
func MapFilterCancelTeardown[I, O any]( project func(in I) (projected O, emit, ok bool), teardown func(last I, emitted bool) (lastItem O, shouldEmitLast bool), cancelParent func(), ) Operator[I, O]
MapFilterCancelTeardown conditionally projects inputs to outputs until the last value is reached, it then cancels the parent, drains the input and runs the teardown routine. It is like a combination of MapFilter, MapCancel and Teardown.
func Merge ¶
func Merge[T any]() Operator[<-chan T, T]
Merge concurrently reads all inner inputs and emits them. Oder is not preserved.
func MergeMap ¶
MergeMap projects the input emissions to inner emitters that are then merged in the output. Order of the output is not guaranteed to be related to the order of the input.
func PairWise ¶
func PairWise[T any]() Operator[T, [2]T]
PairWise emits couples of subcessive emissions. This means all values are emitted twice, once as the second item of the pair, and then as the first item of the pair (the oldest is the one at index 0, as if it was a sliding window of length 2. The only exceptions are the first and last items, which will only be emitted once.
func ParallelMap ¶
ParallelMap is like Map but runs the project function in parallel. Output order is not guaranteed to be related to input order. If maxParallelism is 0 or negative, the number of CPU is used.
func ParallelMapCancel ¶
func ParallelMapCancel[I, O any](maxParallelism int, project func(context.Context, I) (o O, ok bool), cancelParent func()) Operator[I, O]
ParallelMapCancel is like ParallelMap, but allows the project function to abort operations.
func ParallelMapStable ¶
func ParallelMapStable[I, O any](maxParallelism, maxWindow int, project func(in I) O) Operator[I, O]
ParallelMapStable is like ParallelMap, but it guarantees that the output is in the same order of the input. The maxWindow parameter is used to determine how large the sorting window should be. In other words: if the input at index N is still processing when the item at index N+maxWindow is read, ParallelMapStable will wait for item N to be done processing. Using a maxWindow smaller than maxParallelism is not useful as the maxParallelism will never be achieved. If maxParallelism is 0 or negative the number of CPUs is used instead. If maxWindow is 0 or negative a default is used.
func Reduce ¶
func Reduce[I, O any](project func(accum O, in I) O, seed O) Operator[I, O]
Reduce is like ReduceAcc, but the accumulator is used as the output value.
func ReduceAcc ¶
func ReduceAcc[I, O, A any](project func(accum A, in I) (newAccum A, out O), seed A) Operator[I, O]
ReduceAcc scans the input calling the project function with the previous value for the accumulator and the current value for the input. Once the input is closed, it emits the last output value, or nothing if the input never emitted.
func ReduceFunc ¶
func ReduceFunc[I, O any](project func(accum O, in I) O, seed func(in I) O) Operator[I, O]
ReduceFunc calls the seed func to compute the first accumulator value, then scans the input subcessively calling project. Once the input is closed, it emits the last output value, or nothing if the input never emitted.
func ReduceFuncAcc ¶
func ReduceFuncAcc[I, O, A any](project func(accum A, in I) (A, O), seed func(in I) (A, O)) Operator[I, O]
ReduceFuncAcc calls the seed func to compute the first accumulator value, then scans the input subcessively calling project. Once the input is closed, it emits the last output value, or nothing if the input never emitted.
func Sample ¶
func Sample[T, D any](emit <-chan D) Operator[T, T]
Sample emits the latest value from the input when the provided emitter emits. If no emission from the input has happened between consecutive requests to emit, they are ignored. In other words, no input elements are emitted more than once. If this behavior is not desired, please use Audit instead.
func Scan ¶
func Scan[I, O any](project func(accum O, cur I) O, seed O) Operator[I, O]
Scan is like Map, but the project function is called with the the last value emitted and the current value from the input. The first iteraion will be called with seed.
func ScanAccum ¶
func ScanAccum[I, O, A any](project func(accum A, cur I) (nextAccum A, o O), seed A) Operator[I, O]
ScanAccum is like Scan, but the accumulator is kept separate from the return value, and they can be of different types.
func ScanPreamble ¶
func ScanPreamble[I, O, A any](project func(accum A, in I) (A, O), seedFn func(in I) (A, O)) Operator[I, O]
ScanPreamble is like ScanAccum, but instead of taking a seed value it computes the seed and the first emission by calling seedFn func on the first input value.
func Skip ¶
Skip discards the first count emissions from the input, then it becomes the copy of the input.
func SkipLast ¶
SkipLast skips the last count items from the input. Since it needs to keep a buffer, SkipLast will wait until count items are emitted or the input is closed before emitting the first value, and then it becomes a delayed copy of the input that will not emit the last count items.
func StartWith ¶
func StartWith[T any](initial T) Operator[T, T]
StartWith emits the initial value and then it becomes a copy of the input.
func SwitchMap ¶
func SwitchMap[I, O any](ctx context.Context, project func(ctx context.Context, in I) <-chan O) Operator[I, O]
SwitchMap projects the inputs to inner emitters and copies them to the output until the input emits a new value. When a new input value comes in, the last inner emitter's context is cancelled and its values are discarded in favor of the ones emitted by the most recent inner emitter. This is like ExhaustMap but the input is prioritized.
func TakeUntil ¶
func TakeUntil[T, D any](emit <-chan D, cancelParent func()) Operator[T, T]
TakeUntil forwards the input to the output until the first emission (or closure) of emit, then it ends.
func Tap ¶
func Tap[T any](observer func(T)) Operator[T, T]
Tap calls the observer for every value emitted by the input and forwards that value to the output once observer returns.
func Teardown ¶
func Teardown[T any](deferred func(last T, emitted bool) (additional T, emitAdditional bool)) Operator[T, T]
Teardown returns an emitter that is the copy of the input, but it calls deferred at the end of the input with the last emitted value, or with the zero value and emitted set to false. The deferred func can optionally return a value to emit as a last value.
func Timeout ¶
Timeout creates a copy of the input unless the input takes more than maxDuration to emit the first value, in which case the output is closed.
func Window ¶
func Window[T, D any](emit <-chan D) Operator[T, <-chan T]
Window maps the input to a series of emitters that just copy the input values. Every time emit emits a new inner emitter that copies the input values is emitted.
In other words this is like buffer, but instead of emitting slices of values it emits emitters of these values.
Consecutive emissions that emit no values do not cause empty emitters to be generated.
func WindowCount ¶
WindowCount is like Window but it starts a new inner emitter every count emissions received from the input. The last inner emitter might emit less than count items.
func WithLatestFrom ¶
func WithLatestFrom[I, L any](other <-chan L, cancelOther func()) Operator[I, Pair[I, L]]
WithLatestFrom starts emitting when both the input and the other emitter have emitted at least once. It then emits every time the input emits, pairing that value with the latest coming from the other one. If the other emitter is closed, its last emitted value will still be used until the input is closed. If the other emitter never emits, the returned emitter will also never emit and end when the input ends.
type ParallelOperator ¶
type ParallelOperator[I, O any] func([]<-chan I) []<-chan O
type RealTicker ¶
type RealTicker struct {
// contains filtered or unexported fields
}
func (RealTicker) Chan ¶
func (r RealTicker) Chan() <-chan time.Time
func (RealTicker) Stop ¶
func (r RealTicker) Stop()
type SinkOperator ¶
type SinkOperator[T any] func(<-chan T)
func Collect ¶
Collect calls the consume func once per every input item. If the consume function returns false, it cancels the parent, discards the input and exits.
func Discard ¶
func Discard[D any]() SinkOperator[D]
Discard spawns a goroutine that consumes the input and returns immediately. If a nil input is passed, it's ignored.
func ToFileLines ¶
ToFileLines truncates or creates the file at the specified path and writes to it all inputs as lines (using fmt.Fprintln).
type SourceOperator ¶
type SourceOperator[T any] func() <-chan T
func FromFileLines ¶
func FromFileLines(path string, errs ErrorSink) SourceOperator[string]
FromFileLines is like FromReaderLines but for files.
func FromFunc ¶
func FromFunc[T any](generator func(index int) (t T, ok bool)) SourceOperator[T]
FromFunc uses the provided generator as a source for data, and emits all values until ok becomes false (the element returned when ok is false is not emitted).
func FromRange ¶
func FromRange(start, end int) SourceOperator[int]
FromRange emits all values between start (included) and end (excluded).
func FromReaderLines ¶
func FromReaderLines(r io.Reader, errs chan<- error) SourceOperator[string]
FromReaderLines emits all of the lines of the reader. If an error is encountered it's emitted on the errs chan, otherwise a nil one is emitted when the read is finished.
func FromSlice ¶
func FromSlice[T any](s []T) SourceOperator[T]
FromSlice emits all values in s and ends.
func FromTicker ¶
func FromTicker(ctx context.Context, tickerFactory func(time.Duration) Ticker, duration time.Duration, max int) SourceOperator[time.Time]
FromTicker emits the current time at most max times, separated by a wait of duration. If the context is cancelled it stops. If tickerFactory is nil, the real time is used.
type SplitOperator ¶
type SplitOperator[I, O, P any] func(<-chan I) (<-chan O, <-chan P)
type ZipOperator ¶
type ZipOperator[A, B, O any] func(<-chan A, <-chan B) <-chan O
func CombineLatest ¶
func CombineLatest[A, B any]() ZipOperator[A, B, Pair[A, B]]
CombineLatest emits pairs of the last values emitted by the given inputs.
That means that if input a emits twice and input b emits once, the two subsequent emitted pairs will have the same value for B, but different ones for A.
Emissions only start after both inputs have emitted at least once, all emissions before then for the input that emitted first are discarded except for the last value.
CombineLatest ends when both inputs end.
func Zip ¶
func Zip[A, B any](cancelA, cancelB func()) ZipOperator[A, B, Pair[A, B]]
Zip emits pairs of values from the inputs. Once a value is received, it waits for the other input to emit one before reading more from the same.
In other words: every value from both inputs is guaranteed to be emitted exactly once in exactly one pair until one input is closed.
If one input is closed, the other is cancelled and discarded.
If both inputs have the same length, the second one to be closed might get cancelled right after its last emission, which should be a no-op.