|
- """
- @file 01_test.py
- @brief Script for test
-
- """
-
- ########################################################################
- # import default python-library
- ########################################################################
- import os
- import glob
- import csv
- import re
- import itertools
- import sys
- ########################################################################
-
-
- ########################################################################
- # import additional python-library
- ########################################################################
- import numpy
- import librosa
- import librosa.core
- import librosa.feature
- import matplotlib.image as img
- from sklearn.externals.joblib import load, dump
- # from import
- from tqdm import tqdm
- from sklearn import metrics
- # original lib
- import common as com
- import keras_model as keras_model
- ########################################################################
-
-
- ########################################################################
- # load parameter.yaml
- ########################################################################
- param = com.yaml_load()
- #######################################################################
-
-
- ########################################################################
- # def
- ########################################################################
def save_csv(save_file_path, save_data):
    """Write *save_data* (an iterable of rows) to a CSV file at *save_file_path*."""
    # newline="" lets the csv module control line endings; '\n' keeps output
    # identical across platforms
    with open(save_file_path, "w", newline="") as csv_file:
        csv.writer(csv_file, lineterminator="\n").writerows(save_data)
-
-
def get_machine_id_list_for_test(target_dir,
                                 dir_name="test",
                                 ext="wav"):
    """
    target_dir : str
        base directory path of "dev_data" or "eval_data"
    dir_name : str (default="test")
        directory containing test data
    ext : str (default="wav")
        file extension of audio files

    return :
        machine_id_list : list [ str ]
            sorted, deduplicated machine IDs ("id_XX") extracted from the
            names of the test files
    """
    # enumerate every audio file under <target_dir>/<dir_name>/
    pattern = os.path.abspath(
        "{dir}/{dir_name}/*.{ext}".format(dir=target_dir, dir_name=dir_name, ext=ext))
    found_paths = sorted(glob.glob(pattern))
    # pull all "id_XX" tokens out of the file paths, then deduplicate and sort
    id_tokens = itertools.chain.from_iterable(
        re.findall('id_[0-9][0-9]', one_path) for one_path in found_paths)
    return sorted(set(id_tokens))
-
-
def test_file_list_generator(target_dir,
                             id_name,
                             dir_name="test",
                             prefix_normal="normal",
                             prefix_anomaly="anomaly",
                             ext="wav"):
    """
    Collect the test file list (and labels, in development mode) for one machine ID.

    target_dir : str
        base directory path of the dev_data or eval_data
    id_name : str
        id of wav file in <<dir_name>> directory
    dir_name : str (default="test")
        directory containing test data
    prefix_normal : str (default="normal")
        file-name prefix of normal clips
    prefix_anomaly : str (default="anomaly")
        file-name prefix of anomaly clips
    ext : str (default="wav")
        file extension of audio files

    return :
        if the mode is "development":
            test_files : list [ str ]
                file list for test
            test_labels : list [ boolean ]
                label info. list for test
                * normal/anomaly = 0/1
        if the mode is "evaluation":
            test_files : list [ str ]
                file list for test (labels are None)

    NOTE(review): reads the module-level global ``mode`` that __main__ sets;
    calling this from another module without defining ``mode`` raises NameError.
    """
    com.logger.info("target_dir : {}".format(target_dir + "_" + id_name))

    # development: normal/anomaly prefixes encode the ground-truth labels
    if mode:
        normal_files = sorted(
            glob.glob("{dir}/{dir_name}/{prefix_normal}_{id_name}*.{ext}".format(dir=target_dir,
                                                                                 dir_name=dir_name,
                                                                                 prefix_normal=prefix_normal,
                                                                                 id_name=id_name,
                                                                                 ext=ext)))
        normal_labels = numpy.zeros(len(normal_files))
        anomaly_files = sorted(
            glob.glob("{dir}/{dir_name}/{prefix_anomaly}_{id_name}*.{ext}".format(dir=target_dir,
                                                                                  dir_name=dir_name,
                                                                                  prefix_anomaly=prefix_anomaly,
                                                                                  id_name=id_name,
                                                                                  ext=ext)))
        anomaly_labels = numpy.ones(len(anomaly_files))
        files = numpy.concatenate((normal_files, anomaly_files), axis=0)
        labels = numpy.concatenate((normal_labels, anomaly_labels), axis=0)
        com.logger.info("test_file num : {num}".format(num=len(files)))
        if len(files) == 0:
            # fix: logger.exception() outside an except block logs a bogus
            # "NoneType: None" traceback — error() is the correct call here
            com.logger.error("no_wav_file!!")
        print("\n========================================")

    # evaluation: no labels are available
    else:
        files = sorted(
            glob.glob("{dir}/{dir_name}/*{id_name}*.{ext}".format(dir=target_dir,
                                                                  dir_name=dir_name,
                                                                  id_name=id_name,
                                                                  ext=ext)))
        labels = None
        com.logger.info("test_file num : {num}".format(num=len(files)))
        if len(files) == 0:
            # fix: same logger.exception() misuse as the development branch
            com.logger.error("no_wav_file!!")
        print("\n=========================================")

    return files, labels
- ########################################################################
-
-
- ########################################################################
- # main 01_test.py
- ########################################################################
if __name__ == "__main__":
    # check mode
    # "development": mode == True
    # "evaluation": mode == False
    mode, target = com.command_line_chk()
    if mode is None:
        sys.exit(-1)

    # make output result directory
    os.makedirs(param["result_directory"], exist_ok=True)

    # load base directory list (one entry per machine type)
    dirs = com.select_dirs(param=param, mode=mode, target=target)

    # initialize lines in csv for AUC and pAUC
    csv_lines = []

    # loop of the base directory (machine type)
    for dir_idx, target_dir in enumerate(dirs):
        print("\n===========================")
        print("[{idx}/{total}] {dirname}".format(dirname=target_dir, idx=dir_idx + 1, total=len(dirs)))
        machine_type = os.path.split(target_dir)[1]

        print("============== MODEL LOAD ==============")
        # set model path
        model_file = "{model}/model_{machine_type}.hdf5".format(model=param["model_directory"],
                                                                machine_type=machine_type)

        # NOTE(review): features_dir_path is computed but never used below —
        # confirm nothing external depends on it before removing.
        features_file_path = "{features}/{machine_type}/{tip}".format(features=param["features_directory"],
                                                                      machine_type=machine_type, tip="test")
        features_dir_path = os.path.abspath(features_file_path)

        # load the per-machine-type scaler fitted at training time
        scaler_file_path = os.path.abspath(
            "{scalers}/{machine_type}".format(scalers=param["scalers_directory"], machine_type=machine_type))
        scaler = load(scaler_file_path + "/scaler_{machine_type}.bin".format(machine_type=machine_type))

        # load model file; skip this machine type if no model was trained
        if not os.path.exists(model_file):
            com.logger.error("{} model not found ".format(machine_type))
            continue
        model = keras_model.load_model(model_file)
        model.summary()

        if mode:
            # development mode: prepare per-type result rows for AUC/pAUC
            csv_lines.append([machine_type])
            csv_lines.append(["id", "AUC", "pAUC"])
            performance = []

        machine_id_list = get_machine_id_list_for_test(target_dir)

        # loop of the machine type directory (machine id)
        for id_str in machine_id_list:
            # load test file list; y_true is None in evaluation mode
            test_files, y_true = test_file_list_generator(target_dir, id_str)

            # setup anomaly score file path
            anomaly_score_csv = "{result}/anomaly_score_{machine_type}_{id_str}.csv".format(
                result=param["result_directory"],
                machine_type=machine_type,
                id_str=id_str)
            anomaly_score_list = []

            print("\n============== BEGIN TEST FOR A MACHINE ID ==============")
            y_pred = [0. for _ in test_files]
            for file_idx, file_path in tqdm(enumerate(test_files), total=len(test_files)):
                try:
                    # get audio features (normalized with the loaded scaler)
                    vector_array = com.file_to_vector_array(file_path, param["train_data"][machine_type], scaler,
                                                            n_mels=param["feature"]["n_mels"],
                                                            frames=param["feature"]["frames"],
                                                            n_fft=param["feature"]["n_fft"],
                                                            hop_length=param["feature"]["hop_length"],
                                                            power=param["feature"]["power"])

                    length, _ = vector_array.shape
                    dim = param["autoencoder"]["shape0"]
                    step = param["step"]

                    # sliding-window start indices; min() clamps the final
                    # window so it ends exactly at the last frame
                    starts = [min(int(s), length - dim)
                              for s in numpy.arange(length - dim + step, step=step)]
                    if not starts:
                        # fix: original left `batch` unbound here, raising a
                        # NameError that the bare except masked
                        com.logger.error("file too short for one window!!: {}".format(file_path))
                        sys.exit(-1)

                    # fix: build the batch in one numpy.stack call instead of
                    # O(n^2) repeated numpy.concatenate in a loop
                    batch = numpy.stack([vector_array[s:s + dim, :] for s in starts])

                    # add channels dimension for the convolutional model
                    data = batch.reshape((batch.shape[0], batch.shape[1], batch.shape[2], 1))

                    # reconstruction error; the file-level anomaly score is
                    # the mean error over all windows
                    errors = numpy.mean(numpy.square(data - model.predict(data)), axis=-1)

                    y_pred[file_idx] = numpy.mean(errors)
                    anomaly_score_list.append([os.path.basename(file_path), y_pred[file_idx]])

                except Exception:
                    # fix: was a bare `except:` that also swallowed
                    # KeyboardInterrupt/SystemExit; log the traceback too
                    com.logger.exception("file broken!!: {}".format(file_path))
                    sys.exit(-1)

            # save anomaly score
            save_csv(save_file_path=anomaly_score_csv, save_data=anomaly_score_list)
            com.logger.info("anomaly score result -> {}".format(anomaly_score_csv))

            if mode:
                # append AUC and pAUC to lists
                auc = metrics.roc_auc_score(y_true, y_pred)
                p_auc = metrics.roc_auc_score(y_true, y_pred, max_fpr=param["max_fpr"])
                csv_lines.append([id_str.split("_", 1)[1], auc, p_auc])
                performance.append([auc, p_auc])
                com.logger.info("AUC : {}".format(auc))
                com.logger.info("pAUC : {}".format(p_auc))

            print("\n============ END OF TEST FOR A MACHINE ID ============")

        if mode:
            # calculate averages for AUCs and pAUCs over this machine type
            averaged_performance = numpy.mean(numpy.array(performance, dtype=float), axis=0)
            com.logger.info("average AUC : {}".format(averaged_performance[0]))
            com.logger.info("average pAUC : {}".format(averaged_performance[1]))
            csv_lines.append(["Average"] + list(averaged_performance))
            csv_lines.append([])

    if mode:
        # output results (prefix the file name with the target when given)
        if target:
            result_path = "{result}/{target}_{file_name}".format(result=param["result_directory"],
                                                                 file_name=param["result_file"],
                                                                 target=target)
        else:
            result_path = "{result}/{file_name}".format(result=param["result_directory"],
                                                        file_name=param["result_file"])
        com.logger.info("AUC and pAUC results -> {}".format(result_path))
        save_csv(save_file_path=result_path, save_data=csv_lines)
|