From 9d2ba8c998b2510077217adcd5cfb8bde8e20636 Mon Sep 17 00:00:00 2001 From: Dmytro Bogovych Date: Mon, 21 Aug 2023 19:56:07 +0300 Subject: [PATCH] - changes for last day --- src/.vscode/launch.json | 8 --- src/agent_gsm.py | 109 ++++++++++++++++++++++++++++------------ src/bt_phone.py | 6 +-- src/bt_preconnect.py | 9 ++-- src/utils.py | 8 +-- src/utils_bt_audio.py | 4 +- src/utils_cache.py | 35 +++++++++++-- src/utils_mcon.py | 6 +-- src/utils_qualtest.py | 25 +++++---- src/utils_sevana.py | 27 +++++----- src/utils_types.py | 12 +++-- 11 files changed, 162 insertions(+), 87 deletions(-) diff --git a/src/.vscode/launch.json b/src/.vscode/launch.json index 1527ecc..1e7ea40 100644 --- a/src/.vscode/launch.json +++ b/src/.vscode/launch.json @@ -4,14 +4,6 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ - { - "name": "Example: answerer", - "type": "python", - "request": "launch", - "program": "example_answer.py", - "console": "integratedTerminal", - "args": [""] - }, { "name": "rabbitmq: utils_mcon", "type": "python", diff --git a/src/agent_gsm.py b/src/agent_gsm.py index 8e7fe2b..bfa0c48 100644 --- a/src/agent_gsm.py +++ b/src/agent_gsm.py @@ -1,15 +1,11 @@ #!/usr/bin/python3 import os -import platform -import json -import subprocess import time import argparse import sys -import shlex -import select import uuid +import json import utils_qualtest import utils_sevana import utils_mcon @@ -18,17 +14,13 @@ import utils import utils_cache from bt_controller import Bluetoothctl -import bt_call_controller import bt_signal from bt_signal import SignalBoundaries import multiprocessing -import shutil import signal import yaml -import pathlib from pathlib import Path -from datetime import datetime # Name of intermediary file with audio recorded from the GSM phone RECORD_FILE = "/dev/shm/qualtest_recorded.wav" @@ -109,10 +101,43 @@ def detect_degraded_signal(file_test: Path, file_reference: Path) -> SignalBound def detect_reference_signal(file_reference: Path) -> SignalBoundaries: # Run silence eraser on reference file as well result = bt_signal.find_reference_signal(file_reference) - return result +def upload_results(): + probe_list = CACHE.get_probe_list() + for t in probe_list: + # Path to .json report + path_report = t[0] + + # Path to audio + path_audio = t[1] + + with open(path_report, 'rt') as f: + report = json.loads(f.read()) + + upload_id, success = BackendServer.upload_report(report, cache=None) + if success: + utils.log(f'Report {upload_id} is uploaded ok.') + + # Rename files to make sync audio filename with reported ones + path_report_fixed = CACHE.dir / f'{upload_id}.json' + path_report = path_report.rename(path_report_fixed) + + path_audio_fixed = CACHE.dir / f'{upload_id}.wav' + path_audio = path_audio.rename(path_audio_fixed) + + # Upload recorded audio + upload_result = BackendServer.upload_audio(upload_id, path_audio) + if upload_result: + utils.log('Recorded audio {upload_id}.wav is uploaded ok.') + os.remove(path_report) + os.remove(path_audio) + else: + break + else: + break + def run_analyze(file_test: str, file_reference: str, number: str) -> bool: global CALL_COUNTER @@ -183,8 +208,8 @@ def run_analyze(file_test: str, file_reference: str, number: str) -> bool: r['task_name'] = CURRENT_TASK # Upload report - upload_id, success = BackendServer.upload_report(r) - if upload_id != None and success: + upload_id, success = BackendServer.upload_report(r, cache=CACHE) + if success: utils.log('Report is uploaded ok.') # Upload recorded audio @@ -195,8 +220,9 @@ def run_analyze(file_test: str, file_reference: str, number: str) -> bool: result = True else: utils.log_error('Recorded audio is not uploaded.') + CACHE.add_recorded_audio(file_test, probe_id=upload_id) else: - utils.log_error('Failed to upload report.') + CACHE.add_recorded_audio(file_test, probe_id=upload_id) except Exception as e: utils.log_error(e) @@ -276,10 +302,18 @@ def run_caller_task(t): if LOADED_AUDIO.exists(): os.remove(LOADED_AUDIO) - if not BackendServer.load_audio(t["audio_id"], LOADED_AUDIO): - utils.log_error('No audio is available, exiting.') - sys.exit(EXIT_ERROR) - + audio_id = t["audio_id"] + if not BackendServer.load_audio(audio_id, LOADED_AUDIO): + utils.log_error(f'Failed to load reference audio with ID {audio_id}.') + if CACHE.get_reference_audio(audio_id, LOADED_AUDIO): + utils.log(f' Found in cache.') + else: + utils.log(f' Failed to find the audio in cache.') + raise RuntimeError(f'Reference audio (ID: {audio_id}) is not available.') + else: + # Cache loaded audio + CACHE.add_reference_audio(audio_id, LOADED_AUDIO) + # Use loaded audio as reference REFERENCE_AUDIO = str(LOADED_AUDIO) @@ -310,20 +344,26 @@ def run_probe(): while True: # Get task list update - tasks = BackendServer.load_tasks() - if tasks is None: + new_tasks = BackendServer.load_tasks() + if new_tasks is None: # Check in cache - tasks = CACHE.get_tasks(BackendServer.phone.name) + utils.log('Checking for task list in cache...') + new_tasks = CACHE.get_tasks(BackendServer.phone.name) # Did we fetch anything ? - if tasks: + if new_tasks: + utils.log(f' Task list found in cache.') # Merge with existing ones. Some tasks can be removed, some can be add. - changed = TASK_LIST.merge_with(tasks) - CACHE.put_tasks(changed) + changed = TASK_LIST.merge_with(incoming_tasklist = new_tasks) + CACHE.put_tasks(name=BackendServer.phone.name, tasks=TASK_LIST) else: - utils.log_verbose(f"No task list assigned, exiting.") - sys.exit(EXIT_ERROR) + utils.log(' Task isn\'t found in cache.') + raise RuntimeError(f'No task list found, exiting.') + if len(TASK_LIST.tasks) == 0: + utils.log(f'Task list is empty, exiting from running loop') + return + # Sort tasks by triggering time TASK_LIST.schedule() if TASK_LIST.tasks is not None: @@ -450,7 +490,7 @@ if 'bluetooth_mac' in config['audio']: utils.verbose_logging = config['log']['verbose'] if config['log']['path']: - utils.open_log_file(config['log']['path'], 'wt') + utils.open_log_file(config['log']['path'], 'at') # Use native ALSA utilities on RPi if utils.is_raspberrypi(): @@ -462,22 +502,22 @@ if 'ALSA' in config['audio']: utils_mcon.USE_ALSA_AUDIO = True -if config['log']['adb']: - utils_mcon.VERBOSE_ADB = True - utils.log('Enabled adb logcat output') +if 'adb' in config['log']: + if config['log']['adb']: + utils_mcon.VERBOSE_ADB = True + utils.log('Enabled adb logcat output') # Audio directories if 'cache_dir' in config: DIR_CACHE = Path(config['cache_dir']) if not DIR_CACHE.is_absolute(): - DIR_CACHE = DIR_THIS / config['cache_dir'] + DIR_CACHE = DIR_THIS.parent / config['cache_dir'] CACHE = utils_cache.InfoCache(dir=DIR_CACHE) # Update path to pvqa/aqua-wb utils_sevana.find_binaries(DIR_PROJECT / 'bin') -utils.log('Analyzer binaries are found') # Load latest licenses & configs - this requires utils_sevana.find_binaries() to be called before utils_sevana.load_config_and_licenses(config['backend']) @@ -510,11 +550,16 @@ with open(QUALTEST_PID, "w") as f: try: # Load information about phone utils.log(f'Loading information about the node {BackendServer.instance} from {BackendServer.address}') - BackendServer.preload(CACHE.dir) + BackendServer.preload(CACHE) if BackendServer.phone is None: utils.log_error(f'Failed to obtain information about {BackendServer.instance}. Exiting.') exit(EXIT_ERROR) + # Cache information + CACHE.put_phone(BackendServer.phone) + + # Upload results which were remaining in cache + upload_results() if 'answerer' in BackendServer.phone.role: # Check if task name is specified diff --git a/src/bt_phone.py b/src/bt_phone.py index 0afe7ce..e5d5c9c 100644 --- a/src/bt_phone.py +++ b/src/bt_phone.py @@ -59,7 +59,7 @@ class Phone(Observable): model_serial = properties['Serial'] modem_online = properties['Online'] - print(f'Found modem: {path} name: {modem_name} serial: {model_serial} online: {modem_online}') + utils.log(f'Found modem: {path} name: {modem_name} serial: {model_serial} online: {modem_online}') if modem_online == 1: return path @@ -110,7 +110,7 @@ class Phone(Observable): self.modems = self.manager.GetModems() # Wait for online modem - utils.log('Waiting for BT modem (phone must be paired and connected before)...') + utils.log('Waiting for BT modem (phone must be paired and connected before) with timeout 10 seconds...') self.modem = self.wait_for_online_modem(timeout_seconds=10) # 10 seconds timeout if self.modem is None: @@ -151,13 +151,11 @@ class Phone(Observable): def set_call_add(self, object, properties): - # print('Call add') self.notifyObservers(object, EVENT_CALL_ADD) self.call_in_progress = True def set_call_ended(self, object): - # print('Call removed') self.notifyObservers(object, EVENT_CALL_REMOVE) self.call_in_progress = False diff --git a/src/bt_preconnect.py b/src/bt_preconnect.py index 6e5591a..439bbd3 100755 --- a/src/bt_preconnect.py +++ b/src/bt_preconnect.py @@ -5,6 +5,7 @@ import sys import yaml import subprocess import utils_bt_audio +import utils from bt_controller import Bluetoothctl if __name__ == '__main__': @@ -25,13 +26,13 @@ if __name__ == '__main__': exit(1) # Connect to phone - print(f'Connecting to {bt_mac} ...') + utils.log(f'Connecting to {bt_mac} ...') bt_ctl = Bluetoothctl() status = bt_ctl.connect(bt_mac) if status: - print(f'Connected ok.') + utils.log(f' Connected ok.') else: - print(f'Not connected, sorry.') + utils.log_error(f' Not connected, sorry.') else: - print('BT config not found.') + utils.log_error('BT config not found.') exit(0) diff --git a/src/utils.py b/src/utils.py index e793a73..9aa84f6 100644 --- a/src/utils.py +++ b/src/utils.py @@ -44,13 +44,13 @@ def close_log_file(): def get_current_time_str(): - return str(datetime.datetime.now()) - + s = str(datetime.datetime.now()) + s = s[:-3] + return s def get_log_line(message: str) -> str: - current_time = get_current_time_str() pid = os.getpid() - line = f'{current_time} : {pid} : {message}' + line = f'{get_current_time_str()} : {pid} : {message}' return line diff --git a/src/utils_bt_audio.py b/src/utils_bt_audio.py index e28edfa..ff08493 100644 --- a/src/utils_bt_audio.py +++ b/src/utils_bt_audio.py @@ -40,10 +40,10 @@ def start_PA() -> bool: utils.log('Attempt to load module-bluetooth-discover...') retcode = os.system('pacmd load-module module-bluetooth-discover') if retcode != 0: - utils.log(f'Failed to load module-bluetooth-discover, exit code: {retcode}') + utils.log_error(f' Failed to load module-bluetooth-discover, exit code: {retcode}') return False else: - print('...success.') + utils.log(' Load success.') return True diff --git a/src/utils_cache.py b/src/utils_cache.py index 55c519c..d878fb2 100644 --- a/src/utils_cache.py +++ b/src/utils_cache.py @@ -50,7 +50,7 @@ class InfoCache: if not self.is_active(): return None - p = self.dir / f'audio_{probe_id}.wav' + p = self.dir / f'{probe_id}.wav' shutil.copy(src_path, p) return p @@ -66,6 +66,29 @@ class InfoCache: else: return None + + def is_valid_uuid(self, value): + try: + uuid.UUID(str(value)) + return True + except ValueError: + return False + + # Returns list of tuples (path_to_probe.json, path_to_audio.wav) + def get_probe_list(self) -> list[Path]: + r = [] + lst = os.listdir(self.dir) + for n in lst: + p = self.dir / n + if self.is_valid_uuid(p.stem) and n.endswith('.json'): + # Probe found + p_audio = p.with_suffix('.wav') + if p_audio.exists(): + r.append(p, p.with_suffix('.wav')) + + return r + + # Caches phone information def put_phone(self, phone: Phone): if self.is_active(): with open(self.dir / f'phone_{phone.name}.json', 'wt') as f: @@ -78,7 +101,7 @@ class InfoCache: return None with open(p, 'rt') as f: - return Phone.make(f.read()) + return Phone.make(json.loads(f.read())) def put_tasks(self, name: str, tasks: TaskList): p = self.dir / f'tasks_{name}.json' @@ -88,7 +111,13 @@ class InfoCache: def get_tasks(self, name: str) -> TaskList: p = self.dir / f'tasks_{name}.json' - # ToDo + try: + with open(p, 'rt') as f: + r = TaskList() + r.tasks = json.loads(f.read()) + return r + except: + return None def add_report(self, report: dict) -> str: diff --git a/src/utils_mcon.py b/src/utils_mcon.py index 5654f0e..90343f6 100644 --- a/src/utils_mcon.py +++ b/src/utils_mcon.py @@ -165,9 +165,9 @@ def gsm_make_call(target: str): # End current GSM call def gsm_stop_call(): - os.system(f"{ADB} shell input keyevent 6") - utils.log_verbose('GSM call stop keyevent is sent.') - + # os.system(f"{ADB} shell input keyevent 6") + # utils.log_verbose('GSM call stop keyevent is sent.') + pass def gsm_send_digit(digit: str): os.system(f"{ADB} shell input KEYCODE_{digit}") diff --git a/src/utils_qualtest.py b/src/utils_qualtest.py index 1adc2dc..33549f9 100644 --- a/src/utils_qualtest.py +++ b/src/utils_qualtest.py @@ -62,8 +62,8 @@ class QualtestBackend: return self.__phone - def preload(self, cache_dir: Path): - self.__phone = self.load_phone(cache_dir) + def preload(self, cache: InfoCache): + self.__phone = self.load_phone(cache) def upload_report(self, report, cache: InfoCache) -> (str, bool): @@ -78,15 +78,18 @@ class QualtestBackend: r = requests.post(url=url, json=report, timeout=utils.NETWORK_TIMEOUT) utils.log_verbose(f"Upload report finished. Response (probe ID): {r.content}") if r.status_code != 200: - raise RuntimeError(f'Server returned code {r.status_code} and content {r.content}') + raise RuntimeError(f'Server returned code {r.status_code}') result = (r.content.decode().strip(), True) except Exception as e: utils.log_error(f"Upload report to {self.address} finished with error.", err=e) - # Backup probe result - probe_id = cache.add_report(report) - result = (probe_id, False) + if cache is not None: + probe_id = cache.add_report(report) + utils.log(f' {probe_id}.json report is put to cache.') + result = (probe_id, False) + else: + return (None, None) return result @@ -128,8 +131,7 @@ class QualtestBackend: # Get response from server response = urllib.request.urlopen(url, timeout=utils.NETWORK_TIMEOUT) if response.getcode() != 200: - utils.log_error("Failed to get task list. Error code: %s" % response.getcode()) - return None + raise RuntimeError(f'Failed to get task list. Error code: {response.getcode()}') result = TaskList() response_content = response.read().decode() @@ -137,7 +139,7 @@ class QualtestBackend: return result except Exception as err: - utils.log_error("Exception when fetching task list: {0}".format(err)) + utils.log_error(f'Error when fetching task list from backend: {str(err)}') return None @@ -154,12 +156,13 @@ class QualtestBackend: try: response = urllib.request.urlopen(url, timeout=utils.NETWORK_TIMEOUT) if response.getcode() != 200: - raise RuntimeError(f'Failed to load phone definition. Error code: {response.getcode()}') + raise RuntimeError(f'Failed to load phone definition from server. Error code: {response.getcode()}') except Exception as e: utils.log_error(f'Problem when loading the phone definition from backend. Error: {str(e)}') r = cache.get_phone(self.instance) if r is None: raise RuntimeError(f'No cached phone definition.') + utils.log(f' Found phone definition in cache.') return r # Get possible list of phones @@ -201,7 +204,7 @@ class QualtestBackend: return result except Exception as err: - utils.log_error(f"Exception when fetching task list: {str(err)}") + utils.log_error(f"Exception loading phone information: {str(err)}") return None diff --git a/src/utils_sevana.py b/src/utils_sevana.py index 7e9b8ca..fbbde21 100644 --- a/src/utils_sevana.py +++ b/src/utils_sevana.py @@ -73,10 +73,13 @@ def load_file(url: str, output_path: str): def load_config_and_licenses(server: str): - load_file(utils.join_host_and_path(server, '/deploy/pvqa.cfg'), PVQA_CFG_PATH) - load_file(utils.join_host_and_path(server, '/deploy/pvqa.lic'), PVQA_LIC_PATH) - load_file(utils.join_host_and_path(server, '/deploy/aqua-wb.lic'), AQUA_LIC_PATH) - + # ToDo: validate licenses before. If they are ok - skip their update + try: + load_file(utils.join_host_and_path(server, '/deploy/pvqa.cfg'), PVQA_CFG_PATH) + load_file(utils.join_host_and_path(server, '/deploy/pvqa.lic'), PVQA_LIC_PATH) + load_file(utils.join_host_and_path(server, '/deploy/aqua-wb.lic'), AQUA_LIC_PATH) + except Exception as e: + utils.log_error(f'Failed to fetch new licenses and config. Skipping it.') def find_binaries(bin_directory: Path, license_server: str = None): # Update path to pvqa/aqua-wb @@ -95,26 +98,26 @@ def find_binaries(bin_directory: Path, license_server: str = None): SILER_PATH = bin_directory / platform_prefix / SILER_PATH SPEECH_DETECTOR_PATH = bin_directory / platform_prefix / SPEECH_DETECTOR_PATH - print(f'Looking for binaries/licenses/configs at {bin_directory}...', end=' ') + utils.log(f'Looking for binaries/licenses/configs at {bin_directory}...') # Check if binaries exist if not PVQA_PATH.exists(): - print(f'Failed to find pvqa binary at {PVQA_PATH}. Exiting.') + utils.log_error(f'Failed to find pvqa binary at {PVQA_PATH}. Exiting.') sys.exit(1) if not PVQA_CFG_PATH.exists(): PVQA_CFG_PATH = Path(utils.get_script_path()) / 'pvqa.cfg' if not PVQA_CFG_PATH.exists(): - print(f'Failed to find pvqa config. Exiting.') + utils.log_error(f'Failed to find pvqa config. Exiting.') sys.exit(1) if not AQUA_PATH.exists(): - print(f'Failed to find aqua-wb binary. Exiting.') + utils.log_error(f'Failed to find aqua-wb binary. Exiting.') sys.exit(1) if not SILER_PATH.exists(): - print(f'Failed to find silence_eraser binary. Exiting.') + utils.log_error(f'Failed to find silence_eraser binary. Exiting.') sys.exit(1) if license_server is not None: @@ -125,16 +128,16 @@ def find_binaries(bin_directory: Path, license_server: str = None): if not PVQA_LIC_PATH.exists(): PVQA_LIC_PATH = Path(utils.get_script_path()) / 'pvqa.lic' if not PVQA_LIC_PATH.exists(): - print(f'Failed to find pvqa license. Exiting.') + utils.log_error(f'Failed to find pvqa license. Exiting.') sys.exit(1) if not AQUA_LIC_PATH.exists(): AQUA_LIC_PATH = Path(utils.get_script_path()) / 'aqua-wb.lic' if not AQUA_LIC_PATH.exists(): - print(f'Failed to find AQuA license. Exiting.') + utils.log_error(f'Failed to find AQuA license. Exiting.') sys.exit(1) - print(f'Found all analyzers.') + utils.log(f' Found all analyzers.') def speech_detector(test_path: str): diff --git a/src/utils_types.py b/src/utils_types.py index 2e02aa2..b94f566 100644 --- a/src/utils_types.py +++ b/src/utils_types.py @@ -3,7 +3,7 @@ import time import sys import utils - +import json from crontab import CronTab start_system_time = time.time() @@ -52,6 +52,10 @@ class Phone: return r + def dump(self) -> str: + return json.dumps(self.to_dict(), indent=4) + + class TaskList: tasks: list = [] @@ -62,14 +66,14 @@ class TaskList: # Merges incoming task list to existing one # It preserves existing schedules # New items are NOT scheduled automatically - def merge_with(self, tasklist) -> bool: + def merge_with(self, incoming_tasklist) -> bool: changed = False - if tasklist.tasks is None: + if incoming_tasklist.tasks is None: return True # Iterate all tasks, see if task with the same name exists already # Copy all keys, but keep existing ones - for new_task in tasklist.tasks: + for new_task in incoming_tasklist.tasks: # Find if this task exists already existing_task = self.find_task_by_name(new_task["name"])