站点图标 AI技术聚合

时间序列之拐点检测(changepoints detection)算法

一、前言

对于时间拐点问题,其实就是找changepoint的问题,业务场景比如机器的缩扩容,业务的升级回滚等,都会让一些指标发生这样的现象, 如下图。(这种场景比较理想,现实情况要复杂得多)

52110c62d69365919c9f36ceef6cd79f.jpeg

为了检测这个区域,需要使用changepoint detector:ruptures

二、ruptures

ruptures 是专门用于检测时间序列数据中的结构性变化,这种变化也常被称为断点(breakpoints)或变点(changepoints)。

库提供了多种算法来检测时间序列中的突变或者异常模式变化。这些算法能够识别出时间序列数据中的突变,这些突变可能表明了一些潜在的事件或状态变化。

项目地址:https://github.com/deepcharles/ruptures 官方文档在这里 http://ctruong.perso.math.cnrs.fr/ruptures

2.1 安装

pip install ruptures

2.2 使用

其中有很多算法供选择,这里选择Dynp(dynamic programming.)

其他算法使用可以参考:https://forecastegy.com/posts/change-point-detection-time-series-python/#detecting-change-points-with-binary-segmentation

import matplotlib.pyplot as plt
import ruptures as rpt
import numpy as np

mean = 0
std_dev = 1
length_of_series = 100
values = np.random.normal(mean, std_dev, length_of_series)

values[-35:] = values[-35:] + 10

# 找拐点
algo = rpt.Dynp(model="l2", min_size=3, jump=3).fit(values)
# 检测断点,指定最大断点数
result = algo.predict(n_bkps=1)
result = result[:-1]
# 显示结果
plt.plot(values)
for bkp in result:
    plt.axvline(x=bkp, color='r', linestyle='--')
plt.show()

参数:

  • model:用于计算cost的function,根据这个来找changepoint

  • custom_cost: 自定义cost function

  • min_size:最小的分段长度。

  • jump:在进行断点搜索时的采样步长。

2.3 源码简单解析

predict源码:

实则还是self.seg是核心函数,结果通过长度的index进行排序 输出的结果格式为长度的index,看上面给出的results就明白了。

@lru_cache(maxsize=None)
    def seg(self, start, end, n_bkps):
        """Recurrence to find the optimal partition of signal[start:end].

        This method is to be memoized and then used.

        Args:
            start (int): start of the segment (inclusive)
            end (int): end of the segment (exclusive)
            n_bkps (int): number of breakpoints

        Returns:
            dict: {(start, end): cost value, ...}
        """
        jump, min_size = self.jump, self.min_size

        if n_bkps == 0:
            cost = self.cost.error(start, end)
            return {(start, end): cost}
        elif n_bkps > 0:
            # Let's fill the list of admissible last breakpoints
            multiple_of_jump = (k for k in range(start, end) if k % jump == 0)
            admissible_bkps = list()
            for bkp in multiple_of_jump:
                n_samples = bkp - start
                # first check if left subproblem is possible
                if sanity_check(
                    n_samples=n_samples,
                    n_bkps=n_bkps - 1,
                    jump=jump,
                    min_size=min_size,
                ):
                    # second check if the right subproblem has enough points
                    if end - bkp >= min_size:
                        admissible_bkps.append(bkp)

            assert (
                len(admissible_bkps) > 0
            ), "No admissible last breakpoints found.\
             start, end: ({},{}), n_bkps: {}.".format(
                start, end, n_bkps
            )

            # Compute the subproblems
            sub_problems = list()
            for bkp in admissible_bkps:
                left_partition = self.seg(start, bkp, n_bkps - 1)
                right_partition = self.seg(bkp, end, 0)
                tmp_partition = dict(left_partition)
                tmp_partition[(bkp, end)] = right_partition[(bkp, end)]
                sub_problems.append(tmp_partition)

            # Find the optimal partition
            return min(sub_problems, key=lambda d: sum(d.values()))

稍微看下 这个源码,大致就懂这个算法的含义了。

  1. 通过jump大小得到可能的候选边界

  2. 通过min_size的大小比较,得到每段的前后边界组合

  3. 进行递归

  4. 如果没有分块,则计算此时的时间序列段的cost value

  5. 得到所有分块的cost值,找到此时cost的最小值

这个其实比较暴力了,相当于目标函数cost fuction最小化。不暴力的有贝叶斯的方法:Bayesian Online Changepoint Detection

2.4 一些问题

  1. 这个必须要预先设置要检测多少个拐点

  2. 比如要大于min_size

这两个问题对于实际应用,基本不会知道检测的长度预先有多少个拐点,其次大于min_size这个要求如果是流式场景肯定会有一定的延迟。

所以对于流式场景这个只能检测过去几个点的拐点状态,无法实时检测出来。

并且针对第一个问题,不知道有多少个,那我们也可以通过卡阈值的方式来自动化的选出排名前部的边界

# 找拐点
algo = rpt.Dynp(model="l2", min_size=3, jump=3).fit(values)
# 这块n可以很多,但是要满足分割的条件, 直接采用 seg
partition = algo.seg(0, algo.n_samples, n_bkps=10)
# 输出的肯定有最后一个边界要去掉
filter_partition = {}
for key, value in partition.items():
    if key[1] == algo.n_samples:
        continue
    filter_partition[key] = value
partition = filter_partition
sort_partition = sorted(partition.items(), key=lambda x: x[1], reverse=True)

后续怎么操作就看需求了,可以按照cost大小进行排序,也可以直接根据数量取前面的等等

推荐阅读:

我的2022届互联网校招分享

我的2021总结

浅谈算法岗和开发岗的区别

互联网校招研发薪资汇总

公众号:AI蜗牛车

保持谦逊、保持自律、保持进步

发送【蜗牛】获取一份《手把手AI项目》(AI蜗牛车著)

发送【1222】获取一份不错的leetcode刷题笔记

发送【AI四大名著】获取四本经典AI电子书

版权声明:本文为博主作者:AI蜗牛车原创文章,版权归属原作者,如果侵权,请联系我们删除!

原文链接:https://blog.csdn.net/qq_33431368/article/details/136003279

退出移动版