Developing a Backend Architecture with aiohttp (Safely Loading a Deep Learning Model into a Process) (Continued)

Series Table of Contents

Preface

The previous post completed the basic database operations and implemented the basic login, registration, and logout features.

The main milestone of this post is loading each model safely into a process.

The second part of this post aims to build our own framework on top of basic libraries and load the model into a process safely. Updates will continue until the whole system matures into one large intelligent system, and it will stay open source (for learning only; please do not use it for commercial gain!).
GitHub repository: link

1. Safety

Why talk about safety here? As we all know, building an architecture or framework demands some understanding of the OS and the C level beneath it. "Safety" here means preventing systemic problems such as memory leaks, async errors, unreleased resources, and zombie processes.

Take loading a model as an example. It breaks down roughly into the following steps:

  1. Allocate enough memory
  2. Load the model piece by piece
  3. Close the I/O and keep the model resident in memory

Note! Steps 2 and 3 can both go wrong at the system level. In step 2, loading usually takes anywhere from a few seconds to over ten seconds depending on machine performance; if a keyboard signal stops the process while the model is still being written into memory, part of that memory leaks. In step 3 the model is kept in memory, normally inside a dedicated process; handled badly, that process can turn into a zombie, and enough accumulated zombie processes will directly degrade system performance.
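To make the failure mode concrete, here is a minimal standalone sketch (the slow_load function below is a hypothetical stand-in for reading weights chunk by chunk, not part of the project): a Ctrl+C in the middle of the loop abandons the half-built buffer in whatever state it happens to be in.

import time

def slow_load():
    # hypothetical stand-in: "reads" ten chunks, allocating memory for each
    buffer = []
    for chunk in range(10):
        time.sleep(1)                           # simulates reading one chunk from disk
        buffer.append(bytearray(1024 * 1024))   # simulates allocating memory for it
    return buffer

try:
    model = slow_load()
except KeyboardInterrupt:
    # Ctrl+C lands here mid-load: model was never assigned, and the
    # half-filled buffer is simply abandoned
    print('load interrupted halfway')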

2. Method

The usual approach is signal-based control. Python provides a library called signal for exactly this kind of problem.

It exposes signals such as SIGINT and SIGTERM, among others.

Documentation: link
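As a quick illustration of the two handlers used below, SIG_IGN (ignore the signal) and SIG_DFL (restore the OS default action), here is a minimal, self-contained sketch:

from signal import signal, SIGINT, SIG_IGN, SIG_DFL
import time

# ignore Ctrl+C while a critical section runs
signal(SIGINT, SIG_IGN)
print('Ctrl+C is ignored for the next 5 seconds...')
time.sleep(5)

# hand SIGINT back to the OS default action (terminate the process)
signal(SIGINT, SIG_DFL)
print('Ctrl+C terminates the process again')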

Implementation

Model:

I am using a traffic-light detection model trained with the YOLOv5 family. For project reasons the model itself is not public, so simply apply the method here to a model of your own.

The code follows directly from the previous post.

The files under utils

These are mainly the network files, but our focus should be on the method that loads the network.
First, modify the earlier system.yaml:

postgres:
  database: intersystem
  user: postgres
  password: root
  host: localhost
  port: 5432
  minsize: 1
  maxsize: 5
workers:
  # the model paths are configured here
  max_workers: 1
  model_path:
    data: "model/data/traffic_light.names"
    weights: "model/data/best_model_12.pt"
    cfg: "model/data/yolov3-spp-6cls.cfg"
# The core part
# predefine the model
_model = None

# This function's only job is to load the model and return it. It is pure,
# blocking work, so there is no need to declare it async.
def load_my_model(model_path):
    """
    load the torch model
    :param model_path:
    :return:
    """
    # set the img_size: the shape of the tensor fed into the network (as far as I recall)
    img_size = 512
    # load the cuda: the GPU device setting for PyTorch
    device_set = device('cuda:0')
    # load the nets: build the network structure from the .cfg file
    model = Darknet(str(BASE_DIR / model_path['cfg']), img_size)
    # load your trained weights here; the exact call depends on your checkpoint format,
    # e.g. model.load_state_dict(torch.load(str(BASE_DIR / model_path['weights']), map_location=device_set))
    return model

# This method installs the signal handler and then calls the loading function.
def load_model_with_signal(model_path: dict) -> None:
    """

    :param model_path:
    :return:
    """
    # protect the ram
    # Install the handler: SIG_IGN means keyboard signals (e.g. Ctrl+C) received
    # while this method executes are ignored, which prevents the mid-load memory
    # leak discussed above.
    signal(SIGINT, SIG_IGN)
    global _model
    if _model is None:
        _model = load_my_model(model_path=model_path)

# This method cleans up the model, releasing resources when the app closes.
def clean_model_with_signal() -> None:
    """
    clean the model
    :return:
    """
    # SIG_DFL restores the default action on receiving the signal: e.g. Ctrl+C
    # once again reclaims the process's resources and lets it die.
    signal(SIGINT, SIG_DFL)
    global _model
    _model = None
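To see why ignoring SIGINT inside the worker matters: on POSIX systems, Ctrl+C in a terminal delivers SIGINT to every process in the foreground process group, pool workers included. The standalone sketch below (not part of the project) shows the protection in action: press Ctrl+C while it runs and the parent is interrupted, but the worker finishes its "load" untouched.

import time
from concurrent.futures import ProcessPoolExecutor
from signal import signal, SIGINT, SIG_IGN

def protected_load() -> str:
    signal(SIGINT, SIG_IGN)   # the worker ignores Ctrl+C from here on
    time.sleep(10)            # stands in for slow model loading
    return 'model loaded'

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as pool:
        future = pool.submit(protected_load)
        try:
            print(future.result())
        except KeyboardInterrupt:
            # the parent saw Ctrl+C, but the worker keeps loading
            print('parent interrupted; worker still completes:', future.result())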

Here is utils.py after the changes. I wrote the network structure directly into this file, mainly because the project's layering is not yet very clean.

from signal import signal, SIGINT, SIG_IGN, SIG_DFL
from system.settings import BASE_DIR

import math

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

from torch import device

from utils import torch_utils
from utils.parse_config import parse_model_cfg
from utils.layers import MixConv2d, Swish, Mish, FeatureConcat, WeightedFeatureFusion

ONNX_EXPORT = False

# predefine the model
_model = None


def load_my_model(model_path):
    """
    load the torch model
    :param model_path:
    :return:
    """
    # set the img_size
    img_size = 512
    # load the cuda
    device_set = device('cuda:0')
    # load the nets
    model = Darknet(str(BASE_DIR / model_path['cfg']), img_size)
    # load your trained weights here; the exact call depends on your checkpoint format,
    # e.g. model.load_state_dict(torch.load(str(BASE_DIR / model_path['weights']), map_location=device_set))
    return model


def load_model_with_signal(model_path: dict) -> None:
    """

    :param model_path:
    :return:
    """
    # protect the ram
    signal(SIGINT, SIG_IGN)
    global _model
    if _model is None:
        _model = load_my_model(model_path=model_path)


def clean_model_with_signal() -> None:
    """
    clean the model
    :return:
    """
    signal(SIGINT, SIG_DFL)
    global _model
    _model = None





def create_modules(module_defs, img_size, cfg):
    """

    :param module_defs:
    :param img_size:
    :param cfg:
    :return:
    """
    # Constructs module list of layer blocks from module configuration in module_defs

    img_size = [img_size] * 2 if isinstance(img_size, int) else img_size  # expand if necessary
    _ = module_defs.pop(0)  # cfg training hyperparams (unused)
    # The input channel count is 3, initialized here as a list object. I traced this
    # list's role across the codebase: the parameter is only used during construction,
    # and a list wrapping a single integer wastes cache and costs extra call time.
    # One cfg barely matters, but a dozen cfg initializations add up and slow down
    # instantiating the whole class.
    output_filters = [3]  # input channels
    module_list = nn.ModuleList()
    routs = []  # list of layers which rout to deeper layers
    yolo_index = -1

    for i, mdef in enumerate(module_defs):
        modules = nn.Sequential()

        if mdef['type'] == 'convolutional':
            bn = mdef['batch_normalize']
            filters = mdef['filters']
            k = mdef['size']  # kernel size
            stride = mdef['stride'] if 'stride' in mdef else (mdef['stride_y'], mdef['stride_x'])
            if isinstance(k, int):  # single-size conv
                modules.add_module('Conv2d', nn.Conv2d(in_channels=output_filters[-1],
                                                       out_channels=filters,
                                                       kernel_size=k,
                                                       stride=stride,
                                                       padding=k // 2 if mdef['pad'] else 0,
                                                       groups=mdef['groups'] if 'groups' in mdef else 1,
                                                       bias=not bn))
            else:  # multiple-size conv
                modules.add_module('MixConv2d', MixConv2d(in_ch=output_filters[-1],
                                                          out_ch=filters,
                                                          k=k,
                                                          stride=stride,
                                                          bias=not bn))

            if bn:
                modules.add_module('BatchNorm2d', nn.BatchNorm2d(filters, momentum=0.03, eps=1E-4))
            else:
                routs.append(i)  # detection output (goes into yolo layer)

            if mdef['activation'] == 'leaky':  # activation study https://github.com/ultralytics/yolov3/issues/441
                modules.add_module('activation', nn.LeakyReLU(0.1, inplace=True))
            elif mdef['activation'] == 'swish':
                modules.add_module('activation', Swish())
            elif mdef['activation'] == 'mish':
                modules.add_module('activation', Mish())

        elif mdef['type'] == 'BatchNorm2d':
            filters = output_filters[-1]
            modules = nn.BatchNorm2d(filters, momentum=0.03, eps=1E-4)
            if i == 0 and filters == 3:  # normalize RGB image
                # imagenet mean and var https://pytorch.org/docs/stable/torchvision/models.html#classification
                modules.running_mean = torch.tensor([0.485, 0.456, 0.406])
                modules.running_var = torch.tensor([0.0524, 0.0502, 0.0506])

        elif mdef['type'] == 'maxpool':
            k = mdef['size']  # kernel size
            stride = mdef['stride']
            maxpool = nn.MaxPool2d(kernel_size=k, stride=stride, padding=(k - 1) // 2)
            if k == 2 and stride == 1:  # yolov3-tiny
                modules.add_module('ZeroPad2d', nn.ZeroPad2d((0, 1, 0, 1)))
                modules.add_module('MaxPool2d', maxpool)
            else:
                modules = maxpool

        elif mdef['type'] == 'upsample':
            if ONNX_EXPORT:  # explicitly state size, avoid scale_factor
                g = (yolo_index + 1) * 2 / 32  # gain
                modules = nn.Upsample(size=tuple(int(x * g) for x in img_size))  # img_size = (320, 192)
            else:
                modules = nn.Upsample(scale_factor=mdef['stride'])

        elif mdef['type'] == 'route':  # nn.Sequential() placeholder for 'route' layer
            layers = mdef['layers']
            filters = sum([output_filters[l + 1 if l > 0 else l] for l in layers])
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = FeatureConcat(layers=layers)

        elif mdef['type'] == 'shortcut':  # nn.Sequential() placeholder for 'shortcut' layer
            layers = mdef['from']
            filters = output_filters[-1]
            routs.extend([i + l if l < 0 else l for l in layers])
            modules = WeightedFeatureFusion(layers=layers, weight='weights_type' in mdef)

        elif mdef['type'] == 'reorg3d':  # yolov3-spp-pan-scale
            pass

        elif mdef['type'] == 'yolo':
            yolo_index += 1
            stride = [32, 16, 8]  # P5, P4, P3 strides
            if any(x in cfg for x in ['panet', 'yolov4', 'cd53']):  # stride order reversed
                stride = list(reversed(stride))
            layers = mdef['from'] if 'from' in mdef else []
            modules = YOLOLayer(anchors=mdef['anchors'][mdef['mask']],  # anchor list
                                nc=mdef['classes'],  # number of classes
                                img_size=img_size,  # (416, 416)
                                yolo_index=yolo_index,  # 0, 1, 2...
                                layers=layers,  # output layers
                                stride=stride[yolo_index])

            # Initialize preceding Conv2d() bias (https://arxiv.org/pdf/1708.02002.pdf section 3.3)
            try:
                j = layers[yolo_index] if 'from' in mdef else -1
                # If previous layer is a dropout layer, get the one before
                if module_list[j].__class__.__name__ == 'Dropout':
                    j -= 1
                bias_ = module_list[j][0].bias  # shape(255,)
                bias = bias_[:modules.no * modules.na].view(modules.na, -1)  # shape(3,85)
                bias[:, 4] += -4.5  # obj
                bias[:, 5:] += math.log(0.6 / (modules.nc - 0.99))  # cls (sigmoid(p) = 1/nc)
                module_list[j][0].bias = torch.nn.Parameter(bias_, requires_grad=bias_.requires_grad)
            except:
                print('WARNING: smart bias initialization failure.')

        elif mdef['type'] == 'dropout':
            perc = float(mdef['probability'])
            modules = nn.Dropout(p=perc)
        else:
            print('Warning: Unrecognized Layer Type: ' + mdef['type'])

        # Register module list and number of output filters
        module_list.append(modules)
        output_filters.append(filters)

    routs_binary = [False] * (i + 1)
    for i in routs:
        routs_binary[i] = True
    return module_list, routs_binary


def get_yolo_layers(model):
    """

    :param model:
    :return:
    """
    return [i for i, m in enumerate(model.module_list) if m.__class__.__name__ == 'YOLOLayer']  # [89, 101, 113]


class YOLOLayer(nn.Module):
    def __init__(self, anchors, nc, img_size, yolo_index, layers, stride):
        super(YOLOLayer, self).__init__()
        self.anchors = torch.Tensor(anchors)
        self.index = yolo_index  # index of this layer in layers
        self.layers = layers  # model output layer indices
        self.stride = stride  # layer stride
        self.nl = len(layers)  # number of output layers (3)
        self.na = len(anchors)  # number of anchors (3)
        self.nc = nc  # number of classes (80)
        self.no = nc + 5  # number of outputs (85)
        self.nx, self.ny, self.ng = 0, 0, 0  # initialize number of x, y gridpoints
        self.anchor_vec = self.anchors / self.stride
        self.anchor_wh = self.anchor_vec.view(1, self.na, 1, 1, 2)

        if ONNX_EXPORT:
            self.training = False
            self.create_grids((img_size[1] // stride, img_size[0] // stride))  # number x, y grid points

    def create_grids(self, ng=(13, 13), device='cpu'):
        self.nx, self.ny = ng  # x and y grid size
        self.ng = torch.tensor(ng, dtype=torch.float)

        # build xy offsets
        if not self.training:
            yv, xv = torch.meshgrid([torch.arange(self.ny, device=device), torch.arange(self.nx, device=device)])
            self.grid = torch.stack((xv, yv), 2).view((1, 1, self.ny, self.nx, 2)).float()

        if self.anchor_vec.device != device:
            self.anchor_vec = self.anchor_vec.to(device)
            self.anchor_wh = self.anchor_wh.to(device)

    def forward(self, p, out):
        ASFF = False  # https://arxiv.org/abs/1911.09516
        if ASFF:
            i, n = self.index, self.nl  # index in layers, number of layers
            p = out[self.layers[i]]
            bs, _, ny, nx = p.shape  # bs, 255, 13, 13
            if (self.nx, self.ny) != (nx, ny):
                self.create_grids((nx, ny), p.device)

            # outputs and weights
            # w = F.softmax(p[:, -n:], 1)  # normalized weights
            w = torch.sigmoid(p[:, -n:]) * (2 / n)  # sigmoid weights (faster)
            # w = w / w.sum(1).unsqueeze(1)  # normalize across layer dimension

            # weighted ASFF sum
            p = out[self.layers[i]][:, :-n] * w[:, i:i + 1]
            for j in range(n):
                if j != i:
                    p += w[:, j:j + 1] * \
                         F.interpolate(out[self.layers[j]][:, :-n], size=[ny, nx], mode='bilinear', align_corners=False)

        elif ONNX_EXPORT:
            bs = 1  # batch size
        else:
            bs, _, ny, nx = p.shape  # bs, 255, 13, 13
            if (self.nx, self.ny) != (nx, ny):
                self.create_grids((nx, ny), p.device)

        # p.view(bs, 255, 13, 13) -- > (bs, 3, 13, 13, 85)  # (bs, anchors, grid, grid, classes + xywh)
        p = p.view(bs, self.na, self.no, self.ny, self.nx).permute(0, 1, 3, 4, 2).contiguous()  # prediction

        if self.training:
            return p

        elif ONNX_EXPORT:
            # Avoid broadcasting for ANE operations
            m = self.na * self.nx * self.ny
            ng = 1. / self.ng.repeat(m, 1)
            grid = self.grid.repeat(1, self.na, 1, 1, 1).view(m, 2)
            anchor_wh = self.anchor_wh.repeat(1, 1, self.nx, self.ny, 1).view(m, 2) * ng

            p = p.view(m, self.no)
            xy = torch.sigmoid(p[:, 0:2]) + grid  # x, y
            wh = torch.exp(p[:, 2:4]) * anchor_wh  # width, height
            p_cls = torch.sigmoid(p[:, 4:5]) if self.nc == 1 else \
                torch.sigmoid(p[:, 5:self.no]) * torch.sigmoid(p[:, 4:5])  # conf
            return p_cls, xy * ng, wh

        else:  # inference
            io = p.clone()  # inference output
            io[..., :2] = torch.sigmoid(io[..., :2]) + self.grid  # xy
            io[..., 2:4] = torch.exp(io[..., 2:4]) * self.anchor_wh  # wh yolo method
            io[..., :4] *= self.stride
            torch.sigmoid_(io[..., 4:])
            return io.view(bs, -1, self.no), p  # view [1, 3, 13, 13, 85] as [1, 507, 85]


class Darknet(nn.Module):
    # YOLOv3 object detection model

    def __init__(self, cfg, img_size=(416, 416), verbose=False):
        super(Darknet, self).__init__()

        self.module_defs = parse_model_cfg(cfg)
        self.module_list, self.routs = create_modules(self.module_defs, img_size, cfg)
        self.yolo_layers = get_yolo_layers(self)
        # torch_utils.initialize_weights(self)

        # Darknet Header https://github.com/AlexeyAB/darknet/issues/2914#issuecomment-496675346
        self.version = np.array([0, 2, 5], dtype=np.int32)  # (int32) version info: major, minor, revision
        self.seen = np.array([0], dtype=np.int64)  # (int64) number of images seen during training
        self.info(verbose) if not ONNX_EXPORT else None  # print model description

    def forward(self, x, augment=False, verbose=False):

        if not augment:
            return self.forward_once(x)
        else:  # Augment images (inference and test only) https://github.com/ultralytics/yolov3/issues/931
            img_size = x.shape[-2:]  # height, width
            s = [0.83, 0.67]  # scales
            y = []
            for i, xi in enumerate((x,
                                    torch_utils.scale_img(x.flip(3), s[0], same_shape=False),  # flip-lr and scale
                                    torch_utils.scale_img(x, s[1], same_shape=False),  # scale
                                    )):
                # cv2.imwrite('img%g.jpg' % i, 255 * xi[0].numpy().transpose((1, 2, 0))[:, :, ::-1])
                y.append(self.forward_once(xi)[0])

            y[1][..., :4] /= s[0]  # scale
            y[1][..., 0] = img_size[1] - y[1][..., 0]  # flip lr
            y[2][..., :4] /= s[1]  # scale

            # for i, yi in enumerate(y):  # coco small, medium, large = < 32**2 < 96**2 <
            #     area = yi[..., 2:4].prod(2)[:, :, None]
            #     if i == 1:
            #         yi *= (area < 96. ** 2).float()
            #     elif i == 2:
            #         yi *= (area > 32. ** 2).float()
            #     y[i] = yi

            y = torch.cat(y, 1)
            return y, None

    def forward_once(self, x, augment=False, verbose=False):
        img_size = x.shape[-2:]  # height, width
        yolo_out, out = [], []
        if verbose:
            print('0', x.shape)
            str = ''

        # Augment images (inference and test only)
        if augment:  # https://github.com/ultralytics/yolov3/issues/931
            nb = x.shape[0]  # batch size
            s = [0.83, 0.67]  # scales
            x = torch.cat((x,
                           torch_utils.scale_img(x.flip(3), s[0]),  # flip-lr and scale
                           torch_utils.scale_img(x, s[1]),  # scale
                           ), 0)

        for i, module in enumerate(self.module_list):
            name = module.__class__.__name__
            if name in ['WeightedFeatureFusion', 'FeatureConcat']:  # sum, concat
                if verbose:
                    l = [i - 1] + module.layers  # layers
                    sh = [list(x.shape)] + [list(out[i].shape) for i in module.layers]  # shapes
                    str = ' >> ' + ' + '.join(['layer %g %s' % x for x in zip(l, sh)])
                x = module(x, out)  # WeightedFeatureFusion(), FeatureConcat()
            elif name == 'YOLOLayer':
                yolo_out.append(module(x, out))
            else:  # run module directly, i.e. mtype = 'convolutional', 'upsample', 'maxpool', 'batchnorm2d' etc.
                x = module(x)

            out.append(x if self.routs[i] else [])
            if verbose:
                print('%g/%g %s -' % (i, len(self.module_list), name), list(x.shape), str)
                str = ''

        if self.training:  # train
            return yolo_out
        elif ONNX_EXPORT:  # export
            x = [torch.cat(x, 0) for x in zip(*yolo_out)]
            return x[0], torch.cat(x[1:3], 1)  # scores, boxes: 3780x80, 3780x4
        else:  # inference or test
            x, p = zip(*yolo_out)  # inference output, training output
            x = torch.cat(x, 1)  # cat yolo outputs
            if augment:  # de-augment results
                x = torch.split(x, nb, dim=0)
                x[1][..., :4] /= s[0]  # scale
                x[1][..., 0] = img_size[1] - x[1][..., 0]  # flip lr
                x[2][..., :4] /= s[1]  # scale
                x = torch.cat(x, 1)
            return x, p

    def fuse(self):
        # Fuse Conv2d + BatchNorm2d layers throughout model
        print('Fusing layers...')
        fused_list = nn.ModuleList()
        for a in list(self.children())[0]:
            if isinstance(a, nn.Sequential):
                for i, b in enumerate(a):
                    if isinstance(b, nn.modules.batchnorm.BatchNorm2d):
                        # fuse this bn layer with the previous conv2d layer
                        conv = a[i - 1]
                        fused = torch_utils.fuse_conv_and_bn(conv, b)
                        a = nn.Sequential(fused, *list(a.children())[i + 1:])
                        break
            fused_list.append(a)
        self.module_list = fused_list
        self.info() if not ONNX_EXPORT else None  # yolov3-spp reduced from 225 to 152 layers

    def info(self, verbose=False):
        torch_utils.model_info(self, verbose)

Reading the model into the app

# model.py
from aiohttp import web
from asyncio import get_event_loop, gather, shield
from concurrent.futures import ProcessPoolExecutor
from utils.utils import clean_model_with_signal, load_model_with_signal



async def init_models(app: web.Application, ) -> ProcessPoolExecutor:
    """

    :param app:
    :return:
    """
    conf = app['conf']['workers']
    # get the model path
    model_path = conf['model_path']
    # get the num_workers
    max_workers = conf['max_workers']
    # set the maximum number of workers the process pool can run at once
    executor = ProcessPoolExecutor(max_workers=max_workers)
    # get the event loop
    loop = get_event_loop()
    run = loop.run_in_executor
    # run the loader in the executor so each worker process reads in its own model
    fs = [run(executor, load_model_with_signal, model_path) for i in range(max_workers)]
    # gather runs the awaitables in the sequence concurrently.
    await gather(*fs)
    # this coroutine, registered below, cleans up the per-process models after the app closes
    async def close_executor(app: web.Application, ) -> None:
        # set the cleaning model in executor
        fs = [run(executor, clean_model_with_signal) for i in range(max_workers)]
        # protect the fs from being cancelled
        await shield(gather(*fs))
        # until all events done: wait=True means resources are reclaimed only once all pending tasks have finished.
        executor.shutdown(wait=True)
    # handlers appended to on_cleanup run when the app is shut down.
    app.on_cleanup.append(close_executor)
    app['executor'] = executor
    # return the executor
    return executor
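With the executor stored on the app, a request handler can push inference into a worker process without blocking the event loop. The sketch below is hypothetical (the detect route, the form field name, and run_inference's pre/post-processing are all placeholders; the project's real handler will differ), but the run_in_executor call is the intended usage:

# handlers.py (hypothetical sketch)
from asyncio import get_event_loop
from aiohttp import web
from utils import utils as model_utils


def run_inference(image_bytes: bytes) -> dict:
    # runs inside a worker process, where load_model_with_signal has
    # already populated model_utils._model for this process
    model = model_utils._model
    # hypothetical: preprocess image_bytes, run the model, postprocess
    return {'detections': []}


async def detect(request: web.Request) -> web.Response:
    data = await request.post()
    image_bytes = data['image'].file.read()   # the 'image' field name is assumed
    loop = get_event_loop()
    result = await loop.run_in_executor(
        request.app['executor'], run_inference, image_bytes)
    return web.json_response(result)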

Update main.py

# main.py
from aiohttp import web
from routes import setup_routes
from settings import config, BASE_DIR
from db import pg_context
from asyncio import SelectorEventLoop
from utils.model import init_models

import aiohttp_jinja2
import jinja2


async def init_app():
    """
    init the app
    :return:
    """
    # create a app
    app = web.Application()
    # setup the route
    setup_routes(app=app)
    app['conf'] = config
    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(BASE_DIR / 'system' / 'templates')))
    # link the database
    app.cleanup_ctx.append(pg_context)
    # init the model, make the executor to use the model
    executor = await init_models(app=app)
    print(app['executor'])
    return app



async def get_app():
    """
    return the app
    :return:
    """
    app = await init_app()
    return app


if __name__ == '__main__':
    # create loop
    loop = SelectorEventLoop()
    # get the app and add it to the loop
    app = loop.run_until_complete(init_app(), )
    # start the app service
    web.run_app(app=app, loop=loop)

And that completes the whole process. The remaining files are supporting files for the neural network; they are not the focus here, so please head over to my GitHub.

Group chat

My friends and I run a group chat for beginners, with all kinds of Python and Java development books and resources. Anyone who wants to learn is welcome to join; if you need a particular book, want to discuss a problem, or hit a code bug, you can also come straight to us. The group QR code is below. Readers are welcome to join.
