#!/usr/bin/python3
"""Call-quality probe script.

Plays reference audio through a Bluetooth call controller, records the
call audio, runs the PVQA/AQuA analyzers on the recording and uploads the
resulting report and audio to the backend.
"""

import os
import platform
import json
import subprocess
import time
import argparse
import sys
import shlex
import select
import uuid
import multiprocessing
import shutil
import signal
import pathlib
from pathlib import Path
from datetime import datetime

import yaml

import utils_qualtest
import utils_sevana
import utils_mcon
import utils_logcat
import utils
from bt_controller import Bluetoothctl
import bt_call_controller
import bt_signal
from bt_signal import SignalBoundaries

# Name of intermediary file with audio recorded from the GSM phone
RECORD_FILE = "/dev/shm/qualtest_recorded.wav"

# Backend instance (assigned during start-up, None until then)
BackendServer: utils_qualtest.QualtestBackend = None

# Reference audio to play
REFERENCE_AUDIO = "/dev/shm/reference.wav"

# Loaded reference audio (from backend)
LOADED_AUDIO = Path("/dev/shm/loaded_audio.wav")

# Script to exec after mobile call answering
EXEC_SCRIPT = None

# Current task name.
CURRENT_TASK = None

# Current task list
TASK_LIST: utils_qualtest.TaskList = utils_qualtest.TaskList()

# Number of finished calls
CALL_COUNTER = multiprocessing.Value('i', 0)

# Maximum number of calls to make. Zero means unlimited number of calls.
CALL_LIMIT = 0

# Find script's directory
DIR_THIS = Path(__file__).resolve().parent

# PID file name
QUALTEST_PID = DIR_THIS / "qualtest.pid"

# Keep the recorded audio in the directory
LOG_AUDIO = False

# Recorded audio directory
LOG_AUDIO_DIR = DIR_THIS.parent / 'log_audio'

# Should the first task run immediately?
FORCE_RUN = False

# Exit codes
EXIT_OK = 0
EXIT_ERROR = 1

# Use silence eraser or not (speech detector is used in this case)
USE_SILENCE_ERASER = True


def remove_oldest_log_audio(max_files: int = 20):
    """Locate the oldest file in LOG_AUDIO_DIR once it holds more than *max_files*.

    LOG_AUDIO_DIR may be a ``Path`` (the default) or a ``str`` (when taken from
    the config file), so it is normalised through ``Path`` before joining;
    concatenating a ``Path`` with ``+`` raises ``TypeError``.

    NOTE: the actual deletion is disabled, exactly as it was before this
    rewrite, so the function currently has no effect on disk.
    """
    log_dir = Path(LOG_AUDIO_DIR)
    candidates = [log_dir / name for name in os.listdir(log_dir)]
    if len(candidates) <= max_files:
        return

    oldest_file = min(candidates, key=os.path.getctime)
    # TODO(review): deletion is intentionally left disabled; re-enable once
    # confirmed that dropping the oldest recording is desired.
    # os.remove(oldest_file)


def detect_degraded_signal(file_test: Path, file_reference: Path) -> SignalBoundaries:
    """Find the boundaries of the useful signal inside the recorded file.

    Returns empty ``SignalBoundaries()`` when the recording is shorter than the
    reference (treated as a failed recording). For a caller node, a detected
    start offset of 0.0 is replaced with 5.0 seconds to skip ringing tones.
    """
    is_caller: bool = 'caller' in BackendServer.phone.role

    if utils.get_wav_length(file_test) < utils.get_wav_length(file_reference):
        # Seems some problem with recording, return zero boundaries
        return SignalBoundaries()

    bounds = bt_signal.find_reference_signal(file_test)
    if bounds.offset_start == 0.0 and is_caller:
        bounds.offset_start = 5.0  # Skip ringing tones
    return bounds


def detect_reference_signal(file_reference: Path) -> SignalBoundaries:
    """Find the signal boundaries inside the reference file."""
    return bt_signal.find_reference_signal(file_reference)


def _build_report(file_test: str, number: str, pvqa_mos, pvqa_report, pvqa_rfactor,
                  aqua_mos, aqua_percents, aqua_report) -> dict:
    """Assemble the report dictionary that is uploaded to the backend."""
    return {
        'id': uuid.uuid1().urn[9:],  # strip the 'urn:uuid:' prefix
        'duration': round(utils.get_wav_length(file_test), 3),  # must be a float
        'endtime': int(time.time()),
        'mos_pvqa': pvqa_mos,
        'mos_aqua': aqua_mos,
        'mos_network': 0.0,
        'report_pvqa': pvqa_report,
        'report_aqua': aqua_report,
        'r_factor': pvqa_rfactor,
        'percents_aqua': aqua_percents,
        'error': '',
        'target': number,
        'audio_id': 0,
        'phone_id': BackendServer.phone.identifier,
        'phone_name': '',
        'task_id': 0,
        'task_name': CURRENT_TASK,
    }


def run_analyze(file_test: str, file_reference: str, number: str) -> bool:
    """Analyze a recorded call and upload the report and the audio.

    Args:
        file_test: path to the recorded audio; a falsy value means nothing
            was recorded.
        file_reference: path to the reference audio.
        number: call target stored in the report.

    Returns:
        True only when both the report and the recorded audio were uploaded.

    The finished-call counter is incremented for every call except the ones
    rejected as too long, which return early (unchanged behaviour).
    Any exception raised during analysis or upload is logged and results in
    a False return value.
    """
    if not file_test:
        utils.log_error('Seems the file is not recorded. Usually it happens because adb logcat is not stable sometimes. Return signal to restart')
        CALL_COUNTER.value += 1
        return False

    # Wait 5 seconds to give a chance to flush recorded file
    time.sleep(5.0)

    # Check how long audio file is
    audio_length = utils.get_wav_length(file_test)
    is_caller: bool = 'caller' in BackendServer.phone.role
    is_answerer: bool = 'answer' in BackendServer.phone.role
    utils.log(f'Recorded audio call duration: {audio_length}s')

    # Check if audio length is strange - skip such calls. Usually this is missed call.
    too_long_for_caller = is_caller and audio_length >= utils_mcon.TIME_LIMIT_CALL
    too_long_for_answerer = is_answerer and audio_length >= utils_mcon.TIME_LIMIT_CALL * 1.2
    if too_long_for_caller or too_long_for_answerer:
        utils.log_error('Recorded call is too big - looks like mobile operator prompt, skipping analysis')
        return False

    result = False
    try:
        bounds_signal: SignalBoundaries = detect_degraded_signal(Path(file_test), Path(file_reference))
        print(f'Found signal bounds: {bounds_signal}')

        # Check if there is a time to remove oldest files
        if LOG_AUDIO:
            remove_oldest_log_audio()

        # PVQA report
        pvqa_mos, pvqa_report, pvqa_rfactor = utils_sevana.find_pvqa_mos(
            file_test, bounds_signal.offset_start, bounds_signal.offset_finish)
        utils.log(f'PVQA MOS: {pvqa_mos}, PVQA R-factor: {pvqa_rfactor}')

        # AQuA report. The detected reference bounds are deliberately reset
        # to zero so the whole reference file is used.
        bounds_reference: SignalBoundaries = detect_reference_signal(Path(file_reference))
        bounds_reference.offset_start = 0
        bounds_reference.offset_finish = 0
        print(f'Found reference signal bounds: {bounds_reference}')

        aqua_mos, aqua_percents, aqua_report = utils_sevana.find_aqua_mos(
            file_reference, file_test,
            bounds_signal.offset_start, bounds_signal.offset_finish,
            bounds_reference.offset_start, bounds_reference.offset_finish)
        utils.log(f'AQuA MOS: {aqua_mos}, AQuA percents: {aqua_percents}')

        # Build report for qualtest
        if pvqa_mos == 0.0:
            report = utils_qualtest.build_error_report(int(time.time()), 'PVQA analyzer error.')
        else:
            report = _build_report(file_test, number, pvqa_mos, pvqa_report, pvqa_rfactor,
                                   aqua_mos, aqua_percents, aqua_report)

        # Upload report
        upload_id = BackendServer.upload_report(report, [])
        if upload_id is not None:
            utils.log('Report is uploaded ok.')

            # Upload recorded audio
            if BackendServer.upload_audio(report['id'], file_test):
                utils.log('Recorded audio is uploaded ok.')
                result = True
            else:
                utils.log_error('Recorded audio is not uploaded.')
        else:
            utils.log_error('Failed to upload report.')
    except Exception as e:
        utils.log_error(e)

    # Increase finished calls counter
    CALL_COUNTER.value += 1
    return result


def run_error(error_message: str):
    """Log *error_message* and count the call as finished."""
    utils.log_error(error_message)
    CALL_COUNTER.value += 1


def _bt_controller_command(play_file, record_file, timelimit, target=None) -> str:
    """Compose the shell command line for bt_call_controller.py.

    Every argument is passed through ``shlex.quote`` so that paths with spaces
    or a backend-supplied target containing shell metacharacters cannot alter
    the command. ``--target`` is added only when *target* is not None.
    """
    argv = [
        '/usr/bin/python3', str(DIR_THIS / 'bt_call_controller.py'),
        '--play-file', str(play_file),
        '--record-file', str(record_file),
        '--timelimit', str(int(timelimit)),
    ]
    if target is not None:
        argv += ['--target', str(target)]
    return ' '.join(shlex.quote(arg) for arg in argv)


def make_call(target: str):
    """Call *target* through the BT call controller and analyze the recording.

    The reference audio is wrapped with 10 s of leading and 5 s of trailing
    silence so that the whole file has a chance to be recorded. Analysis is
    skipped when the controller script exits with a non-zero status.
    """
    record_file = '/dev/shm/bt_record.wav'

    # Add prefix and suffix silence for reference to give a chance to record all the file
    reference_filename = '/dev/shm/prepared_reference.wav'
    utils.prepare_reference_file(fname=REFERENCE_AUDIO,
                                 silence_prefix_length=10.0,
                                 silence_suffix_length=5.0,
                                 output_fname=reference_filename)

    # Find duration of prepared reference file
    reference_length = int(utils.get_wav_length(reference_filename))

    # os.system is kept (rather than subprocess) so the reported status and
    # the signal handling during the call stay exactly as before.
    cmd = _bt_controller_command(reference_filename, record_file, reference_length, target)
    retcode = os.system(cmd)
    if retcode != 0:
        utils.log_error(f'BT caller script exited with non-zero code {retcode}, skipping analysis.')
    else:
        run_analyze(record_file, REFERENCE_AUDIO, target)


def perform_answerer():
    """Run the answering loop until the BT call controller reports a failure.

    Each iteration starts the controller without a target, then analyzes the
    recorded audio against the reference.
    """
    # Get reference audio duration in seconds
    reference_length = utils.get_wav_length(REFERENCE_AUDIO)
    record_file = '/dev/shm/bt_record.wav'

    while True:
        cmd = _bt_controller_command(REFERENCE_AUDIO, record_file, reference_length)
        retcode = os.system(cmd)
        if retcode != 0:
            utils.log(f'Got non-zero exit code {retcode} from BT call controller, exiting.')
            break

        # Call analyzer script
        run_analyze(record_file, REFERENCE_AUDIO, '')


def run_caller_task(t):
    """Execute one caller task: load its audio, run precall, make the call.

    *t* is a task mapping; the keys read here are 'target', 'name',
    'audio_id' and 'attributes'. Exits the process with EXIT_ERROR when the
    task audio cannot be loaded. Updates the REFERENCE_AUDIO and CURRENT_TASK
    globals.
    """
    global CURRENT_TASK, REFERENCE_AUDIO

    utils.log("Running task:" + str(t))

    # Ensure we have international number format - add '+' if missed
    target_addr = t['target'].strip()
    if not target_addr.startswith('+'):
        target_addr = '+' + target_addr

    task_name = t['name'].strip()

    # Load reference audio
    if LOADED_AUDIO.exists():
        os.remove(LOADED_AUDIO)

    if not BackendServer.load_audio(t["audio_id"], LOADED_AUDIO):
        utils.log_error('No audio is available, exiting.')
        sys.exit(EXIT_ERROR)

    # Use loaded audio as reference
    REFERENCE_AUDIO = str(LOADED_AUDIO)
    CURRENT_TASK = task_name

    # Check attributes for precall scenarios
    attrs: dict = utils_qualtest.ParseAttributes(t['attributes'])
    retcode = 0
    if 'precall' in attrs:
        # NOTE(security): the precall value is executed by the shell as-is;
        # it is a shell command by design, so it must come from a trusted source.
        utils.log('Running precall commands...')
        retcode = os.system(attrs['precall'])

    # If all requirements are ok - run the test
    if retcode != 0:
        utils.log_error(f'Precall script returned non-zero exit code {retcode}, skipping the actual test.')
        return

    # Start call. It will analyse audio as well and upload results
    make_call(target_addr)


# Runs caller probe - load task list and perform calls
def run_probe():
    """Poll the backend for tasks and run the due 'call' tasks in a loop.

    Exits the process with EXIT_ERROR when the backend returns no task list.
    Returns after the first task when FORCE_RUN is set, or once CALL_LIMIT
    (if positive) finished calls have been reached.
    """
    while True:
        # Get task list update
        tasks = BackendServer.load_tasks()

        # Did we fetch anything ?
        if not tasks:
            utils.log_verbose("No task list assigned, exiting.")
            sys.exit(EXIT_ERROR)

        # Merge with existing ones. Some tasks can be removed, some can be added.
        TASK_LIST.merge_with(tasks)

        # Sort tasks by triggering time
        TASK_LIST.schedule()
        if TASK_LIST.tasks is not None:
            utils.log_verbose(f"Resulting task list: {TASK_LIST.tasks}")

        # The list may be None (see the check above); treat that as empty.
        scheduled = TASK_LIST.tasks or []

        if FORCE_RUN and scheduled:
            run_caller_task(scheduled[0])
            break

        # Process tasks and measure spent time
        start_time = time.monotonic()

        for t in scheduled:
            if t["scheduled_time"] > time.monotonic():
                continue
            if t["command"] != "call":
                continue
            try:
                # Remove scheduled time
                del t['scheduled_time']

                # Run task
                run_caller_task(t)
                utils.log_verbose(f'Call #{CALL_COUNTER.value} finished')
                if CALL_LIMIT > 0 and CALL_COUNTER.value >= CALL_LIMIT:
                    # Time to exit from the script
                    utils.log(f'Call limit {CALL_LIMIT} hit, exiting.')
                    return
            except Exception as err:
                utils.log_error(message="Unexpected error.", err=err)

        spent_time = time.monotonic() - start_time

        # Wait 1 minute
        if spent_time < 60:
            time.sleep(60 - spent_time)

        # In case of empty task list wait 1 minute before refresh
        if not scheduled:
            time.sleep(60)


def _remove_pid_file():
    """Delete the PID file; a missing file is not an error."""
    try:
        os.remove(QUALTEST_PID)
    except FileNotFoundError:
        pass


def receive_signal(signal_number, frame):
    """Signal handler: drop the PID file, stop the GSM call and exit."""
    # Delete PID file
    _remove_pid_file()

    # Debugging info
    print(f'Got signal {signal_number} from {frame}')

    # Stop GSM call
    utils_mcon.gsm_stop_call()

    # Exit
    raise SystemExit('Exiting')


# Check if Python version is ok
assert sys.version_info >= (3, 6)

# Use later configuration files
# https://stackoverflow.com/questions/3609852/which-is-the-best-way-to-allow-configuration-options-be-overridden-at-the-comman
parser = argparse.ArgumentParser()
parser.add_argument("--config", help="Path to config file, see config.in.yaml.")
parser.add_argument("--check-pid-file", action="store_true", help="Check if .pid file exists and exit if yes. Useful for using with .service files")
parser.add_argument("--test", action="store_true", help="Run the first task immediately. Useful for testing.")

# Parse arguments
args = parser.parse_args()

# Show help and exit if required
if len(sys.argv) < 2:
    parser.print_help()
    sys.exit(EXIT_OK)

if args.test:
    FORCE_RUN = True

if args.check_pid_file and Path(QUALTEST_PID).exists():
    print(f'File {QUALTEST_PID} exists, seems another instance of script is running. Please delete {QUALTEST_PID} to allow the start.')
    sys.exit(EXIT_OK)

# Load config file
config_path = args.config or 'config.yaml'
with open(config_path, 'r') as stream:
    config = yaml.safe_load(stream)

# Register the signals to be caught
signal.signal(signal.SIGINT, receive_signal)
signal.signal(signal.SIGQUIT, receive_signal)
# signal.signal(signal.SIGTERM, receive_signal)  # SIGTERM is sent from utils_mcon as well (multiprocessing?)

# Sections 'audio' and 'log' are required. Inside them (and at top level),
# settings that are only tested for truthiness are read with .get() so that
# a missing key behaves like an empty value instead of raising KeyError.
audio_config = config['audio']
log_config = config['log']

# Override default audio samplerate if needed
if audio_config.get('samplerate'):
    utils_mcon.SAMPLERATE = int(audio_config['samplerate'])

if config.get('force_task'):
    FORCE_RUN = True

if config.get('speech_detector'):
    USE_SILENCE_ERASER = False

# Connect to phone before anything else when a Bluetooth MAC is configured.
# A truthiness test also covers an empty/null YAML value.
bt_mac = audio_config.get('bluetooth_mac')
if bt_mac:
    bt_ctl = Bluetoothctl()
    bt_ctl.connect(bt_mac)

# Logging settings
utils.verbose_logging = log_config['verbose']
if log_config.get('path'):
    utils.open_log_file(log_config['path'], 'wt')

# Use native ALSA utilities on RPi
if utils.is_raspberrypi():
    utils.log('RPi detected, using alsa-utils player & recorded')
    utils_mcon.USE_ALSA_AUDIO = True

if audio_config.get('ALSA'):
    utils_mcon.USE_ALSA_AUDIO = True

if log_config.get('adb'):
    utils_mcon.VERBOSE_ADB = True
    utils.log('Enabled adb logcat output')

# Audio directories
if log_config.get('audio_dir'):
    LOG_AUDIO_DIR = log_config['audio_dir']

# Ensure subdirectory log_audio exists (parents are created as needed)
if not os.path.exists(LOG_AUDIO_DIR):
    utils.log(f'Creating {LOG_AUDIO_DIR}')
    os.makedirs(LOG_AUDIO_DIR, exist_ok=True)

if log_config.get('audio'):
    LOG_AUDIO = True

# Update path to pvqa/aqua-wb
dir_script = os.path.dirname(os.path.realpath(__file__))
utils_sevana.find_binaries(os.path.join(dir_script, "../bin"))
utils.log('Analyzer binaries are found')

# Load latest licenses & configs - this requires utils_sevana.find_binaries() to be called before
utils_sevana.load_config_and_licenses(config['backend'])

# Audio devices
if 'record_device' in audio_config and 'play_device' in audio_config:
    utils_mcon.AUDIO_DEV_RECORD = audio_config['record_device']
    utils_mcon.AUDIO_DEV_PLAY = audio_config['play_device']

# Limit number of calls
if config.get('task_limit'):
    CALL_LIMIT = config['task_limit']
    utils.log(f'Limiting number of calls to {CALL_LIMIT}')

# Reset task list
utils_qualtest.TASK_LIST = []

# Init backend server
BackendServer = utils_qualtest.QualtestBackend()
BackendServer.instance = config['name']
BackendServer.address = config['backend']

# Write pid file next to the script
with open(QUALTEST_PID, "w") as f:
    f.write(str(os.getpid()))

try:
    # Load information about phone
    utils.log(f'Loading information about the node {BackendServer.instance} from {BackendServer.address}')
    BackendServer.preload()

    if 'answerer' in BackendServer.phone.role:
        # Check if task name is specified
        if not config.get('task'):
            utils.log_error('Please specify task value in config file.')
            sys.exit(EXIT_ERROR)

        # Save current task name
        CURRENT_TASK = config['task']

        # Load reference audio
        utils.log('Loading reference audio...')
        if not BackendServer.load_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO):
            utils.log_error('Audio is not available, exiting.')
            sys.exit(EXIT_ERROR)

        # Preparing reference audio
        utils.log('Running answering loop...')
        perform_answerer()

    elif 'caller' in BackendServer.phone.role:
        utils.log('Running caller...')
        run_probe()

except Exception as e:
    utils.log_error('Error', e)
finally:
    # Runs on normal completion, on logged errors and on sys.exit() alike,
    # so an early exit no longer leaves a stale PID file behind.
    utils.close_log_file()
    _remove_pid_file()

# Exit with success code
sys.exit(EXIT_OK)