专业名词对齐

token
token 究竟是个什么东西?它是怎么计算的?相信很多同学或多或少的都有这种疑问,在各家大模型产品里也都是按 token 进行计费的,比如每 千 token/0.01 元 这样的后付费模式。 所以我们先来讲 token 的概念和它的计算方法。

如果用我们的话来理解 token,那可以把它翻译成 “词”,大模型理解人类语言是一个词一个词去理解的,而不是一个字一个字的理解。当一段文本到来的时候,系统需要先通过 “分词器”(tokenizer)把一个句子切分成一个一个的词去理解,分词器有一张词表,词表中的词都有自己的 id,而模型就是通过这些 id 来区分这些词的。并且我们说它是词,其实也不完全正确,因为标点符号也会被切成一个独立的 token,但用 “词” 来表达比较容易人类理解。比如在 “我喜欢篮球,也喜欢 RAP” 这句话中,“逗号” 是一个 token,“喜欢” 是一个 token,“我” 是一个 token,“RAP” 中 “R” 可能是个独立的 token,“A” 也可能是个独立的 token,而 “RAP” 合起来也可能是个独立的 token,这就要看不同的分词器中,那张词表是怎么定义的

那大模型为什么要这样去计算呢?我们举个例子, 请问大家知道 “旯” 这个字念什么么?是不是不知道? 那我这么写一下 “犄角旮旯”,是不是就一下子认出来了,再举个例子 “圳” 这个字有多少同学认得? 而如果我说 “深圳” 是不是就一下子反应过来了。 这是人类大脑的运作模式,节省了大脑的记忆和提取的成本,而模型也是模仿这个思路来的。 所以模型吐出答案的时候,也是一个 token 一个 token 的吐出来的,这也对应了后面性能测试指标中的 “吐字率”,不过这个指标我们后面再说。

了解 token 的概念很重要,因为有很多指标是与 token 息息相关的,并且我们接下来要说的一个名词也跟它有关。

=============================================================
模型支持的长度
不知道大家去看一些大模型产品的时候,模型参数上都会描述这个模型是 72B 32K/16K 这样的字眼, 72B 相信很多同学都知道了,B 是 billion(十亿) 的缩写,代表着模型参数 (也叫特征) 的量级。而这个 32K/16K 是什么意思?

32K/16K 描述的分别是模型支持的最长的输入 token 数量以及输出 token 的数量。 毕竟模型无法处理超过自己承受极限的用户请求(显存是有限的)。 一旦用户输入的问题超过了规定的 token 数量,一般就会报错或者截断(按策略删除一部分)

这也就需要我们在收集测试数据的时候注意模型的相关配置,尤其要明白 token 数不是字符数,token 是分词的结果,所以我们有些时候需要自己引入分词器来计算测试数据的 token 长度。比如:

from transformers import AutoTokenizer

加载分词器

tokenizer = AutoTokenizer.from_pretrained(‘./deepseek-r1-tokenizer/’)

def get_tokens(text: str):
tokens = tokenizer.tokenize(text)
token_ids = tokenizer.convert_tokens_to_ids(tokens)
return token_ids
上面就是一个计算 token 的代码,我们需要使用 transformers 库并加载分词器配置(通常可以从网络上下载,有很多开源项目)。 这都是在确认自己的测试数据有多少 token。
主要目的:
保证测试数据不会超过模型限定的输入长度,避免报错或者截断。
将测试数据进行分组,问题的 token 数量与性能有着直接的关系,token 数量越大,各项性能指标就会差,这个应该很好理解, 模型要计算 100K 的问题和计算 1K 的问题需要的时间是不一样的。所以测试人员往往需要测试出不同规模的数据下的性能表现。
这里需要注意的是, 有些厂商会明确输入和输出的限定参数,比如输入不超过 32k,输出不超过 16k。但实际上底层模型上往往只有一个参数,比如 128K,代表输入和输出的总数不能超过 128K。

模型分桶
我们知道了 token 和模型支持的长度这两个概念后, 我们就要再讲一件事情,模型有一个参数来限定支持的长度,而对于同样的模型,同样的数据,这个支持 token 的长度参数设置的不同,往往有不同的性能表现。

比如同样是 deep-seek r1 模型,同样的一份数据。 我们给模型设置成支持 32K 长度和支持 128K 长度,他们在这份数据下的性能表现可能是不一样的。 比如对于现今的两大模型推理框架 vllm 来说(另一个是 sglang),vllm 使用的 page attention 机制,可以理解为跟内存管理中的虚拟内存很像(page memory),它的特点是针对任何一个用户的请求,都会在内存中预留出模型限定的长度的内存空间出来。 比如模型限定了支持的 token 为 32K,那么即便用户的只是问了一个 “你是谁”,那也要为这个问题预留出 32K token 的内存空间出来。
模型推理框架:
1.vllm 使用的 page attention 机制,可以理解为跟内存管理中的虚拟内存很像(page memory),它的特点是针对任何一个用户的请求,都会在内存中预留出模型限定的长度的内存空间出来。 比如模型限定了支持的 token 为 32K,那么即便用户的只是问了一个 “你是谁”,那也要为这个问题预留出 32K token 的内存空间出来。

2.sglang使用内容分配方式,不会预留空间,属于实报实销,面对高并发的项目可能会出现性能问题,因此要根据不能场景使用不同类型的模型以及数据规模在vllm上的预留模型大小过大或者过小的情况

所以在真实的线上场景中,模型是按策略 “分桶” 的,比如可以按用户输入的长度分到适合处理这个数据的模型中。也可以按场景分桶,比如走了联网检索和文档检索这样的 RAG 场景的分到一个单独的桶中。

总结
模型计算的是 token,而不是字。
模型有自己支持的输入输出长度。
真实环境中不同的数据要分配到不同的桶中的模型。
测试人员需要把数据按 token 数量分组,测试不同数据规模下的性能表现。

=======================================================
大模型性与常规产品性能测试的差异点
首先我们来对比一下大模型与普通的互联网产品在性能测试场景上有什么区别,它主要表现在两方面:

性能测试指标
性能测试的发压方法
大模型的性能测试指标
普通产品的测试指标主要包含平均响应时间和 QPS 等。而大模型由于在用户体验方面有一些特点,所以评估大模型性能的指标时就有些许不同。

吐字率(平均输出 token 速率/s):大模型问答多为流式输出的交互形式,系统不会等模型把答案都计算完毕才返回给用户,而是模型计算出一个 token 就会返回给用户一个 token(有些系统可能也会有合并若干个 token 然后输出给用户),这是因为模型思考的时间通常都会很长,如果不及时给反馈就会流失用户。所以在这种交互模式下,模型每秒钟能返回给用户多少个 token 就成为了一个非常重要的性能指标。单位时间内计算出的 token 越多,就证明模型的计算性能更好。现在一般的大模型都会要求在某些并发下不低于 20/s 的吐字速率。
首 token 耗时(从用户发送问题到返回第一个 token 的时间):这里需要简扼说明一下首 token 与后续 token 的计算是不同的,因为不少同学会困惑为什么有了吐字率可以评估模型的计算能力,还需要再单独计算一个首 token 耗时。 事实上,吐字率的计算也是排除了首 token 的。这涉及到了大模型训练和推理的原理。这里先不过度展开,大家只需要知道对于模型来说,计算第一个 token 和后续 token 是不一样的过程就可以了。 而首 token 的耗时直接影响了用户体验,毕竟如果用户问一个问题,结果几十秒都没有返回第一个字,那么也可能会流失用户。
QPM:这个指标比较容易理解,毕竟我们都是 QPS 是什么。但因为大模型问答的时间都太长了, 所以我们统计的是分钟维度的。
一般来说主要看上面三个指标, 但还有一些其他指标需要对齐。比如:

输入平均 token 数量(input tokens):用户输入的问题长度很大程度上影响了计算性能,输入越长,首 token 的耗时就越长,吐字率也会受到影响,所以我们测试的时候需要对齐测试数据是在什么量级上的。 一般来说可以把测试数据按长度进行分组,比如测试长文本的时候,可以分成 16k~32k, 32k48k,48k64k 以此类推。
输出平均 token 数量(output tokens):模型输出的答案有多长,这一点也影响到了 QPM 指标。 有些模型生成的答案和思考过程言简意赅,而有些模型的输出很长。所以在对比两个模型性能的时候,需要对齐输出的答案长度是否在一个量级。
大模型的发压方法
大多数同学在性能测试的经验都是阻塞式 http 接口,统计 qps 和平均响应时间为主。而对于大模型的压测来说,情况就相对复杂一些,大模型的压测都是对流式接口进行测试,大多是调用 sse,websock 和 openai sdk 这 3 种形式。流式接口就好像是在调用异步接口一样(很像),需要我们通过轮询的方式去分析每一个传回来的包并做分析,如果是 openai sdk 的接口,那么交互起来还比较方便,因为 sdk 里把很多东西都封装好了。 而如果是 websocket 和 sse 则比较麻烦,需要去分析包的内容来判断哪些是思考包,哪些是答案包,哪些是统计包,哪些是心跳包。处理起来就麻烦了很多。 为了简单,我这里还是用 openai sdk 的格式给大家写个 demo:

client = OpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key=“sk-xxx”,
api_key=“sk-xxxxx”,
# 如何获取API Key:https://help.aliyun.com/zh/model-studio/developer-reference/get-api-key
base_url=“https://dashscope.aliyuncs.com/compatible-mode/v1”,
)

response = client.chat.completions.create(
    model="deepseek-r1",
    messages=[{'role': 'system', 'content': 'You are a helpful assistant.'},
              {'role': 'user', 'content':"今天北京天气怎么样?"}],
    stream=True,
    stream_options={
        "include_usage": True
    }
)

# 初始化变量
first_packet_time = None
start_time = time.time()

# 标记第一个思考包是否出现,主要用于计算首token耗时
is_first_think = False

# 输出token数量
completion_tokens = 0
# 输入token数量
prompt_tokens = 0

for chunk in response:
    # 判断思考包,chunk.choices[0].delta.reasoning_content保存的是思考内容
    if len(chunk.choices) > 0 and chunk.choices[0].delta.reasoning_content is not None and chunk.choices[
        0].delta.reasoning_content != "":
        if not is_first_think:
            first_packet_time = time.time()
            first_elapsed_time = first_packet_time - start_time
            is_first_think = True

    elif len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None:
        # 这里是答案包
        print(chunk.choices[0].delta.content, end="")
    else:
        # 这里一般是尾包,用于返回模型处理的token数量信息,主要用于计费,我们这里用来计算输出和输出的token的长度。
        completion_tokens = chunk.usage.completion_tokens
        prompt_tokens = chunk.usage.prompt_tokens

上面是我用阿里百炼上面的 deepseek-r1 模型编写的 demo,这里需要注意的是 deepseek-r1 是一个思考模型,所以他有很长的思考过程。什么是思考模型呢? 我在这里给大家截个图:

图片
上图是在豆包上使用 deepseek-r1 的截图,我点击了深度思考能力,询问了一个问题,模型会把整个思考过程像上图一样输出出来。 从感官上这就是思考模型了,它擅长思考用户场景,意图识别,在大量的智能体(agent)场景中都需要思考模型作为核心,判断用户意图,决策后续行动以及调用对应工具。

所以在上述代码中,首包耗时的指标计算是从思考包开始计算的。如果大家测试的不是思考模型,那需要从答案包开始计算。 当然我们也可以把首思考包耗时和首答案包耗时都计算出来,事实上在项目中我们也确实是这样做的。

各位对大模型输出的包不熟悉的话可以找一个环境实际调用一下看看,也可以看看 openai sdk 的文档。上面的代码逻辑还是比较简单的,关键逻辑都写在了代码注释中。

压测工具方面,我们通常习惯用 locust 进行压测,只不过 locust 自带的指标中没有 QPM, 吐字率,首 token 耗时这种指标,所以我们需要通过 locust 的函数进行自定义上报,且 locust UI 上已经很难算出这些指标了,我们需要下载对应的 csv 文件后进行二次计算产出指标。

from openai import OpenAI
import time
from locust import TaskSet, HttpUser, task, run_single_user, events
import random

def send_stream(query):

client = OpenAI(
    # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
    api_key="sk-xxxx",
    # 如何获取API Key:https://help.aliyun.com/zh/model-studio/developer-reference/get-api-key
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

response = client.chat.completions.create(

    # model="deepseek-v3",
    model="deepseek-r1",
    messages=[{'role': 'system', 'content': 'You are a helpful assistant.'},
              {'role': 'user', 'content': query}],
    stream=True,
    stream_options={
        "include_usage": True
    }
)

# 初始化变量
first_packet_time = None
start_time = time.time()
# 标记第一个思考包是否出现,主要用于计算首token耗时
is_first_think = False
# 输出token数量
completion_tokens = 0
# 输入token数量
prompt_tokens = 0

for chunk in response:
    # 判断思考包,chunk.choices[0].delta.reasoning_content保存的是思考内容
    if len(chunk.choices) > 0 and chunk.choices[0].delta.reasoning_content is not None and chunk.choices[
        0].delta.reasoning_content != "":
        if not is_first_think:
            first_packet_time = time.time()
            first_elapsed_time = first_packet_time - start_time # 计算首包耗时,因为我们用的是deepseek R1思考模型,所以首包其实是思考包的首包
            is_first_think = True

    elif len(chunk.choices) > 0 and chunk.choices[0].delta.content is not None:
        # 这里是答案包
        print(chunk.choices[0].delta.content, end="")
    else:
        # 这里一般是尾包,用于返回模型处理的token数量信息,用于计费,我们这里用来计算输出和输出的token的长度。
        completion_tokens = chunk.usage.completion_tokens
        prompt_tokens = chunk.usage.prompt_tokens

end_time = time.time()
tokens_per_second = completion_tokens / (end_time - first_packet_time) # 计算吐字率,因为吐字率是不计算首包耗时的,所以要把首包耗时的时间去掉
all_time = end_time - start_time # 计算整个请求花费的时间,方便计算后面的QPM

print(f"每秒输出token数量: {tokens_per_second:.2f}")

return all_time, first_elapsed_time, tokens_per_second, prompt_tokens, completion_tokens

class ModelRequestSet(TaskSet):
“”"
任务集
“”"

@task
def send_request(self):
    query = random.choice(self.user.share_data).strip()

    try:
        # 自定义指标,分别用于计算首包,吐字率,qpm,平均输入token数量和平均输出token数量。
        all_time, first_elapsed_time, tokens_per_second, input_tokens, output_tokens = send_stream(query)
        events.request.fire(request_type="http", name="first", response_time=first_elapsed_time,
                            response_length=0)
        events.request.fire(request_type="http", name="token_per_second", response_time=tokens_per_second,
                            response_length=0)
        events.request.fire(request_type="http", name="whole", response_time=all_time,
                            response_length=0)
        events.request.fire(request_type="http", name="input_token", response_time=input_tokens,
                            response_length=input_tokens)
        events.request.fire(request_type="http", name="output_token", response_time=output_tokens,
                            response_length=output_tokens)
    except Exception as e:
        events.request.fire(request_type="http", name="error", response_time=1 * 1000,
                            response_length=0, exception=f"大模型调用错误:{e}")
        print(e)

def get_test_data():
return [‘你是谁’, “今天北京天气如何”, “胃疼怎么办”]

class ModelUser(HttpUser):
“”"
- 会产生并发用户实例
- 产生的用户实例会依据规则去执行任务集

"""
# 定义的任务集
tasks = [ModelRequestSet, ]
host = 'http://www.baidu.com'
share_data = get_test_data()

压测逻辑同样写在了代码注释中,包括各项指标的计算方法也都在注释中。 压测启动起来后的样子是这样的:

图片
为了输出最终指标,我们需要使用 locust 的 --csv 参数把结果保存到本地路径中,或者也可以选择从界面上下载:

图片
当 csv 文件下载好后,可以通过下面的脚本计算指标:

coding=utf-8

import csv
import sys
import os

‘’’
计算最终指标的脚本
‘’’

def deal_data(data):
tmp = {}
tmp[“SUC_REQUEST_COUNT”] = int(data[‘whole’][‘Request Count’])
print(data)
if ‘error’ in data:
tmp[“FAIL_REQUEST_COUNT”] = int(data[‘error’][‘Failure Count’])
else:
tmp[“FAIL_REQUEST_COUNT”] = 0
tmp[“REQUEST_COUNT”] = tmp[“SUC_REQUEST_COUNT”] + tmp[“FAIL_REQUEST_COUNT”]
tmp[‘FAIL_RATE’] = 100 * tmp[‘FAIL_REQUEST_COUNT’] / tmp[‘REQUEST_COUNT’]

# qps qpm
tmp['QPS'] = float(data['whole']['Requests/s'])
tmp['QPM'] = tmp['QPS'] * 60


# 首token耗时
tmp['FIRST_TOKEN_TIME'] = float(data['first']['Average Response Time'])
tmp['FIRST_TOKEN_TIME_P50'] = float(data['first']['50%'])
tmp['FIRST_TOKEN_TIME_P90'] = float(data['first']['90%'])
tmp['FIRST_TOKEN_TIME_P95'] = float(data['first']['95%'])
tmp['FIRST_TOKEN_TIME_P99'] = float(data['first']['99%'])

# input token数量
tmp['INPUT_TOKEN'] = float(data['input_token']['Average Response Time'])

# output token数量
tmp['OUTPUT_TOKEN'] = float(data['output_token']['Average Response Time'])

# 请求耗时
tmp['WHOLE_TOKEN_TIME'] = float(data['whole']['Average Response Time'])
tmp['WHOLE_TOKEN_TIME_P50'] = float(data['whole']['50%'])
tmp['WHOLE_TOKEN_TIME_P90'] = float(data['whole']['90%'])
tmp['WHOLE_TOKEN_TIME_P95'] = float(data['whole']['95%'])
tmp['WHOLE_TOKEN_TIME_P99'] = float(data['whole']['99%'])

# 平均输入输出token处理耗时
tmp['OUTPUT_TIME'] = tmp['WHOLE_TOKEN_TIME'] - tmp['FIRST_TOKEN_TIME']
tmp['IPNUT_TIME'] = tmp['FIRST_TOKEN_TIME']

# 平均每秒输出token数
tmp['OUTPUT_TOKEN_PER_SECOND'] = float(data['token_per_second']['Average Response Time'])
tmp['OUTPUT_TOKEN_PER_SECOND_P50'] = float(data['whole']['50%'])
tmp['OUTPUT_TOKEN_PER_SECOND_P90'] = float(data['whole']['90%'])
tmp['OUTPUT_TOKEN_PER_SECOND_P95'] = float(data['whole']['95%'])
tmp['OUTPUT_TOKEN_PER_SECOND_P99'] = float(data['whole']['99%'])

return tmp

def main():
if len(sys.argv) < 2:
print(f"Error: No filename provided")
print(f"Usage: python3 {sys.argv[0]} ")
sys.exit(1)

filename = sys.argv[1]
basename = os.path.splitext(os.path.basename(filename))[0]
print(f"filename {filename} basename {basename}")
with open(filename, mode='r', encoding='utf-8') as file:
    # 创建一个CSV阅读器
    csv_reader = csv.DictReader(file)

    ret = []
    # 遍历每一行
    data = {}
    for row in csv_reader:
        # 解析每一行的数据
        if row['Type'] == '':
            print(data.keys())
            ret.append(deal_data(data))
        elif row['Type'] == 'Type':
            data = {}
            continue
        data[row['Name']] = row


headers = ['QPM', 'QPS', 'INPUT_TOKEN', 'OUTPUT_TOKEN',
           'FIRST_TOKEN_TIME', 'FIRST_TOKEN_TIME_P50', 'FIRST_TOKEN_TIME_P90', 'FIRST_TOKEN_TIME_P99',
           'WHOLE_TOKEN_TIME', 'WHOLE_TOKEN_TIME_P50', 'WHOLE_TOKEN_TIME_P90', 'WHOLE_TOKEN_TIME_P99',
           'OUTPUT_TIME',  'OUTPUT_TOKEN_PER_SECOND',
           'SUC_REQUEST_COUNT', 'FAIL_REQUEST_COUNT', 'FAIL_RATE', ]

# 按header顺序写入csv
with open(f'result-{basename}.csv', mode='w', encoding='utf-8') as file:
    csv_writer = csv.DictWriter(file, fieldnames=headers)
    csv_writer.writeheader()
    for row in ret:
        filtered_row = {key: row[key] for key in headers if key in row}
        csv_writer.writerow(filtered_row)

print(f"result-{basename}.csv saved")

if name == ‘main’:
main()

====================================================

性能测试进阶
首先要明白, 模型和推理框架是两个不同的东西,模型只是一系列的文件,而推理框架需要读取模型,加载到显存中,还要根据策略把一个模型拆分到不同的 GPU 卡中(模型太大了,一个卡存不下,一个卡计算的也太慢),推理的时候要在不同的机器不同的卡中来回调度通信,所以模型虽然是同一个模型, 但是推理框架部分可以上非常多的手段,比如 TP 并行,DP 并行,EP 并行,PP 并行,PD 分离等等。

大模型推理的原理
首先我们需要知道大模型是怎么回答用户的问题的

大模型推理的时候,会先通过用户的问题去计算第一个 token(首 token),而它计算的方法就是到词表中针对每个词去算一个概率,意思就是:根据用户输入的问题,去预测词表中每一个词会出现在回答的第一个 token 的概率,比如 “我” 的概率是 80%,“你” 的概率是 10%,“他” 的概率是 5%,依此类推。 模型会计算出每个词出现在这个位置的概率。

但模型一定是取概率最高的那个词么? 很多时候不是的,不知道大家在使用大模型的时候有没有注意过 top k,top n,和温度这些参数。 因为现在的大模型几乎用的都是 sampling 模式,也就是说按一定的随机策略去采样,概率更高的词有更大的可能性会被采样到,但不是一定的。 所以模型启动的时候都要设置一个 seed 参数,就是一个采样的随机种子。 而 top k 参数,意思是按概率排序后,取前 K 个词进行采样。 比如 我设置 top k= 3, 那么在上图中 “我”,“你”,“他” 都有可能模型采样到并输出,只不过 80% 概率的 “我” 有更大的可能性被选中而已。sampling 模式下使用温度,top k,top p 来控制采样策略,所以大家会看到针对同一个问题,每次询问,模型给出的答案可能是不一样的。 这就是 sampling 模式的作用,之所以这样设计是为了让模型能更加的灵活,而不是每次都输出固定的答案。

注意:推理框架每次提测,都要测试精度,并且测试的时候需要保持 top k, top p, 温度这些参数是固定的,比如我们测试 deepseek-r1 的时候用的参数值就是论文中写的。 并选取一些开源的数据(比如 math500,都是数学题)来进行效果的评估。 因为都是数学题,都是要求输出的精准的数字,所以是可以自动化测试的。

为什么每次提测推理框架的性能优化,还要测试效果呢? 因为推理加速的很多策略,都是会影响精度的。 比如本来模型从 fp16 的存储精度变成了 int4,那么 int4 能存储的特征肯定比 fp16 的要少, 当然这带来的优点就是模型变小了, 计算变快了。 测试人员需要验证这样的优化是以不影响模型效果为前提的。

当模型计算出第一个 token 后,它会用用户的问题 + 第一个 token 再去预测第二个 token,
模型使用用户的问题预测出第一个 token,然后使用问题 + 第一个 token 去预测第二个 token,依此类推。

================================================================

Prefill 与 Decode
上述的推理过程其实被划分成了两个大的阶段,分别是 prefill(预填充)和 decode(解码)阶段。prefill 阶段负责计算 K 矩阵和 V 矩阵并生成首 token,并且 K,V 会保存到 cache 中,而 decode 负责用 Q 矩阵去从 kv cache 中拿出 K 和 V,与 K 矩阵计算注意力机制,再与 V 矩阵计算特征,并输出一个又一个的 token 返回给用户。 其中的 Q,K,V 可以先不用纠结,如果大家想知道是怎么回事,那就大概可以理解成 Q 代表着用户的问题,K 是自注意力机制,Q 与 K 计算是计算出模型应该在上下文中的哪些 token 放更多的注意力(也可以叫权重),而 V 暂时理解成特征矩阵,当 Q 与 K 计算后,要与 V 计算生成最终的 token。

针对 Q,K,V 的理解对测试人员来说不是特别重要,我这里就不过度展开了,重要的是需要知道 prefill 和 decode 阶段各自在负责什么事情。尤其要明确以下几点:

1.prefill 和 decode 阶段是可以各自优化的(后面会说 PD 分离)
通常首 token 是 prefill 阶段计算出来的,decode 阶段负责输出其余 token。所以首 token 耗时评估的是 prefill 阶段的性能,吐字率是评估 decode 阶段的性能,这个一定要记清楚。后面做单独优化的时候要知道什么时候关注首 token,什么时候关注吐字率。
prefill 针对相同的用户问题,计算出来的 k,v 一定是一模一样的。 所以 kv 才会被缓存到 cache 中,decode 要从 cache 里拉 k,v 去计算。 所以有一种优化方向就是这种缓存的分布式化(毕竟有很多模型实例,如果能共享 k,v cache 就能提升性能)。或者缓存还是本地的,但是上层的路由要做哈希一致性计算,保证相同前缀的用户问题能路由到同样的 prefill 节点上,这样才能保证缓存命中。 同时需要注意的是缓存的命中做的是前缀匹配,就是不需要用户问一模一样的问题还会命中缓存,哪怕只有前几个 token 是一样的,那这几个 token 也会使用缓存中的 kv 而不是重新计算。
所以除了上面提到的精度测试外,第二个测试场景出来了,那就是针对 prefill 的优化,很常见的一个方式就是针对 kv cache 的优化,比如原来这些 cache 是存到 GPU 显存中的,我们可以把它优化到存到内存中以节省显存。 这里的测试方法大体上还是之前的那一套,但是需要注意的是:

在正常的性能测试中,为了避免命中 kv cache,我们通常都会在测试数据的最开始加上一个 uuid,只要前缀没有匹配到,那么就肯定无法命中缓存。而在优化 kv cache 的测试场景中,我们则要把 uuid 去掉,验证命中缓存后的性能收益。 PS:decode 阶段无法缓存,它必须每一次都重新计算生成 token,所以 kv cache 的优化在理论上对吐字率是没有明显影响的。
有些问题的输出很长,比如数学计算的思考过程超级长,经常思考个几分钟,10 几分钟甚至几十分钟的。 而我们现在的测试场景是在优化 prefill 阶段,关注的是首 token 的提升,而非评价 decode 阶段的吐字率,所以有时候为了压缩测试时间,可以设置模型的 max_tokens 参数,该参数限制模型输出的 token 长度。所以可以把它设置成一个很小的值来减少测试需要的时间。
测试方法可以使用同样的数据发送两次,验证第二次命中缓存后相比第一次节省了多少时间,但这只能作为基准测试,我们仍需评估上线后的缓存命中率和收益情况。因为:
线上模型是分桶的,且每个桶有 N 多个模型实例,如果 kv cache 是保存在本地,那么想命中就一定要把相同前缀(可以是前 N 个字符)的用户请求路由到同一个模型实例上,同时还要保证负载均衡,不能让某个模型实例负载过高。 所以这个调度策略其实是决定了缓存的命中率的。
线上每次命中多少 token 的前缀缓存是不确定的,也许命中了前 100 个 token,也可能命中 1W 个 token,所以不用真实的数据是评估不出来的。
缓存是有上限的,我们粗算下来 1K 的 kv cache 差不多占用 75M 的内存(这是在我们的环境中的表现),对于长文本(比如 100K),一条用户请求下来就占用了 7.5G,所以在线上流量比较大的情况下,内存空间很快就会被沾满,这样就需要把老的 kv cache 淘汰掉。 这导致原本可能会命中的数据因为之前的缓存被淘汰掉而无法命中。
所以基于以上情况,我们的测试场景需要设计成:

采集线上真实的数据,按时间排序,且只采集跟测试环境模型实例数量相同的线上模型实例的数据。比如测试环境只有 1 个模型实例, 那线上也采集 1 个模型实例的数据。否则模拟不出较为真实的缓存命中情况。
请求的 max_tokens 设置为 10,加速测试进程,只关注首 token 耗时
测试代码中把测试数据压入一个线程安全,先进先出的队列,保证每个压测的协程不会压重复的数据,且一定是按时间顺序取数据进行压测的。
给出一个案例:数据的处理方式,主要是把数据压入队列,供每个压测协程使用

class ModelUser(HttpUser):
“”"
- 会产生并发用户实例
- 产生的用户实例会依据规则去执行任务集

"""
# 定义的任务集
tasks = [ModelRequestSet, ]
host = 'http://www.baidu.com'
user_data_queue = queue.Queue()

# 初始化测试数据
for i in range(10):
    file_path = f"group_mb_output_{str(i+1)}.txt"
    with open(file_path, "r") as file:
        for line in file:
            try:
                data = json.loads(line.strip())
                content = ''
                is_valid = True
                for q in data[0]:
                    try:
                        content += q['content']
                    except Exception as e:
                        print(content)
                        is_valid = False
                        break
                if not is_valid:
                    continue
                user_data_queue.put(data[0])
            except json.JSONDecodeError as e:
                print(f"无效行: {line}")
                print(e)

压测的时候,数据从队列中获取,由于队列是线程安全的,get 操作是原子操作。所以可以保证数据不会压重复,数据消耗光后自动结束任务。

def get_data(self):
if self.user.user_data_queue.empty():
print(“数据耗尽”)
self.user.environment.runner.quit()
try:
message = self.user.user_data_queue.get_nowait() # 原子操作获取数据队列
return message
except queue.Empty:
print(“数据耗尽”)
self.user.environment.runner.quit()
@task
def send_request(self):
query = self.get_data()

try:
    # 自定义指标,分别用于计算首包,吐字率,qpm,平均输入token数量和平均输出token数量。
    content, first_time, all_time, prompt_tokens, completion_tokens, token_per_second = send_stream(
        query)
    # all_time, first_elapsed_time, tokens_per_second, input_tokens, output_tokens = send_stream(query)
    events.request.fire(request_type="http", name="first", response_time=first_time,
                        response_length=0)
    events.request.fire(request_type="http", name="token_per_second", response_time=token_per_second,
                        response_length=0)
    events.request.fire(request_type="http", name="whole", response_time=all_time,
                        response_length=0)
    events.request.fire(request_type="http", name="input_token", response_time=prompt_tokens,
                        response_length=0)
    events.request.fire(request_type="http", name="output_token", response_time=completion_tokens,
                        response_length=0)
except Exception as e:
    events.request.fire(request_type="http", name="error", response_time=1 * 1000,
                        response_length=0, exception=f"大模型调用错误:{e}")
    time.sleep(1)

PS: 有些时候测试数据的采集可能没有那么顺利,比如线上捞取数据的时候往往是从日志中捞取,但是日志中可能无法区分哪些数据是属于同一个模型实例的,这时候可以退而求其次, 把数据分组,比如根据用户分组(因为大部分时候命中缓存是因为同一个用户问的问题是相关性很高的,或者重复度很高的),用这样的数据验证一下命中缓存的收益,但是这种对真实线上环境还是有一定的差别。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐