📜  Transformers in PyTorch (1)

📅  Last modified: 2023-12-03 15:34:32.992000             🧑  Author: Mango

Transformers in PyTorch

Introduction

Machine translation is an important task in natural language processing. Over the past decade, sequence-to-sequence models have been very successful at it; one popular approach is the encoder-decoder model, in which an encoder compresses the input sequence into a fixed-length vector and a decoder generates the output sequence from that vector.

However, this approach has a limitation: the entire input sequence, no matter how long, must be squeezed into a single fixed-length vector, which becomes an information bottleneck for longer sequences. In 2017, researchers at Google proposed a new model, the Transformer, that handles sequences of arbitrary length effectively.

The Transformer is a sequence-to-sequence model built on the self-attention mechanism. It consists of a stack of encoders and decoders, where each encoder and decoder layer is a module made up of multi-head self-attention and a position-wise feed-forward network.

In this article, we will show how to build a Transformer model with PyTorch.

Code implementation

We will use PyTorch to implement a simple Transformer and train it on the Multi30k dataset to translate German into English.

First, we need to import some required packages:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import math

Next, we define some hyperparameters:

NUM_WORDS = 10000  # vocabulary size
MAX_LEN = 100  # maximum sentence length
EMBED_DIM = 512  # embedding dimension
FF_DIM = 2048  # feed-forward hidden dimension
NUM_HEADS = 8  # number of attention heads
NUM_LAYERS = 6  # number of encoder/decoder layers
DROPOUT = 0.1  # dropout rate
LEARNING_RATE = 0.0005  # learning rate
BATCH_SIZE = 64  # batch size
EPOCHS = 10  # number of training epochs

Next, we define a few helper functions for preprocessing: tokenizing sentences, building a vocabulary, and converting sentences into padded index sequences. The torchtext pipeline below handles most of this automatically via Field and BucketIterator, but these functions illustrate what happens under the hood:

import spacy

# The tokenizer relies on a spaCy pipeline; a blank English pipeline is used here
# for simplicity (language-specific models would tokenize the German source better).
nlp = spacy.blank('en')

def tokenize(text):
    return [tok.text for tok in nlp.tokenizer(text)]

def build_vocab(sentences, num_words):
    word_freq = {}
    for sentence in sentences:
        for word in sentence:
            if word in word_freq:
                word_freq[word] += 1
            else:
                word_freq[word] = 1

    vocab = sorted(word_freq, key=word_freq.get, reverse=True)
    vocab = ['<pad>', '<sos>', '<eos>', '<unk>'] + vocab[:num_words]
    word2index = {word: i for i, word in enumerate(vocab)}
    index2word = {i: word for i, word in enumerate(vocab)}

    return word2index, index2word

def pad_sequence(sequence, max_len, pad_token):
    num_missing = max_len - len(sequence)
    sequence += [pad_token] * num_missing
    return sequence[:max_len]

def numericalize(sentence, word2index, max_len, pad_token, sos_token=None, eos_token=None):
    numericalized = []
    if sos_token is not None:
        numericalized.append(word2index[sos_token])
    for word in sentence:
        numericalized.append(word2index.get(word, word2index['<unk>']))
    if eos_token is not None:
        numericalized.append(word2index[eos_token])
    return pad_sequence(numericalized, max_len, word2index[pad_token])
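
As a purely illustrative example of how these helpers fit together (the toy sentences and indices below are made up for demonstration):

sentences = [['i', 'love', 'nlp'], ['transformers', 'are', 'powerful']]
word2index, index2word = build_vocab(sentences, NUM_WORDS)

# '<sos>' and '<eos>' wrap the sentence; the rest is padded with '<pad>' up to MAX_LEN.
ids = numericalize(['i', 'love', 'transformers'], word2index, MAX_LEN, '<pad>',
                   sos_token='<sos>', eos_token='<eos>')
print(len(ids))   # 100
print(ids[:6])    # [1, 4, 5, 7, 2, 0] with this toy vocabulary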

Next, we load the training, validation, and test data:

from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator  # legacy torchtext API (moved to torchtext.legacy in newer releases)

# Load the Multi30k translation dataset that ships with torchtext.
# exts=('.de', '.en') makes German the source language and English the target language.
source_field = Field(tokenize=tokenize, init_token='<sos>', eos_token='<eos>', lower=True)
target_field = Field(tokenize=tokenize, init_token='<sos>', eos_token='<eos>', lower=True)

train_data, valid_data, test_data = Multi30k.splits(exts=('.de', '.en'), fields=(source_field, target_field))

# Build the vocabularies
source_field.build_vocab(train_data, max_size=NUM_WORDS)
target_field.build_vocab(train_data, max_size=NUM_WORDS)

# Create the data iterators (the device has to be defined before it is used here)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size=BATCH_SIZE, device=device)
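
Each batch produced by BucketIterator stores the source and target sentences as tensors of shape (sequence length, batch size), which is the layout expected by nn.MultiheadAttention with its default settings. A quick sanity check (for illustration only):

batch = next(iter(train_iterator))
print(batch.src.shape)  # (source_seq_len, BATCH_SIZE)
print(batch.trg.shape)  # (target_seq_len, BATCH_SIZE)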

Next, we define the positional encoding, the encoder, and the decoder:
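
The encoder and decoder below rely on a PositionalEncoding module to inject word-order information into the embeddings, but the original listing never defines it. A minimal sketch of the standard sinusoidal positional encoding (the max_len default simply reuses MAX_LEN from above) could look like this:

class PositionalEncoding(nn.Module):
    """Standard sinusoidal positional encoding, added to the input embeddings."""
    def __init__(self, embed_dim, dropout, max_len=MAX_LEN):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)

        position = torch.arange(max_len).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2) * (-math.log(10000.0) / embed_dim))
        pe = torch.zeros(max_len, 1, embed_dim)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)  # fixed, not a trainable parameter

    def forward(self, x):
        # x: (seq_len, batch_size, embed_dim)
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)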

class Encoder(nn.Module):
    def __init__(self, num_layers, num_heads, embed_dim, ff_dim, dropout):
        super(Encoder, self).__init__()

        self.positional_encoding = PositionalEncoding(embed_dim, dropout)

        self.self_attention = nn.ModuleList([nn.MultiheadAttention(embed_dim, num_heads, dropout) for _ in range(num_layers)])
        self.layer_norm1 = nn.ModuleList([nn.LayerNorm(embed_dim) for _ in range(num_layers)])

        self.feed_forward = nn.ModuleList([nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
            nn.Dropout(dropout)) for _ in range(num_layers)])

        self.layer_norm2 = nn.ModuleList([nn.LayerNorm(embed_dim) for _ in range(num_layers)])

    def forward(self, inputs):
        # inputs: (seq_len, batch_size, embed_dim)
        # Positional information is added to the embeddings before the first layer.
        outputs = self.positional_encoding(inputs)

        for i in range(len(self.self_attention)):
            # Self-attention sub-layer with residual connection and layer normalization.
            attention_output, _ = self.self_attention[i](outputs, outputs, outputs)
            outputs = self.layer_norm1[i](outputs + attention_output)

            # Position-wise feed-forward sub-layer with residual connection and layer normalization.
            feed_forward_output = self.feed_forward[i](outputs)
            outputs = self.layer_norm2[i](outputs + feed_forward_output)

        return outputs
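
As a quick shape check (purely illustrative), the encoder maps a batch of embedded sequences to contextualized representations of the same shape:

encoder = Encoder(NUM_LAYERS, NUM_HEADS, EMBED_DIM, FF_DIM, DROPOUT)
dummy = torch.randn(MAX_LEN, BATCH_SIZE, EMBED_DIM)  # (seq_len, batch_size, embed_dim)
print(encoder(dummy).shape)                          # torch.Size([100, 64, 512])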


class Decoder(nn.Module):
    def __init__(self, num_layers, num_heads, embed_dim, ff_dim, dropout):
        super(Decoder, self).__init__()

        self.positional_encoding = PositionalEncoding(embed_dim, dropout)

        self.self_attention = nn.ModuleList([nn.MultiheadAttention(embed_dim, num_heads, dropout) for _ in range(num_layers)])
        self.layer_norm1 = nn.ModuleList([nn.LayerNorm(embed_dim) for _ in range(num_layers)])

        self.encoder_decoder_attention = nn.ModuleList([nn.MultiheadAttention(embed_dim, num_heads, dropout) for _ in range(num_layers)])
        self.layer_norm2 = nn.ModuleList([nn.LayerNorm(embed_dim) for _ in range(num_layers)])

        self.feed_forward = nn.ModuleList([nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
            nn.Dropout(dropout)) for _ in range(num_layers)])

        self.layer_norm3 = nn.ModuleList([nn.LayerNorm(embed_dim) for _ in range(num_layers)])

    def forward(self, inputs, encoder_outputs, target_mask=None):
        # inputs: (target_seq_len, batch_size, embed_dim)
        # encoder_outputs: (source_seq_len, batch_size, embed_dim)
        # target_mask: causal mask that blocks attention to future target positions
        outputs = self.positional_encoding(inputs)

        for i in range(len(self.self_attention)):
            # Masked self-attention over the decoder inputs.
            attention_output, _ = self.self_attention[i](outputs, outputs, outputs, attn_mask=target_mask)
            outputs = self.layer_norm1[i](outputs + attention_output)

            # Encoder-decoder attention over the encoder outputs.
            attention_output, _ = self.encoder_decoder_attention[i](outputs, encoder_outputs, encoder_outputs)
            outputs = self.layer_norm2[i](outputs + attention_output)

            # Position-wise feed-forward sub-layer.
            feed_forward_output = self.feed_forward[i](outputs)
            outputs = self.layer_norm3[i](outputs + feed_forward_output)

        return outputs
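
The decoder consumes the (shifted) target embeddings together with the encoder outputs; the causal target_mask keeps each position from attending to later positions. A small illustrative check, reusing encoder and dummy from the previous snippet:

decoder = Decoder(NUM_LAYERS, NUM_HEADS, EMBED_DIM, FF_DIM, DROPOUT)
target_len = 20
causal_mask = torch.triu(torch.full((target_len, target_len), float('-inf')), diagonal=1)
target_dummy = torch.randn(target_len, BATCH_SIZE, EMBED_DIM)
memory = encoder(dummy)                                  # encoder output from the previous check
print(decoder(target_dummy, memory, causal_mask).shape)  # torch.Size([20, 64, 512])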


class Transformer(nn.Module):
    def __init__(self, num_layers, num_heads, embed_dim, ff_dim, dropout, source_vocab_size, target_vocab_size):
        super(Transformer, self).__init__()

        self.embed_dim = embed_dim

        self.source_embedding = nn.Embedding(source_vocab_size, embed_dim)
        self.target_embedding = nn.Embedding(target_vocab_size, embed_dim)

        self.encoder = Encoder(num_layers, num_heads, embed_dim, ff_dim, dropout)
        self.decoder = Decoder(num_layers, num_heads, embed_dim, ff_dim, dropout)

        self.output_layer = nn.Linear(embed_dim, target_vocab_size)

    def forward(self, source, target):
        # source: (source_seq_len, batch_size), target: (target_seq_len, batch_size)
        # Embeddings are scaled by sqrt(embed_dim), as in the original Transformer paper.
        source_embed = self.source_embedding(source) * math.sqrt(self.embed_dim)  # (source_seq_len, batch_size, embed_dim)
        target_embed = self.target_embedding(target) * math.sqrt(self.embed_dim)  # (target_seq_len, batch_size, embed_dim)

        # Causal mask so that each target position only attends to earlier positions.
        target_len = target.size(0)
        target_mask = torch.triu(torch.full((target_len, target_len), float('-inf'), device=target.device), diagonal=1)

        # Padding masks are omitted in this simplified implementation; <pad> positions
        # are only ignored in the loss via ignore_index.
        encoder_outputs = self.encoder(source_embed)                                # (source_seq_len, batch_size, embed_dim)
        decoder_outputs = self.decoder(target_embed, encoder_outputs, target_mask)  # (target_seq_len, batch_size, embed_dim)

        outputs = self.output_layer(decoder_outputs)  # (target_seq_len, batch_size, target_vocab_size)

        return outputs

Usage example:

# Build the model (device was already defined when creating the data iterators)
model = Transformer(NUM_LAYERS, NUM_HEADS, EMBED_DIM, FF_DIM, DROPOUT,
                    len(source_field.vocab), len(target_field.vocab)).to(device)

# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss(ignore_index=target_field.vocab.stoi['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Train the model
model.train()
for epoch in range(EPOCHS):
    epoch_loss = 0.0
    for i, batch in enumerate(train_iterator):
        source = batch.src.to(device)
        target = batch.trg.to(device)

        optimizer.zero_grad()

        output = model(source, target[:-1])
        output = output.view(-1, output.shape[-1])
        target = target[1:].view(-1)
        loss = criterion(output, target)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
        optimizer.step()

        epoch_loss += loss.item()

        if i % 100 == 0:
            print(f'Epoch: {epoch} | Batch: {i} | Loss: {loss.item():.3f}')

    print(f'Epoch: {epoch} | Loss: {epoch_loss / len(train_iterator):.3f}')

# Evaluate the model on the test set (no gradients needed, dropout disabled)
model.eval()
test_loss = 0.0
with torch.no_grad():
    for i, batch in enumerate(test_iterator):
        source = batch.src.to(device)
        target = batch.trg.to(device)

        output = model(source, target[:-1])
        output = output.view(-1, output.shape[-1])
        target = target[1:].view(-1)
        loss = criterion(output, target)

        test_loss += loss.item()

print(f'Test Loss: {test_loss / len(test_iterator):.3f}')
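
The loop above only reports the test loss; producing an actual translation requires decoding token by token. Below is a rough sketch of greedy decoding for a single tokenized (and lowercased) source sentence. The helper name translate_sentence is hypothetical and not part of the original article:

def translate_sentence(tokens, max_len=MAX_LEN):
    # tokens: list of lowercased source-language (German) words
    model.eval()
    src_indices = ([source_field.vocab.stoi[source_field.init_token]] +
                   [source_field.vocab.stoi[tok] for tok in tokens] +   # unknown words map to <unk>
                   [source_field.vocab.stoi[source_field.eos_token]])
    source = torch.LongTensor(src_indices).unsqueeze(1).to(device)      # (source_seq_len, 1)

    trg_indices = [target_field.vocab.stoi[target_field.init_token]]
    with torch.no_grad():
        for _ in range(max_len):
            target = torch.LongTensor(trg_indices).unsqueeze(1).to(device)  # (target_seq_len, 1)
            output = model(source, target)                                  # (target_seq_len, 1, vocab_size)
            next_token = output[-1, 0].argmax().item()                      # greedy choice of the next token
            trg_indices.append(next_token)
            if next_token == target_field.vocab.stoi[target_field.eos_token]:
                break

    return [target_field.vocab.itos[i] for i in trg_indices[1:]]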

Summary

The Transformer is a popular sequence-to-sequence model that handles sequences of arbitrary length effectively. PyTorch provides a rich set of APIs and tools that make it convenient to build and train Transformer models. In this article, we used PyTorch to implement a simple Transformer model that translates German into English.