本文通过ChnSentiCorp数据集介绍了完型填空任务过程,主要使用预训练语言模型bert-base-chinese直接在测试集上进行测试,也简要介绍了模型训练流程,不过最后没有保存训练好的模型。
一.完形填空
完形填空应该大家都比较熟悉,就是把句子中的词挖掉,根据上下文推测挖掉的词是什么。
二.准备数据集
本文使用ChnSentiCorp数据集,不清楚的可以参考中文情感分类介绍。一些样例如下所示:
本文做法为将每句话截断为固定的30个词,同时将第15个词替换为[MASK],模型任务为根据上下文预测第15个词。
1.使用编码工具
def load_encode_tool(pretrained_model_name_or_path):
token = BertTokenizer.from_pretrained(Path(f'{pretrained_model_name_or_path}'))
return token
if __name__ == '__main__':
# 测试编码工具
pretrained_model_name_or_path = r'L:\20230713_HuggingFaceModel\bert-base-chinese'
token = load_encode_tool(pretrained_model_name_or_path)
print(token)
输出结果如下所示:
BertTokenizer(name_or_path='L:\20230713_HuggingFaceModel\bert-base-chinese', vocab_size=21128, model_max_length=1000000000000000019884624838656, is_fast=False, padding_side='right', truncation_side='right', special_tokens={'unk_token': '[UNK]', 'sep_token': '[SEP]', 'pad_token': '[PAD]', 'cls_token': '[CLS]', 'mask_token': '[MASK]'}, clean_up_tokenization_spaces=True)
测试编码句子如下所示:
if __name__ == '__main__':
# 测试编码工具
pretrained_model_name_or_path = r'L:\20230713_HuggingFaceModel\bert-base-chinese'
token = load_encode_tool(pretrained_model_name_or_path)
# 测试编码句子
out = token.batch_encode_plus(
batch_text_or_text_pairs=[('不是一切大树,', '都被风暴折断。'),('不是一切种子,', '都找不到生根的土壤。')],
truncation=True,
padding='max_length',
max_length=18,
return_tensors='pt',
return_length=True, # 返回长度
)
# 查看编码输出
for k, v in out.items():
print(k, v.shape)
print(token.decode(out['input_ids'][0]))
print(token.decode(out['input_ids'][1]))
结果输出如下所示:
input_ids torch.Size([2, 18])
token_type_ids torch.Size([2, 18])
length torch.Size([2])
attention_mask torch.Size([2, 18])
[CLS] 不 是 一 切 大 树 , [SEP] 都 被 风 暴 折 断 。 [SEP] [PAD]
[CLS] 不 是 一 切 种 子 , [SEP] 都 找 不 到 生 根 的 土 [SEP]
第1个句子长度为17,补了1个[PAD],第2个句子长度为18。return_length=True表示返回句子真实长度,即不包括[PAD]填充部分长度。如下所示: 编码结果如下所示:
2.定义数据集
def load_dataset_from_disk():
pretrained_model_name_or_path = r'L:\20230713_HuggingFaceModel\ChnSentiCorp'
dataset = load_from_disk(pretrained_model_name_or_path)
# batched=True表示批量处理
# batch_size=1000表示每次处理1000个样本
# num_proc=8表示使用8个线程操作
# remove_columns=['text']表示移除text列
dataset = dataset.map(f1, batched=True, batch_size=1000, num_proc=8, remove_columns=['text', 'label'])
return dataset
if __name__ == '__main__':
# 加载数据集
dataset = load_dataset_from_disk()
print(dataset)
结果输出如下所示:
DatasetDict({
train: Dataset({
features: ['input_ids', 'token_type_ids', 'attention_mask', 'length'],
num_rows: 9600
})
validation: Dataset({
features: ['input_ids', 'token_type_ids', 'attention_mask', 'length'],
num_rows: 1200
})
test: Dataset({
features: ['input_ids', 'token_type_ids', 'attention_mask', 'length'],
num_rows: 1200
})
})
3.定义计算设备
# 定义计算设备
device = 'cpu'
if torch.cuda.is_available():
device = 'cuda'
# print(device)
4.定义数据整理函数
本质是将每个句子第15个词替换为[MASK],同时将第15个词作为标签,即根据上下文要预测的词。如下所示:
# 数据整理函数
def collate_fn(data):
# 取出编码结果
input_ids = [i['input_ids'] for i in data]
attention_mask = [i['attention_mask'] for i in data]
token_type_ids = [i['token_type_ids'] for i in data]
# 转换为Tensor格式
input_ids = torch.LongTensor(input_ids)
attention_mask = torch.LongTensor(attention_mask)
token_type_ids = torch.LongTensor(token_type_ids)
# 把第15个词替换为MASK
labels = input_ids[:, 15].reshape(-1).clone()
input_ids[:, 15] = token.get_vocab()[token.mask_token]
# 移动到计算设备
input_ids = input_ids.to(device)
attention_mask = attention_mask.to(device)
token_type_ids = token_type_ids.to(device)
labels = labels.to(device)
return input_ids, attention_mask, token_type_ids, labels
5.定义数据集加载器
# 数据集加载器
loader = torch.utils.data.DataLoader(dataset=dataset['train'], batch_size=16, collate_fn=collate_fn, shuffle=True, drop_last=True)
print(len(loader)) #600=9600/16
查看样例数据如下所示:
# 查看数据样例
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
break
print(input_ids.shape, attention_mask.shape, token_type_ids.shape, labels)
输出结果如下所示:
torch.Size([16, 30])
torch.Size([16, 30])
torch.Size([16, 30])
tensor([4638, 8024, 3198, 6206, 6392, 4761, 3449, 2128, 3341, 119, 3315, 2697,
2523, 2769, 6814, 1086], device='cuda:0')
三.定义模型
1.加载预训练模型
加载模型并移动到device(CPU或GPU)中,如下所示:
pretrained_model_name_or_path = r'L:\20230713_HuggingFaceModel\bert-base-chinese'
# 加载预训练模型
pretrained = BertModel.from_pretrained(Path(f'{pretrained_model_name_or_path}'))
pretrained.to(device)
2.定义下游任务模型
下游任务模型将BERT提取第15个词的特征(16×768),输入到全连接神经网络(768×21128),得到16×21128,即把第15个词的特征投影到全体词表空间中,还原为词典中的某个词。
class Model(torch.nn.Module):
def __init__(self):
super().__init__()
self.decoder = torch.nn.Linear(in_features=768, out_features=token.vocab_size, bias=False)
# 重新将decode中的bias参数初始化为全o
self.bias = torch.nn.Parameter(data=torch.zeros(token.vocab_size))
self.decoder.bias = self.bias
# 定义 Dropout层,防止过拟合
self.Dropout = torch.nn.Dropout(p=0.5)
def forward(self, input_ids, attention_mask, token_type_ids):
# 使用预训练模型抽取数据特征
with torch.no_grad():
out = pretrained(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
# 把第15个词的特征投影到全字典范围内
out = self.Dropout(out.last_hidden_state[:, 15])
out = self.decoder(out)
return out
四.训练和测试
1.训练
定义了AdamW优化器、loss损失函数(交叉熵损失函数)和线性学习率调节器,如下所示:
def train():
# 定义优化器
optimizer = AdamW(model.parameters(), lr=5e-4, weight_decay=1.0)
# 定义1oss函数
criterion = torch.nn.CrossEntropyLoss()
# 定义学习率调节器
scheduler = get_scheduler(name='linear', num_warmup_steps=0, num_training_steps=len(loader) * 5, optimizer=optimizer)
# 将模型切换到训练模式
model.train()
# 共训练5个epoch
for epoch in range(5):
# 按批次遍历训练集中的数据
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
# 模型计算
out = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
# 计算loss并使用梯度下降法优化模型参数
loss = criterion(out, labels)
loss.backward()
optimizer.step()
scheduler.step()
optimizer.zero_grad()
# 输出各项数据的情况,便于观察
if i % 50 == 0:
out = out.argmax(dim=1)
accuracy = (out == labels).sum().item() / len(labels)
lr = optimizer.state_dict()['param_groups'][0]['lr']
print(epoch, 1, loss.item(), lr, accuracy)
输出部分结果如下所示:
0 1 10.123428344726562 0.0004998333333333334 0.0
0 1 8.659417152404785 0.0004915 0.0625
0 1 7.431852340698242 0.0004831666666666667 0.0625
0 1 7.261701583862305 0.00047483333333333335 0.0625
0 1 6.693362236022949 0.0004665 0.125
0 1 4.0811614990234375 0.00045816666666666667 0.375
0 1 7.034963607788086 0.00044983333333333334 0.1875
2.测试
使用测试数据集进行测试,如下所示:
def test():
# 定义测试数据集加载器
loader_test = torch.utils.data.DataLoader(dataset=dataset['test'], batch_size=32, collate_fn=collate_fn, shuffle=True, drop_last=True)
# 将下游任务模型切换到运行模式
model.eval()
correct = 0
total = 0
# 按批次遍历测试集中的数据
for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
# 计算15个批次即可,不需要全部遍历
if i == 15:
break
print(i)
# 计算
with torch.no_grad():
out = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
# 统计正确率
out = out.argmax(dim=1)
correct += (out == labels).sum().item()
total += len(labels)
print(correct / total)
参考文献:
[1]HuggingFace自然语言处理详解:基于BERT中文模型的任务实战
[2]https://github.com/ai408/nlp-engineering/blob/main/20230625_HuggingFace自然语言处理详解/第8章:完形填空.py