Prometheus 技术秘笈(七):PromQL详解 - 从解析到执行的全流程
约 4257 字大约 14 分钟
2026-03-28
导语
PromQL是Prometheus的“灵魂”,能灵活查询和聚合时序数据,但很多人只懂基本用法,不懂底层执行逻辑。本文从源码角度拆解PromQL的解析、抽象语法树(AST)执行全流程,深入讲解Engine引擎的核心作用、AST的生成逻辑,以及各类语法节点的执行细节,让你不仅会用PromQL,更懂它“背后的逻辑”。
一、Engine引擎:PromQL的执行核心
Engine是promql模块处理PromQL查询的核心入口,定义在promql.Engine结构体中。它不仅负责解析和执行每一条PromQL查询,还管理查询生命周期、控制并发、监控执行性能。理解Engine的核心设计,是掌握PromQL执行逻辑的第一步。
1.1 Engine的核心字段
Engine的核心字段承载了查询管控、性能监控、资源限制等关键能力,具体如下:
- metrics(*engineMetrics):记录查询全生命周期的监控指标,用于排查性能问题,核心指标包括:
currentQueries:当前并发执行的查询数;maxConcurrentQueries:配置的最大并发查询数;queryQueueTime:查询排队等待执行的时长;queryPrepareTime:查询解析、准备的时长;queryInnerEval:查询核心计算的执行时长;queryResultSort:查询结果排序的时长。
- timeout(time.Duration):全局查询超时时间,所有PromQL查询均受该超时控制;
- gate(*gate.Gate):并发控制器,通过
gate.Start()/gate.Done()管控并发查询数,超过上限时新查询会阻塞等待; - maxSamplesPerQuery(int):单个查询能处理的时序点上限(含临时计算点和最终返回点),防止单查询占用过多资源。
1.2 Query实例的创建
当通过HTTP API(/api/v1/query//api/v1/query_range)发起PromQL查询时,Engine会先创建query实例。核心入口是NewInstantQuery(瞬时查询)和NewRangeQuery(范围查询)方法,以NewInstantQuery为例:
func (ng *Engine) NewInstantQuery(q storage.Qualifiable, qs string, ts time.Time) (Query, error) {
expr, err := ParseExpr(qs) // 解析PromQL语句为抽象语法树(AST)
if err != nil {
return nil, err // 补充缺失的错误返回逻辑
}
qry := ng.newQuery(q, expr, ts, ts, 0) // 创建Query实例
qry.q = qs // 记录原始PromQL语句
return qry, nil
}该过程的核心是解析PromQL语句生成AST,并将AST、查询时间、原始语句等封装为query实例,为后续执行做准备。
1.3 词法&语法分析:从PromQL到AST
PromQL的解析依赖词法分析器(Lexer)和语法分析器(Parser),最终将字符串形式的PromQL语句转换为抽象语法树(AST)——这是机器能执行的“结构化查询逻辑”。
(1)核心抽象:Statement与Expr
- Statement:封装AST的顶层结构,不同场景对应不同实现:
EvalStmt:HTTP API查询使用,核心字段包括Expr(AST根节点)、Start/End(查询起止时间)、Interval(查询步长,对应HTTP请求的step参数);RecordStmt/AlertStmt:分别用于Recording Rule、Alerting Rule的执行。
- Expr:AST的节点抽象,内嵌
Node接口(定义Type()方法,返回vector/scalar/matrix/string四种PromQL基础类型),核心实现如下表:
| Expr类型 | 对应PromQL语法 | 核心说明 |
|---|---|---|
| Call | 函数调用(如rate、sum) | 封装函数名、参数列表 |
| MatrixSelector | 范围向量查询(Range Vector) | 关联底层存储的范围时序查询 |
| BinaryExpr | 二元运算符(+、-、>、==等) | 包含左右操作数、运算符类型 |
| AggregateExpr | 聚合函数(sum、avg、topk等) | 包含聚合类型、分组规则 |
| UnaryExpr | 一元运算符(+、-) | 仅包含单个操作数 |
| ParenExpr | 括号包裹的表达式 | 仅嵌套子Expr,无额外逻辑 |
| StringLiteral | 字符串常量 | 存储字符串值 |
| VectorSelector | 瞬时向量查询(Instant Vector) | 关联底层存储的瞬时时序查询 |
| NumberLiteral | 数字常量 | 存储整型/浮点型数值 |
注:仅
MatrixSelector和VectorSelector会直接触发底层存储的时序数据查询,是AST中与“数据获取”关联最紧密的节点。
(2)AST示例
以PromQL语句100-(1-sum(go_gc_durationSeconds{instance="localhost:9090",job="test_job"}))为例,解析后的AST结构可简化为:
BinaryExpr(减法:100 - ...)
├─ NumberLiteral(100)
└─ ParenExpr
└─ BinaryExpr(减法:1 - ...)
├─ NumberLiteral(1)
└─ AggregateExpr(sum)
└─ VectorSelector(go_gc_durationSeconds{instance="localhost:9090",job="test_job"})1.4 Engine的执行入口:exec方法
创建query实例后,Engine通过exec()方法执行查询,核心逻辑包括:监控打点、超时控制、并发控制、分发执行。核心代码片段如下:
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
ng.metrics.currentQueries.Inc()
defer ng.metrics.currentQueries.Dec()
// 超时控制:绑定全局超时时间,补充cancel释放逻辑
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
defer cancel()
q.cancel = cancel
// 监控总执行时长
execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime)
defer execSpanTimer.Finish()
// 监控排队时长 & 并发控制
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
if err := ng.gate.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to acquire concurrency gate: %w", err) // 补充错误信息
}
defer ng.gate.Done()
queueSpanTimer.Finish()
// 监控核心执行时长
evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime)
defer evalSpanTimer.Finish()
// 分发到具体执行逻辑
switch s := q.Statement.(type) {
case *EvalStmt:
return ng.execEvalStmt(ctx, q, s)
case *RecordStmt:
return ng.execRecordStmt(ctx, q, s) // 补充缺失的RecordStmt分支
case *AlertStmt:
return ng.execAlertStmt(ctx, q, s) // 补充缺失的AlertStmt分支
default:
return nil, fmt.Errorf("unsupported statement type: %T", s) // 优化错误提示
}
}二、PromQL的执行流程
execEvalStmt()是PromQL查询的核心执行入口,整体流程分为两步:数据查询(populateSeries) 和AST节点计算(evaluator.eval)。
2.1 数据查询:populateSeries方法
该方法的核心是遍历AST,从底层存储(TSDB/远程存储)查询时序数据,并将数据绑定到VectorSelector/MatrixSelector节点,具体步骤如下:
(1)调整查询时间范围
深度优先遍历AST,根据VectorSelector/MatrixSelector的Offset(偏移量)、LookbackDelta(默认5分钟,容错时序采集延迟)调整查询起止时间,确保获取完整的时序数据。核心代码片段:
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) {
var maxOffset time.Duration
// 遍历AST,计算最大偏移量(覆盖VectorSelector/MatrixSelector)
Inspect(s.Expr, func(node Node, _ []Node) error {
switch n := node.(type) {
case *VectorSelector:
// LookbackDelta默认5分钟,兜底采集延迟
baseOffset := LookbackDelta // 补充默认值定义:const LookbackDelta = 5 * time.Minute
if n.Offset+baseOffset > maxOffset {
maxOffset = n.Offset + baseOffset
}
case *MatrixSelector:
// 范围向量需额外叠加Range字段
baseOffset := n.Range + LookbackDelta
if n.Offset+baseOffset > maxOffset {
maxOffset = n.Offset + baseOffset
}
}
return nil
})
// 校验时间范围合法性
start := s.Start.Add(-maxOffset)
if start.After(s.End) {
return nil, fmt.Errorf("adjusted start time %s is after end time %s", start, s.End)
}
// 创建存储查询器
querier, err := q.Querier(ctx, start, s.End)
if err != nil {
return nil, fmt.Errorf("create storage querier failed: %w", err)
}
// 遍历AST,查询时序数据并绑定到节点
Inspect(s.Expr, func(node Node, path []Node) error {
params := &storage.SelectParams{
Start: timestamp.FromTime(start),
End: timestamp.FromTime(s.End),
Step: int64(s.Interval / time.Millisecond),
}
switch n := node.(type) {
case *VectorSelector:
// 调整Offset对应的时间范围
if n.Offset > 0 {
offsetMs := durationMilliseconds(n.Offset)
params.Start -= offsetMs
params.End -= offsetMs
}
// 从存储查询数据(标签匹配)
set, err := querier.Select(params, n.LabelMatchers...)
if err != nil {
return fmt.Errorf("vector selector query failed: %w", err)
}
// 绑定数据到VectorSelector节点
n.series, err = expandSeriesSet(ctx, set)
if err != nil {
return fmt.Errorf("expand series set failed: %w", err)
}
case *MatrixSelector:
// 范围向量查询逻辑(复用VectorSelector基础逻辑,补充Range过滤)
if n.Offset > 0 {
offsetMs := durationMilliseconds(n.Offset)
params.Start -= offsetMs
params.End -= offsetMs
}
set, err := querier.Select(params, n.LabelMatchers...)
if err != nil {
return fmt.Errorf("matrix selector query failed: %w", err)
}
n.series, err = expandSeriesSet(ctx, set)
if err != nil {
return fmt.Errorf("expand series set failed: %w", err)
}
}
return nil
})
return querier, nil
}(2)核心节点的字段说明
| 节点类型 | 核心字段 | 作用说明 |
|---|---|---|
| VectorSelector | Name、Offset、LabelMatchers、series | 瞬时向量查询:指标名、偏移量、标签过滤、绑定的时序数据 |
| MatrixSelector | Name、Offset、Range、LabelMatchers、series | 范围向量查询:在瞬时向量基础上增加时间范围(Range) |
2.2 AST节点计算:evaluator.eval方法
获取时序数据后,Engine创建evaluator实例,递归遍历AST节点,逐节点计算最终结果。evaluator.eval()会根据节点类型执行不同逻辑,核心是递归计算 + 类型转换 + 资源管控(如maxSamplesPerQuery)。
(1)VectorSelector节点:基础时序查询
VectorSelector是最基础的节点,负责将底层存储的storage.Series转换为PromQL可计算的Matrix([]promql.Series),核心逻辑:
case *VectorSelector:
mat := make(Matrix, 0, len(n.series)) // 修正变量名:e.series → n.series
it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
// 资源管控:初始化当前处理的样本数
currentSamples := 0
maxSamples := ev.maxSamplesPerQuery // 补充maxSamples定义
for i, s := range n.series {
it.Reset(s.Iterator())
ss := Series{
Metric: n.series[i].Labels(),
Points: make([]Point, 0, numSteps), // 替换getPointSlice,补充初始化逻辑
}
// 按步长遍历时间,填充时序点
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
// 补充边界校验:防止时间溢出
if ts > ev.endTimestamp {
break
}
_, v, ok := ev.vectorSelectorSingle(it, n, ts) // 修正变量名:e → n
if ok {
// 资源管控:超过上限则终止
if currentSamples >= maxSamples {
ev.err = fmt.Errorf("exceed max samples per query: %d", maxSamples)
return nil
}
ss.Points = append(ss.Points, Point{V: v, T: ts})
currentSamples++
}
}
if len(ss.Points) > 0 {
mat = append(mat, ss)
}
}
ev.currentSamples = currentSamples // 同步样本数到evaluator
return mat关键辅助方法说明:vectorSelectorSingle
当查询步长与时序采集周期不匹配时(如采集周期10s,查询步长1s/15s),该方法会“补齐”时序点——若指定时间戳无数据,取前一个有效点的值,保证查询结果的连续性。
(2)AggregateExpr节点:聚合计算
AggregateExpr对应sum/avg/topk/quantile等聚合函数,核心逻辑由evaluator.rangeEval() + evaluator.aggregation()实现。
① rangeEval:按步长处理时序点
rangeEval是聚合计算的核心封装,步骤如下:
func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ...Expr) Matrix {
// 校验参数合法性
if len(exprs) == 0 {
return nil
}
numSteps := int((ev.endTimestamp - ev.startTimestamp)/ev.interval) + 1
if numSteps <= 0 {
return nil
}
matrices := make([]Matrix, len(exprs))
// 1. 递归计算子表达式(补充错误处理)
for i, e := range exprs {
if e == nil || e.Type() == ValueTypeString {
continue
}
val, err := ev.eval(e) // 补充显式递归调用
if err != nil {
ev.err = err
return nil
}
mat, ok := val.(Matrix)
if !ok {
ev.err = fmt.Errorf("expr %d is not Matrix type", i)
return nil
}
matrices[i] = mat
}
vectors := make([]Vector, len(exprs))
args := make([]Value, len(exprs))
enh := &EvalNodeHelper{out: make(Vector, 0)}
seriesMap := make(map[uint64]Series, numSteps)
// 2. 按步长遍历时间
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
// 提取当前步长的时序点,封装为Vector
for i := range exprs {
vectors[i] = vectors[i][:0]
for _, series := range matrices[i] {
// 精准匹配当前时间戳的点
point, ok := findPoint(series.Points, ts) // 补充findPoint辅助方法
if ok && ev.currentSamples < ev.maxSamples {
vectors[i] = append(vectors[i], Sample{
Metric: series.Metric,
Point: point,
})
ev.currentSamples++
}
}
args[i] = vectors[i]
}
// 3. 执行聚合回调
result := f(args, enh)
if ev.err != nil {
return nil
}
// 4. 封装结果(区分瞬时/范围查询)
if ev.isInstantQuery() { // 补充瞬时查询判断方法
mat := make(Matrix, len(result))
for i, s := range result {
mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}}
}
return mat
} else {
// 范围查询:按Metric Hash分组拼接结果
for _, sample := range result {
h := sample.Metric.Hash()
ss, ok := seriesMap[h]
if !ok {
ss = Series{
Metric: sample.Metric,
Points: make([]Point, 0, numSteps),
}
}
ss.Points = append(ss.Points, sample.Point)
seriesMap[h] = ss
}
}
}
// 范围查询最终结果整理
mat := make(Matrix, 0, len(seriesMap))
for _, ss := range seriesMap {
if len(ss.Points) > 0 {
mat = append(mat, ss)
}
}
return mat
}② aggregation:聚合逻辑实现
aggregation()方法处理具体的聚合规则,以topk为例(覆盖核心逻辑):
func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param float64, vec Vector, enh *EvalNodeHelper) Vector {
// 1. 解析参数(topk/bottomk)
var k int64
if op == itemTopK || op == itemBottomK {
k = int64(param)
if k < 1 {
ev.err = fmt.Errorf("invalid topk/bottomk param: %d (must >=1)", k)
return enh.out
}
}
// 2. 按without/by规则分组
result := make(map[uint64]*groupedAggregation)
for _, s := range vec {
// 计算分组Key
var groupingKey uint64
if without {
// without=true:排除指定标签生成分组Key
groupingKey = s.Metric.HashWithoutLabels(grouping...)
} else {
// without=false:仅保留指定标签生成分组Key
groupingKey = s.Metric.HashWithLabels(grouping...)
}
// 初始化/更新分组聚合实例
group, ok := result[groupingKey]
if !ok {
// 构建分组标签(过滤无关标签)
m := labels.NewBuilder(s.Metric)
if without {
m.Del(grouping...)
m.Del(labels.MetricName) // 移除指标名(聚合后无具体指标)
} else {
// 仅保留grouping指定标签
keepLabels := make(map[string]bool)
for _, l := range grouping {
keepLabels[l] = true
}
for _, l := range s.Metric {
if !keepLabels[l.Name] {
m.Del(l.Name)
}
}
}
// 初始化堆结构(按值排序)
group = &groupedAggregation{
labels: m.Labels(),
heap: make(vectorByValueHeap, 0, k),
}
result[groupingKey] = group
}
// 3. topk核心逻辑:堆排序筛选前K值
if op == itemTopK {
if int64(len(group.heap)) < k || group.heap[0].V < s.V {
if int64(len(group.heap)) == k {
heap.Pop(&group.heap) // 弹出最小值
}
heap.Push(&group.heap, &Sample{
Point: s.Point,
Metric: s.Metric,
})
}
} else if op == itemBottomK {
// 补充bottomk逻辑(与topk相反,筛选最小值)
if int64(len(group.heap)) < k || group.heap[0].V > s.V {
if int64(len(group.heap)) == k {
heap.Pop(&group.heap)
}
heap.Push(&group.heap, &Sample{
Point: s.Point,
Metric: s.Metric,
})
}
}
}
// 4. 封装最终结果
for _, aggr := range result {
switch op {
case itemTopK:
sort.Sort(sort.Reverse(aggr.heap)) // 降序排列
for _, v := range aggr.heap {
enh.out = append(enh.out, *v)
}
case itemBottomK:
sort.Sort(aggr.heap) // 升序排列
for _, v := range aggr.heap {
enh.out = append(enh.out, *v)
}
// 补充sum/avg/max/min等聚合逻辑
case itemSum:
sum := 0.0
for _, p := range aggr.points { // 补充aggr.points初始化逻辑
sum += p.V
}
enh.out = append(enh.out, Sample{
Metric: aggr.labels,
Point: Point{V: sum, T: aggr.timestamp},
})
}
}
return enh.out
}(3)BinaryExpr节点:二元运算
BinaryExpr对应算术运算(+、-、*、/)、比较运算(==、!=、>、<)、集合运算,核心难点是Vector Matching规则(两个Vector运算时的匹配逻辑)。
① 二元运算的基础规则
| 运算类型 | 参与运算的类型 | 执行逻辑 |
|---|---|---|
| 算术运算 | 数字 + 数字 | 直接返回标量计算结果 |
| 算术运算 | 数字 + Instant Vector | 数字与Vector中每个时序点的值运算,返回新的Instant Vector |
| 算术/比较运算 | Instant Vector + Instant Vector | 按Vector Matching规则匹配时序点,匹配成功则运算;失败时,比较运算过滤该点,算术运算丢弃该点 |
② Vector Matching核心规则
当两个Instant Vector参与运算时,Prometheus会遍历左Vector的每个点,按以下规则从右Vector匹配点:
(1)One-to-one 匹配(一一对应)
要求两个Vector的标签集合完全一致才能匹配。
示例(匹配成功):
go_memstats_heap_idle_bytes{instance="localhost:9090",job="test_job"} + go_memstats_heap_alloc_bytes{instance="localhost:9090",job="test_job"}示例(匹配失败):
go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="0"} + go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="1"}可通过ignoring(忽略指定标签)或on(仅匹配指定标签)调整规则:
# ignoring:忽略quantile标签,匹配成功
go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="0"} + ignoring(quantile) go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="1"}
# on:仅匹配instance和job标签,匹配成功
go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="0"} + on(instance,job) go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="1"}(2)Many-to-one 匹配(多对一)
左Vector有多个点,右Vector仅有一个匹配点,需通过group_left指定“多”的一侧为左Vector。
加粗图7-10:Many-to-one匹配逻辑示例
(此处放置图片,直接嵌入正文)
图注:左Vector多个时序点匹配右Vector单个点的规则示意
示例:
go_gc_duration_seconds{instance="localhost:9090",job="test_job"} + ignoring(quantile) group_left
go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="0"}(3)One-to-many 匹配(一对多)
与Many-to-one相反,右Vector有多个点,左Vector仅有一个匹配点,需通过group_right指定“多”的一侧为右Vector:
go_gc_duration_seconds{instance="localhost:9090",job="test_job",quantile="0"} + ignoring(quantile) group_right
go_gc_duration_seconds{instance="localhost:9090",job="test_job"}③ BinaryExpr的执行逻辑
case *BinaryExpr:
// 1. 递归计算左右子表达式
leftVal, err := ev.eval(n.LHS)
if err != nil {
return nil, err
}
rightVal, err := ev.eval(n.RHS)
if err != nil {
return nil, err
}
// 2. 解析Vector Matching规则
matchOpts := n.MatchOptions // 补充MatchOptions定义:包含ignoring/on/group_left/group_right
var leftVec, rightVec Vector
// 类型转换:标量→Vector(适配数字+Vector运算)
if leftVal.Type() == ValueTypeScalar {
leftVec = scalarToVector(leftVal.(Scalar), rightVec) // 补充转换方法
} else {
leftVec = leftVal.(Vector)
}
if rightVal.Type() == ValueTypeScalar {
rightVec = scalarToVector(rightVal.(Scalar), leftVec)
} else {
rightVec = rightVal.(Vector)
}
// 3. 按规则匹配两个Vector
matched := make(map[uint64]Sample)
matchVector(leftVec, rightVec, matchOpts, matched) // 补充匹配核心方法
// 4. 执行二元运算
result := make(Vector, 0, len(matched))
for _, s := range matched {
var val float64
switch n.Op {
case itemAdd:
val = s.Left.V + s.Right.V
case itemSub:
val = s.Left.V - s.Right.V
case itemGt:
// 比较运算:返回布尔值(1=真,0=假)
val = boolToFloat(s.Left.V > s.Right.V)
// 补充其他运算符逻辑
}
result = append(result, Sample{
Metric: s.Left.Metric,
Point: Point{V: val, T: s.Left.T},
})
}
return result(4)其他节点执行逻辑
| 节点类型 | 核心执行逻辑 |
|---|---|
| Call | 函数调用(如rate/increase/abs):遍历子表达式的时序点,执行函数计算(如rate计算每秒增长率) |
| ParenExpr | 括号表达式:仅递归执行内部子表达式,无额外逻辑 |
| UnaryExpr | 一元运算(+/-):遍历子表达式的时序点,对值执行一元运算后返回 |
| StringLiteral | 字符串常量:直接返回字符串值,无计算逻辑 |
| NumberLiteral | 数字常量:直接返回标量值,无计算逻辑 |
小结
PromQL的执行逻辑可总结为“解析-查数据-递归计算”三步:
- 解析:Engine通过词法/语法分析将PromQL字符串转换为AST;
- 查数据:遍历AST,从底层存储查询时序数据并绑定到VectorSelector/MatrixSelector节点;
- 递归计算:从AST根节点开始,逐节点执行计算逻辑,最终返回查询结果。
理解底层逻辑的价值在于:
- 优化查询性能:通过
maxSamplesPerQuery、LookbackDelta等参数避免慢查询; - 精准调试:通过Engine的监控指标(
queryQueueTime/queryInnerEval)定位查询瓶颈; - 写出更健壮的PromQL:掌握Vector Matching规则,避免匹配失败、数据丢失问题;
- 排查异常:理解AST节点执行逻辑,快速定位“查不到数据”“计算结果异常”等问题。
下一篇将聚焦Prometheus的Rule模块,拆解Recording Rule(记录规则)和Alerting Rule(告警规则)的执行逻辑,讲解PromQL在规则中的落地方式,以及规则调度的核心机制。
