Python wave 模块,open() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用wave.open()。
def play_file(fname):
    """Play a WAV file on the default PyAudio output device.

    Blocks until every frame of the file has been written to the stream.

    Args:
        fname: path of the WAV file to play.
    """
    chunk = 1024  # frames handed to the output stream per write
    wf = wave.open(fname, 'rb')
    try:
        p = pyaudio.PyAudio()
        try:
            # Open an output stream whose format matches the file's.
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            try:
                data = wf.readframes(chunk)
                # readframes() returns an empty (falsy) buffer at end of file.
                # Comparing with '' never matches bytes on Python 3 and would
                # loop forever.
                while data:
                    # writing to the stream is what *actually* plays the sound.
                    stream.write(data)
                    data = wf.readframes(chunk)
            finally:
                stream.close()
        finally:
            p.terminate()
    finally:
        wf.close()
def get_config(cls):
    """Collect the Baidu speech settings from the user's profile.

    Reads ``profile.yml`` (located through ``dingdangpath.config``) and copies
    the ``api_key`` and ``secret_key`` entries of its ``baidu_yuyin`` section.

    Returns:
        dict: the entries that were found; empty when the file, the section
        or the keys are missing.
    """
    # FIXME: Replace this as soon as we have a config module
    config = {}
    profile_path = dingdangpath.config('profile.yml')
    if not os.path.exists(profile_path):
        return config
    with open(profile_path, 'r') as f:
        profile = yaml.safe_load(f)
    # safe_load() yields None for an empty file; anything that is not a
    # mapping carries no settings.
    if not isinstance(profile, dict):
        return config
    section = profile.get('baidu_yuyin')
    if not isinstance(section, dict):
        return config
    for key in ('api_key', 'secret_key'):
        if key in section:
            config[key] = section[key]
    return config
def audio_int(num_samples=50):
    """Get the average audio intensity of the microphone input.

    Reads ``num_samples`` buffers of ``CHUNK`` frames, computes one intensity
    value per buffer and averages the largest 20% of them. Useful for
    calibrating "talking" versus "silent" levels.

    Args:
        num_samples: number of buffers to read from the input stream.

    Returns:
        The mean of the top 20% intensity values (at least one value is
        always used, so small ``num_samples`` cannot divide by zero).
    """
    print("Getting intensity values from mic.")
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            values = [math.sqrt(abs(audioop.avg(stream.read(CHUNK), 4)))
                      for _ in range(num_samples)]
        finally:
            stream.close()
    finally:
        p.terminate()

    values.sort(reverse=True)
    top = max(1, int(num_samples * 0.2))
    r = sum(values[:top]) / top
    print(" Finished ")
    # Single pre-formatted argument keeps the output identical on Python 2 and 3.
    print(" Average audio intensity is  %s" % r)
    return r
def read_json_file(file_path):
    """Load the ``words`` entry of a JSON transcript file.

    Args:
        file_path: path of a JSON file whose top-level object has a
            ``words`` key. Expected format:
            https://gist.github.com/pandeydivesh15/2012ab10562cc85e796e1f57554aca33

    Returns:
        The value stored under ``words``; in the expected format this is a
        list of dicts, each holding timing info for a spoken word (or
        punctuation).
    """
    with open(file_path, 'r') as handle:
        raw_text = handle.read()
    document = json.loads(raw_text)
    return document['words']
def __init__(self, codes=(40,), chunk_size=2**15):
    """Resolve the sample file path for each requested piano key.

    Paths are stored in ``self._wfs`` and point into the ``piano88``
    directory next to this module, named ``Piano 0NN.wav`` where ``NN`` is
    the zero-padded key number.

    Args:
        codes: iterable of key numbers, each in 1..88.
        chunk_size: accepted for interface compatibility; not used here.

    Raises:
        SystemExit: after printing a message, when a key number is out of
            range.
    """
    cwd = os.path.dirname(os.path.realpath(__file__))
    self._wfs = []
    for code in codes:
        if not 0 < code < 89:
            print("out of code index")
            # Same effect as exit(), without depending on the `site` builtin.
            raise SystemExit
        filename = "Piano 0%02d.wav" % code
        # os.path.join avoids the invalid "\p" escapes and works off Windows.
        self._wfs.append(os.path.join(cwd, "piano88", filename))
# NOTE(review): truncated fragment, not valid Python. The parameter list is cut
# off mid-call and `cwd` / `c` are never defined here. It appears to be a damaged
# copy of the piano-sample constructor (same "piano88" path expression);
# restore it from the original source or delete it.
def __init__(self,'rb'))
self._wfs.append(cwd + "\piano88\Piano 0" + c + ".wav")
def save(self, path):
    """Save waveform to file path as a WAV file.

    Any extension in ``path`` is discarded and replaced by ``self.extension``.

    :param path: destination path; must contain a non-empty file name.
    :returns: Path to the saved file.
    :raises ValueError: if ``path`` has no file name component.
    """
    folder, filename = os.path.split(path)
    name = os.path.splitext(filename)[0]
    if not name:
        raise ValueError("name is required")
    path = os.path.join(folder, name + self.extension)
    with open(path, "wb") as f:
        f.write(self.contents)
    return path
#-- Import submodules --#
def save_values_to_wave_file(
    values = None,
    filename = None,
    maximum_amplitude = 65535, # maximum value of unsigned short 16 bit number
    sample_rate = 44100, # Hz
    number_of_channels = 1,
    sample_width = 2 # bytes per frame
    ):
    """Rescale a sequence of values and write it as signed 16-bit WAV frames.

    The values are normalized with ``datavision.normalize_to_range`` into
    ``[-(maximum_amplitude // 2), maximum_amplitude // 2]``, truncated to
    integers and packed little-endian.

    Args:
        values: iterable of numbers to write.
        filename: path of the WAV file to create.
        maximum_amplitude: full peak-to-peak range of the output samples.
        sample_rate: frame rate written to the header, in Hz.
        number_of_channels: channel count written to the header.
        sample_width: bytes per sample written to the header.
    """
    # Floor division keeps the bounds integral on Python 3 as well.
    half_range = maximum_amplitude // 2
    values = datavision.normalize_to_range(
        values,
        minimum = -half_range,
        maximum = half_range
    )
    # struct's "h" format only accepts integers.
    samples = [int(value) for value in values]
    file_output = wave.open(filename, "wb")
    try:
        file_output.setnchannels(number_of_channels)
        file_output.setsampwidth(sample_width)
        file_output.setframerate(sample_rate)
        # One writeframes() call writes the data and fixes up the header.
        file_output.writeframes(
            struct.pack("<%dh" % len(samples), *samples))
    finally:
        file_output.close()
def load_wav_file(name):
    """Load a WAV file as a fixed-length list of byte values scaled by 1/255.

    The frame data is read byte-wise, offset by 128 and divided by 255, then
    trimmed or zero-padded to exactly ``CHUNK * 2`` entries.

    Args:
        name: path of the WAV file.

    Returns:
        list: ``CHUNK * 2`` numeric values.
    """
    target = CHUNK * 2
    chunk = []
    f = wave.open(name, "rb")
    try:
        data0 = f.readframes(CHUNK)
        while data0:
            data = numpy.frombuffer(data0, dtype='uint8')
            # NOTE(review): the uint8 addition wraps modulo 256 before the
            # division; confirm that offset-binary mapping is intended.
            data = (data + 128) / 255.  # 0-1 for better convergence
            chunk.extend(data)
            data0 = f.readframes(CHUNK)
    finally:
        f.close()
    # finally trim:
    chunk = chunk[:target]  # should be enough for now -> cut
    chunk.extend(numpy.zeros(target - len(chunk)))  # fill with padding 0's
    return chunk
def get_config(cls):
    """Collect the PocketSphinx settings from the user's profile.

    Reads ``profile.yml`` (located through ``dingdangpath.config``) and copies
    ``pocketsphinx.hmm_dir`` into the result when present.

    Returns:
        dict: ``{'hmm_dir': ...}`` when configured, otherwise empty.
    """
    # FIXME: Replace this as soon as we have a config module
    config = {}
    # HMM dir
    # Try to get hmm_dir from config
    profile_path = dingdangpath.config('profile.yml')
    if os.path.exists(profile_path):
        with open(profile_path, 'r') as f:
            profile = yaml.safe_load(f)
        try:
            config['hmm_dir'] = profile['pocketsphinx']['hmm_dir']
        except (KeyError, TypeError):
            # TypeError covers an empty profile (safe_load() returns None)
            # or a section that is not a mapping.
            pass
    return config
def get_config(cls):
    """Collect the iFlytek speech settings from the user's profile.

    Reads ``profile.yml`` (located through ``dingdangpath.config``) and copies
    the ``api_id``, ``api_key`` and ``url`` entries of its ``iflytek_yuyin``
    section.

    Returns:
        dict: the entries that were found; empty when the file, the section
        or the keys are missing.
    """
    # FIXME: Replace this as soon as we have a config module
    config = {}
    profile_path = dingdangpath.config('profile.yml')
    if not os.path.exists(profile_path):
        return config
    with open(profile_path, 'r') as f:
        profile = yaml.safe_load(f)
    # safe_load() yields None for an empty file; anything that is not a
    # mapping carries no settings.
    if not isinstance(profile, dict):
        return config
    section = profile.get('iflytek_yuyin')
    if not isinstance(section, dict):
        return config
    for key in ('api_id', 'api_key', 'url'):
        if key in section:
            config[key] = section[key]
    return config
def get_config(cls):
    """Collect the Ali speech settings from the user's profile.

    Reads ``profile.yml`` (located through ``dingdangpath.config``) and copies
    the ``ak_id`` and ``ak_secret`` entries of its ``ali_yuyin`` section.

    Returns:
        dict: the entries that were found; empty when the file, the section
        or the keys are missing.
    """
    # FIXME: Replace this as soon as we have a config module
    config = {}
    profile_path = dingdangpath.config('profile.yml')
    if not os.path.exists(profile_path):
        return config
    with open(profile_path, 'r') as f:
        profile = yaml.safe_load(f)
    # safe_load() yields None for an empty file; anything that is not a
    # mapping carries no settings.
    if not isinstance(profile, dict):
        return config
    section = profile.get('ali_yuyin')
    if not isinstance(section, dict):
        return config
    for key in ('ak_id', 'ak_secret'):
        if key in section:
            config[key] = section[key]
    return config
# Builds the hotword-detector configuration dict from profile.yml: 'model' and
# 'sensitivity' come from the profile's 'sNowboy' section (falling back to
# LIB_PATH/sNowboy/dingdang.pmdl and "0.5"), 'hotword' from the top-level
# 'robot_name' entry (falling back to 'DINGDANG').
# NOTE(review): indentation was lost in this copy, so which `if` each `else`
# belongs to, and whether the 'robot_name' lookup sits inside the 'sNowboy'
# check, cannot be determined here; confirm against the original source.
# NOTE(review): yaml.safe_load() returns None for an empty file, which would
# make the `in profile` tests raise TypeError.
def get_config(cls):
# FIXME: Replace this as soon as we have a config module
config = {}
# Try to get sNowboy config from config
profile_path = dingdangpath.config('profile.yml')
if os.path.exists(profile_path):
with open(profile_path, 'r') as f:
profile = yaml.safe_load(f)
if 'sNowboy' in profile:
if 'model' in profile['sNowboy']:
config['model'] = \
profile['sNowboy']['model']
else:
config['model'] = os.path.join(
dingdangpath.LIB_PATH, 'sNowboy/dingdang.pmdl')
if 'sensitivity' in profile['sNowboy']:
config['sensitivity'] = \
profile['sNowboy']['sensitivity']
else:
config['sensitivity'] = "0.5"
if 'robot_name' in profile:
config['hotword'] = profile['robot_name']
else:
config['hotword'] = 'DINGDANG'
return config
def cache(func):
    """Decorator that replays cached audio instead of calling ``func`` again.

    On a cache hit the stored base64 audio is decoded and played through an
    ``AudioHandler``. On a miss ``func`` is called, ``output.wav`` is read
    back, base64-encoded and stored for seven days. The wrapper always
    returns None.
    """
    @wraps(func)
    def _(*args, **kwargs):
        store = CacheHandler()
        key = unique_id(func, *args, **kwargs)
        cached_audio = store.get(key)
        if not cached_audio:
            # Miss: produce the audio, then remember what was written.
            func(*args, **kwargs)
            with open('output.wav', 'rb') as wav_file:
                payload = base64.b64encode(wav_file.read())
            store.set(key, payload, 86400*7)
            return
        player = AudioHandler()
        player.aplay(base64.b64decode(cached_audio), is_buffer=True)
    return _
def testWavDataToSamples(self):
    """Check length and amplitude range of WAV data decoded to samples."""
    # Expected lengths after resampling, derived from the file headers.
    w = wave.open(self.wav_filename, 'rb')
    try:
        expected_len = round(16000.0 * w.getnframes() / w.getframerate())
    finally:
        w.close()
    w_mono = wave.open(self.wav_filename_mono, 'rb')
    try:
        expected_len_mono = round(
            22050.0 * w_mono.getnframes() / w_mono.getframerate())
    finally:
        w_mono.close()

    # Check content size.
    y = audio_io.wav_data_to_samples(self.wav_data, sample_rate=16000)
    y_mono = audio_io.wav_data_to_samples(self.wav_data_mono, sample_rate=22050)
    # assertEqual: the assertEquals alias was removed in Python 3.12.
    self.assertEqual(expected_len, y.shape[0])
    self.assertEqual(expected_len_mono, y_mono.shape[0])

    # Check a few obvious failure modes.
    self.assertLess(0.01, y.std())
    self.assertLess(0.01, y_mono.std())
    self.assertGreater(-0.1, y.min())
    self.assertGreater(-0.1, y_mono.min())
    self.assertLess(0.1, y.max())
    self.assertLess(0.1, y_mono.max())
def make_audio(tensor, sample_rate, length_frames, num_channels):
    """Convert a numpy audio representation to an Audio protobuf.

    The samples are cast to int16 and wrapped in an in-memory, uncompressed
    WAV container.

    Args:
        tensor: numpy array of samples.
        sample_rate: frame rate written to the WAV header and the protobuf.
        length_frames: frame count stored in the protobuf.
        num_channels: channel count written to the WAV header and the protobuf.

    Returns:
        A ``Summary.Audio`` message with content type ``audio/wav``.
    """
    import io

    # WAV data is binary: a text StringIO rejects bytes on Python 3.
    output = io.BytesIO()
    wav_out = wave.open(output, "wb")
    try:
        wav_out.setframerate(float(sample_rate))
        wav_out.setsampwidth(2)
        wav_out.setcomptype('NONE', 'not compressed')
        wav_out.setnchannels(num_channels)
        # tobytes() replaces the removed ndarray.tostring().
        wav_out.writeframes(tensor.astype("int16").tobytes())
    finally:
        wav_out.close()
    audio_string = output.getvalue()
    return Summary.Audio(sample_rate=float(sample_rate),
                         num_channels=num_channels,
                         length_frames=length_frames,
                         encoded_audio_string=audio_string,
                         content_type="audio/wav")
def read(self):
    """Return audio file as array of integer.

    Also records the header fields on the instance: ``frame_num``,
    ``sampling_rate``, ``channels`` and ``sample_size``.

    Returns:
        audio_data: np.ndarray, one item per frame. Mono data is decoded as
        int16; two-channel data is decoded as int32, i.e. both 16-bit
        samples of a frame packed in one integer.

    Raises:
        ValueError: if the file has a channel count other than 1 or 2.
    """
    # Read wav file
    with wave.open(self.file_path, "r") as wav:
        # Move to head of the audio file
        wav.rewind()

        self.frame_num = wav.getnframes()
        self.sampling_rate = wav.getframerate()
        self.channels = wav.getnchannels()
        self.sample_size = wav.getsampwidth()

        # Read to buffer as binary format
        buf = wav.readframes(self.frame_num)

    if self.channels == 1:
        return np.frombuffer(buf, dtype="int16")
    if self.channels == 2:
        # NOTE(review): packs left/right samples into one int32 per frame;
        # confirm callers expect that rather than an (n, 2) int16 array.
        return np.frombuffer(buf, dtype="int32")
    # Previously fell through to an unbound-variable error.
    raise ValueError("unsupported channel count: %r" % (self.channels,))
def record_to_file(filename,FORMAT = pyaudio.paInt16, CHANNELS = 1, RATE = 8000,
                   CHUNK = 1024, RECORD_SECONDS=1):
    """Record from the default input device and save the result as a WAV file.

    Args:
        filename: path of the WAV file to write.
        FORMAT: PyAudio sample format constant.
        CHANNELS: number of input channels.
        RATE: sampling rate in Hz.
        CHUNK: frames read per buffer.
        RECORD_SECONDS: recording length in seconds.
    """
    audio = pyaudio.PyAudio()
    try:
        # Query the sample size while PortAudio is still initialised.
        sample_width = audio.get_sample_size(FORMAT)

        # start Recording; the stream must be opened at the same rate that
        # is later written to the WAV header.
        stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE,
                            input=True, frames_per_buffer=CHUNK)
        try:
            frames = [stream.read(CHUNK)
                      for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS))]
            # stop Recording
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        audio.terminate()

    waveFile = wave.open(filename, 'wb')
    try:
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(sample_width)
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))
    finally:
        waveFile.close()
def extract_sound(self, start=0, end=None):
    """Return a new ``Sound`` holding the part of this one between two positions.

    Args:
        start: start position, converted with ``self.to_index``.
        end: end position, converted with ``self.to_index``; when falsy the
            extract runs to the end of the raw data.

    Returns:
        Sound: built from an in-memory WAV with this sound's channel count,
        sample width and frame rate.

    Raises:
        ValueError: if neither ``start`` nor ``end`` is given.
    """
    if not start and not end:
        raise ValueError("either start or end must be given")

    start_pos = self.to_index(start)
    end_pos = self.to_index(end) if end else len(self.raw)
    raw = self.raw[start_pos:end_pos]  # slice once, reuse below

    _buffer = io.BytesIO()
    _output = wave.open(_buffer, "wb")
    try:
        _output.setnchannels(self.channels)
        _output.setsampwidth(self.samplewidth)
        _output.setframerate(self.framerate)
        _output.writeframes(raw)
    finally:
        _output.close()
    _buffer.seek(0)
    return Sound(_buffer)
def read_wav(source, end=None, start=0):
    """Read part of a WAV file into a plain dict (deprecated).

    Emits a ``DeprecationWarning``; use the ``Sound`` class instead.

    Args:
        source: path or file object accepted by ``wave.open``.
        end: end of the extract in seconds; defaults to the file's duration.
        start: start of the extract in seconds. Added after ``end`` so
            existing positional calls keep their meaning.

    Returns:
        dict with keys ``framerate``, ``channels``, ``samplewidth``,
        ``length`` (seconds), ``state`` (always 0) and ``data`` (raw frames).
    """
    warnings.warn(
        "read_wav() is deprecated,use Sound() class instead",
        DeprecationWarning)
    in_wav = wave.open(source, "rb")
    try:
        fr = in_wav.getframerate()
        chan = in_wav.getnchannels()
        sw = in_wav.getsampwidth()
        in_wav.setpos(int(start * fr))
        if end is None:
            # Total duration in seconds (frame count / frame rate).
            end = in_wav.getnframes() / float(fr)
        data = in_wav.readframes(int((end - start) * fr))
    finally:
        in_wav.close()
    return {"framerate": fr,
            "channels": chan,
            "samplewidth": sw,
            "length": end - start,
            "state": 0,
            "data": data}
def speak(self, text, is_phonetic=False):
    """Synthesize ``text`` to ``temp.wav`` and play it through PyAudio.

    Args:
        text: text handed to ``self.save_wav``.
        is_phonetic: forwarded unchanged to ``self.save_wav``.
    """
    temp = 'temp.wav'
    self.save_wav(text, temp, is_phonetic)

    chunk = 1024  # frames per write
    w = wave.open(temp)
    try:
        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=p.get_format_from_width(w.getsampwidth()),
                channels=w.getnchannels(),
                rate=w.getframerate(),
                output=True)
            try:
                data = w.readframes(chunk)
                while data:
                    stream.write(data)
                    data = w.readframes(chunk)
            finally:
                stream.close()
        finally:
            p.terminate()
    finally:
        # Release the file handle even when playback fails.
        w.close()
def _play_audio(sound, delay):
    """Play ``sounds/<sound>.wav`` after waiting ``delay`` seconds.

    Playback is best-effort: ordinary errors (missing file, audio device
    problems) are swallowed so the caller is never interrupted.

    Args:
        sound: base name of the WAV file inside ``sounds/``.
        delay: seconds to sleep before playback starts.
    """
    try:
        time.sleep(delay)
        wf = wave.open("sounds/"+sound+".wav", 'rb')
        try:
            p = pyaudio.PyAudio()
            try:
                stream = p.open(
                    format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True)
                try:
                    data = wf.readframes(TextToSpeech.CHUNK)
                    while data:
                        stream.write(data)
                        data = wf.readframes(TextToSpeech.CHUNK)
                    stream.stop_stream()
                finally:
                    stream.close()
            finally:
                p.terminate()
        finally:
            wf.close()
    except Exception:
        # Deliberately ignored, but unlike a bare `except:` this lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
def writeScottFile(output_name, header, data):
    """
    Write header and data information to a new scott file.

    'header' and 'data' are lists of byte objects; 'output_name' is the
    path of the file to create. Every header item is written first, in
    order, followed by every data item.
    """
    with open(output_name, 'wb') as out:
        out.writelines(header)
        out.writelines(data)
def wavFileType(filename):
    """Classify a file as a SCOT WAV file, a regular WAV file, or neither.

    The four bytes at offset 8 must read ``WAVE``; the four bytes at offset
    60 decide between a scott file (``scot``) and a plain one.

    Returns:
        'scottwav', 'wav' or 'notwav'; 'error' (after printing a message)
        when the file cannot be read.
    """
    try:
        with open(filename, 'rb') as handle:
            handle.seek(8)
            if handle.read(4) != b'WAVE':
                return 'notwav'
            handle.seek(60)
            marker = handle.read(4)
        return 'scottwav' if marker == b'scot' else 'wav'
    except IOError:
        print("--wavFileType Error--")
        return 'error'
def editScottWav(filename, edit):
    """Overwrite header fields of the scott file 'filename' in place.

    'edit' is an iterable of (field name, value) pairs. Each value is
    written at the fixed byte offset of its field: strings as UTF-8,
    other values as little-endian integers using one byte per decimal
    digit of the value. A message is printed if the file cannot be opened.
    """
    offsets = {
        "note" : 369, "title" : 72, "artist" : 335, "audio_id" : 115,
        "year" : 406, "end" : 405, "intro" : 403, "eom" : 152,
        "s_date" : 133, "e_date" : 139, "s_hour" : 145, "e_hour": 146
    }
    try:
        with open(filename, 'rb+') as scott:
            for field, value in edit:
                scott.seek(offsets[field])
                if isinstance(value, str):
                    payload = bytes(value, 'utf-8')
                else:
                    width = len(str(abs(value)))
                    payload = value.to_bytes(width, byteorder='little')
                scott.write(payload)
    except IOError:
        print("---EditScott cannot open {}. ---".format(filename))
def audio(tag, tensor, sample_rate=44100):
    """Build a ``Summary`` protobuf holding one mono 16-bit WAV clip.

    Args:
        tag: name stored on the summary value.
        tensor: samples, converted with ``makenp`` and squeezed; must end up
            one-dimensional. Each sample is scaled by 32767 and truncated
            to an integer.
        sample_rate: frame rate in Hz.

    Returns:
        Summary: a single value carrying the encoded audio.
    """
    import io
    import struct
    import wave

    tensor = makenp(tensor)
    tensor = tensor.squeeze()
    assert(tensor.ndim==1), 'input tensor should be 1 dimensional.'
    tensor_list = [int(32767.0*x) for x in tensor]

    # Pack all samples in one call; appending to a bytes object per sample
    # copies the buffer every time.
    tensor_enc = struct.pack('<%dh' % len(tensor_list), *tensor_list)

    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()

    audio_proto = Summary.Audio(sample_rate=sample_rate,
                                num_channels=1,
                                length_frames=len(tensor_list),
                                encoded_audio_string=audio_string,
                                content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio_proto)])
def test_it(self, test_rounding=False):
    """Write a silent WAV file, read it back and compare header and frames.

    With ``test_rounding`` the frame rate is written 0.1 below ``framerate``
    and is still expected to read back as ``framerate``.
    """
    rate_to_write = framerate - 0.1 if test_rounding else framerate

    writer = wave.open(TESTFN, 'wb')
    self.f = writer
    writer.setnchannels(nchannels)
    writer.setsampwidth(sampwidth)
    writer.setframerate(rate_to_write)
    writer.setnframes(nframes)
    output = b'\0' * nframes * nchannels * sampwidth
    writer.writeframes(output)
    writer.close()

    reader = wave.open(TESTFN, 'rb')
    self.f = reader
    self.assertEqual(nchannels, reader.getnchannels())
    self.assertEqual(sampwidth, reader.getsampwidth())
    self.assertEqual(framerate, reader.getframerate())
    self.assertEqual(nframes, reader.getnframes())
    self.assertEqual(reader.readframes(nframes), output)
def fetch_sample_speech_fruit(n_samples=None):
    """Download (once) and load the sample speech archive.

    Args:
        n_samples: maximum number of WAV members to load; None loads all.

    Returns:
        tuple: ``(fs, speech)`` where ``fs`` is the sample rate reported for
        the last file read (None when the archive holds no WAV member) and
        ``speech`` is a list of float32 arrays scaled by 1 / 2**15.
    """
    url = 'https://dl.dropBoxusercontent.com/u/15378192/audio.tar.gz'
    wav_path = "audio.tar.gz"
    if not os.path.exists(wav_path):
        download(url, wav_path)

    fs = None  # stays None if nothing is read, instead of being unbound
    speech = []
    tf = tarfile.open(wav_path)
    try:
        wav_names = [fname for fname in tf.getnames()
                     if ".wav" in fname.split(os.sep)[-1]]
        print("Loading speech files...")
        for wav_name in wav_names[:n_samples]:
            f = tf.extractfile(wav_name)
            fs, d = wavfile.read(f)
            d = d.astype('float32') / (2 ** 15)
            speech.append(d)
    finally:
        tf.close()
    return fs, speech
def play_audio_file(fname=DETECT_DONG):
    """Simple callback function to play a wave file. By default it plays
    a Ding sound.

    The whole file is read into memory, written to a PyAudio output stream
    in one call, and all handles are released even if playback fails.

    :param str fname: wave file name
    :return: None
    """
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels,
            rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def __init__(self, clocks):
    """Set up the stretched-sound state and the two per-channel WAV dumps.

    Args:
        clocks: forwarded to the parent constructor.
    """
    super(PygameStretchTIA_Sound, self).__init__(clocks)

    # Flag to indicate if samples should be stretched in frequency, or more outputs generated.
    self._maintain_pitch = True

    # setparams() needs the full 6-tuple (nchannels, sampwidth, framerate,
    # nframes, comptype, compname); both channel files use the same one.
    wav_params = (1, 1, self.SAMPLERATE, 0, 'NONE', 'not compressed')
    self._wav_output = [wave.open('pytari_stretch_chan0.wav', 'w'),
                        wave.open('pytari_stretch_chan1.wav', 'w')]
    for wav_file in self._wav_output:
        wav_file.setparams(wav_params)

    self._sound_chunk_size = 1024*4
    self.openSound()

    self._test_accumulated_sound = self._sound_chunk_size * 2

    # Hold 'stretch' state for each channel.
    self._stretcher = tiasound.Stretch()
    self._stretched = [[],[]]
    self._last_update_time = self.clocks.system_clock
def __init__(self, data=None, *args, **kwargs):
    """Create a segment from WAV data or from pre-parsed attributes.

    Args:
        data: WAV content as a string, or a file-like object whose
            ``read()`` returns it. With the ``Metadata`` keyword it is
            stored as the raw frame data unchanged.
        *args: forwarded to the parent constructor (previously referenced
            without being declared, which raised NameError).
        **kwargs: ``Metadata`` (internal use) maps attribute names to
            values set on the instance; the rest is forwarded to the
            parent constructor.
    """
    if kwargs.get('Metadata', False):
        # internal use only
        self._data = data
        for attr, val in kwargs.pop('Metadata').items():
            setattr(self, attr, val)
    else:
        # normal construction
        data = data if isinstance(data, basestring) else data.read()
        raw = wave.open(StringIO(data), 'rb')

        raw.rewind()
        self.channels = raw.getnchannels()
        self.sample_width = raw.getsampwidth()
        self.frame_rate = raw.getframerate()
        self.frame_width = self.channels * self.sample_width

        raw.rewind()
        # An infinite frame count asks for everything that is left.
        self._data = raw.readframes(float('inf'))

    super(AudioSegment, self).__init__(*args, **kwargs)
# Plays a raw audio buffer through a callback-driven output stream.
# Stores the buffer, sample width and channel count on the instance, clears
# self.event, then opens a stream on self.pa that pulls data through
# self.raw_callback.
# When `block` is true the call waits on self.event and sleeps 2 seconds.
# NOTE(review): indentation was lost in this copy, so it cannot be told here
# whether the sleep and the stream close run only when `block` is true or
# unconditionally; confirm against the original source.
def play_raw(self, raw_data, rate=16000, channels=1, width=2, block=True):
self.raw = raw_data
self.width = width
self.channels = channels
self.event.clear()
self.stream = self.pa.open(format=self.pa.get_format_from_width(width),
channels=channels,
rate=rate,
output=True,
# output_device_index=1,
frames_per_buffer=CHUNK_SIZE,
stream_callback=self.raw_callback)
if block:
self.event.wait()
time.sleep(2) # wait for playing audio data in buffer,a alsa driver bug
self.stream.close()
def play_audio_file(fname=DETECT_DING):
    """Simple callback function to play a wave file. By default it plays
    a Ding sound.

    :param str fname: wave file name
    :return: None
    """
    # NOTE(review): the lines that read the file and opened the output stream
    # were lost from this copy (it called wave.open(fname, output=True) and
    # used undefined names). They are restored here following the usual
    # read-all / open-stream / write sequence; confirm against the source.
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels,
            rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def play_audio_file(fname=DETECT_DING):
    """Simple callback function to play a wave file. By default it plays
    a Ding sound.

    :param str fname: wave file name
    :return: None
    """
    # NOTE(review): the lines that read the file and opened the output stream
    # were lost from this copy (it called wave.open(fname, output=True) and
    # used undefined names). They are restored here following the usual
    # read-all / open-stream / write sequence; confirm against the source.
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels,
            rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def play_audio_file(fname=DETECT_DING):
    """Simple callback function to play a wave file. By default it plays
    a Ding sound.

    :param str fname: wave file name
    :return: None
    """
    # NOTE(review): the lines that read the file and opened the output stream
    # were lost from this copy (it called wave.open(fname, output=True) and
    # used undefined names). They are restored here following the usual
    # read-all / open-stream / write sequence; confirm against the source.
    ding_wav = wave.open(fname, 'rb')
    try:
        ding_data = ding_wav.readframes(ding_wav.getnframes())
        sample_width = ding_wav.getsampwidth()
        channels = ding_wav.getnchannels()
        rate = ding_wav.getframerate()
    finally:
        ding_wav.close()

    audio = pyaudio.PyAudio()
    try:
        stream_out = audio.open(
            format=audio.get_format_from_width(sample_width),
            channels=channels,
            rate=rate, input=False, output=True)
        try:
            stream_out.start_stream()
            stream_out.write(ding_data)
            time.sleep(0.2)
            stream_out.stop_stream()
        finally:
            stream_out.close()
    finally:
        audio.terminate()
def _is_good_wave(self, filename):
    """
    Tell whether a wav file is in the format MARF expects: one channel,
    2-byte samples, 8000 Hz, not compressed. A file the wave module rejects
    is reported (the error is printed) as not good.
    """
    try:
        w_file = wave.open(filename)
        par = w_file.getparams()
        w_file.close()
    except wave.Error as exc:
        print(exc)
        return False

    layout_ok = par[:3] == (1, 2, 8000)
    uncompressed = par[-1:] == ('not compressed',)
    return layout_ok and uncompressed
def real_signal():
    """Fit an ``HMM(5, 3)`` on the standardized samples of helloworld.wav.

    The samples are shifted to zero mean, scaled to unit standard deviation
    and passed to ``fit`` with shape (1, T, 1).
    """
    spf = wave.open('helloworld.wav', 'r')
    try:
        # Extract raw audio from the WAV file.
        raw = spf.readframes(-1)
    finally:
        spf.close()

    # The file's "Get Info" shows a 16000 Hz sampling rate (quantization in
    # time) and 16 bits per sample (quantization in amplitude), i.e.
    # 2^16 = 65536 different sound levels.
    # np.int16 replaces the removed 'Int16' alias; frombuffer replaces the
    # deprecated binary mode of fromstring.
    signal = np.frombuffer(raw, dtype=np.int16)
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std()

    hmm = HMM(5, 3)
    hmm.fit(signal.reshape(1, T, 1))
def real_signal():
    """Fit an ``HMM(5, 3)`` on the standardized samples of helloworld.wav."""
    # NOTE(review): the lines that read and decoded the audio were lost from
    # this copy ('Int16' had ended up as the wave.open mode and `signal` was
    # undefined). They are restored here as 16-bit samples; the reshape was
    # also truncated to (1, 1) and is restored from the shape comment below.
    # Confirm against the original source.
    spf = wave.open('helloworld.wav', 'r')
    try:
        raw = spf.readframes(-1)
    finally:
        spf.close()
    signal = np.frombuffer(raw, dtype=np.int16)
    T = len(signal)
    signal = (signal - signal.mean()) / signal.std()
    hmm = HMM(5, 3)
    # signal needs to be of shape N x T(n) x D
    hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)
def real_signal():
    """Fit an ``HMM(10)`` on the raw 16-bit samples of helloworld.wav."""
    # NOTE(review): the lines that read and decoded the audio were lost from
    # this copy ('Int16' had ended up as the wave.open mode and `signal` was
    # undefined). They are restored here as 16-bit samples; confirm against
    # the original source.
    spf = wave.open('helloworld.wav', 'r')
    try:
        raw = spf.readframes(-1)
    finally:
        spf.close()
    signal = np.frombuffer(raw, dtype=np.int16)
    T = len(signal)
    hmm = HMM(10)
    hmm.fit(signal.reshape(1, T))
def use_cloud(token):
    """POST the audio in output.wav to the Baidu speech server API.

    The response body is handed to ``dump_res`` by pycurl.

    Args:
        token: access token appended to the request URL.
    """
    fp = wave.open('output.wav','r')
    try:
        nf = fp.getnframes()
        audio_data = fp.readframes(nf)
    finally:
        fp.close()
    # Use the real byte count rather than assuming 2 bytes per frame.
    f_len = len(audio_data)

    cuid = "123456" #my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header) #must be list,not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        c.close()
# NOTE(review): truncated fragment. Nearly the whole body is missing, `f_len`
# and `c` are undefined, and `f_len` has ended up as the wave.open mode.
# It appears to be a damaged copy of the other use_cloud() snippets; restore
# it from the original source or delete it.
def use_cloud(token):
fp = wave.open('output.wav', f_len)
c.perform()
def use_cloud(token):
    """POST the audio in output.wav to the Baidu speech server API.

    The response body is handed to ``dump_res`` by pycurl; ``perform()``
    itself returns nothing.

    Args:
        token: access token appended to the request URL.
    """
    fp = wave.open('output.wav', 'rb')
    try:
        nf = fp.getnframes()
        audio_data = fp.readframes(nf)
    finally:
        fp.close()
    # Use the real byte count rather than assuming 2 bytes per frame.
    f_len = len(audio_data)

    cuid = "xxxxxxxxxx" #my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len
    ]
    print(srv_url)

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
        #c.setopt(c.RETURNTRANSFER,1)
        # This call was truncated in this copy; the header list built above
        # is the value it needs (must be a list, not a dict).
        c.setopt(c.HTTPHEADER, http_header)
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform() #pycurl.perform() has no return val
    finally:
        c.close()
# NOTE(review): truncated fragment, not valid Python. The wave.open() call
# runs straight into the tail of a header list, `f_len` is undefined and is
# passed as the URL. It appears to be a damaged copy of the other use_cloud()
# snippets; restore it from the original source or delete it.
def use_cloud(token):
fp = wave.open('output.wav',
'Content-Length: %d' % f_len
]
c = pycurl.Curl()
c.setopt(pycurl.URL, f_len)
c.perform()
def split(split_file_path, main_file_path, transcript_path, split_info):
    '''
    Cut one time span out of a .wav file and store it with its transcript.

    Args:
        split_file_path: File path for new split file.
        main_file_path: File path for original .wav file.
        transcript_path: File path where transcript will be written.
        split_info: A tuple of the form (x, (y, z)): x is the transcript
            (written in binary mode), y and z are the start and end of the
            cut in seconds.
    '''
    t0, t1 = split_info[1]  # cut audio between t0, t1 seconds

    audio_file = wave.open(main_file_path, 'rb')
    try:
        rate = audio_file.getframerate()
        s0, s1 = int(t0*rate), int(t1*rate)
        audio_file.readframes(s0)  # discard frames up to s0
        frames = audio_file.readframes(s1-s0)
        params = audio_file.getparams()
    finally:
        # Todo: Get rid of multiple opening and closing of the same main audio file.
        audio_file.close()

    split_file = wave.open(split_file_path, 'wb')
    try:
        split_file.setparams(params)
        # writeframes() rewrites the frame count copied from the source.
        split_file.writeframes(frames)
    finally:
        split_file.close()

    # Store transcript
    with open(transcript_path, 'wb') as f:
        f.write(split_info[0])
def create_csv(data_dir):
    '''
    Generates CSV file (as required by DeepSpeech_RHL.py) in the given dir.

    The file is written to ``data.csv`` inside ``data_dir`` with the columns
    wav_filename, wav_filesize and transcript. Audio and transcript files
    are paired by sorted order.

    Args:
        data_dir: Directory where all .wav files and
            their associated transcripts (.txt) are stored. A trailing
            path separator is optional.
    '''
    # Get all audio and transcript file paths. os.path.join works whether or
    # not data_dir ends with a separator.
    audio_file_paths = sorted(glob.glob(os.path.join(data_dir, "*.wav")))
    transcript_file_paths = sorted(glob.glob(os.path.join(data_dir, "*.txt")))

    audio_file_sizes = []
    transcripts = []
    for wav_path, txt_path in zip(audio_file_paths, transcript_file_paths):
        with open(txt_path, "rb") as f:
            transcripts.append(f.read())
        # Get file size.
        audio_file_sizes.append(os.path.getsize(wav_path))

    # Create pandas dataframe
    df = pandas.DataFrame(columns=["wav_filename", "wav_filesize", "transcript"])
    df["wav_filename"] = audio_file_paths
    df["wav_filesize"] = audio_file_sizes
    df["transcript"] = transcripts

    df.to_csv(os.path.join(data_dir, "data.csv"), sep=",", index=False)  # Save CSV
def audio(tag, tensor, sample_rate=44100):
    """Build a ``Summary`` protobuf holding one mono 16-bit WAV clip.

    Args:
        tag: name stored on the summary value.
        tensor: samples, converted with ``makenp`` and squeezed; must end up
            one-dimensional. Each sample is scaled by 32767 and truncated
            to an integer.
        sample_rate: frame rate in Hz.

    Returns:
        Summary: a single value carrying the encoded audio.
    """
    # NOTE(review): this copy had lost the `tensor` parameter (the body read
    # it before assignment) and the num_channels / length_frames fields of
    # Summary.Audio. They are restored here to match the single-channel WAV
    # written below; confirm against the original source.
    import io
    import struct
    import wave

    tensor = makenp(tensor)
    tensor = tensor.squeeze()
    assert (tensor.ndim == 1), 'input tensor should be 1 dimensional.'
    tensor_list = [int(32767.0 * x) for x in tensor]

    # Pack all samples in one call instead of growing a bytes object.
    tensor_enc = struct.pack('<%dh' % len(tensor_list), *tensor_list)

    fio = io.BytesIO()
    Wave_write = wave.open(fio, 'wb')
    Wave_write.setnchannels(1)
    Wave_write.setsampwidth(2)
    Wave_write.setframerate(sample_rate)
    Wave_write.writeframes(tensor_enc)
    Wave_write.close()
    audio_string = fio.getvalue()
    fio.close()

    audio_proto = Summary.Audio(sample_rate=sample_rate,
                                num_channels=1,
                                length_frames=len(tensor_list),
                                encoded_audio_string=audio_string,
                                content_type='audio/wav')
    return Summary(value=[Summary.Value(tag=tag, audio=audio_proto)])
def load_sound(file_name):
    """Read a mono, 16-bit WAV file sampled at 8, 16 or 32 kHz.

    Args:
        file_name: path of the WAV file.

    Returns:
        tuple: ``(sound_data, sampling_frequency)``, the raw frame bytes and
        the frame rate in Hz.

    Raises:
        AssertionError: if the file is not mono, not 2 bytes per sample, or
            uses an unsupported sampling frequency. Raised explicitly so the
            checks survive ``python -O``.
    """
    fp = wave.open(file_name, 'rb')
    try:
        if fp.getnchannels() != 1:
            raise AssertionError(
                '{0}: sound format is incorrect! Sound must be mono.'.format(file_name))
        if fp.getsampwidth() != 2:
            raise AssertionError(
                '{0}: sound format is incorrect! '
                'Sample width of sound must be 2 bytes.'.format(file_name))
        sampling_frequency = fp.getframerate()
        if sampling_frequency not in (8000, 16000, 32000):
            # This message was previously never formatted, leaving a literal
            # "{0}" in place of the file name.
            raise AssertionError(
                '{0}: sound format is incorrect! '
                'Sampling frequency must be 8000 Hz,16000 Hz or 32000 Hz.'.format(file_name))
        sound_data = fp.readframes(fp.getnframes())
    finally:
        fp.close()
    return sound_data, sampling_frequency
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。