|
@ -3,14 +3,13 @@ |
|
|
# Non ROS imports |
|
|
# Non ROS imports |
|
|
from __future__ import print_function |
|
|
from __future__ import print_function |
|
|
import pyaudio |
|
|
import pyaudio |
|
|
import wave |
|
|
|
|
|
import numpy as np |
|
|
import numpy as np |
|
|
from sympy import re |
|
|
|
|
|
import usb.core |
|
|
import usb.core |
|
|
import struct |
|
|
import struct |
|
|
import time |
|
|
|
|
|
|
|
|
import scipy |
|
|
import sys |
|
|
import sys |
|
|
import os |
|
|
import os |
|
|
|
|
|
import os.path |
|
|
import pickle |
|
|
import pickle |
|
|
from contextlib import contextmanager |
|
|
from contextlib import contextmanager |
|
|
|
|
|
|
|
@ -213,21 +212,27 @@ class ROSInterface: |
|
|
self.loaded_model = pickle.load(open(self.cough_classifier_file, 'rb')) |
|
|
self.loaded_model = pickle.load(open(self.cough_classifier_file, 'rb')) |
|
|
self.loaded_scaler = pickle.load(open(self.cough_classifier_scaler, 'rb')) |
|
|
self.loaded_scaler = pickle.load(open(self.cough_classifier_scaler, 'rb')) |
|
|
|
|
|
|
|
|
|
|
|
# for testing audio collection |
|
|
|
|
|
# self.file_name = 'testing_audio_node1.wav' |
|
|
|
|
|
# self.dir = os.path.join('/home/hello-robot/clone/Robot_Autonomous_Navigation/catkin_ws/src/stretch_ros/vz_acoustic_scene_analysis/scripts/', self.file_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_audio(self): |
|
|
def get_audio(self): |
|
|
recorded_frames = self.record_audio(self.chunk_size) # set param here chunk size |
|
|
|
|
|
|
|
|
return_list = False |
|
|
|
|
|
[recorded_frames, len_of_frames] = self.record_audio(self.chunk_size) # set param here chunk size |
|
|
self.wav_list.append(recorded_frames) |
|
|
self.wav_list.append(recorded_frames) |
|
|
self.record_count += 1 |
|
|
|
|
|
self.sequence_size = 3 # self.secs/self.chunk_size |
|
|
|
|
|
|
|
|
self.record_count = len(self.wav_list) |
|
|
|
|
|
self.sequence_size = 11 # self.secs/self.chunk_size |
|
|
if (self.record_count % self.sequence_size == 0): |
|
|
if (self.record_count % self.sequence_size == 0): |
|
|
return_list = self.wav_list |
|
|
|
|
|
|
|
|
return_list = True |
|
|
|
|
|
print("length") |
|
|
|
|
|
print(len(self.wav_list)) |
|
|
print("Collected %d seconds of audio" %(self.secs)) |
|
|
print("Collected %d seconds of audio" %(self.secs)) |
|
|
# Remove fist two objects in array to remove 0.2 seconds of audio data from array |
|
|
# Remove fist two objects in array to remove 0.2 seconds of audio data from array |
|
|
self.wav_list.pop(0) |
|
|
|
|
|
self.wav_list.pop(0) |
|
|
|
|
|
|
|
|
|
|
|
# send the frames at the beginning of the list (save as wav for now) |
|
|
# send the frames at the beginning of the list (save as wav for now) |
|
|
return return_list |
|
|
|
|
|
|
|
|
return [return_list, len_of_frames] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -243,11 +248,14 @@ class ROSInterface: |
|
|
|
|
|
|
|
|
frames = [] |
|
|
frames = [] |
|
|
arr_length = int(RESPEAKER_RATE / CHUNK * seconds) # get length of array to produce n seconds of audio |
|
|
arr_length = int(RESPEAKER_RATE / CHUNK * seconds) # get length of array to produce n seconds of audio |
|
|
|
|
|
print("arr length") |
|
|
|
|
|
print(arr_length) |
|
|
for i in range(0, arr_length): |
|
|
for i in range(0, arr_length): |
|
|
data = stream.read(CHUNK) |
|
|
data = stream.read(CHUNK) |
|
|
a = np.frombuffer(data,dtype=np.int16)[0::6] # extracts fused channel 0 |
|
|
a = np.frombuffer(data,dtype=np.int16)[0::6] # extracts fused channel 0 |
|
|
self.audio_data_pub.publish(a) |
|
|
self.audio_data_pub.publish(a) |
|
|
frames.append(a.tobytes()) |
|
|
|
|
|
|
|
|
# print(a) |
|
|
|
|
|
frames.append(a) |
|
|
|
|
|
|
|
|
# Check to prevent empty frames list |
|
|
# Check to prevent empty frames list |
|
|
if (len(frames) == 0): |
|
|
if (len(frames) == 0): |
|
@ -259,7 +267,7 @@ class ROSInterface: |
|
|
stream.close() |
|
|
stream.close() |
|
|
p.terminate() |
|
|
p.terminate() |
|
|
|
|
|
|
|
|
return frames |
|
|
|
|
|
|
|
|
return [frames, arr_length] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_audio_loop(self): |
|
|
def process_audio_loop(self): |
|
@ -268,14 +276,26 @@ class ROSInterface: |
|
|
if dev: |
|
|
if dev: |
|
|
while not rospy.is_shutdown(): |
|
|
while not rospy.is_shutdown(): |
|
|
if self.respeaker.is_voice() == 1: |
|
|
if self.respeaker.is_voice() == 1: |
|
|
# wav_data = list of lists of bytes |
|
|
|
|
|
wav_data = self.get_audio() |
|
|
|
|
|
# if wav_data is a list |
|
|
|
|
|
if(isinstance(wav_data, list)): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
[iter, data_len] = self.get_audio() |
|
|
|
|
|
|
|
|
|
|
|
# if the wav list has values |
|
|
|
|
|
if(iter): |
|
|
# flatten list |
|
|
# flatten list |
|
|
flat_list = [item for sublist in wav_data for item in sublist] |
|
|
|
|
|
|
|
|
flat_list = [item for sublist in self.wav_list for item in sublist] |
|
|
|
|
|
flatter_list = [item for sublist in flat_list for item in sublist] |
|
|
|
|
|
|
|
|
|
|
|
flatter_list_np = np.asarray(flatter_list) |
|
|
|
|
|
|
|
|
|
|
|
# Test that the data is good |
|
|
|
|
|
# scipy.io.wavfile.write(self.dir, 16000, flatter_list_np) |
|
|
|
|
|
|
|
|
|
|
|
# remove first 0.2 seconds from wav data |
|
|
|
|
|
self.wav_list.pop(0) |
|
|
|
|
|
self.wav_list.pop(0) |
|
|
|
|
|
|
|
|
# Call of Utku's function |
|
|
# Call of Utku's function |
|
|
self.cough_prob = dsp.classify_cough(flat_list,RESPEAKER_RATE,self.loaded_model,self.loaded_scaler) |
|
|
|
|
|
|
|
|
self.cough_prob = dsp.classify_cough(flatter_list_np,RESPEAKER_RATE,self.loaded_model,self.loaded_scaler) |
|
|
print("%d %% probability of cough" %(self.cough_prob)) |
|
|
print("%d %% probability of cough" %(self.cough_prob)) |
|
|
# do something with probability (publish?) |
|
|
# do something with probability (publish?) |
|
|
except usb.core.USBError: |
|
|
except usb.core.USBError: |
|
|