|
- import cv2
- import numpy as np
- import torch
- from torch import nn
- import utils.distributed_utils as utils
- import torch.nn.functional as F
- from torch.autograd import Variable
-
-
class FocalLoss(nn.Module):
    """Multi-class focal loss (Lin et al., "Focal Loss for Dense Object Detection").

    Args:
        alpha: class-balancing weight. NOTE(review): stored but never used in
            ``forward`` — kept unused here to preserve existing training
            behavior; confirm whether it should scale the loss term.
        gamma: focusing exponent; ``gamma=0`` reduces this to plain
            cross-entropy on the softmax output.
        size_average: if True return the mean loss over samples, else the sum.
    """

    def __init__(self, alpha, gamma=0., size_average=True):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.size_average = size_average

    def forward(self, y_pred, y_true):
        """Compute the focal loss.

        Args:
            y_pred: raw logits, shape (N, C).
            y_true: one-hot targets, shape (N, C).

        Returns:
            Scalar tensor (mean or sum depending on ``size_average``).
        """
        # Explicit dim=1: the implicit-dim form of log_softmax is deprecated
        # (for 2-D input the implicit dim is 1, so behavior is unchanged).
        logpt = F.log_softmax(y_pred, dim=1)

        # Recover integer class indices from the one-hot target, then pick the
        # log-probability of the true class per sample. With gather(dim=1) the
        # target value supplies the column index and its row position supplies
        # the row index.
        y_true = torch.argmax(y_true, dim=-1)
        y_true = torch.unsqueeze(y_true, dim=-1)
        logpt = logpt.gather(1, y_true)
        # pt = exp(logpt), detached so the (1 - pt)**gamma modulating factor is
        # treated as a constant weight. detach() replaces the deprecated
        # Variable(logpt.data.exp()) with identical gradient semantics.
        pt = logpt.exp().detach()

        # Down-weight easy examples (pt close to 1) by (1 - pt)**gamma.
        loss = -1 * (1 - pt) ** self.gamma * logpt

        if self.size_average:
            return loss.mean()
        else:
            return loss.sum()
-
-
def dice_loss(y_pred, y_true, eps=1e-7):
    """Soft Sørensen–Dice loss.

    PyTorch optimizers minimize, so this returns 1 - dice coefficient
    (small is good).

    Args:
        y_true: tensor of shape [B*H*W, C] (one-hot targets).
        y_pred: tensor of shape [B*H*W, C], raw logits of the model.
        eps: added to the denominator for numerical stability.

    Returns:
        Scalar tensor: 1 minus the mean per-class dice coefficient.
    """
    probs = F.softmax(y_pred, dim=1)
    targets = y_true.type(y_pred.type())
    # Summing per class (foreground and background separately) mitigates
    # class imbalance before averaging.
    overlap = (probs * targets).sum(dim=0)
    total = (probs + targets).sum(dim=0)
    dice = (2. * overlap / (total + eps)).mean()
    return 1 - dice
-
-
def hybrid_loss(y_pred, y_true):
    """Focal (gamma=2, alpha=0.25) + Dice loss over valid pixels.

    Args:
        y_pred: logits of shape (B, C, H, W).
        y_true: integer labels of shape (B, 1, H, W); the value 255 marks
            ignore pixels that are excluded from the loss.

    Returns:
        Scalar tensor: focal loss + dice loss.
    """

    num_classes = y_pred.shape[1]

    # gamma=0, alpha=None --> CE
    # focal = FocalLoss(alpha=[1] * num_classes, gamma=2.0)
    focal = FocalLoss(gamma=2.0, alpha=0.25)

    # (B, C, H, W) -> (B*H*W, C) for predictions, (B*H*W,) for labels
    y_true = y_true.permute(0, 2, 3, 1)
    y_pred = y_pred.permute(0, 2, 3, 1)

    y_pred = y_pred.reshape(-1, num_classes)
    y_true = y_true.reshape(-1, )

    # drop ignore-index (255) pixels
    valid_index = y_true < 255

    y_pred = y_pred[valid_index]
    y_true = y_true[valid_index]

    # one-hot encode on the labels' own device (was a hard-coded .cuda(),
    # which crashed on CPU-only runs and on non-default GPU devices)
    y_true = (torch.arange(num_classes, device=y_true.device) == torch.unsqueeze(y_true, dim=1)).long()

    bce = focal(y_pred, y_true)
    dice = dice_loss(y_pred, y_true)
    loss = bce + dice

    return loss
-
-
def hybrid_loss2(y_pred, y_true):
    """Focal (gamma=2, alpha=0.1) + Dice loss over valid pixels.

    Identical to ``hybrid_loss`` except for the focal alpha; used for the
    per-image building-mask outputs.

    Args:
        y_pred: logits of shape (B, C, H, W).
        y_true: integer labels of shape (B, 1, H, W); 255 marks ignore pixels.

    Returns:
        Scalar tensor: focal loss + dice loss.
    """

    num_classes = y_pred.shape[1]

    # gamma=0, alpha=None --> CE
    # focal = FocalLoss(alpha=[1] * num_classes, gamma=2.0)
    focal = FocalLoss(gamma=2.0, alpha=0.1)

    # (B, C, H, W) -> (B*H*W, C) for predictions, (B*H*W,) for labels
    y_true = y_true.permute(0, 2, 3, 1)
    y_pred = y_pred.permute(0, 2, 3, 1)

    y_pred = y_pred.reshape(-1, num_classes)
    y_true = y_true.reshape(-1, )

    # drop ignore-index (255) pixels
    valid_index = y_true < 255

    y_pred = y_pred[valid_index]
    y_true = y_true[valid_index]

    # one-hot encode on the labels' own device (was a hard-coded .cuda(),
    # which crashed on CPU-only runs and on non-default GPU devices)
    y_true = (torch.arange(num_classes, device=y_true.device) == torch.unsqueeze(y_true, dim=1)).long()

    bce = focal(y_pred, y_true)
    dice = dice_loss(y_pred, y_true)
    loss = bce + dice

    return loss
-
-
def criterion2(distance, label):
    """Contrastive loss on a distance map.

    Incoming ``label`` uses 1 = change, 0 = no change; it is remapped to
    -1 / +1 internally. The remap now happens on a clone, so the caller's
    tensor is no longer mutated in place (the original wrote through
    ``label`` as a side effect).

    Args:
        distance: tensor of pairwise feature distances.
        label: tensor of {0, 1} change labels, same shape as ``distance``.

    Returns:
        Scalar tensor: mean pull-together loss on unchanged pixels plus
        mean margin push-apart loss on changed pixels.
    """
    lab = label.clone()
    lab[label == 1] = -1  # change
    lab[label == 0] = 1   # no change
    margin = 2.0
    # tiny epsilon keeps the divisions finite when one class is absent
    pos_num = torch.sum((lab == 1).float()) + 0.0001
    neg_num = torch.sum((lab == -1).float()) + 0.0001

    # unchanged pairs (lab == 1): pull distances toward 0
    loss_1 = torch.sum((1 + lab) / 2 * torch.pow(distance, 2)) / pos_num
    # changed pairs (lab == -1): push distances beyond the margin
    loss_2 = torch.sum((1 - lab) / 2 *
                       torch.pow(torch.clamp(margin - distance, min=0.0), 2)
                       ) / neg_num
    loss = loss_1 + loss_2
    return loss
-
-
def evaluate(model, data_loader, pre_mask1, pre_mask2, device, num_classes):
    """Evaluate `model` on `data_loader` and collect refreshed building masks.

    Args:
        model: network called as model(all_image); expected to return six
            outputs (four change outputs plus two per-image building logits).
        data_loader: yields (image, target, b_images1_mask, b_images2_mask).
        pre_mask1, pre_mask2: previous-round predicted masks, one entry per
            sample in data_loader order; batch i consumes slice [i*b : i*b+b].
        device: torch device all tensors are moved to.
        num_classes: class count for the confusion matrix.

    Returns:
        (confmat, return_mask1, return_mask2): the populated confusion matrix
        and the argmax building masks (CPU tensors) produced this pass, to be
        fed back as pre_mask1/pre_mask2 on the next call.
    """
    return_mask1 = []
    return_mask2 = []
    model.eval()
    confmat = utils.ConfusionMatrix(num_classes)
    metric_logger = utils.MetricLogger(delimiter=" ")
    header = 'Test:'
    # running batch counter used to slice pre_mask1/pre_mask2 per batch
    i = 0
    with torch.no_grad():
        for image, target, b_images1_mask, b_images2_mask in metric_logger.log_every(data_loader,
                                                                                     100, header):
            image, target, b_images1_mask, b_images2_mask = image.to(device), target.to(
                device), b_images1_mask.to(
                device), b_images2_mask.to(device)
            b, h, w = b_images1_mask.shape
            # stack this batch's previous-round masks into (b, 1, H, W) tensors
            b1 = []
            b2 = []
            for data in pre_mask1[i * b: i * b + b]:
                data = (np.expand_dims(data, axis=0))
                b1.append(data)
            b1 = np.array(b1, dtype=np.int32)
            # b1 = np.transpose(b1, (0, 1, 3, 2))
            b1 = torch.from_numpy(b1)
            b1 = b1.to(device, dtype=torch.float32)

            for data2 in pre_mask2[i * b: i * b + b]:
                data2 = (np.expand_dims(data2, axis=0))
                b2.append(data2)
            b2 = np.array(b2, dtype=np.int32)
            # b2 = np.transpose(b2, (0, 1, 3, 2))
            b2 = torch.from_numpy(b2)
            b2 = b2.to(device, dtype=torch.float32)
            i = i + 1

            # feed image plus both previous-round masks as extra channels
            pre_mask = torch.cat([b1, b2], 1)
            all_image = torch.cat([image, pre_mask], 1)
            output1, output2, output3, output4, b_images1_loss, b_images2_loss = model(all_image)

            # argmax over the channel dim -> hard building masks for next round
            # b_images1_loss = torch.squeeze(b_images1_loss)
            b_images1_loss = torch.argmax(b_images1_loss, dim=1)

            # b_images2_loss = torch.squeeze(b_images2_loss)
            b_images2_loss = torch.argmax(b_images2_loss, dim=1)

            for b1_data in b_images1_loss:
                return_mask1.append(b1_data.cpu())
            for b2_data in b_images2_loss:
                return_mask2.append(b2_data.cpu())
            # return_mask1.append(b_images1_loss.cpu())
            # return_mask2.append(b_images2_loss.cpu())
            # NOTE(review): squeeze + argmax(dim=0) assumes eval batch size 1
            # (squeeze drops the batch dim, leaving (C, H, W)) — confirm the
            # eval loader uses batch_size=1
            output4 = torch.squeeze(output4)
            output4 = torch.argmax(output4, dim=0)
            confmat.update(target.flatten(), output4.flatten())
        confmat.reduce_from_all_processes()

    return confmat, return_mask1, return_mask2
-
-
def train_one_epoch(model, optimizer, data_loader, device, epoch, pre_mask1, pre_mask2, num_classes,
                    lr_scheduler, print_freq=10, scaler=None):
    """Train `model` for one epoch with a hybrid focal+dice deep-supervision loss.

    Args:
        model: network called as model(all_image); expected to return six
            outputs (four change outputs plus two per-image building logits).
        optimizer: torch optimizer stepped once per batch.
        data_loader: yields (image, target, b_images1_mask, b_images2_mask).
        device: torch device all tensors are moved to.
        epoch: current epoch index (used only for the log header).
        pre_mask1, pre_mask2: previous-round predicted masks, one entry per
            sample in data_loader order; batch i consumes slice [i*b : i*b+b].
        num_classes: class count (unused in the body; kept for signature
            compatibility with callers).
        lr_scheduler: per-step scheduler, stepped after every optimizer step.
        print_freq: logging interval in batches.
        scaler: optional torch.cuda.amp.GradScaler; enables mixed precision.

    Returns:
        (mean_loss, lr, return_mask1, return_mask2): epoch-average loss, the
        final learning rate, and the argmax building masks (CPU tensors) to be
        fed back as pre_mask1/pre_mask2 on the next epoch.
    """
    return_mask1 = []
    return_mask2 = []
    model.train()
    metric_logger = utils.MetricLogger(delimiter=" ")
    metric_logger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = 'Epoch: [{}]'.format(epoch)
    # running batch counter used to slice pre_mask1/pre_mask2 per batch
    i = 0
    for image, target, b_images1_mask, b_images2_mask in metric_logger.log_every(data_loader, print_freq, header):
        image, target, b_images1_mask, b_images2_mask = image.to(device), target.to(device), b_images1_mask.to(
            device), b_images2_mask.to(device)
        b, h, w = b_images1_mask.shape
        # autocast is a no-op when scaler is None (pure fp32 training)
        with torch.cuda.amp.autocast(enabled=scaler is not None):
            # stack this batch's previous-round masks into (b, 1, H, W) tensors
            b1 = []
            b2 = []
            for data in pre_mask1[i * b: i * b + b]:
                data = (np.expand_dims(data, axis=0))
                b1.append(data)
            b1 = np.array(b1, dtype=np.int32)
            # b1 = np.transpose(b1, (0, 1, 3, 2))
            b1 = torch.from_numpy(b1)
            b1 = b1.to(device, dtype=torch.float32)

            for data2 in pre_mask2[i * b: i * b + b]:
                data2 = (np.expand_dims(data2, axis=0))
                b2.append(data2)

            b2 = np.array(b2, dtype=np.int32)
            # b2 = np.transpose(b2, (0, 1, 3, 2))
            b2 = torch.from_numpy(b2)
            b2 = b2.to(device, dtype=torch.float32)
            i = i + 1
            pre_mask = torch.cat([b1, b2], 1)

            # feed image plus both previous-round masks as extra channels
            all_image = torch.cat([image, pre_mask], 1)
            # add a channel dim so targets are (B, 1, H, W) as hybrid_loss expects
            target = torch.unsqueeze(target, 1)
            b_images1_mask = torch.unsqueeze(b_images1_mask, 1)
            b_images2_mask = torch.unsqueeze(b_images2_mask, 1)

            # output1, output2, output3, output4, b_images1_loss, b_images2_loss = model(image, pre_mask.float())
            output1, output2, output3, output4, b_images1_loss, b_images2_loss = model(all_image)

            # deep supervision: hybrid loss on all four change outputs, plus
            # hybrid_loss2 on the two auxiliary building-mask outputs
            diff_loss1 = hybrid_loss(output1, target)
            diff_loss2 = hybrid_loss(output2, target)
            diff_loss3 = hybrid_loss(output3, target)
            diff_loss4 = hybrid_loss(output4, target)
            loss2 = hybrid_loss2(b_images1_loss, b_images1_mask)
            loss3 = hybrid_loss2(b_images2_loss, b_images2_mask)
            loss = diff_loss1 + diff_loss2 + diff_loss3 + diff_loss4 + loss2 + loss3
            # loss = diff_loss2 + diff_loss3 + diff_loss4 + loss2 + loss3

            # argmax over the channel dim -> hard building masks for next epoch
            # b_images1_loss = torch.squeeze(b_images1_loss)
            b_images1_loss = torch.argmax(b_images1_loss, dim=1)

            # b_images2_loss = torch.squeeze(b_images2_loss)
            b_images2_loss = torch.argmax(b_images2_loss, dim=1)

            for b1_data in b_images1_loss:
                return_mask1.append(b1_data.cpu())
            for b2_data in b_images2_loss:
                return_mask2.append(b2_data.cpu())
            # return_mask1.append(b_images1_loss.cpu())
            # return_mask2.append(b_images2_loss.cpu())
        optimizer.zero_grad()
        if scaler is not None:
            # mixed-precision path: scale gradients to avoid fp16 underflow
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()

        # per-step (not per-epoch) scheduler
        lr_scheduler.step()

        lr = optimizer.param_groups[0]["lr"]
        metric_logger.update(loss=loss.item(), lr=lr)

    return metric_logger.meters["loss"].global_avg, lr, return_mask1, return_mask2
-
-
def create_lr_scheduler(optimizer,
                        num_step: int,
                        epochs: int,
                        warmup=True,
                        warmup_epochs=1,
                        warmup_factor=1e-3):
    """Build a LambdaLR: linear warmup followed by polynomial (power 0.9) decay.

    Args:
        optimizer: optimizer whose lr is scheduled.
        num_step: number of scheduler steps per epoch.
        epochs: total number of training epochs.
        warmup: whether to linearly ramp the lr at the start.
        warmup_epochs: warmup duration in epochs (ignored when warmup=False).
        warmup_factor: starting lr multiplier for the warmup ramp.

    Returns:
        torch.optim.lr_scheduler.LambdaLR applying the multiplier per step.
    """
    assert num_step > 0 and epochs > 0
    if warmup is False:
        warmup_epochs = 0

    def multiplier(step):
        """Return the lr multiplier for a global step count.

        Note: PyTorch calls lr_scheduler.step() once before training starts,
        so step 0 is consumed by that initial call.
        """
        warmup_steps = warmup_epochs * num_step
        if warmup is True and step <= warmup_steps:
            # ramp the factor linearly from warmup_factor up to 1
            progress = float(step) / warmup_steps
            return warmup_factor * (1 - progress) + progress
        # after warmup the factor decays from 1 toward 0
        # (cf. the deeplab_v2 learning-rate policy)
        decay_steps = (epochs - warmup_epochs) * num_step
        return (1 - (step - warmup_steps) / decay_steps) ** 0.9

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=multiplier)
|