从零实现Transformer、ChatGLM-6B、LangChain+LLM的本地知识库问答

目录

前言 

最近一直在做类ChatGPT项目的部署 微调,关注比较多的是两个:一个LLaMA,一个ChatGLM,会发现有不少模型是基于这两个模型去做微调的,说到微调,那具体怎么微调呢,因此又详细了解了一下微调代码,发现微调LLM时一般都会用到Hugging face实现的Transformers库的Trainer类

从而发现,如果大家想从零复现ChatGPT,便得从实现Transformer开始,因此便开启了本文:如何从零起步实现Transformer、ChatGLM(至于LLaMA已在之前的博客里解读过),主要分为两个大部分

  • 按照Transformer每一步的原理逐步逐行从零实现(主要参考harvard对transformer的实现),先编码器后解码器,特别是注意力机制(缩放点积、多头注意力)
  • 从头到尾解读ChatGLM-6B的整体代码架构,及逐行解读每一行代码
    且本文的代码解读与其他代码解读最大的不同是:会对出现在本文的每一行代码都加以注释、解释、说明,甚至对每行代码中的变量都会做解释/说明

总之,一如既往的保持对初学者的足够友好,让即便没有太多背景知识的也能顺畅理解本文

第一部分 从零实现Transformer编码器模块

transformer强大到什么程度呢,基本是17年之后绝大部分有影响力模型的基础架构都基于的transformer(比如,这里有200来个,包括且不限于基于decode的GPT、基于encode的BERT、基于encode-decode的T5等等)

通过博客内的这篇文章《Transformer通俗笔记:从Word2Vec、Seq2Seq逐步理解到GPT、BERT》,我们已经详细了解了transformer的原理(如果忘了,建议必复习下再看本文,当然,如果你实在不想跳转,就只想呆在本文,也行,我努力..)

如果把上图中的各种细节也显示出来,则如下大图所示(此大图来源于七月在线NLP11里倪老师讲的Transformer模型源码解读,positional encoding、多头等没画)

从零实现Transformer、ChatGLM-6B、LangChain+LLM的本地知识库问答

具体说来,是一个典型的编码器-解码器架构

# 定义一个基于 nn.Module 的编码器-解码器类
class EncoderDecoder(nn.Module):

    # 初始化方法,接收编码器、解码器、源嵌入、目标嵌入和生成器作为参数
    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        # 调用 nn.Module 的初始化方法
        super(EncoderDecoder, self).__init__()  

        self.encoder = encoder  # 将传入的编码器实例保存为类属性
        self.decoder = decoder  # 将传入的解码器实例保存为类属性
        self.src_embed = src_embed  # 将传入的源嵌入实例保存为类属性
        self.tgt_embed = tgt_embed  # 将传入的目标嵌入实例保存为类属性
        self.generator = generator  # 将传入的生成器实例保存为类属性
        
    # 前向传播方法,接收源序列、目标序列和它们的掩码作为参数
    def forward(self, src, tgt, src_mask, tgt_mask):
        # 对源序列进行编码,并将编码结果与掩码传递给解码器进行解码
        return self.decode(self.encode(src, src_mask), src_mask,
                            tgt, tgt_mask)
    
    # 编码方法,接收源序列和掩码作为参数
    def encode(self, src, src_mask):
        # 将源序列进行嵌入,然后将嵌入后的序列和源序列掩码传给编码器
        return self.encoder(self.src_embed(src), src_mask)
    
    # 解码方法,接收编码器输出(memory)、源序列掩码、目标序列和目标序列掩码作为参数
    def decode(self, memory, src_mask, tgt, tgt_mask):
        # 将目标序列进行嵌入,然后将嵌入后的序列、编码器输出、源序列掩码和目标序列掩码传给解码器
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

# 定义一个基于 nn.Module 的生成器类
class Generator(nn.Module):

    # 初始化方法,接收模型维度(d_model)和词汇表大小(vocab)作为参数
    def __init__(self, d_model, vocab):
        # 调用 nn.Module 的初始化方法
        super(Generator, self).__init__()  
        # 定义一个线性层,将模型的输出维度映射到词汇表大小
        self.proj = nn.Linear(d_model, vocab)  

    # 前向传播方法,接收输入 x
    def forward(self, x):
        # 将输入 x 传入线性层,然后对输出应用 log-softmax 激活函数(在最后一个维度上)
        return F.log_softmax(self.proj(x), dim=-1)

考虑到Hugging face实现的Transformers库虽然功能强大,但3000多行,对于初次实现的初学者来说,理解难度比较大,因此,咱们一步步结合对应的原理来逐行编码实现一个简易版的transformer

1.1 关于输入的处理:针对输入做embedding,然后加上位置编码

 为了方便后面代码的编写,先引入一些库

import numpy as np          # 导入NumPy库,用于进行矩阵运算和数据处理
import torch                # 导入PyTorch库,用于构建神经网络及相关操作
import torch.nn as nn       # 导入PyTorch神经网络模块,用于构建神经网络层
import torch.nn.functional as F  # 导入PyTorch神经网络函数库,用于激活函数、损失函数等
import math, copy, time          # 导入数学库、复制库和时间库,用于各种数学计算、复制操作和计时
from torch.autograd import Variable  # 从PyTorch自动微分库中导入Variable类,用于构建自动微分计算图
import matplotlib.pyplot as plt      # 导入Matplotlib的pyplot模块,用于绘制图表和可视化
import seaborn                       # 导入Seaborn库,用于绘制统计图形和美化图表
seaborn.set_context(context="talk")  # 设置Seaborn的上下文环境,设置图表的尺寸和标签字体大小等
%matplotlib inline                   # IPython魔术命令,使Matplotlib绘制的图形直接显示在Notebook内

1.1.1 针对输入做embedding

对于模型来说,每一句话比如“七月的服务真好,答疑的速度很快”,在模型中都是一个词向量,但如果每句话都临时抱佛脚去生成对应的词向量,则处理起来无疑会费时费力,所以在实际应用中,我们会事先预训练好各种embedding矩阵,这些embedding矩阵包含常用领域常用单词的向量化表示,且提前做好分词

维度1维度2维度3维度4维度512
教育
机构
在线
课程
..
服务
答疑
老师

从而当模型接收到“七月的服务真好,答疑的速度很快”这句输入时,便可以从对应的embedding矩阵里查找对应的词向量,最终把整句输入转换成对应的向量表示

这部分的代码 可以如下表示

# 定义一个名为Embeddings的类,继承自PyTorch的nn.Module类
class Embeddings(nn.Module):
    # 初始化Embeddings类
    def __init__(self, d_model, vocab):
        # 调用父类nn.Module的初始化方法
        super(Embeddings, self).__init__()
        # 创建一个词嵌入层,参数为词汇表大小和词嵌入维度
        self.lut = nn.Embedding(vocab, d_model)
        # 将词嵌入维度保存为类属性
        self.d_model = d_model

    # 定义前向传播方法
    def forward(self, x):
        # 通过词嵌入层将输入的单词编码为向量,并乘以词嵌入维度的平方根进行缩放
        return self.lut(x) * math.sqrt(self.d_model)

1.1.2 位置编码的深意:如何编码更好

然,如此篇文章所述,RNN的结构包含了序列的时序信息,而Transformer却完全把时序信息给丢掉了,比如“他欠我100万”,和“我欠他100万”,两者的意思千差万别,故为了解决时序的问题,Transformer的作者用了一个绝妙的办法:位置编码(Positional Encoding)。

即将每个位置编号,从而每个编号对应一个向量,最终通过结合位置向量和词向量,作为输入embedding,就给每个词都引入了一定的位置信息,这样Attention就可以分辨出不同位置的词了,具体怎么做呢?

  1. 如果简单粗暴的话,直接给每个向量分配一个数字,比如1到1000之间
  2. 也可以用one-hot编码表示位置

  3. transformer论文中作者通过sin函数和cos函数交替来创建 positional encoding,其计算positional encoding的公式如下

    PE_{(pos,2i+1)} = cos\left ( \frac{pos}{10000^{\frac{2i}{d_{model}}}} \right )

    PE_{(pos,2i)} = sin\left ( \frac{pos}{10000^{\frac{2i}{d_{model}}}} \right )

    其中,pos相当于是每个token在整个序列中的位置,相当于是0, 1, 2, 3…(看序列长度是多大,比如10,比如100),d_{model}代表位置向量的维度(也是词embedding的维度,transformer论文中设置的512维) 

    至于i是embedding向量的位置下标对2求商并取整(可用双斜杠//表示整数除法,即求商并取整),它的取值范围是[0,...,\frac{d_{model}}{2}],比如
    i = 0 // 2 = 02i = 0
    i = 1 //2 =02i = 0,2i+1 = 1
    i = 2 // 2 = 12i = 2
    i = 3 // 2 = 12i = 2,2i+1 = 3
    i = 4 // 2 = 22i = 4
    i = 5//2 = 22i = 4, 2i + 1 =5

    i = 510 // 2 = 2552i = 510
    i = 511 // 2 = 2552i = 510,2i + 1 = 511

    相当于
    2i是指向量维度中的偶数维,即第0维、第2维、第4维…,第510维,用sin函数计算
    2i+1 是向量维度中的奇数维,即第1维、第3维、第5维..,第511维,用cos函数计算

不要小看transformer的这个位置编码,不少做NLP多年的人也不一定对其中的细节有多深入,而网上大部分文章谈到这个位置编码时基本都是千篇一律、泛泛而谈,很少有深入,故本文还是细致探讨下

考虑到一图胜千言 一例胜万语,举个例子,当我们要编码「我 爱 你」的位置向量,假定每个token都具备512维,如果位置下标从0开始时,则根据位置编码的计算公式可得且为让每个读者阅读本文时一目了然,我计算了每个单词对应的位置编码示例(在此之前,这些示例在其他地方基本没有)

  • 当对pos = 0上的单词「我」进行位置编码时,它本身的维度有512维
    PE_0 = [sin(\frac{0}{10000^{\frac{0}{512}}}),cos(\frac{0}{10000^{\frac{0}{512}}}), sin(\frac{0}{10000^{\frac{2}{512}}}),cos(\frac{0}{10000^{\frac{2}{512}}}), sin(\frac{0}{10000^{\frac{4}{512}}}), cos(\frac{0}{10000^{\frac{4}{512}}}),..., sin(\frac{0}{10000^{\frac{510}{512}}}),cos(\frac{0}{10000^{\frac{510}{512}}})]
  • 当对pos = 1上的单词「爱」进行位置编码时,它本身的维度有512维

    PE_1 = [sin(\frac{1}{10000^{\frac{0}{512}}}),cos(\frac{1}{10000^{\frac{0}{512}}}), sin(\frac{1}{10000^{\frac{2}{512}}}),cos(\frac{1}{10000^{\frac{2}{512}}}), sin(\frac{1}{10000^{\frac{4}{512}}}), cos(\frac{1}{10000^{\frac{4}{512}}}),..., sin(\frac{1}{10000^{\frac{510}{512}}}),cos(\frac{1}{10000^{\frac{510}{512}}})]

     然后再叠加上embedding向量,可得

  • 当对pos = 2上的单词「你」进行位置编码时,它本身的维度有512维
    PE_2 = [sin(\frac{2}{10000^{\frac{0}{512}}}),cos(\frac{2}{10000^{\frac{0}{512}}}), sin(\frac{2}{10000^{\frac{2}{512}}}),cos(\frac{2}{10000^{\frac{2}{512}}}), sin(\frac{2}{10000^{\frac{4}{512}}}), cos(\frac{2}{10000^{\frac{4}{512}}}),..., sin(\frac{2}{10000^{\frac{510}{512}}}),cos(\frac{2}{10000^{\frac{510}{512}}})]
  • ….

最终得到的可视化效果如下图所示

代码实现如下

“”“位置编码的实现,调用父类nn.Module的构造函数”“”
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()  
        self.dropout = nn.Dropout(p=dropout)  # 初始化dropout层
        
        # 计算位置编码并将其存储在pe张量中
        pe = torch.zeros(max_len, d_model)                # 创建一个max_len x d_model的全零张量
        position = torch.arange(0, max_len).unsqueeze(1)  # 生成0到max_len-1的整数序列,并添加一个维度
        # 计算div_term,用于缩放不同位置的正弦和余弦函数
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             -(math.log(10000.0) / d_model))

        # 使用正弦和余弦函数生成位置编码,对于d_model的偶数索引,使用正弦函数;对于奇数索引,使用余弦函数。
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)                  # 在第一个维度添加一个维度,以便进行批处理
        self.register_buffer('pe', pe)        # 将位置编码张量注册为缓冲区,以便在不同设备之间传输模型时保持其状态
        
    # 定义前向传播函数
    def forward(self, x):
        # 将输入x与对应的位置编码相加
        x = x + Variable(self.pe[:, :x.size(1)], 
                         requires_grad=False)
        # 应用dropout层并返回结果
        return self.dropout(x)

本文发布之后,有同学留言问,上面中的第11行、12行代码

div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))

为什么先转换为了等价的指数+对数运算,而不是直接幂运算?是效率、精度方面有差异吗?

这里使用指数和对数运算的原因是为了确保数值稳定性和计算效率。

  • 一方面,直接使用幂运算可能会导致数值上溢或下溢。当d_model较大时,10000.0 ** (-i / d_model)中的幂可能会变得非常小,以至于在数值计算中产生下溢。通过将其转换为指数和对数运算,可以避免这种情况,因为这样可以在计算过程中保持更好的数值范围
  • 二方面,在许多计算设备和库中,指数和对数运算的实现通常比幂运算更快。这主要是因为指数和对数运算在底层硬件和软件中有特定的优化实现,而幂运算通常需要计算更多的中间值

所以,使用指数和对数运算可以在保持数值稳定性的同时提高计算效率。

既然提到了这行代码,我们干脆就再讲更细致些,上面那行代码对应的公式为

其中的中括号对应的是一个从 0 到 d_{\text{model}} - 1 的等差数列(步长为 2),设为i

且上述公式与这个公式是等价的

为何,原因在于a^x=e^{(x\cdot ln(a))},从而有10000^{-\frac{i}{d_{model}}}=e^{(-\frac{i}{d_{model}}\cdot log(10000))}

最终,再通过下面这两行代码完美实现位置编码

        # 使用正弦和余弦函数生成位置编码,对于d_model的偶数索引,使用正弦函数;对于奇数索引,使用余弦函数。
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

1.2 经过「embedding + 位置编码」后乘以三个权重矩阵得到三个向量Q K V

从下图可知,经过「embedding + 位置编码」得到的输入X,会乘以「三个权重矩阵:W^Q W^K W^V」得到查询向量Q、键向量K、值向量V(你可以简单粗暴的理解为弄出来了三个分身)

举个例子,针对「我想吃酸菜鱼」这句话,经过embedding + 位置编码后,可得(注:可以512维,也可以是768维,但由于transformer论文中作者设置的512维,所以除了这个酸菜鱼的例子暂为768维外,其他地方均统一为512维)

然后乘以三个权重矩阵得

 为此,我们可以先创建4个相同的线性层,每个线性层都具有 d_model 的输入维度和 d_model 的输出维度

        self.linears = clones(nn.Linear(d_model, d_model), 4) 

前三个线性层分别用于对 Q向量、K向量、V向量进行线性变换(至于这第4个线性层在随后的第3点)

1.3 对输入和Multi-Head Attention做Add&Norm,再对上步输出和Feed Forward做Add&Norm

我们聚焦下transformer论文中原图的这部分,可知,输入通过embedding+位置编码后,先后做以下两个步骤

  1. 针对query向量做multi-head attention,得到的结果与原query向量,做相加并归一化
            attention = self.attention(query, key, value, mask)
            output = self.dropout(self.norm1(attention + query))
    这个相加具体是怎么个相加法呢?事实上,Add代表的Residual Connection(残差连接),是为了解决多层神经网络训练困难的问题,通过将前一层的信息无差的传递到下一层,可以有效的仅关注差异部分,这一方法之前在图像处理结构如ResNet等中常常用到

    具体编码时通过 SublayerConnection 函数实现此功能
    """一个残差连接(residual connection),后面跟着一个层归一化(layer normalization)操作"""
    class SublayerConnection(nn.Module):
        # 初始化函数,接收size(层的维度大小)和dropout(dropout率)作为输入参数
        def __init__(self, size, dropout):
            super(SublayerConnection, self).__init__()  # 调用父类nn.Module的构造函数
            self.norm = LayerNorm(size)                 # 定义一个层归一化(Layer Normalization)操作,使用size作为输入维度
            self.dropout = nn.Dropout(dropout)          # 定义一个dropout层
    
        # 定义前向传播函数,输入参数x是输入张量,sublayer是待执行的子层操作
        def forward(self, x, sublayer):  
            # 将残差连接应用于任何具有相同大小的子层
            # 首先对输入x进行层归一化,然后执行子层操作(如self-attention或前馈神经网络)
            # 接着应用dropout,最后将结果与原始输入x相加。
            return x + self.dropout(sublayer(self.norm(x)))
    而Norm则代表了Layer Normalization,通过对层的激活值的归一化,可以加速模型的训练过程,使其更快的收敛,编码时用 LayerNorm 函数实现
    """构建一个层归一化(layernorm)模块"""
    class LayerNorm(nn.Module):
        # 初始化函数,接收features(特征维度大小)和eps(防止除以零的微小值)作为输入参数
        def __init__(self, features, eps=1e-6):
            super(LayerNorm, self).__init__()  # 调用父类nn.Module的构造函数
            self.a_2 = nn.Parameter(torch.ones(features))   # 定义一个大小为features的一维张量,初始化为全1,并将其设置为可训练参数
            self.b_2 = nn.Parameter(torch.zeros(features))  # 定义一个大小为features的一维张量,初始化为全0,并将其设置为可训练参数
            self.eps = eps                   # 将防止除以零的微小值eps保存为类实例的属性
    
        # 定义前向传播函数,输入参数x是输入张量
        def forward(self, x):
            mean = x.mean(-1, keepdim=True)  # 计算输入x在最后一个维度上的均值,保持输出结果的维度
            std = x.std(-1, keepdim=True)    # 计算输入x在最后一个维度上的标准差,保持输出结果的维度
            # 对输入x进行层归一化,使用可训练参数a_2和b_2进行缩放和偏移,最后返回归一化后的结果
            return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
  2. 上面步骤得到的『输出结果output做feed forward』之后,再与『上面步骤的原输出结果output』也做相加并归一化
            forward = self.feed_forward(output)
            block_output = self.dropout(self.norm2(forward + output))
            return block_output

最终这个编码器层代码可以完整的写为

"""编码器(Encoder)由自注意力(self-attention)层和前馈神经网络(feed forward)层组成"""
class EncoderLayer(nn.Module):
    # 初始化函数,接收size(层的维度大小)、self_attn(自注意力层实例)
    # feed_forward(前馈神经网络实例)和dropout(dropout率)作为输入参数
    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()      # 调用父类nn.Module的构造函数
        self.self_attn = self_attn                # 将自注意力层实例保存为类实例的属性
        self.feed_forward = feed_forward          # 将前馈神经网络实例保存为类实例的属性

        # 创建两个具有相同参数的SublayerConnection实例(用于残差连接和层归一化)
        self.sublayer = clones(SublayerConnection(size, dropout), 2)  
        self.size = size                          # 将层的维度大小保存为类实例的属性

    def forward(self, x, mask):
        # 先对输入x进行自注意力操作
        # 然后将结果传递给第一个SublayerConnection实例(包括残差连接和层归一化)
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))

        # 将上一步的输出传递给前馈神经网络
        # 然后将结果传递给第二个SublayerConnection实例(包括残差连接和层归一化),最后返回结果
        return self.sublayer[1](x, self.feed_forward)

1.3.1 缩放点积注意力(Scaled Dot-Product Attention)

接下来,先看下缩放点积注意力(Scaled Dot-Product Attention)的整体实现步骤

  1. 为了计算每个单词与其他单词之间的相似度,会拿「每个单词/token的q向量包括自身在内所有单词/token的k向量一一做点积(两个向量之间的点积结果可以代表两个向量的相似度)

    对应到矩阵的形式上,则是矩阵Q与K矩阵的转置做相乘
    还是拿上面那个例子:「我想吃酸菜鱼」,则Q乘以K的转置K^T如下图所示

    最终得到的QK^T矩阵有6行6列,从上往下逐行来看的话,每一个格子里都会有一个数值,每一个数值依次代表:
    \rightarrow  单词我与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度,比如可能是0.3 0.2 0.2 0.1 0.1 0.1,代表编码1时放在「我 想 吃 酸 菜 鱼」上面的注意力大小
    同时,可以看到模型在对当前位置的信息进行编码时,会过度的将注意力集中于自身的位置(当然 这无可厚非,毕竟自己与自己最相似嘛),而可能忽略了其它位置。很快你会看到,作者采取的一种解决方案就是采用多头注意力机制(Multi-Head Attention)
    \rightarrow  想与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
    \rightarrow  吃与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
    \rightarrow  酸与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
    \rightarrow  菜与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
    \rightarrow  鱼与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度​
  2. 由于Q \times K^T会随着dimension的增大而增大,为避免过大,所以除以\sqrt{d_k} ,相当于对点积的结果做下缩放

    其中,d_k是向量k的维度,且d_k = d_q = d_v,如果只设置了一个头,那d_k就是模型的维度d_{model},如果设置了8个头,则d_k = d_{model}/8,且如果模型的维度是512维,则\sqrt{d_k}即等于8

    上面两步的代码可以如下编写
        # torch.matmul是PyTorch库提供的矩阵乘法函数
        # 具体操作即是将第一个矩阵的每一行与第二个矩阵的每一列进行点积(对应元素相乘并求和),得到新矩阵的每个元素
        scores = torch.matmul(query, key.transpose(-2, -1)) \
                 / math.sqrt(d_k)
  3. 接着使用 Softmax 计算每一个单词对包括自身在内所有单词的 Attention值,这些值加起来的和为1(相当于起到了归一化的效果)

    这步对应的代码为
        # 对 scores 进行 softmax 操作,得到注意力权重 p_attn
        p_attn = F.softmax(scores, dim = -1)
  4. 最后再乘以V矩阵,即对所有values(v1 v2 v3 v4),根据不同的attention值(\hat{a}_{1,1} \hat{a}_{1,2} \hat{a}_{1,3} \hat{a}_{1,4}),做加权平均

    对应到我想吃酸菜鱼这个例子上,则是

  5. 最终得到单词的输出,如下图所示(图中V矩阵的4行分别代表v1 v2 v3 v4):

    上述两步对应的代码为
        # 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出
        return torch.matmul(p_attn, value), p_attn

同样的方法,也可以计算出b2,b3,b4,如下图8所示, b2就是拿q2去对其他的key做attention,最后再与其他的value值相乘取weighted sum得到,最终每个单词都包含了上下文相关单词的语义信息,不再只是attention计算之前,每个单词只有它自己的信息,和上下文没有关联

另外,这里面还有一点值得注意的是,可能有同学疑问:当我们计算x1与x2、x3、x4的相似度之后,x2会再与x1、x3、x4再依次计算一遍相似度,这两个过程中,前者算过了x1和x2的相似度,后者则再算一遍x2与x1的相似度,这不是重复计算么?其实不然,这是两码事,原因很简单,正如你喜欢一个人 你会觉得她对你很重要,但那个人不一定喜欢你 她不会觉得你对她有多重要..

最终,Scaled Dot-Product Attention这部分对应的完整代码可以写为

'''计算“缩放点积注意力'''
# query, key, value 是输入的向量组
# mask 用于遮掩某些位置,防止计算注意力
# dropout 用于添加随机性,有助于防止过拟合
def attention(query, key, value, mask=None, dropout=None):

    d_k = query.size(-1)  # 获取 query 向量的最后一个维度的大小,即词嵌入的维度

    # 计算 query 和 key 的点积,并对结果进行缩放,以减少梯度消失或爆炸的可能性
    scores = torch.matmul(query, key.transpose(-2, -1)) \
             / math.sqrt(d_k)

    # 如果提供了 mask,根据 mask 对 scores 进行遮掩
    # 遮掩的具体方法就是设为一个很大的负数比如-1e9,从而softmax后 对应概率基本为0
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    # 对 scores 进行 softmax 操作,得到注意力权重 p_attn
    p_attn = F.softmax(scores, dim = -1)

    # 如果提供了 dropout,对注意力权重 p_attn 进行 dropout 操作
    if dropout is not None:
        p_attn = dropout(p_attn)

    # 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出
    return torch.matmul(p_attn, value), p_attn

1.3.2 多头注意力(Multi-Head Attention)

先看2个头的例子,依然还是通过a^i生成对应的三个矩阵q^ik^iv^i,然后这三个矩阵再各自乘以两个转移矩阵得到对应的分矩阵,如

  • q^i矩阵对应的两个分矩阵q^{i,1}q^{i,2} 
  • k^i矩阵对应的两个分矩阵为k^{i,1}k^{i,2}
  • v^i矩阵对应的两个分矩阵为v^{i,1}v^{i,2}

至于a^j同理,也生成对应的6个分矩阵q^{j,1}q^{j,2}k^{j,1}k^{j,2}v^{j,1}v^{j,2}

接下来编码a^i时,分两步

  1. q^{i,1}先与k^{i,1}做点积然后乘以v^{i,1}然后再与k^{j,1}做点积再乘以v^{j,1},再把这两个计算的结果相加得到b^{i,1}

  2. q^{i,2}再分别与k^{i,2}做点积然后乘以v^{i,2}、然后再与k^{j,2}做点积再乘以v^{j,2},再把这两个计算的结果相加得到b^{i,2}

如果是8个头呢,计算步骤上也是一样的,只是从2个头变化到8个头而已,最终把每个头得到的结果直接concat,最后经过一个linear变换,得到最终的输出,整体如下所示

这部分Multi-Head Attention的代码可以写为

'''代码来自nlp.seas.harvard.edu,我针对每一行代码、甚至每行代码中的部分变量都做了详细的注释/解读'''
class MultiHeadedAttention(nn.Module):
    # 输入模型的大小(d_model)和注意力头的数量(h)
    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0  # 确保 d_model 可以被 h 整除

        # 我们假设 d_v(值向量的维度)总是等于 d_k(键向量的维度)
        self.d_k = d_model // h      # 计算每个注意力头的维度
        self.h = h                   # 保存注意力头的数量
        self.linears = clones(nn.Linear(d_model, d_model), 4)  # 上文解释过的四个线性层
        self.attn = None                      # 初始化注意力权重为 None
        self.dropout = nn.Dropout(p=dropout)  # 定义 dropout 层

    # 实现多头注意力的前向传播
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # 对所有 h 个头应用相同的 mask
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)  # 获取 batch 的大小

        # 1) 批量执行从 d_model 到 h x d_k 的线性投影
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))]

        # 2) 在批量投影的向量上应用注意力
        # 具体方法是调用上面实现Scaled Dot-Product Attention的attention函数
        x, self.attn = attention(query, key, value, mask=mask,
                                 dropout=self.dropout)

        # 3) 使用 view 函数进行“拼接concat”,然后做下Linear变换
        x = x.transpose(1, 2).contiguous() \
             .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)  # 返回多头注意力的输出

1.3.3 Position-wise前馈网络的实现

在上文,咱们逐一编码实现了embedding、位置编码、缩放点积/多头注意力,以及Add和Norm,整个编码器部分还剩最后一个模块,即下图框里的Feed Forward Network(简称FFN)

其中包括两个线性变换:维度上先扩大后缩小,最终输入和输出的维数为d_{model} = 512,内层的维度为d_{ff} = 2048,过程中使用ReLU作为激活函数

FFN(x)=max(0,xW_1+b_1)W_2+b_2

虽然线性变换在不同位置上是相同的,但它们在层与层之间使用不同的参数,相当于使用了两个内核大小为1的卷积

这部分的代码可以如下编写

‘’‘定义一个名为PositionwiseFeedForward的类,继承自nn.Module’‘’
class PositionwiseFeedForward(nn.Module):
    # 文档字符串:实现FFN方程
    # 初始化方法,接受三个参数:d_model,d_ff和dropout(默认值为0.1)
    def __init__(self, d_model, d_ff, dropout=0.1):
        # 调用父类nn.Module的初始化方法
        super(PositionwiseFeedForward, self).__init__()  
        self.w_1 = nn.Linear(d_model, d_ff)  # 定义一个全连接层,输入维度为d_model,输出维度为d_ff
        self.w_2 = nn.Linear(d_ff, d_model)  # 定义一个全连接层,输入维度为d_ff,输出维度为d_model
        self.dropout = nn.Dropout(dropout)  # 定义一个dropout层,dropout概率为传入的dropout参数

    # 定义前向传播方法,接受一个输入参数x
    def forward(self, x):
        # 将输入x通过第一个全连接层w_1后,经过ReLU激活函数,再通过dropout层,最后通过第二个全连接层w_2,返回最终结果
        return self.w_2(self.dropout(F.relu(self.w_1(x))))

1.4 对整个transformer  block复制N份最终成整个encode模块

N可以等于6或其他数值

class Encoder(nn.Module):  # 定义一个名为Encoder的类,它继承了nn.Module类
    # 一个具有N层堆叠的核心编码器
    # 初始化方法,接受两个参数:layer(编码器层的类型)和N(编码器层的数量)
    def __init__(self, layer, N):  
        super(Encoder, self).__init__()      # 调用父类nn.Module的初始化方法
        self.layers = clones(layer, N)       # 创建N个编码器层的副本,并将其赋值给实例变量self.layers
        self.norm = LayerNorm(layer.size)    # 创建一个LayerNorm层,并将其赋值给实例变量self.norm
        
    # 定义前向传播方法,接受两个参数:x(输入数据)和mask(掩码)
    def forward(self, x, mask):  
        # 文档字符串:解释本方法的功能是将输入(及其掩码)依次传递给每一层
        for layer in self.layers:        # 遍历self.layers中的每一个编码器层
            x = layer(x, mask)           # 将输入x和mask传递给当前编码器层,并将输出结果赋值给x
        return self.norm(x)              # 对最终的输出x应用LayerNorm层,并将结果返回

其中的clone函数的代码为

def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

第二部分 从零实现Transformer解码器模块

咱们再回顾下transformer的整个模型架构,特别是解码器的部分,毕竟BERT外,GPT等很有影响力的模型都用的transformer decode结构

从底至上,

  • 输入包括2部分,下方是前一个time step的输出的embedding
    再加上一个表示位置的Positional Encoding
  • 接着是Masked Multi-Head Self-attention,masked字面意思是屏蔽

    然后做一下Add&Norm
  • 再往上是一个不带mask的Multi-Head Attention层,它的Key、Value矩阵使用 Encoder 的编码信息矩阵,而Query使用上一个 Decoder block 的输出计算
    然后再做一下Add&Norm
  • 继续往上,经过一个FFN层,也做一下Add&Norm
  • 最后做下linear变换后,通过Softmax 层计算下一个翻译单词的概率

由于在第一部分介绍过了embedding、positional encoding、FFN、Add&Norm、linear、softmax、multi-head attention,故本部分只重点介绍下Masked Multi-Head Self-attention

2.1 Masked Multi-Head Self-attention

本过程和第一部分介绍的Multi-Head self-attention基本一致,区别在于加了个mask机制

  1. 输入经过embedding + 位置编码之后,还是乘以三个不同的权重矩阵:W^QW^KW^V,依次得到三个不同的矩阵输入:Q、K、V
  2. Q矩阵乘以K矩阵的转置K^T,得到Q\cdot K^T,注意,紧接着Q\cdot K^T会再乘以一个Mask矩阵,得到Masked Attention矩阵
  3.  Masked Attention矩阵经过softmax后,乘以V矩阵得到Z_1矩阵
  4. 最终把Z_1Z_2拼接之后,再做一个linear变换得到最终的Z矩阵

2.2 transformer解码器架构与整体编码-解码架构的实现

整个解码器架构的代码可以如下编写『有一点值得注意的是,如下文代码中所述

  • 在对输入x执行自注意力计算并进行第一个子层的处理(带mask),最后一个参数是tgt_mask,即x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
  • 但对输入x执行源注意力计算并进行第二个子层的处理时(不带mask),最后一个参数是src_mask,即x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask)) 
# 定义DecoderLayer类,继承自PyTorch的nn.Module类
class DecoderLayer(nn.Module):
    # 初始化方法,接收五个参数:size, self_attn, src_attn, feed_forward, dropout
    # 调用父类nn.Module的初始化方法
    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()

        # 将size赋值给实例变量self.size
        self.size = size

        # 将self_attn赋值给实例变量self.self_attn
        self.self_attn = self_attn

        # 将src_attn赋值给实例变量self.src_attn
        self.src_attn = src_attn

        # 将feed_forward赋值给实例变量self.feed_forward
        self.feed_forward = feed_forward

        # 使用SublayerConnection类创建三个子层,并存储到实例变量self.sublayer中
        self.sublayer = clones(SublayerConnection(size, dropout), 3)

    # 定义前向传播方法,接收四个参数:x, memory, src_mask, tgt_mask 
    def forward(self, x, memory, src_mask, tgt_mask):

        # 将memory赋值给局部变量m
        m = memory

        # 对输入x执行自注意力计算并进行第一个子层的处理
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))

        # 对输入x执行源注意力计算并进行第二个子层的处理
        x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))

        # 对输入x执行前馈神经网络计算并进行第三个子层的处理,然后返回结果
        return self.sublayer[2](x, self.feed_forward)

且Decoder也是由N=6个相同层组成

class Decoder(nn.Module):
    "Generic N layer decoder with masking."
    def __init__(self, layer, N):
        super(Decoder, self).__init__()
        self.layers = clones(layer, N)
        self.norm = LayerNorm(layer.size)
        
    def forward(self, x, memory, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)

最终,整个transformer完整模型的整体封装代码为

def make_model(src_vocab, tgt_vocab, N=6, 
               d_model=512, d_ff=2048, h=8, dropout=0.1):
    "Helper: Construct a model from hyperparameters."
    c = copy.deepcopy
    attn = MultiHeadedAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = EncoderDecoder(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
        Decoder(DecoderLayer(d_model, c(attn), c(attn), 
                             c(ff), dropout), N),
        nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
        nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
        Generator(d_model, tgt_vocab))
    
    # This was important from their code. 
    # Initialize parameters with Glorot / fan_avg.
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform(p)
    return model

# Small example model.
tmp_model = make_model(10, 10, 2)
None

2.3 编码器与解码器的协同

当我们把编码器和解码器组合到一起后,看下它两是如何一块协作的

在这里插入图片描述

需要注意的是

  1. Encoder中的Q、K、V全部来自于上一层单元的输出
    而Decoder只有Q来自于上一个Decoder单元的输出,K与V都来自于Encoder最后一层的输出。也就是说,Decoder是要通过当前状态与Encoder的输出算出权重后(计算query与各个key的相似度),最后将Encoder的编码加权得到下一层的状态

    比如当我们要把“Hello Word”翻译为“你好,世界”时
    Decoder会计算“你好”这个query分别与“Hello”、“Word”这两个key的相似度
    很明显,“你好”与“Hello”更相似,从而给“Hello”更大的权重,从而把“你好”对应到“Hello”,达到的效果就是“Hello”翻译为“你好”
  2. 且在解码器中因为加了masked机制,自注意力层只允许关注已输出位置的信息,实现方法是在自注意力层的softmax之前进行mask,将未输出位置的权重设置为一个非常大的负数(进一步softmax之后基本变为0,相当于直接屏蔽了未输出位置的信息)

第三部分 Transformer的整个训练过程:预处理与迭代

3.1 预处理阶段:创建词汇表

具体实现时,先创建批次和掩码

class Batch:
def __init__(self, src, trg=None, pad=0):
    self.src = src  # 输入数据源(通常为源语言)
    self.src_mask = (src != pad).unsqueeze(-2)  # 创建源语言的掩码,用于忽略填充部分
    if trg is not None:  # 如果目标语言数据存在
        self.trg = trg[:, :-1]   # 目标语言数据,去掉最后一个词
        self.trg_y = trg[:, 1:]  # 目标语言数据,去掉第一个词
        self.trg_mask = \
            self.make_std_mask(self.trg, pad)          # 创建目标语言的掩码,用于忽略填充部分和未来词汇
        self.ntokens = (self.trg_y != pad).data.sum()  # 计算目标语言中非填充词的数量

@staticmethod
def make_std_mask(tgt, pad):
    "Create a mask to hide padding and future words."
    tgt_mask = (tgt != pad).unsqueeze(-2)                      # 创建目标语言的掩码,用于忽略填充部分
    tgt_mask = tgt_mask & Variable(
        subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data))  # 使用子掩码屏蔽未来词汇
    return tgt_mask                                            # 返回完整的目标语言掩码

3.2 训练三部曲:随机初始化、损失函数、反向传播

接下来,我们创建一个通用的训练和得分函数来跟踪损失。我们传入一个通用的损失计算函数,它也处理参数更新

def run_epoch(data_iter, model, loss_compute):
start = time.time()        # 记录当前时间
total_tokens = 0           # 初始化总tokens计数
total_loss = 0             # 初始化总损失
tokens = 0                 # 初始化tokens计数

# 遍历数据集中的每个批次
for i, batch in enumerate(data_iter):     
    # 对每个批次进行前向传播                
    out = model.forward(batch.src, batch.trg, 
                        batch.src_mask, batch.trg_mask)   

    # 计算每个批次的损失
    loss = loss_compute(out, batch.trg_y, batch.ntokens)  
    
    # 累加损失
    total_loss += loss  
    total_tokens += batch.ntokens      # 累加tokens
    tokens += batch.ntokens            # 累加tokens

    # 每50个批次进行一次日志记录
    if i % 50 == 1:  
        elapsed = time.time() - start  # 计算已用时间

        # 输出当前批次,损失和每秒处理的tokens
        print("Epoch Step: %d Loss: %f Tokens per Sec: %f" %
                (i, loss / batch.ntokens, tokens / elapsed))  
        start = time.time()            # 重置开始时间
        tokens = 0                     # 重置tokens计数

return total_loss / total_tokens       # 返回平均损失

下面这段代码定义了一个名为 SimpleLossCompute 的类,实现了简单的损失计算和训练函数

  • 在调用该类的实例时,输入预测输出、目标输出和规范化因子,计算损失值并进行梯度更新
  • 如果提供了优化器,还会更新模型参数和清空梯度缓存
# 定义 SimpleLossCompute 类,实现简单的损失计算和训练函数
class SimpleLossCompute:
    # 初始化 SimpleLossCompute 类的实例
    def __init__(self, generator, criterion, opt=None):
        self.generator = generator    # 生成器,用于预测输出
        self.criterion = criterion    # 损失函数,如交叉熵损失
        self.opt = opt                # 优化器,如 Adam

    # 定义调用 SimpleLossCompute 类实例时的操作
    def __call__(self, x, y, norm):
        x = self.generator(x)         # 生成预测输出
        # 计算损失,这里需要将预测输出和目标输出转换为合适的形状
        loss = self.criterion(x.contiguous().view(-1, x.size(-1)), 
                              y.contiguous().view(-1)) / norm
        loss.backward()               # 计算梯度
        if self.opt is not None:      # 如果提供了优化器
            self.opt.step()           # 更新模型参数
            self.opt.optimizer.zero_grad()  # 清空梯度缓存
        return loss.data[0] * norm    # 返回损失值乘以规范化因子(实际损失值)

3.2.1 Adam优化器:自动调整学习率并具有动量效应

优化器(optimizer)经常用于在训练过程中更新模型参数以最小化损失函数,而Adam(Adaptive Moment Estimation)是一种常用的优化器,它结合了两种传统优化算法的优点:Momentum和RMSprop

为了通俗易懂地理解Adam,可以将其比作一个赛车手。训练模型就像是找到一辆赛车在赛道上的最佳行驶速度和路径,以达到最快的速度并取得优异的成绩。在这个过程中,速度的调整(即学习率)非常重要

  1. 首先,Adam像Momentum一样,具有动量效应。这意味着赛车手(模型)会积累动量,使其在下坡时更快,而在上坡时减速。这有助于模型更快地穿越平坦区域,并避免在最低点附近摆动

  2. 其次,Adam像RMSprop一样,会自适应地调整每个参数的学习率。在我们的赛车比喻中,这就像赛车手会针对每个轮胎的摩擦系数(赛道状况)做出相应的速度调整。这有助于模型更快地收敛到最优解

总之,Adam可以自动调整学习率,并具有动量效应。总的来说,它能帮助我们的“赛车手”在不同的赛道状况下更快地找到最佳行驶速度和路径,从而更快地训练出高效的模型

transformer原始论文便选择的Adam作为优化器,其参数为\beta _1 = 0.9\beta _2 = 0.98\epsilon = 10^{-9},根据以下公式,我们在训练过程中改变了学习率:

在预热中随步数线性地增加学习速率,并且此后与步数的反平方根成比例地减小它,设置预热步数为4000

我们来看下具体的编码实现。下面这段代码定义了一个名为 NoamOpt 的类,实现了一种自适应学习率调整策略,该策略在训练 Transformer 模型时常用。在训练的前几个步骤(预热期)中,学习率会线性增长,之后学习率会随着步数的增加而逐渐降低。这种策略有助于模型在训练初期更快地收敛,同时在训练后期保持较低的学习率,有利于模型的稳定训练。

# 定义 NoamOpt 类,实现自适应学习率调整策略
class NoamOpt:
    # 初始化 NoamOpt 类的实例
    def __init__(self, model_size, factor, warmup, optimizer):
        self.optimizer = optimizer      # 优化器对象(如 Adam)
        self._step = 0                  # 记录优化步数
        self.warmup = warmup            # 预热步数
        self.factor = factor            # 缩放因子
        self.model_size = model_size    # 模型维度大小
        self._rate = 0                  # 初始学习率

    # 更新模型参数和学习率
    def step(self):
        self._step += 1                            # 优化步数加 1
        rate = self.rate()                         # 计算当前学习率
        for p in self.optimizer.param_groups:      # 更新优化器中的学习率
            p['lr'] = rate
        self._rate = rate                          # 存储当前学习率
        self.optimizer.step()                      # 更新模型参数

    # 计算当前步数的学习率
    def rate(self, step=None):
        if step is None:                           # 如果未提供步数,使用当前步数
            step = self._step
        return self.factor * \
            (self.model_size ** (-0.5) *           # 计算学习率公式中的模型维度项
            min(step ** (-0.5), step * self.warmup ** (-1.5)))  # 计算学习率公式中的最小值项

# 定义用于获取 NoamOpt 类实例的函数
def get_std_opt(model):
    return NoamOpt(model.src_embed[0].d_model, 2, 4000,
                   torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))

最后总结一下Transformer的影响力

  • OpenAI基于它发展出了GPT,并不断迭代出GPT2、GPT3、GPT3.5及火爆全球的 ChatGPT
  • Google则基于它发展出了在ChatGPT出现之前统治NLP各大任务的BERT,多好的青春年华!

第四部分 ChatGLM-6B的代码架构与逐行实现

ChatGLM-6B(介绍页面、代码地址),是智谱 AI 开源、支持中英双语的对话语言模型。

话不多说,直接干,虽然6B的版本相比GPT3 175B 不算大,但毕竟不是一个小工程,本文就不一一贴所有代码了,更多针对某个文件夹下或某个链接下的代码进行整体分析/说明,以帮助大家更好、更快的理解ChatGLM-6B,从而加速大家的类ChatGPT复现之路

其中,pytorch_model-00001-of-00008.bin 到 pytorch_model-00008-of-00008.bin: 这些文件是PyTorch模型的权重文件,相当于一个大模型被分割成多个部分以方便下载和使用

4.1 模型的核心实现: chatglm-6b/modeling_chatglm.py

4.1.1 导入相关库、编码器、GELU、旋转位置编码(第1-239行)

  • 首先,代码导入了许多需要的库,如torch、torch.nn.functional等,它们为模型实现提供了基本的功能。
    脚本中设置了一些标志,以便在运行时启用JIT(Just-In-Time)编译功能
  • 定义了InvalidScoreLogitsProcessor类,它继承自LogitsProcessor。该类用于处理可能出现的NaN和inf值,通过将它们替换为零来确保计算的稳定性
  • load_tf_weights_in_chatglm_6b函数,用于从TensorFlow检查点加载权重到PyTorch模型中。这对于迁移学习和在PyTorch中使用预训练模型非常有用
  • PrefixEncoder类是一个编码器,用于对输入的前缀进行编码。它根据配置使用一个两层的MLP(多层感知器)或者直接进行嵌入,输出维度为(batch_size, prefix_length, 2 * layers * hidden)
  • gelu_impl函数是一个GELU(高斯误差线性单元)激活函数的实现,这是一个常用的激活函数,尤其在Transformer模型中
    # 使用PyTorch的JIT编译器,将Python函数转换为Torch脚本,以便优化和加速执行
    @torch.jit.script
    # 定义名为gelu_impl的函数,接受一个参数x
    def gelu_impl(x):
        # 返回GELU激活函数的计算结果,这里使用了一种近似计算方法
        return 0.5 * x * (1.0 + torch.tanh(0.7978845608028654 * x *
                                           (1.0 + 0.044715 * x * x)))
    
    # 定义名为gelu的函数,接受一个参数x
    def gelu(x):
        # 调用gelu_impl函数并返回结果
        return gelu_impl(x)
  • RotaryEmbedding类实现了旋转位置编码(第177-239行)。旋转位置编码是一种新型的位置编码方法,相比于传统的位置编码,它在大序列长度和多头注意力上具有更好的性能
    \rightarrow  _load_from_state_dict方法:这是一个空方法,用于从给定的状态字典加载模型参数。在这个代码段中,它没有实现任何功能
    \rightarrow  forward方法:这是一个核心方法,实现了正向传播,输入为x和可选参数seq_dim和seq_len。这个方法首先计算序列长度,然后根据条件更新max_seq_len_cached。接着,它计算嵌入向量,并将其缓存。最后,它返回余弦和正弦缓存
        # 类的前向传播方法,接收三个参数
        def forward(self, x, seq_dim=1, seq_len=None):  
            # 如果没有提供序列长度,则从输入张量的形状中获取序列长度
            if seq_len is None:  
                seq_len = x.shape[seq_dim]
    
            # 如果缓存的最大序列长度不存在,或者提供的序列长度大于缓存的最大序列长度
            if self.max_seq_len_cached is None or (seq_len > self.max_seq_len_cached):
                # 更新缓存的最大序列长度
                self.max_seq_len_cached = None if self.learnable else seq_len  
                # 创建等差序列
                t = torch.arange(seq_len, device=x.device, dtype=self.inv_freq.dtype) 
     
                # 计算频率张量
                freqs = torch.einsum('i,j->ij', t, self.inv_freq)
    
                # 将频率张量沿最后一个维度进行拼接,形成旋转嵌入
                emb = torch.cat((freqs, freqs), dim=-1).to(x.device)
                # 如果精度为bfloat16,将旋转嵌入转换为float类型
                if self.precision == torch.bfloat16:
                    emb = emb.float()  
    
                # 计算旋转嵌入的余弦值和正弦值,形状为 [sx, 1 (b * np), hn]
                cos_cached = emb.cos()[:, None, :]
                sin_cached = emb.sin()[:, None, :]
                if self.precision == torch.bfloat16:
                    # 如果精度为bfloat16,将余弦值转换为bfloat16类型
                    cos_cached = cos_cached.bfloat16()  
                    # 如果精度为bfloat16,将正弦值转换为bfloat16类型
                    sin_cached = sin_cached.bfloat16()  
    
                # 如果旋转嵌入是可学习的
                if self.learnable:  
                    # 返回余弦值和正弦值
                    return cos_cached, sin_cached  
                # 更新缓存的余弦值和正弦值
                self.cos_cached, self.sin_cached = cos_cached, sin_cached  
            # 返回截取后的余弦值和正弦值,以匹配输入序列的长度
            return self.cos_cached[:seq_len, ...], self.sin_cached[:seq_len, ...]
    
    \rightarrow  _apply方法:这个方法应用给定的函数(fn)到缓存的余弦和正弦值上,并调用父类的_apply方法
    \rightarrow  rotate_half函数:这个函数将输入张量x在最后一个维度上分为两半,并将它们交换位置
    \rightarrow  apply_rotary_pos_emb_index函数:这个函数应用了旋转位置嵌入索引,主要通过cos和sin将位置信息添加到输入张量q和k上
    # 使用PyTorch的JIT编译器,将Python函数转换为Torch脚本,以便优化和加速执行
    @torch.jit.script  
    # 定义一个名为apply_rotary_pos_emb_index的函数,接收五个参数
    def apply_rotary_pos_emb_index(q, k, cos, sin, position_id):  
        # 通过position_id获取cos和sin的嵌入表示
        # cos.squeeze(1)和sin.squeeze(1)用于去除多余的维度
        # 而unsqueeze(2)则用于重新添加所需的维度
        # 从而将cos和sin的形状从[sq, 1, hn]变为[sq, b, np, hn],以便后续q和k进行运算
        cos, sin = F.embedding(position_id, cos.squeeze(1)).unsqueeze(2), \
            F.embedding(position_id, sin.squeeze(1)).unsqueeze(2)
    
        # 计算旋转位置编码后的q和k,将q和k与cos和sin进行点积运算
        q, k = (q * cos) + (rotate_half(q) * sin), (k * cos) + (rotate_half(k) * sin)
        # 返回旋转位置编码后的q和k
        return q, k

4.1.2 SelfAttention的PyTorch模块:实现自注意力机制(第242-551行)

定义了一个名为SelfAttention的PyTorch模块,它实现了自注意力机制。这个模块在许多自然语言处理任务中都被用作基本构建块。以下是代码中的关键部分:

  1. attention_fn方法:这个方法实现了自注意力的核心计算过程,包括计算注意力分数、注意力概率和上下文层。这些计算对于实现许多自然语言处理任务,如语言建模、命名实体识别等,都是非常重要的
    为方便大家更好、更快、更一目了然的理解,我花了个把钟头,一如上面的 依然把下面每一行代码都逐行加上了注释,且关键的部分加了额外的解释说明
    # 定义attention函数
    def attention_fn(
            self,
            query_layer,                     # 查询层张量
            key_layer,                       # 键层张量
            value_layer,                     # 值层张量
            attention_mask,                  # 注意力掩码张量
            hidden_size_per_partition,       # 每个分区的隐藏层大小,每个分区可能包含2或4或8个头
            layer_id,                        # 当前层的ID
            layer_past=None,               # 保存过去的键和值的张量,用于解码器的自回归任务
            scaling_attention_score=True,  # 是否缩放注意力分数,默认为True
            use_cache=False,               # 是否使用缓存,默认为False
    ):
    
        # 如果layer_past不为空,则获取然后拼接过去的key和value
        if layer_past is not None:
            past_key, past_value = layer_past[0], layer_past[1]
            key_layer = torch.cat((past_key, key_layer), dim=0)
            value_layer = torch.cat((past_value, value_layer), dim=0)
    
        # 获取key_layer的形状信息
        # 包括序列长度sq、批大小b、注意力头数(np,原代码为nh,应该是笔误)、每个注意力头的隐藏层大小hn
        seq_len, b, nh, hidden_size = key_layer.shape
    
        # 如果使用缓存,则设置present为key和value的元组,否则为None
        if use_cache:
            present = (key_layer, value_layer)
        else:
            present = None
    
        # 计算查询-键层缩放系数
        query_key_layer_scaling_coeff = float(layer_id + 1)
    
        # 如果需要缩放注意力分数,对查询层进行缩放
        if scaling_attention_score:
            query_layer = query_layer / (math.sqrt(hidden_size) * query_key_layer_scaling_coeff)
    
        # 设置输出张量的大小,计算原始注意力分数的形状:[b, np, sq, sk]
        output_size = (query_layer.size(1), query_layer.size(2), query_layer.size(0), key_layer.size(0))
        """
        解释下:query_layer 的原始形状为
        [seqlen, batch,num_attention_heads,hidden_size_per_attention_head],简写为[sq,b,np,hn]
        故query_layer.size(1)对应b, query_layer.size(2)对应np, query_layer.size(0)对应sq
    
        key_layer   的原始形状为
        [seklen,batch,num_attention_heads,hidden_size_per_attention_head],简写为[sk,b,np,hn]
        所以key_layer.size(0)对应sk
        """
    
        # 通过之前第39行的output_size[b, np, sq, sk],重塑查询层和键层张量 好进行矩阵相乘
        # [sq, b, np, hn] -> [sq, b * np, hn]
        query_layer = query_layer.view(output_size[2], output_size[0] * output_size[1], -1)
        # [sk, b, np, hn] -> [sk, b * np, hn]
        key_layer = key_layer.view(output_size[3], output_size[0] * output_size[1], -1)
       
        """
        上面那两行再解释下,因为需要计算每个批次中每个注意力头的注意力分数,为此
        将批次大小(batch)和注意力头数量(num_attention_heads)合并到一个维度中以便于执行矩阵乘法
    
        因此,我们将 query_layer 的形状从[sq,b,np,hn]调整为 [sq, b * np, hn]
        同理,对于 key_layer,将 key_layer 的形状从[sk,b,np,hn]调整为 [sk, b * np, hn]
        """
    
        # 初始化乘法结果张量
        matmul_result = torch.zeros(
            1, 1, 1,
            dtype=query_layer.dtype,
            device=query_layer.device,
        )
    
        # 计算查询层和键层的乘积
        matmul_result = torch.baddbmm(
            matmul_result,
            # 将 query_layer 的形状从 [sq, b * np, hn] 转换为 [b * np, sq, hn]
            query_layer.transpose(0, 1), 
    
            # 将 key_layer 的形状从 [sk, b * np, hn] 转换为 [b * np, hn, sk]
            # 相当于对key_layer 进行了两次转置操作,得到形状为 [b * np, hn, sk] 的张量
            key_layer.transpose(0, 1).transpose(1, 2), 
    
            beta=0.0,
            alpha=1.0,
        )
    
        # 上面最终query_layer为[b * np, sq, hn]
        # 上面最终key_layer  为[b * np, hn, sk]
        # 现在,沿用之前第39行的output_size的注意力分数张量[b, np, sq, sk]
        attention_scores = matmul_result.view(*output_size)
    
        # 使用缩放掩码Softmax计算注意力概率
        if self.scale_mask_softmax:
            self.scale_mask_softmax.scale = query_key_layer_scaling_coeff
            attention_probs = self.scale_mask_softmax(attention_scores, attention_mask.contiguous())
        else:
            # 如果掩码不全为0,应用注意力掩码
            if not (attention_mask == 0).all():
                attention_scores.masked_fill_(attention_mask, -10000.0)
    
            # 转换注意力分数张量的数据类型为浮点数
            dtype = attention_scores.dtype
            attention_scores = attention_scores.float()
    
            # 缩放注意力分数
            attention_scores = attention_scores * query_key_layer_scaling_coeff
    
            # 对注意力分数执行Softmax操作以获取注意力概率
            attention_probs = F.softmax(attention_scores, dim=-1)
    
            # 将注意力概率张量的数据类型恢复为原始数据类型
            attention_probs = attention_probs.type(dtype)
    
        """
        计算上下文层[sq, b, hp]
        """
        # 对原始value_layer做下转换得到新的output_size:[sk, b, np, hn] --> [b, np, sq, hn]
        output_size = (value_layer.size(1), value_layer.size(2), query_layer.size(0), value_layer.size(3))
    
        # 对原始value_layer的中间两个维度做下合并 [sk, b, np, hn] -> [sk, b * np, hn]
        value_layer = value_layer.view(value_layer.size(0), output_size[0] * output_size[1], -1)
    
        # 调整注意力概率:对之前得到的前两个维度做下合并:[b, np, sq, sk] =》[b * np, sq, sk]
        attention_probs = attention_probs.view(output_size[0] * output_size[1], output_size[2], -1)
    
        # 对上一行得到的attention_probs[b * np, sq, sk]
        # 乘以『value_layer即[sk, b * np, hn]的转置』,即[b * np, hn, sk]
        # 相当于[b * np, sq, sk] x [b * np, hn, sk],最终得到[b * np, sq, hn]
        context_layer = torch.bmm(attention_probs, value_layer.transpose(0, 1))
    
        # 上行得到context_layer的[b * np, sq, hn]通过上面第116行的新output_size调整为4个维度的
        # [b, np, sq, hn]
        # 使其更直观地表示批量大小b、注意力头数np、查询序列长度sq以及每个注意力头的隐藏层大小hn
        context_layer = context_layer.view(*output_size)
    
        # [b, np, sq, hn] --> [sq, b, np, hn],使其与查询层(query_layer)的形状一致
        context_layer = context_layer.permute(2, 0, 1, 3).contiguous()
    
        # [sq, b, np, hn] --> [sq, b, hp],此举的作用在于前两个维度(sq 和 b)不变
        # 同时将后两个维度(np 和 hn)合并成单个维度,即每个分区的隐藏层大小(hp)
        new_context_layer_shape = context_layer.size()[:-2] + (hidden_size_per_partition,)
        context_layer = context_layer.view(*new_context_layer_shape)
    
        # 将上下文层、当前的键值对(present)以及注意力概率(attention_probs)打包成一个元组
        outputs = (context_layer, present, attention_probs)
    
        return outputs
  2. default_init函数:这个函数是一个初始化辅助函数,用于创建类的实例。

SelfAttention类定义:这个类实现了自注意力机制,包括定义类的初始化方法和成员变量。类的初始化方法包括设置各种属性,如hidden_size,num_attention_heads,layer_id等。类还包含一个名为rotary_emb的RotaryEmbedding实例,用于处理位置编码。此外,query_key_value和dense是用于计算查询、键和值的线性层。

  1. attention_mask_func方法,将注意力掩码应用于Transformer模型中的注意力得分(到了第407行)
        @staticmethod
        def attention_mask_func(attention_scores, attention_mask):
            # 使用掩码 (attention_mask) 更新注意力得分 (attention_scores)
            # 对于掩码值为0的位置,将注意力得分设置为-10000.0
            attention_scores.masked_fill_(attention_mask, -10000.0)
            
            # 返回更新后的注意力得分张量
            return attention_scores
  2. split_tensor_along_last_dim 方法
    该方法沿着张量的最后一个维度将其分割成多个部分。参数包括输入张量 tensor、要将张量分割成的分区数 num_partitions,以及布尔值 contiguous_split_chunks,用于确定分割后的张量是否需要在内存中连续。函数首先计算最后一个维度的大小,然后使用torch.split将输入张量分割成多个子张量。如果需要连续的分割块,将每个子张量转换为连续张量
  3. SelfAttention 类的 forward 方法:
    该方法负责计算自注意力。它接收以下参数:hidden_states(输入序列的隐藏状态)、position_ids(位置编码)、attention_mask(注意力掩码)、layer_id(层ID)、layer_past(上一层的隐藏状态),以及use_cache(布尔值,表示是否使用缓存)和output_attentions(布尔值,表示是否输出注意力概率)。方法首先将隐藏状态传递给查询键值 (query, key, value) 层,然后将这些层分割成独立的张量。接下来,应用旋转位置编码,计算注意力概率,并得到上下文表示。最后,返回输出张量、隐藏状态以及注意力概率(如果需要的话)。

4.1.3 GLMBlock类、ChatGLMPreTrainedModel类(第554-784行)

GLMBlock 类:这是一个包含多个子模块的Transformer层,如层归一化 (LayerNorm)、自注意力 (SelfAttention) 和门控线性单元 (GLU)

// 第554到第569行
class GLMBlock(torch.nn.Module):
    def __init__(
            self,
            hidden_size,
            num_attention_heads,
            layernorm_epsilon,
            layer_id,
            inner_hidden_size=None,
            hidden_size_per_attention_head=None,
            layernorm=LayerNorm,
            use_bias=True,
            params_dtype=torch.float,

            //相当于有28层或28个GLMBlock
            num_layers=28,
            position_encoding_2d=True,
            empty_init=True
    ):

GLMBlock 类的 forward 方法接收与SelfAttention的forward方法类似的参数,如输入序列的隐藏状态、位置编码、注意力掩码等。在这个方法中

  • 首先应用层归一化
  • 然后计算自注意力,接着应用第二个层归一化,最后通过门控线性单元 (GLU) 计算输出。在每个步骤之间,都有残差连接来保留之前的信息
  • 最后,返回输出张量、隐藏状态以及注意力概率(如果需要的话)

接下来第661-729行,定义了一个名为 ChatGLMPreTrainedModel 的类,它继承自 PreTrainedModel。这个类是用于处理权重初始化以及简化下载和加载预训练模型的接口。

  • 类变量包括:
    is_parallelizable:表示该模型是否可并行化,默认为 False
    supports_gradient_checkpointing:表示该模型是否支持梯度检查点,默认为 True
    config_class:模型配置类,这里使用了 ChatGLMConfig
    base_model_prefix:设置为 “transformer”
    _no_split_modules:一个包含 “GLMBlock” 的列表
  • 类方法包括:
    __init__:构造函数,调用父类的构造函数
    _init_weights:初始化权重的方法,这里没有具体实现
    get_masks:根据输入生成注意力掩码
    get_position_ids:根据输入和掩码位置生成位置编码,支持二维和非二维位置编码
    _set_gradient_checkpointing:根据给定的值(默认为False)设置梯度检查点。

此外,还定义了一个名为 CHATGLM_6B_START_DOCSTRING 的变量,包含有关 ChatGLM6BConfig 的文档字符串,描述了如何使用这个 PyTorch 模型

4.1.4 ChatGLMModel类(第785-1029行)

定义了一个名为ChatGLMModel的类,它继承自ChatGLMPreTrainedModel。这是一个基于transformer的模型,能够作为编码器(仅使用自注意力机制)或解码器。解码器的情况下,会在自注意力层之间添加一个跨注意力层。模型的结构遵循论文Attention is all you need中描述的结构。

ChatGLMModel类的forward方法负责执行模型的前向传播。这个方法接收一系列输入参数,如input_ids、attention_mask、past_key_values等。根据这些输入,方法将执行以下操作:

  1. 如果没有提供inputs_embeds,使用word_embeddings将input_ids转换为嵌入向量
  2. 如果没有提供past_key_values,使用get_prompt方法获取提示
  3. 如果没有提供attention_mask,生成一个全零的张量
  4. 如果没有提供position_ids,使用get_position_ids方法获取位置ID
  5. 使用注意力掩码更新输入
  6. 对于模型中的每个层,执行以下操作:
    更新隐藏状态
    如果需要,保存当前层的隐藏状态
    更新注意力权重
  7. 对最后一层应用层归一化。
  8. 如果需要,保存所有隐藏状态。
  9. 如果需要,返回一个包含所有输出的元组,否则返回一个BaseModelOutputWithPast对象。

这个模型的设计可以在序列到序列(Seq2Seq)任务中使用,这时需要将is_decoder和add_cross_attention参数设置为True,并在前向传播时提供encoder_hidden_states。 

4.1.5 ChatGLMForConditionalGeneration的类(第1031-1436行)

定义了一个名为ChatGLMForConditionalGeneration的类,,它用于条件生成任务,如文本生成。这个类继承自ChatGLMPreTrainedModel,主要包括初始化方法、模型的前向传播逻辑以及生成过程中需要的输入预处理方法。

主要部分的解释如下:

  • __init__方法是类的构造函数,用于初始化该类的实例。它接受两个参数:config(一个ChatGLMConfig实例,包含模型的配置信息)和empty_init(一个布尔值,表示是否跳过模型参数的初始化)。构造函数首先调用父类的构造函数,然后根据empty_init的值选择初始化方法。接着,它初始化一些实例变量,例如max_sequence_length和position_encoding_2d。最后,它初始化transformer和lm_head两个关键组件
  • get_output_embeddings和set_output_embeddings方法分别用于获取和设置lm_head的权重。
  • _update_model_kwargs_for_generation方法用于在生成过程中更新模型的关键字参数,包括更新past_key_values、attention_mask和position_ids。
  • prepare_inputs_for_generation方法在生成过程中准备模型的输入,包括input_ids、past_key_values、attention_mask和position_ids等。此外,该方法还处理了遮罩位置和gmask的使用。
  • forward方法实现了模型的前向传播逻辑。它接受一系列可选参数,例如input_ids、position_ids、attention_mask、past_key_values等,并根据这些输入调用transformer模块。接着,它将hidden_states传递给lm_head,并计算lm_logits。如果提供了标签(labels),则计算损失函数。最后,根据return_dict的值,返回一个包含损失、logits、隐藏状态等信息的元组或字典。此时到了1231行
  • _reorder_cache 方法:在执行束搜索 (beam search) 或者束采样 (beam sample) 时用于重新排序 past_key_values 缓存,以便将 past_key_values 与正确的 beam_idx 匹配。
  • process_response 方法:处理模型生成的回应,将其中的训练时间替换为 “2023年”,同时将英文标点符号替换为中文标点符号。
  • chat 方法:根据给定的查询和聊天历史生成回应。通过 tokenizer 对查询和聊天历史进行编码,并将其输入到模型中。然后,对模型生成的回应进行解码和处理,最后将新的回应添加到聊天历史中并返回。
  • stream_chat 方法:与 chat 方法类似,但使用生成器函数 (generator function) 以流式方式生成回应。
  • stream_generate 方法:一个生成器函数,用于生成回应。它首先将输入的 query 和聊天历史进行编码,然后根据生成配置 (generation_config) 进行一系列的准备工作。接着,在满足停止条件之前,通过模型的多次迭代来生成回应。
  • quantize 方法:量化模型的权重,以减少模型的内存占用和计算资源。这对于在资源有限的设备上部署模型非常有用。

该类中还包括一些辅助方法,例如 _get_logits_processor, _get_stopping_criteria, _get_logits_warper, prepare_inputs_for_generation, 和 _update_model_kwargs_for_generation,这些方法用于处理生成过程中的各种设置和参数。

4.2 分词代码的实现:tokenization_chatglm.py

4.2.1 TextTokenizer:处理文本和词条之间的转换

TextTokenizer:这个类主要处理文本和词条之间的转换,包括将文本转化为词条列表的分词(tokenize),将词条列表转化为文本的解码(decode),以及获取词条的ID和从ID获取词条(convert_tokens_to_ids, convert_ids_to_tokens)等操作。此外,它还包含了处理特殊词条和填充的功能

这些处理是许多自然语言处理(NLP)任务,如文本分类、命名实体识别、问答系统、机器翻译等的基础步骤

当然,TextTokenizer还依赖下面的SPTokenizer进行文本的分词和解码操作,而将复杂的操作封装在了自己的接口之下,同时添加了对特殊词条和填充的处理。

# 导入相关库和模块
from typing import List, Optional, Union
import os

from transformers.tokenization_utils import PreTrainedTokenizer  # 从 transformers 包导入预训练的词条化工具类
from transformers.utils import logging, PaddingStrategy  # 导入 transformers 的日志和填充策略工具类
from transformers.tokenization_utils_base import EncodedInput, BatchEncoding  # 导入词条化相关工具类
from typing import Dict  # 导入字典类型
import sentencepiece as spm  # 导入 sentencepiece,一个开源的词条化工具
import numpy as np  # 导入 numpy,用于科学计算

logger = logging.get_logger(__name__)  # 创建一个日志记录器

# 定义预训练位置嵌入大小的常量
PRETRAINED_POSITIONAL_EMBEDDINGS_SIZES = {
    "THUDM/chatglm-6b": 2048,
}


class TextTokenizer:
    def __init__(self, model_path):
        self.sp = spm.SentencePieceProcessor()  # 创建一个 SentencePieceProcessor 实例
        self.sp.Load(model_path)  # 加载模型
        self.num_tokens = self.sp.vocab_size()  # 获取模型的词汇表大小

    def encode(self, text):
        return self.sp.EncodeAsIds(text)  # 将文本编码为ID序列

    def decode(self, ids: List[int]):
        return self.sp.DecodeIds(ids)  # 将ID序列解码为文本

    def tokenize(self, text):
        return self.sp.EncodeAsPieces(text)  # 将文本分割为词条序列

    def convert_tokens_to_string(self, tokens):
        return self.sp.DecodePieces(tokens)  # 将词条序列解码为文本

    def convert_tokens_to_ids(self, tokens):
        return [self.sp.PieceToId(token) for token in tokens]  # 将词条序列转换为ID序列

    def convert_token_to_id(self, token):
        return self.sp.PieceToId(token)  # 将单个词条转换为ID

    def convert_id_to_token(self, idx):
        return self.sp.IdToPiece(idx)  # 将ID转换为词条

    def __len__(self):
        return self.num_tokens  # 返回词汇表大小

4.2.2 SPTokenizer:包装了SentencePiece库的分词器

SPTokenizer:这个类是一个包装了SentencePiece库的分词器。SentencePiece是一个开源的自然语言处理库,用于神经网络模型的不规则文本分词,这个类主要提供了一些接口来利用SentencePiece库进行分词、解码等操作

class SPTokenizer:
    def __init__(
            self,
            vocab_file,
            num_image_tokens=20000,
            max_blank_length=80,
            byte_fallback=True,
    ):
        assert vocab_file is not None  # 检查词汇表文件是否存在
        self.vocab_file = vocab_file  # 保存词汇表文件路径
        self.num_image_tokens = num_image_tokens  # 保存图像词条数量
        self.special_tokens = ["[MASK]", "[gMASK]", "[sMASK]", "<unused_0>", "<sop>", "<eop>", "<ENC>", "<dBLOCK>"]  # 定义特殊词条
        self.max_blank_length = max_blank_length  # 定义最大空白长度
        self.byte_fallback = byte_fallback  # 设置字节回退标记
        self.text_tokenizer = TextTokenizer(vocab_file)  # 创建文本词条化工具

    def _get_text_tokenizer(self):
        return self.text_tokenizer  # 获取文本词条化工具

    @staticmethod
    def get_blank_token(length: int):
        assert length >= 2
        return f"<|blank_{length}|>"  # 获取空白词条

    @staticmethod
    def get_tab_token():
        return f""  # 获取制表符词条

    @property
    def num_text_tokens(self):
        return self.text_tokenizer.num_tokens  # 获取文本词条数量

    @property
    def num_tokens(self):
        return self.num_image_tokens + self.num_text_tokens  # 获取总词条数量

    @staticmethod
    def _encode_whitespaces(text: str, max_len: int = 80):
        text = text.replace("\t", SPTokenizer.get_tab_token())  # 替换制表符
        for i in range(max_len, 1, -1):
            text = text.replace(" " * i, SPTokenizer.get_blank_token(i))  # 替换多个连续空格
        return text

    def _preprocess(self, text: str, linebreak=True, whitespaces=True):
        if linebreak:
            text = text.replace("\n", "<n>")  # 替换换行符
        if whitespaces:
            text = self._encode_whitespaces(text, max_len=self.max_blank_length)  # 编码空白字符
        return text

    def encode(
            self, text: str, linebreak=True, whitespaces=True, add_dummy_prefix=True
    ) -> List[int]:
        """
        文本编码方法
        """
        text = self._preprocess(text, linebreak, whitespaces)  # 预处理文本
        if not add_dummy_prefix:
            text = "<n>" + text
        tmp = self._get_text_tokenizer().encode(text)  # 编码文本
        tokens = [x + self.num_image_tokens for x in tmp]  # 将文本词条ID转换为包含图像词条ID的序列
        return tokens if add_dummy_prefix else tokens[2:]

    def postprocess(self, text):
        text = text.replace("<n>", "\n")  # 替换换行词条
        text = text.replace(SPTokenizer.get_tab_token(), "\t")  # 替换制表符词条
        for i in range(2, self.max_blank_length + 1):
            text = text.replace(self.get_blank_token(i), " " * i)  # 替换空白词条
        return text

    def decode(self, text_ids: List[int]) -> str:
        ids = [int(_id) - self.num_image_tokens for _id in text_ids]  # 将包含图像词条的ID序列转换为文本词条ID序列
        ids = [_id for _id in ids if _id >= 0]  # 删除非文本词条ID
        text = self._get_text_tokenizer().decode(ids)  # 解码ID序列为文本
        text = self.postprocess(text)  # 对文本进行后处理
        return text

    def decode_tokens(self, tokens: List[str]) -> str:
        text = self._get_text_tokenizer().convert_tokens_to_string(tokens)  # 将词条序列解码为文本
        text = self.postprocess(text)  # 对文本进行后处理
        return text

    def tokenize(
            self, text: str, linebreak=True, whitespaces=True, add_dummy_prefix=True
    ) -> List[str]:
        """
        文本分词方法
        """
        text = self._preprocess(text, linebreak, whitespaces)  # 预处理文本
        if not add_dummy_prefix:
            text = "<n>" + text
        tokens = self._get_text_tokenizer().tokenize(text)  # 分词
        return tokens if add_dummy_prefix else tokens[2:]

    def __getitem__(self, x: Union[int, str]):
        if isinstance(x, int):
            if x < self.num_image_tokens:
                return "<image_{}>".format(x)  # 如果是图像词条,返回词条形式
            else:
                return self.text_tokenizer.convert_id_to_token(x - self.num_image_tokens)  # 如果是文本词条,返回文本词条
        elif isinstance(x, str):
            if x.startswith("<image_") and x.endswith(">") and x[7:-1].isdigit():
                return int(x[7:-1])  # 如果是图像词条形式,返回词条ID
            else:
                return self.text_tokenizer.convert_token_to_id(x) + self.num_image_tokens  # 如果是文本词条,返回包含图像词条的ID
        else:
            raise ValueError("The key should be str or int.")  # 如果不是整数或字符串,抛出异常

4.2.3 字节级字节对编码(Byte-Pair Encoding,BPE)分词器类

下面这段代码定义了一个ChatGLM的字节级字节对编码(Byte-Pair Encoding,BPE)分词器类,包含了一些分词器的基础操作,例如文本预处理、分词、词条解码、填充等

具体而言,以下的代码包括了对输入文本的预处理,将文本转化为词条序列的分词,以及将词条序列转化为文本的解码,等一系列分词器常用的操作。同时,这个分词器还支持添加特殊词条,以及在分词器的左边或右边进行填充,以满足模型输入的需要

class ChatGLMTokenizer(PreTrainedTokenizer):  # 基于PreTrainedTokenizer定义一个新的分词器类
    """
    Construct a ChatGLM tokenizer. Based on byte-level Byte-Pair-Encoding.
    Args:
        vocab_file (`str`):
            Path to the vocabulary file.
    """

    vocab_files_names = {"vocab_file": "ice_text.model"}  # 设定词汇表文件名称
    max_model_input_sizes = PRETRAINED_POSITIONAL_EMBEDDINGS_SIZES  # 预设模型输入的最大尺寸
    model_input_names = ["input_ids", "attention_mask", "position_ids"]  # 预设模型输入的名称列表

    def __init__(  # 定义初始化函数
            self,
            vocab_file,  # 词汇表文件路径
            do_lower_case=False,  # 是否对文本做小写转换
            remove_space=False,  # 是否移除文本中的空格
            bos_token='<sop>',  # 文本开头的特殊词条
            eos_token='<eop>',  # 文本结尾的特殊词条
            end_token='</s>',  # 文本结束的特殊词条
            mask_token='[MASK]',  # 遮蔽词条
            gmask_token='[gMASK]',  # gMASK词条
            padding_side="left",  # 填充侧(左侧填充或右侧填充)
            pad_token="<pad>",  # 填充词条
            unk_token="<unk>",  # 未知词条
            num_image_tokens=20000,  # 图像词条的数量
            **kwargs  # 其他参数
    ) -> None:
        super().__init__(  # 调用父类的初始化函数
            do_lower_case=do_lower_case,
            remove_space=remove_space,
            padding_side=padding_side,
            bos_token=bos_token,
            eos_token=eos_token,
            end_token=end_token,
            mask_token=mask_token,
            gmask_token=gmask_token,
            pad_token=pad_token,
            unk_token=unk_token,
            num_image_tokens=num_image_tokens,
            **kwargs
        )

        self.do_lower_case = do_lower_case  # 是否进行小写转换
        self.remove_space = remove_space  # 是否移除空格
        self.vocab_file = vocab_file  # 词汇表文件

        self.bos_token = bos_token  # 文本开头的特殊词条
        self.eos_token = eos_token  # 文本结尾的特殊词条
        self.end_token = end_token  # 文本结束的特殊词条
        self.mask_token = mask_token  # 遮蔽词条
        self.gmask_token = gmask_token  # gMASK词条

        self.sp_tokenizer = SPTokenizer(vocab_file, num_image_tokens=num_image_tokens)  # 初始化SPTokenizer

    # 以下部分是定义了一些属性和方法
    @property
    def gmask_token_id(self) -> Optional[int]:  # 获取gmask词条的id
        if self.gmask_token is None:  # 若不存在,则返回None
            return None
        return self.convert_tokens_to_ids(self.gmask_token)  # 返回gmask词条对应的id

    @property
    def end_token_id(self) -> Optional[int]:  # 获取end词条的id
        if self.end_token is None:  # 若不存在,则返回None
            return None
        return self.convert_tokens_to_ids(self.end_token)  # 返回end词条对应的id

    @property
    def vocab_size(self):  # 获取词汇表的大小
        return self.sp_tokenizer.num_tokens  # 返回词汇表的大小

    def get_vocab(self):  # 获取词汇表
        vocab = {self._convert_id_to_token(i): i for i in range(self.vocab_size)}  # 将词汇表转化为字典形式
        vocab.update(self.added_tokens_encoder)  # 更新添加的词条编码器
        return vocab  # 返回词汇表

    def preprocess_text(self, inputs):  # 文本预处理函数
        if self.remove_space:  # 若需要移除空格
            outputs = " ".join(inputs.strip().split())  # 则移除多余的空格
        else:
            outputs = inputs  # 否则保持不变

        if self.do_lower_case:  # 若需要进行小写转换
            outputs = outputs.lower()  # 则转换为小写

        return outputs  # 返回预处理后的文本

    def _tokenize(self, text, **kwargs):  # 分词函数
        text = self.preprocess_text(text)  # 对文本进行预处理

        seq = self.sp_tokenizer.tokenize(text)  # 对文本进行分词

        return seq  # 返回分词结果

    def convert_tokens_to_string(self, tokens: List[str]) -> str:  # 将词条转化为字符串
        return self.sp_tokenizer.decode_tokens(tokens)  # 解码词条

    def _decode(
            self,
            token_ids: Union[int, List[int]],
            **kwargs
    ) -> str:
        # 对id进行解码
        if isinstance(token_ids, int):  # 如果输入是单个id
            token_ids = [token_ids]  # 则将其转化为列表
        if len(token_ids) == 0:  # 如果输入为空
            return ""  # 则返回空字符串
        if self.pad_token_id in token_ids:  # 如果填充id在输入中
            token_ids = list(filter((self.pad_token_id).__ne__, token_ids))  # 则移除填充id
        return super()._decode(token_ids, **kwargs)  # 返回父类的解码函数

    def _convert_token_to_id(self, token):  # 将词条转化为id
        return self.sp_tokenizer[token]  # 使用sp_tokenizer进行转换

    def _convert_id_to_token(self, index):  # 将id转化为词条
        return self.sp_tokenizer[index]  # 使用sp_tokenizer进行转换

    def save_vocabulary(self, save_directory, filename_prefix=None):  # 保存词汇表到指定目录
        # 将词汇表及特殊词条文件保存到目录
        if os.path.isdir(save_directory):  # 如果保存目录存在
            vocab_file = os.path.join(
                save_directory, self.vocab_files_names["vocab_file"]
            )  # 则构建vocab文件路径
        else:
            vocab_file = save_directory  # 否则vocab文件就是保存目录

        with open(self.vocab_file, 'rb') as fin:  # 打开vocab文件
            proto_str = fin.read()  # 读取文件内容

        with open(vocab_file, "wb") as writer:  # 打开待写入的文件
            writer.write(proto_str)  # 写入内容

        return (vocab_file,)  # 返回保存的文件路径

    # 以下是与特殊词条有关的方法
    def build_inputs_with_special_tokens(
            self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        # 构建带有特殊词条的输入
        gmask_id = self.sp_tokenizer[self.gmask_token]  # 获取gmask的id
        eos_id = self.sp_tokenizer[self.eos_token]  # 获取eos的id
        token_ids_0 = token_ids_0 + [gmask_id, self.sp_tokenizer[self.bos_token]]  # 添加gmask和bos到第一部分的尾部
        if token_ids_1 is not None:  # 如果存在第二部分
            token_ids_0 = token_ids_0 + token_ids_1 + [eos_id]  # 则将第二部分及eos添加到token_ids_0的尾部
        return token_ids_0  # 返回结果

    # 以下是与填充有关的方法
    def _pad(
            self,
            encoded_inputs: Union[Dict[str, EncodedInput], BatchEncoding],
            max_length: Optional[int] = None,
            padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
            pad_to_multiple_of: Optional[int] = None,
            return_attention_mask: Optional[bool] = None,
    ) -> dict:
        # 对编码后的输入进行填充
        bos_token_id = self.sp_tokenizer[self.bos_token]  # 获取bos的id
        mask_token_id = self.sp_tokenizer[self.mask_token]  # 获取mask的id
        gmask_token_id = self.sp_tokenizer[self.gmask_token]  # 获取gmask的id
        assert self.padding_side == "left"  # 断言填充在左边

        required_input = encoded_inputs[self.model_input_names[0]]  # 获取所需的输入
        seq_length = len(required_input)  # 获取序列长度

        if padding_strategy == PaddingStrategy.LONGEST:  # 如果填充策略是最长的
            max_length = len(required_input)  # 则最大长度为输入的长度

        if max_length is not None and pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
            max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of  # 如果最大长度不是pad_to_multiple_of的倍数,则进行相应的调整

        if max_length is not None and seq_length < max_length:  # 如果最大长度存在且序列长度小于最大长度
            difference = max_length - seq_length  # 计算差值
            if self.padding_side == "right":  # 如果填充在右边
                if return_attention_mask:  # 如果需要返回注意力掩码
                    encoded_inputs["attention_mask"] = [1] * seq_length + [0] * difference  # 则构建注意力掩码
                encoded_inputs[self.model_input_names[0]] = (
                    [bos_token_id] + [mask_token_id] * difference + required_input + [gmask_token_id]
                )  # 构建输入
            else:
                if return_attention_mask:  # 如果需要返回注意力掩码
                    encoded_inputs["attention_mask"] = [0] * difference + [1] * seq_length  # 则构建注意力掩码
                encoded_inputs[self.model_input_names[0]] = (
                    [gmask_token_id] + required_input + [mask_token_id] * difference + [bos_token_id]
                )  # 构建输入

        return encoded_inputs  # 返回编码后的输入

跟分词相关的还有一个tokenizer_config.json:这个文件通常包含分词器的配置信息,例如预训练模型使用的特殊令牌(如[CLS],[SEP]等)

{
  "name_or_path": "THUDM/chatglm-6b",
  "bos_token": "<sop>",
  "eos_token": "<eop>",
  "end_token": "</s>",
  "gmask_token": "[gMASK]",
  "mask_token": "[MASK]",
  "pad_token": "<pad>",
  "unk_token": "<unk>",
  "remove_space": false,
  "do_lower_case": false,
  "tokenizer_class": "ChatGLMTokenizer",
  "num_image_tokens": 0,
  "auto_map": {
    "AutoTokenizer": [
      "tokenization_chatglm.ChatGLMTokenizer",
      null
      ]
  }
}

4.3 quantization:模型量化——减小模型大小和推理时间

quantization.py: 这是一个Python脚本,可能包含了对模型进行量化的代码,量化是一种减小模型大小和推理时间的技术​

4.3.1 compress_int4_weight等类

from torch.nn import Linear   # 从torch.nn模块导入Linear线性模块
from torch.nn.parameter import Parameter  # 从torch.nn.parameter模块导入Parameter参数模块

import bz2  # 导入bz2模块,该模块支持bzip2压缩和解压缩
import torch  # 导入torch模块,这是一个深度学习框架
import base64  # 导入base64模块,该模块提供了将二进制数据转换为ASCII字符的方法
import ctypes  # 导入ctypes模块,该模块提供了一种强大的工具来创建、访问和操纵C数据类型
from transformers.utils import logging  # 从transformers.utils模块导入logging日志模块

from typing import List  # 从typing模块导入List,可以用于注解变量的类型
from functools import partial  # 从functools模块导入partial,可以用来固定函数的部分参数,返回新的partial对象

logger = logging.get_logger(__name__)  # 创建一个logger,名字为当前模块的名称

try:
    # 从cpm_kernels.kernels.base模块导入LazyKernelCModule,KernelFunction和round_up
    from cpm_kernels.kernels.base import LazyKernelCModule, KernelFunction, round_up  

    class Kernel:  # 定义一个名为Kernel的类
        def __init__(self, code: bytes, function_names: List[str]):  # 定义类的初始化函数,接收一个字节类型的code和一个字符串列表类型的function_names作为参数
            self.code = code  # 将传入的code参数赋值给self.code
            self._function_names = function_names  # 将传入的function_names参数赋值给self._function_names
            self._cmodule = LazyKernelCModule(self.code)  # 使用传入的code创建一个LazyKernelCModule对象,并赋值给self._cmodule

            for name in self._function_names:  # 遍历_function_names列表
                setattr(self, name, KernelFunction(self._cmodule, name))  # 为self设置一个属性,属性名为name,值为KernelFunction对象

    quantization_code = "$QlpoOTFBWSZTWU9yuJUAQHN......"

    # 尝试加载一组用于权重压缩和解压的 CUDA kernels
    # 其中,kernels 中包括四种不同的操作:
    # "int4WeightCompression","int4WeightExtractionFloat",
    # "int4WeightExtractionHalf","int8WeightExtractionFloat",
    # "int8WeightExtractionHalf"
    kernels = Kernel(
        bz2.decompress(base64.b64decode(quantization_code)),
        [
            "int4WeightCompression",
            "int4WeightExtractionFloat",
            "int4WeightExtractionHalf",
            "int8WeightExtractionFloat",
            "int8WeightExtractionHalf",
        ],
    )
    # 如果在加载过程中出现任何异常,kernels 设为 None,并记录警告信息
    except Exception as exception:
        kernels = None
        logger.warning("Failed to load cpm_kernels:" + str(exception))

# 定义一个自定义的 PyTorch autograd 函数,表示一种线性操作
# 这种操作在前向传播过程中使用的是量化后的权重,而在反向传播过程中则使用的是半精度浮点数的权重
class W8A16Linear(torch.autograd.Function):
    @staticmethod
    def forward(ctx, inp: torch.Tensor, quant_w: torch.Tensor, scale_w: torch.Tensor, weight_bit_width):
        # 保存输入的形状、权重的位宽以及权重的量化值和量化尺度等信息,供后向传播时使用
        ctx.inp_shape = inp.size()
        ctx.weight_bit_width = weight_bit_width
        out_features = quant_w.size(0)
        inp = inp.contiguous().view(-1, inp.size(-1))
        # 提取权重的半精度浮点数表示
        weight = extract_weight_to_half(quant_w, scale_w, weight_bit_width)
        ctx.weight_shape = weight.size()
        # 计算输出
        output = inp.mm(weight.t())
        # 保存必要的信息,供后向传播时使用
        ctx.save_for_backward(inp, quant_w, scale_w)
        return output.view(*(ctx.inp_shape[:-1] + (out_features,)))

    @staticmethod
    def backward(ctx, grad_output: torch.Tensor):
        # 提取前向传播时保存的信息
        inp, quant_w, scale_w = ctx.saved_tensors
        # 提取权重的半精度浮点数表示
        weight = extract_weight_to_half(quant_w, scale_w, ctx.weight_bit_width)
        grad_output = grad_output.contiguous().view(-1, weight.size(0))
        # 计算输入和权重的梯度
        grad_input = grad_output.mm(weight)
        grad_weight = grad_output.t().mm(inp)
        return grad_input.view(ctx.inp_shape), grad_weight.view(ctx.weight_shape), None, None

# 定义一个函数,用于将权重压缩为 int4 格式
def compress_int4_weight(weight: torch.Tensor):  # (n, m)
    with torch.cuda.device(weight.device):
        n, m = weight.size(0), weight.size(1)
        assert m % 2 == 0
        m = m // 2
        out = torch.empty(n, m, dtype=torch.int8, device="cuda")
        stream = torch.cuda.current_stream()

        gridDim = (n, 1, 1)
        blockDim = (min(round_up(m, 32), 1024), 1, 1)

        # 调用 CUDA kernels 进行权重压缩
        kernels.int4WeightCompression(
            gridDim,
            blockDim,
            0,
            stream,
            [ctypes.c_void_p(weight.data_ptr()), ctypes.c_void_p(out.data_ptr()), ctypes.c_int32(n), ctypes.c_int32(m)],
        )
        return out

# 定义一个函数,用于将量化的权重转换为半精度浮点数格式
def extract_weight_to_half(weight: torch.Tensor, scale_list: torch.Tensor, source_bit_width: int):
    if source_bit_width == 8:
        func = kernels.int8WeightExtractionHalf
    elif source_bit_width == 4:
        func = kernels.int4WeightExtractionHalf
    else:
        assert False, "Unsupported bit-width"

    with torch.cuda.device(weight.device):
        n, m = weight.size(0), weight.size(1)
        out = torch.empty(n, m * (8 // source_bit_width), dtype=torch.half, device="cuda")
        stream = torch.cuda.current_stream()

        gridDim = (n, 1, 1)
        blockDim = (min(round_up(m, 32), 1024), 1, 1)

        # 调用 CUDA kernels 提取权重
        func(
            gridDim,
            blockDim,
            0,
            stream,
            [
                ctypes.c_void_p(weight.data_ptr()),
                ctypes.c_void_p(scale_list.data_ptr()),
                ctypes.c_void_p(out.data_ptr()),
                ctypes.c_int32(n),
                ctypes.c_int32(m),
            ],
        )
        return out

4.3.2 QuantizedLinear

# 定义一个名为 QuantizedLinear 的类,该类继承自 PyTorch 中的 Linear 类
class QuantizedLinear(Linear):
    # 初始化函数,接受一些参数,包括权重的位宽、权重张量、偏置张量等
    def __init__(self, weight_bit_width: int, weight_tensor=None, bias_tensor=None, empty_init=False, *args, **kwargs):
        # 调用父类的初始化函数
        super(QuantizedLinear, self).__init__(*args, **kwargs)
        # 保存权重的位宽
        self.weight_bit_width = weight_bit_width

        # 获取权重的形状,并删除父类中的权重
        shape = self.weight.shape
        del self.weight

        # 如果未指定权重张量,或者指定了空初始化,则初始化权重和权重的量化尺度
        if weight_tensor is None or empty_init:
            self.weight = torch.empty(
                shape[0], shape[1] * weight_bit_width // 8, dtype=torch.int8, device=kwargs["device"]
            )
            self.weight_scale = torch.empty(shape[0], dtype=kwargs["dtype"], device=kwargs["device"])
        else:  # 否则,计算权重的量化值和量化尺度
            self.weight_scale = (weight_tensor.abs().max(dim=-1).values / ((2 ** (weight_bit_width - 1)) - 1)).half()
            self.weight = torch.round(weight_tensor / self.weight_scale[:, None]).to(torch.int8)
            # 如果权重的位宽为 4,压缩权重
            if weight_bit_width == 4:
                self.weight = compress_int4_weight(self.weight)

        # 将权重和权重的量化尺度设置为参数,并指定它们不需要梯度
        self.weight = Parameter(self.weight.to(kwargs["device"]), requires_grad=False)
        self.weight_scale = Parameter(self.weight_scale.to(kwargs["device"]), requires_grad=False)
        # 如果指定了偏置张量,将偏置设置为参数,并指定它不需要梯度
        if bias_tensor is not None:
            self.bias = Parameter(bias_tensor.to(kwargs["device"]), requires_grad=False)
        else:  # 否则,偏置设为 None
            self.bias = None

    # 定义前向传播函数
    def forward(self, input):
        # 应用 W8A16Linear 函数计算输出
        output = W8A16Linear.apply(input, self.weight, self.weight_scale, self.weight_bit_width)
        # 如果存在偏置,将偏置加到输出上
        if self.bias is not None:
            output = output + self.bias
        return output


# 定义一个函数,用于将模型中的线性层替换为量化的线性层
def quantize(model, weight_bit_width, empty_init=False, **kwargs):
    """Replace fp16 linear with quantized linear"""
    # 遍历模型中的每一层
    for layer in model.layers:
        # 将每一层中的 query_key_value 替换为量化的线性层
        layer.attention.query_key_value = QuantizedLinear(
            weight_bit_width=weight_bit_width,
            weight_tensor=layer.attention.query_key_value.weight.to(torch.cuda.current_device()),
            bias_tensor=layer.attention.query_key_value.bias,
            in_features=layer.attention.query_key_value.in_features,
            out_features=layer.attention.query_key_value.out_features,
            bias=True,
            dtype=torch.half,
            device=layer.attention.query_key_value.weight.device,
            empty_init=empty_init
        )
        # 将每一层中的 dense 替换为量化的线性层
        layer.attention.dense = QuantizedLinear(
            weight_bit_width=weight_bit_width,
            weight_tensor=layer.attention.dense.weight.to(torch.cuda.current_device()),
            bias_tensor=layer.attention.dense.bias,
            in_features=layer.attention.dense.in_features,
            out_features=layer.attention.dense.out_features,
            bias=True,
            dtype=torch.half,
            device=layer.attention.dense.weight.device,
            empty_init=empty_init
        )
        # 将每一层中的 dense_h_to_4h 替换为量化的线性层
        layer.mlp.dense_h_to_4h = QuantizedLinear(
            weight_bit_width=weight_bit_width,
            weight_tensor=layer.mlp.dense_h_to_4h.weight.to(torch.cuda.current_device()),
            bias_tensor=layer.mlp.dense_h_to_4h.bias,
            in_features=layer.mlp.dense_h_to_4h.in_features,
            out_features=layer.mlp.dense_h_to_4h.out_features,
            bias=True,
            dtype=torch.half,
            device=layer.mlp.dense_h_to_4h.weight.device,
            empty_init=empty_init
        )
        # 将每一层中的 dense_4h_to_h 替换为量化的线性层
        layer.mlp.dense_4h_to_h = QuantizedLinear(
            weight_bit_width=weight_bit_width,
            weight_tensor=layer.mlp.dense_4h_to_h.weight.to(torch.cuda.current_device()),
            bias_tensor=layer.mlp.dense_4h_to_h.bias,
            in_features=layer.mlp.dense_4h_to_h.in_features,
            out_features=layer.mlp.dense_4h_to_h.out_features,
            bias=True,
            dtype=torch.half,
            device=layer.mlp.dense_4h_to_h.weight.device,
            empty_init=empty_init
        )
    return model

第五部分 基于LangChain + ChatGLM-6B的本地知识库问答的应用实现

5.1 什么是LangChain:连接本地知识库与LLM的桥梁

作为一个 LLM 应用框架,LangChain 支持调用多种不同模型,提供相对统一、便捷的操作接口,让模型即插即用,这是其GitHub地址

而一个LangChain应用是通过很多个组件实现的,LangChain主要支持6种组件:

  1. Models:模型,各种类型的模型和模型集成,比如GPT-4
  2. Prompts:提示,包括提示管理、提示优化和提示序列化
  3. Memory:记忆,用来保存和模型交互时的上下文状态
  4. Indexes:索引,用来结构化文档,以便和模型交互
  5. Chains:链,一系列对各种组件的调用
  6. Agents:代理,决定模型采取哪些行动,执行并且观察流程,直到完成为止

具体如下图所示(图源)

5.2 通过LangChain+LLM实现本地知识库问答的核心步骤

GitHub上有一个利用 langchain 思想实现的基于本地知识库的问答应用,目标期望建立一套对中文场景与开源模型支持友好、可离线运行的知识库问答解决方案,其项目地址为:https://github.com/imClumsyPanda/langchain-ChatGLM

  • 💡 受 GanymedeNil 的项目 document.ai,和 AlexZhangji 创建的 ChatGLM-6B Pull Request 启发,建立了全流程可使用开源模型实现的本地知识库问答应用。现已支持使用 ChatGLM-6B、 ClueAI/ChatYuan-large-v2 等大语言模型的接入
  • ✅ 本项目中 Embedding 默认选用的是 GanymedeNil/text2vec-large-chinese,LLM 默认选用的是 ChatGLM-6B,依托上述模型,本项目可实现全部使用开源模型离线私有部署

⛓️ 本项目实现原理如下图所示

  1. 第一阶段:加载文件-读取文本-文本分割(Text splitter)
    加载文件
    :这是读取存储在本地的知识库文件的步骤
    读取文本:读取加载的文件内容,通常是将其转化为文本格式
    文本分割(Text splitter):按照一定的规则(例如段落、句子、词语等)将文本分割

        def _load_file(self, filename):
            # 加载文件
            if filename.lower().endswith(".pdf"):
                loader = UnstructuredFileLoader(filename) 
                text_splitor = CharacterTextSplitter()
                docs = loader.load_and_split(text_splitor)
            else:
                loader = UnstructuredFileLoader(filename, mode="elements")
                text_splitor = CharacterTextSplitter()
                docs = loader.load_and_split(text_splitor)
            return docs
  2. 第二阶段:文本向量化(embedding)-存储到向量数据库
    文本向量化(embedding)
    :这通常涉及到NLP的特征抽取,可以通过诸如TF-IDF、word2vec、BERT等方法将分割好的文本转化为数值向量

     def __init__(self, model_name=None) -> None:
            if not model_name:
                # use default embedding model
                self.embeddings = HuggingFaceEmbeddings(model_name=self.model_name)

    存储到向量数据库:文本向量化之后存储到数据库vectorstore(FAISS)

    def init_vector_store(self):
            persist_dir = os.path.join(VECTORE_PATH, ".vectordb")
            print("向量数据库持久化地址: ", persist_dir)
            if os.path.exists(persist_dir):
                # 从本地持久化文件中Load
                print("从本地向量加载数据...")
                vector_store = Chroma(persist_directory=persist_dir, embedding_function=self.embeddings)
                # vector_store.add_documents(documents=documents)
            else:
                documents = self.load_knownlege()
                # 重新初始化
                vector_store = Chroma.from_documents(documents=documents, 
                                                     embedding=self.embeddings,
                                                     persist_directory=persist_dir)
                vector_store.persist()
            return vector_store

    其中load_knownlege的实现为

        def load_knownlege(self):
            docments = []
            for root, _, files in os.walk(DATASETS_DIR, topdown=False):
                for file in files:
                    filename = os.path.join(root, file)
                    docs = self._load_file(filename)
                    # 更新metadata数据
                    new_docs = [] 
                    for doc in docs:
                        doc.metadata = {"source": doc.metadata["source"].replace(DATASETS_DIR, "")} 
                        print("文档2向量初始化中, 请稍等...", doc.metadata)
                        new_docs.append(doc)
                    docments += new_docs
            return docments
  3. 第三阶段:问句向量化
    这是将用户的查询或问题转化为向量,应使用与文本向量化相同的方法,以便在相同的空间中进行比较

  4. 第四阶段:在文本向量中匹配出与问句向量最相似的top k个
    这一步是信息检索的核心,通过计算余弦相似度、欧氏距离等方式,找出与问句向量最接近的文本向量

        def query(self, q):
            """Query similar doc from Vector """
            vector_store = self.init_vector_store()
            docs = vector_store.similarity_search_with_score(q, k=self.top_k)
            for doc in docs:
                dc, s = doc
                yield s, dc
  5. 第五阶段:匹配出的文本作为上下文和问题一起添加到prompt中
    这是利用匹配出的文本来形成与问题相关的上下文,用于输入给语言模型。

  6. 第六阶段:提交给LLM生成回答
    最后,将这个问题和上下文一起提交给语言模型(例如GPT系列),让它生成回答
    比如知识查询(代码来源)

    class KnownLedgeBaseQA:
    
        def __init__(self) -> None:
            k2v = KnownLedge2Vector()
            self.vector_store = k2v.init_vector_store()
            self.llm = VicunaLLM()
        
        def get_similar_answer(self, query):
            
            prompt = PromptTemplate(
                template=conv_qa_prompt_template,
                input_variables=["context", "question"]
            )
    
            retriever = self.vector_store.as_retriever(search_kwargs={"k": VECTOR_SEARCH_TOP_K})
            docs = retriever.get_relevant_documents(query=query)
    
            context = [d.page_content for d in docs] 
            result = prompt.format(context="\n".join(context), question=query)
            return result

如你所见,这种通过组合langchain+LLM的方式,特别适合一些垂直领域或大型集团企业搭建通过LLM的智能对话能力搭建企业内部的私有问答系统,也适合个人专门针对一些英文paper进行问答,比如比较火的一个开源项目:ChatPDF,其从文档处理角度来看,实现流程如下(图源):

5.3 langchain-ChatGLM项目的深入解读

再回顾一遍langchain-ChatGLM这个项目的架构图(图源)

你会发现该项目主要由以下七大模块组成

  1. models: llm的接⼝类与实现类,针对开源模型提供流式输出⽀持
  2. loader: 文档加载器的实现类
  3. textsplitter: 文本切分的实现类
  4. chains: 工作链路实现,如 chains/local_doc_qa 实现了基于本地⽂档的问答实现
  5. content:用于存储上传的原始⽂件
  6. vector_store:用于存储向量库⽂件,即本地知识库本体
  7. configs:配置文件存储

5.3.1 langchain-ChatGLM之chains文件夹下的代码解析

具体而言,上图中的FAISS是Facebook AI推出的一种用于有效搜索大规模高维向量空间中相似度的库。在大规模数据集中快速找到与给定向量最相似的向量是很多AI应用的重要组成部分,例如在推荐系统、自然语言处理、图像检索等领域

这个文件(xlangchain-ChatGLM/vectorstores.py at master · imClumsyPanda/langchain-ChatGLM · GitHub)的代码主要是关于FAISS (Facebook AI Similarity Search)的使用,以及一个FAISS向量存储类(FAISSVS,FAISSVS类继承自FAISS类)的定义,包含以下主要方法:

  1. max_marginal_relevance_search_by_vector:通过给定的嵌入向量,使用最大边际相关性(Maximal Marginal Relevance, MMR)方法来返回相关的文档。MMR是一种解决查询结果多样性和相关性的算法。具体来说,它不仅要求返回的文档与查询尽可能相似,而且希望返回的文档集之间尽可能多样。

  2. max_marginal_relevance_search:给定查询文本,首先将文本转换为嵌入向量,然后调用max_marginal_relevance_search_by_vector函数进行MMR搜索。

  3. __from:这是一个类方法,用于从一组文本和对应的嵌入向量创建一个FAISSVS实例。该方法首先创建一个FAISS索引并添加嵌入向量,然后创建一个文档存储以存储与每个嵌入向量关联的文档

以上就是这段代码的主要内容,通过使用FAISS和MMR,它可以帮助我们在大量文档中找到与给定查询最相关的文档

在这个代码文件下(langchain-ChatGLM/local_doc_qa.py at master · imClumsyPanda/langchain-ChatGLM · GitHub)下

  1. 导入包和模块: 代码开始的部分是一系列的导入语句,导入了必要的 Python 包和模块,包括文件加载器,文本分割器,模型配置,以及一些 Python 内建模块和其他第三方库。

  2. 改写 HuggingFaceEmbeddings 类的哈希方法: 代码定义了一个名为 _embeddings_hash 的函数,并将其赋值给 HuggingFaceEmbeddings 类的 __hash__ 方法。这样做的目的是使 HuggingFaceEmbeddings 对象可以被哈希,即可以作为字典的键或者被加入到集合中。

  3. 载入向量存储器: 定义了一个名为 load_vector_store 的函数,这个函数用于从本地加载一个向量存储器,返回 FAISS 类的对象。其中使用了 lru_cache 装饰器,可以缓存最近使用的 CACHED_VS_NUM 个结果,提高代码效率

  4. 文件树遍历: tree 函数是一个递归函数,用于遍历指定目录下的所有文件,返回一个包含所有文件的完整路径和文件名的列表。它可以忽略指定的文件或目录。

  5. 加载文件: load_file 函数根据文件后缀名选择合适的加载器和文本分割器,加载并分割文件

  6. 生成提醒: generate_prompt 函数用于根据相关文档和查询生成一个提醒。提醒的模板由 prompt_template 参数提供

  7. 分割列表: seperate_list 函数接受一个整数列表,返回一个列表的列表,其中每个子列表都包含连续的整数

  8. 向量搜索: similarity_search_with_score_by_vector 函数用于通过向量进行相似度搜索,返回与给定嵌入向量最相似的文档和对应的分数

    def similarity_search_with_score_by_vector(
            self, embedding: List[float], k: int = 4
    ) -> List[Tuple[Document, float]]:
        # 通过输入向量在向量库中进行搜索,返回最相似的 k 个向量的索引和得分
        scores, indices = self.index.search(np.array([embedding], dtype=np.float32), k)
        docs = []  # 用于存储找到的文档
        id_set = set()  # 用于存储找到的文档的 id
        store_len = len(self.index_to_docstore_id)  # 记录向量库中的向量数量
    
        # 遍历搜索结果的索引和得分
        for j, i in enumerate(indices[0]):
            # 如果索引无效或者得分低于阈值,则忽略该结果
            if i == -1 or 0 < self.score_threshold < scores[0][j]:
                continue
            _id = self.index_to_docstore_id[i]  # 根据索引获取文档的 id
            doc = self.docstore.search(_id)  # 根据 id 在文档库中查找文档
    
            # 如果不需要对文档内容进行分块
            if not self.chunk_conent:
                # 检查查找到的文档是否有效
                if not isinstance(doc, Document):
                    raise ValueError(f"Could not find document for id {_id}, got {doc}")
                doc.metadata["score"] = int(scores[0][j])  # 将得分记录到文档的元数据中
                docs.append(doc)  # 将文档添加到结果列表中
                continue
    
            id_set.add(i)  # 记录文档的索引
            docs_len = len(doc.page_content)  # 记录文档的长度
    
            # 对找到的文档进行处理,寻找相邻的文档,尽可能将多个文档的内容组合在一起,直到达到设定的最大长度
            for k in range(1, max(i, store_len - i)):
                break_flag = False
                for l in [i + k, i - k]:
                    if 0 <= l < len(self.index_to_docstore_id):
                        _id0 = self.index_to_docstore_id[l]
                        doc0 = self.docstore.search(_id0)
                        if docs_len + len(doc0.page_content) > self.chunk_size:
                            break_flag = True
                            break
                        elif doc0.metadata["source"] == doc.metadata["source"]:
                            docs_len += len(doc0.page_content)
                            id_set.add(l)
                if break_flag:
                    break
    
        # 如果不需要对文档内容进行分块,直接返回找到的文档
        if not self.chunk_conent:
            return docs
    
        # 如果没有找到满足条件的文档,返回空列表
        if len(id_set) == 0 and self.score_threshold > 0:
            return []
    
        id_list = sorted(list(id_set))  # 将找到的文档的 id 排序
        id_lists = seperate_list(id_list)  # 将 id 列表分块
    
        # 遍历分块后的 id 列表,将同一块中的文档内容组合在一起
        for id_seq in id_lists:
            for id in id_seq:
                if id == id_seq[0]:
                    _id = self.index_to_docstore_id[id]
                    doc = self.docstore.search(_id)
                else:
                    _id0 = self.index_to_docstore_id[id]
                    doc0 = self.docstore.search(_id0)
                    doc.page_content += " " + doc0.page_content
    
            # 检查组合后的文档是否有效
            if not isinstance(doc, Document):
                raise ValueError(f"Could not find document for id {_id}, got {doc}")
    
            # 计算组合后的文档的得分
            doc_score = min([scores[0][id] for id in [indices[0].tolist().index(i) for i in id_seq if i in indices[0]]])
            doc.metadata["score"] = int(doc_score)  # 将得分记录到文档的元数据中
            docs.append(doc)  # 将组合后的文档添加到结果列表中
    
        torch_gc()  # 清理 PyTorch 的缓存
        return docs  # 返回找到的文档列表

之后,定义了一个名为 LocalDocQA 的类,主要用于基于文档的问答任务。基于文档的问答任务的主要功能是,根据一组给定的文档(这里被称为知识库)以及用户输入的问题,返回一个答案,LocalDocQA 类的主要方法包括:

  1. init_cfg():此方法初始化一些变量,包括将 llm_model(一个语言模型用于生成答案)分配给 self.llm,将一个基于HuggingFace的嵌入模型分配给 self.embeddings,将输入参数 top_k 分配给 self.top_k。

  2. init_knowledge_vector_store():此方法负责初始化知识向量库。它首先检查输入的文件路径,对于路径中的每个文件,将文件内容加载到 Document 对象中,然后将这些文档转换为嵌入向量,并将它们存储在向量库中

  3. one_knowledge_add():此方法用于向知识库中添加一个新的知识文档。它将输入的标题和内容创建为一个 Document 对象,然后将其转换为嵌入向量,并添加到向量库中

  4. get_knowledge_based_answer():此方法是基于给定的知识库和用户输入的问题,来生成一个答案。它首先根据用户输入的问题找到知识库中最相关的文档,然后生成一个包含相关文档和用户问题的提示,将提示传递给 llm_model 来生成答案
    且注意一点,这个函数调用了上面已经实现好的:similarity_search_with_score

    def get_knowledge_based_answer(self, query, vs_path, chat_history=[], streaming: bool = STREAMING):
        # 加载向量库,用于后续的相似度搜索
        vector_store = load_vector_store(vs_path, self.embeddings)
    
        # 将自定义的 similarity_search_with_score_by_vector 方法赋给 FAISS 的 similarity_search_with_score 方法
        FAISS.similarity_search_with_score_by_vector = similarity_search_with_score_by_vector
    
        # 设置向量库的 chunk_size 属性
        vector_store.chunk_size = self.chunk_size
    
        # 设置向量库的 chunk_conent 属性
        vector_store.chunk_conent = self.chunk_conent
    
        # 设置向量库的 score_threshold 属性
        vector_store.score_threshold = self.score_threshold
    
        # 通过向量库对查询进行相似度搜索,返回最相关的文档及其得分
        related_docs_with_score = vector_store.similarity_search_with_score(query, k=self.top_k)
    
        # 清理 PyTorch 的缓存
        torch_gc()
    
        # 生成用于提问的提示语
        prompt = generate_prompt(related_docs_with_score, query)
    
        # 通过 LLM(长语言模型)生成回答
        for answer_result in self.llm.generatorAnswer(prompt=prompt, history=chat_history,
                                                      streaming=streaming):
            # 获取回答的文本
            resp = answer_result.llm_output["answer"]
    
            # 获取聊天历史
            history = answer_result.history
    
            # 将聊天历史中的最后一项的提问替换为当前的查询
            history[-1][0] = query
    
            # 组装回答的结果
            response = {"query": query,
                        "result": resp,
                        "source_documents": related_docs_with_score}
    
            # 返回回答的结果和聊天历史
            yield response, history
  5. get_knowledge_based_conent_test():此方法是为了测试的,它将返回与输入查询最相关的文档和查询提示

  6. get_search_result_based_answer():此方法与 get_knowledge_based_answer() 类似,不过这里使用的是 bing_search 的结果作为知识库

    def get_search_result_based_answer(self, query, chat_history=[], streaming: bool = STREAMING):
        # 对查询进行 Bing 搜索,并获取搜索结果
        results = bing_search(query)
    
        # 将搜索结果转化为文档的形式
        result_docs = search_result2docs(results)
    
        # 生成用于提问的提示语
        prompt = generate_prompt(result_docs, query)
    
        # 通过 LLM(长语言模型)生成回答
        for answer_result in self.llm.generatorAnswer(prompt=prompt, history=chat_history,
                                                      streaming=streaming):
            # 获取回答的文本
            resp = answer_result.llm_output["answer"]
    
            # 获取聊天历史
            history = answer_result.history
    
            # 将聊天历史中的最后一项的提问替换为当前的查询
            history[-1][0] = query
    
            # 组装回答的结果
            response = {"query": query,
                        "result": resp,
                        "source_documents": result_docs}
    
            # 返回回答的结果和聊天历史
            yield response, history

    如你所见,这个函数和上面那个函数的主要区别在于,这个函数是直接利用搜索引擎的搜索结果来生成回答的,而上面那个函数是通过查询相似度搜索来找到最相关的文档,然后基于这些文档生成回答的
    而这个bing_search则是如下定义的

    #coding=utf8
    # 声明文件编码格式为 utf8
    
    from langchain.utilities import BingSearchAPIWrapper
    # 导入 BingSearchAPIWrapper 类,这个类用于与 Bing 搜索 API 进行交互
    
    from configs.model_config import BING_SEARCH_URL, BING_SUBSCRIPTION_KEY
    # 导入配置文件中的 Bing 搜索 URL 和 Bing 订阅密钥
    
    def bing_search(text, result_len=3):
        # 定义一个名为 bing_search 的函数,该函数接收一个文本和结果长度的参数,默认结果长度为3
    
        if not (BING_SEARCH_URL and BING_SUBSCRIPTION_KEY):
            # 如果 Bing 搜索 URL 或 Bing 订阅密钥未设置,则返回一个错误信息的文档
            return [{"snippet": "please set BING_SUBSCRIPTION_KEY and BING_SEARCH_URL in os ENV",
                     "title": "env inof not fould",
                     "link": "https://python.langchain.com/en/latest/modules/agents/tools/examples/bing_search.html"}]
    
        search = BingSearchAPIWrapper(bing_subscription_key=BING_SUBSCRIPTION_KEY,
                                      bing_search_url=BING_SEARCH_URL)
        # 创建 BingSearchAPIWrapper 类的实例,该实例用于与 Bing 搜索 API 进行交互
    
        return search.results(text, result_len)
        # 返回搜索结果,结果的数量由 result_len 参数决定
    
    if __name__ == "__main__":
        # 如果这个文件被直接运行,而不是被导入作为模块,那么就执行以下代码
    
        r = bing_search('python')
        # 使用 Bing 搜索 API 来搜索 "python" 这个词,并将结果保存在变量 r 中
    
        print(r)
        # 打印出搜索结果

__main__部分的代码是 LocalDocQA 类的实例化和使用示例。它首先初始化了一个 llm_model_ins 对象,然后创建了一个 LocalDocQA 的实例并调用其 init_cfg() 方法进行初始化。之后,它指定了一个查询和知识库的路径,然后调用 get_knowledge_based_answer() 或 get_search_result_based_answer() 方法获取基于该查询的答案,并打印出答案和来源文档的信息。

chain这个文件夹下 还有最后一个项目文件(langchain-ChatGLM/text_load.py at master · imClumsyPanda/langchain-ChatGLM · GitHub),如下所示

import os
import pinecone 
from tqdm import tqdm
from langchain.llms import OpenAI
from langchain.text_splitter import SpacyTextSplitter
from langchain.document_loaders import TextLoader
from langchain.document_loaders import DirectoryLoader
from langchain.indexes import VectorstoreIndexCreator
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Pinecone

#一些配置文件
openai_key="你的key" # 注册 openai.com 后获得
pinecone_key="你的key" # 注册 app.pinecone.io 后获得
pinecone_index="你的库" #app.pinecone.io 获得
pinecone_environment="你的Environment"  # 登录pinecone后,在indexes页面 查看Environment
pinecone_namespace="你的Namespace" #如果不存在自动创建

#科学上网你懂得
os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890'
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'

#初始化pinecone
pinecone.init(
    api_key=pinecone_key,
    environment=pinecone_environment
)
index = pinecone.Index(pinecone_index)

#初始化OpenAI的embeddings
embeddings = OpenAIEmbeddings(openai_api_key=openai_key)

#初始化text_splitter
text_splitter = SpacyTextSplitter(pipeline='zh_core_web_sm',chunk_size=1000,chunk_overlap=200)

# 读取目录下所有后缀是txt的文件
loader = DirectoryLoader('../docs', glob="**/*.txt", loader_cls=TextLoader)

#读取文本文件
documents = loader.load()

# 使用text_splitter对文档进行分割
split_text = text_splitter.split_documents(documents)
try:
	for document in tqdm(split_text):
		# 获取向量并储存到pinecone
		Pinecone.from_documents([document], embeddings, index_name=pinecone_index)
except Exception as e:
    print(f"Error: {e}")
    quit()

5.3.2 langchain-ChatGLM之____文件夹下的代码解析

待更..

至于该项目的部署教程请见我司同事杜老师写的博客:Langchain-ChatGLM:基于本地知识库的问答

// 待更..

参考文献与推荐阅读

  1. ​​​​​​Transformer通俗笔记:从Word2Vec、Seq2Seq逐步理解到GPT、BERT
  2. Transformer原始论文(值得反复读几遍):Attention Is All You Need
  3. Vision Transformer 超详细解读 (原理分析+代码解读) (一)
  4. Transformer模型详解(图解最完整版)
  5. The Annotated Transformer(翻译之一),harvard对transformer的简单编码实现
  6. transformer的细节到底是怎么样的?
  7. 如何从浅入深理解transformer?
  8. Transformer 结构详解:位置编码 | Transformer Architecture: The Positional Encoding
  9. Transformer学习笔记一:Positional Encoding(位置编码)
  10. 保姆级讲解Transformer
  11. Jay Alammar写的图解transformer
  12. 如何理解attention中的Q,K,V?

附录:创作/修改记录

  1. 4.12-4.14,基本完成第一部分 transformer编码器部分的初稿
  2. 4.16,彻底完善关于transformer位置编码的阐述,可能是网上对这点最一目了然的阐述了
  3. 4.17,完成transformer的解码器部分
  4. 4.18,开始写「第四部分 ChatGLM-6B的代码架构与逐一实现」
  5. 5.26,新增内容
    分词代码的实现:tokenization_chatglm.py
    quantization:模型量化——减小模型大小和推理时间
  6. 5.27,新增“第五部分 基于LangChain + ChatGLM-6B的本地知识库的应用实现”
  7. 6.8日,完善第五部分

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(2)
心中带点小风骚的头像心中带点小风骚普通用户
上一篇 2023年6月17日
下一篇 2023年6月17日

相关推荐