MixNet实战:使用MixNet实现图像分类(二)


在上一篇文章中完成了前期的准备工作,见链接:
MixNet实战:使用MixNet实现图像分类(一)
这篇主要是讲解如何训练和测试

训练

完成上面的步骤后,就开始train脚本的编写,新建train.py.

导入项目使用的库

import json
import os
import shutil
import matplotlib.pyplot as plt
import torch
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
from apex import amp
from sklearn.metrics import classification_report
from timm.data.mixup import Mixup
from timm.loss import SoftTargetCrossEntropy
from torchvision import datasets
from models.mixnet import MixNet
from torchtoolbox.transform import Cutout
torch.backends.cudnn.benchmark = False
import warnings
warnings.filterwarnings("ignore")
os.environ['CUDA_VISIBLE_DEVICES']="0,1" #设置显卡序号

设置全局参数

设置学习率、BatchSize、epoch等参数,判断环境中是否存在GPU,如果没有则使用CPU。建议使用GPU,CPU太慢了。

if __name__ == '__main__':
    #创建保存模型的文件夹
    file_dir='checkpoints/mixnet'
    if os.path.exists(file_dir):
        print('true')
        # os.rmdir(file_dir)
        shutil.rmtree(file_dir)  # 删除再建立
        os.makedirs(file_dir)
    else:
        os.makedirs(file_dir)

    # 设置全局参数
    model_lr = 1e-4
    BATCH_SIZE = 4
    EPOCHS = 1000
    DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    use_amp = False  # 是否使用混合精度
    use_dp=True #是否开启dp方式的多卡训练
    classes = 12
    resume = False
    CLIP_GRAD = 5.0
    model_path = 'best.pth'
    Best_ACC = 0 #记录最高得分

设置存放权重文件的文件夹,如果文件夹存在删除再建立。

接下来,查看全局参数:

model_lr:学习率,根据实际情况做调整。

BATCH_SIZE:batchsize,根据显卡的大小设置。

EPOCHS:epoch的个数,一般300够用。

use_amp:是否使用混合精度。

classes:类别个数。

resume:是否接着上次模型继续训练。

model_path:模型的路径。如果resume设置为True时,就采用model_path定义的模型继续训练。

CLIP_GRAD:梯度的最大范数,在梯度裁剪里设置。

Best_ACC:记录最高ACC得分。

图像预处理与增强

数据处理比较简单,加入了Cutout、做了Resize和归一化,定义Mixup函数。

这里注意下Resize的大小,由于MobileViT的输入是224×224的大小,所以要Resize为224×224。

# 数据预处理7
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    Cutout(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.51819474, 0.5250407, 0.4945761], std=[0.24228974, 0.24347611, 0.2530049])
])
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.51819474, 0.5250407, 0.4945761], std=[0.24228974, 0.24347611, 0.2530049])
])
mixup_fn = Mixup(
    mixup_alpha=0.8, cutmix_alpha=1.0, cutmix_minmax=None,
    prob=0.1, switch_prob=0.5, mode='batch',
    label_smoothing=0.1, num_classes=classes)

读取数据

使用pytorch默认读取数据的方式,然后将dataset_train.class_to_idx打印出来,预测的时候要用到。

将dataset_train.class_to_idx保存到txt文件或者json文件中。

# 读取数据
dataset_train = datasets.ImageFolder('data/train', transform=transform)
dataset_test = datasets.ImageFolder("data/val", transform=transform_test)
# 导入数据
train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset_test, batch_size=BATCH_SIZE, shuffle=False)
print(dataset_train.class_to_idx)
with open('class.txt','w') as file:
    file.write(str(dataset_train.class_to_idx))
with open('class.json','w',encoding='utf-8') as file:
    file.write(json.dumps(dataset_train.class_to_idx))

class_to_idx的结果:

{‘Black-grass’: 0, ‘Charlock’: 1, ‘Cleavers’: 2, ‘Common Chickweed’: 3, ‘Common wheat’: 4, ‘Fat Hen’: 5, ‘Loose Silky-bent’: 6, ‘Maize’: 7, ‘Scentless Mayweed’: 8, ‘Shepherds Purse’: 9, ‘Small-flowered Cranesbill’: 10, ‘Sugar beet’: 11}

设置模型

  • 设置loss函数,train的loss为:SoftTargetCrossEntropy,val的loss:nn.CrossEntropyLoss()。
  • 设置模型为mixnet_m,预训练设置为true,num_classes设置为12。如果resume为True,则加载模型接着上次训练。
  • 优化器设置为adamW。
  • 学习率调整策略选择为余弦退火。
  • 检测可用显卡的数量,如果大于1,则要用torch.nn.DataParallel加载模型,开启多卡训练。
  • 开启混合精度训练。
  • 如果存在多上显卡,则使用DP的方式开启多卡并行训练。
	# 实例化模型并且移动到GPU
	criterion_train = SoftTargetCrossEntropy()# 训练用的loss
	criterion_val = torch.nn.CrossEntropyLoss()# 验证用的loss
 	#设置模型
    model_ft = MixNet(net_type='mixnet_m', num_classes=classes)
    if resume:
        model_ft = torch.load(model_path)
    model_ft.to(DEVICE)
    print(model_ft)
    # 选择简单暴力的Adamw优化器,学习率调低
    optimizer = optim.AdamW(model_ft.parameters(), lr=model_lr)
    cosine_schedule = optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=20, eta_min=1e-6)
    if use_amp:
        model_ft, optimizer = amp.initialize(model_ft, optimizer, opt_level="O1")  # 这里是“欧一”,不是“零一”
    if torch.cuda.device_count() > 1 and use_dp:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        model_ft = torch.nn.DataParallel(model_ft)

定义训练和验证函数

训练函数

训练的主要步骤:

1、判断迭代的数据是否是奇数,由于mixup_fn只能接受偶数,所以如果不是偶数则要减去一位,让其变成偶数。但是有可能最后一次迭代只有一条数据,减去后就变成了0,所以还要判断不能小于2,如果小于2则直接中断本次循环。

2、将数据输入mixup_fn生成mixup数据,然后输入model计算loss。

3、 optimizer.zero_grad() 梯度清零,把loss关于weight的导数变成0。

4、如果使用混合精度,则使用amp.scale_loss反向传播求解梯度,否则,直接反向传播求梯度。torch.nn.utils.clip_grad_norm_函数执行梯度裁剪,防止梯度爆炸。

5、 optimizer.step()更新参数。

6、接下来,获取学习率,获取loss、计算本次Batch的ACC

等待一个epoch训练完成后,计算平均loss和平均acc

# 定义训练过程
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    sum_loss = 0
    correct = 0
    total_num = len(train_loader.dataset)
    print(total_num, len(train_loader))
    for batch_idx, (data, target) in enumerate(train_loader):
        if len(data) % 2 != 0:
            if len(data) < 2:
                continue
            data = data[0:len(data) - 1]
            target = target[0:len(target) - 1]
            print(len(data))
        data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
        samples, targets = mixup_fn(data, target)
        output = model(samples)
        loss = criterion_train(output, targets)
        optimizer.zero_grad()
        if use_amp:
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
            torch.nn.utils.clip_grad_norm_(amp.master_params(optimizer), CLIP_GRAD)
        else:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP_GRAD)
        optimizer.step()
        lr = optimizer.state_dict()['param_groups'][0]['lr']
        print_loss = loss.data.item()
        sum_loss += print_loss
        _, pred = torch.max(output.data, 1)
        correct += torch.sum(pred == target)
        if (batch_idx + 1) % 10 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tLR:{:.9f}'.format(
                epoch, (batch_idx + 1) * len(data), len(train_loader.dataset),
                       100. * (batch_idx + 1) / len(train_loader), loss.item(), lr))
    ave_loss = sum_loss / len(train_loader)
    correct = correct.data.item()
    acc = correct / total_num
    print('epoch:{}\tloss:{}\tacc:{}'.format(epoch, ave_loss, acc))
    return ave_loss, acc

验证函数

验证集和训练集大致相似,主要步骤:

1、定义参数,test_loss测试的loss,total_num总的验证集的数量,val_list验证集的label,pred_list预测的label。

2、在 with torch.no_grad()下面循环验证集,在该模块下,所有计算得出的tensor的requires_grad都自动设置为False。即使一个tensor(命名为x)的requires_grad = True,在with torch.no_grad计算,由x得到的新tensor(命名为w-标量)requires_grad也为False,且grad_fn也为None,即不会对w求导。

3、使用验证集的loss函数求出验证集的loss。

4、求出本次batch的acc

本次epoch循环完成后,求得本次epoch的acc、loss。

如果acc比Best_ACC大,则保存模型。

# 验证过程
def val(model, device, test_loader):
    global Best_ACC
    model.eval()
    test_loss = 0
    correct = 0
    total_num = len(test_loader.dataset)
    print(total_num, len(test_loader))
    val_list = []
    pred_list = []
    with torch.no_grad():
        for data, target in test_loader:
            for t in target:
                val_list.append(t.data.item())
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion_val(output, target)
            _, pred = torch.max(output.data, 1)
            for p in pred:
                pred_list.append(p.data.item())
            correct += torch.sum(pred == target)
            print_loss = loss.data.item()
            test_loss += print_loss
    correct = correct.data.item()
    acc = correct / total_num
    avgloss = test_loss / len(test_loader)
    print('\nVal set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        avgloss, correct, len(test_loader.dataset), 100 * acc))
    if acc > Best_ACC:
        if isinstance(model, torch.nn.DataParallel):
            torch.save(model.module, file_dir + "/" + 'model_' + str(epoch) + '_' + str(round(acc, 3)) + '.pth')
            torch.save(model.module, file_dir + '/' + 'best.pth')
        else:
            torch.save(model, file_dir + "/" + 'model_' + str(epoch) + '_' + str(round(acc, 3)) + '.pth')
            torch.save(model, file_dir + '/' + 'best.pth')
        Best_ACC = acc
    return val_list, pred_list, avgloss, acc

调用训练和验证方法

调用训练函数和验证函数的主要步骤:

1、定义参数:

  • is_set_lr,是否已经设置了学习率,当epoch大于一定的次数后,会将学习率设置到一定的值,并将其置为True。
  • log_dir:记录log用的,将有用的信息保存到字典中,然后转为json保存起来。
  • train_loss_list:保存每个epoch的训练loss。
  • val_loss_list:保存每个epoch的验证loss。
  • train_acc_list:保存每个epoch的训练acc。
  • val_acc_list:保存么每个epoch的验证acc。
  • epoch_list:存放每个epoch的值。

循环epoch

1、调用train函数,得到 train_loss, train_acc,并将分别放入train_loss_list,train_acc_list,然后存入到logdir字典中。

2、调用验证函数,得到val_list, pred_list, val_loss, val_acc。将val_loss, val_acc分别放入val_loss_list和val_acc_list中,然后存入到logdir字典中。

3、保存log。

4、打印本次的测试报告。

5、如果epoch大于600,将学习率设置为固定的1e-6。

6、绘制loss曲线和acc曲线。

 	# 训练与验证
    is_set_lr = False
    log_dir = {}
    train_loss_list, val_loss_list, train_acc_list, val_acc_list, epoch_list = [], [], [], [], []
    for epoch in range(1, EPOCHS + 1):
        epoch_list.append(epoch)
        train_loss, train_acc = train(model_ft, DEVICE, train_loader, optimizer, epoch)
        train_loss_list.append(train_loss)
        train_acc_list.append(train_acc)
        log_dir['train_acc'] = train_acc_list
        log_dir['train_loss'] = train_loss_list
        val_list, pred_list, val_loss, val_acc = val(model_ft, DEVICE, test_loader)
        val_loss_list.append(val_loss)
        val_acc_list.append(val_acc)
        log_dir['val_acc'] = val_acc_list
        log_dir['val_loss'] = val_loss_list
        log_dir['best_acc'] = Best_ACC
        with open(file_dir + '/result.json', 'w', encoding='utf-8') as file:
            file.write(json.dumps(log_dir))
        print(classification_report(val_list, pred_list, target_names=dataset_train.class_to_idx))
        if epoch < 600:
            cosine_schedule.step()
        else:
            if not is_set_lr:
                for param_group in optimizer.param_groups:
                    param_group["lr"] = 1e-6
                    is_set_lr = True
        fig = plt.figure(1)
        plt.plot(epoch_list, train_loss_list, 'r-', label=u'Train Loss')
        # 显示图例
        plt.plot(epoch_list, val_loss_list, 'b-', label=u'Val Loss')
        plt.legend(["Train Loss", "Val Loss"], loc="upper right")
        plt.xlabel(u'epoch')
        plt.ylabel(u'loss')
        plt.title('Model Loss ')
        plt.savefig(file_dir + "/loss.png")
        plt.close(1)
        fig2 = plt.figure(2)
        plt.plot(epoch_list, train_acc_list, 'r-', label=u'Train Acc')
        plt.plot(epoch_list, val_acc_list, 'b-', label=u'Val Acc')
        plt.legend(["Train Acc", "Val Acc"], loc="lower right")
        plt.title("Model Acc")
        plt.ylabel("acc")
        plt.xlabel("epoch")
        plt.savefig(file_dir + "/acc.png")
        plt.close(2)

运行以及结果查看

完成上面的所有代码就可以开始运行了。点击右键,然后选择“run train.py”即可,运行结果如下:

在每个epoch测试完成之后,打印验证集的acc、recall等指标。


绘制acc曲线

绘制loss曲线

测试

测试,我们采用一种通用的方式。

测试集存放的目录如下图:

import torch.utils.data.distributed
import torchvision.transforms as transforms
from PIL import Image
from torch.autograd import Variable
import os

classes = ('Black-grass', 'Charlock', 'Cleavers', 'Common Chickweed',
           'Common wheat', 'Fat Hen', 'Loose Silky-bent',
           'Maize', 'Scentless Mayweed', 'Shepherds Purse', 'Small-flowered Cranesbill', 'Sugar beet')
transform_test = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.51819474, 0.5250407, 0.4945761], std=[0.24228974, 0.24347611, 0.2530049])
])

DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.load("model_52_0.954.pth")
model.eval()
model.to(DEVICE)

path = 'test/'
testList = os.listdir(path)
for file in testList:
    img = Image.open(path + file)
    img = transform_test(img)
    img.unsqueeze_(0)
    img = Variable(img).to(DEVICE)
    out = model(img)
    # Predict
    _, pred = torch.max(out.data, 1)
    print('Image Name:{},predict:{}'.format(file, classes[pred.data.item()]))

测试的主要逻辑:

1、定义类别,这个类别的顺序和训练时的类别顺序对应,一定不要改变顺序!!!!

2、定义transforms,transforms和验证集的transforms一样即可,别做数据增强。

3、 加载model,并将模型放在DEVICE里,

4、循环 读取图片并预测图片的类别,在这里注意,读取图片用PIL库的Image。不要用cv2,transforms不支持。循环里面的主要逻辑:

  • 使用Image.open读取图片
  • 使用transform_test对图片做归一化和标椎化。
  • img.unsqueeze_(0) 增加一个维度,由(3,224,224)变为(1,3,224,224)
  • Variable(img).to(DEVICE):将数据放入DEVICE中。
  • model(img):执行预测。
  • _, pred = torch.max(out.data, 1):获取预测值的最大下角标。

运行结果:

image-20220427112634211

完整的代码

https://download.csdn.net/download/hhhhhhhhhhwwwwwwwwww/85407640

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
社会演员多的头像社会演员多普通用户
上一篇 2022年5月22日
下一篇 2022年5月22日

相关推荐