- from __future__ import print_function
- import os
- import argparse
- import torch
- import time
- import numpy as np
- import torch.nn as nn
- import torch.nn.functional as F
- import torchvision
- import torch.optim as optim
- from torchvision import datasets, transforms
- from torch.autograd import Variable
- from datetime import datetime
- # from models.wideresnet import *
- from autoattack import AutoAttack
- from models.resnet import *
- import logging
- import copy
- import math
- from loss import pgd_loss, trades_loss, mart_loss
-
-
- os.environ["CUDA_VISIBLE_DEVICES"] = '0'
-
- parser = argparse.ArgumentParser(description='PyTorch CIFAR TRADES Adversarial Training')
- parser.add_argument('--batch-size', type=int, default=128, metavar='N',
- help='input batch size for training (default: 128)')
- parser.add_argument('--test-batch-size', type=int, default=100, metavar='N',
- help='input batch size for testing (default: 100)')
- parser.add_argument('--momentum', type=float, default=0.9, metavar='M',
- help='SGD momentum')
- parser.add_argument('--no-cuda', action='store_true', default=False,
- help='disables CUDA training')
- parser.add_argument('--epsilon', default=8. / 255., type=float,
- help='perturbation')
- parser.add_argument('--num-steps', default=10, type=int,
- help='perturb number of steps')
- parser.add_argument('--step-size', default=0.007, type=float,
- help='perturb step size')
-
-
- parser.add_argument('--gamma', default=0.005, type=float,
- help='the weight for wrong feature channels')
- parser.add_argument('--epochs', type=int, default=20, metavar='N',
- help='number of epochs to train')
- parser.add_argument('--weight-decay', '--wd', default=5e-4,  # 3.5e-3
- type=float, metavar='W', help='weight decay')
- parser.add_argument('--lr', type=float, default=0.025, metavar='LR',# 0.01, 0.05
- help='learning rate')
- parser.add_argument('--ckpt_url', default="/", help='pretrain model path')
- parser.add_argument('--pretrained', default='ResNet18-CIFAR10/newest',# ResNet18-CIFAR10 'ResNet18-PGD-AT-CIFAR10'
- help='directory of model')
- # parser.add_argument('--loss_name', default='L1', type=str,
- # choices=['L1', 'L2', 'Linf', 'Cosine'], help='loss type')
- # parser.add_argument('--loss', default='pgd_loss', type=str,
- # choices=['pgd_loss', 'trades_loss', 'mart_loss'], help='loss type')
-
-
- parser.add_argument('--seed', type=int, default=1, metavar='S',
- help='random seed (default: 1)')
- parser.add_argument('--log-interval', type=int, default=100, metavar='N',
- help='how many batches to wait before logging training status')
- parser.add_argument('--model-dir', default='/tmp/output/', #'/model/',
- help='directory of model for saving checkpoint')
- parser.add_argument('--save-freq', '-s', default=1, type=int, metavar='N',
- help='save frequency')
-
-
- args = parser.parse_args()
-
- # settings
-
- # timestamped run directory; strftime avoids the spaces and colons in str(datetime.now())
- model_dir = os.path.join(args.model_dir, datetime.now().strftime('%Y-%m-%d-%H%M%S')
-     + '-gamma-' + str(args.gamma) + '-lr-' + str(args.lr)
-     + '-wd-' + str(args.weight_decay) + '-seed-' + str(args.seed) + '-pgd,sl-kl')
-
-
- if not os.path.exists(model_dir):
- os.makedirs(model_dir)
-
- logger = logging.getLogger(__name__)
- logging.basicConfig(
- format='[%(asctime)s] - %(message)s',
- datefmt='%Y/%m/%d %H:%M:%S',
- level=logging.INFO,
- filename=os.path.join(model_dir, 'train.log'))
- logger.info(args)
-
-
- use_cuda = not args.no_cuda and torch.cuda.is_available()
- torch.manual_seed(args.seed)
- kwargs = {'num_workers': 4, 'pin_memory': False} if use_cuda else {}
-
- # setup data loader
- transform_train = transforms.Compose([
- transforms.RandomCrop(32, padding=4),
- transforms.RandomHorizontalFlip(),
- transforms.ToTensor(),
- ])
- transform_test = transforms.Compose([
- transforms.ToTensor(),
- ])
- trainset = torchvision.datasets.CIFAR10(root='/tmp/dataset', train=True, download=True, transform=transform_train)
- train_loader = torch.utils.data.DataLoader(trainset, batch_size=args.batch_size, shuffle=True, **kwargs)
- testset = torchvision.datasets.CIFAR10(root='/tmp/dataset', train=False, download=True, transform=transform_test)
- test_loader = torch.utils.data.DataLoader(testset, batch_size=args.test_batch_size, shuffle=False, **kwargs)
-
-
-
-
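- # Fine-tuning schedule implemented below: keep args.lr for the first 10 epochs,
- # drop to 0.02 at epoch 11, then halve the rate at every subsequent odd epoch
- # (13, 15, 17, ...), since (epoch + 1) % 2 == 0 only for odd epochs.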
- def adjust_learning_rate(args, optimizer, epoch):
- for param_group in optimizer.param_groups:
- if epoch <= 10:
- pass
- elif epoch == 11:
- param_group['lr'] = 0.02
- elif (epoch + 1) % 2 == 0:
- param_group['lr'] /= 2
-
-
-
-
-
-
- def L1(output_nat, output_rob):
- size = output_nat.shape
- if len(size) >= 3:  # conv feature maps, e.g. (N, C, H, W)
- # torch.linalg.norm(output_nat - output_rob, ord=1, dim=list(range(2, len(size)))).mean()  # matrix norm
- # return torch.linalg.norm((output_nat - output_rob).view(size[0], size[1], -1), ord=1, dim=-1).mean()
- return torch.linalg.norm((output_nat - output_rob).view(size[0], size[1], -1).mean(dim=-1), ord=1, dim=-1).mean()
- else:
- return torch.linalg.norm(output_nat - output_rob, ord=1, dim=-1).mean()
-
- def L2(output_nat, output_rob):
- size = output_nat.shape
- if len(size) >= 3:
- return torch.linalg.norm((output_nat - output_rob).view(size[0], size[1], -1).mean(dim=-1), ord=2, dim=-1).mean()
- else:
- return torch.linalg.norm(output_nat - output_rob, ord=2, dim=-1).mean()
-
- def Linf(output_nat, output_rob):
- size = output_nat.shape
- if len(size) >= 3:
- return torch.linalg.norm((output_nat - output_rob).view(size[0], size[1], -1).mean(dim=-1), ord=torch.inf, dim=-1).mean()
- else:
- return torch.linalg.norm(output_nat - output_rob, ord=torch.inf, dim=-1).mean()
-
- def Cosine(output_nat, output_rob):
- size = output_nat.shape
- if len(size) >= 3:
- return 1 - torch.cosine_similarity(output_nat.view(size[0], size[1], -1).mean(dim=-1), output_rob.view(size[0], size[1], -1).mean(dim=-1), dim=-1).mean()
- else:
- return 1 - torch.cosine_similarity(output_nat, output_rob, dim=-1).mean()
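- # Minimal uninvoked sanity check for the channel-distance helpers above
- # (a sketch with assumed shapes; `_demo_channel_distances` is hypothetical
- # and is never called during training).
- def _demo_channel_distances():
-     nat = torch.randn(4, 8, 16, 16)            # (batch, channels, H, W)
-     rob = nat + 0.01 * torch.randn_like(nat)
-     # each helper averages the spatial dims first, then takes a vector norm
-     # (or cosine distance) across the channel dimension
-     print(L1(nat, rob), L2(nat, rob), Linf(nat, rob), Cosine(nat, rob))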
-
-
-
-
-
-
-
- def criterion(model,
- teacher_model,
- model_ema,
- x_natural,
- y,
- optimizer,
- step_size=0.007,
- epsilon=0.031,
- perturb_steps=10):
-
- model.eval()
- batch_size = len(x_natural)
-
- last_model = model_ema.store_model(model)
-
- # random start inside the eps-ball; empty_like keeps device/dtype and avoids
- # sampling a Gaussian that uniform_ would immediately overwrite
- x_adv = x_natural.detach() + torch.empty_like(x_natural).uniform_(-epsilon, epsilon)
-
- for _ in range(perturb_steps):
- x_adv.requires_grad_()
- with torch.enable_grad():
- loss_ce = F.cross_entropy(model(x_adv), y)  # plain PGD objective, not a KL term
- # F.cross_entropy(model(x_adv), y) + beta * F.kl_div(F.log_softmax(model(x_adv), dim=1),
- # F.softmax(model(x_natural), dim=1) + 1e-12, reduction='batchmean')
- grad = torch.autograd.grad(loss_ce, [x_adv])[0]
- x_adv = x_adv.detach() + step_size * torch.sign(grad.detach())
- x_adv = torch.min(torch.max(x_adv, x_natural - epsilon), x_natural + epsilon)
- x_adv = torch.clamp(x_adv, 0.0, 1.0)
-
- model.train()
-
- x_adv = Variable(torch.clamp(x_adv, 0.0, 1.0), requires_grad=False)
-
- optimizer.zero_grad()
-
- #logits = model(x_natural)
- logits_rob, feat_rob = model(x_adv, prejection=True)
- with torch.no_grad():
- logits_adv, feat_adv = last_model(x_adv, prejection=True)
- #_, feat_nat = teacher_model(x_natural, prejection=True)
-
- # loss_natural = F.cross_entropy(logits, y)
- loss_adv = F.cross_entropy(logits_rob, y)
- #loss_robust = (1.0 / batch_size) * F.kl_div(F.log_softmax(logits_rob, dim=1),
- # F.softmax(logits, dim=1) + 1e-12, reduction='sum')
-
- # one_hot = torch.nn.functional.one_hot(y, num_classes=10).to(logits.dtype)
- # embedding_gt = logits @ cost_embedding
- # embedding_pred = logits_rob @ cost_embedding
- # loss_emo = (1 - torch.sum(embedding_gt * embedding_pred, dim=-1)).mean()
-
- # loss_sl = - ((F.softmax(logits_rob, -1) - F.softmax(logits_nat, -1)) * \
- # (F.softmax(logits_adv, -1) - F.softmax(logits_nat, -1))).sum()
-
- # self-distillation ("sl") term: align the current model's adversarial
- # posterior with the snapshot stored two updates earlier (last_model)
- loss_sl = F.kl_div(F.log_softmax(logits_rob, dim=1), F.softmax(logits_adv, dim=1), reduction='batchmean')
-
- loss = loss_adv + args.gamma * loss_sl
-
- # kl = nn.KLDivLoss(reduction='none')
- #
- # logits = model(x_natural)
- #
- # adv_probs = F.softmax(logits_rob, dim=1)
- #
- # tmp1 = torch.argsort(adv_probs, dim=1)[:, -2:]
- #
- # new_y = torch.where(tmp1[:, -1] == y, tmp1[:, -2], tmp1[:, -1])
- #
- # loss_adv = F.cross_entropy(logits_rob, y) + F.nll_loss(torch.log(1.0001 - adv_probs + 1e-12), new_y)
- #
- # nat_probs = F.softmax(logits, dim=1)
- #
- # true_probs = torch.gather(nat_probs, 1, (y.unsqueeze(1)).long()).squeeze()
- #
- # loss_robust = (1.0 / batch_size) * torch.sum(
- # torch.sum(kl(torch.log(adv_probs + 1e-12), nat_probs), dim=1) * (1.0000001 - true_probs))
-
- # loss = loss_adv + float(beta) * loss_robust + args.gamma * loss_sl
- # beta * loss_robust + loss_natural + loss_adv
- return loss
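- # Uninvoked toy check of the loss composition in `criterion` (hypothetical
- # helper): kl_div takes log-probabilities as its first argument and
- # probabilities as its second, matching the call above.
- def _demo_sl_term(gamma=0.005):
-     logits_rob = torch.randn(4, 10)   # current model on x_adv
-     logits_adv = torch.randn(4, 10)   # snapshot model on x_adv
-     loss_sl = F.kl_div(F.log_softmax(logits_rob, dim=1),
-                        F.softmax(logits_adv, dim=1), reduction='batchmean')
-     return gamma * loss_sl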
-
-
-
-
-
-
-
- def train_align_loss(args, model_nat, model_rob, model_ema, train_loader, optimizer, epoch):
- model_rob.train()
- for batch_idx, (data, target) in enumerate(train_loader):
- # epsilon = nn.init.trunc_normal_(torch.Tensor(1), args.epsilon * epoch / args.epochs, args.epsilon / 2, 0, 3 * args.epsilon / 2)
- # step_size = torch.normal(args.step_size, torch.Tensor([args.step_size / 2])).clamp(0, args.epsilon / 2)
-
- data, target = data.cuda(), target.cuda()
- optimizer.zero_grad()
-
- loss = criterion(model=model_rob,
- teacher_model=model_nat,
- model_ema=model_ema,
- x_natural=data,
- y=target,
- optimizer=optimizer,
- step_size=args.step_size,
- epsilon=args.epsilon,
- perturb_steps=args.num_steps)
-
-
- loss.backward()
- optimizer.step()
-
- model_ema.update_params(model_rob)
- model_ema.apply_shadow()
-
-
- # print progress
- if batch_idx % args.log_interval == 0:
- print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, batch_idx * len(data), len(train_loader.dataset),
- 100. * batch_idx / len(train_loader), loss.item()))
- logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
- epoch, batch_idx * len(data), len(train_loader.dataset),
- 100. * batch_idx / len(train_loader), loss.item()))
-
-
-
-
-
-
- def learn_prior_knowledge(model, train_loader):
- model.eval()
- cost_embedding = torch.zeros((10, 512)).cuda()
- count_list = [0 for i in range(10)]
- for batch_idx, (data, target) in enumerate(train_loader):
- data, target = data.cuda(), target.cuda()
- with torch.no_grad():
- logit, feature = model(data, prejection=True)
- pred = (logit.data.max(1)[1] == target.data)
-
- # split by class
- # for cls in range(10):
- # index = (target == cls) * pred
- # count_list[cls] += index.sum().int()
- # cost_embedding[cls] += feature[index].sum(dim=0)
-
- # split by logit
- probs = F.softmax(logit, dim=-1)
- # soft assignment: every correctly-classified feature contributes to each
- # class prototype, weighted by its predicted probability for that class
- for cls in range(10):
-     count_list[cls] += probs[pred][:, cls].sum()
- cost_embedding += torch.einsum("ij,ik->jk", [probs[pred], feature[pred]])
-
-
- for cls in range(10):
-     # normalize the accumulated prototypes (dividing the last batch's
-     # `feature` here would discard everything accumulated above)
-     cost_embedding[cls] = cost_embedding[cls] / count_list[cls]
- cost_embedding = cost_embedding / torch.linalg.vector_norm(cost_embedding, ord=2, dim=-1, keepdim=True)
- return cost_embedding
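- # Uninvoked shape sketch for the einsum accumulation above (hypothetical
- # helper): "ij,ik->jk" turns (N, 10) probabilities and (N, 512) features
- # into a (10, 512) probability-weighted sum, one prototype row per class.
- def _demo_prototype_accumulation():
-     probs = torch.rand(8, 10)
-     feature = torch.randn(8, 512)
-     proto = torch.einsum("ij,ik->jk", [probs, feature])
-     assert proto.shape == (10, 512)
-     return proto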
-
-
-
-
-
-
-
-
-
-
-
- class EMA(object):
- def __init__(self, model, alpha=0.999, buffer_ema=True):
- self.step = 0
- self.model = copy.deepcopy(model)
- self.alpha = alpha
- self.buffer_ema = buffer_ema
- self.shadow = self.get_model_state()
- self.backup = {}
- self.param_keys = [k for k, _ in self.model.named_parameters()]
- self.buffer_keys = [k for k, _ in self.model.named_buffers()]
- self.first_model = copy.deepcopy(model)
- self.second_model = copy.deepcopy(model)
-
-
- def update_params(self, model):
- decay = min(self.alpha, (self.step + 1) / (self.step + 10))
- state = model.state_dict()
- for name in self.param_keys:
- self.shadow[name].copy_(decay * self.shadow[name] + (1 - decay) * state[name])
- for name in self.buffer_keys:
- if self.buffer_ema:
- self.shadow[name].copy_(decay * self.shadow[name] + (1 - decay) * state[name])
- else:
- self.shadow[name].copy_(state[name])
- self.step += 1
-
- def apply_shadow(self):
- self.backup = self.get_model_state()
- self.model.load_state_dict(self.shadow)
-
- def restore(self):
- self.model.load_state_dict(self.backup)
-
- def get_model_state(self):
- return {
- k: v.clone().detach()
- for k, v in self.model.state_dict().items()
- }
-
- def store_model(self, model):
- last_model = copy.deepcopy(self.second_model)
- self.second_model = copy.deepcopy(self.first_model)  # was `self.second_model_model`, a silent attribute typo
- self.first_model = copy.deepcopy(model)  # snapshot, not a live reference that tracks later updates
- return last_model
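- # Minimal uninvoked usage sketch for EMA (hypothetical `_demo_ema` helper):
- # update the shadow after each optimizer step, swap it in for evaluation,
- # and restore the raw weights afterwards.
- def _demo_ema():
-     net = nn.Linear(4, 2)
-     ema = EMA(net, alpha=0.999)
-     ema.update_params(net)   # typically right after optimizer.step()
-     ema.apply_shadow()       # ema.model now carries the averaged weights
-     ema.restore()            # back to the state saved by apply_shadow()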
-
-
-
- # def adjust_learning_rate_cosine(optimizer, epoch, args):
- # lr = args.lr * 0.5 * (1 + np.cos((epoch - 1) / args.epochs * np.pi))
- # for param_group in optimizer.param_groups:
- # param_group['lr'] = lr
- # return lr
- #
- #
- # for start_ep, tau, new_state_dict in zip(start_wa, tau_list, exp_avgs):
- # if epoch == start_ep:
- # for key, value in model.state_dict().items():
- # new_state_dict[key] = value
- # elif epoch > start_ep:
- # for key, value in model.state_dict().items():
- # new_state_dict[key] = (1 - tau) * value + tau * new_state_dict[key]
- # else:
- # pass
-
-
-
- def eval_train(model, train_loader):
- model.eval()
- train_loss = 0
- correct = 0
- with torch.no_grad():
- for data, target in train_loader:
- data, target = data.cuda(), target.cuda()
- output = model(data)
- train_loss += F.cross_entropy(output, target, reduction='sum').item()
- pred = output.max(1, keepdim=True)[1]
- correct += pred.eq(target.view_as(pred)).sum().item()
- train_loss /= len(train_loader.dataset)
- print('Training: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
- train_loss, correct, len(train_loader.dataset),
- 100. * correct / len(train_loader.dataset)))
- logger.info('Training: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
- train_loss, correct, len(train_loader.dataset),
- 100. * correct / len(train_loader.dataset)))
- training_accuracy = correct / len(train_loader.dataset)
- return train_loss, training_accuracy
-
-
-
-
-
-
- def _pgd_whitebox(model,
- X,
- y,
- epsilon=args.epsilon,
- num_steps=20,
- step_size=0.003):
- with torch.no_grad():
- out = model(X)
- err = (out.data.max(1)[1] != y.data).sum().item()
- X_pgd = Variable(X.data, requires_grad=True)
-
- random_noise = torch.FloatTensor(*X_pgd.shape).uniform_(-epsilon, epsilon).cuda()
- X_pgd = Variable(X_pgd.data + random_noise, requires_grad=True)
-
- for _ in range(num_steps):
-     with torch.enable_grad():
-         loss = nn.CrossEntropyLoss()(model(X_pgd), y)
-     loss.backward()
-     # ascent step, then project back into the eps-ball and the valid pixel range
-     eta = step_size * X_pgd.grad.data.sign()
-     X_pgd = Variable(X_pgd.data + eta, requires_grad=True)
-     eta = torch.clamp(X_pgd.data - X.data, -epsilon, epsilon)
-     X_pgd = Variable(X.data + eta, requires_grad=True)
-     X_pgd = Variable(torch.clamp(X_pgd, 0, 1.0), requires_grad=True)
- with torch.no_grad():
- err_pgd = (model(X_pgd).data.max(1)[1] != y.data).sum().item()
- return err, err_pgd
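- # Hypothetical one-batch usage of the white-box PGD check above:
- #   X, y = next(iter(test_loader))
- #   err_nat, err_pgd = _pgd_whitebox(model, X.cuda(), y.cuda())
- # eval_adv_test_whitebox below does exactly this over the full test set.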
-
-
-
- def eval_adv_test_whitebox(model, test_loader):
-
- model.eval()
- robust_err_total = 0
- natural_err_total = 0
-
- for data, target in test_loader:
- data, target = data.cuda(), target.cuda()
- # PGD
- X, y = Variable(data, requires_grad=True), Variable(target)
- err_natural, err_robust = _pgd_whitebox(model, X, y)
-
- robust_err_total += err_robust
- natural_err_total += err_natural
-
- print('natural_acc: ', 1 - natural_err_total / len(test_loader.dataset))
- print('robust_acc: ', 1 - robust_err_total / len(test_loader.dataset))
- logger.info('natural_acc: {}'.format(1 - natural_err_total / len(test_loader.dataset)))
- logger.info('robust_acc: {} '.format(1 - robust_err_total / len(test_loader.dataset)))
-
-
-
-
- def eval_apgd(model, test_loader):
- model.eval()
- robust_err_total = 0
- # adversary = AutoAttack(model, norm="Linf", eps=args.epsilon,
- # log_path=os.path.join(model_dir, 'test-apgd.log'))
- # adversary.attacks_to_run = ['apgd-ce']
- # version='standard' runs the full AutoAttack suite (APGD-CE, APGD-T, FAB-T, Square)
- adversary = AutoAttack(model, norm="Linf", eps=args.epsilon, version='standard',
- log_path=os.path.join(model_dir, 'autoattack.log'))
-
- for data, target in test_loader:
- data, target = data.cuda(), target.cuda()
- # APGD
- data_adv = adversary.run_standard_evaluation(data, target, bs=args.test_batch_size)
- with torch.no_grad():
- logits = model(data_adv)
- err_robust = (logits.data.max(1)[1] != target.data).sum().item()
-
- robust_err_total += err_robust
-
-
- print('AutoAttack_acc: ', 1 - robust_err_total / len(test_loader.dataset))
- logger.info('AutoAttack_acc: {}'.format(1 - robust_err_total / len(test_loader.dataset)))
-
-
-
-
-
-
-
-
-
-
- def main():
- # init model; the commented WideResNet below can also be used for training
- # model = WideResNet(10).cuda()
- model_nat = ResNet18(10).cuda()
- model_nat.load_state_dict(torch.load(args.ckpt_url))
-
- model_rob = copy.deepcopy(model_nat)
- model_ema = EMA(copy.deepcopy(model_rob))
- optimizer = optim.SGD(model_rob.parameters(), lr=args.lr, momentum=args.momentum, weight_decay=args.weight_decay)
-
- #cost_embedding = learn_prior_knowledge(model_nat, train_loader)
- for epoch in range(1, args.epochs + 1):
- start_time = time.time()
- # adversarial training
- adjust_learning_rate(args, optimizer, epoch)
- train_align_loss(args, model_nat, model_rob, model_ema, train_loader, optimizer, epoch)
- print('using time:', time.time() - start_time)
- logger.info('using time: {}'.format(time.time() - start_time))
-
- # evaluation on natural examples
- eval_train(model_rob, train_loader)
- eval_train(model_ema.model, train_loader)
-
- # eval_test(model, test_loader)
- eval_adv_test_whitebox(model_rob, test_loader)
- eval_adv_test_whitebox(model_ema.model, test_loader)
-
- print('================================================================')
- logger.info('================================================================')
-
-
- # save checkpoint
- torch.save(model_rob.state_dict(), os.path.join(model_dir, 'model-{}.pth'.format(args.epochs)))
- torch.save(model_ema.model.state_dict(), os.path.join(model_dir, 'model-ema-{}.pth'.format(args.epochs)))
-
- # eval_apgd(model_rob, test_loader)
- eval_apgd(model_ema.model, test_loader)
-
-
-
-
- if __name__ == '__main__':
- main()