import pandas as pd import torch from datasets import Dataset from modelscope import snapshot_download, AutoTokenizer from transformers import AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq import os import swanlab from modelscope.msdatasets import MsDataset import json import random from config import Config,Default,Dir def dataset_jsonl_transfer(origin_path, new_path): """ 将原始数据集转换为大模型微调所需数据格式的新数据集 """ messages = [] # 读取旧的JSONL文件 with open(origin_path, "r") as file: for line in file: # 解析每一行的json数据 data = json.loads(line) input = data["question"] output = f"{data['think']} \n {data['answer']}" message = { "instruction": Config.PROMPT, "input": f"{input}", "output": output, } messages.append(message) # 保存重构后的JSONL文件 with open(new_path, "w", encoding="utf-8") as file: for message in messages: file.write(json.dumps(message, ensure_ascii=False) + "\n") def predict(messages, model, tokenizer): device = "cuda" text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) model_inputs = tokenizer([text], return_tensors="pt").to(device) generated_ids = model.generate( model_inputs.input_ids, max_new_tokens=Config.DATA_MAX_LENGTH, ) generated_ids = [ output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) ] response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] return response def load_model_and_tokenizer(): print("🧼 开始加载模型...") # 在modelscope上下载Qwen模型到本地目录下 model_dir = snapshot_download(Config.MODEL_NAME) # Transformers加载模型权重 tokenizer = AutoTokenizer.from_pretrained(Config.MODEL_NAME, use_fast=False, trust_remote_code=True) model = AutoModelForCausalLM.from_pretrained(model_dir, device_map="auto", dtype=torch.bfloat16) model.enable_input_require_grads() # 开启梯度检查点时,要执行该方法 return model,tokenizer def mk_new_dataset_json(ds,dataset_train_json_file,dataset_test_json_file): data_list = list(ds) random.shuffle(data_list) split_idx = int(len(data_list) * 0.9) train_data = data_list[:split_idx] val_data = data_list[split_idx:] with open(dataset_train_json_file, 'w', encoding='utf-8') as f: for item in train_data: json.dump(item, f, ensure_ascii=False) f.write('\n') with open(dataset_test_json_file, 'w', encoding='utf-8') as f: for item in val_data: json.dump(item, f, ensure_ascii=False) f.write('\n') print(f"✅ 原始数据清洗完成 | 训练样本量: {len(train_data)} | 测试样本量: {len(train_data)}") def get_dataset_json(): # 确保缓存目录存在 os.makedirs(Dir.DATASET_DIR, exist_ok=True) ds = MsDataset.load( dataset_name=Config.DATASET_NAME, subset_name=Config.DATASET_SUBJECT, split=Config.DATASET_SPLIT, cache_dir=Dir.DATASET_DIR # 统一使用配置的路径 ) dataset_train_json_file = Dir.DATASET_DIR+"/"+Default.TRAIN_DATASET_FILE dataset_test_json_file = Dir.DATASET_DIR + "/" + Default.TEST_DATASET_FILE print(f"🏷️ 原始训练数据文件:{dataset_train_json_file}") print(f"🏷️ 原始测试数据文件:{dataset_test_json_file}") if (not os.path.exists(dataset_train_json_file) or not os.path.exists(dataset_test_json_file) or not Config.DATASET_USE_CACHE): print("🏷️ 正在生成原始数据集...") mk_new_dataset_json(ds,dataset_train_json_file,dataset_test_json_file) return dataset_train_json_file,dataset_test_json_file def get_model_train_dataset_json_file(): model_train_json_file = Dir.MODEL_DATASET_DIR + "/" + Default.TRAIN_JSONL_NEW_FILE model_test_json_file = Dir.MODEL_DATASET_DIR + "/" + Default.TEST_JSONL_NEW_FILE return model_train_json_file, model_test_json_file def get_model_train_dataset_json(): model_train_json_file,model_test_json_file = get_model_train_dataset_json_file() print(f"🏷️ 模型训练数据文件:{model_train_json_file}") print(f"🏷️ 模型测试数据文件:{model_test_json_file}") if not os.path.exists(model_train_json_file) or not os.path.exists( model_test_json_file) or not Config.DATASET_USE_CACHE: print("🏷️ 未找到模型对应数据集,准备从原始数据集进行生成数据集...") dataset_train_json_file,dataset_test_json_file = get_dataset_json() print("🏷️ 原始数据集生成成功...") print("🧼 开始转换模型训练数据集...") dataset_jsonl_transfer(dataset_train_json_file, model_train_json_file) print("🧼 开始转换模型验证数据集...") dataset_jsonl_transfer(dataset_test_json_file, model_test_json_file) return model_train_json_file,model_test_json_file def load_train_and_eval_data(): #获取训练数据集json. print("🧼 开始获取训练数据...") model_train_json_file,model_test_json_file = get_model_train_dataset_json() # 得到训练集 train_df = pd.read_json(model_train_json_file, lines=True) train_ds = Dataset.from_pandas(train_df) train_dataset = train_ds.map(process_func, remove_columns=train_ds.column_names) # 得到验证集 eval_df = pd.read_json(model_test_json_file, lines=True) eval_ds = Dataset.from_pandas(eval_df) eval_dataset = eval_ds.map(process_func, remove_columns=eval_ds.column_names) return train_dataset,eval_dataset def set_swanlab(): os.environ["SWANLAB_PROJECT"] = Config.SWANLAB_PROJECT swanlab.config.update({ "model": "", "prompt": Config.PROMPT, "data_max_length": Config.DATA_MAX_LENGTH, }) def process_func(example): """ 将数据集进行预处理 """ input_ids, attention_mask, labels = [], [], [] instruction = tokenizer( f"<|im_start|>system\n{Config.PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n", add_special_tokens=False, ) response = tokenizer(f"{example['output']}", add_special_tokens=False) input_ids = instruction["input_ids"] + response["input_ids"] + [tokenizer.pad_token_id] attention_mask = ( instruction["attention_mask"] + response["attention_mask"] + [1] ) labels = [-100] * len(instruction["input_ids"]) + response["input_ids"] + [tokenizer.pad_token_id] if len(input_ids) > Config.DATA_MAX_LENGTH: # 做一个截断 input_ids = input_ids[:Config.DATA_MAX_LENGTH] attention_mask = attention_mask[:Config.DATA_MAX_LENGTH] labels = labels[:Config.DATA_MAX_LENGTH] return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels} def set_trainer(train_dataset): print("🧼 正在生成训练对象...") print(f"🧼 {Dir.MODEL_DIR}") args = TrainingArguments( output_dir=Dir.MODEL_DIR, per_device_train_batch_size=1, per_device_eval_batch_size=1, gradient_accumulation_steps=4, eval_strategy="steps", eval_steps=100, logging_steps=10, num_train_epochs=2, save_steps=400, learning_rate=1e-4, save_on_each_node=True, gradient_checkpointing=True, report_to="swanlab", run_name=Config.MODEL_NAME, ) trainer = Trainer( model=model, args=args, train_dataset=train_dataset, eval_dataset=eval_dataset, data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True), ) return trainer if __name__ == "__main__": #设置SWANLAB set_swanlab() #加载模型和分词器 model, tokenizer = load_model_and_tokenizer() # 加载数据集和验证集 train_dataset, eval_dataset = load_train_and_eval_data() model_train_json_file, model_test_json_file = get_model_train_dataset_json_file() #设置训练对象 trainer = set_trainer(train_dataset) print("🚬🚬🚬 开始模型训练...") trainer.train() print(" 🎇🎇🎇模型训练完成,...") # 用测试集的前3条,主观看模型 test_df = pd.read_json(model_train_json_file, lines=True)[:3] test_text_list = [] for index, row in test_df.iterrows(): instruction = row['instruction'] input_value = row['input'] messages = [ {"role": "system", "content": f"{instruction}"}, {"role": "user", "content": f"{input_value}"} ] response = predict(messages, model, tokenizer) response_text = f""" Question: {input_value} LLM:{response} """ test_text_list.append(swanlab.Text(response_text)) print(response_text) swanlab.log({"Prediction": test_text_list}) swanlab.finish()