
'''
# System --> Windows & Python3.10.0
# File ----> main.py
# Author --> Illusionna
# Create --> 2024/07/10 19:32:15
'''
# -*- Encoding: UTF-8 -*-


"""
Step 1: 查看当前 GPU 的所有基础信息
    >>> nvidia-smi
    CUDA Version: 12.1

Step 2: 配置 main.py 程序依赖环境
    >>> conda create -n NLP python==3.10.0
    >>> conda activate NLP
    >>> pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

Step 3: 激活并测试环境
    >>> conda activate NLP
    >>> python
    >>> import torch
    >>> print(torch.__version__)
    2.3.1+cu121
    >>> print(torch.cuda.is_available())
    True
    >>> exit(0)

Step 4: Transformer 简单测试示例推理.
    >>> python main.py
"""

"""
"Attention is All You Need", 代码 pytorch 推荐两篇:
1. 哈佛大学 NLP 研究组
    旧版: http://nlp.seas.harvard.edu/2018/04/03/attention.html
    新版: https://nlp.seas.harvard.edu/annotated-transformer
2. 台湾小哥通俗代码
    https://github.com/jadore801120/attention-is-all-you-need-pytorch
"""


import math
import copy
import torch


def debug(**kwargs) -> None:
    """
    Debug printing; libraries are imported locally only inside this function.

    >>> debug(kwargs='2.718281828')
    """
    # <S>---------------------------------------------------------------
    import os
    import sys
    import random
    import inspect
    # ---------------------------------------------------------------<E>
    # An empty system call enables ANSI escape sequences in the Windows terminal.
    os.system('')
    # Frame information of the caller, used to report the file and line number.
    line = inspect.getframeinfo(sys._getframe(1))
    file = os.path.relpath(line.filename, os.getcwd())
    print('<S>---------------------------------------------------------------')
    print(f'\033[3{random.randint(1, 6)}m[+debug] "{file}", line {line.lineno}')
    for key, value in kwargs.items():
        print('')
        print(key, '=')
        print(value, end='\n')
    print('\033[0m', end='')
    print('---------------------------------------------------------------<E>')


def clone(module: object, N: int) -> object:
    """
    Clone N identical copies of the given module.
    """
    return torch.nn.ModuleList([copy.deepcopy(module) for _ in range(0, N, 1)])


def subsequentMask(size: int) -> torch.Tensor:
    """
    Mask matrix illustration: https://s21.ax1x.com/2024/07/10/pkf52UH.png

    Build a mask that hides subsequent positions so the decoder's self-attention cannot "see" future tokens, preserving the causal order of sequence generation. Self-attention is computed in parallel; this mask lets the decoder handle long sequences efficiently during training while still keeping the correct generation order.
    >>> subsequentMask(4)
    >>> tensor(
        [
            [
                [ True, False, False, False],
                [ True, True, False, False],
                [ True, True, True, False],
                [ True, True, True, True]
            ]
        ]
    )
    """
    attention_shape = (1, size, size)
    # torch.triu keeps the upper-triangular part of a matrix, starting at diagonal offset 1.
    subsequent_mask = torch.triu(torch.ones(attention_shape), diagonal=1).type(torch.uint8)
    return subsequent_mask == 0
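

# A minimal sketch (illustrative addition, not from the original file) of how the causal
# mask is typically combined with a padding mask before reaching the decoder: both masks
# are boolean, so a logical AND keeps a position only if it is a real token and not in
# the future. The pad_token value 0 is an assumption for this sketch.
def combinedTargetMask(target: torch.Tensor, pad_token: int = 0) -> torch.Tensor:
    # (batch, 1, seq_len): True where the token is not padding.
    padding_mask = (target != pad_token).unsqueeze(dim=-2)
    # Broadcasting AND with the (1, seq_len, seq_len) causal mask -> (batch, seq_len, seq_len).
    return padding_mask & subsequentMask(target.size(dim=-1)).type_as(padding_mask)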


def scaledDotProductAttention(
    Q: torch.Tensor,
    K: torch.Tensor,
    V: torch.Tensor,
    attention_mask: torch.Tensor,
    dropout: object
) -> tuple[torch.Tensor, torch.Tensor]:
    """
    Compute attention; return the weighted output and the attention weights. Formula: https://s21.ax1x.com/2024/07/10/pkfThG9.png
    >>> scores = [
        [0.1, 0.2, 0.3, 0.4, 0.5],
        [0.6, 0.7, 0.8, 0.9, 1.0],
        [1.1, 1.2, 1.3, 1.4, 1.5],
        [1.6, 1.7, 1.8, 1.9, 2.0],
        [2.1, 2.2, 2.3, 2.4, 2.5]
    ]
    # The mask may also be a boolean matrix; it is equivalent to a 0-1 matrix.
    >>> mask = [
        [1, 1, 1, 0, 0],
        [1, 1, 1, 0, 0],
        [1, 1, 1, 0, 0],
        [1, 1, 1, 0, 0],
        [1, 1, 1, 0, 0]
    ]
    >>> scores.masked_fill --> [
        [0.1, 0.2, 0.3, -INFINITY, -INFINITY],
        [0.6, 0.7, 0.8, -INFINITY, -INFINITY],
        [1.1, 1.2, 1.3, -INFINITY, -INFINITY],
        [1.6, 1.7, 1.8, -INFINITY, -INFINITY],
        [2.1, 2.2, 2.3, -INFINITY, -INFINITY]
    ]
    """
    INFINITY = 1e9
    d_k = Q.size(dim=-1)
    # Dividing by sqrt(d_k) is the scaling step; it keeps softmax away from regions with tiny gradients, which would hurt training.
    scores = torch.matmul(Q, K.transpose(dim0=-1, dim1=-2)) / math.sqrt(d_k)
    if attention_mask is not None:
        # Set masked positions to negative infinity so the softmax on the next line turns them into (numerically) zero.
        scores = scores.masked_fill(attention_mask == False, -INFINITY)
    # Attention weights as probabilities: how similar/relevant each element of K is to each element of Q (positively correlated).
    probability_attention = scores.softmax(dim=-1)
    # Randomly drop a fraction of the attention weights.
    if dropout is not None:
        probability_attention = dropout(probability_attention)
    # Besides Scaled Dot-Product Attention there is also Additive Attention, but as the "Attention Is All You Need" authors note, dot-product attention is faster and more space-efficient in practice, since it can use the highly optimized matrix multiplication torch.matmul() of the underlying libraries.
    return (torch.matmul(probability_attention, V), probability_attention)
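

# A quick shape sanity check for scaledDotProductAttention (an illustrative sketch, not
# part of the original file). The batch, head and length sizes are arbitrary; only the
# last dimension d_k matters for the scaling.
def attentionShapeDemo() -> None:
    (batch, heads, length, d_k) = (2, 8, 5, 64)
    Q = torch.rand(batch, heads, length, d_k)
    K = torch.rand(batch, heads, length, d_k)
    V = torch.rand(batch, heads, length, d_k)
    (output, weights) = scaledDotProductAttention(Q=Q, K=K, V=V, attention_mask=None, dropout=None)
    # output: (2, 8, 5, 64); weights: (2, 8, 5, 5), each row of weights sums to 1.
    print(output.shape, weights.shape, weights.sum(dim=-1)[0, 0])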


class MultiHeadAttention(torch.nn.Module):
    """
    Multi-head (cross-)attention. Architecture diagram: https://s21.ax1x.com/2024/07/10/pkhVUXt.png
    """
    def __init__(self, *args, h: int, d_model: int, dropout: float=0.1, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Continue only if d_model is divisible by h; otherwise an AssertionError is raised.
        assert d_model % h == 0
        self.d_k = d_model // h
        self.n_heads = h
        # Clone 4 linear layers: projections for Q, K, V plus the final output projection.
        self.linears = clone(torch.nn.Linear(d_model, d_model), 4)
        self.dropout = torch.nn.Dropout(dropout)

    def forward(
        self, Q: torch.Tensor, K: torch.Tensor, V: torch.Tensor,
        mask: torch.Tensor | None = None
    ) -> torch.Tensor:
        if mask is not None:
            # The mask is applied to all h heads.
            mask = mask.unsqueeze(dim=1)
        # RESIDUAL = Q
        batch_size = Q.size(dim=0)
        # <S>---------------------------------------------------------------
        # Loop three times: linear iterates over self.linears, x over the list [Q, K, V].
        [Q, K, V] = [
            # view() reshapes: batch_size stays, -1 is inferred automatically from h * d_k.
            linear(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
            for linear, x in zip(self.linears, [Q, K, V])
        ]
        '''
        # The list comprehension above is equivalent to:
        Q = self.linears[0](Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.linears[1](K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.linears[2](V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        '''
        # ---------------------------------------------------------------<E>
        (x, _) = scaledDotProductAttention(
            Q = Q, K = K, V = V,
            attention_mask = mask, dropout = self.dropout
        )
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_k)
        del Q; del K; del V
        # <S>---------------------------------------------------------------
        # Run the last linear layer, i.e. the one at index 3.
        '''
        # The Taiwanese implementation (reference 2 at the top of this file) adds a
        # residual connection at the final return and then normalizes:
        norm = LayerNorm(features=x.size(dim=-1))
        return norm(self.linears[-1](x) + RESIDUAL)
        '''
        return self.linears[-1](x)
        # ---------------------------------------------------------------<E>
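

# A minimal usage sketch for MultiHeadAttention (illustrative, not part of the original
# file): self-attention with a causal mask keeps the (batch, seq_len, d_model) shape.
def multiHeadAttentionDemo() -> None:
    attention = MultiHeadAttention(h=8, d_model=512, dropout=0.0)
    x = torch.rand(2, 5, 512)
    mask = subsequentMask(5)
    # Expected: torch.Size([2, 5, 512]).
    print(attention(x, x, x, mask).shape)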


class PositionWiseFeedForward(torch.nn.Module):
    """
    Position-wise fully connected feed-forward network. Formula: https://s21.ax1x.com/2024/07/10/pkfvZr9.png
    """
    def __init__(
        self,
        *args,
        d_model: int,
        d_ff: int,
        dropout: float = 0.1,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.W1 = torch.nn.Linear(d_model, d_ff, bias=True)
        self.W2 = torch.nn.Linear(d_ff, d_model, bias=True)
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # torch.Tensor has a relu() method, so it can be called directly:
        # return self.W2(self.dropout(self.W1(x).relu()))
        return self.W2(self.dropout(torch.relu(self.W1(x))))
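

# A small sketch (illustrative addition): the feed-forward block is applied to every
# position independently, so only the last dimension changes inside the block
# (d_model -> d_ff -> d_model) and the overall shape is preserved. d_ff=2048 here is
# the value from the paper, not this file's default.
def feedForwardDemo() -> None:
    ffn = PositionWiseFeedForward(d_model=512, d_ff=2048, dropout=0.0)
    x = torch.rand(2, 5, 512)
    # Expected: torch.Size([2, 5, 512]).
    print(ffn(x).shape)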


class PositionalEncoding(torch.nn.Module):
    """
    Positional encoding; see this blog post for the underlying idea: https://wmathor.com/index.php/archives/1453
    """
    def __init__(
        self,
        *args,
        d_model: int,
        dropout: float = 0.1,
        max_tokens: int = 5000,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.dropout = torch.nn.Dropout(dropout)
        pe = torch.zeros(max_tokens, d_model)
        # Create a (max_tokens, 1) column vector of positions.
        position = torch.arange(0, max_tokens, 1).unsqueeze(dim=1)
        '''
        All rows of the matrix; within each row start at index 0 and step by 2 units.
        >>> matrix = torch.Tensor(
            [
                [1, 2, 3, 4, 5, 6, 7, 8, 9],
                [2, 2, 3, 4, 5, 6, 7, 8, 9],
                [3, 2, 3, 4, 5, 6, 7, 8, 9]
            ]
        )
        >>> matrix[:, 0::2] --> torch.Tensor(
            [
                [1, 3, 5, 7, 9],
                [2, 3, 5, 7, 9],
                [3, 3, 5, 7, 9]
            ]
        )
        >>> matrix[:, 0:-1:2] --> torch.Tensor(
            [
                [1, 3, 5, 7],
                [2, 3, 5, 7],
                [3, 3, 5, 7]
            ]
        )
        '''
        # Divisor term 1 / 10000^(2i / d_model), computed via exp/log for numerical stability.
        divisor = torch.exp(torch.arange(0, d_model, 2) * -(math.log(1e4) / d_model))
        pe[:, 0::2] = torch.sin(position * divisor)
        pe[:, 1::2] = torch.cos(position * divisor)
        '''
        >>> pe = torch.Tensor(
            [
                [1, 2, 3],
                [4, 5, 6],
                [7, 8, 9]
            ]
        )
        >>> pe.unsqueeze(0) = torch.Tensor(
            [
                [
                    [1, 2, 3],
                    [4, 5, 6],
                    [7, 8, 9]
                ]
            ]
        )
        # Add a dimension at index 0, i.e. prepend a batch dimension of size 1.
        '''
        pe = pe.unsqueeze(dim=0)
        # Register pe as a model buffer so it is accessible inside forward().
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        1. The buffer is part of the model;
        2. The positional encoding is fixed and never updated during training, so registering pe as a buffer is appropriate;
        3. Gradient computation is set to False, so backpropagation will not update the pe matrix;
        4. pe is saved and loaded as part of the model state.
        """
        # Dropout regularization to prevent overfitting.
        return self.dropout(x + self.pe[:, :x.size(1)].requires_grad_(requires_grad=False))
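

# An illustrative sketch (not part of the original file) of the buffer built above:
# pe has shape (1, max_tokens, d_model), and adjacent channels hold a sin/cos pair of
# the same frequency. With a zero input and dropout=0.0 the module simply returns the
# encoding itself.
def positionalEncodingDemo() -> None:
    encoder = PositionalEncoding(d_model=16, dropout=0.0, max_tokens=100)
    # Expected: torch.Size([1, 100, 16]).
    print(encoder.pe.shape)
    x = torch.zeros(1, 10, 16)
    # Expected: True.
    print(torch.allclose(encoder(x), encoder.pe[:, :10]))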


class Embedding(torch.nn.Module):
    """
    Embedding layer. d_model is the dimension of the output vectors; vocabulary is the vocabulary size, i.e. the range of discrete input values.
    """
    def __init__(self, *args, d_model: int, vocabulary: int, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.d_model = d_model
        self.embedding = torch.nn.Embedding(vocabulary, d_model)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Scale the embedded word vectors so their numerical range is more stable and matches that of the positional encoding.
        return self.embedding(x) * math.sqrt(self.d_model)
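

# A short sketch (illustrative addition): the embedding output is scaled by sqrt(d_model),
# so its magnitude is comparable to the positional encoding added right after it. The toy
# vocabulary size 11 matches the demo at the bottom of this file.
def embeddingDemo() -> None:
    embed = Embedding(d_model=512, vocabulary=11)
    tokens = torch.tensor([[1, 2, 3]], dtype=torch.long)
    # Expected: torch.Size([1, 3, 512]).
    print(embed(tokens).shape)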


class LayerNorm(torch.nn.Module):
    """
    Layer normalization (LN). Formula: https://s21.ax1x.com/2024/07/09/pkfrUN8.png
    """
    def __init__(self, *args, features: int, eps: float=1e-6, **kwargs) -> None:
        # features is the size of the feature dimension being normalized.
        super().__init__(*args, **kwargs)
        # Learnable parameter (updated via backpropagation during training); gamma is initialized to all ones, i.e. no scaling.
        self.gamma = torch.nn.Parameter(torch.ones(features))
        # Learnable parameter; beta is initialized to all zeros, i.e. no shift.
        self.beta = torch.nn.Parameter(torch.zeros(features))
        self.epsilon = eps

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        计算 y = LN(x) 结果.
        """
        # keepdim = True, 输出与输入维度相同, 保持输出的维度.
        average = x.mean(dim=-1, keepdim=True)
        standard = x.std(dim=-1, keepdim=True)
        return self.gamma * (x - average) / torch.sqrt(standard + self.epsilon) + self.beta
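

# A quick numerical check (illustrative addition): with the initial gamma=1 and beta=0,
# the normalized output has per-position mean close to 0 and standard deviation close
# to 1 along the feature dimension.
def layerNormDemo() -> None:
    norm = LayerNorm(features=8)
    x = torch.rand(2, 5, 8) * 10 + 3
    y = norm(x)
    # Expected: a value near 0 and a value near 1.
    print(y.mean(dim=-1).abs().max(), y.std(dim=-1).mean())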


class SublayerConnection(torch.nn.Module):
    """
    Sublayer block: residual connection. Diagram: https://s21.ax1x.com/2024/07/09/pkfc0I0.png

    With residual connections, the repeated gradient products of backpropagation do not cause vanishing gradients.
    """
    def __init__(self, *args, size: int, dropout: float, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.norm = LayerNorm(features=size)
        # Dropout (dropping units of the network) illustration: https://s21.ax1x.com/2024/07/09/pkfc2L9.png
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, sublayer: object) -> torch.Tensor:
        """
        Add the unchanged x to the output of a sublayer applied to it; this is the key idea of
        residual networks and avoids vanishing gradients (differentiation gains a constant term).
        """
        return x + self.dropout(sublayer(self.norm(x)))


class EncoderLayer(torch.nn.Module):
    """
    Encoder layer: built from self-attention and a position-wise feed-forward network; several layers are stacked into the encoder.
    """
    def __init__(
        self,
        *args,
        size: int,
        self_attention: object,
        feed_forward_network: object,
        dropout: float,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.self_attention = self_attention
        self.feed_forward_network = feed_forward_network
        # Clone 2 sublayer connections; each encoder layer contains two sublayers.
        self.sublayer = clone(SublayerConnection(size=size, dropout=dropout), 2)
        self.size = size

    def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        # For self-attention, the query, key and value passed to the attention function are all the same tensor.
        # The mask marks padding elements; masked positions are ignored when computing self-attention weights.
        x = self.sublayer[0](x, lambda x: self.self_attention(x, x, x, mask))
        # Feed the output x of the first sublayer into the second sublayer, the FFN.
        return self.sublayer[1](x, self.feed_forward_network)


class DecoderLayer(torch.nn.Module):
    """
    解码器块层: 由"自注意力", "(普通)注意力", "前馈神经网络构成", 多个块层可构成集群.

    1. 解码器块比编码器块多一个(普通)注意力层, 它的输入来自解码器集群的输出 memory
    2. 自注意力和注意力相同, 只不过 query, key, value 输入不同
    3. 注意力的 query 由下层输入支持, 即自注意力的输出
    4. key, value 是编码器集群最后一层输出 memory
    5. 自注意力的 query, key, value 均由下层输入支持
    """
    def __init__(
        self,
        *args,
        size: int,
        self_attention: object,
        source_attention: object,
        feed_forward_network: object,
        dropout: float,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.size = size
        self.self_attention = self_attention
        self.source_attention = source_attention
        self.feed_forward_network = feed_forward_network
        # Clone 3 sublayer connections.
        self.sublayer = clone(SublayerConnection(size=size, dropout=dropout), 3)
    
    def forward(
        self,
        x: torch.Tensor,
        memory: torch.Tensor,
        source_mask: object,
        target_mask: object
    ) -> torch.Tensor:
        # Self-attention.
        x = self.sublayer[0](x, lambda x: self.self_attention(x, x, x, target_mask))
        # (Cross-)attention; memory here comes from the encoder output.
        x = self.sublayer[1](x, lambda x: self.source_attention(x, memory, memory, source_mask))
        # Feed-forward network.
        return self.sublayer[2](x, self.feed_forward_network)


class Encoder(torch.nn.Module):
    """
    Encoder stack: stacked encoder layers; "Attention Is All You Need" uses six.
    """
    def __init__(self, *args, layer: object, N: int, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Clone N encoder layers and stack them into the encoder.
        self.layers = clone(layer, N)
        self.norm = LayerNorm(features=layer.size)

    def forward(self, x: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        """
        Pass the input (and mask) through each layer of the stack in turn.
        """
        # Run the input through every layer of the stack.
        for layer in self.layers:
            x = layer(x, mask)
        # Apply a final LayerNorm before returning the output.
        return self.norm(x)


class Decoder(torch.nn.Module):
    """
    Decoder stack: stacked decoder layers; it produces the final result from the representation the encoder stack built from the input sequence.
    """
    def __init__(self, *args, layer: object, N: int, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.layers = clone(layer, N)
        self.norm = LayerNorm(features=layer.size)
    
    def forward(
        self,
        x: torch.Tensor,
        memory: torch.Tensor,
        source_mask: object,
        target_mask: object
    ) -> torch.Tensor:
        """
        With source and target masks.
        """
        for layer in self.layers:
            x = layer(x, memory, source_mask, target_mask)
        return self.norm(x)


class Generator(torch.nn.Module):
    """
    Generator: the standard Linear + Softmax generation step.
    """
    def __init__(self, *args, d_model: int, vocabulary: int, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.projection = torch.nn.Linear(d_model, vocabulary)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return torch.log_softmax(self.projection(x), dim=-1)
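

# A short sketch (illustrative addition): the generator returns log-probabilities, so
# exponentiating and summing over the vocabulary dimension gives 1 for every position.
def generatorDemo() -> None:
    generator = Generator(d_model=512, vocabulary=11)
    x = torch.rand(2, 5, 512)
    log_probability = generator(x)
    # Expected: torch.Size([2, 5, 11]) and a value near 1.0.
    print(log_probability.shape, log_probability.exp().sum(dim=-1)[0, 0])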


class EncoderDecoder(torch.nn.Module):
    """
    Standard encoder-decoder architecture; the base that modified variants can build on.
    """
    def __init__(
        self,
        *args,
        encoder: object,
        decoder: object,
        source_embedding: object,
        target_embedding: object,
        generator: object,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = source_embedding
        self.tgt_embed = target_embedding
        self.generator = generator

    def encode(self, src: torch.Tensor, src_mask: torch.Tensor) -> torch.Tensor:
        return self.encoder(self.src_embed(src), src_mask)

    def decode(
        self,
        memory: torch.Tensor,
        src_mask: torch.Tensor,
        tgt: torch.Tensor,
        tgt_mask: torch.Tensor
    ) -> torch.Tensor:
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

    def forward(
        self,
        src: torch.Tensor,
        tgt: torch.Tensor,
        src_mask: torch.Tensor,
        tgt_mask: torch.Tensor
    ) -> torch.Tensor:
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)


class Transformer:
    """
    The complete Transformer model. Architecture diagram: https://s21.ax1x.com/2024/07/10/pkhVMm6.png
    """
    def __new__(
        cls,
        source_vocabulary: int,
        target_vocabulary: int,
        N: int = 6,
        d_model: int = 512,
        d_ff: int = 512,
        n_heads: int = 8,
        dropout: float = 0.1
    ) -> object:
        attn = MultiHeadAttention(
            h = n_heads,
            d_model = d_model,
            dropout = dropout
        )
        ffn = PositionWiseFeedForward(
            d_model = d_model,
            d_ff = d_ff,
            dropout = dropout
        )
        pos = PositionalEncoding(
            d_model = d_model,
            dropout = dropout,
            max_tokens = 5000
        )
        model = EncoderDecoder(
            encoder = Encoder(
                layer = EncoderLayer(
                    size = d_model,
                    self_attention = copy.deepcopy(attn),
                    feed_forward_network = copy.deepcopy(ffn),
                    dropout = dropout
                ),
                N = N
            ),
            decoder = Decoder(
                layer = DecoderLayer(
                    size = d_model,
                    self_attention = copy.deepcopy(attn),
                    source_attention = copy.deepcopy(attn),
                    feed_forward_network = copy.deepcopy(ffn),
                    dropout = dropout
                ),
                N = N
            ),
            source_embedding = torch.nn.Sequential(
                Embedding(d_model=d_model, vocabulary=source_vocabulary),
                copy.deepcopy(pos)
            ),
            target_embedding = torch.nn.Sequential(
                Embedding(d_model=d_model, vocabulary=target_vocabulary),
                copy.deepcopy(pos)
            ),
            generator = Generator(d_model=d_model, vocabulary=target_vocabulary)
        )
        # Initialize parameters with Glorot / fan_avg (Xavier uniform).
        for param in model.parameters():
            if param.dim() > 1:
                torch.nn.init.xavier_uniform_(param)
        return model
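

# An illustrative sketch (not part of the original file) of building the full model and
# counting its trainable parameters; the vocabulary sizes are arbitrary toy values.
def modelSummaryDemo() -> None:
    model = Transformer(source_vocabulary=11, target_vocabulary=11, N=2)
    total = sum(param.numel() for param in model.parameters() if param.requires_grad)
    print(f'trainable parameters: {total:,}')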


def demoInference(epoch: int) -> None:
    """
    Demo inference test: greedy decoding with an untrained model.
    """
    model = Transformer(source_vocabulary=11, target_vocabulary=11, N=1)
    model.eval()
    src = torch.tensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]], dtype=torch.long)
    src_mask = torch.ones(1, 1, 10)
    memory = model.encode(src, src_mask)
    prediction = torch.zeros(1, 1).type_as(src)
    for _ in range(0, 9, 1):
        output = model.decode(
            memory = memory,
            src_mask = src_mask,
            tgt = prediction,
            tgt_mask = subsequentMask(prediction.size(1)).type_as(src.data)
        )
        probability = model.generator.forward(output[:, -1])
        _, next_word = torch.max(probability, dim=1)
        next_word = next_word.data[0]
        prediction = torch.cat(
            tensors = [prediction, torch.empty(1, 1).type_as(src.data).fill_(next_word)],
            dim = 1
        )
    debug(epoch=epoch, ExampleUntrainedModel=prediction)



if __name__ == '__main__':
    for idx in range(0, 10, 1):
        demoInference(epoch = idx + 1)


# GNU GPLv3 project by Illusionna: orzzz.net