|
- from __future__ import print_function
- import os
- import argparse
- import torch
- import time
- import numpy as np
- import torch.nn as nn
- import torch.nn.functional as F
- import torchvision
- import torch.optim as optim
- from torchvision import datasets, transforms
- from torch.autograd import Variable
-
- from models.wideresnet import *
- # from models.RiFT_wideresnet import *
- from models.resnet import *
- from loss.trades import trades_loss
- import logging
-
-
# Restrict this process to GPU 0.
os.environ["CUDA_VISIBLE_DEVICES"] = '0'

# Command-line interface. Every numeric option declares an explicit ``type`` so
# that values supplied on the command line are converted to numbers; without
# it argparse hands them over as strings.
parser = argparse.ArgumentParser(description='PyTorch CIFAR TRADES Adversarial Training')
parser.add_argument('--batch-size', type=int, default=128, metavar='N',
                    help='input batch size for training (default: 128)')
parser.add_argument('--test-batch-size', type=int, default=500, metavar='N',
                    help='input batch size for testing (default: 500)')
parser.add_argument('--epochs', type=int, default=120, metavar='N',  # 76
                    help='number of epochs to train')
parser.add_argument('--weight-decay', '--wd', default=2e-4,
                    type=float, metavar='W')
parser.add_argument('--lr', type=float, default=0.1, metavar='LR',
                    help='learning rate')
parser.add_argument('--momentum', type=float, default=0.9, metavar='M',
                    help='SGD momentum')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--epsilon', type=float, default=8. / 255.,
                    help='perturbation')
parser.add_argument('--num-steps', type=int, default=10,
                    help='perturb number of steps')
parser.add_argument('--step-size', type=float, default=0.007,
                    help='perturb step size')
parser.add_argument('--beta', type=float, default=6.0,
                    help='regularization, i.e., 1/lambda in TRADES')
parser.add_argument('--seed', type=int, default=0, metavar='S',
                    help='random seed (default: 0)')
parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                    help='how many batches to wait before logging training status')
# Other directory names used with this script:
#   WideResNet28-TRADES-CIFAR10-seed-0, WideResNet34-TRADES-Tiny-ImageNet-seed-0
parser.add_argument('--model-dir', default='/tmp/output/ResNet18-TRADES-CIFAR100-seed-0',
                    help='directory of model for saving checkpoint')
parser.add_argument('--save-freq', '-s', default=5, type=int, metavar='N',
                    help='save frequency')

args = parser.parse_args()
-
# settings
# Checkpoints and the training log both live in ``model_dir``.
model_dir = args.model_dir
# exist_ok avoids the check-then-create race when several runs start together.
os.makedirs(model_dir, exist_ok=True)

# Log records go to <model_dir>/trades_train.log; console output is produced
# separately by the print() calls in the training/evaluation functions.
logger = logging.getLogger(__name__)
logging.basicConfig(
    format='[%(asctime)s] - %(message)s',
    datefmt='%Y/%m/%d %H:%M:%S',
    level=logging.INFO,
    filename=os.path.join(model_dir, 'trades_train.log'))
# Record the full configuration at the top of the log.
logger.info(args)
-
-
use_cuda = not args.no_cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
# Worker processes and pinned memory are only requested when CUDA is in use.
kwargs = {'num_workers': 4, 'pin_memory': True} if use_cuda else {}

# setup data loader
# Training images get random crop + horizontal flip; test images are only
# converted to tensors. No normalisation is applied in either pipeline.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
])
trainset = torchvision.datasets.CIFAR100(root='/tmp/dataset', train=True, download=True, transform=transform_train)
testset = torchvision.datasets.CIFAR100(root='/tmp/dataset', train=False, download=True, transform=transform_test)

# Disabled alternative: Tiny-ImageNet. To use it, replace the transforms and
# datasets above with the following and update class_number accordingly.
#
# from tiny_imagenet import TinyImageNet
# transform_train = transforms.Compose([
#     transforms.RandomCrop(64, padding=8, padding_mode="reflect"),
#     transforms.RandomHorizontalFlip(),
#     transforms.ToTensor(),
#     # transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
# ])
# transform_test = transforms.Compose([
#     transforms.ToTensor(),
#     # transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
# ])
# trainset = TinyImageNet('/tmp/dataset/tiny-imagenet-200', train=True, transform=transform_train)
# testset = TinyImageNet('/tmp/dataset/tiny-imagenet-200', train=False, transform=transform_test)

# Number of output classes passed to the model constructor in main().
class_number = 100

# The loaders are built exactly once, after the dataset choice above is final
# (they used to be constructed twice with identical arguments).
train_loader = torch.utils.data.DataLoader(trainset, batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(testset, batch_size=args.test_batch_size, shuffle=False, **kwargs)
-
-
-
-
-
def t_train(args, model, train_loader, optimizer, epoch):
    """Run one epoch of TRADES adversarial training.

    Args:
        args: parsed options; ``step_size``, ``epsilon``, ``num_steps``,
            ``beta`` and ``log_interval`` are read here.
        model: network to train; it is switched to train mode.
        train_loader: iterable of ``(data, target)`` batches that also exposes
            ``.dataset`` (used for the progress message).
        optimizer: optimizer stepped once per batch.
        epoch: epoch number, used only in the progress message.
    """
    model.train()

    # Move batches to wherever the model lives rather than assuming CUDA.
    # A parameter-less model falls back to the previous behaviour (CUDA).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()

        # calculate robust loss
        loss = trades_loss(model=model,
                           x_natural=data,
                           y=target,
                           optimizer=optimizer,
                           step_size=args.step_size,
                           epsilon=args.epsilon,
                           perturb_steps=args.num_steps,
                           beta=args.beta)
        loss.backward()
        optimizer.step()

        # print progress (same text on the console and in the log file)
        if batch_idx % args.log_interval == 0:
            message = 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item())
            print(message)
            logger.info(message)
-
-
def eval_train(model, train_loader):
    """Evaluate clean (unperturbed) loss and accuracy on the training loader.

    The model is put in eval mode and gradients are disabled. The summary is
    printed and written to the log.

    Args:
        model: network to evaluate.
        train_loader: iterable of ``(data, target)`` batches exposing ``.dataset``.

    Returns:
        ``(train_loss, training_accuracy)``: mean cross-entropy per sample and
        the fraction of correctly classified samples.
    """
    model.eval()

    # Move batches to wherever the model lives rather than assuming CUDA.
    # A parameter-less model falls back to the previous behaviour (CUDA).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    train_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Sum over the batch; the division by the dataset size happens once below.
            train_loss += F.cross_entropy(output, target, reduction='sum').item()
            pred = output.max(1)[1]
            correct += (pred == target).sum().item()

    num_samples = len(train_loader.dataset)
    train_loss /= num_samples
    message = 'Training: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        train_loss, correct, num_samples, 100. * correct / num_samples)
    print(message)
    logger.info(message)
    training_accuracy = correct / num_samples
    return train_loss, training_accuracy
-
-
def eval_test(model, test_loader):
    """Evaluate clean (unperturbed) loss and accuracy on the test loader.

    The model is put in eval mode and gradients are disabled. The summary is
    printed to the console only.

    Args:
        model: network to evaluate.
        test_loader: iterable of ``(data, target)`` batches exposing ``.dataset``.

    Returns:
        ``(test_loss, test_accuracy)``: mean cross-entropy per sample and the
        fraction of correctly classified samples.
    """
    model.eval()

    # Move batches to wherever the model lives rather than assuming CUDA.
    # A parameter-less model falls back to the previous behaviour (CUDA).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Sum over the batch; the division by the dataset size happens once below.
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            pred = output.max(1)[1]
            correct += (pred == target).sum().item()

    num_samples = len(test_loader.dataset)
    test_loss /= num_samples
    print('Test: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        test_loss, correct, num_samples, 100. * correct / num_samples))
    test_accuracy = correct / num_samples
    return test_loss, test_accuracy
-
-
def _pgd_whitebox(model,
                  X,
                  y,
                  epsilon=args.epsilon,
                  num_steps=20,
                  step_size=0.003):
    """Count natural and PGD (L-inf) misclassifications on one batch.

    The attack starts from ``X`` plus uniform noise in ``[-epsilon, epsilon]``
    and performs ``num_steps`` signed-gradient ascent steps on the
    cross-entropy loss. After every step the perturbation is clipped to
    ``[-epsilon, epsilon]`` and the result is clamped to ``[0, 1]``.

    Args:
        model: classifier under attack; called as ``model(batch)``.
        X: input batch.
        y: labels for ``X``.
        epsilon: maximum per-element perturbation.
        num_steps: number of attack iterations.
        step_size: step length of each signed-gradient update.

    Returns:
        ``(err, err_pgd)``: number of misclassified samples on the clean batch
        and on the adversarial batch.
    """
    x_clean = X.detach()

    # Plain inference: no autograd graph is needed for the clean error count.
    with torch.no_grad():
        err = (model(x_clean).max(1)[1] != y).sum().item()

    # Random start. The noise is still sampled on the CPU (same RNG stream as
    # before) but is then moved to the input's device instead of a hard-coded
    # .cuda().
    random_noise = torch.FloatTensor(*x_clean.shape).uniform_(-epsilon, epsilon).to(x_clean.device)
    x_adv = x_clean + random_noise

    for _ in range(num_steps):
        x_adv = x_adv.detach().requires_grad_(True)
        # enable_grad keeps the attack working when the caller is inside no_grad.
        with torch.enable_grad():
            loss = F.cross_entropy(model(x_adv), y)
            # Differentiate w.r.t. the input only, so no gradients are left
            # behind on the model parameters.
            grad = torch.autograd.grad(loss, x_adv)[0]
        x_adv = x_adv.detach() + step_size * grad.sign()
        # Project back onto the epsilon-ball around the clean input, then onto
        # the valid pixel range.
        x_adv = x_clean + torch.clamp(x_adv - x_clean, -epsilon, epsilon)
        x_adv = torch.clamp(x_adv, 0, 1.0)

    with torch.no_grad():
        err_pgd = (model(x_adv).max(1)[1] != y).sum().item()
    return err, err_pgd
-
def eval_adv_test_whitebox(model, test_loader):
    """Evaluate natural and white-box PGD accuracy over ``test_loader``.

    Each batch is attacked with ``_pgd_whitebox`` using that function's default
    attack settings. Both accuracies are printed and written to the log.

    Args:
        model: network to evaluate; it is switched to eval mode.
        test_loader: iterable of ``(data, target)`` batches exposing ``.dataset``.

    Returns:
        ``(natural_acc, robust_acc)`` as fractions of the dataset size.
    """
    model.eval()

    # Move batches to wherever the model lives rather than assuming CUDA.
    # A parameter-less model falls back to the previous behaviour (CUDA).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    robust_err_total = 0
    natural_err_total = 0

    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        # pgd attack
        err_natural, err_robust = _pgd_whitebox(model, data, target)
        robust_err_total += err_robust
        natural_err_total += err_natural

    # Compute each accuracy once and reuse it for print, log and return.
    num_samples = len(test_loader.dataset)
    natural_acc = 1 - natural_err_total / num_samples
    robust_acc = 1 - robust_err_total / num_samples
    print('natural_acc: ', natural_acc)
    print('robust_acc: ', robust_acc)
    logger.info('natural_acc: {}'.format(natural_acc))
    logger.info('robust_acc: {} '.format(robust_acc))
    return natural_acc, robust_acc
-
-
-
- from autoattack import AutoAttack
def eval_apgd(model, test_loader):
    """Evaluate robust accuracy under AutoAttack (``version='standard'``).

    Adversarial examples are generated batch by batch with
    ``run_standard_evaluation`` and then re-classified by the model. AutoAttack
    writes its own log to ``<model_dir>/autoattack.log``; the final accuracy is
    printed and written to the training log.

    Args:
        model: network to evaluate; it is switched to eval mode.
        test_loader: iterable of ``(data, target)`` batches exposing ``.dataset``.

    Returns:
        Robust accuracy as a fraction of the dataset size.
    """
    model.eval()
    robust_err_total = 0
    # APGD-CE-only configuration, kept for reference:
    # adversary = AutoAttack(model, norm="Linf", eps=args.epsilon,
    #                        log_path=os.path.join(model_dir, 'test-apgd.log'))
    # adversary.attacks_to_run = ['apgd-ce']
    adversary = AutoAttack(model, norm="Linf", eps=args.epsilon, version='standard',
                           log_path=os.path.join(model_dir, 'autoattack.log'))

    for data, target in test_loader:
        data, target = data.cuda(), target.cuda()
        # The whole batch is attacked in a single chunk (bs == batch size).
        data_adv = adversary.run_standard_evaluation(data, target, bs=data.shape[0])
        with torch.no_grad():
            logits = model(data_adv)
        robust_err_total += (logits.max(1)[1] != target).sum().item()

    robust_acc = 1 - robust_err_total / len(test_loader.dataset)
    # The label used to read 'APGD-CE_acc', which no longer matches the attack
    # that is actually run above (the standard AutoAttack configuration).
    print('AutoAttack_acc: ', robust_acc)
    logger.info('AutoAttack_acc: {} '.format(robust_acc))
    return robust_acc
-
-
def adjust_learning_rate(optimizer, epoch):
    """Set the step-decayed learning rate on every parameter group.

    The base rate ``args.lr`` is used before epoch 75 and is multiplied by
    0.1 from epoch 75, by 0.01 from epoch 90 and by 0.001 from epoch 100.
    """
    # Check the latest milestone first so exactly one branch applies.
    if epoch >= 100:
        new_lr = args.lr * 0.001
    elif epoch >= 90:
        new_lr = args.lr * 0.01
    elif epoch >= 75:
        new_lr = args.lr * 0.1
    else:
        new_lr = args.lr

    for group in optimizer.param_groups:
        group['lr'] = new_lr
-
-
def main():
    """Train with TRADES, checkpoint every epoch, then evaluate the final model.

    Per epoch: the learning rate is set by ``adjust_learning_rate``, one
    adversarial-training epoch is run, and the latest model/optimizer states
    are written to ``model-last.pth`` / ``opt-last.tar``. A numbered model
    checkpoint is additionally kept for epochs 75-78 and for every
    ``args.save_freq``-th epoch.

    After training, clean training accuracy, white-box PGD accuracy and
    AutoAttack accuracy are evaluated once.
    """
    # Honour --no-cuda / CUDA availability instead of calling .cuda() unconditionally.
    device = torch.device('cuda' if use_cuda else 'cpu')

    # init model, ResNet18() can be also used here for training
    # net = WideResNet(image_size=64, num_classes=class_number, depth=34)
    # net = WideResNet(num_classes=class_number, depth=34)
    net = ResNet18(class_number).to(device)
    # DataParallel is only meaningful on GPUs. ``net`` stays the unwrapped
    # module, so checkpoints are saved without the DataParallel wrapper in
    # both cases.
    model = torch.nn.DataParallel(net) if use_cuda else net
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)

    last_model_path = os.path.join(model_dir, 'model-last.pth')
    last_opt_path = os.path.join(model_dir, 'opt-last.tar')

    for epoch in range(1, args.epochs + 1):
        # adjust learning rate for SGD
        adjust_learning_rate(optimizer, epoch)

        start_time = time.time()

        # adversarial training
        t_train(args, model, train_loader, optimizer, epoch)

        # Measure once so the console and the log report the same number.
        elapsed = time.time() - start_time
        print('using time:', elapsed)
        logger.info('using time: {}'.format(elapsed))

        # Per-epoch evaluation is intentionally not run here; only the final
        # model is evaluated after the loop.
        print('================================================================')
        torch.save(net.state_dict(), last_model_path)
        torch.save(optimizer.state_dict(), last_opt_path)

        # save checkpoint: every epoch in 75..78, plus every save_freq-th epoch
        if (75 <= epoch <= 78) or (epoch % args.save_freq == 0):
            torch.save(net.state_dict(),
                       os.path.join(model_dir, 'model-{}.pth'.format(epoch)))

    # NOTE(review): the source indentation was lost; the final evaluation is
    # assumed to run once after the training loop - confirm.
    eval_train(model, train_loader)
    # eval_test(model, test_loader)
    eval_adv_test_whitebox(model, test_loader)
    eval_apgd(model, test_loader)


if __name__ == '__main__':
    main()
-
- """
- loss = trades_loss(model=model,
- x_natural=data,
- y=target,
- optimizer=optimizer,
- step_size=args.step_size,
- epsilon=args.epsilon,
- perturb_steps=args.num_steps,
- beta=args.beta)
-
- loss = mart_loss(model=model,
- x_natural=data,
- y=target,
- optimizer=optimizer,
- step_size=args.step_size,
- epsilon=args.epsilon,
- perturb_steps=args.num_steps,
- beta=args.beta)
- """
|