Autoformer算法与代码分析

Autoformer算法与代码分析

Autoformer算法与代码分析
原文链接:https://arxiv.org/pdf/2106.13008.pdf
代码链接:https://github.com/thuml/Autoformer

简介

Autoformer本质上是基于Transformer结构的时序预测模型,因此如果有Transformer基础,对于理解Autoformer以及所有基于Transformer的时序预测模型都比较有帮助。

所有基于Transfomer结构的时序预测模型,本质上都是对于其中最重要的模块:Self-attention进行创新。例如LogsTransformer,reformer,informer,fedformer无一例外都是对于Self-attention模块进行创新。其中LogsTransformer,reformer,informer都是对于Self-attention的计算复杂度方面进行创新,并且提出各自的modified-self-attention。而到了Autoformer首先对于Self-attention的计算方式进行创新。

模型结构

Autoformer相对于普通的Transformer结构无非就是加了时序拆解模块,修改了Self-attention模块。其他的例如Feed Forward模块并没有改动,那么接下来就从这两个新的模块进行详细拆解:

Series Decmop

时序拆解器本质上来自于传统的时序预测算法,例如Arima,Fbprophet。Arima等传统算法本质上思考的就是从统计的角度对于时间序列进行拆解,并赋予拆解的子项以不同的物理意义例如:趋势项,季节项,残差项等。

因此传统的时序拆解的普遍形式是:

Autoformer算法与代码分析
其中Autoformer算法与代码分析是待拆解的时间序列,Autoformer算法与代码分析是对应拆解后的趋势项,季节项,残差项。每一种算法对于其中每一拆解项的计算方式都千差万别,因为这是它们的主要创新点。以及每个子项的组合方式也各不相同,上述介绍的是最普遍的加法模型,实际上还有乘法模型以及混合模型。

从这个角度看,Autoformer的时序拆解模块其实是最简单的一种:
Autoformer算法与代码分析

第一行的式子解释了趋势项Autoformer算法与代码分析的具体算法,第二行的式子解释了Autoformer采用的是最简单的加法模型,并且仅仅拆解两个子项:趋势项和季节项。趋势项的具体算法是:使用一个均值滤波器处理时序,也可以看做使用了一个等长的卷积(same convolution)只不过卷积核的参数是确定的。

时序拆解部分的算法比较简单,没有什么创新点。作者的思路可能来源于传统算法对于时序拆解的使用,试图移植到Transformer结构中。但实际上该部分还有很多可以深挖的部分,例如使用更复杂的混合模型,以及拆解出多个子项。实际上后来的Fedformer也在这个部分作出了相应的优化和改进。

Auto-correlation

和大部分的基于Transformer的时序预测模型一样,Autoformer也是将Self-attention的改进作为主要的创新点。和reformer,informer等在它之前的模型不同的是,Autoformer的注意力不主要在于对于Self-attention计算量的减小上,它更加关注将Self-attention移植到时序计算上,设计一个更贴合时序分析的attention结构。

而时序数据的分析无外乎为:找周期特性,频域分析等,而Auto-correlation关注的就是寻找时序数据的周期特性,它希望模型能够更好的记住时序的周期特性,从而预测时序的未来值。

Autoformer算法与代码分析

Attention注意力的计算核心在于:相似性的度量。即通过向量内积来衡量向量之间的相似性。而Autoformer度量时序周期性的方式是,对于时间序列进行平移,度量平移前后的时间序列的相似性,度量的具体方式类似于向量的内积。基于这种方法,Autoformer认为,相似性高的平移序列对应的平移量就是潜在的周期。

当我们每次的平移量为Autoformer算法与代码分析时,上述的相似性计算可以表示为:

Autoformer算法与代码分析

其中Autoformer算法与代码分析为需要寻找周期性的时间序列,Autoformer算法与代码分析表示原时间序列平移Autoformer算法与代码分析后的序列。也就是说如果Autoformer算法与代码分析Autoformer算法与代码分析的相似性比较高,那么我们可以将Autoformer算法与代码分析看做Autoformer算法与代码分析的潜在周期。

如果我们迭代计算Autoformer算法与代码分析和所有平移变量之间的相似性,且每次的平移增量为1,那么迭代计算可以用频域计算表示为:
Autoformer算法与代码分析

因此在实际的Auto-correlation中,输入时间序列Autoformer算法与代码分析经过Autoformer算法与代码分析的映射后,先变换到频域,进而在频域中计算平移相似性。

代码讲解

Autoformer的主要创新点以及独特的模块已经在原理层面介绍完毕,接下来笔者将会在实际的代码层面进行细致的讲解。具体的代码细节将会在注释中注明,而需要注意的点将会着重说明

模块初始化

class Autoformer(nn.Module):
    """
    Autoformer is the first method to achieve the series-wise connection,
    with inherent O(LlogL) complexity
    """
    def __init__(self, configs):
        super(Autoformer, self).__init__()
        #利用历史时间序列的时间戳长度,编码器输入的时间维度
        self.seq_len = configs.seq_len
        #解码器输入的历史时间序列的时间戳长度。
        self.label_len = configs.label_len
        self.pred_len = configs.pred_len
        self.output_attention = False

        # Decomp,传入参数均值滤波器的核大小
        kernel_size = configs.moving_avg
        self.decomp = series_decomp(kernel_size)

        # Embedding
        # embedding操作,由于时间序列天然在时序上具有先后关系,因此这里embedding的作用更多的是为了调整维度
        self.enc_embedding = DataEmbedding_wo_pos(configs.d_feature, configs.d_model, configs.embed, configs.freq,
                                                  configs.dropout)
        self.dec_embedding = DataEmbedding_wo_pos(configs.d_feature, configs.d_model, configs.embed, configs.freq,
                                                  configs.dropout)



        # Encoder,采用的是多编码层堆叠
        self.encoder = Encoder(
            [
                EncoderLayer(
                    AutoCorrelationLayer(
                        #这里的第一个False表明是否使用mask机制。
                        AutoCorrelation(False, configs.factor, attention_dropout=configs.dropout,
                                        output_attention=False),
                        configs.d_model, configs.n_heads),
                    #编码过程中的特征维度设置
                    configs.d_model,
                    configs.d_ff,
                    moving_avg=configs.moving_avg,
                    dropout=configs.dropout,
                    #激活函数
                    activation=configs.activation
                ) for l in range(configs.e_layers)
            ],
            #时间序列通常采用Layernorm而不适用BN层
            norm_layer=my_Layernorm(configs.d_model)
        )
        # Decoder也是才是用多解码器堆叠
        self.decoder = Decoder(
            [
                DecoderLayer(
                    #如同传统的Transformer结构,decoder的第一个attention需要mask,保证当前的位置的预测不能看到之前的内容
                    #这个做法是来源于NLP中的作法,但是换成时序预测,实际上应该是不需要使用mask机制的。
                    #而在后续的代码中可以看出,这里的attention模块实际上都没有使用mask机制。

                    #self-attention,输入全部来自于decoder自身
                    AutoCorrelationLayer(
                        AutoCorrelation(True, configs.factor, attention_dropout=configs.dropout,
                                        output_attention=False),
                        configs.d_model, configs.n_heads),
                    #cross-attention,输入一部分来自于decoder,另一部分来自于encoder的输出
                    AutoCorrelationLayer(
                        AutoCorrelation(False, configs.factor, attention_dropout=configs.dropout,
                                        output_attention=False),
                        configs.d_model, configs.n_heads),
                    configs.d_model,
                    #任务要求的输出特征维度
                    configs.c_out,
                    configs.d_ff,
                    moving_avg=configs.moving_avg,
                    dropout=configs.dropout,
                    activation=configs.activation,
                )
                for l in range(configs.d_layers)
            ],
            norm_layer=my_Layernorm(configs.d_model),
            projection=nn.Linear(configs.d_model, configs.c_out, bias=True)
        )

各个模块的初始化没有什么特别的,参照Autoformer给出的结构图即可一一对应的找到各个模块的具体含义。值得一提的是,这里的初始化考虑了attention模块是否需要使用mask机制。mask-attention来源于NLP任务中的特殊要求,而在时序预测中,attention其实没有mask的必要。因此在后续的代码也可以看出,实际上所有的attention结构都没用使用mask机制。

Autoformer的前向传播

首先介绍Autoformer的原始输入:

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
                enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
        #x_enc表示编码器的输入
        #x_mark_enc表示x_enc中各个时间戳的先后关系
        #x_dec表示解码器的输入
        #x_mark_dec表示x_dec中各个时间戳的先后关系

这里x_mark_enc,x_mark_dec都是为了后续输入的Embedding操作服务的。作者的想法可能来自于原始Transformer的输入Embedding操作。因为在原始Transformer使用的NLP领域,词向量的输入是需要通过Embedding操作来表征词之间的先后关系。但是在时间序列预测领域中,时间序列的时间戳之间天然的具有先后关系,因此笔者认为这里的Embedding操作仅仅起到调整输入的特征维度的作用,对于表明先后关系没有太多作用。

接下来介绍编码器和解码器的输入:

        # decomp init
        # 因为需要使用生成式预测,所以需要用均值和0来占位,占住预测部分的位置。
        mean = torch.mean(x_enc, dim=1).unsqueeze(1).repeat(1, self.pred_len, 1)
        zeros = torch.zeros([x_dec.shape[0], self.pred_len, x_dec.shape[2]], device=x_enc.device)
        seasonal_init, trend_init = self.decomp(x_enc)
        # decoder input
        trend_init = torch.cat([trend_init[:, -self.label_len:, :], mean], dim=1)
        seasonal_init = torch.cat([seasonal_init[:, -self.label_len:, :], zeros], dim=1)

因为Autoformer沿用了Informer的生成式预测的方式,所以直接将空有预测部分的输入送到模型中。所以需要初始化预测部分的输入,编码器的输入正常,解码器的输入由Trend和Season构成:

接下来的代码都没有什么特别的,根据paper中的流程图就可以轻松理解:

def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec,
            enc_self_mask=None, dec_self_mask=None, dec_enc_mask=None):
    # decomp init
    # 因为需要使用生成式预测,所以需要用均值和0来占位,占住预测部分的位置。
    mean = torch.mean(x_enc, dim=1).unsqueeze(1).repeat(1, self.pred_len, 1)
    zeros = torch.zeros([x_dec.shape[0], self.pred_len, x_dec.shape[2]], device=x_enc.device)
    seasonal_init, trend_init = self.decomp(x_enc)
    # decoder input
    trend_init = torch.cat([trend_init[:, -self.label_len:, :], mean], dim=1)
    seasonal_init = torch.cat([seasonal_init[:, -self.label_len:, :], zeros], dim=1)
    # enc
    enc_out = self.enc_embedding(x_enc, x_mark_enc)
    enc_out, attns = self.encoder(enc_out, attn_mask=enc_self_mask)
    # dec
    dec_out = self.dec_embedding(seasonal_init, x_mark_dec)
    seasonal_part, trend_part = self.decoder(dec_out, enc_out, x_mask=dec_self_mask, cross_mask=dec_enc_mask,
                                             trend=trend_init)
    # final
    dec_out = trend_part + seasonal_part

    if self.output_attention:
        return dec_out[:, -self.pred_len:, :], attns
    else:
        return dec_out[:, -self.pred_len:, :]

时序拆解器

时序拆解器的算法原理可以参照上述的讲解以及paper,它的代码如下所示:

class moving_avg(nn.Module):
    """
    Moving average block to highlight the trend of time series
    """
    def __init__(self, kernel_size, stride):
        super(moving_avg, self).__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        # padding on the both ends of time series
        # 这里的判断语句主要是为了和Fedformer的部分进行区分,如果只考虑autoformer可以默认这里的判断全是True
        if type(self.kernel_size) == list:
            if len(self.kernel_size) == 1:
                self.kernel_size = self.kernel_size[0]
        front = x[:, 0:1, :].repeat(1, self.kernel_size - 1-math.floor((self.kernel_size - 1) // 2), 1)
        end = x[:, -1:, :].repeat(1, math.floor((self.kernel_size - 1) // 2), 1)
        x = torch.cat([front, x, end], dim=1)
        x = self.avg(x.permute(0, 2, 1))
        x = x.permute(0, 2, 1)
        return x


class series_decomp(nn.Module):
    """
    Series decomposition block
    """
    def __init__(self, kernel_size):
        super(series_decomp, self).__init__()
        self.moving_avg = moving_avg(kernel_size, stride=1)

    def forward(self, x):
        moving_mean = self.moving_avg(x)
        res = x - moving_mean
        return res, moving_mean

时序拆解器就是一个简单的均值滤波器,值得注意的是这里的滤波全部沿用的是same conv 即不改变输入的时间维度大小。其中一个目的是为了后续的维度统一,笔者认为在传统的LSTM或GRU模块中,一维卷积还是valid conv会好一些因为如果传入LSTM的时间戳个数过多,会加剧梯度爆炸和梯度弥散等问题。

Autocorrelation

Autocorrelation是autoformer的最大创新点,算法细节如前所述,这里主要讲解实际的代码细节,一边参照流程图,一边看代码会更好理解:
Autoformer算法与代码分析

实际的代码也基本是照着流程图走的,只是有些实现的小细节第一遍看来不太好懂,首先是qkv的处理,需要注意的是encoder和decoder中qkv的来源不太相同,encoder中的qkv全部来源一个输入,而decoder的qkv不知来源于decoder本身还来源于encoder的输出:

    #首先对于q,k进行FFT变换
    q_fft = torch.fft.rfft(queries.permute(0, 2, 3, 1).contiguous(), dim=-1)  # size=[B, H, E, L]
    k_fft = torch.fft.rfft(keys.permute(0, 2, 3, 1).contiguous(), dim=-1)
    #对k进行求共轭,并与q点乘
    res = q_fft * torch.conj(k_fft)
    #傅里叶反变换计算得出的值就是每一个时移因子的相似性大小
    corr = torch.fft.irfft(res, dim=-1) # size=[B, H, E, L]
    V = self.time_delay_agg_training(values.permute(0, 2, 3, 1).contiguous(), corr).permute(0, 3, 1, 2)

计算出了每个时移因子的相似性大小,接下来就要基于此对于V进行处理:

def time_delay_agg_full(self, values, corr):
    """
    Standard version of Autocorrelation
    """
    batch = values.shape[0]
    head = values.shape[1]
    channel = values.shape[2]
    length = values.shape[3]
    # index init,索引值初始化,类似于初始化自变量x,由于是多维处理,因此需要在各个维度上进行repeat
    init_index = torch.arange(length).unsqueeze(0).unsqueeze(0).unsqueeze(0).repeat(batch, head, channel, 1).cuda()
    # find top k,选取最大的几个时移因子
    top_k = int(self.factor * math.log(length))
    #[0]返回的是权重值,[1]返回的是对应的索引d
    weights = torch.topk(corr, top_k, dim=-1)[0]
    delay = torch.topk(corr, top_k, dim=-1)[1]
    # update corr
    # 使用softmax对权重值进行归一化处理
    tmp_corr = torch.softmax(weights, dim=-1)
    # aggregation
    # 这里用的操作比较巧,笔者一开始看的也不明所以,这里之所以最后一个维度repeat两次是因为流程图中Roll(V)的操作,也就是将前一部分的序列位置平移拼接到原序列的末尾,因此这里repeat两次是为了后续通过截取直接实现Roll(V)操作
    tmp_values = values.repeat(1, 1, 1, 2)
    delays_agg = torch.zeros_like(values).float()
    for i in range(top_k):
        #init_index+delay就相当于自变量给一个平移增量即 x = x+d
        tmp_delay = init_index + delay[..., i].unsqueeze(-1)
        #这里就是Roll(V)操作,相当于根据索引x直接对于V进行截取
        pattern = torch.gather(tmp_values, dim=-1, index=tmp_delay)
        delays_agg = delays_agg + pattern * (tmp_corr[..., i].unsqueeze(-1))
    return delays_agg

Encoder

编码器和原始的Transformer结构没差,除了将self-attention改为auto-correlation外,采用的多个编码层堆叠的结构。我们直接上代码:

class EncoderLayer(nn.Module):
    """
    Autoformer encoder layer with the progressive decomposition architecture
    """
    def __init__(self, attention, d_model, d_ff=None, moving_avg=25, dropout=0.1, activation="relu"):
        super(EncoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model

        self.attention = attention
        #这里的卷积是为了替代feed forward模块
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1, bias=False)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1, bias=False)

        self.decomp1 = series_decomp(moving_avg)
        self.decomp2 = series_decomp(moving_avg)

        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu
    def forward(self, x, attn_mask=None):

		#Autocorrelation
        new_x, attn = self.attention(
            x, x, x,
            attn_mask=attn_mask
        )
        x = x + self.dropout(new_x)
        x, _ = self.decomp1(x)
        y = x
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))
        res, _ = self.decomp2(x + y)
        return res, attn

只要理解了Autocorrelation,autoformer的代码就没有什么复杂的地方,完全是按照正常的Transformer模块进行的设计。对着autoformer的流程图就可以很好的理解。

Decoder

class Decoder(nn.Module):
    """
    Autoformer Decoder
    """
    def __init__(self, layers, norm_layer=None, projection=None):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer
        self.projection = projection

    def forward(self, x, cross, x_mask=None, cross_mask=None, trend=None):
        for layer in self.layers:
            #每一次都将趋势项累加,季节项传递给更深的网络进行处理
            x, residual_trend = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)
            trend = trend + residual_trend

        if self.norm is not None:
            x = self.norm(x)

        if self.projection is not None:
            #从高维映射回输出维度
            x = self.projection(x)
        return x, trend

Decoder也是多个decoder层进行堆叠,而每一个decoder层都共享encoder的输出

class DecoderLayer(nn.Module):
    """
    Autoformer decoder layer 
    """
    def __init__(self, self_attention, cross_attention, d_model, c_out, d_ff=None,
                 moving_avg=25, dropout=0.1, activation="relu"):
        super(DecoderLayer, self).__init__()
        d_ff = d_ff or 4 * d_model
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1, bias=False)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1, bias=False)

        self.decomp1 = series_decomp(moving_avg)
        self.decomp2 = series_decomp(moving_avg)
        self.decomp3 = series_decomp(moving_avg)

        self.dropout = nn.Dropout(dropout)
        self.projection = nn.Conv1d(in_channels=d_model, out_channels=c_out, kernel_size=3, stride=1, padding=1,
                                    padding_mode='circular', bias=False)
        self.activation = F.relu if activation == "relu" else F.gelu

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        #self-attention输入全部来自于decoder自身
        x = x + self.dropout(self.self_attention(
            x, x, x,
            attn_mask=x_mask
        )[0])

        x, trend1 = self.decomp1(x)
        #cross-attention:q来自于decoder,k,v则来自于encoder的输出
        x = x + self.dropout(self.cross_attention(
            x, cross, cross,
            attn_mask=cross_mask
        )[0])

        x, trend2 = self.decomp2(x)
        y = x
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))
        x, trend3 = self.decomp3(x + y)
		
        #趋势项累加,季节项通过模块进行处理
        residual_trend = trend1 + trend2 + trend3
        residual_trend = self.projection(residual_trend.permute(0, 2, 1)).transpose(1, 2)
        return x, residual_trend

Decoder部分的代码也是比较清晰明了的,至此autoformer部分的代码已经介绍完毕,下一篇会介绍基于autoformer进行改进的Fedformer。

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(2)
扎眼的阳光的头像扎眼的阳光普通用户
上一篇 2023年3月26日
下一篇 2023年3月26日

相关推荐