Preface: a walkthrough of the source code.
My own understanding of parts of the code is added as inline comments throughout, both as a record for myself and as an invitation to discuss.
1 BackBone
"""
在ResNet50的基础上进行改进:加入膨胀卷积
原ResNet50:
Conv1:7*7-->Conv2:MaxPool-->ResBlock1-->Conv3:ResBlock2-->Conv4:ResBlock3-->Conv5:ResBlock4-->AverPool-->FC-->Softmax
改进后:
①前两个残差块没有改变
②后两个残差块的第一个残差结构第一层(Bottleneck1)并没有下采样(分支连接),故整个BockBone仅进行三次下采样(步长改为1):480*480-->60*60;原因是下采样过多的话,语义分割效果并不好
③后两个残差块的第一个残差结构第一层(Bottleneck1)中间3*3卷积的步长改为1(以防下采样),并加入膨胀系数:第三个残差块为1,第四个残差块为2
④后两个残差块的其他残差结构(Bottleneck2)均加入膨胀系数:第三个残差块为2,第四个残差块为4
"""
import torch
import torch.nn as nn
def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
"""3x3 convolution with padding"""
return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
padding=dilation, groups=groups, bias=False, dilation=dilation)
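# With padding = dilation, a stride-1 3x3 convolution preserves the spatial size:
#   out = (in + 2*pad - dilation*(k-1) - 1) / stride + 1
#       = (in + 2*dilation - 2*dilation - 1) / 1 + 1 = in    (k = 3, stride = 1)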
def conv1x1(in_planes, out_planes, stride=1):
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class Bottleneck(nn.Module):
# Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
# while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition" (https://arxiv.org/abs/1512.03385).
# This variant is also known as ResNet V1.5 and improves accuracy according to
# https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.
expansion = 4
def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
base_width=64, dilation=1, norm_layer=None):
super(Bottleneck, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
width = int(planes * (base_width / 64.)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
self.conv1 = conv1x1(inplanes, width)
self.bn1 = norm_layer(width)
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.bn2 = norm_layer(width)
self.conv3 = conv1x1(width, planes * self.expansion)
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(self, block, layers, num_classes=1000, zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,  # replace_stride_with_dilation is what differs from the plain ResNet
norm_layer=None):
super(ResNet, self).__init__()
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
self.inplanes = 64
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]  # all False builds a plain ResNet; a True entry replaces that stage's stride with dilation
if len(replace_stride_with_dilation) != 3:
raise ValueError("replace_stride_with_dilation should be None "
"or a 3-element tuple, got {}".format(replace_stride_with_dilation))
self.groups = groups
self.base_width = width_per_group
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3,
bias=False)
self.bn1 = norm_layer(self.inplanes)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, layers[0])
self.layer2 = self._make_layer(block, 128, layers[1], stride=2,
dilate=replace_stride_with_dilation[0])
self.layer3 = self._make_layer(block, 256, layers[2], stride=2,
dilate=replace_stride_with_dilation[1])
self.layer4 = self._make_layer(block, 512, layers[3], stride=2,
dilate=replace_stride_with_dilation[2])
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck):
nn.init.constant_(m.bn3.weight, 0)
def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
norm_layer = self._norm_layer
downsample = None
        previous_dilation = self.dilation  # save the current dilation for the first block of this layer (used when dilate is True)
if dilate:
            self.dilation *= stride  # fold the stride into the dilation rate
stride = 1
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),  # with dilate=True, stride is 1, so the shortcut only matches channels and does not downsample
norm_layer(planes * block.expansion),
)
layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,  # this block corresponds to Bottleneck1 (the unit with the projection shortcut)
self.base_width, previous_dilation, norm_layer))
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(block(self.inplanes, planes, groups=self.groups,
base_width=self.base_width, dilation=self.dilation,
norm_layer=norm_layer))
return nn.Sequential(*layers)
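    # Example trace of _make_layer for layer3 with dilate=True (stride=2 requested):
    #   previous_dilation = 1 -> self.dilation = 2, stride = 1
    #   first Bottleneck: stride 1, dilation 1, 1x1 projection shortcut (channel change only)
    #   remaining Bottlenecks: dilation 2
    # layer4 then repeats this with self.dilation = 4 (its first block uses dilation 2).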
def _forward_impl(self, x):
# See note [TorchScript super()]
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
def forward(self, x):
return self._forward_impl(x)
def _resnet(block, layers, **kwargs):
model = ResNet(block, layers, **kwargs)
return model
def resnet50(**kwargs):
r"""ResNet-50 model from
`"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
progress (bool): If True, displays a progress bar of the download to stderr
"""
return _resnet(Bottleneck, [3, 4, 6, 3], **kwargs)
def resnet101(**kwargs):
r"""ResNet-101 model from
`"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
progress (bool): If True, displays a progress bar of the download to stderr
"""
return _resnet(Bottleneck, [3, 4, 23, 3], **kwargs)
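A quick sanity check of point 2 in the header docstring: with replace_stride_with_dilation=[False, True, True], the backbone downsamples only three times, so a 480x480 input comes out as a 60x60 feature map. A minimal sketch (run with the code above in scope):

import torch

net = resnet50(replace_stride_with_dilation=[False, True, True])
net.eval()
with torch.no_grad():
    x = torch.randn(1, 3, 480, 480)
    x = net.maxpool(net.relu(net.bn1(net.conv1(x))))  # 480 -> 120 (the first two 2x downsamples)
    x = net.layer1(x)                                 # 120 -> 120
    x = net.layer2(x)                                 # 120 -> 60 (third and last downsample)
    x = net.layer3(x)                                 # 60 -> 60 (stride replaced by dilation)
    x = net.layer4(x)                                 # 60 -> 60 (dilation doubled again)
print(x.shape)  # torch.Size([1, 2048, 60, 60])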
2 FCN_model
"""
Build the network model: assemble the network (BackBone backbone + FCNHead) --> forward pass --> gradient update
"""
from collections import OrderedDict
from typing import Dict
import torch
from torch import nn, Tensor
from torch.nn import functional as F
from .backbone import resnet50, resnet101
"""1.1 目的是为了重构BackBone和返回中间层的输出:layer3的aux和layer4的out"""
class IntermediateLayerGetter(nn.ModuleDict):
"""
Module wrapper that returns intermediate layers from a model
It has a strong assumption that the modules have been registered
into the model in the same order as they are used.
This means that one should **not** reuse the same nn.Module
twice in the forward if you want this to work.
Additionally, it is only able to query submodules that are directly
assigned to the model. So if `model` is passed, `model.feature1` can
be returned, but not `model.feature1.layer2`.
Args:
model (nn.Module): model on which we will extract the features
return_layers (Dict[name, new_name]): a dict containing the names
of the modules for which the activations will be returned as
the key of the dict, and the value of the dict is the name
of the returned activation (which the user can specify).
"""
_version = 2
__annotations__ = {
"return_layers": Dict[str, str],
}
def __init__(self, model: nn.Module, return_layers: Dict[str, str]) -> None:
        if not set(return_layers).issubset([name for name, _ in model.named_children()]):  # check that every key of return_layers (i.e. layer3/layer4) actually exists in the backbone
raise ValueError("return_layers are not present in model")
orig_return_layers = return_layers
        return_layers = {str(k): str(v) for k, v in return_layers.items()}  # make a str->str copy that can be consumed below (the original is kept for forward())
"""重新构建backbone,将没有使用到的模块全部删掉"""
        layers = OrderedDict()  # ordered dict collecting the kept submodules, in backbone order
        for name, module in model.named_children():  # walk every child of the ResNet, keeping each one until all requested layers are found
            layers[name] = module  # store the child's name and module in the dict
            if name in return_layers:  # this requested layer has been collected; remove it from the pending copy
                del return_layers[name]
            if not return_layers:  # pending copy empty -> the rebuilt backbone is complete
                break
super(IntermediateLayerGetter, self).__init__(layers)
        self.return_layers = orig_return_layers  # e.g. {'layer3': 'aux', 'layer4': 'out'}
"""正向传播的结果:layer3和layer4的输出"""
def forward(self, x: Tensor) -> Dict[str, Tensor]:
        out = OrderedDict()  # new dict holding the forward results, i.e. the 'aux' and 'out' activations
        for name, module in self.items():  # run each kept submodule in order; x is the input tensor
            x = module(x)  # forward pass through this submodule
            if name in self.return_layers:
                out_name = self.return_layers[name]  # look up the value for this key, e.g. layer3 -> 'aux', layer4 -> 'out'
                out[out_name] = x  # e.g. name='layer4' -> out_name='out' -> layer4's output
return out
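# A minimal usage sketch (illustrative): wrapping the backbone drops every child
# registered after 'layer4' (avgpool, fc), and forward() returns the requested
# activations under their new names.
#
#   backbone = resnet50(replace_stride_with_dilation=[False, True, True])
#   getter = IntermediateLayerGetter(backbone, {"layer3": "aux", "layer4": "out"})
#   feats = getter(torch.randn(1, 3, 480, 480))
#   feats["aux"].shape  # torch.Size([1, 1024, 60, 60])
#   feats["out"].shape  # torch.Size([1, 2048, 60, 60])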
"""3 最终模型FCN"""
class FCN(nn.Module):
"""
Implements a Fully-Convolutional Network for semantic segmentation.
Args:
backbone (nn.Module): the network used to compute the features for the model.
The backbone should return an OrderedDict[Tensor], with the key being
"out" for the last feature map used, and "aux" if an auxiliary classifier
is used.
classifier (nn.Module): module that takes the "out" element returned from
the backbone and returns a dense prediction.
aux_classifier (nn.Module, optional): auxiliary classifier used during training
"""
__constants__ = ['aux_classifier']
def __init__(self, backbone, classifier, aux_classifier=None):
super(FCN, self).__init__()
self.backbone = backbone
self.classifier = classifier
self.aux_classifier = aux_classifier
    def forward(self, x: Tensor) -> Dict[str, Tensor]:  # x is the input tensor
        input_shape = x.shape[-2:]  # the last two tensor dims are the image height and width
        # contract: features is a dict of tensors
        features = self.backbone(x)  # features is an OrderedDict
        result = OrderedDict()  # dict collecting the final outputs
        x = features["out"]
        x = self.classifier(x)  # take the 'out' feature map and feed it into the main classifier
        # The original paper uses ConvTranspose2d, but its weights are frozen, so it is just bilinear interpolation
        x = F.interpolate(x, size=input_shape, mode='bilinear', align_corners=False)  # bilinear upsampling back to the original shape
        result["out"] = x  # store the result in the dict
if self.aux_classifier is not None:
x = features["aux"]
x = self.aux_classifier(x)
            # again, the paper's frozen ConvTranspose2d is equivalent to bilinear interpolation
x = F.interpolate(x, size=input_shape, mode='bilinear', align_corners=False)
result["aux"] = x # 辅助分类器的结果
return result
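# Side note on the frozen ConvTranspose2d (an illustrative sketch, not part of the
# model): a transposed convolution whose kernel is fixed to bilinear-upsampling
# weights and never trained behaves like the F.interpolate calls above, which is
# why the substitution is valid. bilinear_kernel below is a hypothetical helper.
#
#   def bilinear_kernel(channels, k=4):
#       og = (torch.arange(k).float() - (k - 1) / 2).abs()
#       filt = (1 - og / (k / 2))[:, None] * (1 - og / (k / 2))[None, :]
#       weight = torch.zeros(channels, 1, k, k)
#       weight[:, 0] = filt
#       return weight
#
#   up = nn.ConvTranspose2d(c, c, kernel_size=4, stride=2, padding=1, groups=c, bias=False)
#   up.weight.data.copy_(bilinear_kernel(c))
#   up.weight.requires_grad_(False)  # frozen -> a fixed 2x bilinear upsampler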
"""2 定义FCNHead函数"""
class FCNHead(nn.Sequential):
def __init__(self, in_channels, channels):
inter_channels = in_channels // 4
layers = [
nn.Conv2d(in_channels, inter_channels, 3, padding=1, bias=False),
nn.BatchNorm2d(inter_channels),
nn.ReLU(),
nn.Dropout(0.1),
nn.Conv2d(inter_channels, channels, 1)
]
super(FCNHead, self).__init__(*layers)
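# Channel flow of the heads built below: the main head FCNHead(2048, num_classes)
# maps 2048 -(3x3 conv)-> 512 -(1x1 conv)-> num_classes, and the auxiliary head
# FCNHead(1024, num_classes) maps 1024 -> 256 -> num_classes.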
"""1 定义BackBone函数"""
def fcn_resnet50(aux, num_classes=21, pretrain_backbone=False):
# 'resnet50_imagenet': 'https://download.pytorch.org/models/resnet50-0676ba61.pth'
# 'fcn_resnet50_coco': 'https://download.pytorch.org/models/fcn_resnet50_coco-1167a1af.pth'
    backbone = resnet50(replace_stride_with_dilation=[False, True, True])  # the ResNet50 with dilated convolutions in layer3/layer4
    if pretrain_backbone:  # load the pretrained weights for the resnet50 backbone
backbone.load_state_dict(torch.load("resnet50.pth", map_location='cpu'))
out_inplanes = 2048
aux_inplanes = 1024
return_layers = {'layer4': 'out'}
if aux:
return_layers['layer3'] = 'aux'
    backbone = IntermediateLayerGetter(backbone, return_layers=return_layers)  # rebuild the backbone with IntermediateLayerGetter: drop the fully connected layer and expose the outputs of the third and fourth ResBlocks
aux_classifier = None
# why using aux: https://github.com/pytorch/vision/issues/4292
if aux:
        aux_classifier = FCNHead(aux_inplanes, num_classes)  # auxiliary classifier head (on layer3's output)
    classifier = FCNHead(out_inplanes, num_classes)  # main classifier head (on layer4's output)
model = FCN(backbone, classifier, aux_classifier)
return model
def fcn_resnet101(aux, num_classes=21, pretrain_backbone=False):
# 'resnet101_imagenet': 'https://download.pytorch.org/models/resnet101-63fe2227.pth'
# 'fcn_resnet101_coco': 'https://download.pytorch.org/models/fcn_resnet101_coco-7ecb50ca.pth'
backbone = resnet101(replace_stride_with_dilation=[False, True, True])
if pretrain_backbone:
        # load the pretrained weights for the resnet101 backbone
backbone.load_state_dict(torch.load("resnet101.pth", map_location='cpu'))
out_inplanes = 2048
aux_inplanes = 1024
return_layers = {'layer4': 'out'}
if aux:
return_layers['layer3'] = 'aux'
backbone = IntermediateLayerGetter(backbone, return_layers=return_layers)
aux_classifier = None
# why using aux: https://github.com/pytorch/vision/issues/4292
if aux:
aux_classifier = FCNHead(aux_inplanes, num_classes)
classifier = FCNHead(out_inplanes, num_classes)
model = FCN(backbone, classifier, aux_classifier)
return model
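A minimal end-to-end sketch of the finished model (assuming, as the training script below does, that this file and the backbone are importable from a package named src):

import torch
from src import fcn_resnet50

model = fcn_resnet50(aux=True, num_classes=21)
model.eval()
with torch.no_grad():
    result = model(torch.randn(1, 3, 480, 480))
print(result["out"].shape)  # torch.Size([1, 21, 480, 480])
print(result["aux"].shape)  # torch.Size([1, 21, 480, 480])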
With the model built, the next step is training.
3 Train
import os
import time
import datetime
import torch
from src import fcn_resnet50
from train_utils import train_one_epoch, evaluate, create_lr_scheduler
from my_dataset import VOCSegmentation
import transforms as T
"""1 训练过程中,图像所采用的预处理方法"""
class SegmentationPresetTrain:
def __init__(self, base_size, crop_size, hflip_prob=0.5, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
min_size = int(0.5 * base_size)
max_size = int(2.0 * base_size)
        trans = [T.RandomResize(min_size, max_size)]  # each list element is one preprocessing op
if hflip_prob > 0:
            trans.append(T.RandomHorizontalFlip(hflip_prob))  # random horizontal flip
trans.extend([
T.RandomCrop(crop_size),
            T.ToTensor(),  # scale pixel values from 0-255 down to 0-1
            T.Normalize(mean=mean, std=std),  # standardize: subtract the mean, divide by the std
])
        self.transforms = T.Compose(trans)  # Compose() chains the ops into a single transform
def __call__(self, img, target):
return self.transforms(img, target)
"""2 验证过程中,图像所采用的预处理方法"""
class SegmentationPresetEval:
def __init__(self, base_size, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
self.transforms = T.Compose([
T.RandomResize(base_size, base_size),
T.ToTensor(),
T.Normalize(mean=mean, std=std),
])
def __call__(self, img, target):
return self.transforms(img, target)
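# Note: the transforms module T used above (not shown in this post) applies every
# op to the image AND the target mask jointly. A hedged sketch of what its paired
# RandomResize might look like (names assumed from the reference repo; F here
# stands for torchvision.transforms.functional; illustrative only):
#
#   class RandomResize:
#       def __init__(self, min_size, max_size=None):
#           self.min_size = min_size
#           self.max_size = max_size if max_size is not None else min_size
#       def __call__(self, image, target):
#           size = random.randint(self.min_size, self.max_size)
#           image = F.resize(image, size)
#           # masks must use NEAREST so class ids are never blended
#           target = F.resize(target, size, interpolation=F.InterpolationMode.NEAREST)
#           return image, target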
"""3 定义初始和剪裁尺寸"""
def get_transform(train):
base_size = 520
    crop_size = 480  # crop size used during training
return SegmentationPresetTrain(base_size, crop_size) if train else SegmentationPresetEval(base_size)
"""4 定义模型"""
def create_model(aux, num_classes, pretrain=True):
    model = fcn_resnet50(aux=aux, num_classes=num_classes)  # create the model
if pretrain:
        weights_dict = torch.load("./fcn_resnet50_coco.pth", map_location='cpu')  # load the pretrained weights into a dict first (not into the model yet), so entries can be inspected and dropped beforehand
        # The official pretrained weights cover 21 classes (background included).
        # When training on a custom dataset, delete the class-dependent weights to avoid shape-mismatch errors.
        if num_classes != 21:
            for k in list(weights_dict.keys()):
                if "classifier.4" in k:  # classifier.4 holds the class-dependent weights; delete it
                    del weights_dict[k]
        missing_keys, unexpected_keys = model.load_state_dict(weights_dict, strict=False)  # load the (possibly filtered) weights
if len(missing_keys) != 0 or len(unexpected_keys) != 0:
print("missing_keys: ", missing_keys)
print("unexpected_keys: ", unexpected_keys)
return model
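# For reference (hedged, based on the key names this model produces): the
# class-dependent entries dropped above are "classifier.4.weight" and
# "classifier.4.bias"; since the substring "classifier.4" also occurs in
# "aux_classifier.4.weight" / "aux_classifier.4.bias", the auxiliary head's
# class weights are dropped as well.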
"""5 训练"""
def main(args):
device = torch.device(args.device if torch.cuda.is_available() else "cpu")
batch_size = args.batch_size
    # segmentation num_classes + background
num_classes = args.num_classes + 1
results_file = "results{}.txt".format(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")) # 用来保存训练以及验证过程中信息
# VOCdevkit -> VOC2012 -> ImageSets -> Segmentation -> train.txt
train_dataset = VOCSegmentation(args.data_path,
year="2012",
transforms=get_transform(train=True),
txt_name="train.txt")
# VOCdevkit -> VOC2012 -> ImageSets -> Segmentation -> val.txt
val_dataset = VOCSegmentation(args.data_path,
year="2012",
transforms=get_transform(train=False),
txt_name="val.txt")
num_workers = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=batch_size,
num_workers=num_workers,
shuffle=True,
pin_memory=True,
collate_fn=train_dataset.collate_fn)
val_loader = torch.utils.data.DataLoader(val_dataset,
batch_size=1,
num_workers=num_workers,
pin_memory=True,
collate_fn=val_dataset.collate_fn)
model = create_model(aux=args.aux, num_classes=num_classes)
model.to(device)
    params_to_optimize = [  # collect all trainable weights of the backbone and the classifier separately
{"params": [p for p in model.backbone.parameters() if p.requires_grad]},
{"params": [p for p in model.classifier.parameters() if p.requires_grad]}
]
if args.aux:
params = [p for p in model.aux_classifier.parameters() if p.requires_grad]
        params_to_optimize.append({"params": params, "lr": args.lr * 10})  # train the auxiliary classifier too, with 10x the learning rate
optimizer = torch.optim.SGD(
params_to_optimize,
lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay
)
scaler = torch.cuda.amp.GradScaler() if args.amp else None
"""创建学习率更新策略,这里是每个step更新一次(不是每个epoch)"""
lr_scheduler = create_lr_scheduler(optimizer, len(train_loader), args.epochs, warmup=True)
    if args.resume:  # resume training from a checkpoint
checkpoint = torch.load(args.resume, map_location='cpu')
model.load_state_dict(checkpoint['model'])
optimizer.load_state_dict(checkpoint['optimizer'])
lr_scheduler.load_state_dict(checkpoint['lr_scheduler'])
args.start_epoch = checkpoint['epoch'] + 1
if args.amp:
scaler.load_state_dict(checkpoint["scaler"])
"""训练过程"""
start_time = time.time()
    for epoch in range(args.start_epoch, args.epochs):  # loop over the dataset epoch by epoch
        mean_loss, lr = train_one_epoch(model, optimizer, train_loader, device, epoch,  # train for one epoch
lr_scheduler=lr_scheduler, print_freq=args.print_freq, scaler=scaler)
"""验证过程"""
confmat = evaluate(model, val_loader, device=device, num_classes=num_classes)
val_info = str(confmat)
print(val_info)
# write into txt
with open(results_file, "a") as f:
            # record each epoch's train_loss, lr, and the validation metrics
train_info = f"[epoch: {epoch}]\n" \
f"train_loss: {mean_loss:.4f}\n" \
f"lr: {lr:.6f}\n"
f.write(train_info + val_info + "\n\n")
save_file = {"model": model.state_dict(),
"optimizer": optimizer.state_dict(),
"lr_scheduler": lr_scheduler.state_dict(),
"epoch": epoch,
"args": args}
if args.amp:
save_file["scaler"] = scaler.state_dict()
torch.save(save_file, "save_weights/model_{}.pth".format(epoch))
total_time = time.time() - start_time
total_time_str = str(datetime.timedelta(seconds=int(total_time)))
print("training time {}".format(total_time_str))
"""6 传入参数需要根据实际修改"""
def parse_args():
import argparse
parser = argparse.ArgumentParser(description="pytorch fcn training")
parser.add_argument("--data-path", default="/data/", help="VOCdevkit root")
parser.add_argument("--num-classes", default=20, type=int)
parser.add_argument("--aux", default=True, type=bool, help="auxilier loss")
parser.add_argument("--device", default="cuda", help="training device")
parser.add_argument("-b", "--batch-size", default=4, type=int)
parser.add_argument("--epochs", default=30, type=int, metavar="N",
help="number of total epochs to train")
parser.add_argument('--lr', default=0.0001, type=float, help='initial learning rate')
parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
help='momentum')
parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float,
metavar='W', help='weight decay (default: 1e-4)',
dest='weight_decay')
parser.add_argument('--print-freq', default=10, type=int, help='print frequency')
    parser.add_argument('--resume', default='', help='resume from checkpoint')  # resume training after an interruption
parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
help='start epoch')
# Mixed precision training parameters
parser.add_argument("--amp", default=False, type=bool,
help="Use torch.cuda.amp for mixed precision training")
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
if not os.path.exists("./save_weights"):
os.mkdir("./save_weights")
main(args)
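The create_lr_scheduler helper imported from train_utils is not shown in this post. As far as I understand the reference implementation, it combines a linear warmup with polynomial ("poly") decay and is stepped once per batch rather than once per epoch. A hedged sketch:

import torch

def create_lr_scheduler(optimizer, num_step, epochs, warmup=True,
                        warmup_epochs=1, warmup_factor=1e-3, power=0.9):
    # num_step = len(train_loader): the scheduler is stepped once per batch
    assert num_step > 0 and epochs > 0

    def f(x):  # x = number of scheduler steps taken so far
        if warmup and x <= warmup_epochs * num_step:
            alpha = float(x) / (warmup_epochs * num_step)
            return warmup_factor * (1 - alpha) + alpha  # linear ramp from warmup_factor to 1
        # polynomial ("poly") decay of the lr factor down to 0 over the remaining steps
        return (1 - (x - warmup_epochs * num_step) /
                ((epochs - warmup_epochs) * num_step)) ** power

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=f)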