Introduction

The previous articles in this series trained BERT for text matching with representation-based (bi-encoder) methods; this article uses an interaction-based method instead. Concretely, the two sentences to be matched are concatenated into a single input and fed to BERT, which outputs a similarity score.
This installment concludes the text-matching series for now. Everything so far has used supervised learning on sentence pairs; future articles will cover contrastive learning with triplets (anchor, positive, negative) and unsupervised approaches, time permitting.

Architecture

A Cross-Encoder uses self-attention to repeatedly compute interactions (attention) between the two sentences, then feeds the result into a classifier that outputs a score (a logit) representing their similarity, which can be passed through a sigmoid to obtain a probability.
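To make this concrete, here is a minimal standalone sketch (not part of the project code; the model name and sentences are only illustrative) of how a cross-encoder scores a pair: both sentences are packed into one sequence and the [CLS] representation is mapped to a single logit.

```python
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Illustrative model name; any BERT-style checkpoint works the same way.
tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
model = AutoModelForSequenceClassification.from_pretrained("bert-base-chinese", num_labels=1)

# Both sentences become one input: [CLS] sent_a [SEP] sent_b [SEP]
inputs = tokenizer("今天天气不错", "今天天气很好", return_tensors="pt")

with torch.no_grad():
    logit = model(**inputs).logits      # shape (1, 1)
score = torch.sigmoid(logit)            # similarity as a probability
print(score)
```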
Implementation

The implementation follows a Hugging Face-like layout: each architecture lives in its own folder, split into modeling, arguments, trainer, and other files.
modeling.py:
```python
import torch
from torch import nn
import numpy as np
from tqdm import tqdm
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForSequenceClassification,
)
from torch.utils.data import DataLoader
from transformers.modeling_outputs import SequenceClassifierOutput
from transformers.tokenization_utils_base import BatchEncoding
import logging

logger = logging.getLogger(__name__)


class SentenceBert(nn.Module):
    def __init__(
        self,
        model_name: str,
        max_length: int = None,
        trust_remote_code: bool = True,
    ) -> None:
        super().__init__()

        self.config = AutoConfig.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        )
        self.config.num_labels = 1  # reranker

        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name, config=self.config, trust_remote_code=trust_remote_code
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        )
        self.max_length = max_length

        self.loss_fct = nn.BCEWithLogitsLoss()

    def batching_collate(self, batch: list[tuple[str, str]]) -> BatchEncoding:
        texts = [[] for _ in range(len(batch[0]))]

        for example in batch:
            for idx, text in enumerate(example):
                texts[idx].append(text.strip())

        tokenized = self.tokenizer(
            *texts,
            padding=True,
            truncation="longest_first",
            return_tensors="pt",
            max_length=self.max_length,
        ).to(self.model.device)

        return tokenized

    def predict(
        self,
        sentences: list[tuple[str, str]],
        batch_size: int = 64,
        convert_to_tensor: bool = True,
        show_progress_bar: bool = False,
    ):
        dataloader = DataLoader(
            sentences,
            batch_size=batch_size,
            collate_fn=self.batching_collate,
            shuffle=False,
        )

        preds = []

        for batch in tqdm(
            dataloader, disable=not show_progress_bar, desc="Running Inference"
        ):
            with torch.no_grad():
                logits = self.model(**batch).logits
                logits = torch.sigmoid(logits)
                preds.extend(logits)

        if convert_to_tensor:
            preds = torch.stack(preds)
        else:
            preds = np.asarray(
                [pred.cpu().detach().float().numpy() for pred in preds]
            )

        return preds

    def forward(self, inputs, labels=None):
        outputs = self.model(**inputs, return_dict=True)

        if labels is not None:
            labels = labels.float()
            logits = outputs.logits
            logits = logits.view(-1)
            loss = self.loss_fct(logits, labels)
            return SequenceClassifierOutput(loss, **outputs)

        return outputs

    def save_pretrained(self, output_dir: str) -> None:
        state_dict = self.model.state_dict()
        state_dict = type(state_dict)(
            {k: v.clone().cpu().contiguous() for k, v in state_dict.items()}
        )
        self.model.save_pretrained(output_dir, state_dict=state_dict)
```
The entire model implementation lives in modeling.py. We first set the number of labels to 1 (num_labels = 1), then use AutoModelForSequenceClassification to attach a sequence-classification head. The core of that head looks like this:
```python
class BertForSequenceClassification(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        # Instantiate the BERT model
        self.bert = BertModel(config)
        # Add a linear layer mapping hidden_size to num_labels dimensions (here: 1)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        # First run the underlying BERT model
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # This is effectively the representation of the [CLS] token
        pooled_output = outputs[1]
        # Map it to a single logit
        logits = self.classifier(pooled_output)
```
The so-called pooled_output of the BERT model is actually produced by:
```python
class BertPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        # Map from hidden_size to another hidden_size space
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        # Followed by a tanh activation
        self.activation = nn.Tanh()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # Take the first token of the last hidden state: [CLS]
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output
```
Back in our modeling.py, the forward method is used during training, while the predict method, which supports batching, is used for inference; its input is a list of tuples, each representing a sentence pair.
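As a quick usage sketch (the model name and sentence pairs below are only illustrative), predict can be called like this:

```python
from modeling import SentenceBert

# Illustrative checkpoint; any sequence-classification-capable model works.
model = SentenceBert("hfl/chinese-macbert-large")

pairs = [("怎么开通花呗", "如何开通花呗"), ("今天天气如何", "我想去跑步")]
scores = model.predict(pairs, batch_size=2, show_progress_bar=True)
print(scores)  # one sigmoid probability per pair
```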
arguments.py:
```python
from dataclasses import dataclass, field
from typing import Optional
import os


@dataclass
class ModelArguments:
    model_name_or_path: str = field(
        metadata={
            "help": "Path to pretrained model or model identifier from huggingface"
        }
    )
    config_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained config name or path if not the same as model_name"
        },
    )
    tokenizer_name: Optional[str] = field(
        default=None,
        metadata={
            "help": "Pretrained tokenizer name or path if not the same as model_name"
        },
    )


@dataclass
class DataArguments:
    train_data_path: str = field(
        default=None, metadata={"help": "Path to train corpus"}
    )
    eval_data_path: str = field(default=None, metadata={"help": "Path to eval corpus"})
    max_length: int = field(
        default=512,
        metadata={
            "help": "The maximum total input sequence length after tokenization for input text."
        },
    )

    def __post_init__(self):
        if not os.path.exists(self.train_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.train_data_path}, please set a true path"
            )

        if not os.path.exists(self.eval_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.eval_data_path}, please set a true path"
            )
```
These dataclasses define the model-related and data-related arguments.
dataset.py:
```python
from torch.utils.data import Dataset
from transformers import PreTrainedTokenizer, DataCollatorWithPadding
from datasets import Dataset as dt
from typing import Any

from utils import build_dataframe_from_csv


class PairDataset(Dataset):
    def __init__(
        self, data_path: str, tokenizer: PreTrainedTokenizer, max_len: int
    ) -> None:
        df = build_dataframe_from_csv(data_path)
        self.dataset = dt.from_pandas(df, split="train")
        self.total_len = len(self.dataset)
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return self.total_len

    def __getitem__(self, index) -> dict[str, Any]:
        query1 = self.dataset[index]["query1"]
        query2 = self.dataset[index]["query2"]
        label = self.dataset[index]["label"]

        encoding = self.tokenizer.encode_plus(
            query1,
            query2,
            truncation="only_second",
            max_length=self.max_len,
            padding=False,
        )

        encoding["label"] = label

        return encoding
```
The dataset class targets the LCQMC format: a pair of sentences plus a numeric label, tab-separated, for example:
```
Hello.	Hi.	1
Nice to see you.	Nice	0
```
The dataset processing here differs from the earlier articles: encode_plus joins the text pair into a single input and truncates only the second text.
No padding is applied at this stage; that is deferred to DataCollatorWithPadding.
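To illustrate this division of labor, here is a standalone sketch (not project code; the model name is illustrative): each example is tokenized without padding, and DataCollatorWithPadding later pads every batch dynamically to its longest member.

```python
from transformers import AutoTokenizer, DataCollatorWithPadding

tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")  # illustrative checkpoint
collator = DataCollatorWithPadding(tokenizer)

# Per-example encodings, deliberately unpadded (as in PairDataset.__getitem__)
features = [
    tokenizer.encode_plus("今天天气不错", "今天天气很好",
                          truncation="only_second", max_length=128, padding=False),
    tokenizer.encode_plus("我想吃饭", "股票跌了吗",
                          truncation="only_second", max_length=128, padding=False),
]

batch = collator(features)
print(batch["input_ids"].shape)  # padded to the longest sequence in this batch
```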
trainer.py:
```python
import torch
from transformers.trainer import Trainer
from typing import Optional
import os
import logging

TRAINING_ARGS_NAME = "training_args.bin"

from modeling import SentenceBert

logger = logging.getLogger(__name__)


class CrossTrainer(Trainer):
    def compute_loss(self, model: SentenceBert, inputs, return_outputs=False):
        labels = inputs.pop("labels")
        return model(inputs, labels)["loss"]

    def _save(self, output_dir: Optional[str] = None, state_dict=None):
        # If we are executing this function, we are the process zero, so we don't check for that.
        output_dir = output_dir if output_dir is not None else self.args.output_dir
        os.makedirs(output_dir, exist_ok=True)
        logger.info(f"Saving model checkpoint to {output_dir}")

        self.model.save_pretrained(output_dir)

        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(output_dir)

        # Good practice: save your training arguments together with the trained model
        torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))
```
CrossTrainer inherits from the 🤗 Transformers Trainer class and overrides the compute_loss and _save methods. With this in place, we can train our model with the standard 🤗 Transformers training loop.
utils.py:
```python
import torch
import pandas as pd
from scipy.stats import pearsonr, spearmanr
from typing import Tuple


def build_dataframe_from_csv(dataset_csv: str) -> pd.DataFrame:
    df = pd.read_csv(
        dataset_csv,
        sep="\t",
        header=None,
        names=["query1", "query2", "label"],
    )

    return df


def compute_spearmanr(x, y):
    return spearmanr(x, y).correlation


def compute_pearsonr(x, y):
    return pearsonr(x, y)[0]


def find_best_acc_and_threshold(scores, labels, high_score_more_similar: bool):
    """Copied from https://github.com/UKPLab/sentence-transformers/tree/master"""
    assert len(scores) == len(labels)

    rows = list(zip(scores, labels))
    rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)

    max_acc = 0
    best_threshold = -1
    # number of positive examples seen so far
    positive_so_far = 0
    # remaining negative examples
    remaining_negatives = sum(labels == 0)

    for i in range(len(rows) - 1):
        score, label = rows[i]
        if label == 1:
            positive_so_far += 1
        else:
            remaining_negatives -= 1

        acc = (positive_so_far + remaining_negatives) / len(labels)
        if acc > max_acc:
            max_acc = acc
            best_threshold = (rows[i][0] + rows[i + 1][0]) / 2

    return max_acc, best_threshold


def metrics(y: torch.Tensor, y_pred: torch.Tensor) -> Tuple[float, float, float, float]:
    TP = ((y_pred == 1) & (y == 1)).sum().float()  # True Positive
    TN = ((y_pred == 0) & (y == 0)).sum().float()  # True Negative
    FN = ((y_pred == 0) & (y == 1)).sum().float()  # False Negative
    FP = ((y_pred == 1) & (y == 0)).sum().float()  # False Positive
    p = TP / (TP + FP).clamp(min=1e-8)  # Precision
    r = TP / (TP + FN).clamp(min=1e-8)  # Recall
    F1 = 2 * r * p / (r + p).clamp(min=1e-8)  # F1 score
    acc = (TP + TN) / (TP + TN + FP + FN).clamp(min=1e-8)  # Accuracy
    return acc, p, r, F1


def compute_metrics(predicts, labels):
    return metrics(labels, predicts)
```
This file defines a few helper functions; find_best_acc_and_threshold, which searches for the classification threshold that gives the best accuracy, is copied from the sentence-transformers library.
Besides accuracy, the Spearman rank correlation coefficient between the model's similarity scores and the ground-truth labels is also computed.
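For illustration only (the score and label arrays below are made up), these helpers could be applied to held-out predictions roughly like this:

```python
import numpy as np
from utils import compute_spearmanr, find_best_acc_and_threshold

scores = np.array([0.92, 0.15, 0.78, 0.40])  # predicted similarity probabilities
labels = np.array([1, 0, 1, 0])              # gold labels

spearman = compute_spearmanr(scores, labels)
max_acc, threshold = find_best_acc_and_threshold(
    scores, labels, high_score_more_similar=True
)
print(spearman, max_acc, threshold)
```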
Finally, we define the training and testing scripts.
train.py:
```python
from transformers import (
    set_seed,
    HfArgumentParser,
    TrainingArguments,
    DataCollatorWithPadding,
)
import logging
import os
from pathlib import Path
from datetime import datetime

from modeling import SentenceBert
from trainer import CrossTrainer
from arguments import DataArguments, ModelArguments
from dataset import PairDataset

logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)


def main():
    parser = HfArgumentParser((TrainingArguments, DataArguments, ModelArguments))
    training_args, data_args, model_args = parser.parse_args_into_dataclasses()

    output_dir = f"{training_args.output_dir}/{model_args.model_name_or_path.replace('/', '-')}-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    training_args.output_dir = output_dir

    logger.info(f"Training parameters {training_args}")
    logger.info(f"Data parameters {data_args}")
    logger.info(f"Model parameters {model_args}")

    set_seed(training_args.seed)

    model = SentenceBert(
        model_args.model_name_or_path,
        max_length=data_args.max_length,
        trust_remote_code=True,
    )
    tokenizer = model.tokenizer

    train_dataset = PairDataset(
        data_args.train_data_path,
        tokenizer,
        data_args.max_length,
    )
    eval_dataset = PairDataset(
        data_args.eval_data_path,
        tokenizer,
        data_args.max_length,
    )

    trainer = CrossTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=DataCollatorWithPadding(tokenizer),
        tokenizer=tokenizer,
    )
    Path(training_args.output_dir).mkdir(parents=True, exist_ok=True)

    trainer.train()
    trainer.save_model()


if __name__ == "__main__":
    main()
```
Training

Based on train.py, a train.sh script passes in the relevant arguments:
```bash
timestamp=$(date +%Y%m%d%H%M)
logfile="train_${timestamp}.log"

# change CUDA_VISIBLE_DEVICES
CUDA_VISIBLE_DEVICES=1 nohup python train.py \
    --model_name_or_path=hfl/chinese-macbert-large \
    --output_dir=output \
    --train_data_path=data/train.txt \
    --eval_data_path=data/dev.txt \
    --num_train_epochs=3 \
    --save_total_limit=5 \
    --learning_rate=2e-5 \
    --weight_decay=0.01 \
    --warmup_ratio=0.01 \
    --bf16=True \
    --save_strategy=epoch \
    --per_device_train_batch_size=64 \
    --report_to="none" \
    --remove_unused_columns=False \
    --max_length=128 \
    > "$logfile" 2>&1 &
```
Adjust these arguments for your own environment; here we use the hfl/chinese-macbert-large pretrained model from HIT.

Note: setting bf16=True speeds up training without hurting quality; if your hardware does not support bf16, try fp16 instead.
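A quick way to check beforehand whether bf16 is available (this assumes you are running on a CUDA GPU):

```python
import torch

# Check CUDA availability and bf16 support before enabling --bf16=True
if torch.cuda.is_available() and torch.cuda.is_bf16_supported():
    print("bf16 is supported; use --bf16=True")
else:
    print("bf16 not supported; fall back to --fp16=True")
```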
```
100%|██████████| 18655/18655 [1:15:47<00:00,  5.06it/s]
100%|██████████| 18655/18655 [1:15:47<00:00,  4.10it/s]
{'loss': 0.0464, 'grad_norm': 4.171152591705322, 'learning_rate': 1.6785791639592811e-07, 'epoch': 4.96}
{'train_runtime': 4547.2543, 'train_samples_per_second': 262.539, 'train_steps_per_second': 4.102, 'train_loss': 0.11396670312096753, 'epoch': 5.0}
```

Training ran for 5 epochs here to see how the model behaves, but the 3-epoch checkpoint actually performed slightly better, so that is the one used for testing.