agent_gsm/src/agent_gsm.py

598 lines
20 KiB
Python

#!/usr/bin/python3
import os
import time
import argparse
import sys
import uuid
import json
import multiprocessing
import signal
import atexit
from pathlib import Path
import utils_qualtest
import utils_sevana
import utils
import utils_cache
from utils_types import (EXIT_ERROR, EXIT_OK)
from agent_config import AgentConfig
from bt_controller import Bluetoothctl
import bt_signal
from bt_signal import SignalBoundaries
from bt_call_controller import INTERRUPT_SIGNAL
import bt_call_controller
import agent_point
# Agent configuration. NOTE: a fresh instance is created again in the __main__ section.
CONFIG = AgentConfig()
# Name of the task being executed; written into uploaded reports as 'task_name'.
CURRENT_TASK = None
# Current task list
TASK_LIST: utils_qualtest.TaskList = utils_qualtest.TaskList()
# Number of finished calls
CALL_COUNTER = multiprocessing.Value('i', 0)
# Directory of this script and its parent directory (used to locate the 'bin' folder)
DIR_THIS = Path(__file__).resolve().parent
DIR_PROJECT = DIR_THIS.parent
# Cache used to run without internet; replaced in the __main__ section when CONFIG.CacheDir is set
CACHE = utils_cache.InfoCache(None)
# Backend instance; created in the __main__ section
BACKEND : utils_qualtest.QualtestBackend = None
# Were the analyzer binaries found? Set from utils_sevana.find_binaries() in the __main__ section.
VOICE_QUALITY_AVAILABLE = False
def remove_oldest_log_audio():
    """Look up the oldest file in LOG_AUDIO_DIR once it holds more than 20 entries.

    The deletion itself is currently disabled, so nothing is removed.

    NOTE(review): LOG_AUDIO_DIR is not defined in the visible part of this
    module - confirm where it is expected to come from.
    """
    entries = os.listdir(LOG_AUDIO_DIR)
    if len(entries) <= 20:
        return
    candidates = []
    for entry in entries:
        candidates.append(LOG_AUDIO_DIR + "/{0}".format(entry))
    # Oldest by os.path.getctime
    oldest_file = min(candidates, key=os.path.getctime)
    # os.remove(oldest_file)
def detect_degraded_signal(file_test: Path, file_reference: Path) -> SignalBoundaries:
    """Find the signal boundaries inside the recorded (degraded) audio.

    Args:
        file_test: recorded audio file.
        file_reference: reference audio file; only its length is used here.

    Returns:
        A default-constructed SignalBoundaries when the recording is shorter
        than the reference. Otherwise the boundaries reported by bt_signal;
        for the caller role a detected start offset of 0.0 is replaced by 5.0.
    """
    is_caller: bool = 'caller' in BACKEND.phone.role

    if utils.get_wav_length(file_test) < utils.get_wav_length(file_reference):
        # Seems some problem with recording, return zero boundaries
        return SignalBoundaries()

    # Choose the detector according to configuration
    if CONFIG.UseSpeechDetector:
        finder = bt_signal.find_reference_signal_via_speechdetector
    else:
        finder = bt_signal.find_reference_signal
    bounds = finder(file_test)

    if bounds.offset_start == 0.0 and is_caller:
        bounds.offset_start = 5.0  # Skip ringing tones
    return bounds
def detect_reference_signal(file_reference: Path) -> SignalBoundaries:
    """Return the signal boundaries bt_signal finds in the reference audio.

    The speech-detector based search is used when CONFIG.UseSpeechDetector is
    set, the plain search otherwise.
    """
    if CONFIG.UseSpeechDetector:
        return bt_signal.find_reference_signal_via_speechdetector(file_reference)
    return bt_signal.find_reference_signal(file_reference)
def upload_results():
    """Upload reports and recordings that are still waiting in the cache.

    Every item returned by CACHE.get_probe_list() is treated as a pair
    (report path, audio path); either element may be None. A file is removed
    from disk only after the backend confirmed its upload.
    """
    utils.log(f'Uploading remaining results...')
    for pair in CACHE.get_probe_list():
        # Path to .json report
        path_report = pair[0]
        # Path to audio
        path_audio = pair[1]
        utils.log(f'Found {pair} report pair.')

        if path_report is not None and path_report.exists():
            try:
                with open(path_report, 'rt') as f:
                    report = json.load(f)
            except Exception:
                # Not a bare `except:` - SystemExit raised by the signal handler
                # and KeyboardInterrupt must not be swallowed here.
                utils.log_error(f'Error when processing {path_report.name}')
                continue

            upload_id, success = BACKEND.upload_report(report, cache=None)
            if success:
                utils.log(f'Report {upload_id} is uploaded ok.')
                os.remove(path_report)

        if path_audio is not None and path_audio.exists():
            utils.log(f'Uploading {path_audio.name} file...')
            # Upload recorded audio; the file stem is passed as the identifier
            if BACKEND.upload_audio(path_audio.stem, path_audio):
                utils.log(f' Recorded audio {path_audio.stem}.wav is uploaded ok.')
                os.remove(path_audio)
def run_analyze(file_test: Path, file_reference: Path, number: str) -> bool:
    """Analyse a recorded call with PVQA/AQuA and upload the report and audio.

    Args:
        file_test: recorded audio of the call.
        file_reference: reference audio to compare the recording with.
        number: value stored in the report as 'target'.

    Returns:
        True only when both the report and the recorded audio were uploaded,
        False in every other case.

    The finished-call counter is increased on every path except the two early
    returns (analyzers unavailable, recording too long).
    NOTE(review): confirm that such skipped calls should not be counted.
    """
    if not VOICE_QUALITY_AVAILABLE:
        utils.log('Voice quality analyzers are not available, skipping analysis.')
        # Used to be a bare `return` (None) despite the declared bool result
        return False

    result = False
    if file_test.exists():
        # Wait 5 seconds to give a chance to flush recorded file
        time.sleep(5.0)

        # Check how long audio file is
        test_audio_length = round(utils.get_wav_length(file_test), 3)
        ref_audio_length = round(utils.get_wav_length(file_reference), 3)

        is_caller: bool = 'caller' in BACKEND.phone.role
        is_answerer: bool = 'answer' in BACKEND.phone.role

        utils.log(f'Recorded audio call duration: {test_audio_length}s, reference audio length: {ref_audio_length}s')

        # Check if audio length is strange - skip such calls. Usually this is missed call.
        # The same threshold applies to both roles.
        if (is_caller or is_answerer) and test_audio_length > ref_audio_length * 3:
            utils.log_error(f'Recorded call is too big - looks like mobile operator prompt, skipping analysis')
            return False

        try:
            bounds_signal = SignalBoundaries()
            if is_caller:
                bounds_signal.offset_start = 10.0   # Skip ringtones
                bounds_signal.offset_finish = 1.0   # Eat possible end tone
            elif is_answerer:
                bounds_signal.offset_start = 0.0
                bounds_signal.offset_finish = 1.0   # Eat possible end tone

            # PVQA report
            pvqa_mos, pvqa_report, pvqa_rfactor = utils_sevana.find_pvqa_mos(
                file_test, bounds_signal.offset_start, bounds_signal.offset_finish)
            utils.log(f'PVQA MOS: {pvqa_mos}, PVQA R-factor: {pvqa_rfactor}')

            # AQuA report - the reference file is used without trimming
            bounds_reference: SignalBoundaries = SignalBoundaries()
            bounds_reference.offset_start = 0
            bounds_reference.offset_finish = 0
            print(f'Found reference signal bounds: {bounds_reference}')

            aqua_mos, aqua_percents, aqua_report = utils_sevana.find_aqua_mos(
                file_reference, file_test,
                bounds_signal.offset_start, bounds_signal.offset_finish,
                bounds_reference.offset_start, bounds_reference.offset_finish)
            utils.log(f'AQuA MOS: {aqua_mos}, AQuA percents: {aqua_percents}')

            # Build report for qualtest
            if pvqa_mos == 0.0:
                # A zero PVQA MOS is treated as an analyzer failure
                report = utils_qualtest.build_error_report(int(time.time()), 'PVQA analyzer error.')
            else:
                report = {
                    'id': uuid.uuid1().urn[9:],     # strip the 'urn:uuid:' prefix
                    'duration': test_audio_length,  # This must be a float
                    'endtime': int(time.time()),
                    'mos_pvqa': pvqa_mos,
                    'mos_aqua': aqua_mos,
                    'mos_network': 0.0,
                    'report_pvqa': pvqa_report,
                    'report_aqua': aqua_report,
                    'r_factor': pvqa_rfactor,
                    'percents_aqua': aqua_percents,
                    'error': '',
                    'target': number,
                    'audio_id': 0,
                    'phone_id': BACKEND.phone.identifier,
                    'phone_name': '',
                    'task_id': 0,
                    'task_name': CURRENT_TASK,
                }

            # Upload report
            upload_id, success = BACKEND.upload_report(report, cache=CACHE)
            if success:
                utils.log('Report is uploaded ok.')
                # Upload recorded audio
                if BACKEND.upload_audio(report['id'], file_test):
                    utils.log(' Recorded audio is uploaded ok.')
                    result = True
                else:
                    utils.log_error(' Recorded audio is not uploaded.')
                    # Keep the recording in the cache under the probe ID
                    CACHE.add_recorded_audio(file_test, probe_id=upload_id)
            else:
                CACHE.add_recorded_audio(file_test, probe_id=upload_id)
        except Exception as e:
            utils.log_error(e)
    else:
        utils.log_error('Seems the file is not recorded. Skipping analysis and upload.')

    # Increase finished calls counter
    CALL_COUNTER.value = CALL_COUNTER.value + 1
    return result
def run_error(error_message: str):
    """Log *error_message* as an error and count the call as finished."""
    utils.log_error(error_message)
    CALL_COUNTER.value += 1
def make_call(target: str):
    """Call *target*, play the prepared reference audio, record and analyse.

    Failures of the Bluetooth controller or of the analysis are logged and
    not propagated.
    """
    record_path = CONFIG.RecordFile
    prepared_path = CONFIG.PreparedReferenceAudio

    # Drop the recording left from a previous call
    if record_path.exists():
        os.remove(record_path)

    # Surround the reference with silence so that the whole file gets recorded
    utils.log('Preparing reference file...')
    if prepared_path.exists():
        os.remove(prepared_path)
    utils.prepare_reference_file(
        fname=str(CONFIG.ReferenceAudio),
        silence_prefix_length=CONFIG.SilencePrefix,
        silence_suffix_length=CONFIG.SilenceSuffix,
        output_fname=str(prepared_path),
    )

    # Length of the prepared file is the time limit of the call
    ref_time_length = round(utils.get_wav_length(prepared_path), 3)
    utils.log(f' Done. Length of prepared reference audio file: {ref_time_length}s')

    # utils.close_log_file()
    try:
        bt_call_controller.run(
            play_file=prepared_path,
            record_file=record_path,
            timelimit_seconds=ref_time_length,
            target=target,
        )
        run_analyze(record_path, prepared_path, target)
    except Exception as e:
        utils.log_error(f'BT I/O failed finally. Error: {str(e)}')
def perform_answerer():
    """Answer incoming calls in a loop, analysing every recorded call.

    The loop runs CONFIG.TaskLimit times, or without a limit when TaskLimit is
    None or 0. It stops early when the Bluetooth controller raises.
    """
    if CONFIG.PreparedReferenceAudio.exists():
        os.remove(CONFIG.PreparedReferenceAudio)

    # Prepare answering file - this must be prepended with few seconds of
    # silence which can be eaten by call setup procedure
    utils.prepare_reference_file(fname=str(CONFIG.ReferenceAudio),
                                 silence_prefix_length=CONFIG.SilencePrefix,
                                 silence_suffix_length=CONFIG.SilenceSuffix,
                                 output_fname=str(CONFIG.PreparedReferenceAudio))

    # Get reference audio duration in seconds
    ref_time_length = round(utils.get_wav_length(CONFIG.PreparedReferenceAudio), 3)

    # Run answering script
    attempt_idx = 0
    # The "no limit" test must come first: comparing an int with None raises
    # TypeError, which the previous ordering of this condition did.
    while not CONFIG.TaskLimit or attempt_idx < CONFIG.TaskLimit:
        # Remove old recording
        if CONFIG.RecordFile.exists():
            os.remove(CONFIG.RecordFile)

        try:
            # NOTE(review): the time limit is truncated to whole seconds here
            bt_call_controller.run(play_file=CONFIG.PreparedReferenceAudio,
                                   record_file=CONFIG.RecordFile,
                                   timelimit_seconds=int(ref_time_length),
                                   target=None)
        except Exception as e:
            utils.log(f'BT I/O failed, exiting. Error: {str(e)}')
            break

        # Call analyzer script
        run_analyze(CONFIG.RecordFile, CONFIG.PreparedReferenceAudio, '')

        # Increase counter of attempts
        attempt_idx += 1
def run_caller_task(t):
    """Run a single caller task: fetch its reference audio and make the call.

    Reads the 'target', 'name', 'audio_id' and 'attributes' keys of *t*.

    Raises:
        RuntimeError: the reference audio is neither cached nor loadable
            from the backend.
    """
    global CURRENT_TASK

    utils.log("Running task:" + str(t))

    # International number format is required - prepend '+' when it is absent
    target_addr = t['target'].strip()
    if not target_addr.startswith('+'):
        target_addr = '+' + target_addr

    task_name = t['name'].strip()

    # Start from a clean download location
    if CONFIG.LoadedAudio.exists():
        os.remove(CONFIG.LoadedAudio)

    audio_id = t["audio_id"]
    if not CACHE.get_reference_audio(audio_id, CONFIG.LoadedAudio):
        utils.log(f'Reference audio {audio_id} not found in cache.')
        if not BACKEND.load_audio(audio_id, CONFIG.LoadedAudio):
            utils.log_error(f'Failed to load reference audio with ID {audio_id}.')
            raise RuntimeError(f'Reference audio (ID: {audio_id}) is not available.')
        # Remember the downloaded audio for later runs
        CACHE.add_reference_audio(audio_id, CONFIG.LoadedAudio)
    else:
        utils.log(f'Reference audio {audio_id} found in cache.')

    # The loaded audio becomes the reference
    CONFIG.ReferenceAudio = str(CONFIG.LoadedAudio)
    CURRENT_TASK = task_name

    # Optional precall scenario from task attributes
    attrs: dict = utils_qualtest.ParseAttributes(t['attributes'])
    if 'precall' in attrs:
        utils.log('Running precall commands...')
        # NOTE(review): the command string is handed to the shell as is -
        # confirm that task attributes come from a trusted source.
        retcode = os.system(attrs['precall'])
        if retcode != 0:
            utils.log_error(f'Precall script returned non-zero exit code {retcode}, skipping the actual test.')
            return

    # Start call. It will analyse audio as well and upload results
    make_call(target_addr)
# Runs caller probe - load task list and perform calls
def run_probe():
    """Main loop of the caller role.

    Every iteration refreshes the task list (backend first, cache as a
    fallback), runs the tasks whose scheduled time has come and then waits up
    to one minute for a task from the web queue.

    Returns when the task list is empty, when the call limit is hit, or after
    the single forced run requested by CONFIG.ForceRun.

    Raises:
        RuntimeError: no task list is available from backend or cache.
    """
    while True:
        # Get task list update
        new_tasks = BACKEND.load_tasks()
        if new_tasks is None:
            # Check in cache
            utils.log('Checking for task list in cache...')
            new_tasks = CACHE.get_tasks(BACKEND.phone.name)

        # Did we fetch anything ?
        if new_tasks:
            utils.log(f' Task list found.')
            # Merge with existing ones. Some tasks can be removed, some can be added.
            TASK_LIST.merge_with(incoming_tasklist=new_tasks)
            CACHE.put_tasks(name=BACKEND.phone.name, tasks=TASK_LIST)
        else:
            utils.log(' Task isn\'t found in cache.')
            raise RuntimeError(f'No task list found, exiting.')

        if len(TASK_LIST.tasks) == 0:
            utils.log(f'Task list is empty, exiting from running loop')
            return

        # Sort tasks by triggering time
        TASK_LIST.schedule()
        if TASK_LIST.tasks is not None:
            utils.log_verbose(f"Resulting task list: {TASK_LIST.tasks}")

        # Run test immediately if specified
        if CONFIG.ForceRun and len(TASK_LIST.tasks) > 0:
            run_caller_task(TASK_LIST.tasks[0])
            return

        # Process tasks and measure spent time
        start_time = utils.get_monotonic_time()
        for t in TASK_LIST.tasks:
            if t["scheduled_time"] > utils.get_monotonic_time():
                continue
            if t["command"] != "call":
                continue
            try:
                # Remove scheduled time
                del t['scheduled_time']
                # Run task
                run_caller_task(t)
                utils.log_verbose(f'Call #{CALL_COUNTER.value} finished')
                # Test the limit for None/0 first - comparing with None raises TypeError
                if CONFIG.TaskLimit and CALL_COUNTER.value >= CONFIG.TaskLimit:
                    # Time to exit from the script
                    utils.log(f'Call limit {CONFIG.TaskLimit} hit, exiting.')
                    return
            except Exception as err:
                utils.log_error(message="Unexpected error.", err=err)

        # One iteration should take at least 1 minute in total
        spent_time = utils.get_monotonic_time() - start_time
        timeout_time = 60 - spent_time if spent_time < 60 else 0

        # Try to get next task from the web queue
        task = None
        web_queue = agent_point.WEB_QUEUE
        if web_queue is None:
            utils.log('Web task queue is None')
            # Without a queue just wait, otherwise the loop spins without any pause
            time.sleep(timeout_time)
        else:
            try:
                task = web_queue.get(block=True, timeout=timeout_time)
            except Exception:
                # It is normal to get an exception here when the queue stays
                # empty. SystemExit from the signal handler is not an
                # Exception subclass, so it still stops the script.
                task = None

        if task is not None:
            try:
                run_caller_task(task)
            except Exception as err:
                utils.log_error(message="Unexpected error.", err=err)

        # In case of empty task list wait 1 minute before refresh
        # if len(TASK_LIST.tasks) == 0:
        #     time.sleep(60)
def remove_pid_on_exit():
    """Delete the PID file when one is configured and present on disk."""
    pid_file = CONFIG.QualtestPID
    if not pid_file or not pid_file.exists():
        return
    os.remove(pid_file)
def receive_signal(signal_number, frame):
    """Signal handler: clean up and terminate the script.

    Args:
        signal_number: number of the received signal.
        frame: stack frame that was interrupted by the signal.

    Raises:
        SystemExit: always, to stop the script.
    """
    global INTERRUPT_SIGNAL

    # Delete PID file
    remove_pid_on_exit()

    # Stop optional access point
    agent_point.stop()

    # Debugging info
    print(f'Got signal {signal_number} from {frame}')

    # This is to break BT play controller. The name imported with
    # `from bt_call_controller import INTERRUPT_SIGNAL` is a separate binding,
    # assigning it changes nothing inside bt_call_controller - so the flag is
    # set on the module itself. The local name is updated as well, as before.
    bt_call_controller.INTERRUPT_SIGNAL = True
    INTERRUPT_SIGNAL = True

    # Exit
    raise SystemExit('Exiting')
# Check if Python version is ok: at least 3.6 is required.
# NOTE: assert statements are skipped when Python runs with the -O option.
assert sys.version_info >= (3, 6)
# TODO: use configuration files whose options can be overridden at the command line, see
# https://stackoverflow.com/questions/3609852/which-is-the-best-way-to-allow-configuration-options-be-overridden-at-the-comman
# Script entry point: set up configuration, cache, access point, Bluetooth
# link and backend, then run the answerer or the caller role.
if __name__ == '__main__':
    CONFIG = AgentConfig()

    if len(sys.argv) < 2:
        CONFIG.parser().print_help()
        sys.exit(EXIT_OK)

    # QualtestPID is tested for emptiness before use, as everywhere else in this file
    if CONFIG.CheckPIDFile and CONFIG.QualtestPID and CONFIG.QualtestPID.exists():
        print(f'File {CONFIG.QualtestPID} exists, seems another instance of script is running. Please delete {CONFIG.QualtestPID} to allow the start.')
        sys.exit(EXIT_OK)

    # Remove PID file on exit (if needed)
    atexit.register(remove_pid_on_exit)

    # register the signals to be caught
    signal.signal(signal.SIGINT, receive_signal)
    signal.signal(signal.SIGQUIT, receive_signal)

    if CONFIG.CacheDir:
        CACHE = utils_cache.InfoCache(dir=CONFIG.CacheDir)

    # Start own hotspot and API server
    agent_point.CONFIG = CONFIG
    agent_point.CACHE = CACHE
    agent_point.start()

    # Preconnect the phone
    if CONFIG.BT_MAC:
        # Connect to phone before
        utils.log(f'Connecting to BT MAC {CONFIG.BT_MAC} ...')
        bt_ctl = Bluetoothctl()
        bt_ctl.connect(CONFIG.BT_MAC)
        utils.log(f' Done.')
    else:
        utils.log_error(f'No BT MAC specified, cannot connect. Exiting.')
        raise SystemExit(EXIT_ERROR)

    # Init BT modem - here we wait for it
    bt_call_controller.init()

    # Logging settings
    utils.verbose_logging = CONFIG.Verbose
    if CONFIG.LogPath:
        utils.open_log_file(CONFIG.LogPath, 'at')

    # Update path to pvqa/aqua-wb
    VOICE_QUALITY_AVAILABLE = utils_sevana.find_binaries(DIR_PROJECT / 'bin')

    # Load latest licenses & configs - this requires utils_sevana.find_binaries() to be called before
    # utils_sevana.load_config_and_licenses(config['backend'])

    # Limit number of calls
    if CONFIG.TaskLimit:
        utils.log(f'Limiting number of calls to {CONFIG.TaskLimit}')

    # Reset task list
    utils_qualtest.TASK_LIST = []

    # Init backend server
    BACKEND = utils_qualtest.QualtestBackend()
    BACKEND.instance = CONFIG.Name
    BACKEND.address = CONFIG.Backend

    # Write pid file; the `with` statement closes it
    if CONFIG.QualtestPID:
        with open(CONFIG.QualtestPID, 'w') as f:
            f.write(str(os.getpid()))

    try:
        # Load information about phone
        utils.log(f'Loading information about the node {BACKEND.instance} from {BACKEND.address}')
        BACKEND.preload(CACHE)
        if BACKEND.phone is None:
            utils.log_error(f'Failed to obtain information about {BACKEND.instance}. Exiting.')
            # sys.exit() instead of the interactive-only builtin exit()
            sys.exit(EXIT_ERROR)

        # Cache phone information
        CACHE.put_phone(BACKEND.phone)

        # Upload results which were remaining in cache
        upload_results()

        if 'answerer' in BACKEND.phone.role:
            # Check if task name is specified
            if CONFIG.TaskName is None:
                utils.log_error('Please specify task value in config file.')
                sys.exit(EXIT_ERROR)

            # Save current task name
            CURRENT_TASK = CONFIG.TaskName

            # Load reference audio: backend first, cache as a fallback
            if BACKEND.load_audio(BACKEND.phone.audio_id, CONFIG.ReferenceAudio):
                CACHE.add_reference_audio(BACKEND.phone.audio_id, CONFIG.ReferenceAudio)
            else:
                utils.log('Audio is not available online...')
                if not CACHE.get_reference_audio(BACKEND.phone.audio_id, CONFIG.ReferenceAudio):
                    utils.log_error(' Reference audio is not cached, sorry. Exiting.')
                    sys.exit(EXIT_ERROR)
                else:
                    utils.log(f' Found in cache.')

            # Preparing reference audio
            utils.log('Running answering loop...')
            perform_answerer()

        elif 'caller' in BACKEND.phone.role:
            utils.log('Running caller...')
            run_probe()

    except Exception as e:
        utils.log_error('Error', e)

    # Close log file
    utils.close_log_file()

    # Stop optional access point
    agent_point.stop()

    sys.exit(EXIT_OK)