|
- from mindtorch.tools import mstorch_enable
- import torch
- import torch.nn as nn
- import math
- import time
- from torchvision import datasets, transforms
-
- import mindspore as ms
-
- import argparse
-
-
- ### data prepare
# Standard ImageNet channel statistics used to normalize input images.
_IMAGENET_MEAN = [0.485, 0.456, 0.406]
_IMAGENET_STD = [0.229, 0.224, 0.225]
normalize = transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD)
-
-
- ### model construct
def conv_bn(inp, oup, stride):
    """3x3 convolution (padding 1, no bias) -> BatchNorm -> ReLU6."""
    layers = [
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
        # NOTE: an out-of-place ReLU6 (inplace=False) was suggested in the
        # original source for graph-mode acceleration.
        nn.ReLU6(inplace=True),
    ]
    return nn.Sequential(*layers)
-
-
def conv_1x1_bn(inp, oup):
    """Pointwise 1x1 convolution (no bias) -> BatchNorm -> ReLU6."""
    pointwise = nn.Conv2d(inp, oup, kernel_size=1, stride=1, padding=0, bias=False)
    norm = nn.BatchNorm2d(oup)
    # NOTE: inplace=False variant was suggested for graph-mode acceleration.
    act = nn.ReLU6(inplace=True)
    return nn.Sequential(pointwise, norm, act)
-
-
def make_divisible(x, divisible_by=8):
    """Round *x* up to the nearest multiple of *divisible_by*.

    Used to keep channel counts hardware-friendly (multiples of 8).

    Args:
        x: Raw (possibly fractional) channel count.
        divisible_by: Alignment to round up to; defaults to 8.

    Returns:
        int: The smallest multiple of *divisible_by* that is >= x.
    """
    # math.ceil replaces the original function-local numpy import: pulling in
    # numpy for a scalar ceiling is wasteful, and `math` is already imported
    # at module level.
    return int(math.ceil(x / divisible_by) * divisible_by)
-
-
class InvertedResidual(nn.Module):
    """MobileNetV2 inverted residual block: expand -> depthwise -> project.

    When ``stride == 1`` and input/output channels match, the input is added
    back to the projection output (residual connection).
    """

    def __init__(self, inp, oup, stride, expand_ratio):
        super(InvertedResidual, self).__init__()
        assert stride in [1, 2]
        self.stride = stride

        hidden_dim = int(inp * expand_ratio)
        # Residual add is only valid when spatial size and channels are kept.
        self.use_res_connect = self.stride == 1 and inp == oup

        layers = []
        if expand_ratio != 1:
            # Pointwise expansion from inp to hidden_dim channels.
            layers += [
                nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                nn.BatchNorm2d(hidden_dim),
                # NOTE: inplace=False variant suggested for graph mode.
                nn.ReLU6(inplace=True),
            ]
        layers += [
            # Depthwise 3x3 (groups == channels).
            nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU6(inplace=True),
            # Pointwise-linear projection: no activation after this conv.
            nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup),
        ]
        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv(x)
        return x + out if self.use_res_connect else out
-
-
class MobileNetV2(nn.Module):
    """MobileNetV2: inverted-residual backbone followed by a linear classifier.

    Args:
        n_class: Number of output classes.
        input_size: Input resolution; must be a multiple of 32.
        width_mult: Channel width multiplier applied to block widths.
    """

    def __init__(self, n_class=1000, input_size=224, width_mult=1.):
        super(MobileNetV2, self).__init__()
        assert input_size % 32 == 0

        # Each row: (expansion t, output channels c, repeats n, first stride s)
        inverted_residual_cfg = [
            [1, 16, 1, 1],
            [6, 24, 2, 2],
            [6, 32, 3, 2],
            [6, 64, 4, 2],
            [6, 96, 3, 1],
            [6, 160, 3, 2],
            [6, 320, 1, 1],
        ]

        in_ch = 32  # stem width is fixed regardless of width_mult
        self.last_channel = make_divisible(1280 * width_mult) if width_mult > 1.0 else 1280

        # Stem: standard 3x3 conv, stride 2.
        layers = [conv_bn(3, in_ch, 2)]
        # Body: stacked inverted residual blocks; only the first block of each
        # stage uses the configured stride, the rest use stride 1.
        for t, c, n, s in inverted_residual_cfg:
            out_ch = make_divisible(c * width_mult) if t > 1 else c
            for idx in range(n):
                stride = s if idx == 0 else 1
                layers.append(InvertedResidual(in_ch, out_ch, stride, expand_ratio=t))
                in_ch = out_ch
        # Head: 1x1 conv up to last_channel before pooling.
        layers.append(conv_1x1_bn(in_ch, self.last_channel))
        self.features = nn.Sequential(*layers)

        self.classifier = nn.Linear(self.last_channel, n_class)

        self._initialize_weights()

    def forward(self, x):
        x = self.features(x)
        # Global average pool over the spatial dims (width, then height).
        x = x.mean(3).mean(2)
        return self.classifier(x)

    def _initialize_weights(self):
        """He-style init for convs, unit scale / zero shift for BN,
        N(0, 0.01) for the linear classifier."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / fan_out))
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()

    # NOTE(review): the original source kept a commented-out graph-mode
    # variant of _initialize_weights that assigned via mindtorch's
    # `*_adapter` tensor methods (e.g. `m.weight.data =
    # m.weight.data.normal_adapter(...)`); restore that form when running
    # under ms.GRAPH_MODE.
-
-
- criterion = nn.CrossEntropyLoss()
-
-
def mobilenet_v2():
    """Factory for a full-width (width_mult=1) MobileNetV2."""
    return MobileNetV2(width_mult=1)
-
-
- ################# model train ###############
def train(config_args):
    """Train MobileNetV2 on an ImageFolder dataset and save the weights.

    Uses the mindtorch/MindSpore hybrid flow: the forward pass is written in
    torch style, while gradients are taken with ms.ops.value_and_grad and
    applied by calling the optimizer on them directly.

    Args:
        config_args: Parsed CLI namespace; reads .traindir, .epoch, .device
            and .save_path.
    """
    # Standard ImageNet-style augmentation pipeline.
    train_dataset = datasets.ImageFolder(
        config_args.traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(224, scale=(0.2, 1.0)),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]))

    train_data = torch.utils.data.DataLoader(
        train_dataset, batch_size=128, shuffle=True,
        num_workers=8)

    epochs = config_args.epoch
    net = mobilenet_v2().to(config_args.device)

    optimizer = torch.optim.SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0005)

    def forward_fn(data, label):
        # Returns (loss, logits); logits ride along as the aux output.
        logits = net(data)
        loss = criterion(logits, label)
        return loss, logits

    # MindSpore-style autodiff over the torch-style forward: position None
    # means no positional grads; grads are taken w.r.t. optimizer.parameters.
    grad_fn = ms.ops.value_and_grad(forward_fn, None, optimizer.parameters, has_aux=True)

    # @ms.jit # for graph mode accelerating
    def train_step(data, label):
        (loss, _), grads = grad_fn(data, label)
        # NOTE(review): calling the optimizer with the grads is the
        # mindtorch/MindSpore apply-step idiom (not plain torch .step()).
        optimizer(grads)
        return loss

    net.train()
    print("begin training ......")
    count = 0
    start_time = time.time()
    for i in range(epochs):
        for X, y in train_data:
            res = train_step(X, y)
            # res is presumably a MindSpore tensor (hence .asnumpy()).
            print("---------------------->epoch:{}, loss:{:.6f}".format(i, res.asnumpy()))
            count += 1
            if count % 100 == 0:
                # Running average step time over all steps so far.
                end_time = time.time()
                avg_time = (end_time - start_time) / count
                print("---------------------->average step time based on {} steps: {:.6f}s.".format(count, avg_time))

    # Persist weights in torch checkpoint format.
    torch.save(net.state_dict(), config_args.save_path)
-
-
- ################# model eval ###############
def test(config_args):
    """Evaluate a saved MobileNetV2 checkpoint on the validation set.

    Args:
        config_args: Parsed CLI namespace; reads .valdir, .device and
            .load_path.
    """
    # Deterministic eval pipeline: resize, center-crop, normalize.
    eval_transform = transforms.Compose([
        transforms.Resize(int(224 / 0.875)),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ])
    test_data = torch.utils.data.DataLoader(
        datasets.ImageFolder(config_args.valdir, eval_transform),
        batch_size=64, shuffle=False,
        num_workers=4)

    net = mobilenet_v2().to(config_args.device)
    net.load_state_dict(torch.load(config_args.load_path), strict=True)
    net.eval()

    size = len(test_data.dataset)
    num_batches = len(test_data)
    test_loss = 0
    correct = 0
    print("begin testing ......")
    # no_grad skips autograd bookkeeping during inference.
    # (comment out this context manager for graph mode accelerating)
    with torch.no_grad():
        for images, labels in test_data:
            images = images.to(config_args.device)
            labels = labels.to(config_args.device)
            pred = net(images)
            test_loss += criterion(pred, labels).item()
            correct += (pred.argmax(1) == labels).to(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
-
-
if __name__ == '__main__':
    # Fix the RNG seed for reproducible weight init and data shuffling.
    torch.manual_seed(1)

    parser = argparse.ArgumentParser()
    parser.add_argument('--mode', type=str, default='train', help='Execute training or testing.')
    parser.add_argument('--device', type=str, default='cpu', help='Select the hardware device for execution.')
    parser.add_argument('--epoch', type=int, default=20, help='Epoch size of training.')
    parser.add_argument('--save_path', type=str, default='./mobilenetv2.pth', help='Training output path for local.')
    parser.add_argument('--load_path', type=str, default='./mobilenetv2.pth',
                        help='Pretrained checkpoint path for fine tune or evaluating.')
    parser.add_argument('--traindir', type=str, default='./', help='Training dataset path for local.')
    parser.add_argument('--valdir', type=str, default='./', help='Testing dataset path for local.')
    config_args = parser.parse_args()

    # Map the user-supplied device string onto a MindSpore device target.
    device_targets = {
        "gpu": "GPU", "GPU": "GPU", "cuda": "GPU",
        "cpu": "CPU", "CPU": "CPU",
        "Ascend": "Ascend",
    }
    target = device_targets.get(config_args.device)
    if target is not None:
        ms.context.set_context(device_target=target)
    else:
        print("WARNING: '--device' configuration is abnormal, and the appropriate device will be adapted.")

    # for graph mode accelerating
    # ms.context.set_context(mode=ms.GRAPH_MODE)
    # ms.set_context(jit_syntax_level=ms.STRICT)

    if config_args.mode == 'train':
        train(config_args)
    elif config_args.mode == 'test':
        test(config_args)
|