|
- import os
- from sklearn.manifold import TSNE
- import matplotlib.pyplot as plt
- import numpy as np
- from lfm_channel_vis import load_lfm_channel
- import librosa
- import scipy
-
def normalization(x):
    """Remove the mean from a signal and scale it to a peak magnitude of 1.

    Args:
        x: Array-like of numeric samples.

    Returns:
        A ``numpy.ndarray`` holding ``x`` with its mean subtracted and then
        divided by its largest absolute value. Empty input yields an empty
        array, and a signal whose peak is zero after mean removal (constant
        or silent input) yields all zeros instead of NaN.
    """
    x = np.asarray(x)
    if x.size == 0:
        # np.max would raise on an empty array; there is nothing to scale.
        return x.astype(float)

    centered = x - np.mean(x)
    peak = np.max(np.abs(centered))
    if peak == 0:
        # Dividing by a zero peak would fill the result with NaN.
        return centered
    return centered / peak
-
def load_audio(path, sample_rate=128000):
    """Load an audio file with librosa and normalize it.

    Args:
        path: Location of the audio file, passed straight to ``librosa.load``.
        sample_rate: Target sampling rate handed to ``librosa.load`` as ``sr``.
            Defaults to 128000, the same default ``audio2spectrum`` uses.

    Returns:
        The loaded signal after ``normalization`` (mean removed, scaled by its
        peak absolute value). The sampling rate returned by librosa is
        discarded.
    """
    recv_wav_signal, _ = librosa.load(path, sr=sample_rate)
    return normalization(recv_wav_signal)
-
def audio2spectrum(input, sample_rate=128000, window_size=0.002, window_stride=0.001):
    """Compute the STFT magnitude and phase of a signal.

    Args:
        input: Signal samples handed to ``librosa.stft``. (The name shadows
            the builtin but is kept so keyword callers keep working.)
        sample_rate: Samples per second; used to turn the window durations
            below into sample counts.
        window_size: Analysis window length in seconds.
        window_stride: Hop between consecutive windows in seconds.

    Returns:
        A ``(spect, phase)`` tuple as produced by ``librosa.magphase`` on the
        STFT of ``input``.
    """
    # The window functions now live in scipy.signal.windows; the old
    # scipy.signal.hamming alias is gone in newer SciPy. Importing here also
    # avoids relying on ``import scipy`` having loaded the signal submodule.
    from scipy.signal import windows

    n_fft = int(sample_rate * window_size)
    hop_length = int(sample_rate * window_stride)

    # Short-time Fourier transform (STFT); the window spans the full FFT size.
    D = librosa.stft(
        input,
        n_fft=n_fft,
        hop_length=hop_length,
        win_length=n_fft,
        window=windows.hamming,
    )
    spect, phase = librosa.magphase(D)
    return spect, phase
-
def get_all_channel_toOneMatrix(channel_dir, do_norm=True, do_sort=False):
    """Load every channel file under ``channel_dir`` into one 2-D matrix.

    Sub-directories whose name contains ``'norm'`` are scanned; every entry
    inside them is loaded with ``load_lfm_channel`` and flattened into one
    row. The label of a row is the part of its directory name before the
    first underscore.

    Args:
        channel_dir: Directory holding the per-challenge sub-directories.
        do_norm: When true, collect files from the ``'norm'`` sub-directories.
            NOTE(review): no other branch exists, so ``do_norm=False``
            collects nothing and raises ``ValueError`` — confirm whether a
            non-normalized variant was intended.
        do_sort: When true, order rows by file path (labels follow their
            paths).

    Returns:
        A tuple ``(matrix, paths, labels)`` where ``matrix`` has one row per
        file and as many columns as a flattened channel has elements,
        ``paths`` is the list of file paths and ``labels`` the matching list
        of challenge names.

    Raises:
        ValueError: If no channel files are found, or (from NumPy) if the
            channels do not all flatten to the same length.
    """
    labelled_paths = []
    if do_norm:
        for dir_name in os.listdir(channel_dir):
            if 'norm' not in dir_name:
                continue
            sub_dir = os.path.join(channel_dir, dir_name)
            # Taken from the directory name itself rather than by splitting
            # the joined path on '/', which breaks with other separators.
            challenge_name = dir_name.split('_')[0]
            for item in os.listdir(sub_dir):
                labelled_paths.append((os.path.join(sub_dir, item), challenge_name))

    if do_sort:
        # Sorting the pairs keeps every label attached to its path.
        labelled_paths.sort(key=lambda pair: pair[0])

    if not labelled_paths:
        raise ValueError(
            'no channel files found under {!r} (do_norm={})'.format(channel_dir, do_norm)
        )

    all_ABC_item_namelist = [path for path, _ in labelled_paths]
    all_ABC_labelist = [label for _, label in labelled_paths]

    all_ABC_channel_matrix = None
    for i, each_item_path in enumerate(all_ABC_item_namelist):
        flat_channel = np.asarray(load_lfm_channel(each_item_path)).reshape(-1)
        if all_ABC_channel_matrix is None:
            # Size the matrix from the data instead of a fixed (1650, 5121).
            all_ABC_channel_matrix = np.zeros(
                (len(all_ABC_item_namelist), flat_channel.size)
            )
        all_ABC_channel_matrix[i, :] = flat_channel

    return all_ABC_channel_matrix, all_ABC_item_namelist, all_ABC_labelist
-
- if __name__ == '__main__':
- # Script entry point. The first section (commented out below) embedded all
- # LFM channels with t-SNE and saved one scatter plot per sample; only the
- # raw-audio section at the bottom is currently active.
-
- # # plotting all_lfm_channel t-SNE
- # alldata_channel_dir = '/userhome/wave_training_old/matlab_lfm_channel'
- # all_channel, all_channel_namelist, all_channel_label = get_all_channel_toOneMatrix(alldata_channel_dir)
- #
- # tSNE = TSNE(n_components=2, random_state=15)
- # data = tSNE.fit_transform(all_channel)
- #
- # save_dir = '/userhome/wave_training_old/channel_plots/all_channel_tSNE'
- # if not os.path.exists(save_dir):
- # os.makedirs(save_dir, exist_ok=True)
- #
- # save_namelist_f = open(os.path.join(save_dir, 'plotting_rank_namelist.txt'), 'w+')
- # for i, channel_name in enumerate(all_channel_namelist):
- # save_namelist_f.write('{} {}\n'.format(str(i), channel_name))
- # save_namelist_f.close()
- #
- # for plot_num in range(1650):
- # for idx, item in enumerate(data):
- # if all_channel_label[idx] == 'A':
- # c = 'b'
- # elif all_channel_label[idx] == 'B':
- # c = 'g'
- # else:
- # c = 'y'
- #
- # if idx > plot_num:
- # break
- # elif idx == plot_num:
- # plt.scatter(data[idx, 0], data[idx, 1], color='r', label=all_channel_namelist[idx].split('/')[-1])
- # else:
- # plt.scatter(data[idx, 0], data[idx, 1], color=c)#, label=all_channel_label[idx])
- #
- #
- # plt.xlim([-76, 76])
- # plt.ylim([-50, 50])
- # plt.tight_layout()
- # plt.legend()
- # plt.savefig(os.path.join(save_dir, '{}.png'.format(plot_num+1)))
- # plt.close()
- # # plt.show()
- #
- # # for idx, item in enumerate(data):
- # # if all_channel_label[idx] == 'A':
- # # c = 'b'
- # # elif all_channel_label[idx] == 'B':
- # # c = 'g'
- # # else:
- # # c = 'r'
- # # plt.scatter(data[idx, 0], data[idx, 1], color=c) # , label=all_channel_label[idx])
- # # plt.xlim([-76, 76])
- # # plt.ylim([-50, 50])
- # # plt.tight_layout()
- # # plt.show()
-
-
- # plotting raw_signal t-SNE
- # NOTE: no t-SNE is computed in this section yet; it only loads each file,
- # computes its spectrogram and prints the resulting array shapes.
- # Hard-coded dataset locations; every entry in each directory is loaded.
- A_rawdata_dir = '/userhome/wave_training/raw_data/train/data'
- B_rawdata_dir = '/userhome/wave_training/newB_data/test/data'
- juesai_rawdata_dir = '/userhome/wave_training/juesai_data/test/data'
- all_rawdata_dirs = [A_rawdata_dir, B_rawdata_dir, juesai_rawdata_dir]
- for each_dir in all_rawdata_dirs:
- for each_f in os.listdir(each_dir):
- each_f_path = os.path.join(each_dir, each_f)
- this_wav = load_audio(each_f_path)
- this_spec, this_phase = audio2spectrum(this_wav)
- print(this_wav.shape, this_spec.shape, this_phase.shape)
- # NOTE(review): indentation has been lost in this file, so it is unclear
- # whether exit() runs after the first file or after all files — confirm
- # against the original script.
- exit()
|