Transformer Code (PyTorch)

Preface

This article walks you through building your own Transformer model with PyTorch.

python=3.8

Import packages

import torch
import torch.nn as nn
import math

This piece draws on several other articles. Since I am also still learning, each subsection also explains the usage and role of the functions involved.

1. Multi-Head Attention Layer

Figure-1 Multi-Head Attention (source: original paper)

First, let us focus on implementing a “single-head” attention, i.e. scaled dot-product attention.

The formula for scaled dot-product attention is:

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{QK^{T}}{\sqrt{d_k}}\right)V$$

Full Code

class MultiHeadAttention(nn.Module):
    '''
    scaled_dot_product_attention(self, Q, K, V, mask=None): Calculate self-attention value
    split_heads(self, x): Reshape the input to have num_heads for multi-head attention
    combine_heads(self, x): Combine the multiple heads back to original shape
    forward(self, Q, K, V, mask=None)
    '''

    def __init__(self, d_model, num_heads):
        '''
        d_model: Dimensionality of the input
        num_heads: Number of attention heads
        d_k: Dimension of each head's key, query, and value
        '''
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Initialize dimensions
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Apply mask if provided (useful for preventing attention to certain parts like padding)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output

    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        # Q -> W_q Q + b
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)

        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

Class Definition and Initialization

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):

The class is defined as a subclass of PyTorch’s nn.Module.

  1. d_model: Dimensionality of the input
  2. num_heads: Number of attention heads
  3. d_k: Dimension of each head’s key, query, and value

The initialization checks if d_model is divisible by num_heads, and then defines the transformation weights for query, key, value, and output.

Scaled Dot-Product Attention

We can translate this equation into code almost directly; the only extra detail is handling the optional mask.

def scaled_dot_product_attention(self, Q, K, V, mask=None):
  1. Calculating attention scores: the dot product of the queries (Q) and the keys (K), scaled by the square root of the key dimension (d_k).
  2. Applying the mask: if a mask is provided, it is applied to the attention scores to mask out specific positions (such as padding or future tokens).
  3. Calculating attention weights: the masked scores are passed through a softmax so that each row sums to 1.
  4. Calculating the output: the attention weights are multiplied by the values (V). (A standalone sketch of these four steps follows below.)
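To make these four steps concrete, here is a minimal standalone sketch (independent of the class above) run on small random tensors; all of the sizes, and the causal mask used in step 2, are purely illustrative assumptions.

import torch
import math

batch_size, num_heads, seq_length, d_k = 2, 4, 5, 8   # illustrative sizes
Q = torch.randn(batch_size, num_heads, seq_length, d_k)
K = torch.randn(batch_size, num_heads, seq_length, d_k)
V = torch.randn(batch_size, num_heads, seq_length, d_k)

# 1. Attention scores: dot product of queries and keys, scaled by sqrt(d_k)
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)   # (2, 4, 5, 5)

# 2. Optional mask: here a causal (no-peek) mask, purely as an example
mask = torch.tril(torch.ones(seq_length, seq_length)).bool()
scores = scores.masked_fill(mask == 0, -1e9)

# 3. Softmax over the last dimension turns scores into attention weights
weights = torch.softmax(scores, dim=-1)

# 4. The output is the weighted sum of the values
output = torch.matmul(weights, V)
print(output.shape)   # torch.Size([2, 4, 5, 8])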

Splitting & Combining Heads

def split_heads(self, x):
def combine_heads(self, x):

The split_heads() function reshapes the input x from (batch_size, seq_length, d_model) to (batch_size, num_heads, seq_length, d_k), so that every attention head can be processed in parallel.

The combine_heads() function reverses this, merging the heads back into a single tensor of shape (batch_size, seq_length, d_model), ready for the output projection.
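A quick way to see what these two reshapes do is to trace the shapes on a dummy tensor; the sizes below (batch of 2, sequence length 10, d_model = 512, 8 heads) are arbitrary and only for illustration.

mha = MultiHeadAttention(d_model=512, num_heads=8)

x = torch.randn(2, 10, 512)    # (batch_size, seq_length, d_model)
heads = mha.split_heads(x)
print(heads.shape)             # torch.Size([2, 8, 10, 64]) -> (batch_size, num_heads, seq_length, d_k)

restored = mha.combine_heads(heads)
print(restored.shape)          # torch.Size([2, 10, 512]) -> back to (batch_size, seq_length, d_model)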

Forward Method

def forward(self, Q, K, V, mask=None):

In the forward method, the inputs Q, K, and V are first passed through their linear projections (W_q, W_k, W_v), then split into heads, then run through scaled_dot_product_attention; finally, the heads are combined again and projected by the output layer W_o.
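As a sanity check of the whole module, we can run a forward pass on random tensors; the hyperparameters and the padding mask below are just example values.

mha = MultiHeadAttention(d_model=512, num_heads=8)
x = torch.randn(2, 10, 512)          # (batch_size, seq_length, d_model)

out = mha(x, x, x)                   # self-attention: Q = K = V = x
print(out.shape)                     # torch.Size([2, 10, 512])

# Example padding mask that hides the last two positions of every sequence
mask = torch.ones(2, 1, 1, 10)
mask[:, :, :, -2:] = 0
out_masked = mha(x, x, x, mask=mask)
print(out_masked.shape)              # torch.Size([2, 10, 512])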

2. Position-Wise Feed-Forward Layer

Figure-2 Position-Wise Feed-Forward Layer (source: original paper)

The feed-forward sub-layer applies the same two-layer network at every position:

$$\mathrm{FFN}(x) = \max(0,\ xW_1 + b_1)\,W_2 + b_2$$

We can write the corresponding code directly from this formula:

class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))
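A quick shape check, using d_model = 512 and d_ff = 2048 (the values from the original paper) as example dimensions:

ffn = PositionWiseFeedForward(d_model=512, d_ff=2048)

x = torch.randn(2, 10, 512)    # the same transformation is applied at every position
print(ffn(x).shape)            # torch.Size([2, 10, 512])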

3. Encoder Block

Figure-3 Encoder Block (source: original paper)

The encoder layer combines the Multi-Head Attention layer and the Position-Wise Feed-Forward layer. The only things we need to pay attention to are the residual connections and the layer normalization.

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention sub-layer with residual connection and layer normalization
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Feed-forward sub-layer with residual connection and layer normalization
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x
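A minimal usage sketch with made-up tensors, assuming a padding mask of shape (batch_size, 1, 1, seq_length) like the one generate_mask produces later:

enc_layer = EncoderLayer(d_model=512, num_heads=8, d_ff=2048, dropout=0.1)

x = torch.randn(2, 10, 512)            # embedded source tokens
src_mask = torch.ones(2, 1, 1, 10)     # 1 = attend, 0 = ignore
src_mask[:, :, :, -3:] = 0             # pretend the last 3 positions are padding

out = enc_layer(x, src_mask)
print(out.shape)                       # torch.Size([2, 10, 512])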

4. Decoder Block

Figure-4 Decoder Block (source: original paper)

Simply write the code following the structure shown in Figure-4.

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        # Masked self-attention over the target sequence
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Cross-attention: queries from the decoder, keys/values from the encoder output
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        # Position-wise feed-forward sub-layer
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

The nn.Dropout layer helps prevent overfitting by randomly zeroing activations during training.
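Again, a small sketch on made-up tensors; the causal (no-peek) target mask here is built the same way generate_mask does in the full Transformer below, and all sizes are just examples.

dec_layer = DecoderLayer(d_model=512, num_heads=8, d_ff=2048, dropout=0.1)

tgt = torch.randn(2, 7, 512)                    # embedded target tokens
enc_output = torch.randn(2, 10, 512)            # output of the encoder stack

src_mask = torch.ones(2, 1, 1, 10)              # padding mask over the source
tgt_mask = torch.tril(torch.ones(1, 1, 7, 7))   # causal mask: no attending to future positions

out = dec_layer(tgt, enc_output, src_mask, tgt_mask)
print(out.shape)                                # torch.Size([2, 7, 512])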

5. Positional Encoding

Again, we just write the code corresponding to the sinusoidal formula:

$$PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{\mathrm{model}}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{\mathrm{model}}}}\right)$$

See my earlier post for a more specific explanation.

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        # Even indices get sine, odd indices get cosine
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Register as a buffer: part of the module's state, but not a trainable parameter
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        # Add the positional encodings for the first seq_length positions
        return x + self.pe[:, :x.size(1)]
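The encodings are precomputed once and stored as a buffer, so they are saved with the model but never trained. A quick sketch of what the forward pass adds, with example sizes:

pos_enc = PositionalEncoding(d_model=512, max_seq_length=100)

x = torch.zeros(2, 10, 512)    # pretend these are token embeddings
out = pos_enc(x)

print(out.shape)               # torch.Size([2, 10, 512])
print(out[0, 0, :4])           # position 0: sin(0), cos(0), sin(0), cos(0) -> tensor([0., 1., 0., 1.])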

6. Transformer

Figure-5 The complete Transformer architecture (source: original paper)

With this picture of the complete architecture in front of us, we can now assemble the full Transformer:

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        # Padding masks: token ID 0 is treated as padding
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)   # (batch, 1, 1, src_len)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)   # (batch, 1, tgt_len, 1)
        seq_length = tgt.size(1)
        # Causal (no-peek) mask so each target position only attends to earlier positions
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output
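Finally, a small end-to-end sketch: build the model, run a forward pass on random token IDs, and compute a cross-entropy loss in the usual teacher-forcing style (feed the target shifted right, predict it shifted left). All hyperparameters and sizes below are illustrative assumptions, and token ID 0 is assumed to be padding because generate_mask relies on that.

src_vocab_size, tgt_vocab_size = 5000, 5000
model = Transformer(src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8,
                    num_layers=6, d_ff=2048, max_seq_length=100, dropout=0.1)

src = torch.randint(1, src_vocab_size, (64, 20))   # (batch_size, src_seq_length)
tgt = torch.randint(1, tgt_vocab_size, (64, 22))   # (batch_size, tgt_seq_length)

criterion = nn.CrossEntropyLoss(ignore_index=0)    # ignore padding positions in the loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

model.train()
optimizer.zero_grad()
output = model(src, tgt[:, :-1])                   # predict the next token at every position
loss = criterion(output.reshape(-1, tgt_vocab_size), tgt[:, 1:].reshape(-1))
loss.backward()
optimizer.step()

print(output.shape, loss.item())                   # torch.Size([64, 21, 5000]) and a scalar loss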