from modelscope import snapshot_download, AutoTokenizer
from config import Config, Default, Dir
from peft import LoraConfig, get_peft_model
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
from modelscope.msdatasets import MsDataset
import os
import json
import random
import pandas as pd
import torch
from datasets import Dataset
import swanlab

lora_config = LoraConfig(
    r=16,  # rank of the low-rank matrices (controls parameter count, typically 4-64)
    lora_alpha=32,  # scaling factor (controls the strength of the LoRA update)
    target_modules=["q_proj", "v_proj"],  # fine-tune the attention Q/V projections (key layers)
    lora_dropout=0.1,  # guards against overfitting
    bias="none",  # do not fine-tune bias terms
    task_type="CAUSAL_LM",  # task type: causal language model
)
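
# For reference, a sketch of the fields this script expects from the imported
# `config` module. The field names are taken from their usage below; every
# value shown here is a placeholder assumption, not the project's real
# configuration.
#
# class Config:
#     MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"  # placeholder ModelScope model id
#     DATASET_NAME = "some/dataset"              # placeholder MsDataset name
#     DATASET_SUBJECT = "default"                # subset name
#     DATASET_SPLIT = "train"                    # split to download
#     DATASET_USE_CACHE = True                   # reuse previously generated files
#     QUES_LABEL = "question"                    # field holding the question text
#     THINK_LABEL = "think"                      # optional chain-of-thought field
#     ANS_LABEL = "answer"                       # field holding the answer text
#     PROMPT = "..."                             # system prompt used for fine-tuning
#     DATA_MAX_LENGTH = 2048                     # max token length per sample
#     SWANLAB_PROJECT = "my-project"             # SwanLab project name
#
# class Default:
#     TRAIN_DATASET_FILE = "train.jsonl"
#     TEST_DATASET_FILE = "test.jsonl"
#     TRAIN_JSONL_NEW_FILE = "train_new.jsonl"
#     TEST_JSONL_NEW_FILE = "test_new.jsonl"
#
# class Dir:
#     DATASET_DIR = "./dataset"
#     MODEL_DATASET_DIR = "./model_dataset"
#     MODEL_DIR = "./output"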
def load_model_and_tokenizer():
    print("🧼 Loading the model...")
    # Download the Qwen model from ModelScope into a local directory
    model_dir = snapshot_download(Config.MODEL_NAME)
    # Load the tokenizer and weights with Transformers from the local snapshot
    tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=False, trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(model_dir, device_map="auto", torch_dtype=torch.bfloat16)
    model.enable_input_require_grads()  # required when gradient checkpointing is enabled
    return model, tokenizer


def mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file):
    data_list = list(ds)
    random.shuffle(data_list)
    split_idx = int(len(data_list) * 0.9)  # 90/10 train/validation split
    train_data = data_list[:split_idx]
    val_data = data_list[split_idx:]
    with open(dataset_train_json_file, 'w', encoding='utf-8') as f:
        for item in train_data:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    with open(dataset_test_json_file, 'w', encoding='utf-8') as f:
        for item in val_data:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    print(f"✅ Raw data prepared | train samples: {len(train_data)} | test samples: {len(val_data)}")


def get_dataset_json():
    # Make sure the cache directory exists
    os.makedirs(Dir.DATASET_DIR, exist_ok=True)
    ds = MsDataset.load(
        dataset_name=Config.DATASET_NAME,
        subset_name=Config.DATASET_SUBJECT,
        split=Config.DATASET_SPLIT,
        cache_dir=Dir.DATASET_DIR,  # use the configured path consistently
    )
    dataset_train_json_file = Dir.DATASET_DIR + "/" + Default.TRAIN_DATASET_FILE
    dataset_test_json_file = Dir.DATASET_DIR + "/" + Default.TEST_DATASET_FILE
    print(f"🏷️ Raw training data file: {dataset_train_json_file}")
    print(f"🏷️ Raw test data file: {dataset_test_json_file}")
    if (not os.path.exists(dataset_train_json_file)
            or not os.path.exists(dataset_test_json_file)
            or not Config.DATASET_USE_CACHE):
        print("🏷️ Generating the raw dataset...")
        mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file)
    return dataset_train_json_file, dataset_test_json_file


def get_model_train_dataset_json_file():
    model_train_json_file = Dir.MODEL_DATASET_DIR + "/" + Default.TRAIN_JSONL_NEW_FILE
    model_test_json_file = Dir.MODEL_DATASET_DIR + "/" + Default.TEST_JSONL_NEW_FILE
    return model_train_json_file, model_test_json_file


def get_model_train_dataset_json():
    model_train_json_file, model_test_json_file = get_model_train_dataset_json_file()
    print(f"🏷️ Model training data file: {model_train_json_file}")
    print(f"🏷️ Model test data file: {model_test_json_file}")
    if not os.path.exists(model_train_json_file) or not os.path.exists(
            model_test_json_file) or not Config.DATASET_USE_CACHE:
        print("🏷️ Model-format dataset not found; generating it from the raw dataset...")
        dataset_train_json_file, dataset_test_json_file = get_dataset_json()
        print("🏷️ Raw dataset generated successfully...")
        print("🧼 Converting the model training dataset...")
        dataset_jsonl_transfer(dataset_train_json_file, model_train_json_file)
        print("🧼 Converting the model validation dataset...")
        dataset_jsonl_transfer(dataset_test_json_file, model_test_json_file)
    return model_train_json_file, model_test_json_file


def dataset_jsonl_transfer(origin_path, new_path):
    """Convert the raw dataset into the instruction format required for fine-tuning."""
    messages = []
    # Read the original JSONL file
    with open(origin_path, "r", encoding="utf-8") as file:
        for line in file:
            # Parse each line as JSON
            data = json.loads(line)
            question = data[Config.QUES_LABEL]
            output = ""
            if hasattr(Config, "THINK_LABEL") and Config.THINK_LABEL in data:
                output = data[Config.THINK_LABEL]
            if hasattr(Config, "ANS_LABEL") and Config.ANS_LABEL in data:
                output += f"{data[Config.ANS_LABEL]}"
            message = {
                "instruction": Config.PROMPT,
                "input": f"{question}",
                "output": output,
            }
            messages.append(message)
    # Write the restructured JSONL file
    with open(new_path, "w", encoding="utf-8") as file:
        for message in messages:
            file.write(json.dumps(message, ensure_ascii=False) + "\n")


def predict(messages, model, tokenizer):
    device = model.device  # follow the model's placement (device_map="auto")
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(device)
    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=Config.DATA_MAX_LENGTH,
    )
    # Strip the prompt tokens, keeping only the newly generated ones
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response


def load_train_and_eval_data():
    # Fetch the model-format training dataset JSONL files
    print("🧼 Fetching training data...")
    model_train_json_file, model_test_json_file = get_model_train_dataset_json()
    # Build the training set
    train_df = pd.read_json(model_train_json_file, lines=True)
    train_ds = Dataset.from_pandas(train_df)
    train_dataset = train_ds.map(process_func, remove_columns=train_ds.column_names)
    # Build the validation set
    eval_df = pd.read_json(model_test_json_file, lines=True)
    eval_ds = Dataset.from_pandas(eval_df)
    eval_dataset = eval_ds.map(process_func, remove_columns=eval_ds.column_names)
    return train_dataset, eval_dataset


def process_func(example):
    """Preprocess one example into input_ids / attention_mask / labels.

    Relies on the module-level `tokenizer` created in __main__.
    """
    input_ids, attention_mask, labels = [], [], []
    instruction = tokenizer(
        f"<|im_start|>system\n{Config.PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    response = tokenizer(f"{example['output']}", add_special_tokens=False)
    input_ids = instruction["input_ids"] + response["input_ids"] + [tokenizer.pad_token_id]
    attention_mask = (
        instruction["attention_mask"] + response["attention_mask"] + [1]
    )
    # Mask the prompt with -100 so the loss is computed only on the response
    labels = [-100] * len(instruction["input_ids"]) + response["input_ids"] + [tokenizer.pad_token_id]
    if len(input_ids) > Config.DATA_MAX_LENGTH:  # truncate overlong sequences
        input_ids = input_ids[:Config.DATA_MAX_LENGTH]
        attention_mask = attention_mask[:Config.DATA_MAX_LENGTH]
        labels = labels[:Config.DATA_MAX_LENGTH]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}
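
# Illustration (hypothetical record, not taken from the dataset): for a sample
# with input "1+1=?" and output "2", process_func tokenizes the Qwen chat layout
#   <|im_start|>system\n{PROMPT}<|im_end|>\n<|im_start|>user\n1+1=?<|im_end|>\n<|im_start|>assistant\n2<pad>
# and sets labels to -100 over the entire prompt span, so the cross-entropy
# loss is computed only on the assistant response and the trailing pad token.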
def set_trainer():
    # Relies on the module-level new_model / train_dataset / eval_dataset / tokenizer
    print("🧼 Building the trainer...")
    print(f"🧼 {Dir.MODEL_DIR}")
    args = TrainingArguments(
        output_dir=Dir.MODEL_DIR,
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        gradient_accumulation_steps=4,
        eval_strategy="steps",
        eval_steps=500,
        logging_steps=10,
        num_train_epochs=2,
        save_steps=500,
        learning_rate=1e-4,
        save_on_each_node=True,
        gradient_checkpointing=True,
        report_to="swanlab",
        run_name=Config.MODEL_NAME,
        load_best_model_at_end=True,
    )
    trainer = Trainer(
        model=new_model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True),
    )
    return trainer


def set_swanlab():
    os.environ["SWANLAB_PROJECT"] = Config.SWANLAB_PROJECT
    swanlab.config.update({
        "model": Config.MODEL_NAME,
        "prompt": Config.PROMPT,
        "data_max_length": Config.DATA_MAX_LENGTH,
    })


if __name__ == "__main__":
    # Free any cached GPU memory
    torch.cuda.empty_cache()
    # Configure SwanLab experiment tracking
    set_swanlab()
    model, tokenizer = load_model_and_tokenizer()
    train_dataset, eval_dataset = load_train_and_eval_data()
    # Wrap the base model with LoRA adapters
    new_model = get_peft_model(model, lora_config)
    trainer = set_trainer()
    trainer.train()
    # Save only the LoRA adapter weights and the tokenizer
    new_model.save_pretrained("./lora_adapter")
    tokenizer.save_pretrained("./lora_adapter")
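

# --- Sketch (not part of the original training flow) ---
# A minimal example of how the adapter saved above could be loaded back for
# inference, assuming peft's PeftModel API. `load_lora_for_inference` is a
# hypothetical helper; the adapter path matches the save_pretrained calls above.
def load_lora_for_inference(adapter_dir: str = "./lora_adapter"):
    from peft import PeftModel
    # Reuse (or re-download) the base model snapshot, then attach the adapter
    base_dir = snapshot_download(Config.MODEL_NAME)
    tok = AutoTokenizer.from_pretrained(base_dir, use_fast=False, trust_remote_code=True)
    base = AutoModelForCausalLM.from_pretrained(base_dir, device_map="auto", torch_dtype=torch.bfloat16)
    lora_model = PeftModel.from_pretrained(base, adapter_dir)
    lora_model.eval()
    return lora_model, tok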