1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
| import os import asyncio import random import json import sys from typing import List, Dict from openai import AsyncOpenAI import platform
client = AsyncOpenAI( api_key="sk-xxxxxxxxxxxxxxxxxxx", base_url="https://dashscope-intl.aliyuncs.com/compatible-mode/v1" )
provinces = [ "北京市", "天津市", "河北省", "山西省", "内蒙古自治区", "辽宁省", "吉林省", "黑龙江省", "上海市", "江苏省", "浙江省", "安徽省", "福建省", "江西省", "山东省", "河南省", "湖北省", "湖南省", "广东省", "广西壮族自治区", "海南省", "重庆市", "四川省", "贵州省", "云南省", "西藏自治区", "陕西省", "甘肃省", "青海省", "宁夏回族自治区", "新疆维吾尔自治区" ]
recipient_templates = [ "收件人{name}", "收件人:{name}", "收件人是{name}", "收件:{name}", "收件人为{name}", "{name}", "姓名:{name}", "姓名{name}", "联系人{name}", "联系人:{name}", "接收人{name}", "接收人:{name}", "收货人{name}", "收货人:{name}", "寄给{name}", "给{name}", "收件者{name}", "收件者:{name}", "接收者{name}", "接收者:{name}" ]
phone_templates = [ "tel:{phone}", "tel:{phone}", "mobile:{phone}", "mobile:{phone}", "手机号码{phone}", "手机号码:{phone}", "手机:{phone}", "手机{phone}", "电话:{phone}", "电话{phone}", "联系电话{phone}", "联系电话:{phone}", "号码:{phone}", "号码{phone}", "TEL:{phone}", "MOBILE:{phone}", "contact:{phone}", "phone:{phone}", "{phone}", "call:{phone}", "联系方式{phone}", "联系方式:{phone}", "电话号码{phone}", "电话号码:{phone}", "手机号{phone}", "手机号:{phone}", "电话号码是{phone}", "联系电话是{phone}" ]
def generate_mobile(): prefixes = ['130', '131', '132', '133', '134', '135', '136', '137', '138', '139', '170', '171', '172', '173', '174', '175', '176', '177', '178', '179', '150', '151', '152', '153', '154', '155', '156', '157', '158', '159', '190', '191', '192', '193', '194', '195', '196', '197', '198', '199'] return random.choice(prefixes) + ''.join([str(random.randint(0, 9)) for _ in range(8)])
def generate_landline(): area_codes = ['010', '021', '022', '023', '024', '025', '027', '028', '029', '0311', '0351', '0431', '0451'] area_code = random.choice(area_codes) number = ''.join([str(random.randint(0, 9)) for _ in range(random.choice([7, 8]))]) return f"{area_code}-{number}"
async def generate_recipient_and_address_by_llm(province: str): """使用大模型生成指定省份的收件人姓名和地址信息""" prompt = f"""请为{province}生成一个收件人的信息,包含: 1. 一个真实的中文姓名(可以是常见姓名,也可以是不那么常见的,要多样化) 2. 该省份下的一个城市名称 3. 该城市下的一个行政区名称(如区、县等) 4. 一个具体的街道地址(如路名+门牌号、小区名+楼栋号、商业大厦+楼层等,要真实)
请直接返回JSON格式: {{"name": "收件人姓名", "city": "城市名", "district": "行政区名", "specific_location": "具体地址"}}
不要包含任何其他内容,只返回JSON。姓名要多样化,不要总是常见的张三李四。"""
try: response = await client.chat.completions.create( messages=[{"role": "user", "content": prompt}], model="qwen-plus-latest", temperature=1.7, )
result = response.choices[0].message.content.strip() if result.startswith('```'): result = result.split('\n', 1)[1] if result.endswith('```'): result = result.rsplit('\n', 1)[0]
info = json.loads(result) return info except Exception as e: print(f"生成收件人和地址失败: {e}, 使用备用方案") backup_names = ["王建军", "李春燕", "张志华", "陈美玲", "刘德强", "赵敏慧", "孙文博", "周晓丽"] return { "name": random.choice(backup_names), "city": f"{province.replace('省', '').replace('市', '').replace('自治区', '')}市", "district": "市辖区", "specific_location": f"人民路{random.randint(1, 999)}号" }
async def generate_record(): province = random.choice(provinces)
info = await generate_recipient_and_address_by_llm(province)
recipient = random.choice(recipient_templates).format(name=info['name'])
if random.random() < 0.7: phone = generate_mobile() else: phone = generate_landline()
phone_info = random.choice(phone_templates).format(phone=phone)
full_address = f"{info['city']}{info['district']}{info['specific_location']}"
components = [recipient, phone_info, full_address]
random.shuffle(components)
separators = [' ', ',', ',', ';', ';', ':', ':', '、', '|', '\t', '', ' ', ' | ', ' , ', ' ; ', '/'] separator = random.choice(separators)
if separator == '': combined_data = ''.join(components) else: combined_data = separator.join(components) return combined_data
async def generate_batch_data(count: int) -> List[str]: """生成指定数量的数据""" print(f"开始生成 {count} 条数据...") data = []
semaphore = asyncio.Semaphore(20)
async def generate_single_record(index): async with semaphore: record = await generate_record() print(f"生成第{index + 1}条数据: {record}") return record
tasks = [generate_single_record(i) for i in range(count)] data = await asyncio.gather(*tasks)
return data
def save_data(data: List[str], filename: str = "recipient_data.json"): """保存数据到JSON文件""" with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"数据已保存到 {filename}")
async def produce_data_phase(): print("=== 第一阶段:开始生成收件人数据 ===")
batch_size = 2000 data = await generate_batch_data(batch_size)
save_data(data)
print(f"\n总共生成了 {len(data)} 条数据") print("\n示例数据:") for i, record in enumerate(data[:3]): print(f"{i + 1}. 原始数据: {record}") print()
print("=== 第一阶段完成 ===\n") return True
def get_system_prompt(): """返回系统提示词""" return """你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的结构化信息。
## 任务说明 请根据给定的输入文本,准确提取并生成包含以下六个字段的JSON格式输出: - province: 省份/直辖市/自治区(必须是完整的官方名称,如"河南省"、"上海市"、"新疆维吾尔自治区"等) - city: 城市名称(包含"市"字,如"郑州市"、"西安市"等) - district: 区县名称(包含"区"、"县"等,如"金水区"、"雁塔区"等) - specific_location: 具体地址(街道、门牌号、小区、楼栋等详细信息) - name: 收件人姓名(完整的中文姓名) - phone: 联系电话(完整的电话号码,包括区号)
## 抽取规则 1. **地址信息处理**: - 必须准确识别省、市、区的层级关系 - 省份名称必须使用官方全称(如"河南省"而非"河南") - 直辖市的province和city字段应该相同(如都填"上海市") - specific_location应包含详细的街道地址、小区名称、楼栋号等
2. **姓名识别**: - 准确提取完整的中文姓名,包括复姓 - 包括少数民族姓名
3. **电话号码处理**: - 提取完整的电话号码,保持原有格式
## 输出格式 请严格按照以下JSON格式输出,不要添加任何解释性文字: { "province": "省份名称", "city": "城市名称", "district": "区县名称", "specific_location": "详细地址", "name": "收件人姓名", "phone": "联系电话" }"""
async def predict_structured_data(raw_data: str): """使用qwen3-235b-a22b模型预测结构化数据""" system_prompt = get_system_prompt()
try: response = await client.chat.completions.create( messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": raw_data} ], model="qwen3-235b-a22b", temperature=0.1, response_format={"type": "json_object"}, extra_body={"enable_thinking": False} )
result = response.choices[0].message.content.strip()
if result.startswith('```'): lines = result.split('\n') for i, line in enumerate(lines): if line.strip().startswith('{'): result = '\n'.join(lines[i:]) break if result.endswith('```'): result = result.rsplit('\n```', 1)[0]
structured_data = json.loads(result) return structured_data
except Exception as e: print(f"预测结构化数据失败: {e}, 原始数据: {raw_data}") return { "province": "", "city": "", "district": "", "specific_location": "", "name": "", "phone": "" }
async def convert_data_phase(): """转换数据格式并使用大模型预测结构化数据""" print("=== 第二阶段:开始转换数据格式 ===")
try: print("开始读取recipient_data.json文件...")
with open('recipient_data.json', 'r', encoding='utf-8') as f: raw_data_list = json.load(f)
print(f"成功读取数据,共有 {len(raw_data_list)} 条记录") print("开始使用qwen3-235b-a22b模型预测结构化数据...") system_prompt = "你是一个专业的信息抽取助手,专门负责从中文文本中提取收件人的JSON信息,包含的Key有province(省份)、city(城市名称)、district(区县名称)、specific_location(街道、门牌号、小区、楼栋等详细信息)、name(收件人姓名)、phone(联系电话) 现在输入如下:" output_file = 'recipient_sft_data.json'
semaphore = asyncio.Semaphore(10)
async def process_single_item(index, raw_data): async with semaphore: structured_data = await predict_structured_data(raw_data) print(f"处理第{index + 1}条数据: {raw_data}")
conversation = { "instruction": system_prompt + raw_data, "output": json.dumps(structured_data, ensure_ascii=False) }
return conversation
print(f"开始转换数据到 {output_file}...")
tasks = [process_single_item(i, raw_data) for i, raw_data in enumerate(raw_data_list)] conversations = await asyncio.gather(*tasks)
with open(output_file, 'w', encoding='utf-8') as outfile: json.dump(conversations, outfile, ensure_ascii=False, indent=4)
print(f"转换完成!共处理 {len(raw_data_list)} 条记录") print(f"输出文件:{output_file}") print("=== 第二阶段完成 ===")
except FileNotFoundError: print("错误:找不到 recipient_data.json 文件") sys.exit(1) except json.JSONDecodeError as e: print(f"JSON解析错误:{e}") sys.exit(1) except Exception as e: print(f"转换过程中发生错误:{e}") sys.exit(1)
async def main(): print("开始执行合并的数据处理流程...") print("这个程序将依次执行两个阶段:") print("1. 生成原始收件人数据") print("2. 使用qwen3-235b-a22b模型预测结构化数据并转换为SFT训练格式") print("-" * 50)
success = await produce_data_phase()
if success: await convert_data_phase()
print("\n" + "=" * 50) print("全部流程执行完成!") print("生成的文件:") print("- recipient_data.json: 原始数据列表") print("- recipient_sft_data.jsonl: SFT训练格式数据") print("=" * 50) else: print("数据生成阶段失败,终止执行")
if __name__ == '__main__': if platform.system() == 'Windows': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(main(), debug=False)
|