|
- import argparse
- import pickle
- import random
-
- import numpy as np
- import torch
- import torch.nn as nn
- import torch.optim as optim
- import torch.utils.data
- from PIL import Image
- from torchvision import datasets
- import os
- import torchvision.models as models
- from resnet_mg import *
- import pytorch_lightning as pl
- from pytorch_lightning import loggers as pl_loggers
-
- import torchvision.transforms as T
- from torch.utils.data import Dataset, DataLoader
- from transforms import get_transform
- from utils import MixUpWrapper, LR_Scheduler
- from loss import LabelSmoothing, NLLMultiLabelSmooth
- from torch.optim.lr_scheduler import CosineAnnealingLR
- # 导入包
- from c2net.context import prepare, upload_output
- # 初始化导入数据集和预训练模型到容器内
-
-
-
class CustomDataset(Dataset):
    """Dataset over (image_path, label) pairs; images are loaded lazily with PIL.

    Args:
        data_list: sequence of ``(image_path, label)`` tuples.
        transform: optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, data_list, transform=None):
        self.data_list = data_list
        self.transform = transform

    def __getitem__(self, index):
        """Load item *index*: open the image, convert to RGB, apply transform."""
        image_path, label = self.data_list[index]
        image = Image.open(image_path).convert('RGB')
        # Fix: guard against transform=None (the documented default); the
        # original unconditionally called self.transform and raised TypeError.
        if self.transform is not None:
            image = self.transform(image)
        return image, label

    def __len__(self):
        return len(self.data_list)
-
-
def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        k_max = max(topk)
        n_samples = target.size(0)

        # One row per rank: indices of the k_max highest-scoring classes,
        # transposed so hits[:k] selects the top-k ranks for every sample.
        _, top_indices = output.topk(k_max, 1, True, True)
        top_indices = top_indices.t()
        hits = top_indices.eq(target.view(1, -1).expand_as(top_indices))

        # Percentage of samples whose target appears within the top k.
        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(100.0 / n_samples)
            for k in topk
        ]
-
-
class LitModel(pl.LightningModule):
    """LightningModule wrapping a ResNet-{18,34,50,101,152} image classifier.

    Builds ImageFolder train/val loaders from ``data_dir`` (expects ``train``
    and ``val`` subfolders), optionally copies weights from a Lightning
    checkpoint into the backbone, and selects a loss depending on the
    mixup / label-smoothing settings.

    Args:
        lr: base learning rate for the SGD optimizer built in
            ``configure_optimizers``.
        weight_decay: L2 penalty for the optimizer.
        epochs_num: number of training epochs; also the cosine-annealing period.
        data_dir: dataset root containing ``train`` and ``val`` folders.
        batch_size: per-loader batch size.
        layers: ResNet depth; one of 18/34/50/101/152.
        num_workers: DataLoader worker count.
        dataMixUp: mixup alpha; > 0 enables MixUpWrapper + NLLMultiLabelSmooth.
        dataLableSmoothing: label-smoothing factor; > 0 enables LabelSmoothing
            (only considered when mixup is disabled).
        warmup: warmup epochs; currently unused by ``configure_optimizers``
            (kept for interface compatibility).
        pretrained: optional checkpoint path whose ``state_dict`` tensors are
            copied into the model after stripping the Lightning ``model.``
            prefix from the keys.
    """

    def __init__(self, lr, weight_decay, epochs_num, data_dir, batch_size=64, layers=18, num_workers=0, dataMixUp=0.0,
                 dataLableSmoothing=0.0, warmup=20, pretrained=None):
        super(LitModel, self).__init__()

        # Hyper-parameters / bookkeeping.  (Fix: the original assigned
        # batch_size twice; once is enough.)
        self.batch_size = batch_size
        self.lr = lr
        self.weight_decay = weight_decay
        self.epochs_num = epochs_num
        self.layers = layers
        self.data_dir = data_dir
        self.num_workers = num_workers
        self.warmup = warmup
        self.pretrained = pretrained

        # Backbone selection (custom ResNets imported from resnet_mg).
        if self.layers == 18:
            self.model = resnet18()
        elif self.layers == 34:
            self.model = resnet34()
        elif self.layers == 50:
            self.model = resnet50()
        elif self.layers == 101:
            self.model = resnet101()
        elif self.layers == 152:
            self.model = resnet152()
        else:
            raise ValueError('Invalid layer number!')

        if self.pretrained is not None:
            # Load the pretrained checkpoint from disk.  Fix: map_location='cpu'
            # prevents a crash when the checkpoint was saved on GPU but this
            # process has no CUDA device; the model is moved to `device` below,
            # so GPU behavior is unchanged.
            pretrained_state_dict = torch.load(self.pretrained, map_location='cpu')["state_dict"]
            # Copy matching tensors into the backbone, stripping Lightning's
            # 'model.' prefix from checkpoint keys.  state_dict() is hoisted
            # out of the loop instead of being rebuilt per key.
            model_state = self.model.state_dict()
            for k, v in pretrained_state_dict.items():
                stripped = k.replace('model.', '')
                if stripped in model_state:
                    model_state[stripped].copy_(v)
                else:
                    print(f"Key '{k}' not found in model's state_dict.")
        # NOTE(review): `device` is a module-level global defined in the
        # __main__ block; Lightning normally handles device placement itself,
        # so this explicit move is redundant under the Trainer — confirm
        # before removing.
        self.model.to(device)
        # NOTE(review): this Adam optimizer is never used — configure_optimizers
        # builds an SGD optimizer instead.  Kept so external code referencing
        # `model.optimizer` keeps working.
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.weight_decay)

        # Data pipeline: ImageNet-style transforms (224px crops) and loaders.
        self.transform_train, self.transform_val = get_transform('imagenet')(
            None, 224, True)
        self.train_dataset = datasets.ImageFolder(root=os.path.join(self.data_dir, "train"),
                                                  transform=self.transform_train)
        self.train_loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True,
                                       num_workers=self.num_workers)
        self.test_dataset = datasets.ImageFolder(root=os.path.join(self.data_dir, "val"), transform=self.transform_val)
        self.test_loader = DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False,
                                      num_workers=self.num_workers)

        # Loss selection: mixup takes precedence over label smoothing.
        if dataMixUp > 0:
            # 100 = class count (ImageNet-100), 0 = device index — presumably;
            # verify against MixUpWrapper's signature.
            self.train_loader = MixUpWrapper(dataMixUp, 100, self.train_loader, 0)
            self.criterion = NLLMultiLabelSmooth(dataMixUp)
        elif dataLableSmoothing > 0.0:
            self.criterion = LabelSmoothing(dataLableSmoothing)
        else:
            self.criterion = torch.nn.CrossEntropyLoss()

    def forward(self, x):
        """Run the backbone on a batch of images; returns class logits."""
        return self.model(x)

    def configure_optimizers(self):
        """SGD (momentum 0.9) with cosine annealing over ``epochs_num`` epochs."""
        optimizer = optim.SGD(self.model.parameters(), lr=self.lr, momentum=0.9, weight_decay=self.weight_decay)
        scheduler = CosineAnnealingLR(optimizer, T_max=self.epochs_num, eta_min=0)
        return {'optimizer': optimizer, 'lr_scheduler': scheduler}

    def training_step(self, batch, batch_idx):
        """One training step: forward, loss, top-1/top-5 accuracy logging."""
        inputs, targets = batch
        outputs = self(inputs)
        # NOTE(review): relies on the module-level `device` global; Lightning
        # already places the batch on the right device — confirm before
        # simplifying.
        loss = self.criterion(outputs, targets.long().to(device))
        top1_acc, top5_acc = accuracy(outputs, targets, topk=(1, 5))
        self.log('train_loss', loss)
        self.log('train_top1_accuracy', top1_acc)
        self.log('train_top5_accuracy', top5_acc)
        return {"loss": loss, "train_top1_accuracy": top1_acc, "train_top5_accuracy": top5_acc}

    def validation_step(self, batch, batch_idx):
        """One validation step: forward, loss, top-1/top-5 accuracy logging."""
        inputs, targets = batch
        outputs = self(inputs)
        loss = self.criterion(outputs, targets.long().to(device))
        top1_acc, top5_acc = accuracy(outputs, targets, topk=(1, 5))
        self.log('val_loss', loss)
        self.log('val_top1_accuracy', top1_acc)
        self.log('val_top5_accuracy', top5_acc)
        return {"loss": loss, "val_top1_accuracy": top1_acc, "val_top5_accuracy": top5_acc}

    def train_dataloader(self):
        """Return the (possibly mixup-wrapped) training loader built in __init__."""
        return self.train_loader

    def val_dataloader(self):
        """Return the validation loader built in __init__."""
        return self.test_loader
-
-
-
-
if __name__ == '__main__':
    # Initialize the c2net context: stages the dataset and any pretrained
    # model into the container and exposes their canonical paths.
    c2net_context = prepare()
    # Resolve code / dataset / pretrained-model / output paths.
    code_path = c2net_context.code_path
    dataset_path = c2net_context.dataset_path
    imagenet100_path = os.path.join(c2net_context.dataset_path, "imgnet", "imgnet")
    pretrain_model_path = c2net_context.pretrain_model_path
    you_should_save_here = c2net_context.output_path

    parser = argparse.ArgumentParser()
    parser.add_argument("--layers", type=int, default=18)
    parser.add_argument("--epochs", type=int, default=20)
    parser.add_argument("--lr", type=float, default=0.0001)
    parser.add_argument("--weight_decay", type=float, default=0.0001)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--num_workers", type=int, default=0)
    parser.add_argument("--data_dir", type=str, default=imagenet100_path)
    parser.add_argument("--log_dir", type=str, default=you_should_save_here)
    parser.add_argument("--model_dir", type=str, default=you_should_save_here)
    parser.add_argument("--pretrained", type=str, default=None)
    args = parser.parse_args()

    # NOTE(review): `device` is read as a module-level global by LitModel, so
    # it must be defined before the model is constructed.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    torch.set_float32_matmul_precision('high')

    model = LitModel(lr=args.lr, weight_decay=args.weight_decay, epochs_num=args.epochs, data_dir=args.data_dir,
                     batch_size=args.batch_size, layers=args.layers, num_workers=args.num_workers,
                     pretrained=args.pretrained)
    os.makedirs(args.log_dir, exist_ok=True)
    tb_logger = pl_loggers.TensorBoardLogger(save_dir=args.log_dir, name='my_model')
    # Fix: the TensorBoard logger was created but never attached to the
    # Trainer, so all self.log() calls went to the default logger.  Also fall
    # back to CPU (and full precision, since 16-bit needs a GPU) instead of
    # crashing when CUDA is unavailable, matching the `device` computed above.
    use_cuda = torch.cuda.is_available()
    trainer = pl.Trainer(accelerator="gpu" if use_cuda else "cpu",
                         max_epochs=model.epochs_num,
                         default_root_dir=args.log_dir,
                         precision='16' if use_cuda else '32',
                         logger=tb_logger)
    trainer.fit(model)
    # Lightning writes checkpoints under default_root_dir during fit().
    print("模型保存成功")
    # Upload results to the platform (only training tasks may upload).
    upload_output()
-
-
|