iter

package
v0.0.0-...-1ce6c9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Cartesian

func Cartesian[T, U any](ch1 chan T, ch2 chan U) chan pair.Pair[T, U]

Cartesian 生成笛卡尔积 参数:

  • ch1: 一个通道,通道中的值是类型为 T 的数据。
  • ch2: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 一个通道,通道中的值是类型为 array.Pair[T, T] 的数据,表示笛卡尔积。

注意: 由于需要对收集第二个通道的数据,因此可以将较少数据的通道传递给第二个通道。 如果第二个通道数据很多,要考虑内存占用问题。

func Collect

func Collect[T any](ch chan T) []T

Collect 从通道中收集所有值并返回一个切片。 参数:

  • ch: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 一个切片,其中包含从通道中读取的所有值。

函数功能:

  • 从输入通道 ch 中读取所有值,并将它们按顺序添加到一个切片中。
  • 当通道关闭时,函数返回包含所有读取值的切片。

func CollectFun

func CollectFun[T any, U gotools.Comparable](f func(x T) U) func(ch chan T) []U

func Count

func Count[T any](ch chan T) int

Count 计算通道中元素的数量 参数:

  • ch: 一个通道,用于接收数据。

返回:

  • int: 通道中元素的数量。

func Distinct

func Distinct[T gotools.Comparable](ch chan T) chan T

Distinct 返回一个函数,该函数接受一个输入通道(chan T), 从中移除重复的元素,并将唯一的元素发送到一个新的输出通道(chan T)中。 函数内部使用一个映射(map)来跟踪已经见过的元素,以确保每个元素只出现一次。 返回的输出通道在处理完成后会被关闭。

func DistinctCount

func DistinctCount[T comparable](ch chan T) int

DistinctCount 去重计数 参数:

  • ch: 一个通道,用于接收数据。

返回:

  • int: 通道中去重后元素的数量。

func DropWhile

func DropWhile[T any](f func(x T) bool) func(ch chan T) chan T

DropWhile 从通道中读取值,直到遇到第一个不满足条件的值,之后将所有后续值传递到新通道。 参数:

  • f: 一个函数,接受一个类型为 T 的值,返回一个布尔值,表示该值是否满足条件。
  • ch: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 一个新通道,其中包含从第一个不满足条件的值之后的所有值。

函数功能:

  • 从输入通道 ch 中读取值,应用函数 f 来检查每个值是否满足条件。
  • 跳过所有满足条件的值,直到遇到第一个不满足条件的值。
  • 从第一个不满足条件的值开始,将所有后续值写入新通道 ch_。
  • 新通道 ch_ 包含从第一个不满足条件的值之后的所有值。

func ErrorCH

func ErrorCH(ch chan error)

func Filter

func Filter[T any](f func(x T) bool) func(ch chan T) chan T

Filter 过滤通道中的数据,只将符合条件的数据发送到新的通道。 参数:

  • f: 一个函数,接受类型为 T 的输入,并返回布尔值。如果返回 true,则将该数据发送到新通道;如果返回 false,则忽略该数据。
  • ch: 一个通道,通道中的每个值是类型为 T 的数据。

返回:

  • 一个通道,通道中的值是类型为 T 的数据,只包含符合函数 f 条件的数据。

函数功能:

  • 从输入通道 ch 中读取数据,对每个数据应用函数 f。如果函数 f 返回 true,则将数据写入新的通道 ch_

func First

func First[T any](f func(x T) bool) func(ch chan T) T

Find 从通道中查找第一个满足条件的值。 参数:

  • f: 一个函数,接受一个类型为 T 的值,返回一个布尔值,表示该值是否满足条件。
  • ch: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 第一个满足条件的值。如果通道中的值都不满足条件,返回类型 T 的零值。

函数功能:

  • 从输入通道 ch 中读取数据,应用函数 f 来检查每个值是否满足条件。
  • 如果找到第一个满足条件的值,则返回该值。
  • 如果通道关闭且没有找到满足条件的值,则返回类型 T 的零值。

func FlatMap

func FlatMap[T any, U any](f func(x T) []U) func(ch chan T) chan U

FlatMap 返回一个函数,该函数接受一个输入通道(chan T), 参数:

  • f: 一个函数,接受类型为 T 的输入,返回类型为 []U 的结果。
  • ch: 一个通道,通道中的每个值是类型为 T 的数据。

返回:

  • 一个通道,通道中的值是类型为 U 的数据,每个值是应用函数 f 之后的结果。

func Flatten

func Flatten[S ~[]T, T any](ch chan S) chan T

Flatten 接收一个通道,该通道发送包含元素 T 的切片 S。 它将所有切片中的元素展平并发送到新的通道中。

参数:

  • ch: 一个通道,发送元素为类型 S 的切片,其中 S 是类型 T 的切片。

返回:

  • 返回一个新的通道,该通道发送类型为 T 的元素,这些元素是展平后的切片元素。

func FromArray

func FromArray[T any](a []T) chan T

FromArray 将输入切片 `a` 中的每个元素发送到通道中。

func FromArray2

func FromArray2[T any, U any](f func(x T) U) func(a []T) chan U

FromArray2 将输入切片 `a` 中的每个元素应用函数 `f`,并返回一个包含结果的通道。 参数:

  • f: 一个函数,接受切片中的元素 `T` 类型,并返回 `U` 类型的结果。
  • a: 一个 `T` 类型的切片,将对其每个元素应用函数 `f`。

返回:

  • 一个 `U` 类型的通道,通道中的值是对切片 `a` 中的每个元素应用函数 `f` 的结果。

func FromCK

func FromCK[T any](ck *db.CKinfo) func(query *query.SQLBuilder) (chan T, chan error)

FromCK 从 ClickHouse 数据库中执行查询并返回数据通道。 参数:

  • query: *query.SQLBuilder - 查询语句

返回:

  • chan T: 查询结果数据通道
  • error: 错误信息,如果查询失败。

func FromCKStr

func FromCKStr(ck *db.CKinfo) func(query *query.SQLBuilder) (chan []string, chan error)

FromCKStr 从 ClickHouse 数据库中执行查询并返回数据通道。 使用 SQL 查询字符串。

参数:

- query - *SQLBuilder 类型的结构体,用于构建 SQL 查询语句。

返回:

  • chan []string: 查询结果数据通道。
  • error: 错误信息,如果查询失败。

func FromCsv

func FromCsv(path string) func(header bool) (chan []string, chan error)

FromCsv 从指定的 CSV 文件路径读取数据,并将其以切片的形式发送到通道。 参数:

  • path: CSV 文件的路径,字符串类型。

返回:

  • 一个通道,通道中的值是读取的 CSV 文件中的每一行数据,每一行数据是一个字符串切片([]string)。
  • 一个通道,通道中的值是读取 CSV 文件时发生的错误。

func FromES

func FromES[T any](client *elastic.Client) func(index string, query any) (chan db.ElasticBluk[T], chan error)

FromES 从 ElasticSearch 中读取数据。 参数:

  • client: ElasticSearch 客户端。
  • index: ElasticSearch 索引名称。
  • query: ElasticSearch 查询请求。

返回:

  • chan db.ElasticBluk[T]: 数据通道。
  • chan error: 错误通道。

func FromESShard

func FromESShard[T any](client *elastic.Client) func(index string, query any) (chan db.ElasticBluk[T], chan error)

FromESShard 将数据写入 ElasticSearch。 参数:

  • client: ElasticSearch 客户端。
  • index: ElasticSearch 索引名称。
  • ctype: ElasticSearch 写入类型 index create
  • data: 需要写入的数据。

返回:

  • chan db.ElasticBluk[T]: 数据通道。
  • chan error: 错误通道。

func FromExcel

func FromExcel(path string) func(sheet string, header bool) (chan []string, chan error)

FromExcel 从指定的 Excel 文件路径读取数据,并将其以切片的形式发送到通道。 参数:

  • path: Excel 文件的路径,字符串类型。
  • sheet: 工作表的名称,字符串类型。
  • header: 是否包含表头,布尔类型。

返回:

  • 一个通道,通道中的值是读取的 Excel 文件中的每一行数据,每一行数据是一个字符串切片([]string)。
  • 一个通道,通道中的值是读取 Excel 文件时发生的错误。

func FromGob

func FromGob[T any](path string, del bool) (chan T, chan error)

FromGob 从指定的 gob 文件路径读取数据,并通过通道返回。 参数:

  • path - 文件路径
  • del - 是否删除文件

返回:

  • chan T: 数据通道
  • chan error: 错误通道

func FromGzip

func FromGzip(path string) func(skip int) (chan string, chan error)

FromGzip 方法从 gzip 文件中读取数据并发送到通道中 参数:

  • path - gzip 文件路径
  • skip - 跳过的行数

返回:

  • 一个通道,用于接收从 gzip 文件中读取的数据。
  • 一个通道,用于接收错误信息

func FromJson

func FromJson[T any](path string) func(skip int) (chan T, chan error)

FromJson 方法从 JSON 文件中读取数据并发送到通道中 参数:

  • path - 文件路径
  • skip - 跳过的行数

返回:

- 一个通道,用于接收从文件中读取的数据。 - 一个通道,用于接收错误信息

func FromMap

func FromMap[K gotools.Comparable, V any](m map[K]V) func() chan pair.Pair[K, V]

FromMap 将一个映射转换为一个通道,每个通道中的元素是映射中的键值对。 参数:

  • m: 一个映射,键类型为 K,值类型为 V。

返回:

  • 一个通道,每个通道中的元素是包含一个键值对的切片,类型为 []array.Pair[K, V]。

函数功能:

  • 遍历映射 m,将每个键值对包装在一个切片中,然后将这些切片逐个发送到通道 ch 中。
  • 每个通道中的元素都是一个包含单个键值对的切片。
  • 当所有键值对都被发送到通道后,关闭通道。

func FromMysql

func FromMysql[T any](con string) func(query *query.SQLBuilder) (chan T, chan error)

FromMysqlQuery 从 MySQL 数据库中执行查询并返回数据通道。 参数:

  • query: *query.SQLBuilder - 查询语句

返回:

  • chan T: 查询结果数据通道
  • error: 错误信息,如果查询失败。

func FromMysqlStr

func FromMysqlStr(con string) func(query *query.SQLBuilder) (chan []string, chan error)

FromMysqlStr 从 MySQL 数据库中执行查询并返回数据通道, 使用 SQL 查询字符串。

参数:

- query - *SQLBuilder 类型的结构体,用于构建 SQL 查询语句。

返回:

  • chan []string: 查询结果数据通道。
  • error: 错误信息,如果查询失败。

func FromRedisList

func FromRedisList[T any](host, pwd string, dbN, num int) func(key string) (chan T, chan error)

FromRedisList 方法从 Redis 中读取数据并发送到通道中 参数:

  • host - Redis 地址
  • pwd - Redis 密码
  • dbNUM - Redis 数据库编号
  • num - 从 Redis 中读取的数据条数 等于0 直接返回,大于0 读取指定数量的数据, 小于0 读取所有数据

返回:

  • 返回一个函数,接受一个键名作为参数,返回一个通道,用于接收从 Redis 中读取的数据,并返回错误信息

注意: 该函数会持续从 Redis 中读取数据

func FromTable

func FromTable(path string) func(header bool, seq string, escape byte) (chan []string, chan error)

FromTable 从指定的文本文件路径读取数据,并将其以切片的形式发送到通道。 参数:

  • path: 文本文件的路径,字符串类型。
  • header: 是否包含表头,布尔类型。
  • seq: 文本文件的分隔符,字符串类型。

返回:

  • 一个通道,通道中的值是读取的表格文件中的每一行数据,每一行数据是一个字符串切片([]string)。
  • 一个通道,通道中的值是读取表格文件时发生的错误。

func FromTxt

func FromTxt(path string) func(skip int) (chan string, chan error)

FromTxt 方法从文本文件中读取数据并发送到通道中 参数:

  • path - 文件路径
  • skip - 跳过的行数

返回:

- 一个通道,用于接收从文件中读取的数据。 - 一个通道,用于接收错误信息

func GetBufferSize

func GetBufferSize() int

用于读取 BufferSize 的函数

func GetSortWindowSize

func GetSortWindowSize() int

用于读取 sortWindowSize 的函数

func Group

func Group[T any](f func(x, y T) bool) func(ch chan T) chan []T

Group 对通道进行分组,返回一个chan 参数:

  • f: 一个函数,接受两个类型为 T 的值,返回一个布尔值,表示是否满足分组条件。 当 `fun(x, y)` 返回 `true`,则分组是相同的
  • ch: 一个通道,用于接收数据。数据必须是排序后的

返回:

  • 一个通道,用于接收分组后的数据。

func InnerJoin

func InnerJoin[T any, U any, R any](f func(x T, y U) R, f1 func(x T, y U) int) func(ch1 chan T, ch2 chan U) chan R

InnerJoin 按照某个函数进行连接 参数:

  • f: 一个函数,接受两个类型为 T 和 U 的值,返回一个类型为 R 的值。
  • f1: 一个函数,用于比较大小, -1 表示小于, 0 表示相等, 1 表示大于。,通道数据必须升序,当通道数据降序,函数结果数字要相反
  • ch1: 一个通道,用于接收第一个数据
  • ch2: 一个通道,用于接收第二个数据

返回:

  • 一个通道,用于接收连接后的数据。

func InterS

func InterS[T gotools.Comparable](ch1 chan T, ch2 chan T) chan T

InterS 返回两个通道的交集 参数:

  • ch1: 一个通道,通道中的值是类型为 T 的数据。
  • ch2: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 一个通道,通道中的值是类型为 T 的数据,表示两个通道的交集。

注意: 由于需要对收集第二个通道的数据,因此可以将较少数据的通道传递给第二个通道。 如果第二个通道数据很多,要考虑内存占用问题。

func Intersect

func Intersect[T any](f func(x, y T) int) func(ch1, ch2 chan T) chan T

Intersect 返回两个通道的交集 参数:

  • f: 一个函数,接受两个类型为 T 的值,返回一个int,表示比较大小 0 相等,-1 小于,1 大于。
  • ch1: 一个通道,用于接收数据。必须排序
  • ch2: 一个通道,用于接收数据。必须排序

返回:

  • 一个通道,用于接收排序后的数据。

func Lag

func Lag[T any](def T, n int) func(ch chan T) chan T

Lag 将通道数据后推n位,最前缺失的n位用def填充

参数:

  • def: 用于填充缺失的元素
  • n: 后推的位数

返回:

  • 返回一个新的通道,该通道发送类型为 T 的元素,这些元素是后推的切片元素

func Last

func Last[T any](f func(x T) bool) func(ch chan T) T

Last 从通道中查找最后一个满足条件的值。 参数:

  • f: 一个函数,接受一个类型为 T 的值,返回一个布尔值,表示该值是否满足条件。
  • ch: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 最后一个满足条件的值。如果通道中的值都不满足条件,返回类型 T 的零值。

函数功能:

  • 从输入通道 ch 中读取数据,应用函数 f 来检查每个值是否满足条件。
  • 如果找到最后一个满足条件的值,则返回该值。
  • 如果通道关闭且没有找到满足条件的值,则返回类型 T 的零值。

func Lead

func Lead[T any](def T, n int) func(ch chan T) chan T

Lead 将通道数据提前n位,最后缺失的n位用def填充

参数:

  • def: 用于填充缺失的元素
  • n: 提前的位数

返回:

  • 返回一个新的通道,该通道发送类型为 T 的元素,这些元素是提前的切片元素

func LeftJoin

func LeftJoin[T any, U any, R any](f func(x T, y U) R, f1 func(x T, y U) int) func(ch1 chan T, ch2 chan U) chan R

LeftJoin 按照某个函数进行连接 参数:

  • f: 一个函数,接受两个类型为 T 和 U 的值,返回一个类型为 R 的值。
  • f1: 一个函数,用于比较大小, -1 表示小于, 0 表示相等, 1 表示大于,通道数据必须升序,当通道数据降序,函数结果数字要相反
  • ch1: 一个通道,用于接收第一个数据
  • ch2: 一个通道,用于接收第二个数据

返回:

  • 一个通道,用于接收连接后的数据。

func Map

func Map[T any, U any](f func(x T) U) func(ch chan T) chan U

Map 将通道中的每个元素应用函数 f,并将结果发送到一个新的通道。 参数:

  • f: 一个函数,接受类型为 T 的输入,返回类型为 U 的结果。
  • ch: 一个通道,通道中的每个值是类型为 T 的数据。

返回:

  • 一个通道,通道中的值是类型为 U 的数据,每个值是应用函数 f 之后的结果。

函数功能:

  • 从输入通道 ch 中读取数据,将每个数据应用函数 f,然后将结果写入新的通道 ch_。
  • 使用一个 goroutine 执行这些操作,并在完成后关闭通道 ch_。

func Maps

func Maps[T any, U any](fn func(ch chan T, num int) U) func(cs ...chan T) chan U

Maps 将多个通道中的数据进行映射 参数:

  • fn: 一个函数,接受一个类型为 T 的值 ,类型为int的通道位置,返回一个类型为 U 的值。
  • cs: 一个包含多个通道的切片。

返回:

  • 一个通道,用于接收映射后的数据。

func Merge

func Merge[T any](f func(x, y T) bool) func(cs ...chan T) chan T

Merge 合并排序多个通道,按照func(x, y T) bool进行排序 参数:

  • f: 一个函数,接受两个类型为 T 和 U 的值,返回一个布尔值,表示是否满足排序条件。 当 `fun(x, y)` 返回 `true`,则在排序时 `x` 应位于 `y` 之前。
  • cs: 一个包含多个通道的切片。

返回:

  • 一个通道,用于接收排序后的数据。

func More

func More[T, U, R any](f func(x T, y U) R) func(ch1 chan T, ch2 chan U) chan R

More 按照某个函数进行连接,接受两个通道,通道元素一一对应进行处理 参数:

  • f: 一个函数,接受两个类型为 T 和 U 的值,返回一个类型为 R 的值。
  • ch1: 一个通道,用于接收第一个数据
  • ch2: 一个通道,用于接收第二个数据

返回:

  • 一个通道,用于接收连接后的数据

func Partition

func Partition[T any](f func(x T) bool) func(ch chan T) (chan T, chan T)

Partition 根据给定的条件函数将通道中的值分为两个通道。 参数:

  • f: 一个函数,接受一个类型为 T 的值,返回一个布尔值,表示该值是否满足条件。
  • ch: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • ch1: 一个通道,通道中的值是类型为 T 的数据,表示满足条件的值。
  • ch2: 一个通道,通道中的值是类型为 T 的数据,表示不满足条件的值。

函数功能:

  • 从输入通道 ch 中读取数据,根据函数 f 的结果将每个值分配到两个新的通道 ch1 和 ch2。
  • 当输入通道 ch 关闭时,关闭新的通道 ch1 和 ch2。

func Reduce

func Reduce[T, U any](f func(x U, y T) U, init U) func(ch chan T) U

Reduce 对通道中的数据进行归约操作,将通道中的所有数据合并成一个值。 参数:

  • f: 一个函数,接受两个参数:一个类型为 U 的累加器和一个类型为 T 的当前值,返回一个类型为 U 的新累加器。
  • init: 初始值,类型为 U,用作归约操作的起始累加器。
  • ch: 一个通道,通道中的每个值是类型为 T 的数据,将被用于归约操作。

返回:

  • 返回归约操作后的结果,类型为 U。

函数功能:

  • 从输入通道 ch 中读取数据,并将每个数据应用归约函数 f,使用 init 作为初始值。
  • 在处理完所有数据后,返回最终的累加器值。

func Scanl

func Scanl[T, U any](f func(x U, y T) U, init U) func(ch chan T) chan U

Scanl 对通道中的数据进行扫描操作,生成一个累加序列,并将结果发送到新的通道。 参数:

  • f: 一个函数,接受两个参数:一个类型为 U 的累加器和一个类型为 T 的当前值,返回一个类型为 U 的新累加器。
  • init: 初始值,类型为 U,用作扫描操作的起始累加器。
  • ch: 一个通道,通道中的每个值是类型为 T 的数据,将用于扫描操作。

返回:

  • 一个通道,通道中的值是类型为 U 的数据,表示扫描操作的累加序列。

函数功能:

  • 从输入通道 ch 中读取数据,并将每个数据应用扫描函数 f,使用 init 作为初始值。
  • 将每个步骤的累加器值写入新的通道 ch_。
  • 使用一个 goroutine 执行这些操作,并在完成后关闭通道 ch_。

func Sequence

func Sequence(start, end, step int) chan int

Sequence 生成一个整数序列,并将其发送到通道。 参数:

  • start: 序列的起始值,整数类型。
  • end: 序列的结束值,整数类型。序列生成会在达到该值时停止(不包括此值)。
  • step: 序列的步长,整数类型。可以为正数或负数。

返回:

  • 一个通道,通道中的值是整数类型,表示生成的序列。

函数功能:

  • 从 start 开始,根据步长 step 生成整数序列,直到达到 end 为止(不包括 end)。

func SetBufferSize

func SetBufferSize(newSize int)

用于修改 BufferSize 的函数

func SetParallel

func SetParallel(parallel int)

func SetSortWindowSize

func SetSortWindowSize(newSize int)

用于修改 sortWindowSize 的函数

func Slider

func Slider[T, U any](f func(x ...T) U, before, after int, defaultValue T) func(ch chan T) chan U

Slider 滑动窗口函数 参数:

  • f: 一个函数,接受一个类型为 T 的值,返回一个类型为 U 的值。
  • before: 滑动窗口的前置元素数量。
  • after: 滑动窗口的后置元素数量。
  • defaultValue: 滑动窗口中的默认值。

返回:

  • 一个函数,接受一个通道,返回一个通道,用于接收滑动窗口的元素。

func Sort

func Sort[T any](f func(x, y T) bool) func(ch chan T) chan T

Sort 针对数据较大的情况进行处理,使用了外部文件排序,如果内存满足请使用SortSimple 参数:

  • f: 一个函数,接受两个类型为 T 和 U 的值,返回一个布尔值,表示是否满足排序条件。 当 `fun(x, y)` 返回 `true`,则在排序时 `x` 应位于 `y` 之前。
  • ch: 一个通道,用于接收数据。

返回:

  • 一个通道,用于接收排序后的数据。

示例:

Sort(func(x, y int) bool { return x > y })(ch)

func SortS

func SortS[T any](f func(x, y T) bool) func(ch chan T) chan T

SortS 排序通道,并返回排序后的chan 参数:

  • f: 一个函数,接受两个类型为 T 和 U 的值,返回一个布尔值,表示是否满足排序条件。 当 `fun(x, y)` 返回 `true`,则在排序时 `x` 应位于 `y` 之前。
  • ch: 一个通道,用于接收数据。

返回:

  • 一个通道,用于接收排序后的数据。

func Split

func Split[T any](fn func(T) int, num int) func(ch chan T) []chan T

Split 将通道中的数据分组 参数:

  • fn: 一个函数,接受一个类型为 T 的值,返回一个整数值,表示该值属于哪个分组。
  • num: 分组的数量。

返回:

  • 一个函数,接受一个通道,返回一个包含 num 个通道的切片。

func SubS

func SubS[T gotools.Comparable](ch1 chan T, ch2 chan T) chan T

SubS 返回两个通道的差集 参数:

  • ch1: 一个通道,通道中的值是类型为 T 的数据。
  • ch2: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 一个通道,通道中的值是类型为 T 的数据,表示两个通道的差集。

注意: 由于需要对收集第二个通道的数据,因此可以将较少数据的通道传递给第二个通道。 如果第二个通道数据很多,要考虑内存占用问题。

func Subtract

func Subtract[T any](f func(x, y T) int) func(ch1 chan T, ch2 chan T) chan T

Subtract 返回两个通道的差集 参数:

  • f: 一个函数,接受两个类型为 T 的值,返回一个int,表示比较大小 0 相等,-1 小于,1 大于。
  • ch1: 一个通道,用于接收数据。必须排序
  • ch2: 一个通道,用于接收数据。必须排序

返回:

  • 一个通道,用于接收排序后的数据。

func TakeWhile

func TakeWhile[T any](f func(x T) bool) func(ch chan T) chan T

TakeWhile 从通道中读取值,直到遇到第一个不满足条件的值。 参数:

  • f: 一个函数,接受一个类型为 T 的值,返回一个布尔值,表示该值是否满足条件。
  • ch: 一个通道,通道中的值是类型为 T 的数据。

返回:

  • 一个新通道,其中包含从输入通道中读取的所有满足条件的值。

函数功能:

  • 从输入通道 ch 中读取值,应用函数 f 来检查每个值是否满足条件。
  • 将所有满足条件的值写入新通道 ch_。
  • 当遇到第一个不满足条件的值时,停止读取并关闭新通道 ch_。
  • 新通道 ch_ 只包含在遇到第一个不满足条件的值之前的所有值

func ToCK

func ToCK(ck *db.CKinfo, q query.SqlInsert) func(ch chan []any) error

ToCKInsert 方法将通道中的数据插入到 ClickHouse 数据库中 参数:

  • ck *db.CKinfo - *db.CKinfo 类型的结构体,用于构建 ck 客户端信息
  • q - *SQLBuilder 类型的结构体,用于构建 SQL 查询语句。
  • ch - 一个通道,用于接收待插入的数据。

返回:

  • 一个函数, 用于执行数据库插入操作。

func ToCsv

func ToCsv(path string, append bool, header ...string) func(ch chan []string) error

ToCsv 将通道中的数据写入指定的 CSV 文件。 参数:

  • path: CSV 文件的路径,字符串类型。
  • ch: 一个通道,通道中的每个值是一个字符串切片([]string),表示 CSV 文件中的一行数据。

函数功能:

  • 从通道中读取数据,并将数据写入指定的 CSV 文件。
  • 使用一个 goroutine 执行写入操作,并通过 stop 通道同步写入完成。

func ToES

func ToES[T any](client *elastic.Client) func(ch chan db.ElasticBluk[T]) error

ToES 将数据写入 ElasticSearch。 参数:

  • client: ElasticSearch 客户端。
  • index: ElasticSearch 索引名称。
  • ctype: ElasticSearch 写入类型 index create
  • data: 需要写入的数据。

返回:

error: 错误信息,如果写入失败。

func ToExcel

func ToExcel(e *ExcelField) func(ch chan []string) error

ToExcel 将通道中的数据写入指定的 Excel 文件。 参数:

  • e *ExcelField: Excel 文件配置
  • ch: 一个通道,通道中的每个值是一个字符串切片([]string),表示 Excel 文件中的一行数据。

返回:

error

func ToGob

func ToGob[T any](path string, replace bool) func(ch chan T) error

ToGob 接收一个路径和一个通道,将通道中的数据按块写入到指定路径的 gob 文件中。 参数:

  • path: gob 文件的路径。
  • ch: 一个通道,用于接收待写入的数据。
  • replace: 是否覆盖已存在的文件。

返回:

一个函数,用于执行 gob 文件写入操作。

func ToGobArr

func ToGobArr[S ~[]T, T any](path string) func(data S) error

ToGobArr 接收一个路径和一个切片,将切片中的数据按块写入到指定路径的 gob 文件中。 参数:

  • path: gob 文件的路径。
  • data: 需要写入的数据。

返回:

一个函数,用于将数据写入到 gob 文件中。

func ToGzip

func ToGzip(path string, append bool) func(ch chan string) error

ToGzip 方法将通道中的数据写入 gzip 文件 参数:

  • path - gzip 文件路径
  • append - 是否追加到文件末尾(true)还是覆盖文件(false)

返回:

  • 一个函数,接受一个通道作为参数,写入通道中的数据到 gzip 文件中,并返回错误信息。

func ToJson

func ToJson[T any](path string, append bool) func(ch chan T) error

ToJson 方法将通道中的数据写入到 JSON 文件中 参数:

  • path - 文件路径
  • append - 是否追加写入文件
  • ch - 一个通道,用于接收待写入的数据。

返回:

  • 一个函数,用于执行 JSON 文件写入操作。

func ToMysqlInset

func ToMysqlInset(con string, q query.SqlInsert) func(ch chan []any) error

ToMysqlInset 方法将通道中的数据插入到 MySQL 数据库中 参数:

  • con mysql 连接字符串
  • q - *SQLBuilder 类型的结构体,用于构建 SQL 查询语句。
  • ch - 一个通道,用于接收待插入的数据。

返回:

  • 一个函数, 用于执行数据库插入操作。

func ToRedisList

func ToRedisList[T any](host, pwd string, dbNUM int) func(key string, ch chan T) error

ToRedisList 方法将通道中的数据写入 Redis 中list键 参数:

  • host - Redis 地址
  • pwd - Redis 密码
  • dbNUM - Redis 数据库编号
  • key - Redis 中的列表名

返回:

一个函数,接受一个list键,一个通道作为参数,将通道中的数据写入 Redis 中list键,并返回错误信息

func ToTable

func ToTable(t *TableField) func(ch chan []string) error

ToTable 将通道中的数据写入指定分隔符 文件。 参数:

  • t *TableField: 表格配置
  • ch: 一个通道,通道中的每个值是一个字符串切片([]string),表示表格文件中的一行数据。

返回:

  • 一个函数,用于执行文件写入操作。
  • error

func ToTxt

func ToTxt(path string, append bool) func(ch chan string) error

ToTxt 方法将通道中的数据写入到文本文件中 参数:

  • path - 文件路径
  • append - 是否追加写入文件
  • ch - 一个通道,用于接收待写入的数据。

返回:

  • 一个函数,用于执行文件写入操作。

func Union

func Union[T, U any](fn func(x T) U) func(chs ...chan T) chan U

Union 将多个通道合并为一个通道。

参数:

chs: 多个需要合并的通道,这些通道的数据类型必须相同。

返回值:

一个通道,该通道将接收来自所有输入通道的数据。

说明:

Union 函数会启动多个 goroutine,每个 goroutine 从一个输入通道中读取数据
并将数据写入到返回的通道中。所有输入通道的数据都将被合并到这个返回的通道中。
当所有输入通道的数据都被读取完毕后,返回的通道将会被关闭。

func Unique

func Unique[T any](f func(x, y T) bool) func(ch chan T) chan T

Unique 去重, 要求传入的ch必须是排序过的 参数:

  • f: 一个函数,接受两个类型为 T 的值,返回一个布尔值,表示是否相等。
  • ch: 一个通道,用于接收数据。

返回:

  • 一个通道,用于接收排序后的数据。

func UniqueCount

func UniqueCount[T any](f func(x, y T) bool) func(ch chan T) int

UniqueCount 去重, 要求传入的ch必须是排序过的 参数:

  • f: 一个函数,接受两个类型为 T 的值,返回一个布尔值,表示是否相等。
  • ch: 一个通道,用于接收数据。

返回:

  • 去重后的数量

func Walk

func Walk[T any](f func(x T)) func(ch chan T)

Walk 对通道中的数据进行遍历操作。 参数:

  • f: 一个函数,接受类型为 T 的输入。
  • ch: 一个通道,通道中的每个值是类型为 T 的数据。

函数功能:

  • 从输入通道 ch 中读取数据,对每个数据应用函数 f。

func Window

func Window[T any](ch chan T) func(windowSize int) chan []T

Window 滚动窗口函数 参数:

  • windowSize: 窗口的大小。
  • ch: 输入通道,包含要处理的数据。

返回:

  • 输出通道,其中包含滚动窗口的数据切片。

func Zip

func Zip[T any, U any, V any](f func(x T, y U) V) func(ch1 chan T, ch2 chan U) chan V

Zip 结合两个通道中的值,生成一个新的通道。 参数:

  • f: 一个函数,接受两个参数:一个类型为 T 的值和一个类型为 U 的值,返回一个类型为 V 的值。
  • ch1: 一个通道,通道中的值是类型为 T 的数据。
  • ch2: 一个通道,通道中的值是类型为 U 的数据。

返回:

  • 一个通道,通道中的值是类型为 V 的数据,表示将 ch1 和 ch2 中的值通过函数 f 组合后的结果。

函数功能:

  • 从通道 ch1 和 ch2 中读取数据,并使用函数 f 将这两个值组合成一个新值。
  • 将生成的新值发送到新的通道 ch 中。
  • 当两个通道都关闭时,停止处理并关闭通道 ch。

Types

type ExcelField

type ExcelField struct {
	Path   string
	Sheet  string
	Header []string
	Exist  bool
	Append bool
}

ExcelField Excel 文件配置

func E

func E(path string) *ExcelField

func (*ExcelField) SetAppend

func (e *ExcelField) SetAppend(append bool) *ExcelField

func (*ExcelField) SetExist

func (e *ExcelField) SetExist(append bool) *ExcelField

func (*ExcelField) SetHeader

func (e *ExcelField) SetHeader(header ...string) *ExcelField

func (*ExcelField) SetPath

func (e *ExcelField) SetPath(path string) *ExcelField

func (*ExcelField) SetSheet

func (e *ExcelField) SetSheet(sheet string) *ExcelField

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline 表示一系列处理步骤,可以对数据进行处理

func NewPipeline

func NewPipeline[T any]() *Pipeline[T]

NewPipeline 创建一个新的 Pipeline 实例,支持任意数据类型

func (*Pipeline[T]) AddStep

func (p *Pipeline[T]) AddStep(f func(chan T) chan T)

AddStep 向管道中添加一个处理步骤。 参数:

step: 处理步骤函数,该函数接受一个输入通道,返回一个输出通道。

返回:

void

func (*Pipeline[T]) Compute

func (p *Pipeline[T]) Compute(input chan T) chan T

Compute 仅仅使用管道中的处理步骤,不包含起点和终点,直接对输入通道应用

func (*Pipeline[T]) Run

func (p *Pipeline[T]) Run()

Run 启动管道处理流程。 参数:

input: 输入数据通道。

返回:

void

func (*Pipeline[T]) SetEnd

func (p *Pipeline[T]) SetEnd(f func(chan T))

SetEnd 设置管道的终点函数

func (*Pipeline[T]) SetStart

func (p *Pipeline[T]) SetStart(f func() chan T)

SetStart 设置管道的起点函数

type TableField

type TableField struct {
	Path     string
	Seq      string
	UseQuote bool
	Header   []string
	Append   bool
	Escape   byte
}

TableField 表格配置

func T

func T(path string) *TableField

T 表格

func (*TableField) SetAppend

func (t *TableField) SetAppend(append bool) *TableField

func (*TableField) SetEscape

func (t *TableField) SetEscape(escape byte) *TableField

func (*TableField) SetHeader

func (t *TableField) SetHeader(header ...string) *TableField

func (*TableField) SetPath

func (t *TableField) SetPath(path string) *TableField

func (*TableField) SetSeq

func (t *TableField) SetSeq(seq string) *TableField

func (*TableField) SetUseQuote

func (t *TableField) SetUseQuote(useQuote bool) *TableField

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL