|
- # -*- coding: utf-8 -*-
- """
- @author: huangxs
- @License: (C)Copyright 2021, huangxs
- @CreateTime: 2021/12/12 16:40:47
- @Filename: utils
- service api views
- """
- """coco eval for lodnet"""
- import os
- import mmcv
- import json
- import cv2
- import numpy as np
- from pycocotools.coco import COCO
- from pycocotools.cocoeval import COCOeval
- from pycocotools import mask as maskUtils
-
- from .model_utils.config import config
-
# Zero-valued placeholder shared by every default metric entry.
_init_value = np.array(0.0)

# Default COCO summary (twelve metrics, all zero) used when there are no
# detections to evaluate. Every key maps to the same `_init_value` object.
summary_init = dict.fromkeys(
    (
        'Precision/mAP',
        'Precision/mAP@.50IOU',
        'Precision/mAP@.75IOU',
        'Precision/mAP (small)',
        'Precision/mAP (medium)',
        'Precision/mAP (large)',
        'Recall/AR@1',
        'Recall/AR@10',
        'Recall/AR@100',
        'Recall/AR@100 (small)',
        'Recall/AR@100 (medium)',
        'Recall/AR@100 (large)',
    ),
    _init_value,
)
-
-
def coco_eval(result_files, result_types, coco, max_dets=(100, 300, 1000), single_result=False):
    """Run the COCO evaluation for every requested result type.

    Args:
        result_files (dict): Maps a result type to the path of its JSON result
            file. The ``'bbox'`` entry is always read first to detect an
            empty result set.
        result_types (Iterable[str]): Result types to evaluate. ``'proposal'``
            is scored with the ``'bbox'`` IoU type and with categories
            ignored; any other value is passed to ``COCOeval`` as the IoU
            type.
        coco (COCO | str): Ground-truth ``COCO`` object, or the path of an
            annotation file to build one from.
        max_dets (Sequence[int]): Assigned to ``COCOeval.params.maxDets`` for
            ``'proposal'`` results only.
        single_result (bool): If True, only images that have detections are
            evaluated, and one extra evaluation is run (and its summary
            printed) per image before the aggregate evaluation.

    Returns:
        dict: ``{res_type: {metric_name: value}}`` with the twelve COCO
        summary statistics per result type. If the bbox result file contains
        no annotations, a flat copy of the module-level ``summary_init``
        defaults is returned instead.
    """
    # Context manager: the original left the result file handle open.
    with open(result_files['bbox']) as fp:
        anns = json.load(fp)
    if not anns:
        print('no result anns:', summary_init)
        # Return a copy so callers cannot alter the shared module defaults.
        return dict(summary_init)
    if isinstance(coco, str):
        coco = COCO(coco)
    assert isinstance(coco, COCO)

    # Order matches the layout of `COCOeval.stats` after `summarize()`.
    metric_names = (
        'Precision/mAP',
        'Precision/mAP@.50IOU',
        'Precision/mAP@.75IOU',
        'Precision/mAP (small)',
        'Precision/mAP (medium)',
        'Precision/mAP (large)',
        'Recall/AR@1',
        'Recall/AR@10',
        'Recall/AR@100',
        'Recall/AR@100 (small)',
        'Recall/AR@100 (medium)',
        'Recall/AR@100 (large)',
    )

    all_summary_metrics = {}
    for res_type in result_types:
        result_file = result_files[res_type]
        assert result_file.endswith('.json')

        coco_dets = coco.loadRes(result_file)
        tgt_ids = coco_dets.getImgIds() if single_result else coco.getImgIds()

        if single_result:
            # NOTE(review): the per-image AP@.50 values are collected but not
            # returned; only the printed per-image summaries are observable.
            per_image_ap50 = {}
            for img_id in tgt_ids:
                evaluator = _run_coco_evaluator(coco, coco_dets, res_type, max_dets, [img_id])
                per_image_ap50[coco.imgs[img_id]['file_name']] = evaluator.stats[1]

        evaluator = _run_coco_evaluator(coco, coco_dets, res_type, max_dets, tgt_ids)

        summary_metrics = {
            name: evaluator.stats[position]
            for position, name in enumerate(metric_names)
        }
        print('result :', res_type)
        print(summary_metrics)
        all_summary_metrics[res_type] = summary_metrics

    return all_summary_metrics


def _run_coco_evaluator(coco, coco_dets, res_type, max_dets, img_ids):
    """Build a COCOeval for ``res_type`` and run it over ``img_ids``."""
    iou_type = 'bbox' if res_type == 'proposal' else res_type
    evaluator = COCOeval(coco, coco_dets, iou_type)
    if res_type == 'proposal':
        # Proposals are class-agnostic and use the caller's detection caps.
        evaluator.params.useCats = 0
        evaluator.params.maxDets = list(max_dets)
    evaluator.params.imgIds = img_ids
    evaluator.evaluate()
    evaluator.accumulate()
    evaluator.summarize()
    return evaluator
-
-
def xyxy2xywh(bbox):
    """Convert a corner-format box to ``[x, y, width, height]``.

    Args:
        bbox: Array-like object with a ``tolist()`` method whose first four
            entries are ``x1, y1, x2, y2``; further entries are ignored.

    Returns:
        list: ``[x1, y1, x2 - x1 + 1, y2 - y1 + 1]`` (inclusive pixel extents).
    """
    coords = bbox.tolist()
    left, top = coords[0], coords[1]
    right, bottom = coords[2], coords[3]
    width = right - left + 1
    height = bottom - top + 1
    return [left, top, width, height]
-
-
def bbox2result_1image(bboxes, labels, num_classes):
    """Split the detections of one image into one array per class.

    Args:
        bboxes (ndarray): Detections, shape (n, 5).
        labels (ndarray): Class index of each detection, shape (n, ).
        num_classes (int): Number of classes, including the background class.

    Returns:
        list(ndarray): ``num_classes - 1`` arrays; entry ``i`` holds the rows
        of ``bboxes`` whose label equals ``i``. With no detections every
        entry is an empty float32 array of shape (0, 5).
    """
    foreground = range(num_classes - 1)
    if bboxes.shape[0] != 0:
        return [bboxes[labels == cls, :] for cls in foreground]
    return [np.zeros((0, 5), dtype=np.float32) for _ in foreground]
-
-
def proposal2json(dataset, results):
    """Convert per-image proposal arrays to COCO-style JSON records.

    Args:
        dataset: Object exposing ``getImgIds()``; its ids are paired with
            ``results`` in order.
        results (Sequence[ndarray]): One array per image; each row is read as
            ``x1, y1, x2, y2, score``.

    Returns:
        list[dict]: One record per proposal with keys ``image_id``, ``bbox``
        (``[x, y, w, h]``), ``score`` and ``category_id`` (always 1).
    """
    json_results = []
    # Pair ids with results directly, as det2json/segm2json do. The previous
    # `dataset.get_dataset_size() * 2` bound is not a method of the COCO-style
    # object used here and would index past the image ids.
    for img_id, bboxes in zip(dataset.getImgIds(), results):
        for row in range(bboxes.shape[0]):
            box = bboxes[row]
            json_results.append({
                'image_id': img_id,
                'bbox': xyxy2xywh(box),
                'score': float(box[4]),
                'category_id': 1,
            })
    return json_results
-
-
def det2json(dataset, results):
    """Convert per-image, per-class detections to COCO-style JSON records.

    Args:
        dataset: Object exposing ``getCatIds()`` and ``getImgIds()``.
        results (Sequence): One entry per image, in image-id order; each
            entry is a sequence of per-class arrays whose rows are read as
            ``x1, y1, x2, y2, score``.

    Returns:
        list[dict]: One record per detection with keys ``image_id``,
        ``bbox`` (``[x, y, w, h]``), ``score`` and ``category_id``.
    """
    cat_ids = dataset.getCatIds()
    json_results = []
    # zip stops at the shorter of the two sequences, so surplus image ids
    # (images without a result) are simply skipped.
    for img_id, per_class in zip(dataset.getImgIds(), results):
        for label, bboxes in enumerate(per_class):
            for row in range(bboxes.shape[0]):
                box = bboxes[row]
                json_results.append({
                    'image_id': img_id,
                    'bbox': xyxy2xywh(box),
                    'score': float(box[4]),
                    'category_id': cat_ids[label],
                })
    return json_results
-
-
def segm2json(dataset, results):
    """Convert detection + segmentation results to COCO-style JSON records.

    Args:
        dataset: Object exposing ``getCatIds()`` and ``getImgIds()``.
        results (Sequence[tuple]): One ``(det, seg)`` pair per image, in
            image-id order. ``det`` is a sequence of per-class box arrays
            (rows read as ``x1, y1, x2, y2, score``). ``seg`` is either a
            per-class list of RLE dicts, or a 2-item
            ``(per-class RLEs, per-class mask scores)`` container.

    Returns:
        tuple[list[dict], list[dict]]: The bbox records and the segmentation
        records.

    Raises:
        AssertionError: If the number of results differs from the number of
            image ids.
    """
    cat_ids = dataset.getCatIds()
    img_ids = dataset.getImgIds()
    bbox_json_results = []
    segm_json_results = []

    assert len(img_ids) == len(results)
    for img_id, (det, seg) in zip(img_ids, results):
        for label, bboxes in enumerate(det):
            for row in range(bboxes.shape[0]):
                # Best effort, as before: a malformed box is reported and
                # skipped instead of aborting the whole conversion.
                try:
                    record = {
                        'image_id': img_id,
                        'bbox': xyxy2xywh(bboxes[row]),
                        'score': float(bboxes[row][4]),
                        'category_id': cat_ids[label],
                    }
                except Exception as e:
                    print(e)
                else:
                    bbox_json_results.append(record)

            # NOTE(review): a plain per-class list that happens to hold
            # exactly two classes also has length 2 and takes the first
            # branch - confirm this cannot occur for the models in use.
            if len(seg) == 2:
                segms = seg[0][label]
                mask_score = seg[1][label]
            else:
                segms = seg[label]
                mask_score = [bbox[4] for bbox in bboxes]
            for row in range(bboxes.shape[0]):
                # Copy the RLE before decoding its counts: writing the str
                # back into the caller's dict made a second conversion of
                # the same results fail on `str.decode`.
                rle = dict(segms[row])
                counts = rle['counts']
                if isinstance(counts, bytes):
                    rle['counts'] = counts.decode()
                segm_json_results.append({
                    'image_id': img_id,
                    'score': float(mask_score[row]),
                    'category_id': cat_ids[label],
                    'segmentation': rle,
                })
    return bbox_json_results, segm_json_results
-
-
def results2json(dataset, results, out_file):
    """Write results to COCO-style JSON files and return their paths.

    The type of ``results[0]`` selects the conversion:

    * ``list``    -> detections; written to ``<out_file>.bbox.json``, which
      is reported under both ``'bbox'`` and ``'proposal'``.
    * ``tuple``   -> detections + masks; written to ``<out_file>.bbox.json``
      and ``<out_file>.segm.json``.
    * ``ndarray`` -> proposals; written to ``<out_file>.proposal.json``.

    Args:
        dataset: Dataset object handed to the conversion helpers.
        results (Sequence): Per-image results.
        out_file (str): Path prefix of the files to write.

    Returns:
        dict: Maps result type to the path of the file that holds it.

    Raises:
        TypeError: If ``results[0]`` is none of the supported types.
    """
    def _write(payload, path):
        with open(path, 'w') as fp:
            json.dump(payload, fp)

    sample = results[0]
    result_files = {}
    if isinstance(sample, list):
        records = det2json(dataset, results)
        bbox_path = f'{out_file}.bbox.json'
        result_files['bbox'] = bbox_path
        result_files['proposal'] = bbox_path
        _write(records, bbox_path)
    elif isinstance(sample, tuple):
        bbox_records, segm_records = segm2json(dataset, results)
        result_files['bbox'] = f'{out_file}.bbox.json'
        result_files['segm'] = f'{out_file}.segm.json'
        _write(bbox_records, result_files['bbox'])
        _write(segm_records, result_files['segm'])
    elif isinstance(sample, np.ndarray):
        records = proposal2json(dataset, results)
        result_files['proposal'] = f'{out_file}.proposal.json'
        _write(records, result_files['proposal'])
    else:
        raise TypeError('invalid type of results')
    return result_files
-
-
def get_seg_masks(mask_pred, det_bboxes, det_labels, img_meta, rescale, num_classes):
    """Paste per-detection mask predictions into full-image RLE masks.

    Args:
        mask_pred (ndarray): Mask predictions; ``mask_pred[i]`` is the 2-D
            mask of detection ``i``.
        det_bboxes (ndarray): Detections; the first four columns are read as
            ``x1, y1, x2, y2``.
        det_labels (ndarray): Zero-based class index of each detection.
        img_meta (ndarray): ``img_meta[:2]`` is read as the original
            ``(height, width)`` and ``img_meta[2:]`` as the
            ``(height, width)`` scale factors.
        rescale (bool): If True the masks are pasted into an image of the
            original size, otherwise into one of the scaled size.
        num_classes (int): Number of classes, including the background class.

    Returns:
        list[list]: ``num_classes - 1`` lists; entry ``c`` holds the RLE
        encoded masks of the detections labelled ``c``.
    """
    mask_pred = mask_pred.astype(np.float32)

    cls_segms = [[] for _ in range(num_classes - 1)]
    bboxes = det_bboxes[:, :4]
    labels = det_labels + 1

    ori_shape = img_meta[:2].astype(np.int32)
    # Scale factors are fractional; the previous int32 cast truncated them
    # (e.g. 0.8 -> 0), giving a wrong or zero image size when rescale=False.
    scale_factor = img_meta[2:].astype(np.float32)

    if rescale:
        img_h, img_w = ori_shape[:2]
    else:
        img_h = np.round(ori_shape[0] * scale_factor[0]).astype(np.int32)
        img_w = np.round(ori_shape[1] * scale_factor[1]).astype(np.int32)

    for i in range(bboxes.shape[0]):
        bbox = (bboxes[i, :] / 1.0).astype(np.int32)
        label = labels[i]
        # Inclusive box extent, at least one pixel, clipped to the image.
        w = max(bbox[2] - bbox[0] + 1, 1)
        h = max(bbox[3] - bbox[1] + 1, 1)
        w = min(w, img_w - bbox[0])
        h = min(h, img_h - bbox[1])
        if w <= 0 or h <= 0:
            # The box starts outside the image: report it and fall back to a
            # 1-pixel extent so the resize below still has a valid size.
            print("there is invalid proposal bbox, index={} bbox={} w={} h={}".format(i, bbox, w, h))
            w = max(w, 1)
            h = max(h, 1)
        mask_pred_ = mask_pred[i, :, :]
        im_mask = np.zeros((img_h, img_w), dtype=np.uint8)
        bbox_mask = cv2.resize(mask_pred_, (w, h), interpolation=cv2.INTER_LINEAR)
        bbox_mask = (bbox_mask > config.mask_thr_binary).astype(np.uint8)
        im_mask[bbox[1]:bbox[1] + h, bbox[0]:bbox[0] + w] = bbox_mask

        # The encoder is fed a Fortran-ordered (H, W, 1) array.
        rle = maskUtils.encode(
            np.array(im_mask[:, :, np.newaxis], order='F'))[0]
        cls_segms[label - 1].append(rle)

    return cls_segms
-
-
def intersect_and_union(pred_label,
                        label,
                        num_classes,
                        ignore_index,
                        label_map=None,
                        reduce_zero_label=False):
    """Calculate per-class intersection and union for one image.

    Args:
        pred_label (ndarray | str): Prediction segmentation map, or the path
            of a ``.npy`` file holding it.
        label (ndarray | str): Ground truth segmentation map, or the path of
            an image file holding it.
        num_classes (int): Number of categories.
        ignore_index (int): Label value excluded from the evaluation.
        label_map (dict | None): Mapping old labels to new labels, applied to
            the ground truth. ``None`` or an empty dict disables it.
            Default: None.
        reduce_zero_label (bool): Whether to ignore the zero label: label 0
            becomes 255 and every other label is shifted down by one.
            Default: False.

    Returns:
        ndarray: The intersection of prediction and ground truth histogram
            on all classes.
        ndarray: The union of prediction and ground truth histogram on all
            classes.
        ndarray: The prediction histogram on all classes.
        ndarray: The ground truth histogram on all classes.
    """
    if isinstance(pred_label, str):
        pred_label = np.load(pred_label)

    if isinstance(label, str):
        label = mmcv.imread(label, flag='unchanged', backend='pillow')

    # Modify labels if custom classes are used.
    if label_map:
        # Match against the untouched labels and write into a copy: this
        # keeps the caller's array intact and stops a pixel from being
        # remapped twice when a new id is itself a key of `label_map`.
        source = label
        label = source.copy()
        for old_id, new_id in label_map.items():
            label[source == old_id] = new_id
    if reduce_zero_label:
        # Copy first so the in-place edit below does not alter the input.
        label = label.copy()
        # Park zeros at 255 before the shift to avoid underflow conversion.
        label[label == 0] = 255
        label = label - 1
        label[label == 254] = 255

    mask = (label != ignore_index)
    pred_label = pred_label[mask]
    label = label[mask]

    # Correctly predicted pixels, then one histogram bin per class.
    bins = np.arange(num_classes + 1)
    intersect = pred_label[pred_label == label]
    area_intersect, _ = np.histogram(intersect, bins=bins)
    area_pred_label, _ = np.histogram(pred_label, bins=bins)
    area_label, _ = np.histogram(label, bins=bins)
    area_union = area_pred_label + area_label - area_intersect

    return area_intersect, area_union, area_pred_label, area_label
-
-
def total_intersect_and_union(results,
                              gt_seg_maps,
                              num_classes,
                              ignore_index,
                              label_map=None,
                              reduce_zero_label=False):
    """Calculate total intersection and union over a set of images.

    Args:
        results (list[ndarray]): List of prediction segmentation maps.
        gt_seg_maps (list[ndarray]): List of ground truth segmentation maps.
        num_classes (int): Number of categories.
        ignore_index (int): Index that will be ignored in evaluation.
        label_map (dict | None): Mapping old labels to new labels, forwarded
            to ``intersect_and_union``. Default: None.
        reduce_zero_label (bool): Whether to ignore the zero label.
            Default: False.

    Returns:
        ndarray: The intersection of prediction and ground truth histogram
            on all classes.
        ndarray: The union of prediction and ground truth histogram on all
            classes.
        ndarray: The prediction histogram on all classes.
        ndarray: The ground truth histogram on all classes.

    Raises:
        AssertionError: If ``results`` and ``gt_seg_maps`` differ in length.
    """
    assert len(gt_seg_maps) == len(results)
    total_area_intersect = np.zeros((num_classes,), dtype=float)
    total_area_union = np.zeros((num_classes,), dtype=float)
    total_area_pred_label = np.zeros((num_classes,), dtype=float)
    total_area_label = np.zeros((num_classes,), dtype=float)
    for result, gt_seg_map in zip(results, gt_seg_maps):
        area_intersect, area_union, area_pred_label, area_label = \
            intersect_and_union(result, gt_seg_map, num_classes,
                                ignore_index, label_map, reduce_zero_label)
        total_area_intersect += area_intersect
        total_area_union += area_union
        total_area_pred_label += area_pred_label
        total_area_label += area_label

    return total_area_intersect, total_area_union, total_area_pred_label, total_area_label
-
-
def mean_iou(results,
             gt_seg_maps,
             num_classes,
             ignore_index,
             nan_to_num=None,
             label_map=None,
             reduce_zero_label=False):
    """Calculate Mean Intersection and Union (mIoU).

    Thin wrapper around ``eval_metrics`` with ``metrics=['mIoU']``.

    Args:
        results (list[ndarray]): List of prediction segmentation maps.
        gt_seg_maps (list[ndarray]): List of ground truth segmentation maps.
        num_classes (int): Number of categories.
        ignore_index (int): Index that will be ignored in evaluation.
        nan_to_num (int, optional): If specified, NaN values will be replaced
            by the numbers defined by the user. Default: None.
        label_map (dict | None): Mapping old labels to new labels.
            Default: None.
        reduce_zero_label (bool): Whether to ignore the zero label.
            Default: False.

    Returns:
        float: Overall accuracy on all images.
        ndarray: Per category accuracy, shape (num_classes, ).
        ndarray: Per category IoU, shape (num_classes, ).
    """
    all_acc, acc, iou = eval_metrics(
        results=results,
        gt_seg_maps=gt_seg_maps,
        num_classes=num_classes,
        ignore_index=ignore_index,
        metrics=['mIoU'],
        nan_to_num=nan_to_num,
        label_map=label_map,
        reduce_zero_label=reduce_zero_label)
    return all_acc, acc, iou
-
-
def mean_dice(results,
              gt_seg_maps,
              num_classes,
              ignore_index,
              nan_to_num=None,
              label_map=None,
              reduce_zero_label=False):
    """Calculate Mean Dice (mDice).

    Thin wrapper around ``eval_metrics`` with ``metrics=['mDice']``.

    Args:
        results (list[ndarray]): List of prediction segmentation maps.
        gt_seg_maps (list[ndarray]): List of ground truth segmentation maps.
        num_classes (int): Number of categories.
        ignore_index (int): Index that will be ignored in evaluation.
        nan_to_num (int, optional): If specified, NaN values will be replaced
            by the numbers defined by the user. Default: None.
        label_map (dict | None): Mapping old labels to new labels.
            Default: None.
        reduce_zero_label (bool): Whether to ignore the zero label.
            Default: False.

    Returns:
        float: Overall accuracy on all images.
        ndarray: Per category accuracy, shape (num_classes, ).
        ndarray: Per category dice, shape (num_classes, ).
    """
    all_acc, acc, dice = eval_metrics(
        results=results,
        gt_seg_maps=gt_seg_maps,
        num_classes=num_classes,
        ignore_index=ignore_index,
        metrics=['mDice'],
        nan_to_num=nan_to_num,
        label_map=label_map,
        reduce_zero_label=reduce_zero_label)
    return all_acc, acc, dice
-
-
def eval_metrics(results,
                 gt_seg_maps,
                 num_classes,
                 ignore_index,
                 metrics=('mIoU',),
                 nan_to_num=None,
                 label_map=None,
                 reduce_zero_label=False):
    """Calculate evaluation metrics.

    Args:
        results (list[ndarray]): List of prediction segmentation maps.
        gt_seg_maps (list[ndarray]): List of ground truth segmentation maps.
        num_classes (int): Number of categories.
        ignore_index (int): Index that will be ignored in evaluation.
        metrics (Sequence[str] | str): Metrics to be evaluated, 'mIoU' and
            'mDice'. Default: ('mIoU',).
        nan_to_num (int, optional): If specified, NaN values will be replaced
            by the numbers defined by the user. Default: None.
        label_map (dict | None): Mapping old labels to new labels.
            Default: None.
        reduce_zero_label (bool): Whether to ignore the zero label.
            Default: False.

    Returns:
        list: ``[all_acc, acc, *per_metric]`` where ``all_acc`` is the overall
        accuracy on all images, ``acc`` the per category accuracy of shape
        (num_classes, ), followed by one per category array of shape
        (num_classes, ) for each requested metric, in request order.

    Raises:
        KeyError: If ``metrics`` contains an unsupported metric name.
    """
    if isinstance(metrics, str):
        metrics = [metrics]
    allowed_metrics = ['mIoU', 'mDice']
    if not set(metrics).issubset(set(allowed_metrics)):
        raise KeyError('metrics {} is not supported'.format(metrics))

    (total_area_intersect, total_area_union,
     total_area_pred_label, total_area_label) = total_intersect_and_union(
         results, gt_seg_maps, num_classes, ignore_index,
         label_map, reduce_zero_label)

    # A class with an empty denominator produces NaN here; `nan_to_num`
    # below is the hook for replacing such values.
    all_acc = total_area_intersect.sum() / total_area_label.sum()
    acc = total_area_intersect / total_area_label
    ret_metrics = [all_acc, acc]
    for metric in metrics:
        if metric == 'mIoU':
            iou = total_area_intersect / total_area_union
            ret_metrics.append(iou)
        elif metric == 'mDice':
            dice = 2 * total_area_intersect / (
                total_area_pred_label + total_area_label)
            ret_metrics.append(dice)
    if nan_to_num is not None:
        ret_metrics = [
            np.nan_to_num(metric, nan=nan_to_num) for metric in ret_metrics
        ]
    return ret_metrics
-
-
def eval_dict_iou(result_files, dataset_coco, coco_root, print_log):
    """Evaluate binary-mask IoU and Dice from a segmentation result file.

    Args:
        result_files (dict): Must contain ``'segm'``, the path of a JSON file
            holding a list of records with ``image_id`` and an RLE
            ``segmentation``.
        dataset_coco: Object exposing ``getImgIds()`` and an ``imgs`` mapping
            whose entries carry ``file_name``.
        coco_root (str): Directory whose ``masks`` sub-directory holds one
            ground-truth mask image per ``file_name``.
        print_log (Callable[[str], Any]): Receives the two summary lines.

    Returns:
        dict: ``{'mDice': float, 'mIou': float}``.

    Raises:
        FileNotFoundError: If a ground-truth mask image cannot be read.
    """
    print('start eval')
    with open(result_files['segm']) as f:
        seg_result = json.load(f)

    # Merge all instance masks of an image into one foreground mask. A
    # boolean OR is used rather than a uint8 sum, which could wrap to 0.
    pred_seg = {}
    for seg in seg_result:
        image_id = seg['image_id']
        instance_mask = maskUtils.decode(seg['segmentation']) > 0
        if image_id in pred_seg:
            pred_seg[image_id] |= instance_mask
        else:
            pred_seg[image_id] = instance_mask

    # Build the aligned lists of ground-truth and predicted binary masks.
    gt_masks = []
    pred_masks = []
    for image_id in dataset_coco.getImgIds():
        gt_path = os.path.join(coco_root, 'masks', dataset_coco.imgs[image_id]['file_name'])
        gt_mask = cv2.imread(gt_path, cv2.IMREAD_GRAYSCALE)
        if gt_mask is None:
            # cv2.imread reports failure by returning None; fail here with
            # the path instead of an opaque error further down.
            raise FileNotFoundError('cannot read ground-truth mask: {}'.format(gt_path))

        if image_id in pred_seg:
            pred_mask = pred_seg[image_id].astype(int)
        else:
            # No prediction for this image: an all-background mask.
            pred_mask = np.zeros(gt_mask.shape, dtype=int)

        # Binarize the ground truth: only pixels equal to 255 become 1.
        gt_masks.append((gt_mask / 255.).astype(int))
        pred_masks.append(pred_mask)

    # NOTE(review): with num_classes=1 and ignore_index=0, every pixel whose
    # ground truth is 0 is excluded, so the scores cover ground-truth
    # foreground pixels only - confirm this is the intended metric.
    all_acc, acc, iou = mean_iou(pred_masks, gt_masks, 1, 0)
    all_acc_dice, acc_dice, dice = mean_dice(pred_masks, gt_masks, 1, 0)

    print_log("all_acc_dice={}, acc_dice={}, dice={} ".format(all_acc_dice, acc_dice, dice))
    print_log("all_acc={}, acc={}, iou={} ".format(all_acc, acc, iou))

    # Index the single class explicitly: float() on a 1-element array is
    # deprecated in recent NumPy releases.
    return {'mDice': float(np.ravel(dice)[0]), 'mIou': float(np.ravel(iou)[0])}
|