When training on augmented audio data through an iterator, we found that TensorFlow on Windows cannot use use_multiprocessing=True to run the augmentation preprocessing in parallel: it fails with the Windows error "页面文件太小" (the paging file is too small) and cannot load the required DLLs. To speed up the augmentation step, we therefore added a custom multiprocessing routine (multiple worker processes) to the Sequence instance. The core code and the complete code are shown below.
'''Core code'''
def dealDataMultiProcessing(self, index, coreNum, targetF):
    # Files belonging to batch `index`
    indices = self.idx[index * self.batch_size:(index + 1) * self.batch_size]
    files = [self.files[k] for k in indices]
    # Split the batch into roughly equal chunks, one chunk per worker process
    lenPerSt = int(len(files) / coreNum) + 1
    paths = []
    for i in range(coreNum):
        paths.append(files[i * lenPerSt:(i + 1) * lenPerSt])
    # Shared dictionaries that collect each worker's features and labels
    manager = Manager()
    return_dict_x = manager.dict()
    return_dict_y = manager.dict()
    jobs = []
    for i in range(coreNum):
        p = Process(target=targetF, args=(str(i), paths[i], return_dict_x, return_dict_y))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    # Concatenate the per-process results back into a single batch
    data_X = np.asarray(list(return_dict_x['0']))
    for i in range(1, coreNum):
        x = np.asarray(list(return_dict_x[str(i)]))
        if int(x.shape[0]) > 0:
            data_X = np.concatenate((data_X, x))
    data_Y = np.asarray(list(return_dict_y['0']))
    for i in range(1, coreNum):
        y = np.asarray(list(return_dict_y[str(i)]))
        if int(y.shape[0]) > 0:
            data_Y = np.concatenate((data_Y, y))
    # Fix the one-hot width so every batch has the same number of classes
    return data_X, to_categorical(data_Y, num_classes=len(self.labels))
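Before wiring this into the generator, it may help to check that the bare Process/Manager pattern itself runs on the target Windows machine. The following is a minimal standalone sketch of the same pattern; `_worker` is a hypothetical stand-in for targetF, and the doubling step stands in for the MFCC computation.
'''Minimal Process/Manager sketch (illustration only)'''
from multiprocessing import Process, Manager

def _worker(key, chunk, out):
    # Hypothetical stand-in for targetF: each process writes its result
    # into the shared dict under its own key.
    out[key] = [v * 2 for v in chunk]

if __name__ == '__main__':
    manager = Manager()
    out = manager.dict()
    chunks = [[1, 2, 3], [4, 5, 6]]
    jobs = [Process(target=_worker, args=(str(i), chunks[i], out)) for i in range(2)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    print([out[str(i)] for i in range(2)])   # expected: [[2, 4, 6], [8, 10, 12]]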
'''Complete code'''
import os, tqdm, librosa
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import warnings
from multiprocessing import Process,Manager
warnings.filterwarnings("ignore")
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
gpu = tf.config.experimental.list_physical_devices(device_type='GPU')
assert len(gpu) == 1
tf.config.experimental.set_memory_growth(gpu[0], True)
from tensorflow.keras.models import load_model
import tensorflow.keras
from tensorflow.keras import Sequential
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, LSTM, Conv2D, AvgPool2D, GRU, \
Flatten, Dropout, MaxPooling2D, BatchNormalization, Activation, \
Reshape, concatenate, Input
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, \
ModelCheckpoint, TensorBoard
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift,\
Shift, FrequencyMask, AddShortNoises, AddBackgroundNoise,\
LowPassFilter, HighPassFilter, Reverse
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data_fold, model_name, batch_size=64, max_pad_size=144,
                 random_state=19, split_ratio=[.6, .2], mode='train', *args, **kwargs):
        self.data_fold = data_fold
        self.labels = os.listdir(data_fold)
        self.files = [os.path.join(self.data_fold, label, file)
                      for label in self.labels
                      for file in os.listdir(os.path.join(self.data_fold, label))]
        # Split the file list into train / val / test subsets
        X_train, X_test = train_test_split(self.files, test_size=(1 - split_ratio[0]),
                                           random_state=random_state, shuffle=True)
        X_val, X_test = train_test_split(X_test, test_size=split_ratio[1] / (1 - split_ratio[0]),
                                         random_state=random_state, shuffle=True)
        if mode == 'train':
            self.files = X_train
        elif mode == 'val':
            self.files = X_val
        elif mode == 'test':
            self.files = X_test
        self.mode = mode
        self.batch_size = batch_size
        self.max_pad_size = max_pad_size
        self.model_name = model_name
        # Waveform-level augmentation pipeline from audiomentations
        self.augment = Compose([
            AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),
            TimeStretch(min_rate=0.8, max_rate=1.25, p=0.5),
            PitchShift(min_semitones=-4, max_semitones=4, p=0.5),
            Shift(min_fraction=-0.5, max_fraction=0.5, p=0.5),
            FrequencyMask(),
            LowPassFilter(),
            HighPassFilter(),
            Reverse(p=0.2),
        ])
        self.on_epoch_end()
    def __len__(self):
        """Number of batches generated per epoch."""
        return int(np.ceil(len(self.files) / self.batch_size))

    def __getitem__(self, index):
        """Return one preprocessed batch (features, one-hot labels)."""
        return self.quickGetData(index)
        # The original single-process implementation is kept below for
        # reference; it is never reached because of the early return above.
        indices = self.idx[index * self.batch_size:(index + 1) * self.batch_size]
        list_IDs_temp = [self.files[k] for k in indices]
        mfcc_vectors = []
        target = []
        for file in list_IDs_temp:
            audio, sr = librosa.load(path=file, sr=None, mono=True)
            if self.mode == 'test':
                mfcc_vectors.append(self.audio2mfcc(audio, self.max_pad_size))
            else:
                mfcc_vectors.append(self.audio2mfcc(self.audioAug(audio), self.max_pad_size))
            target.append(self.labels.index(file.split('\\')[1]))
        return self.tran_data(np.asarray(mfcc_vectors)), to_categorical(target, num_classes=len(self.labels))
    def tran_data(self, data):
        # Reshape the MFCC batch to the input layout each model expects
        if self.model_name == 'CNN' or self.model_name == 'CNN2':
            return data.reshape(-1, 20, self.max_pad_size, 1)
        elif self.model_name == 'LSTM':
            return data.reshape(-1, 20, self.max_pad_size)
        elif self.model_name == 'Den':
            return data.reshape(-1, 20 * self.max_pad_size)

    def on_epoch_end(self):
        # Reshuffle the file indices after every epoch
        self.idx = np.arange(len(self.files))
        np.random.shuffle(self.idx)
    def audioAug(self, samples, SAMPLE_RATE=16000):
        '''Waveform-level data augmentation.'''
        augmented_samples = self.augment(samples=samples, sample_rate=SAMPLE_RATE)
        return augmented_samples

    def audio2mfcc(self, audio, max_pad_size=11):
        '''Compute MFCC features and pad/crop them to a fixed length.'''
        y = audio[::3]  # crude downsampling: keep every third sample
        audio_mac = librosa.feature.mfcc(y=y, sr=16000)
        y_shape = audio_mac.shape[1]
        if y_shape < max_pad_size:
            pad_size = max_pad_size - y_shape
            audio_mac = np.pad(audio_mac, ((0, 0), (0, pad_size)), mode='constant')
        else:
            audio_mac = audio_mac[:, :max_pad_size]
        return audio_mac
    def dealDataMultiProcessing(self, index, coreNum, targetF):
        # Files belonging to batch `index`
        indices = self.idx[index * self.batch_size:(index + 1) * self.batch_size]
        files = [self.files[k] for k in indices]
        # Split the batch into roughly equal chunks, one chunk per worker process
        lenPerSt = int(len(files) / coreNum) + 1
        paths = []
        for i in range(coreNum):
            paths.append(files[i * lenPerSt:(i + 1) * lenPerSt])
        # Shared dictionaries that collect each worker's features and labels
        manager = Manager()
        return_dict_x = manager.dict()
        return_dict_y = manager.dict()
        jobs = []
        for i in range(coreNum):
            p = Process(target=targetF, args=(str(i), paths[i], return_dict_x, return_dict_y))
            jobs.append(p)
            p.start()
        for proc in jobs:
            proc.join()
        # Concatenate the per-process results back into a single batch
        data_X = np.asarray(list(return_dict_x['0']))
        for i in range(1, coreNum):
            x = np.asarray(list(return_dict_x[str(i)]))
            if int(x.shape[0]) > 0:
                data_X = np.concatenate((data_X, x))
        data_Y = np.asarray(list(return_dict_y['0']))
        for i in range(1, coreNum):
            y = np.asarray(list(return_dict_y[str(i)]))
            if int(y.shape[0]) > 0:
                data_Y = np.concatenate((data_Y, y))
        # Fix the one-hot width so every batch has the same number of classes
        return data_X, to_categorical(data_Y, num_classes=len(self.labels))
    def targetF(self, num, paths, return_dict_x, return_dict_y):
        # Worker routine: each process computes features for its own chunk of
        # files and stores the result under its key in the shared dicts.
        mfcc_vectors = []
        target = []
        for file in paths:
            audio, sr = librosa.load(path=file, sr=None, mono=True)
            if self.mode == 'test':
                mfcc_vectors.append(self.audio2mfcc(audio, self.max_pad_size))
            else:
                mfcc_vectors.append(self.audio2mfcc(self.audioAug(audio), self.max_pad_size))
            target.append(self.labels.index(file.split('\\')[1]))
        return_dict_x[num] = self.tran_data(np.asarray(mfcc_vectors))
        return_dict_y[num] = target

    def quickGetData(self, index, coreNum=2):
        # Build one batch using `coreNum` worker processes
        return self.dealDataMultiProcessing(index, coreNum, self.targetF)
if __name__ == '__main__':
    gen_train = DataGenerator('data', 'CNN', batch_size=64, max_pad_size=144*5, mode='train')
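For completeness, here is a hedged sketch of how the generator might be plugged into training. The small CNN, the epoch count, and the callback below are placeholders chosen only to make the example self-contained; they are not part of the original code. On Windows, everything must stay inside the `if __name__ == '__main__':` guard because the worker processes re-import this module when spawned.
'''Usage sketch (illustration only)'''
if __name__ == '__main__':
    max_pad = 144 * 5
    gen_train = DataGenerator('data', 'CNN', batch_size=64, max_pad_size=max_pad, mode='train')
    gen_val = DataGenerator('data', 'CNN', batch_size=64, max_pad_size=max_pad, mode='val')
    # Placeholder CNN whose input matches tran_data()'s (-1, 20, max_pad, 1) layout
    model = Sequential([
        Conv2D(16, (3, 3), activation='relu', input_shape=(20, max_pad, 1)),
        MaxPooling2D((2, 2)),
        Flatten(),
        Dense(64, activation='relu'),
        Dense(len(gen_train.labels), activation='softmax'),
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(gen_train, validation_data=gen_val, epochs=30,
              callbacks=[EarlyStopping(patience=5, restore_best_weights=True)])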