MoCo代码分析 [自监督学习]


个人简介:南京邮电大学,计算机科学与技术,在读本科

前驱文章一《MoCo v1 文献研究 [自监督学习]》

前驱文章二《MoCo v2 文献研究 [自监督学习]》


文章目录


**源码地址**: https://github.com/facebookresearch/moco

一、代码中的 “ReadMe.md”

● 这里简单把 “ReadMe.md” 复制了下来,方便后面输入命令来上机跑。

MoCo: Momentum Contrast for Unsupervised Visual Representation Learning

MoCo代码分析 [自监督学习]

This is a PyTorch implementation of the MoCo paper:

@Article{he2019moco,
  author  = {Kaiming He and Haoqi Fan and Yuxin Wu and Saining Xie and Ross Girshick},
  title   = {Momentum Contrast for Unsupervised Visual Representation Learning},
  journal = {arXiv preprint arXiv:1911.05722},
  year    = {2019},
}

It also includes the implementation of the MoCo v2 paper:

@Article{chen2020mocov2,
  author  = {Xinlei Chen and Haoqi Fan and Ross Girshick and Kaiming He},
  title   = {Improved Baselines with Momentum Contrastive Learning},
  journal = {arXiv preprint arXiv:2003.04297},
  year    = {2020},
}

1.1 Preparation

Install PyTorch and ImageNet dataset following the official PyTorch ImageNet training code.

This repo aims to be minimal modifications on that code. Check the modifications by:

diff main_moco.py <(curl https://raw.githubusercontent.com/pytorch/examples/master/imagenet/main.py)
diff main_lincls.py <(curl https://raw.githubusercontent.com/pytorch/examples/master/imagenet/main.py)

1.2 Unsupervised Training

This implementation only supports multi-gpu, DistributedDataParallel training, which is faster and simpler; single-gpu or DataParallel training is not supported.

To do unsupervised pre-training of a ResNet-50 model on ImageNet in an 8-gpu machine, run:

python main_moco.py \
  -a resnet50 \
  --lr 0.03 \
  --batch-size 256 \
  --dist-url 'tcp://localhost:10001' --multiprocessing-distributed --world-size 1 --rank 0 \
  [your imagenet-folder with train and val folders]

This script uses all the default hyper-parameters as described in the MoCo v1 paper. To run MoCo v2, set --mlp --moco-t 0.2 --aug-plus --cos.

Note: for 4-gpu training, we recommend following the linear lr scaling recipe: --lr 0.015 --batch-size 128 with 4 gpus. We got similar results using this setting.

1.3 Linear Classification

With a pre-trained model, to train a supervised linear classifier on frozen features/weights in an 8-gpu machine, run:

python main_lincls.py \
  -a resnet50 \
  --lr 30.0 \
  --batch-size 256 \
  --pretrained [your checkpoint path]/checkpoint_0199.pth.tar \
  --dist-url 'tcp://localhost:10001' --multiprocessing-distributed --world-size 1 --rank 0 \
  [your imagenet-folder with train and val folders]

Linear classification results on ImageNet using this repo with 8 NVIDIA V100 GPUs :

pre-train
epochs
pre-train
time
MoCo v1
top-1 acc.
MoCo v2
top-1 acc.
ResNet-5020053 hours60.8±0.267.5±0.1

Here we run 5 trials (of pre-training and linear classification) and report mean±std: the 5 results of MoCo v1 are {60.6, 60.6, 60.7, 60.9, 61.1}, and of MoCo v2 are {67.7, 67.6, 67.4, 67.6, 67.3}.

1.4 Models

Our pre-trained ResNet-50 models can be downloaded as following:

epochsmlpaug+costop-1 acc.modelmd5
MoCo v120060.6downloadb251726a
MoCo v220067.7download59fd9945
MoCo v280071.1downloada04e12f8

1.5 Transferring to Object Detection

See ./detection.

1.6 License

This project is under the CC-BY-NC 4.0 license. See LICENSE for details.

1.7 See Also



二、main_moco.py —— 无监督训练主文件

整个无监督训练模型的代码框架如下

import ...

parser.add_argument(...)

def main():
	...

def main_worker(gpu, ngpus_per_node, args):
	...

def train(train_loader, model, criterion, optimizer, epoch, args):
	...

def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
	...

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
	...
    def reset(self):
	...
    def update(self, val, n=1):
	...
    def __str__(self):
	...

class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
	...
    def display(self, batch):
	...
    def _get_batch_fmtstr(self, num_batches):
	...


def adjust_learning_rate(optimizer, epoch, args):
    """Decay the learning rate based on schedule"""
	...

def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
	...

if __name__ == '__main__':
    main()

● 接下来,一个一个地细剖。按着顺序来


2.1 关于“导入包”的代码

作者把包的引入分成了三类:基础库、Pytorch库、个人写的moco库。

● 作者的模型是用 PyTorch 实现的。python torch 又称 PyTorach,是一个以 Python 优先的深度学习框架,一个开源的 Python 机器学习库,用于自然语言处理等应用程序,不仅能够实现强大的 GPU 加速,同时还支持动态神经网络,这是现在很多主流框架比如 Tensorflow 等都不支持的。

moco.loadermoco.builder 是在 “moco” 文件夹下的两个子文件,分别是 “loader.py” 和 “builder.py”。

import argparse
import builtins
import math
import os
import random
import shutil
import time
import warnings

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.multiprocessing as mp
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import moco.loader
import moco.builder

代码说明:“import ... as ...” 的意思是 “取别名”,比如对于 import torch.nn as nn 来说,后面你想调用 “torch.nn” 这个函数,只需要写 “nn” 即可。


2.2 关于“命令行参数”的代码

● 接下来所做的事情都是关于 “import argparse” 后做的事情。

argparse 是 python 自带的命令行参数解析包,可以用来方便地读取命令行参数。它的使用也比较简单。

model_names = sorted(name for name in models.__dict__
                     if name.islower() and not name.startswith("__")
                     and callable(models.__dict__[name]))

parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')

代码说明
  ① models 是从 “import torchvision.models as models” 这里来的。torchvision.models 中为我们提供了已经训练好的模型,让我们可以加载之后,直接使用。
  ② 类的静态函数、类函数、普通函数、全局变量以及一些内置的属性都是放在 “类的__dict__” 里的。而 “对象的__dict__” 中存储了一些 self.xxx 等一些东西。
  ③ islower()的功能:检查文本中所有字符是否都小写。
  ④ startswith()的功能:判断字符串是否以指定字符或字符串开头。
  ⑤ callable()的功能:检查一个对象是否是可调用的。
  ⑥ sorted()的功能:返回重新排序的列表。(默认是升序)
  ⑦ model_names = sored(...) 所做的事情就是把 torchvision.models 中的 “models.__dict__” 中的每一个 “大成员” 中的 “所有小成员” 进行了重排(除开 “字符串开头为__的” 小成员名)
  ⑧ 导入 argparse 这个包后,作者调用了包中的 “ArgumentParser类” 生成了一个 parser 对象(好多博客中把这个叫做参数解析器),其中的 description 描述这个参数解析器是干什么的,当我们在命令行显示帮助信息的时候会看到 description 描述的信息。最后作者将其赋值给了 parser


● 要看懂下面三段代码,看一篇这个即可:官方的 argparse 用法

函数 add_argument(name or flags...[, action][, nargs][, const][, default][, type][, choices][, required][, help][, metavar][, dest]) 的功能:定义单个的命令行参数应当如何解析。每个形参都在下面有它自己更多的描述,代码中用到的有
  ① name or flags:一个命名或者一个选项字符串的列表,例如 foo-f, --foo
  ② action:当参数在命令行中出现时使用的动作基本类型。
  ③ nargs:命令行参数应当消耗的数目。
  ④ default:当参数未在命令行中出现并且也不存在于命名空间对象时所产生的值。
  ⑤ type:命令行参数应当被转换成的类型。
  ⑥ choices:可用的参数的容器。
  ⑦ help:一个此选项作用的简单描述。
  ⑧ metavar:在使用方法消息中使用的参数值示例。
  ⑨ dest:被添加到 parse_args() 所返回对象上的属性名。

# 训练数据的目录
parser.add_argument('data', metavar='DIR', help='path to dataset')

# 选择训练模型
parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet50', choices=model_names,
                    help='model architecture: ' + ' | '.join(model_names) + ' (default: resnet50)')

# 还不太清楚...后期补充
parser.add_argument('-j', '--workers', default=32, type=int, metavar='N', help='number of data loading workers (default: 32)')

# 迭代次数
parser.add_argument('--epochs', default=200, type=int, metavar='N', help='number of total epochs to run')

# 继承的迭代轮数(默认为0), 也就是说, 可以持续训练, 上一天关机保存的 模型, 可继续加载, 继续训练, 只不过迭代的轮数要写入(不然默认为0)
parser.add_argument('--start-epoch', default=0, type=int, metavar='N', help='manual epoch number (useful on restarts)')

# 批量大小
parser.add_argument('-b', '--batch-size', default=256, type=int, metavar='N',
                    help='mini-batch size (default: 256), this is the total '
                         'batch size of all GPUs on the current node when '
                         'using Data Parallel or Distributed Data Parallel')

# 学习速率            
parser.add_argument('--lr', '--learning-rate', default=0.03, type=float, metavar='LR', help='initial learning rate', dest='lr')

# 还不太清楚...后期补充          
parser.add_argument('--schedule', default=[120, 160], nargs='*', type=int, help='learning rate schedule (when to drop lr by 10x)')

# 动量更新值
parser.add_argument('--momentum', default=0.9, type=float, metavar='M', help='momentum of SGD solver')

# 权重衰减速度
parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float, metavar='W', help='weight decay (default: 1e-4)',
                    dest='weight_decay')
                    
# 打印训练信息的频率
parser.add_argument('-p', '--print-freq', default=10, type=int, metavar='N', help='print frequency (default: 10)')

代码说明
  ① metavar 有助于 “生成帮助信息”,即方便我们对照着输入 “命令行参数”,我们可以不管。
  ② 默认情况下,解析器会将 “命令行参数” 当作简单字符串读入。 然而,“命令行字符串” 经常应当被解读为其他类型,例如 floatintadd_argument()type 关键字允许执行任何必要的类型检查和类型转换。如果 type 关键字使用了 default 关键字,则类型转换器仅会在默认值为字符串时被应用。
  ③ 有默认值,即包含有 default 的即是论文中找到的 “最佳参数值”。


# 模型地址(默认一开始没有), 如果你上次存了训练后的模型, 只需呀再利用这个 命令 写入 相应的地址, 即可继续加载该模型, 继续在其基础上训练
parser.add_argument('--resume', default='', type=str, metavar='PATH', help='path to latest checkpoint (default: none)')

# 还不太清楚...后期补充
parser.add_argument('--world-size', default=-1, type=int, help='number of nodes for distributed training')

# 还不太清楚...后期补充
parser.add_argument('--rank', default=-1, type=int, help='node rank for distributed training')

# 还不太清楚...后期补充
parser.add_argument('--dist-url', default='tcp://224.66.41.62:23456', type=str, help='url used to set up distributed training')

# 分布式的后端
parser.add_argument('--dist-backend', default='nccl', type=str, help='distributed backend')

# 初始化训练的(随机)种子
parser.add_argument('--seed', default=None, type=int, help='seed for initializing training. ')

# 是否使用 GPU (默认有)
parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.')

# 使用多处理分布式训练启动, 每个节点有 N 个进程,即 N 个 gpu。无论是单节点还是多节点数据并行训练,这是使用 PyTorch 的最快方式。
# action='store_true' 的意思是, 系统默认是 “启动的”
parser.add_argument('--multiprocessing-distributed', action='store_true',
                    help='Use multi-processing distributed training to launch '
                         'N processes per node, which has N GPUs. This is the '
                         'fastest way to use PyTorch for either single node or '
                         'multi node data parallel training')
# MoCo v1 默认的配置:
parser.add_argument('--moco-dim', default=128, type=int, help='feature dimension (default: 128)')
parser.add_argument('--moco-k', default=65536, type=int, help='queue size; number of negative keys (default: 65536)')
parser.add_argument('--moco-m', default=0.999, type=float, help='moco momentum of updating key encoder (default: 0.999)')
parser.add_argument('--moco-t', default=0.07, type=float, help='softmax temperature (default: 0.07)')

# MoCo v2 新增的配置:
parser.add_argument('--mlp', action='store_true', help='use mlp head')
parser.add_argument('--aug-plus', action='store_true', help='use moco v2 data augmentation')
parser.add_argument('--cos', action='store_true', help='use cosine lr schedule')


2.3 关于“main()函数”的代码

def main():
    args = parser.parse_args()

	# 一般不会执行
    if args.seed is not None:	# 如果要开启“系统随机”,这样会导致训练速度会大大下降
        random.seed(args.seed)
        torch.manual_seed(args.seed)
        cudnn.deterministic = True
        warnings.warn('You have chosen to seed training. '
                      'This will turn on the CUDNN deterministic setting, '
                      'which can slow down your training considerably! '
                      'You may see unexpected behavior when restarting '
                      'from checkpoints.')
	
	# 一般不会执行
    if args.gpu is not None:	
        warnings.warn('You have chosen a specific GPU. This will completely '
                      'disable data parallelism.')
	# 获取系统环境变量中 分布式训练 的值
    if args.dist_url == "env://" and args.world_size == -1:
        args.world_size = int(os.environ["WORLD_SIZE"])

    args.distributed = args.world_size > 1 or args.multiprocessing_distributed

    ngpus_per_node = torch.cuda.device_count()	# 获取 gpu 数量
    
    if args.multiprocessing_distributed:
        # Since we have ngpus_per_node processes per node, the total world_size needs to be adjusted accordingly
        # 由于每个节点都有 ngpus_per_node 个进程,因此需要相应地调整 world_size 的总数
        args.world_size = ngpus_per_node * args.world_size
        
        # Use torch.multiprocessing.spawn to launch distributed processes: the main_worker process function
        # 使用 torch.multiprocessing.spawn 启动分布式进程: main_worker 进程函数
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
    else:
        # Simply call main_worker function
        # 如果不适用分布式训练, 那么就简单地调用 main_worker 进程函数
        main_worker(args.gpu, ngpus_per_node, args)

代码说明
  ① 函数 parse_args(args=None, namespace=None) 的功能:将“参数字符串”(就是在运行该程序时,我们从键盘输入到这里面的)转换为 “对象” 并将其设为命名空间的属性。 返回带有成员的命名空间。
  ② mp.spawn() 第一个参数是一个函数,这个函数将执行训练的所有步骤。从这一步开始,Python 将建立多个进程,每个进程都会执行 main_worker() 函数。 第二个参数是开启的进程数目。第三个参数是传给 main_worker() 的函数实参。


2.4 关于“main_worker()函数”的代码

def main_worker(gpu, ngpus_per_node, args):
    args.gpu = gpu

    # suppress printing if not master
    if args.multiprocessing_distributed and args.gpu != 0:
        def print_pass(*args):
            pass

        builtins.print = print_pass

    if args.gpu is not None:
        print("Use GPU: {} for training".format(args.gpu))

    if args.distributed:
        if args.dist_url == "env://" and args.rank == -1:
            args.rank = int(os.environ["RANK"])
        if args.multiprocessing_distributed:
            # For multiprocessing distributed training, rank needs to be the global rank among all the processes
            # 对于多处理分布式训练,RANK 必须是所有流程中的全局 RANK
            args.rank = args.rank * ngpus_per_node + gpu
        dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)
    # create model
    print("=> creating model '{}'".format(args.arch))
	# 创建一个名为 “model” 的 MoCo 对象, 并传入一系列参数
    model = moco.builder.MoCo(models.__dict__[args.arch], args.moco_dim, args.moco_k, args.moco_m, args.moco_t, args.mlp)
    print(model) # 打印模型

	# 有 cuda 的话先查看是否打算用分布式计算(distributed computing)
    if args.distributed:
        # For multiprocessing distributed, DistributedDataParallel constructor
        # should always set the single device scope, otherwise,
        # DistributedDataParallel will use all available devices.
        # 对于分布式多处理,DistributedDataParallel 构造函数应始终设置单个设备范围, 否则 DistributedDataParallel 将使用所有可用设备。
        if args.gpu is not None:
            torch.cuda.set_device(args.gpu)
            model.cuda(args.gpu)
            # When using a single GPU per process and per
            # DistributedDataParallel, we need to divide the batch size
            # ourselves based on the total number of GPUs we have
            # 当每个进程和每个 DistributedDataParallel 使用单个 GPU 时, 我们需要根据拥有的 GPU 总数自行划分批大小(batch size)
            args.batch_size = int(args.batch_size / ngpus_per_node)
            args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
            model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
        else: 	# 否则 DistributedDataParallel 将划分 batch_size 并将其分配给所有可用的 GPU
            model.cuda()
            # DistributedDataParallel will divide and allocate batch_size to all
            # available GPUs if device_ids are not set
            model = torch.nn.parallel.DistributedDataParallel(model)
    
    # 如果不打算采用分布式计算,且存在人为指定的gpu
    elif args.gpu is not None:
        torch.cuda.set_device(args.gpu)
        model = model.cuda(args.gpu)
        # comment out the following line for debugging
        raise NotImplementedError("Only DistributedDataParallel is supported.")		# raise: 用于抛出异常
        
    else:
        # AllGather implementation (batch shuffle, queue update, etc.) in this code only supports DistributedDataParallel.
        raise NotImplementedError("Only DistributedDataParallel is supported.")		# raise: 用于抛出异常

    # define loss function (criterion) and optimizer
    # 定义损失函数(标准)和优化器
    criterion = nn.CrossEntropyLoss().cuda(args.gpu)
    optimizer = torch.optim.SGD(model.parameters(), args.lr, momentum=args.momentum, weight_decay=args.weight_decay)

    # optionally resume from a checkpoint
    # 如果可以从一个 “检查点” 恢复。“检查点”: 每一次迭代都会产生一个, 它即是训练模型的权重, 每隔一轮都会有所更新。
    if args.resume:
        if os.path.isfile(args.resume):	 	# os.path.isfile(): 判断某一对象(需提供绝对路径)是否为文件
            print("=> loading checkpoint '{}'".format(args.resume))
            if args.gpu is None:	# 如果没有指定 gpu
                checkpoint = torch.load(args.resume)
            else:	# 如果指定了 gpu
                # Map model to be loaded to specified single gpu.
                # 加载模型到指定的单个 GPU 上
                loc = 'cuda:{}'.format(args.gpu)
                checkpoint = torch.load(args.resume, map_location=loc)
            # 从 checkpoint 里导出 epoch
            args.start_epoch = checkpoint['epoch']
            # 把 model 的 parameter, 即存有这些 parameter 的 'state_dict' 加载到当前 model 里来
            model.load_state_dict(checkpoint['state_dict'])	   
            # 把 optimizer 的信息, 即存有这些信息的 'optimizer' 加载到当前 optimizer 里来
            optimizer.load_state_dict(checkpoint['optimizer'])
            print("=> loaded checkpoint '{}' (epoch {})".format(args.resume, checkpoint['epoch']))
        else:
            print("=> no checkpoint found at '{}'".format(args.resume))

    # 该标志允许您启用内置的 cudnn 自动调谐器,以找到用于硬件的最佳算法。
    # 原理: 在 cudnn 中启用基准测试模式。 只要网络输入大小不变,基准测试模式就很好。 
    # 这样, cudnn 将为该特定配置寻找最佳算法集(这需要一些时间)。 这会使得运行时间加快。
    # 但是, 如果你的输入大小在每次迭代中都发生变化,则 cudnn 将在每次出现新大小时进行基准测试, 这可能会导致运行时性能变差。
    cudnn.benchmark = True

    # Data loading code
    # 加载数据
    traindir = os.path.join(args.data, 'train')
    # normalize 的初始化
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
	# 如果使用 MoCo v2
    if args.aug_plus:
        # MoCo v2's aug: similar to SimCLR https://arxiv.org/abs/2002.05709
        augmentation = [
            transforms.RandomResizedCrop(224, scale=(0.2, 1.)),	# 随机剪裁
            transforms.RandomApply([	# 随机颜色抖动
                transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)  # not strengthened
            ], p=0.8),
            transforms.RandomGrayscale(p=0.2),	# 随机灰度
            transforms.RandomApply([moco.loader.GaussianBlur([.1, 2.])], p=0.5),	# 新加的数据增强手段:『高斯模糊』
            transforms.RandomHorizontalFlip(),	# 随机翻转
            transforms.ToTensor(),
            normalize
        ]
    else:
        # MoCo v1's aug: the same as InstDisc https://arxiv.org/abs/1805.01978
        augmentation = [
            transforms.RandomResizedCrop(224, scale=(0.2, 1.)),
            transforms.RandomGrayscale(p=0.2),
            transforms.ColorJitter(0.4, 0.4, 0.4, 0.4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize
        ]
	
	# moco.loader.TwoCropsTransform(): 将一个图像的两个随机剪裁作为“查询”和“键”。【该函数是个人写的, 在 loader.py 中】
	# transforms.Compose(): 将多个 transform 组合起来使用
    train_dataset = datasets.ImageFolder(traindir, moco.loader.TwoCropsTransform(transforms.Compose(augmentation)))

    # 有打算用分布式计算的话就定义 train_sampler 这个 object.
    # train_sampler 能在生成数据加载器(dataloader)时负责把 training data 划分到不同 gpu 上
    if args.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)	# 训练时采用的 “下采样” 手段
    else:
        train_sampler = None

	# 为了训练设置的 data_loader 对象(也可以说是一个变量)
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers, pin_memory=True, sampler=train_sampler, drop_last=True)
        
    # 分布式训练开始
    for epoch in range(args.start_epoch, args.epochs):	
        # 如果采用分布式训练,告诉 train_sampler 要训练多少个 epoch, 它好按周期来划分数据给到每次更新参数时用的 data_loader
        if args.distributed:
            train_sampler.set_epoch(epoch)
         
        adjust_learning_rate(optimizer, epoch, args)

        # train for one epoch
        # 一次迭代训练
        train(train_loader, model, criterion, optimizer, epoch, args)	# 调用“2.4的train()函数” 

        if not args.multiprocessing_distributed or (args.multiprocessing_distributed
                                                    and args.rank % ngpus_per_node == 0):
            save_checkpoint({		# 每一次迭代都保存 “检查点”
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': model.state_dict(),
                'optimizer': optimizer.state_dict(),
            }, is_best=False, filename='checkpoint_{:04d}.pth.tar'.format(epoch))

补充代码说明ToTensor()的功能:将矩阵『重塑(reshape)』为 (H, W, C) 的 “nump.ndarray 对象” 或将 img 『重塑(reshape)』转为 (C, H, W)的 “tensor 对象”,其将每一个数值归一化到 [0,1],其归一化方法比较简单,直接除以 255 即可。


2.5 关于“train()函数”的代码

def train(train_loader, model, criterion, optimizer, epoch, args):
    batch_time = AverageMeter('Time', ':6.3f')	# 创建一个名为 “Time” 的“AverageMeter对象”, 用于计算和存储平均值和当前值
    data_time = AverageMeter('Data', ':6.3f')	# 6.3f 的意思是: 固定输出6个占位+显示到小数点后3位
    losses = AverageMeter('Loss', ':.4e')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')

	# 创建一个“ProgressMeter对象”, 用于实时输出训练的中间结果
    progress = ProgressMeter( len(train_loader), [batch_time, data_time, losses, top1, top5], prefix="Epoch: [{}]".format(epoch))

    # switch to train mode
    # 切换到模型的训练模式
    model.train()

    end = time.time()	# 返回当前时间的时间戳(即从1970年1月1日至今经过的浮点秒数)
    
	# 从用来训练的dataloder里提取数据
    for i, (images, _) in enumerate(train_loader): 
        # measure data loading time
        # 记录数据的加载时间
        data_time.update(time.time() - end)
		
		 # 如果有指定 gpu,就把图片数据放入到指定gpu中,并采用非阻塞通信
        if args.gpu is not None:
            images[0] = images[0].cuda(args.gpu, non_blocking=True)
            images[1] = images[1].cuda(args.gpu, non_blocking=True)

        # compute output
        # 计算 q 与 k 的输出结果
        output, target = model(im_q=images[0], im_k=images[1])
        loss = criterion(output, target)	# 计算对比损失

        # acc1/acc5 are (K+1)-way contrast classifier accuracy
        # measure accuracy and record loss
        # Top-1/Top-5是针对于 K 种分类的准确度, 并会记录“对比损失”
        acc1, acc5 = accuracy(output, target, topk=(1, 5))	# 计算指定 k 值最高的准确度
        losses.update(loss.item(), images[0].size(0))	# images.size(0)表示图片数量
        top1.update(acc1[0], images[0].size(0))	
        top5.update(acc5[0], images[0].size(0))

        # compute gradient and do SGD step
        # 在 PyTorch 中, 我们需要在开始进行反向传播之前将梯度设置为零,
        # 因为 PyTorch 会在随后的向后传递中累积梯度。
        optimizer.zero_grad()
        # loss.backward(): 为每个具有 require_grad = True 的参数 x 计算 d(loss) / dx。 d(...)/dx是对“...”求导的意思
        # 这些对于每个参数 x 都累积到 x.grad 中。 伪代码: x.grad + = d(loss) / dx
        loss.backward()
        # optimizer.step(): 使用 x.grad 来更新 x 的值。 例如: SGD 优化器执行以下操作: x += -lr * x.grad 
        optimizer.step()

        # measure elapsed time
        # 计算训练每一批次花费的时间
        batch_time.update(time.time() - end)
        end = time.time()	# 重置 “批次开始训练的” 时间点

		# 按照某一设置好的 “打印频率” 来打印进度信息
        if i % args.print_freq == 0:
            progress.display(i)

补充代码说明enumerate()的功能:对于一个可遍历的对象(如列表、字符串),enumerate将其组成一个索引序列, 利用它可以同时获得索引和值。这个比用 range() 方便一点。


2.6 关于“AverageMeter类”的代码

class AverageMeter(object):
    """Computes and stores the average and current value"""
	# 计算和存储平均值和当前值(关于准确率的), 这个是深度学习固定的模板
    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)

补充代码说明fmt 是 “format” 的缩写,用于格式化输出。


2.7 关于“ProgressMeter类”的代码

class ProgressMeter(object):
	# 这个也是深度学习固定的模板, 用于辅助“中间结果输出”
    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print('\t'.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'

补充代码说明a // b 的意思是对除以 b 的结果向负无穷方向取整后的数。例如:7 // 2 = 3。“prefix” 是用来输出时添加的前缀。


2.8 关于“adjust_learning_rate()函数”的代码

def adjust_learning_rate(optimizer, epoch, args):
    """Decay the learning rate based on schedule"""
    # 根据训练进度递减学习速率
    # 这个也是深度学习固定的模板
    lr = args.lr
    if args.cos:  # cosine lr schedule
        lr *= 0.5 * (1. + math.cos(math.pi * epoch / args.epochs))
    else:  # stepwise lr schedule
        for milestone in args.schedule:
            lr *= 0.1 if epoch >= milestone else 1.
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr


2.9 关于“accuracy()函数”的代码

def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    # 计算指定 k 值的最高准确度
    with torch.no_grad():	# 不进行梯度下降
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].contiguous().view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res

补充代码说明:原来版本的代码是没有 .contiguous() 的,加上才能跑。


2.10 最后一点

if __name__ == '__main__':
    main()

代码说明:这样写的目的是使得“文件只能作为脚本才能(直接)执行,而 import 到其他脚本中是不会被执行的。



三、builder.py —— MoCo模型建立的文件

3.1 关于“MoCo类”的代码

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import torch
import torch.nn as nn

class MoCo(nn.Module):
    """
    Build a MoCo model with: a query encoder, a key encoder, and a queue
    使用查询编码器、键编码器和队列构建 MoCo 模型
    https://arxiv.org/abs/1911.05722
    """
    def __init__(self, base_encoder, dim=128, K=65536, m=0.999, T=0.07, mlp=False):
        """
        dim: feature dimension (default: 128)
        K: queue size; number of negative keys (default: 65536)
        m: moco momentum of updating key encoder (default: 0.999)
        T: softmax temperature (default: 0.07)
        dim: 特征维度(默认: 128)
        K: 查询字典的队列大小,也就是负样本的数量(默认: 65536)
        m: 键编码器更新的 Moco 动量值(默认: 0.999)
        T: softmax 的温度参数(默认: 0.07)
        """
        # nn.Module 子类的函数必须在构建函数中执行父类的构造函数
        # 下式等价于 nn.Module.__init__(self)
        super(MoCo, self).__init__()

        self.K = K
        self.m = m
        self.T = T

        # create the encoders
        # num_classes is the output fc dimension
        # Num_classes 是全连接层的输出维度
        self.encoder_q = base_encoder(num_classes=dim)
        self.encoder_k = base_encoder(num_classes=dim)

        if mlp:  # hack: brute-force replacement
            dim_mlp = self.encoder_q.fc.weight.shape[1]
            # nn.Linear(): 从输入输出的张量的 shape 角度来理解, 相当于一个输入为[batch_size, in_features]的张量变换
            # 成了[batch_size, out_features]的输出张量。
            # nn.Sequential(): 可以将一系列的操作打包, 这些操作可以包括 Conv2d()、ReLU()、Maxpool2d()等, 
            # 打包后方便调用吧,就相当于是一个黑箱,forward() 时调用这个黑箱就行了。
            self.encoder_q.fc = nn.Sequential(nn.Linear(dim_mlp, dim_mlp), nn.ReLU(), self.encoder_q.fc)
            self.encoder_k.fc = nn.Sequential(nn.Linear(dim_mlp, dim_mlp), nn.ReLU(), self.encoder_k.fc)

		# zip(): 该函数返回一个以元组为元素的列表,其中第 i 个元组包含每个参数序列的第 i 个元素。
        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data.copy_(param_q.data)  # initialize. “键编码器”初始化时的参数直接拷贝“查询编码器”的
            param_k.requires_grad = False  # not update by gradient. “键编码器”不进行反向传播更新权重

        # create the queue
        # self.register_buffer(‘my_buffer’, self.tensor):my_buffer 是名字(str类型); self.tensor 是需要进行 register 登记的张量。
        # 这样我们就得到了一个新的张量, 这个张量会保存在 model.state_dict() 中,也就可以随着模型一起通过 .cuda() 复制到 gpu 上。
        self.register_buffer("queue", torch.randn(dim, K))
        # nn.functional.normalize(): 将某一个维度除以那个维度对应的范数
        self.queue = nn.functional.normalize(self.queue, dim=0)

        self.register_buffer("queue_ptr", torch.zeros(1, dtype=torch.long))

	# 使用装饰器 @torch.no_gard() 修饰的函数,在调用时不允许计算梯度
    @torch.no_grad()
    def _momentum_update_key_encoder(self):
        """
        Momentum update of the key encoder
        动量键编码器的更新
        """
        for param_q, param_k in zip(self.encoder_q.parameters(), self.encoder_k.parameters()):
            param_k.data = param_k.data * self.m + param_q.data * (1. - self.m)

    @torch.no_grad()
    def _dequeue_and_enqueue(self, keys):	# 出队和入队操作
        # gather keys before updating queue
        # 在更新队列之前获取键
        keys = concat_all_gather(keys)	# 个人写的函数

        batch_size = keys.shape[0]

        ptr = int(self.queue_ptr)	# ptr 是指针(pointer)的缩写
        # assert: 检查条件,不符合就终止程序
        assert self.K % batch_size == 0  # for simplicity

        # replace the keys at ptr (dequeue and enqueue)
        self.queue[:, ptr:ptr + batch_size] = keys.T
        ptr = (ptr + batch_size) % self.K  # move pointer

        self.queue_ptr[0] = ptr

    @torch.no_grad()
    def _batch_shuffle_ddp(self, x):
        """
        Batch shuffle, for making use of BatchNorm.
        *** Only support DistributedDataParallel (DDP) model. ***
        结合批量归一化进行“批量洗牌”, 仅支持分布式数据并行(DDP)计算模型。
        """
        # gather from all gpus
        # 获取所有 gpu 的信息
        batch_size_this = x.shape[0]
        x_gather = concat_all_gather(x)
        batch_size_all = x_gather.shape[0]

        num_gpus = batch_size_all // batch_size_this

        # random shuffle index
        # 随机洗牌torch.randperm(): 将0~n-1(包括0和n-1)随机打乱后获得的数字序列,函数名是 random permutation 的缩写
        idx_shuffle = torch.randperm(batch_size_all).cuda()	 

        # broadcast to all gpus
        # 把洗好的序列重新传回所有 gpu
        torch.distributed.broadcast(idx_shuffle, src=0)

        # index for restoring
        # torch.argsort(): 一般结合 torch.sort() 使用, 用于返回重排后的索引
        idx_unshuffle = torch.argsort(idx_shuffle)

        # shuffled index for this gpu
        gpu_idx = torch.distributed.get_rank()	# 返回当前进程组的排名
        idx_this = idx_shuffle.view(num_gpus, -1)[gpu_idx]

        return x_gather[idx_this], idx_unshuffle

    @torch.no_grad()
    def _batch_unshuffle_ddp(self, x, idx_unshuffle):
        """
        Undo batch shuffle.
        *** Only support DistributedDataParallel (DDP) model. ***
        不做“批量洗牌”操作
        """
        # gather from all gpus
        batch_size_this = x.shape[0]
        x_gather = concat_all_gather(x)
        batch_size_all = x_gather.shape[0]

        num_gpus = batch_size_all // batch_size_this

        # restored index for this gpu
        gpu_idx = torch.distributed.get_rank()
        idx_this = idx_unshuffle.view(num_gpus, -1)[gpu_idx]

        return x_gather[idx_this]

	# 模型的前向传播
    def forward(self, im_q, im_k):
        """
        Input:
            im_q: a batch of query images
            im_k: a batch of key images
        Output:
            logits, targets
        """

        # compute query features
        # 计算“查询”的特征
        q = self.encoder_q(im_q)  # queries: NxC, N 指有多少张图片, C 指有多少特征(维度)
        q = nn.functional.normalize(q, dim=1)

        # compute key features
        # 计算“键”的特征
        with torch.no_grad():  # no gradient to keys. 将不用计算梯度的变量放在 with torch.no_grad() 里
            self._momentum_update_key_encoder()  # update the key encoder, 更新“键”编码器

            # shuffle for making use of BN
            im_k, idx_unshuffle = self._batch_shuffle_ddp(im_k)

            k = self.encoder_k(im_k)  # keys: NxC
            k = nn.functional.normalize(k, dim=1)

            # undo shuffle
            k = self._batch_unshuffle_ddp(k, idx_unshuffle)

        # compute logits
        # Einstein sum is more intuitive
        # positive logits: Nx1
        l_pos = torch.einsum('nc,nc->n', [q, k]).unsqueeze(-1)	# 计算“查询(query)”与“键,即正样本(key)”之间的相似度
        # negative logits: NxK
        l_neg = torch.einsum('nc,ck->nk', [q, self.queue.clone().detach()])	#计算“查询(query)”与“负样本(来自队列Queue)”之间的相似度

        # logits: Nx(1+K)
        # torch.cat(): 是将两个张量(tensor)拼接在一起,cat 是 concatnate 的意思,即拼接,联系在一起。
        logits = torch.cat([l_pos, l_neg], dim=1)

        # apply temperature
        # 除以一个温度参数
        logits /= self.T

        # labels: positive key indicators
        # 建立一个 “标签矩阵”, 形状和 logits 一样, 矩阵中元素的数据类型为 torch.long
        labels = torch.zeros(logits.shape[0], dtype=torch.long).cuda()

        # dequeue and enqueue
        # 出队和入队操作
        self._dequeue_and_enqueue(k)

        return logits, labels


3.2 关于“concat_all_gather()函数”的代码

# utils
# 常用工具
@torch.no_grad()
def concat_all_gather(tensor):
    """
    Performs all_gather operation on the provided tensors.
    *** Warning ***: torch.distributed.all_gather has no gradient.
    """
    # torch.ones_like(): 根据给定张量, 生成与其形状相同的全 1 张量(即矩阵).
    tensors_gather = [torch.ones_like(tensor)
        for _ in range(torch.distributed.get_world_size())]
    torch.distributed.all_gather(tensors_gather, tensor, async_op=False)

    output = torch.cat(tensors_gather, dim=0)
    return output


四、loader.py —— 辅助图片载入的文件

4.1 关于“TwoCropsTransform类”的代码

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from PIL import ImageFilter
import random

class TwoCropsTransform:
    """Take two random crops of one image as the query and key."""
	# 将一个图像的两个随机裁剪的图片作为“查询”和“键”
    def __init__(self, base_transform):
        self.base_transform = base_transform

    def __call__(self, x):
        q = self.base_transform(x)
        k = self.base_transform(x)
        return [q, k]


4.2 关于“GaussianBlur类”的代码

class GaussianBlur(object):
    """Gaussian blur augmentation in SimCLR https://arxiv.org/abs/2002.05709"""
	# 『高斯模糊增强(Gaussian blur augmentation)』的原理详见 SimCLR
    def __init__(self, sigma=[.1, 2.]):
        self.sigma = sigma

    def __call__(self, x):
        sigma = random.uniform(self.sigma[0], self.sigma[1])
        x = x.filter(ImageFilter.GaussianBlur(radius=sigma))
        return x

代码说明:MoCo v1 使用 RandomResizedCrop()RandomGrayscale()ColorJitter()RandomHorizontalFlip() 这几种数据增强方式,到了 MoCo v2 又增加了:transforms.RandomApply([moco.loader.GaussianBlur([.1, 2.])], p=0.5) 这种方式。



五、main_lincls.py —— 评估模型建立的文件

● “lincls” 是 “Linear Classification” 的缩写。

整个评估模型的代码框架如下

import ...

parser.add_argument(...)

def main():
	...
	
def main_worker(gpu, ngpus_per_node, args):
	...
	
def train(train_loader, model, criterion, optimizer, epoch, args):
	...
	
def validate(val_loader, model, criterion, args):
	...
	
def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
	...
	
def sanity_check(state_dict, pretrained_weights):
    """
    Linear classifier should not change any weights other than the linear layer.
    This sanity check asserts nothing wrong happens (e.g., BN stats updated).
    """
	...

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self, name, fmt=':f'):
		...
    def reset(self):
		...
    def update(self, val, n=1):
		...
    def __str__(self):
		...

class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
		...
    def display(self, batch):
		...
    def _get_batch_fmtstr(self, num_batches):
		...

def adjust_learning_rate(optimizer, epoch, args):
    """Decay the learning rate based on schedule"""
	...

def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
	...

if __name__ == '__main__':
    main()

● Linear Evaluation 的代码整体结构和 Unsupervised Pre-training 是一致的,但是在做 Linear Evaluation 时要注意冻结 Encoder 的参数不更新,而只更新最后分类器的参数即可。代码表示如下

def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = gpu
	...
    # create model
    ...

    # freeze all layers but the last fc
    # 冻结 Encoder 的参数不更新,而只更新最后分类器的参数
    for name, param in model.named_parameters():
        if name not in ['fc.weight', 'fc.bias']:	
            param.requires_grad = False
    # init the fc layer
    # 初始化分类器的参数
    model.fc.weight.data.normal_(mean=0.0, std=0.01)
    model.fc.bias.data.zero_()
    ...
    
    # load from pre-trained, before DistributedDataParallel constructor
    # 下载预训练模型
    if args.pretrained:
        if os.path.isfile(args.pretrained):
            print("=> loading checkpoint '{}'".format(args.pretrained))
            checkpoint = torch.load(args.pretrained, map_location="cpu")

            # rename moco pre-trained keys
            state_dict = checkpoint['state_dict']
            for k in list(state_dict.keys()):
                # retain only encoder_q up to before the embedding layer
                # 装入所有层之前保留“查询编码器”
                if k.startswith('module.encoder_q') and not k.startswith('module.encoder_q.fc'):
                    # remove prefix
                    state_dict[k[len("module.encoder_q."):]] = state_dict[k]
                # delete renamed or unused k
                del state_dict[k]

            args.start_epoch = 0
            msg = model.load_state_dict(state_dict, strict=False)
            assert set(msg.missing_keys) == {"fc.weight", "fc.bias"}

            print("=> loaded pre-trained model '{}'".format(args.pretrained))
        else:
            print("=> no checkpoint found at '{}'".format(args.pretrained))

	...

    # optimize only the linear classifier
    # 只优化线性分类器
    parameters = list(filter(lambda p: p.requires_grad, model.parameters()))
    assert len(parameters) == 2  # fc.weight, fc.bias
    # 优化器 optimizer 作用的参数为 parameters, 它只包含分类器 fc 的 weight 和 bias 这两部分。
    optimizer = torch.optim.SGD(parameters, args.lr,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay)

	...

    # Data loading code
    # 把训练集和测试集放进来
    traindir = os.path.join(args.data, 'train')
    valdir = os.path.join(args.data, 'val')
	...

    if args.evaluate:
        validate(val_loader, model, criterion, args)
        return

    for epoch in range(args.start_epoch, args.epochs):
        if args.distributed:
            train_sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, args)

        # train for one epoch
        # 训练一轮
        train(train_loader, model, criterion, optimizer, epoch, args)

        # evaluate on validation set
        # 在测试集上进行评估
        acc1 = validate(val_loader, model, criterion, args)

        # remember best acc@1 and save checkpoint
        # 保存 top-1 的准确度, 随后保存“寄放点”(代码在...中)
        is_best = acc1 > best_acc1
        best_acc1 = max(acc1, best_acc1)

		...

补充说明一:另外为了确保除了分类器 fc 的 weight 和 bias 的部分,其余所有参数不发生改变,作者使用了这个 sanity_check() 函数。这个函数有 2 个输入:state_dict( Linear Evaluation 进行了一个 Epoch 之后的模型 ) 和 pretrained_weights (预训练权重的存放文件夹)。先从 pretrained_weights 的位置导入权重,命名为 state_dict_pre
  接下来通过 assert ((state_dict[k].cpu() == state_dict_pre[k_pre]).all()) 比较 state_dict 的权重和 pretrained_weights 是不是一致的,即 Encoder 的权重有没有发生变化。如果有变化就会打印 “k is changed in linear classifier training”。

补充说明二:“main_lincls.py” 比 “main_moco.py” 多了一个计算测试集的准确率的 validate() 函数,剩下的部分代码差不多一致的。



▶本文参考附录

MoCo v1原论文地址:https://arxiv.org/pdf/2003.04297.pdf.

MoCo v2原论文地址:https://arxiv.org/pdf/2003.04297.pdf.

[1] 《Self-Supervised Learning 超详细解读 (四):MoCo系列解读 (1)》.

[2] 《ImageNet distributed training in PyTorch》.

[3] 《argparse — 命令行选项、参数和子命令解析器¶》.


⭐️ ⭐️

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
扎眼的阳光的头像扎眼的阳光普通用户
上一篇 2023年2月23日 下午12:30
下一篇 2023年2月23日 下午12:31

相关推荐

此站出售,如需请站内私信或者邮箱!