Pipeline 配置
Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制, 由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json)和纯文本(text/plain)格式的日志数据作为输入。
这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。
整体结构
Pipeline 由两部分组成:Processors 和 Transform,这两部分均为数组形式。一个 Pipeline 配置可以包含多个 Processor 和多个 Transform。Transform 所描述的数据类型会决定日志数据保存到数据库时的表结构。
- Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
- Transform 用于对数据进行格式转换,例如将字符串类型转换为数字类型。
一个包含 Processor 和 Transform 的简单配置示例如下:
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp
Processor
Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors 字段下。
Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。
Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。
我们目前内置了以下几种 Processor:
date: 解析格式化的时间字符串字段,例如2024-07-12T16:18:53.048。epoch: 解析数字时间戳字段,例如1720772378893。dissect: 对 log 数据字段进行拆分。gsub: 对 log 数据字段进行替换。join: 对 log 中的 array 类型字段进行合并。letter: 对 log 数据字段进行字母转换。regex: 对 log 数据字段进行正则匹配。urlencoding: 对 log 数据字段进行 URL 编解码。csv: 对 log 数据字段进行 CSV 解析。
大多数 Processor 都有 field 或 fields 字段,用于指定需要被处理的字段。大部分 Processor 处理完成后会覆盖掉原先的 field。如果你不想影响到原数据中的对应字段,我们可以把结果输出到其他字段来避免覆盖。
当字段名称包含 , 时,该字段将被重命名。例如,reqTimeSec, req_time_sec 表示将 reqTimeSec 字段重命名为 req_time_sec,处理完成后的数据将写入中间状态的 req_time_sec 字段中。原始的 reqTimeSec 字段不受影响。如果某些 Processor 不支持字段重命名,则重命名字段名称将被忽略,并将在文档中注明。
例如:
processors:
- letter:
fields:
- message, message_upper
method: upper
ignore_missing: true
message 字段将被转换为大写并存储在 message_upper 字段中。
date
date Processor 用于解析时间字段。示例配置如下:
processors:
- date:
fields:
- time
formats:
- '%Y-%m-%d %H:%M:%S%.3f'
ignore_missing: true
timezone: 'Asia/Shanghai'
如上所示,date Processor 的配置包含以下字段:
fields: 需要解析的时间字段名列表。formats: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。timezone: 时区。使用tz_database 中的时区标识符来指定时区。默认为UTC。
epoch
epoch Processor 用于解析时间戳字段,示例配置如下:
processors:
- epoch:
fields:
- reqTimeSec
resolution: millisecond
ignore_missing: true
如上所示,epoch Processor 的配置包含以下字段:
fields: 需要解析的时间戳字段名列表。resolution: 时间戳精度,支持s,sec,second,ms,millisecond,milli,us,microsecond,micro,ns,nanosecond,nano。默认为ms。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
dissect
dissect Processor 用于对 log 数据字段进行拆分,示例配置如下:
processors:
- dissect:
fields:
- message
patterns:
- '%{key1} %{key2}'
ignore_missing: true
append_separator: '-'
如上所示,dissect Processor 的配置包含以下字段:
fields: 需要拆分的字段名列表。不支持字段重命名。patterns: 拆分的 dissect 模式。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。append_separator: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。
Dissect 模式
和 Logstash 的 Dissect 模式类似,Dissect 模式由 %{key} 组成,其中 %{key} 为一个字段名。例如:
"%{key1} %{key2} %{+key3} %{+key4/2} %{key5->} %{?key6}"
Dissect 修饰符
Dissect 模式支持以下修饰符:
| 修饰符 | 说明 | 示例 |
|---|---|---|
+ | 将两个或多个字段追加到一起 | %{+key} %{+key} |
+ 和 /n | 按照指定的顺序将两个或多个字段追加到一起 | %{+key/2} %{+key/1} |
-> | 忽略右侧的任何重复字符 | %{key1->} %{key2->} |
? | 忽略匹配的值 | %{?key} |
dissect 示例
例如,对于以下 log 数据:
"key1 key2 key3 key4 key5 key6"
使用以下 Dissect 模式:
"%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6}"
将得到以下结果:
{
"key1": "key1",
"key2": "key2",
"key3": "key3 key4",
"key5": "key5"
}
gsub
gsub Processor 用于对 log 数据字段进行替换,示例配置如下:
processors:
- gsub:
fields:
- message
pattern: 'old'
replacement: 'new'
ignore_missing: true
如上所示,gsub Processor 的配置包含以下字段:
fields: 需要替换的字段名列表。pattern: 需要替换的字符串。支持正则表达式。replacement: 替换后的字符串。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
join
join Processor 用于对 log 中的 Array 类型字段进行合并,示例配置如下:
processors:
- join:
fields:
- message
separator: ','
ignore_missing: true
如上所示,join Processor 的配置包含以下字段:
fields: 需要合并的字段名列表。注意,这里每行字段的值需要是 Array 类型,每行字段会单独合并自己数组内的值,所有行的字段不会合并到一起。separator: 合并后的分隔符。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
join 示例
例如,对于以下 log 数据:
{
"message": ["a", "b", "c"]
}
使用以下配置:
processors:
- join:
fields:
- message
separator: ','
将得到以下结果:
{
"message": "a,b,c"
}
letter
letter Processor 用于对 log 数据字段进行字母转换,示例配置如下:
processors:
- letter:
fields:
- message
method: upper
ignore_missing: true
如上所示,letter Processor 的配置包含以下字段:
fields: 需要转换的字段名列表。method: 转换方法,支持upper,lower,capital。默认为lower。注意capital只会将第一个字母转换为大写。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
regex
regex Processor 用于对 log 数据字段进行正则匹配,示例配置如下:
processors:
- regex:
fields:
- message
patterns:
- ':(?<id>[0-9])'
ignore_missing: true
如上所示,regex Processor 的配置包含以下字段:
fields: 需要匹配的字段名列表。如果重命名了字段,重命名后的字段名将与pattern中的命名捕获组名进行拼接。pattern: 要进行匹配的正则表达式,需要使用命名捕获组才可以从对应字段中取出对应数据。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
regex 命名捕获组的规则
regex Processor 支持使用 (?<group-name>...) 的语法来命名捕获组,最终将数据处理为这种形式:
{
"<field-name>_<group-name>": "<value>"
}
例如 regex Processor 中 field 填写的字段名为 message,对应的内容为 "[ERROR] error message",
你可以将 pattern 设置为 \[(?<level>[A-Z]+)\] (?<content>.+),
最终数据会被处理为:
{
"message_level": "ERROR",
"message_content": "error message"
}
urlencoding
urlencoding Processor 用于对 log 数据字段进行 URL 编码,示例配置如下:
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
如上所示,urlencoding Processor 的配置包含以下字段:
fields: 需要编码的字段名列表。method: 编码方法,支持encode,decode。默认为encode。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
csv
csv Processor 用于对 log 数据中没有携带 header 的 CSV 类型字段解析,示例配置如下:
processors:
- csv:
fields:
- message
separator: ','
quote: '"'
trim: true
ignore_missing: true
如上所示,csv Processor 的配置包含以下字段:
fields: 需要解析的字段名列表。separator: 分隔符。quote: 引号。trim: 是否去除空格。默认为false。ignore_missing: 忽略字段不存在的情况。默认为false。如果字段不存在,并且此配置为 false,则会抛出异常。
json_path(实验性)
注意:json_path 处理器目前处于实验阶段,可能会有所变动。
json_path 处理器用于从 JSON 数据中提取字段。以下是一个配置示例:
processors:
- json_path:
fields:
- complex_object
json_path: "$.shop.orders[?(@.active)].id"
ignore_missing: true
result_index: 1
在上述示例中,json_path processor 的配置包括以下字段:
fields:要提取的字段名称列表。json_path:要提取的 JSON 路径。ignore_missing:忽略字段缺失的情况。默认为false。如果字段缺失且此配置设置为false,将抛出异常。result_index:指定提取数组中要用作结果值的下标。默认情况下,包含所有值。Processor 提取的结果值是包含 path 中所有值的数组。如果指定了索引,将使用提取数组中对应的下标的值作为最终结果。
JSON 路径语法
JSON 路径语法基于 jsonpath-rust 库。
在此阶段,我们仅推荐使用一些简单的字段提取操作,以便将嵌套字段提取到顶层。
json_path 示例
例如,给定以下日志数据:
{
"product_object": {
"hello": "world"
},
"product_array": [
"hello",
"world"
],
"complex_object": {
"shop": {
"orders": [
{
"id": 1,
"active": true
},
{
"id": 2
},
{
"id": 3
},
{
"id": 4,
"active": true
}
]
}
}
}
使用以下配置:
processors:
- json_path:
fields:
- product_object, object_target
json_path: "$.hello"
result_index: 0
- json_path:
fields:
- product_array, array_target
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_1
json_path: "$.shop.orders[?(@.active)].id"
- json_path:
fields:
- complex_target_1, complex_target_2
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_3
json_path: "$.shop.orders[?(@.active)].id"
result_index: 1
transform:
- fields:
- object_target
- array_target
type: string
- fields:
- complex_target_3
- complex_target_2
type: uint32
- fields:
- complex_target_1
type: json
结果将是:
{
"object_target": "world",
"array_target": "world",
"complex_target_3": 4,
"complex_target_2": 4,
"complex_target_1": [1, 4]
}
Transform
Transform 用于对 log 数据进行转换,其配置位于 YAML 文件中的 transform 字段下。
Transform 由一个或多个配置组成,每个配置包含以下字段:
fields: 需要转换的字段名列表。type: 转换类型index: 索引类型(可选)on_failure: 转换失败时的处理方式(可选)default: 默认值(可选)
fields 字段
每个字段名都是一个字符串,当字段名称包含 , 时,会进行字段重命名。例如,reqTimeSec, req_time_sec 表示将 reqTimeSec 字段重命名为 req_time_sec,
最终数据将被写入到 GreptimeDB 的 req_time_sec 列。
type 字段
GreptimeDB 目前内置了以下几种转换类型:
int8,int16,int32,int64: 整数类型。uint8,uint16,uint32,uint64: 无符号整数类型。float32,float64: 浮点数类型。string: 字符串类型。time: 时间类型。将被转换为 GreptimeDBtimestamp(9)类型。epoch: 时间戳类型。将被转换为 GreptimeDBtimestamp(n)类型。n 为时间戳精度,n 的值视 epoch 精度而定。当精度为s时,n 为 0;当精度为ms时,n 为 3;当精度为us时,n 为 6;当精度为ns时,n 为 9。
如果字段在转换过程中获得了非法值,Pipeline 将会抛出异常。例如将一个字符串 abc 转换为整数时,由于该字符串不是一个合法的整数,Pipeline 将会抛出异常。
index 字段
Pipeline 会将处理后的数据写入到 GreptimeDB 自动创建的数据表中。为了提高查询效率,GreptimeDB 会为表中的某些列创建索引。index 字段用于指定哪些字段需要被索引。关于 GreptimeDB 的列类型,请参考数据模型文档。
GreptimeDB 支持以下三种字段的索引类型:
tag: 用于指定某列为 Tag 列fulltext: 用于指定某列使用 fulltext 类型的索引,该列需要是字符串类型timestamp: 用于指定某列是时间索引列
不提供 index 字段时,GreptimeDB 会将该字段作为 Field 列。
在 GreptimeDB 中,一张表里必须包含一个 timestamp 类型的列作为该表的时间索引列,因此一个 Pipeline 有且只有一个时间索引列。
时间戳列
通过 index: timestamp 指定哪个字段是时间索引列,写法请参考下方的 Transform 示例。
Tag 列
通过 index: tag 指定哪个字段是 Tag 列,写法请参考下方的 Transform 示例。
Fulltext 列
通过 index: fulltext 指定哪个字段将会被用于全文搜索,该索引可大大提升 日志搜索 的性能,写法请参考下方的 Transform 示例。
on_failure 字段
on_failure 字段用于指定转换失败时的处理方式,支持以下几种方式:
ignore: 忽略转换失败的字段,不写入数据库。default: 写入默认值。默认值由default字段指定。
default 字段
default 字段用于指定转换失败时的默认值。
Transform 示例
例如,对于以下 log 数据:
{
"num_field_a": "3",
"string_field_a": "john",
"string_field_b": "It was snowing when he was born.",
"time_field_a": 1625760000
}
使用以下配置:
transform:
- fields:
- string_field_a, name
type: string
index: tag
- fields:
- num_field_a, age
type: int32
- fields:
- string_field_b, description
type: string
index: fulltext
- fields:
- time_field_a, bron_time
type: epoch, s
index: timestamp
将得到以下结果:
{
"name": "john",
"age": 3,
"description": "It was snowing when he was born.",
"bron_time": 2021-07-08 16:00:00
}