#!/usr/bin/python import utils import re import subprocess import typing import csv import platform import json import os import sys import time import urllib from pathlib import Path PVQA_CMD = "{pvqa} --license {pvqa_lic} --config {pvqa_cfg} --mode analysis --channel 0 " \ "--report {output} --input {input} --cut-begin {cut_begin} --cut-end {cut_end}" PVQA_CMD_LIC_SERVER = "{pvqa} --license-server {pvqa_lic} --config {pvqa_cfg} --mode analysis --channel 0 " \ "--report {output} --input {input}" AQUA_CMD = ("{aqua} {aqua_lic} -mode files -src file \"{reference}\" -tstf \"{input}\" -config {aqua_config} " "-specp 32 {spectrum} -fau {faults} -cut-tst {cut_begin} {cut_end} -cut-src {cut_begin_src} {cut_end_src}") PVQA_PATH = "" PVQA_LIC_PATH = "pvqa.lic" PVQA_CFG_PATH = "pvqa.cfg" AQUA_PATH = "" AQUA_LIC_PATH = "aqua-wb.lic" AQUA_CFG_PATH = "aqua.cfg" SILER_PATH = "" if platform.system() == 'Windows': PVQA_OUTPUT = 'pvqa_output.txt' AQUA_FAULTS = 'aqua_faults.txt' AQUA_SPECTRUM = 'aqua_spectrum.csv' else: PVQA_OUTPUT = '/dev/shm/pvqa_output.txt' AQUA_FAULTS = '/dev/shm/aqua_faults.txt' AQUA_SPECTRUM = '/dev/shm/aqua_spectrum.csv' if platform.system() == 'Windows': PVQA_PATH = 'pvqa.exe' AQUA_PATH = 'aqua-wb.exe' SILER_PATH = 'silence_eraser.exe' SPEECH_DETECTOR_PATH = 'speech_detector.exe' else: PVQA_PATH = 'pvqa' AQUA_PATH = 'aqua-wb' SILER_PATH = 'silence_eraser' SPEECH_DETECTOR_PATH = 'speech_detector' def load_file(url: str, output_path: str): try: response = urllib.request.urlopen(url, timeout=utils.NETWORK_TIMEOUT) if response.getcode() != 200: utils.log_error(f'Fetch file {output_path} from URL {url} failed with code {response.getcode()}') return except urllib.error.HTTPError as e: utils.log_error(f'Fetch file {output_path} from URL {url} failed with code {e.code}') return # Write downloaded content to file response_content = response.read() open(output_path, 'wb').write(response_content) def load_config_and_licenses(server: str): # ToDo: validate licenses before. If they are ok - skip their update try: load_file(utils.join_host_and_path(server, '/deploy/pvqa.cfg'), PVQA_CFG_PATH) load_file(utils.join_host_and_path(server, '/deploy/pvqa.lic'), PVQA_LIC_PATH) load_file(utils.join_host_and_path(server, '/deploy/aqua-wb.lic'), AQUA_LIC_PATH) load_file(utils.join_host_and_path(server, '/deploy/aqua.cfg'), AQUA_CFG_PATH) except Exception as e: utils.log_error(f'Failed to fetch new licenses and config. Skipping it.') def find_binaries(bin_directory: Path, license_server: str = None) -> bool: # Update path to pvqa/aqua-wb global PVQA_CFG_PATH, PVQA_LIC_PATH, AQUA_LIC_PATH, AQUA_CFG_PATH global PVQA_PATH, AQUA_PATH, PVQA_CMD, AQUA_CMD global SILER_PATH, SPEECH_DETECTOR_PATH # Find platform prefix platform_prefix = platform.system().lower() if utils.is_raspberrypi(): platform_prefix = 'rpi' PVQA_PATH = bin_directory / platform_prefix / PVQA_PATH PVQA_LIC_PATH = bin_directory / PVQA_LIC_PATH PVQA_CFG_PATH = bin_directory / PVQA_CFG_PATH AQUA_PATH = bin_directory / platform_prefix / AQUA_PATH AQUA_CFG_PATH = bin_directory / AQUA_CFG_PATH AQUA_LIC_PATH = bin_directory / AQUA_LIC_PATH # SILER_PATH = bin_directory / platform_prefix / SILER_PATH SPEECH_DETECTOR_PATH = bin_directory / platform_prefix / SPEECH_DETECTOR_PATH utils.log(f'Looking for binaries/licenses/configs at {bin_directory}...') # Check if binaries exist if not PVQA_PATH.exists(): utils.log_error(f'Failed to find pvqa binary at {PVQA_PATH}. Exiting.') sys.exit(1) if not PVQA_CFG_PATH.exists(): PVQA_CFG_PATH = Path(utils.get_script_path()) / 'pvqa.cfg' if not PVQA_CFG_PATH.exists(): utils.log_error(f'Failed to find PVQA config file.') return False if not AQUA_CFG_PATH.exists(): AQUA_CFG_PATH = Path(utils.get_script_path()) / 'aqua.cfg' if not AQUA_CFG_PATH.exists(): utils.log_error(f'Failed to find AQuA config file.') return False if not AQUA_PATH.exists(): utils.log_error(f'Failed to find aqua-wb binary.') return False # if not SILER_PATH.exists(): # utils.log_error(f'Failed to find silence_eraser binary..') # return False if license_server is not None: AQUA_LIC_PATH = '"license://' + license_server + '"' PVQA_LIC_PATH = license_server PVQA_CMD = PVQA_CMD_LIC_SERVER else: if not PVQA_LIC_PATH.exists(): PVQA_LIC_PATH = Path(utils.get_script_path()) / 'pvqa.lic' if not PVQA_LIC_PATH.exists(): utils.log_error(f'Failed to find pvqa license.') return False if not AQUA_LIC_PATH.exists(): AQUA_LIC_PATH = Path(utils.get_script_path()) / 'aqua-wb.lic' if not AQUA_LIC_PATH.exists(): utils.log_error(f'Failed to find AQuA license.') return False utils.log(f' Found all analyzers.') return True def speech_detector(test_path: str): cmd = f'{SPEECH_DETECTOR_PATH} --input "{test_path}"' utils.log_verbose(cmd) retcode, output = subprocess.getstatusoutput(cmd) if retcode != 0: return retcode utils.log_verbose(output) r = json.loads(output) utils.log_verbose(f'Parsed: {r}') if 'error' in r: return r['error'] if 'offset_start' in r and 'offset_end' in r: return r return None # Erases silence on the begin & end of audio file def silence_eraser(test_path: str, file_offset_begin: float = 0.0, file_offset_end: float = 0.0) -> int: TEMP_FILE = 'silence_removed.wav' if os.path.exists(TEMP_FILE): os.remove(TEMP_FILE) # Find total duration of audio duration = utils.get_wav_length(test_path) # Find correct end file offset if file_offset_end is None: cmd = f'{SILER_PATH} {test_path} {TEMP_FILE} --process-body off --starttime {file_offset_begin}' else: file_offset_end = duration - file_offset_end cmd = f'{SILER_PATH} {test_path} {TEMP_FILE} --process-body off --starttime {file_offset_begin} --endtime {file_offset_end}' utils.log(f'Silence eraser command: {cmd}') retcode = os.system(cmd) if retcode == 0 and os.path.exists(TEMP_FILE): os.remove(test_path) os.rename(TEMP_FILE, test_path) utils.log(f'Prefix/suffix silence is removed on: {test_path}') return 0 else: return retcode def find_pvqa_mos(test_path: str, file_offset_begin: float = 0.0, file_offset_end: float = 0.0): cmd = PVQA_CMD.format(pvqa=PVQA_PATH, pvqa_lic=PVQA_LIC_PATH, pvqa_cfg=PVQA_CFG_PATH, output=PVQA_OUTPUT, input=test_path, cut_begin=file_offset_begin, cut_end=file_offset_end) utils.log_verbose(cmd) # print(cmd) exit_code, out_data = subprocess.getstatusoutput(cmd) # Check if failed if exit_code != 0: utils.log_error(f'PVQA returned exit code {exit_code} and message {out_data}') return 0.0, '', 0 # Verbose logging utils.log_verbose(out_data) # print(out_data) p_mos = re.compile(r".*= ([\d\.]+)", re.MULTILINE) m = p_mos.search(out_data) if m: with open(PVQA_OUTPUT, 'r') as report_file: content = report_file.read() # Find R-factor from content count_intervals = 0 count_bad = 0 csv_parser = csv.reader(open(PVQA_OUTPUT, newline=''), delimiter=';') for row in csv_parser: # Check status status = row[-1].strip() # log_verbose("New CSV row is read. Last two items: %s and %s" % (status_0, status)) if status in ['Poor', 'Ok', 'Uncertain']: count_intervals += 1 if status == 'Poor': count_bad += 1 utils.log_verbose(f'Nr of intervals {count_intervals}, nr of bad intervals {count_bad}') if count_intervals > 0: r_factor = float(count_intervals - count_bad) / float(count_intervals) else: r_factor = 0.0 return round(float(m.group(1)), 3), content, int(r_factor * 100) return 0.0, out_data, 0 # Runs AQuA utility on reference and test files. file_offset_begin / file_offset_end are offsets in seconds def find_aqua_mos(good_path, test_path, test_file_offset_begin: float = 0.0, test_file_offset_end: float = 0.0, good_file_offset_begin: float = 0.0, good_file_offset_end: float = 0.0): try: out_data = "" cmd = AQUA_CMD.format(aqua=AQUA_PATH, aqua_lic=AQUA_LIC_PATH, aqua_config = AQUA_CFG_PATH, reference=good_path, input=test_path, spectrum=AQUA_SPECTRUM, faults=AQUA_FAULTS, cut_begin=int(test_file_offset_begin * 1000), cut_end=int(test_file_offset_end * 1000), cut_begin_src=int(good_file_offset_begin * 1000), cut_end_src=int(good_file_offset_end * 1000)) utils.log_verbose(cmd) # print(cmd) exit_code, out_data = subprocess.getstatusoutput(cmd) # Return if exit_code != 0: utils.log_error(f'AQuA returned error code {exit_code} with message {out_data}') return 0.0, 0, '{}' # Log for debugging purposes utils.log_verbose(out_data) with open(AQUA_FAULTS, 'r') as f: report = f.read() json_data = json.loads(report) # print (out_data) if 'AQuAReport' in json_data: aqua_report = json_data['AQuAReport'] if 'QualityResults' in aqua_report: qr = aqua_report['QualityResults'] return round(qr['MOS'], 3), round(qr['Percent'], 3), report except Exception as err: utils.log_error(message='Unexpected error.', err=err) return 0.0, 0.0, out_data return 0.0, 0.0, out_data