agent_gsm/src/agent_gsm.py

612 lines
20 KiB
Python

#!/usr/bin/python3
import os
import time
import argparse
import sys
import uuid
import json
import utils_qualtest
import utils_sevana
import utils_mcon
import utils_logcat
import utils
import utils_cache
from bt_controller import Bluetoothctl
import bt_signal
from bt_signal import SignalBoundaries
import multiprocessing
import signal
import yaml
from pathlib import Path
# Name of intermediary file with audio recorded from the GSM phone
RECORD_FILE = "/dev/shm/qualtest_recorded.wav"
# Backend instance
BackendServer : utils_qualtest.QualtestBackend = None
# Reference audio to play
REFERENCE_AUDIO = "/dev/shm/reference_original.wav"
# Loaded reference audio (from backend)
LOADED_AUDIO = Path("/dev/shm/loaded_audio.wav")
# Script to exec after mobile call answering
EXEC_SCRIPT = None
# Current task name.
CURRENT_TASK = None
# Current task list
TASK_LIST: utils_qualtest.TaskList = utils_qualtest.TaskList()
# Number of finished calls
CALL_COUNTER = multiprocessing.Value('i', 0)
# Maximum number of calls to to. Zero means unlimited number of calls.
CALL_LIMIT = 0
# Find script's directory
DIR_THIS = Path(__file__).resolve().parent
DIR_PROJECT = DIR_THIS.parent
# Backup directory (to run without internet)
DIR_CACHE = None
CACHE = utils_cache.InfoCache(None)
# PID file name
QUALTEST_PID = "/dev/shm/qualtest.pid"
# Should the first task run immediately ?
FORCE_RUN = False
# Exit codes
EXIT_OK = 0
EXIT_ERROR = 1
# Use silence eraser or not (speech detector is used in this case)
USE_SILENCE_ERASER = True
def remove_oldest_log_audio():
list_of_files = os.listdir(LOG_AUDIO_DIR)
if len(list_of_files) > 20:
full_path = [(LOG_AUDIO_DIR + "/{0}".format(x)) for x in list_of_files]
oldest_file = min(full_path, key=os.path.getctime)
# os.remove(oldest_file)
def detect_degraded_signal(file_test: Path, file_reference: Path) -> SignalBoundaries:
global USE_SILENCE_ERASER, LOG_AUDIO_DIR, BackendServer
is_caller : bool = 'caller' in BackendServer.phone.role
is_answerer : bool = 'answer' in BackendServer.phone.role
if utils.get_wav_length(file_test) < utils.get_wav_length(file_reference):
# Seems some problem with recording, return zero boundaries
return SignalBoundaries()
r = bt_signal.find_reference_signal(file_test)
if r.offset_start == 0.0 and is_caller:
r.offset_start = 5.0 # Skip ringing tones
return r
def detect_reference_signal(file_reference: Path) -> SignalBoundaries:
# Run silence eraser on reference file as well
result = bt_signal.find_reference_signal(file_reference)
return result
def upload_results():
utils.log(f'Uploading remaining results...')
probe_list = CACHE.get_probe_list()
for t in probe_list:
# Path to .json report
path_report = t[0]
# Path to audio
path_audio = t[1]
try:
with open(path_report, 'rt') as f:
report = json.loads(f.read())
except:
utils.log_error(f'Error when processing {path_report.name}')
continue
upload_id, success = BackendServer.upload_report(report, cache=None)
if success:
utils.log(f'Report {upload_id} is uploaded ok.')
# Rename files to make sync audio filename with reported ones
path_report_fixed = CACHE.dir / f'{upload_id}.json'
path_report = path_report.rename(path_report_fixed)
path_audio_fixed = CACHE.dir / f'{upload_id}.wav'
path_audio = path_audio.rename(path_audio_fixed)
# Upload recorded audio
upload_result = BackendServer.upload_audio(upload_id, path_audio)
if upload_result:
utils.log('Recorded audio {upload_id}.wav is uploaded ok.')
os.remove(path_report)
os.remove(path_audio)
else:
break
else:
utils.log(f'Failed to upload report {path_report.name}')
break
def run_analyze(file_test: str, file_reference: str, number: str) -> bool:
global CALL_COUNTER
result = False
if file_test:
# Wait 5 seconds to give a chance to flush recorded file
time.sleep(5.0)
# Check how long audio file is
audio_length = utils.get_wav_length(file_test)
is_caller : bool = 'caller' in BackendServer.phone.role
is_answerer : bool = 'answer' in BackendServer.phone.role
utils.log(f'Recorded audio call duration: {round(audio_length, 3)}s')
# Check if audio length is strange - skip such calls. Usually this is missed call.
if (is_caller and audio_length >= utils_mcon.TIME_LIMIT_CALL) or (is_answerer and audio_length >= utils_mcon.TIME_LIMIT_CALL * 1.2):
utils.log_error(f'Recorded call is too big - looks like mobile operator prompt, skipping analysis')
return False
try:
bounds_signal : SignalBoundaries = detect_degraded_signal(Path(file_test), Path(file_reference))
# bounds_signal.offset_start = 0
# bounds_signal.offset_finish = 0
print(f'Found signal bounds: {bounds_signal}')
# PVQA report
pvqa_mos, pvqa_report, pvqa_rfactor = utils_sevana.find_pvqa_mos(file_test, bounds_signal.offset_start, bounds_signal.offset_finish)
utils.log(f'PVQA MOS: {pvqa_mos}, PVQA R-factor: {pvqa_rfactor}')
# AQuA report
bounds_reference : SignalBoundaries = detect_reference_signal(Path(file_reference))
bounds_reference.offset_start = 0
bounds_reference.offset_finish = 0
print(f'Found reference signal bounds: {bounds_reference}')
aqua_mos, aqua_percents, aqua_report = utils_sevana.find_aqua_mos(file_reference, file_test,
bounds_signal.offset_start, bounds_signal.offset_finish,
bounds_reference.offset_start, bounds_reference.offset_finish)
utils.log(f'AQuA MOS: {aqua_mos}, AQuA percents: {aqua_percents}')
# Build report for qualtest
r = None
if pvqa_mos == 0.0:
r = utils_qualtest.build_error_report(int(time.time()), 'PVQA analyzer error.')
else:
r = dict()
r['id'] = uuid.uuid1().urn[9:]
r['duration'] = round(utils.get_wav_length(file_test), 3)
# print(r['duration']) # This must be a float
r['endtime'] = int(time.time())
r['mos_pvqa'] = pvqa_mos
r['mos_aqua'] = aqua_mos
r['mos_network'] = 0.0
r['report_pvqa'] = pvqa_report
r['report_aqua'] = aqua_report
r['r_factor'] = pvqa_rfactor
r["percents_aqua"] = aqua_percents
r['error'] = ''
r['target'] = number
r['audio_id'] = 0
r['phone_id'] = BackendServer.phone.identifier
r['phone_name'] = ''
r['task_id'] = 0
r['task_name'] = CURRENT_TASK
# Upload report
upload_id, success = BackendServer.upload_report(r, cache=CACHE)
if success:
utils.log('Report is uploaded ok.')
# Upload recorded audio
upload_result = BackendServer.upload_audio(r['id'], file_test)
if upload_result:
utils.log('Recorded audio is uploaded ok.')
result = True
else:
utils.log_error('Recorded audio is not uploaded.')
CACHE.add_recorded_audio(file_test, probe_id=upload_id)
else:
CACHE.add_recorded_audio(file_test, probe_id=upload_id)
except Exception as e:
utils.log_error(e)
else:
utils.log_error('Seems the file is not recorded. Usually it happens because adb logcat is not stable sometimes. Return signal to restart')
# Increase finished calls counter
CALL_COUNTER.value = CALL_COUNTER.value + 1
return result
def run_error(error_message: str):
utils.log_error(error_message)
CALL_COUNTER.value = CALL_COUNTER.value + 1
def make_call(target: str):
global REFERENCE_AUDIO
# Remove old recorded file
record_file = '/dev/shm/bt_record.wav'
# if Path(record_file).exists():
# os.remove(record_file)
# Add prefix and suffix silence for reference to give a chance to record all the file
reference_filename = Path('/dev/shm/reference_built.wav')
if reference_filename.exists():
os.remove(reference_filename)
utils.prepare_reference_file(fname=str(REFERENCE_AUDIO), silence_prefix_length=10.0, silence_suffix_length=5.0, output_fname=str(reference_filename))
# Find duration of prepared reference file
reference_length = int(utils.get_wav_length(reference_filename))
# Compose a command
cmd = f'/usr/bin/python3 {DIR_THIS}/bt_call_controller.py --play-file {reference_filename} --record-file {record_file} --timelimit {reference_length} --target {target}'
retcode = os.system(cmd)
if retcode != 0:
utils.log_error(f'BT caller script exited with non-zero code {retcode}, skipping analysis.')
else:
run_analyze(record_file, REFERENCE_AUDIO, target)
def perform_answerer():
global CALL_LIMIT
# Get reference audio duration in seconds
reference_length = utils.get_wav_length(REFERENCE_AUDIO)
# Setup analyzer script
# Run answering script
while True:
# Remove old recording
record_file = f'/dev/shm/bt_record.wav'
cmd = f'/usr/bin/python3 {DIR_THIS}/bt_call_controller.py --play-file {REFERENCE_AUDIO} --record-file {record_file} --timelimit {int(reference_length)}'
retcode = os.system(cmd)
if retcode != 0:
utils.log(f'Got non-zero exit code {retcode} from BT call controller, exiting.')
break
# Call analyzer script
run_analyze(record_file, REFERENCE_AUDIO, '')
def run_caller_task(t):
global CURRENT_TASK, LOADED_AUDIO, REFERENCE_AUDIO
utils.log("Running task:" + str(t))
# Ensure we have international number format - add '+' if missed
target_addr = t['target'].strip()
if not target_addr.startswith('+'):
target_addr = '+' + target_addr
task_name = t['name'].strip()
# Load reference audio
if LOADED_AUDIO.exists():
os.remove(LOADED_AUDIO)
audio_id = t["audio_id"]
if not BackendServer.load_audio(audio_id, LOADED_AUDIO):
utils.log_error(f'Failed to load reference audio with ID {audio_id}.')
if CACHE.get_reference_audio(audio_id, LOADED_AUDIO):
utils.log(f' Found in cache.')
else:
utils.log(f' Failed to find the audio in cache.')
raise RuntimeError(f'Reference audio (ID: {audio_id}) is not available.')
else:
# Cache loaded audio
CACHE.add_reference_audio(audio_id, LOADED_AUDIO)
# Use loaded audio as reference
REFERENCE_AUDIO = str(LOADED_AUDIO)
CURRENT_TASK = task_name
# Check attributes for precall scenaris
attrs: dict = utils_qualtest.ParseAttributes(t['attributes'])
retcode = 0
if 'precall' in attrs:
# Run precall scenario
utils.log('Running precall commands...')
retcode = os.system(attrs['precall'])
# If all requirements are ok - run the test
if retcode != 0:
utils.log_error(f'Precall script returned non-zero exit code {retcode}, skipping the actual test.')
return
# Start call. It will analyse audio as well and upload results
make_call(target_addr)
# Runs caller probe - load task list and perform calls
def run_probe():
global TASK_LIST, REFERENCE_AUDIO, LOADED_AUDIO, CURRENT_TASK, FORCE_RUN
while True:
# Get task list update
new_tasks = BackendServer.load_tasks()
if new_tasks is None:
# Check in cache
utils.log('Checking for task list in cache...')
new_tasks = CACHE.get_tasks(BackendServer.phone.name)
# Did we fetch anything ?
if new_tasks:
utils.log(f' Task list found in cache.')
# Merge with existing ones. Some tasks can be removed, some can be add.
changed = TASK_LIST.merge_with(incoming_tasklist = new_tasks)
CACHE.put_tasks(name=BackendServer.phone.name, tasks=TASK_LIST)
else:
utils.log(' Task isn\'t found in cache.')
raise RuntimeError(f'No task list found, exiting.')
if len(TASK_LIST.tasks) == 0:
utils.log(f'Task list is empty, exiting from running loop')
return
# Sort tasks by triggering time
TASK_LIST.schedule()
if TASK_LIST.tasks is not None:
utils.log_verbose(f"Resulting task list: {TASK_LIST.tasks}")
if FORCE_RUN and len(TASK_LIST.tasks) > 0:
run_caller_task(TASK_LIST.tasks[0])
break
# Process tasks and measure spent time
start_time = time.monotonic()
for t in TASK_LIST.tasks:
if t["scheduled_time"] <= time.monotonic():
if t["command"] == "call":
try:
# Remove sheduled time
del t['scheduled_time']
# Run task
run_caller_task(t)
utils.log_verbose(f'Call #{CALL_COUNTER.value} finished')
if CALL_COUNTER.value >= CALL_LIMIT and CALL_LIMIT > 0:
# Time to exit from the script
utils.log(f'Call limit {CALL_LIMIT} hit, exiting.')
return
except Exception as err:
utils.log_error(message="Unexpected error.", err=err)
spent_time = time.monotonic() - start_time
# Wait 1 minute
if spent_time < 60:
time.sleep(60 - spent_time)
# In case of empty task list wait 1 minute before refresh
if len(TASK_LIST.tasks) == 0:
time.sleep(60)
def receive_signal(signal_number, frame):
# Delete PID file
if os.path.exists(QUALTEST_PID):
os.remove(QUALTEST_PID)
# Debugging info
print(f'Got signal {signal_number} from {frame}')
# Stop GSM call
utils_mcon.gsm_stop_call()
# Exit
raise SystemExit('Exiting')
return
# Check if Python version is ok
assert sys.version_info >= (3, 6)
# Use later configuration files
# https://stackoverflow.com/questions/3609852/which-is-the-best-way-to-allow-configuration-options-be-overridden-at-the-comman
parser = argparse.ArgumentParser()
parser.add_argument("--config", help="Path to config file, see config.in.yaml.")
parser.add_argument("--check-pid-file", action="store_true", help="Check if .pid file exists and exit if yes. Useful for using with .service files")
parser.add_argument("--test", action="store_true", help="Run the first task immediately. Useful for testing.")
# Parse arguments
args = parser.parse_args()
# Show help and exit if required
if len(sys.argv) < 2:
parser.print_help()
sys.exit(EXIT_OK)
if args.test:
FORCE_RUN = True
if Path(QUALTEST_PID).exists() and args.check_pid_file:
print(f'File {QUALTEST_PID} exists, seems another instance of script is running. Please delete {QUALTEST_PID} to allow the start.')
sys.exit(EXIT_OK)
# Check if config file exists
config = None
config_path = 'config.yaml'
if args.config:
config_path = args.config
with open(config_path, 'r') as stream:
config = yaml.safe_load(stream)
# register the signals to be caught
signal.signal(signal.SIGINT, receive_signal)
signal.signal(signal.SIGQUIT, receive_signal)
# signal.signal(signal.SIGTERM, receive_signal)
# SIGTERM is sent from utils_mcon as well (multiprocessing?)
# Override default audio samplerate if needed
if 'samplerate' in config['audio']:
if config['audio']['samplerate']:
utils_mcon.SAMPLERATE = int(config['audio']['samplerate'])
if config['force_task']:
FORCE_RUN = True
if 'speech_detector' in config:
if config['speech_detector']:
USE_SILENCE_ERASER = False
if 'bluetooth_mac' in config['audio']:
bt_mac = config['audio']['bluetooth_mac']
if len(bt_mac) > 0:
# Connect to phone before
bt_ctl = Bluetoothctl()
bt_ctl.connect(bt_mac)
# Logging settings
utils.verbose_logging = config['log']['verbose']
if config['log']['path']:
utils.open_log_file(config['log']['path'], 'at')
# Use native ALSA utilities on RPi
if utils.is_raspberrypi():
utils.log('RPi detected, using alsa-utils player & recorded')
utils_mcon.USE_ALSA_AUDIO = True
if 'ALSA' in config['audio']:
if config['audio']['ALSA']:
utils_mcon.USE_ALSA_AUDIO = True
if 'adb' in config['log']:
if config['log']['adb']:
utils_mcon.VERBOSE_ADB = True
utils.log('Enabled adb logcat output')
# Audio directories
if 'cache_dir' in config:
DIR_CACHE = Path(config['cache_dir'])
if not DIR_CACHE.is_absolute():
DIR_CACHE = DIR_THIS.parent / config['cache_dir']
CACHE = utils_cache.InfoCache(dir=DIR_CACHE)
# Update path to pvqa/aqua-wb
utils_sevana.find_binaries(DIR_PROJECT / 'bin')
# Load latest licenses & configs - this requires utils_sevana.find_binaries() to be called before
utils_sevana.load_config_and_licenses(config['backend'])
# Audio devices
if 'record_device' in config['audio'] and 'play_device' in config['audio']:
utils_mcon.AUDIO_DEV_RECORD = config['audio']['record_device']
utils_mcon.AUDIO_DEV_PLAY = config['audio']['play_device']
# Limit number of calls
if config['task_limit']:
CALL_LIMIT = config['task_limit']
utils.log(f'Limiting number of calls to {CALL_LIMIT}')
# Reset task list
utils_qualtest.TASK_LIST = []
# Init backend server
BackendServer = utils_qualtest.QualtestBackend()
BackendServer.instance = config['name']
BackendServer.address = config['backend']
# Write pid file to current working directory
with open(QUALTEST_PID, "w") as f:
f.write(str(os.getpid()))
f.close()
try:
# Load information about phone
utils.log(f'Loading information about the node {BackendServer.instance} from {BackendServer.address}')
BackendServer.preload(CACHE)
if BackendServer.phone is None:
utils.log_error(f'Failed to obtain information about {BackendServer.instance}. Exiting.')
exit(EXIT_ERROR)
# Cache information
CACHE.put_phone(BackendServer.phone)
# Upload results which were remaining in cache
upload_results()
if 'answerer' in BackendServer.phone.role:
# Check if task name is specified
if not config['task']:
utils.log_error('Please specify task value in config file.')
if os.path.exists(QUALTEST_PID):
os.remove(QUALTEST_PID)
sys.exit(utils_mcon.EXIT_ERROR)
# Save current task name
CURRENT_TASK = config['task']
# Load reference audio
utils.log('Loading reference audio...')
if BackendServer.load_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO):
CACHE.add_reference_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO)
else:
utils.log_error('Audio is not available online.')
if not CACHE.get_reference_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO):
utils.log_error('Reference audio is not cached, sorry. Exiting.')
sys.exit(EXIT_ERROR)
# Preparing reference audio
utils.log('Running answering loop...')
perform_answerer()
elif 'caller' in BackendServer.phone.role:
utils.log('Running caller...')
run_probe()
except Exception as e:
utils.log_error('Error', e)
# Close log file
utils.close_log_file()
# Exit with success code
if os.path.exists(QUALTEST_PID):
os.remove(QUALTEST_PID)
sys.exit(EXIT_OK)