Introduction
In the previous article, we trained a sentence embedding model with the classification objective function proposed by Sentence-BERT. This article uses the same Sentence-BERT architecture, but switches to the regression objective function.
Architecture
As shown in the figure above, we compute the cosine similarity between the two sentence embeddings u and v, and then use mean squared error (MSE) as the objective function. Here y is the ground-truth label.
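Written out, for a sentence pair with embeddings u and v and ground-truth label y, the loss is

loss = ( cos_sim(u, v) − y )²

averaged over the batch, which is exactly what nn.MSELoss computes on the cosine-similarity scores in the implementation below.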
With the regression objective, the prediction is no longer an integer label 1 or 0 but a real-valued score: for a given sentence pair, the model directly produces a similarity score. Inference therefore follows exactly the same procedure as training.
Implementation
The implementation follows a Hugging Face-like layout: each architecture gets its own folder, split into separate files such as modeling, arguments, and trainer.
modeling.py:
from dataclasses import dataclass
import torch
from torch import Tensor, nn
from transformers.file_utils import ModelOutput
from transformers import (
    AutoModel,
    AutoTokenizer,
)
import numpy as np
from tqdm.autonotebook import trange
from typing import Optional


@dataclass
class BiOutput(ModelOutput):
    loss: Optional[Tensor] = None
    scores: Optional[Tensor] = None


class SentenceBert(nn.Module):
    def __init__(
        self,
        model_name: str,
        trust_remote_code: bool = True,
        max_length: int = None,
        num_classes: int = 2,
        pooling_mode: str = "mean",
        normalize_embeddings: bool = False,
    ) -> None:
        super().__init__()
        self.model_name = model_name
        self.normalize_embeddings = normalize_embeddings
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        )
        self.model = AutoModel.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        ).to(self.device)

        self.max_length = max_length
        self.pooling_mode = pooling_mode

        self.loss_fct = nn.MSELoss()

    def sentence_embedding(self, last_hidden_state, attention_mask):
        if self.pooling_mode == "mean":
            attention_mask = attention_mask.unsqueeze(-1).float()
            return torch.sum(last_hidden_state * attention_mask, dim=1) / torch.clamp(
                attention_mask.sum(1), min=1e-9
            )
        else:
            # cls
            return last_hidden_state[:, 0]

    def encode(
        self,
        sentences: str | list[str],
        batch_size: int = 64,
        convert_to_tensor: bool = True,
        show_progress_bar: bool = False,
    ):
        if isinstance(sentences, str):
            sentences = [sentences]

        all_embeddings = []

        for start_index in trange(
            0, len(sentences), batch_size, desc="Batches", disable=not show_progress_bar
        ):
            batch = sentences[start_index : start_index + batch_size]

            features = self.tokenizer(
                batch,
                padding=True,
                truncation=True,
                return_tensors="pt",
                return_attention_mask=True,
                max_length=self.max_length,
            ).to(self.device)

            out_features = self.model(**features, return_dict=True)
            embeddings = self.sentence_embedding(
                out_features.last_hidden_state, features["attention_mask"]
            )
            if not self.training:
                embeddings = embeddings.detach()

            if self.normalize_embeddings:
                embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)

            if not convert_to_tensor:
                embeddings = embeddings.cpu()

            all_embeddings.extend(embeddings)

        if convert_to_tensor:
            all_embeddings = torch.stack(all_embeddings)
        else:
            all_embeddings = np.asarray([emb.numpy() for emb in all_embeddings])

        return all_embeddings

    def compute_loss(self, scores, labels):
        labels = torch.tensor(labels).float().to(self.device)
        return self.loss_fct(scores, labels.view(-1))

    def forward(self, source, target, labels) -> BiOutput:
        """
        Args:
            source:
            target:
        """
        source_embed = self.encode(source)
        target_embed = self.encode(target)

        scores = torch.cosine_similarity(source_embed, target_embed)

        loss = self.compute_loss(scores, labels)
        return BiOutput(loss, scores)

    def save_pretrained(self, output_dir: str):
        state_dict = self.model.state_dict()
        state_dict = type(state_dict)(
            {k: v.clone().cpu().contiguous() for k, v in state_dict.items()}
        )
        self.model.save_pretrained(output_dir, state_dict=state_dict)
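For intuition, here is a minimal inference sketch using the class above (the checkpoint name and sentences are only examples): encode both sides of a pair and score them with cosine similarity, mirroring what forward does during training.

import torch

from modeling import SentenceBert

# Example checkpoint; any BERT-style encoder works here.
model = SentenceBert("hfl/chinese-macbert-large", max_length=128)
model.eval()

with torch.no_grad():
    # encode() returns one embedding per sentence.
    u = model.encode(["Nice to see you."])
    v = model.encode(["Nice"])
    score = torch.cosine_similarity(u, v)  # a real-valued similarity, not a 0/1 label

print(score.item())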
The entire model implementation goes into the modeling.py file.
arguments.py:
from dataclasses import dataclass, field
from typing import Optional
import os


@dataclass
class ModelArguments:
    model_name_or_path: str = field(
        metadata={"help": "Path to pretrained model"}
    )
    config_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained config name or path if not the same as model_name"
        },
    )
    tokenizer_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained tokenizer name or path if not the same as model_name"
        },
    )


@dataclass
class DataArguments:
    train_data_path: str = field(
        default=None, metadata={"help": "Path to train corpus"}
    )
    eval_data_path: str = field(default=None, metadata={"help": "Path to eval corpus"})
    max_length: int = field(
        default=512,
        metadata={
            "help": "The maximum total input sequence length after tokenization for input text."
        },
    )

    def __post_init__(self):
        if not os.path.exists(self.train_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.train_data_path}, please set a true path"
            )

        if not os.path.exists(self.eval_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.eval_data_path}, please set a true path"
            )
These dataclasses define the model- and data-related arguments.
dataset.py:
from torch.utils.data import Dataset
from datasets import Dataset as dt
import pandas as pd

from utils import build_dataframe_from_csv


class PairDataset(Dataset):
    def __init__(self, data_path: str) -> None:
        df = build_dataframe_from_csv(data_path)
        self.dataset = dt.from_pandas(df, split="train")
        self.total_len = len(self.dataset)

    def __len__(self):
        return self.total_len

    def __getitem__(self, index) -> dict[str, str]:
        query1 = self.dataset[index]["query1"]
        query2 = self.dataset[index]["query2"]
        label = self.dataset[index]["label"]
        return {"query1": query1, "query2": query2, "label": label}


class PairCollator:
    def __call__(self, features) -> dict[str, list[str]]:
        queries1 = []
        queries2 = []
        labels = []

        for feature in features:
            queries1.append(feature["query1"])
            queries2.append(feature["query2"])
            labels.append(feature["label"])

        return {"source": queries1, "target": queries2, "labels": labels}
The dataset class assumes the format of the LCQMC dataset: pairs of sentences with a numeric label, for example:
Hello.	Hi.	1
Nice to see you.	Nice	0
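To make the batch format concrete, here is a small sketch of feeding PairDataset through a DataLoader with PairCollator (the data path is a placeholder). Note that the collator returns raw strings; tokenization happens later inside SentenceBert.encode.

from torch.utils.data import DataLoader

from dataset import PairDataset, PairCollator

# Placeholder path pointing at a tab-separated file in the format shown above.
dataset = PairDataset("data/train.txt")
loader = DataLoader(dataset, batch_size=2, collate_fn=PairCollator())

batch = next(iter(loader))
# batch == {"source": [query1, ...], "target": [query2, ...], "labels": [...]}
print(batch)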
trainer.py:
import torch
from transformers.trainer import Trainer
from typing import Optional
import os
import logging

from modeling import SentenceBert

TRAINING_ARGS_NAME = "training_args.bin"

logger = logging.getLogger(__name__)


class BiTrainer(Trainer):
    def compute_loss(self, model: SentenceBert, inputs, return_outputs=False):
        outputs = model(**inputs)
        loss = outputs.loss

        return (loss, outputs) if return_outputs else loss

    def _save(self, output_dir: Optional[str] = None, state_dict=None):
        # If we are executing this function, we are the process zero, so we don't check for that.
        output_dir = output_dir if output_dir is not None else self.args.output_dir
        os.makedirs(output_dir, exist_ok=True)
        logger.info(f"Saving model checkpoint to {output_dir}")

        self.model.save_pretrained(output_dir)

        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(output_dir)

        # Good practice: save your training arguments together with the trained model
        torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))
BiTrainer inherits from the 🤗 Transformers Trainer class and overrides the compute_loss and _save methods.
This way we can use 🤗 Transformers to train our model.
utils.py:
import torch
import pandas as pd
from scipy.stats import pearsonr, spearmanr
from typing import Tuple


def build_dataframe_from_csv(dataset_csv: str) -> pd.DataFrame:
    df = pd.read_csv(
        dataset_csv,
        sep="\t",
        header=None,
        names=["query1", "query2", "label"],
    )

    return df


def compute_spearmanr(x, y):
    return spearmanr(x, y).correlation


def compute_pearsonr(x, y):
    return pearsonr(x, y)[0]


def find_best_acc_and_threshold(scores, labels, high_score_more_similar: bool):
    """Copied from https://github.com/UKPLab/sentence-transformers/tree/master"""
    assert len(scores) == len(labels)

    rows = list(zip(scores, labels))
    rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)
    print(rows)

    max_acc = 0
    best_threshold = -1
    # positive examples number so far
    positive_so_far = 0
    # remain negative examples
    remaining_negatives = sum(labels == 0)

    for i in range(len(rows) - 1):
        score, label = rows[i]
        if label == 1:
            positive_so_far += 1
        else:
            remaining_negatives -= 1

        acc = (positive_so_far + remaining_negatives) / len(labels)
        if acc > max_acc:
            max_acc = acc
            best_threshold = (rows[i][0] + rows[i + 1][0]) / 2

    return max_acc, best_threshold


def metrics(y: torch.Tensor, y_pred: torch.Tensor) -> Tuple[float, float, float, float]:
    TP = ((y_pred == 1) & (y == 1)).sum().float()  # True Positive
    TN = ((y_pred == 0) & (y == 0)).sum().float()  # True Negative
    FN = ((y_pred == 0) & (y == 1)).sum().float()  # False Negative
    FP = ((y_pred == 1) & (y == 0)).sum().float()  # False Positive
    p = TP / (TP + FP).clamp(min=1e-8)  # Precision
    r = TP / (TP + FN).clamp(min=1e-8)  # Recall
    F1 = 2 * r * p / (r + p).clamp(min=1e-8)  # F1 score
    acc = (TP + TN) / (TP + TN + FP + FN).clamp(min=1e-8)  # Accuracy
    return acc, p, r, F1


def compute_metrics(predicts, labels):
    return metrics(labels, predicts)
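As a toy illustration of how the threshold search behaves (the numbers below are made up): with the scores sorted from high to low, it tries the midpoint between each pair of adjacent scores as a cut-off and keeps the one that maximizes accuracy.

import numpy as np

from utils import find_best_acc_and_threshold

# Made-up cosine similarities and binary labels, purely for illustration.
scores = np.array([0.9, 0.8, 0.6, 0.3, 0.2])
labels = np.array([1, 1, 0, 0, 0])

# Higher score means more similar, hence high_score_more_similar=True.
max_acc, best_threshold = find_best_acc_and_threshold(scores, labels, True)
print(max_acc, best_threshold)  # 1.0 and (0.8 + 0.6) / 2 = 0.7 for this toy data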
utils.py defines some helper functions; find_best_acc_and_threshold, which finds the accuracy-maximizing threshold, is copied from the sentence-transformers library.
Besides accuracy, we also compute the Spearman rank correlation between the cosine similarity of the sentence embeddings and the ground-truth labels.
Finally, we define the training and test scripts.
train.py:
from transformers import set_seed, HfArgumentParser, TrainingArguments

import logging
from pathlib import Path
from datetime import datetime

from modeling import SentenceBert
from trainer import BiTrainer
from arguments import DataArguments, ModelArguments
from dataset import PairCollator, PairDataset

logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)


def main():
    parser = HfArgumentParser((TrainingArguments, DataArguments, ModelArguments))
    training_args, data_args, model_args = parser.parse_args_into_dataclasses()
    # Generate the output directory from the current time
    output_dir = f"{training_args.output_dir}/{model_args.model_name_or_path.replace('/', '-')}-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    training_args.output_dir = output_dir

    logger.info(f"Training parameters {training_args}")
    logger.info(f"Data parameters {data_args}")
    logger.info(f"Model parameters {model_args}")

    # Set the random seed
    set_seed(training_args.seed)

    # Load the pretrained model
    model = SentenceBert(
        model_args.model_name_or_path,
        trust_remote_code=True,
        max_length=data_args.max_length,
    )
    tokenizer = model.tokenizer

    # Build the training and evaluation sets
    train_dataset = PairDataset(data_args.train_data_path)
    eval_dataset = PairDataset(data_args.eval_data_path)

    # Pass everything to the Trainer
    trainer = BiTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=PairCollator(),
        tokenizer=tokenizer,
    )
    Path(training_args.output_dir).mkdir(parents=True, exist_ok=True)

    # Start training
    trainer.train()
    trainer.save_model()


if __name__ == "__main__":
    main()
Training
Based on train.py, we define train.sh to pass in the relevant arguments:
timestamp=$(date +%Y%m%d%H%M)
logfile="train_${timestamp}.log"

# change CUDA_VISIBLE_DEVICES
CUDA_VISIBLE_DEVICES=3 nohup python train.py \
    --model_name_or_path=hfl/chinese-macbert-large \
    --output_dir=output \
    --train_data_path=data/train.txt \
    --eval_data_path=data/dev.txt \
    --num_train_epochs=3 \
    --save_total_limit=5 \
    --learning_rate=2e-5 \
    --weight_decay=0.01 \
    --warmup_ratio=0.01 \
    --bf16=True \
    --eval_strategy=epoch \
    --save_strategy=epoch \
    --per_device_train_batch_size=64 \
    --report_to="none" \
    --remove_unused_columns=False \
    --max_length=128 \
    > "$logfile" 2>&1 &
Adjust the arguments above for your own environment; here we use HIT's chinese-macbert-large pretrained model.
Note: --remove_unused_columns=False is required; otherwise the Trainer would strip the query1/query2/label fields (they do not match the model's forward signature) before they reach PairCollator.
Setting --bf16=True speeds up training without hurting quality.
100%|██████████| 18655/18655 [1:17:23<00:00, 4.44it/s]
100%|██████████| 18655/18655 [1:17:23<00:00, 4.02it/s]
09/02/2024 21:02:41 - INFO - trainer - Saving model checkpoint to output/hfl-chinese-macbert-large-2024-09-02_19-45-12
{'eval_loss': 0.09294428676366806, 'eval_runtime': 56.1261, 'eval_samples_per_second': 156.825, 'eval_steps_per_second': 19.617, 'epoch': 5.0}
{'train_runtime': 4643.261, 'train_samples_per_second': 257.11, 'train_steps_per_second': 4.018, 'train_loss': 0.049199433276877584, 'epoch': 5.0}
Training ran for 5 epochs here, and we take the last saved model, output/hfl-chinese-macbert-large-2024-09-02_19-45-12, for testing. (I forgot to change the epoch argument shown in the script; for ease of comparison, the results below were actually validated with the model trained for 3 epochs.)
Testing
test.py:
The test script is given in the complete code later in the article.
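Until then, here is a rough sketch of what such a test script can look like, assuming the SentenceBert class and the helpers from utils.py above. The checkpoint path comes from the training log, while the test file path and the thresholding details are illustrative assumptions, not the author's exact script.

import torch

from modeling import SentenceBert
from utils import (
    build_dataframe_from_csv,
    compute_spearmanr,
    find_best_acc_and_threshold,
    metrics,
)

# Checkpoint saved by the training run above; the test file path is an assumption.
checkpoint = "output/hfl-chinese-macbert-large-2024-09-02_19-45-12"
df = build_dataframe_from_csv("data/test.txt")

model = SentenceBert(checkpoint, max_length=128)
model.eval()

with torch.no_grad():
    source_embed = model.encode(df["query1"].tolist(), show_progress_bar=True)
    target_embed = model.encode(df["query2"].tolist(), show_progress_bar=True)
    scores = torch.cosine_similarity(source_embed, target_embed).cpu().numpy()

labels = df["label"].values

# Spearman rank correlation between cosine similarities and gold labels.
print("spearman:", compute_spearmanr(scores, labels))

# Convert similarities into 0/1 predictions with the accuracy-maximizing threshold,
# then compute accuracy, precision, recall and F1.
max_acc, best_threshold = find_best_acc_and_threshold(scores, labels, True)
y_pred = torch.tensor(scores > best_threshold).long()
acc, p, r, f1 = metrics(torch.tensor(labels), y_pred)
print(max_acc, best_threshold, acc, p, r, f1)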