Prerequisites
- Hugging Face BERT docs: huggingface.co/transformer…
- Dataset: www.kaggle.com/c/jigsaw-to…
The dataset carries six label classes, and a comment can belong to several of them at once, so this is a multi-label problem; a hypothetical row is sketched below.
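A purely hypothetical row (the six column names are the real Jigsaw labels; the values here are made up):

row = {
    'comment_text': 'example comment',            # free text
    'toxic': 1, 'severe_toxic': 0, 'obscene': 1,  # six binary labels --
    'threat': 0, 'insult': 1, 'identity_hate': 0  # several can be 1 at once
}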
import numpy as np
import pandas as pd
from sklearn import metrics
import transformers
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertModel, BertConfig
from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'
Let's start by processing the data.
df = pd.read_csv("train.csv")
df.head()
# Collapse the six label columns (columns 2 onward) into one list per row
df['list'] = df[df.columns[2:]].values.tolist()
new_df = df[['comment_text', 'list']].copy()
new_df.head()
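To see what the values.tolist() trick above does, here is a tiny self-contained sketch with toy columns (not the real data):

demo = pd.DataFrame({'id': ['a1'], 'comment_text': ['hi'],
                     'toxic': [1], 'obscene': [0]})
demo['list'] = demo[demo.columns[2:]].values.tolist()
print(demo['list'][0])  # [1, 0] -- the label columns collapsed into one list per row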
Define the hyperparameters
MAX_LEN = 200
TRAIN_BATCH_SIZE = 8
VALID_BATCH_SIZE = 4
EPOCHS = 1
LEARNING_RATE = 1e-05
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
Give the tokenizer a quick test
test_one_sent = new_df.comment_text[0]
print(tokenizer.encode_plus(test_one_sent,
                            None,
                            add_special_tokens=True,
                            max_length=MAX_LEN,
                            padding='max_length',   # pad_to_max_length is deprecated
                            truncation=True,
                            return_token_type_ids=True))
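To see the pieces by name: the encoder returns a dict, and convert_ids_to_tokens reveals the special tokens it added. A quick sketch reusing the call above:

enc = tokenizer.encode_plus(test_one_sent, None, add_special_tokens=True,
                            max_length=MAX_LEN, padding='max_length',
                            truncation=True, return_token_type_ids=True)
print(list(enc.keys()))                                       # input_ids / token_type_ids / attention_mask
print(tokenizer.convert_ids_to_tokens(enc['input_ids'][:8]))  # begins with '[CLS]'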
Define the dataset with torch's Dataset class
class CustomDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.comment_text = dataframe.comment_text
        self.targets = self.data.list
        self.max_len = max_len

    def __len__(self):
        return len(self.comment_text)

    def __getitem__(self, index):
        comment_text = str(self.comment_text[index])
        comment_text = " ".join(comment_text.split())  # normalize whitespace
        inputs = self.tokenizer.encode_plus(
            comment_text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]
        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(self.targets[index], dtype=torch.float)
        }
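A quick spot check that __getitem__ yields fixed-length tensors; this uses a throwaway five-row frame, since the real splits are only built below:

sample_set = CustomDataset(new_df[:5].reset_index(drop=True), tokenizer, MAX_LEN)
item = sample_set[0]
print(item['ids'].shape, item['targets'].shape)  # torch.Size([200]) torch.Size([6])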
Split the data and wrap it with torch's DataLoader
train_size = 0.8
train_dataset = new_df.sample(frac=train_size, random_state=200)
test_dataset = new_df.drop(train_dataset.index).reset_index(drop=True)
train_dataset = train_dataset.reset_index(drop=True)

print("FULL Dataset: {}".format(new_df.shape))
print("TRAIN Dataset: {}".format(train_dataset.shape))
print("TEST Dataset: {}".format(test_dataset.shape))

# To run through the whole pipeline quickly, I keep only 2000 train rows and 100 test rows
train_dataset = train_dataset[:2000]
test_dataset = test_dataset[:100]
training_set = CustomDataset(train_dataset, tokenizer, MAX_LEN)
testing_set = CustomDataset(test_dataset, tokenizer, MAX_LEN)
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0}
test_params = {'batch_size': VALID_BATCH_SIZE,
               'shuffle': False,  # no need to shuffle for evaluation
               'num_workers': 0}
training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)
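Pulling one batch confirms the shapes the model will receive:

batch = next(iter(training_loader))
print(batch['ids'].shape)      # torch.Size([8, 200]) -- (TRAIN_BATCH_SIZE, MAX_LEN)
print(batch['targets'].shape)  # torch.Size([8, 6])   -- one 0/1 target per label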
Build the network: BERT + Dropout + Linear; the loss function is BCEWithLogitsLoss
class BERTClass(torch.nn.Module):
    def __init__(self):
        super(BERTClass, self).__init__()
        self.l1 = transformers.BertModel.from_pretrained('bert-base-uncased')
        self.l2 = torch.nn.Dropout(0.3)
        self.l3 = torch.nn.Linear(768, 6)  # 768 = BERT hidden size, 6 = number of labels

    def forward(self, ids, mask, token_type_ids):
        # return_dict=False makes BertModel return the (sequence_output, pooled_output) tuple
        _, output_1 = self.l1(ids, attention_mask=mask, token_type_ids=token_type_ids,
                              return_dict=False)
        output_2 = self.l2(output_1)
        output = self.l3(output_2)
        return output

model = BERTClass()
model.to(device)
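Before training, an untrained forward pass verifies that the head emits six raw logits per example; no sigmoid is applied here, because BCEWithLogitsLoss handles that internally (explained next):

with torch.no_grad():
    batch = next(iter(training_loader))
    logits = model(batch['ids'].to(device), batch['mask'].to(device),
                   batch['token_type_ids'].to(device))
print(logits.shape)  # torch.Size([8, 6])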
Notes on BCEWithLogitsLoss
This class fuses a sigmoid layer and BCE loss into one numerically stable operation. With the default mean reduction:

    \ell(x, y) = \frac{1}{N} \sum_{n=1}^{N} l_n, \qquad
    l_n = -\left[ y_n \log \sigma(x_n) + (1 - y_n) \log\left(1 - \sigma(x_n)\right) \right]

where N is the batch size. In effect, the multi-label task becomes a set of independent binary classification problems, one per label.
def loss_fn(outputs, targets):
    return torch.nn.BCEWithLogitsLoss()(outputs, targets)
Sanity-check the loss: the two printed values below come out equal.
y = torch.tensor([1., 1., 0.])
x = torch.tensor([1., 0.9, 0.1])
xy_loss = loss_fn(x, y)
print(xy_loss)  # tensor(0.4663)

sigmoid_x = torch.sigmoid(x)
print(sigmoid_x)  # tensor([0.7311, 0.7109, 0.5250])
# Manual BCE over those sigmoid values -- the same number as xy_loss
print(-(np.log(0.7311) + np.log(0.7109) + np.log(1 - 0.5250)) / 3)
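The same number can also be reproduced with an explicit sigmoid followed by plain BCELoss; BCEWithLogitsLoss is preferred simply because fusing the two steps is more numerically stable:

print(torch.nn.BCELoss()(torch.sigmoid(x), y))  # equals xy_loss above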
Define the optimizer
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)
Define the training loop
from tqdm import tqdm

def train(epoch):
    model.train()
    for step, data in tqdm(enumerate(training_loader)):
        ids = data['ids'].to(device, dtype=torch.long)
        mask = data['mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['targets'].to(device, dtype=torch.float)

        outputs = model(ids, mask, token_type_ids)
        loss = loss_fn(outputs, targets)
        if step % 5000 == 0:
            print(f'Epoch: {epoch}, Loss: {loss.item()}')

        optimizer.zero_grad()  # the original called zero_grad twice; once is enough
        loss.backward()
        optimizer.step()
for epoch in range(EPOCHS):
    train(epoch)
Define the validation loop
def validation(epoch):
    model.eval()
    fin_targets = []
    fin_outputs = []
    with torch.no_grad():
        for data in testing_loader:
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.float)
            outputs = model(ids, mask, token_type_ids)
            fin_targets.extend(targets.cpu().detach().numpy().tolist())
            # apply sigmoid here so downstream thresholding works on probabilities
            fin_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())
    return fin_outputs, fin_targets
for epoch in range(EPOCHS):
    outputs, targets = validation(epoch)
    outputs = (np.array(outputs) >= 0.3).astype(int)  # decision threshold; 0.5 would be the usual choice
    accuracy = metrics.accuracy_score(targets, outputs)
    f1_score_micro = metrics.f1_score(targets, outputs, average='micro')
    f1_score_macro = metrics.f1_score(targets, outputs, average='macro')
    print(f"Accuracy Score = {accuracy}")
    print(f"F1 Score (Micro) = {f1_score_micro}")
    print(f"F1 Score (Macro) = {f1_score_macro}")
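Micro/macro F1 hide which labels fail, and a per-label breakdown costs one extra line. A sketch, assuming the standard Jigsaw label order that df.columns[2:] produced:

label_cols = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
print(metrics.classification_report(np.array(targets), outputs,
                                    target_names=label_cols, zero_division=0))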
Final results:
- Accuracy Score = 0.89
- F1 Score (Micro) = 0.3870967741935484
- F1 Score (Macro) = 0.18253968253968253

The scores are poor, which is expected: we trained on only a tiny slice of the data.