使用PAI利用235B模型微调0.6B小模型

暗香疏影 创作者

大参数模型虽然性能优异,但计算成本高、推理速度慢。为在保持效果的同时提升效率,可利用大模型生成高质量标注数据,并以此微调小模型,使其在特定任务中逼近大模型表现,这一方法称为模型蒸馏或者更专业准确的说是知识蒸馏(Distill)。

原理在于教师模型(235B)对大量数据进行预测,生成“软标签”(soft labels),这些标签包含了比硬标签(真实标签)更多的置信度信息。
学生模型(0.6B)通过学习这些软标签,模仿教师模型的行为,从而在保持较高性能的同时实现更小的模型规模。

本方案将以从一句话中提取结构化信息(如收件人、地址、电话)为例,演示如何通过模型蒸馏,让 Qwen3-0.6B 模型在此任务上达到大参数模型的表现。

以下是阿里云官方微调NL2SQL的实践,虽然不是利用大模型微调,但是也方便大家参考阅读。
面向NL2BI的大模型微调最佳实践

样例

图1

  1. 准备数据集
    由于我没有真实的物流填单数据,而且这只是一个微调概念验证,所以将使用一批虚拟地址描述信息作为输入,并使用 Qwen3-235B-A22B 作为教师模型提取结构化的信息作为输出。实际上模型微调时使用业务场景的真实数据。

  2. 微调模型
    在获取教师模型的输入输出后,我们可以使用该数据来微调 Qwen3-0.6B 模型,提升其在此任务场景下的表现。本方案将使用PAI的Model Gallery创建模型训练任务,帮助您零代码、快捷的完成开源大模型的微调。

  3. 部署并调用微调后的模型
    模型微调完成后,还无法直接调用,本方案将使用Model Gallery创建部署任务,将微调后的模型部署为在线服务,供您在其他应用中调用该模型。

  4. 验证模型效果
    最后,准备好评测数据与评测标准,对微调后的模型进行效果验证。

训练前工作 - 准备数据集

为了将教师模型(Qwen3-235B-A22B)处理该任务的知识蒸馏到 Qwen3-0.6B 中,我们需要先调用教师模型的 API,将收件人的地址信息提取成结构化的 JSON 数据。

以百炼的API为例:

1
2
3
4
# 国内账号
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
# 国际账号
base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1"

提醒:百炼的模型名称和PAI不一样,他都是小写。阿里云百炼-模型列表

本方案已为您准备好了示例训练集train_qwen3.json和验证集eval_qwen3.json,您可以直接下载使用。

JSON文件中包含多个训练样本,每个样本包括instruction(指令)和output(标准答案)两个字段。

在业务数据不够丰富时,可以考虑使用大模型做数据增强,使数据的多样性和覆盖范围得到提升。为了避免泄漏用户隐私,本方案使用大模型生成了一批虚拟的地址数据,如下生成代码供您参考。如果需要使用以下代码,则需要生成2次,一次作为示例训练集,一次作为验证集。Python代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# -*- coding: utf-8 -*-
import os
import asyncio
import random
import json
import sys
from typing import List, Dict
from openai import AsyncOpenAI
import platform

# 创建异步客户端实例
client = AsyncOpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
# api_key=os.getenv("DASHSCOPE_API_KEY"),
api_key="sk-xxxxxxxxxxxxxxxxxxx",
# base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
)

# 中国省份列表
provinces = [
"北京市", "天津市", "河北省", "山西省", "内蒙古自治区", "辽宁省", "吉林省", "黑龙江省",
"上海市", "江苏省", "浙江省", "安徽省", "福建省", "江西省", "山东省", "河南省",
"湖北省", "湖南省", "广东省", "广西壮族自治区", "海南省", "重庆市", "四川省", "贵州省",
"云南省", "西藏自治区", "陕西省", "甘肃省", "青海省", "宁夏回族自治区", "新疆维吾尔自治区"
]

# 收件人写法模板
recipient_templates = [
"收件人{name}", "收件人:{name}", "收件人是{name}", "收件:{name}",
"收件人为{name}", "{name}", "姓名:{name}", "姓名{name}",
"联系人{name}", "联系人:{name}", "接收人{name}", "接收人:{name}",
"收货人{name}", "收货人:{name}", "寄给{name}", "给{name}",
"收件者{name}", "收件者:{name}", "接收者{name}", "接收者:{name}"
]

# 电话号码写法模板
phone_templates = [
"tel:{phone}", "tel:{phone}", "mobile:{phone}", "mobile:{phone}",
"手机号码{phone}", "手机号码:{phone}", "手机:{phone}", "手机{phone}",
"电话:{phone}", "电话{phone}", "联系电话{phone}", "联系电话:{phone}",
"号码:{phone}", "号码{phone}", "TEL:{phone}", "MOBILE:{phone}",
"contact:{phone}", "phone:{phone}", "{phone}", "call:{phone}",
"联系方式{phone}", "联系方式:{phone}", "电话号码{phone}", "电话号码:{phone}",
"手机号{phone}", "手机号:{phone}", "电话号码是{phone}", "联系电话是{phone}"
]


# 生成虚拟手机号码(建议改为2开头避免与真实号码重合)
def generate_mobile():
prefixes = ['130', '131', '132', '133', '134', '135', '136', '137', '138', '139',
'170', '171', '172', '173', '174', '175', '176', '177', '178', '179',
'150', '151', '152', '153', '154', '155', '156', '157', '158', '159',
'190', '191', '192', '193', '194', '195', '196', '197', '198', '199']
return random.choice(prefixes) + ''.join([str(random.randint(0, 9)) for _ in range(8)])


# 生成固定电话
def generate_landline():
area_codes = ['010', '021', '022', '023', '024', '025', '027', '028', '029', '0311', '0351', '0431', '0451']
area_code = random.choice(area_codes)
number = ''.join([str(random.randint(0, 9)) for _ in range(random.choice([7, 8]))])
return f"{area_code}-{number}"


# 使用大模型生成收件人信息和地址信息
async def generate_recipient_and_address_by_llm(province: str):
"""使用大模型生成指定省份的收件人姓名和地址信息"""
prompt = f"""请为{province}生成一个收件人的信息,包含:
1. 一个真实的中文姓名(可以是常见姓名,也可以是不那么常见的,要多样化)
2. 该省份下的一个城市名称
3. 该城市下的一个行政区名称(如区、县等)
4. 一个具体的街道地址(如路名+门牌号、小区名+楼栋号、商业大厦+楼层等,要真实)

请直接返回JSON格式:
{{"name": "收件人姓名", "city": "城市名", "district": "行政区名", "specific_location": "具体地址"}}

不要包含任何其他内容,只返回JSON。姓名要多样化,不要总是常见的张三李四。"""

try:
response = await client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model="qwen-plus-latest",
temperature=1.7, # 提高温度让姓名更多样化
)

result = response.choices[0].message.content.strip()
# 清理可能的markdown代码块标记
if result.startswith('```'):
result = result.split('\n', 1)[1]
if result.endswith('```'):
result = result.rsplit('\n', 1)[0]

# 尝试解析JSON
info = json.loads(result)
return info
except Exception as e:
print(f"生成收件人和地址失败: {e}, 使用备用方案")
# 备用方案
backup_names = ["王建军", "李春燕", "张志华", "陈美玲", "刘德强", "赵敏慧", "孙文博", "周晓丽"]
return {
"name": random.choice(backup_names),
"city": f"{province.replace('省', '').replace('市', '').replace('自治区', '')}市",
"district": "市辖区",
"specific_location": f"人民路{random.randint(1, 999)}号"
}


# 生成一条记录
async def generate_record():
# 随机选择省份
province = random.choice(provinces)

# 使用大模型生成收件人和地址信息
info = await generate_recipient_and_address_by_llm(province)

# 生成收件人信息格式
recipient = random.choice(recipient_templates).format(name=info['name'])

# 生成电话号码(70%概率手机号,30%概率固话)
if random.random() < 0.7:
phone = generate_mobile()
else:
phone = generate_landline()

phone_info = random.choice(phone_templates).format(phone=phone)

# 组装地址
full_address = f"{info['city']}{info['district']}{info['specific_location']}"

# 组装数据
components = [recipient, phone_info, full_address]

# 随机打乱顺序
random.shuffle(components)

# 随机选择分割符
separators = [' ', ',', ',', ';', ';', ':', ':', '、', '|', '\t', '', ' ', ' | ', ' , ', ' ; ', '/']
separator = random.choice(separators)

# 合并数据
if separator == '':
# 没有分割符的情况
combined_data = ''.join(components)
else:
combined_data = separator.join(components)
return combined_data


# 生成批量数据
async def generate_batch_data(count: int) -> List[str]:
"""生成指定数量的数据"""
print(f"开始生成 {count} 条数据...")
data = []

# 使用信号量控制并发数量,QPM=1500,设置为20个并发
semaphore = asyncio.Semaphore(20)

async def generate_single_record(index):
async with semaphore:
record = await generate_record()
print(f"生成第{index + 1}条数据: {record}")
return record

# 并发生成数据
tasks = [generate_single_record(i) for i in range(count)]
data = await asyncio.gather(*tasks)

return data


# 保存数据到文件
def save_data(data: List[str], filename: str = "recipient_data.json"):
"""保存数据到JSON文件"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"数据已保存到 {filename}")


# 数据生成阶段
async def produce_data_phase():
print("=== 第一阶段:开始生成收件人数据 ===")

# 生成2000条数据
batch_size = 2000
data = await generate_batch_data(batch_size)

# 保存数据
save_data(data)

print(f"\n总共生成了 {len(data)} 条数据")
print("\n示例数据:")
for i, record in enumerate(data[:3]): # 显示前3条作为示例
print(f"{i + 1}. 原始数据: {record}")
print()

print("=== 第一阶段完成 ===\n")
return True


def get_system_prompt():
"""返回系统提示词"""
return """你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的结构化信息。

## 任务说明
请根据给定的输入文本,准确提取并生成包含以下六个字段的JSON格式输出:
- province: 省份/直辖市/自治区(必须是完整的官方名称,如"河南省"、"上海市"、"新疆维吾尔自治区"等)
- city: 城市名称(包含"市"字,如"郑州市"、"西安市"等)
- district: 区县名称(包含"区"、"县"等,如"金水区"、"雁塔区"等)
- specific_location: 具体地址(街道、门牌号、小区、楼栋等详细信息)
- name: 收件人姓名(完整的中文姓名)
- phone: 联系电话(完整的电话号码,包括区号)

## 抽取规则
1. **地址信息处理**:
- 必须准确识别省、市、区的层级关系
- 省份名称必须使用官方全称(如"河南省"而非"河南")
- 直辖市的province和city字段应该相同(如都填"上海市")
- specific_location应包含详细的街道地址、小区名称、楼栋号等

2. **姓名识别**:
- 准确提取完整的中文姓名,包括复姓
- 包括少数民族姓名

3. **电话号码处理**:
- 提取完整的电话号码,保持原有格式

## 输出格式
请严格按照以下JSON格式输出,不要添加任何解释性文字:
{
"province": "省份名称",
"city": "城市名称",
"district": "区县名称",
"specific_location": "详细地址",
"name": "收件人姓名",
"phone": "联系电话"
}"""


# 使用大模型预测结构化数据
async def predict_structured_data(raw_data: str):
"""使用qwen3-235b-a22b模型预测结构化数据"""
system_prompt = get_system_prompt()

try:
response = await client.chat.completions.create(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": raw_data}
],
model="qwen3-235b-a22b",
temperature=0.1, # 降低温度以提高预测准确性
response_format={"type": "json_object"},
extra_body={"enable_thinking": False}
)

result = response.choices[0].message.content.strip()

# 清理可能的markdown代码块标记
if result.startswith('```'):
lines = result.split('\n')
for i, line in enumerate(lines):
if line.strip().startswith('{'):
result = '\n'.join(lines[i:])
break
if result.endswith('```'):
result = result.rsplit('\n```', 1)[0]

# 尝试解析JSON
structured_data = json.loads(result)
return structured_data

except Exception as e:
print(f"预测结构化数据失败: {e}, 原始数据: {raw_data}")
# 返回空的结构化数据作为备用
return {
"province": "",
"city": "",
"district": "",
"specific_location": "",
"name": "",
"phone": ""
}


# 数据转换阶段
async def convert_data_phase():
"""转换数据格式并使用大模型预测结构化数据"""
print("=== 第二阶段:开始转换数据格式 ===")

try:
print("开始读取recipient_data.json文件...")

# 读取原始数据
with open('recipient_data.json', 'r', encoding='utf-8') as f:
raw_data_list = json.load(f)

print(f"成功读取数据,共有 {len(raw_data_list)} 条记录")
print("开始使用qwen3-235b-a22b模型预测结构化数据...")
# 使用简单与明确的system message 有助于训练与推理速度的提高
system_prompt = "你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的JSON信息,包含的Key有province(省份)、city(城市名称)、district(区县名称)、specific_location(街道、门牌号、小区、楼栋等详细信息)、name(收件人姓名)、phone(联系电话) 现在输入如下:"
output_file = 'recipient_sft_data.json'

# 使用信号量控制并发数量
semaphore = asyncio.Semaphore(10)

async def process_single_item(index, raw_data):
async with semaphore:
# 使用大模型预测结构化数据
structured_data = await predict_structured_data(raw_data)
print(f"处理第{index + 1}条数据: {raw_data}")

conversation = {
"instruction": system_prompt + raw_data,
"output": json.dumps(structured_data, ensure_ascii=False)
}

return conversation

print(f"开始转换数据到 {output_file}...")

# 并发处理所有数据
tasks = [process_single_item(i, raw_data) for i, raw_data in enumerate(raw_data_list)]
conversations = await asyncio.gather(*tasks)

with open(output_file, 'w', encoding='utf-8') as outfile:
json.dump(conversations, outfile, ensure_ascii=False, indent=4)

print(f"转换完成!共处理 {len(raw_data_list)} 条记录")
print(f"输出文件:{output_file}")
print("=== 第二阶段完成 ===")

except FileNotFoundError:
print("错误:找不到 recipient_data.json 文件")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"JSON解析错误:{e}")
sys.exit(1)
except Exception as e:
print(f"转换过程中发生错误:{e}")
sys.exit(1)


# 主函数
async def main():
print("开始执行合并的数据处理流程...")
print("这个程序将依次执行两个阶段:")
print("1. 生成原始收件人数据")
print("2. 使用qwen3-235b-a22b模型预测结构化数据并转换为SFT训练格式")
print("-" * 50)

# 第一阶段:生成数据
success = await produce_data_phase()

if success:
# 第二阶段:转换数据
await convert_data_phase()

print("\n" + "=" * 50)
print("全部流程执行完成!")
print("生成的文件:")
print("- recipient_data.json: 原始数据列表")
print("- recipient_sft_data.jsonl: SFT训练格式数据")
print("=" * 50)
else:
print("数据生成阶段失败,终止执行")


if __name__ == '__main__':
# 设置事件循环策略
if platform.system() == 'Windows':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 运行主协程
asyncio.run(main(), debug=False)

创建0.6B模型

创建完工作空间后,在左侧导航栏单击Model Gallery,搜索并找到Qwen3-0.6B选项卡,然后单击训练。
配置训练任务参数。只需配置如下关键参数,其他参数默认即可。
训练方式:默认选择SFT监督微调,LoRA微调方法。

图2

填写OSS数据集路径及上传

然后调整超参数配置:

learning_rate: 设置为0.0005

num_train_epochs:设置为4

per_device_train_batch_size:设置为8

seq_length:设置为512

此超参数配置下模型在本方案的测试数据上表现较好。当您使用模型微调解决您自己的业务问题时,如果准确率不高,也可以尝试调整超参数。您可以上网深入了解超参数的作用,以及如何通过损失曲线来判断超参数的调整方向。

查看训练任务并等待训练完成。模型微调大约需要14分钟,在微调过程中,任务详情页面将展示任务日志及指标曲线,待训练执行完成后,微调后的模型将存储到设置的OSS目录中。

后续查看训练任务详情,您可通过在左侧导航栏单击Model Gallery > 任务管理 > 训练任务,然后再单击任务名称查看。
在任务详情滑动最下面有指标曲线可以查看。
图3

调整参数

根据loss图像,调整超参数,提升模型效果。
图4
图5

在任务详情页面可以分别看到train_loss曲线(反映训练集损失)与 eval_loss曲线(反映验证集损失):

您可以根据损失值的变化趋势,初步判断当前模型的训练效果:

  1. 在结束训练前 train_loss 与 eval_loss 仍有下降趋势(欠拟合)
    您可以增加 num_train_epochs(训练轮次,与训练深度正相关) 参数,或适当增大 lora_rank(低秩矩阵的秩,秩越大,模型能表达更复杂的任务,但更容易过度训练)的值后再进行训练,加大模型的对训练数据的拟合程度;

  2. 在结束训练前 train_loss 持续下降,eval_loss 开始变大(过拟合)
    您可以减少 num_train_epochs 参数,或适当减小lora_rank的值后再进行训练,防止模型过度训练;

  3. 在结束训练前 train_loss 与 eval_loss 均处于平稳状态(良好拟合)
    模型处于该状态时,您可以进行后续步骤。

任务完成后,直接在任务详情右上角点击部署。

测试部署结果

有三种办法测试,一种是部署Dify,或者是Python SDK实现,还有一种是使用Cherry Studio或者VS Code安装Cline插件实现对话(使用Cline插件时请确保你打开文件工作空间是以信任的模式,否则左侧边栏不会显示这个插件)。

示例对话测试

1
你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的JSON信息,包含的Key有province(省份)、city(城市名称)、district(区县名称)、specific_location(街道、门牌号、小区、楼栋等详细信息)、name(收件人姓名)、phone(联系电话) 现在输入如下:哈尔滨市南岗区长江路399号融创大厦B座26层 ; 收件:尉迟翔宇 ; 手机:23075622756

Python SDK实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from openai import OpenAI
import os

# 建议将Token设置为环境变量,防止敏感信息泄漏
# 环境变量配置方法请参见:https://help.aliyun.com/zh/sdk/developer-reference/configure-the-alibaba-cloud-accesskey-environment-variable-on-linux-macos-and-windows-systems
token = os.environ.get("Token")
# <调用地址>后面有 “/v1”不要去除
client = OpenAI(
api_key="tokenXxxxxxxxxxxxxxxxxxx==",
base_url="http://12345678.eu-central-1.pai-eas.aliyuncs.com/api/predict/qw3smalld/v1",

)

query = '你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的JSON信息,包含的Key有province(省份)、city(城市名称)、district(区县名称)、specific_location(街道、门牌号、小区、楼栋等详细信息)、name(收件人姓名)、phone(联系电话) 现在输入如下:哈尔滨市南岗区长江路399号融创大厦B座26层 ; 收件:尉迟翔宇 ; 手机:23075622756'
messages = [{'role': 'user', 'content': query}]

resp = client.chat.completions.create(model='Qwen3-0.6B', messages=messages, max_tokens=512, temperature=0)
query = messages[0]['content']
response = resp.choices[0].message.content
print(f'query: {query}')
print(f'response: {response}')

我们额外部署一个Qwen3-0.6B的未经训练模型,然后调用API看看结果:
(百炼也有这个模型,不过名称不是Qwen3-0.6B,而是qwen3-0.6b)
如下图,未经训练的模型在数据没有省份的时候错误识别省份,而经过训练的大模型,则可输出正确结果。
图6
图7

使用Python 快速对比验证数据集

我们对比未经训练的0.6B模型与训练后的0.6B模型,

这次验证的数据集不再是简单的Python生成的recipient_sft_data.json,格式变成如下:

1
{"messages": [{"role": "system", "content": "你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的JSON信息,包含的Key有province(省份)、city(城市名称)、district(区县名称)、specific_location(街道、门牌号、小区、楼栋等详细信息)、name(收件人姓名)、phone(联系电话)"}, {"role": "user", "content": "电话:23204753945:大理市大理市人民路25号 大理古城国际酒店 3号楼:收件者:段丽娟"}, {"role": "assistant", "content": "{\"province\": \"云南省\", \"city\": \"大理市\", \"district\": \"大理市\", \"specific_location\": \"人民路25号 大理古城国际酒店 3号楼\", \"name\": \"段丽娟\", \"phone\": \"23204753945\"}"}]}

使用训练过的大模型结果如下:

1
2
3
4
样本数: 399 条
响应正确: 354 条
响应错误: 45 条
准确率: 88.7218045112782 %

图8

1
2
3
4
样本数: 396 条
响应正确: 59 条
响应错误: 337 条
准确率: 14.8989898989899 %

而未训练的结果如下:
图9

Python实现测试验证训练效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
from openai import AsyncOpenAI
import requests
import json
import asyncio
import os


# 建议将Token设置为环境变量,防止敏感信息泄漏
# 环境变量配置方法请参见:https://help.aliyun.com/zh/sdk/developer-reference/configure-the-alibaba-cloud-accesskey-environment-variable-on-linux-macos-and-windows-systems
client = AsyncOpenAI(
api_key=os.getenv("Token"),
base_url="调用地址/v1"
)

# 您也可以调用百炼的Qwen3-0.6b模型,测试原始模型的准确率,但注意需要将 model="Qwen3-0.6B" 改成 model="qwen3-0.6b"
# client = AsyncOpenAI(
# 若没有配置环境变量,请用阿里云百炼/PAI-EAS API Key将下行替换为:api_key="sk-xxx",
# api_key=os.getenv("DASHSCOPE_API_KEY"),
# base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
# )

system_prompt = """你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的结构化信息。

## 任务说明
请根据给定的输入文本,准确提取并生成包含以下六个字段的JSON格式输出:
- province: 省份/直辖市/自治区(必须是完整的官方名称,如"河南省"、"上海市"、"新疆维吾尔自治区"等)
- city: 城市名称(包含"市"字,如"郑州市"、"西安市"等)
- district: 区县名称(包含"区"、"县"等,如"金水区"、"雁塔区"等)
- specific_location: 具体地址(街道、门牌号、小区、楼栋等详细信息)
- name: 收件人姓名(完整的中文姓名)
- phone: 联系电话(完整的电话号码,包括区号)

## 抽取规则
1. **地址信息处理**:
- 必须准确识别省、市、区的层级关系
- 省份名称必须使用官方全称(如"河南省"而非"河南")
- 直辖市的province和city字段应该相同(如都填"上海市")
- specific_location应包含详细的街道地址、小区名称、楼栋号等

2. **姓名识别**:
- 准确提取完整的中文姓名,包括复姓
- 包括少数民族姓名

3. **电话号码处理**:
- 提取完整的电话号码,保持原有格式

## 输出格式
请严格按照以下JSON格式输出,不要添加任何解释性文字:
{
"province": "省份名称",
"city": "城市名称",
"district": "区县名称",
"specific_location": "详细地址",
"name": "收件人姓名",
"phone": "联系电话"
}"""


def compare_address_info(actual_address_str, predicted_address_str):
"""比较两个JSON字符串表示的地址信息是否相同"""
try:
# 解析实际地址信息
if actual_address_str:
actual_address_json = json.loads(actual_address_str)
else:
actual_address_json = {}

# 解析预测地址信息
if predicted_address_str:
predicted_address_json = json.loads(predicted_address_str)
else:
predicted_address_json = {}

# 直接比较两个JSON对象是否完全相同
is_same = actual_address_json == predicted_address_json

return {
"is_same": is_same,
"actual_address_parsed": actual_address_json,
"predicted_address_parsed": predicted_address_json,
"comparison_error": None
}

except json.JSONDecodeError as e:
return {
"is_same": False,
"actual_address_parsed": None,
"predicted_address_parsed": None,
"comparison_error": f"JSON解析错误: {str(e)}"
}
except Exception as e:
return {
"is_same": False,
"actual_address_parsed": None,
"predicted_address_parsed": None,
"comparison_error": f"比较错误: {str(e)}"
}


async def predict_single_conversation(conversation_data):
"""预测单个对话的标签"""
try:
# 提取user content(去除assistant message)
messages = conversation_data.get("messages", [])
user_content = None

for message in messages:
if message.get("role") == "user":
user_content = message.get("content", "")
break

if not user_content:
return {"error": "未找到用户消息"}

response = await client.chat.completions.create(
model="Qwen3-0.6B",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content}
],
response_format={"type": "json_object"},
extra_body={
"enable_thinking": False
}
)

predicted_labels = response.choices[0].message.content.strip()
return {"prediction": predicted_labels}

except Exception as e:
return {"error": f"预测失败: {str(e)}"}


async def process_batch(batch_data, batch_id):
"""处理一批数据"""
print(f"处理批次 {batch_id},包含 {len(batch_data)} 条数据...")

tasks = []
for i, conversation in enumerate(batch_data):
task = predict_single_conversation(conversation)
tasks.append(task)

results = await asyncio.gather(*tasks, return_exceptions=True)

batch_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
batch_results.append({"error": f"异常: {str(result)}"})
else:
batch_results.append(result)

return batch_results


async def main():
output_file = "predicted_labels.jsonl"
batch_size = 20 # 每批处理的数据量

# 读取测试数据
url = 'https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250616/ssrgii/test.jsonl'
conversations = []

try:
response = requests.get(url)
response.raise_for_status() # 检查请求是否成功
for line_num, line in enumerate(response.text.splitlines(), 1):
try:
data = json.loads(line.strip())
conversations.append(data)
except json.JSONDecodeError as e:
print(f"第 {line_num} 行JSON解析错误: {e}")
continue
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
return

print(f"成功读取 {len(conversations)} 条对话数据")

# 分批处理
all_results = []
total_batches = (len(conversations) + batch_size - 1) // batch_size

for batch_id in range(total_batches):
start_idx = batch_id * batch_size
end_idx = min((batch_id + 1) * batch_size, len(conversations))
batch_data = conversations[start_idx:end_idx]

batch_results = await process_batch(batch_data, batch_id + 1)
all_results.extend(batch_results)

print(f"批次 {batch_id + 1}/{total_batches} 完成")

# 添加小延迟避免请求过快
if batch_id < total_batches - 1:
await asyncio.sleep(1)

# 保存结果
same_count = 0
different_count = 0
error_count = 0

with open(output_file, 'w', encoding='utf-8') as f:
for i, (original_data, prediction_result) in enumerate(zip(conversations, all_results)):
result_entry = {
"index": i,
"original_user_content": None,
"actual_address": None,
"predicted_address": None,
"prediction_error": None,
"address_comparison": None
}

# 提取原始用户内容
messages = original_data.get("messages", [])
for message in messages:
if message.get("role") == "user":
result_entry["original_user_content"] = message.get("content", "")
break

# 提取实际地址信息(如果存在assistant message)
for message in messages:
if message.get("role") == "assistant":
result_entry["actual_address"] = message.get("content", "")
break

# 保存预测结果
if "error" in prediction_result:
result_entry["prediction_error"] = prediction_result["error"]
error_count += 1
else:
result_entry["predicted_address"] = prediction_result.get("prediction", "")

# 比较地址信息
comparison_result = compare_address_info(
result_entry["actual_address"],
result_entry["predicted_address"]
)
result_entry["address_comparison"] = comparison_result

# 统计比较结果
if comparison_result["comparison_error"]:
error_count += 1
elif comparison_result["is_same"]:
same_count += 1
else:
different_count += 1

f.write(json.dumps(result_entry, ensure_ascii=False) + '\n')

print(f"所有预测完成! 结果已保存到 {output_file}")

# 统计结果
success_count = sum(1 for result in all_results if "error" not in result)
prediction_error_count = len(all_results) - success_count
print(f"样本数: {success_count} 条")
print(f"响应正确: {same_count} 条")
print(f"响应错误: {different_count} 条")
print(f"准确率: {same_count * 100 / success_count} %")


if __name__ == "__main__":
asyncio.run(main())

其他 - 生成数据集转换

上面,我们使用的是阿里云提供的数据集,那么我们如何使用我们生成的数据集呢?
把数据集instruction 字段被拆分为:system 角色内容(固定提示词)和user 角色内容(提取“现在输入如下:”之后的用户输入)然后,
output 字段直接作为 assistant 的回复内容。

下面是转换为新格式JSON(请忽略这个代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import json
import os

# 指定输入文件路径(适用于Windows)
input_file = r"D:\Projects\LLMTraining\recipient_sft_data.json" # ← 请替换为你的实际文件路径

# 读取原始JSON数据
try:
with open(input_file, "r", encoding="utf-8") as f:
original_data = json.load(f)
except FileNotFoundError:
print(f"错误:找不到文件 {input_file}")
exit(1)
except json.JSONDecodeError:
print(f"错误:文件 {input_file} 不是有效的JSON格式")
exit(1)


# 转换函数(保持原有逻辑)
def transform_data(original_data):
transformed_data = []
for item in original_data:
# 提取 instruction 中的 user 内容
instruction_text = item["instruction"]

# 提取 user 输入内容(冒号或逗号后的内容)
user_content = instruction_text.split("现在输入如下:")[-1].strip()

# 构建 messages 列表
messages = [
{"role": "system", "content": instruction_text.split("现在输入如下:")[0].strip()},
{"role": "user", "content": user_content},
{"role": "assistant", "content": item["output"]}
]

transformed_data.append({"messages": messages})
return transformed_data


# 转换数据
transformed = transform_data(original_data)

# 生成输出文件路径(与输入文件同一目录)
output_dir = os.path.dirname(input_file)
output_file = os.path.join(output_dir, "transformed_data.json")

# 保存为新文件
try:
with open(output_file, "w", encoding="utf-8") as f:
json.dump(transformed, f, ensure_ascii=False, indent=2)
print(f"转换完成!结果已保存至:{output_file}")
except Exception as e:
print(f"写入文件时发生错误:{e}")

我们发现他转换为JSON,开头结尾是大括号,每个数组有逗号。但是我希望他是jsonl格式,
于是我们再拿去大模型帮我们再次生成下代码。使得将原来的 JSON 输出格式从标准的 JSON 数组格式([…])转换为 每行一个独立的 JSON 对象(也就是通常称为 JSON Lines / .jsonl 格式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import json
import os

# 指定输入文件路径(适用于Windows)
input_file = r"D:\Projects\LLMTraining\recipient_sft_data.json" # ← 请替换为你的实际文件路径

# 读取原始JSON数据
try:
with open(input_file, "r", encoding="utf-8") as f:
original_data = json.load(f)
except FileNotFoundError:
print(f"错误:找不到文件 {input_file}")
exit(1)
except json.JSONDecodeError:
print(f"错误:文件 {input_file} 不是有效的JSON格式")
exit(1)


# 转换函数(保持原有逻辑)
def transform_data(original_data):
transformed_data = []
for item in original_data:
# 提取 instruction 中的 user 内容
instruction_text = item["instruction"]

# 提取 user 输入内容(冒号或逗号后的内容)
user_content = instruction_text.split("现在输入如下:")[-1].strip()

# 构建 messages 列表
messages = [
{"role": "system", "content": instruction_text.split("现在输入如下:")[0].strip()},
{"role": "user", "content": user_content},
{"role": "assistant", "content": item["output"]}
]

transformed_data.append({"messages": messages})
return transformed_data


# 转换数据
transformed = transform_data(original_data)

# 生成输出文件路径(与输入文件同一目录)
output_dir = os.path.dirname(input_file)
output_file = os.path.join(output_dir, "transformed_data.jsonl") # 使用.jsonl扩展名更清晰

# 保存为每行一个JSON对象的格式
try:
with open(output_file, "w", encoding="utf-8") as f:
for item in transformed:
f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"转换完成!结果已保存至:{output_file}")
except Exception as e:
print(f"写入文件时发生错误:{e}")

受制于我们jsonl需要上传到某个网站、对象储存,为了防止被拦截报错(TLS原因及User-Agent原因)可以添加如下:

1
2
3
4
5
6
headers = {
"User-Agent": "Mozilla/5.0",
"Content-Type": "application/json",
"Accept": "application/json"
}
response = requests.get(url, verify=False, headers=headers)

本次教程结束。

  • 标题: 使用PAI利用235B模型微调0.6B小模型
  • 作者: 暗香疏影
  • 创建于 : 2025-07-18 10:00:00
  • 更新于 : 2025-07-19 09:09:00
  • 链接: https://blog.pptcar.com/2025/07/18/2025-07-18-PAI-Fine-Tuning-Distill/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论