#!/usr/bin/python import typing import datetime import traceback import wave import contextlib import os import sys import smtplib import socket import sox import io from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication from email.mime.text import MIMEText from email.utils import COMMASPACE, formatdate # mute logging silent_logging: bool = False # verbose logging flag verbose_logging: bool = False # Log file the_log = None # 1 minute network timeout NETWORK_TIMEOUT = 15 def open_log_file(path: str, mode: str): global the_log try: the_log = open(path, mode) except Exception as e: log_error("Failed to open log file.", err=e) def close_log_file(): global the_log if the_log: the_log.close() def get_current_time_str(): s = str(datetime.datetime.now()) s = s[:-3] return s def get_log_line(message: str) -> str: pid = os.getpid() line = f'{get_current_time_str()} : {pid} : {message}' return line def log(message: str): global silent_logging, the_log if not silent_logging: line = get_log_line(message) print(line) if the_log: if not the_log.closed: the_log.write(f'{line}\n') the_log.flush() def log_error(message: str, err: Exception = None): global the_log err_string = message if isinstance(err, Exception): message = message + "".join(traceback.format_exception(err.__class__, err, err.__traceback__)) elif err: message = message + str(err) line = get_log_line(message) print(line) if the_log: if not the_log.closed: the_log.write(f'{line}\n') the_log.flush() def log_verbose(message): global verbose_logging, silent_logging, the_log if verbose_logging and len(message) > 0 and not silent_logging: line = get_log_line(message) print(line) if the_log: if not the_log.closed: the_log.write(f'{line}\n') the_log.flush() def merge_two_dicts(x, y): z = x.copy() # start with x's keys and values z.update(y) # modifies z with y's keys and values & returns None return z def fix_sip_address(sip_target): if not sip_target: return None if sip_target.startswith("sip:"): return sip_target if sip_target.startswith("sips:"): return sip_target return "sip:" + sip_target # Finds length of audio file in seconds def find_file_length(path): with contextlib.closing(wave.open(path, 'r')) as f: frames = f.getnframes() rate = f.getframerate() duration = frames / float(rate) return duration def get_script_path(): return os.path.dirname(os.path.realpath(sys.argv[0])) def send_mail_report(email_config: dict, title: str, report: dict, files): try: log_verbose("Sending report via email...") msg = MIMEMultipart() # Prepare text contents title = "PVQA MOS: " + str(report["mos_pvqa"]) + ", AQuA MOS: " + str(report["mos_aqua"]) text = title # Setup email headers msg["Subject"] = title msg["From"] = email_config['email_from'] msg["To"] = email_config['email_to'] msg["Date"] = formatdate(localtime=True) # Add text msg.attach(MIMEText(text)) # Add files for f in files or []: with open(f, "rb") as fil: part = MIMEApplication( fil.read(), Name=os.path.basename(f) ) # After the file is closed part['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename(f) msg.attach(part) # Login & send smtp = smtplib.SMTP(email_config['email_server']) log_verbose("Login to SMTP server...") smtp.login(email_config['email_user'], email_config['email_password']) log_verbose("Sending files...") smtp.sendmail(email_config['email_from'], email_config['email_to'], msg.as_string()) smtp.close() log_verbose("Email sent.") except Exception as err: print("Exception when sending email: {0}".format(err)) def get_wav_length(path) -> float: try: with wave.open(str(path)) as f: return f.getnframes() / f.getframerate() except Exception as e: log_error(f'Failed to get .wav file {path} length. Error: {e}') return 0.0 def is_port_busy(port: int) -> bool: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('0.0.0.0', port)) s.close() with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.bind(('0.0.0.0', port)) s.close() with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: s.bind(('::1', port)) s.close() with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s: s.bind(('::1', port)) s.close() return False except: log_error(f"Failed to check if port {port} is busy.", err=sys.exc_info()[0]) return True def resample_to(path: str, rate: int): with wave.open(path, 'rb') as wf: if rate == wf.getframerate(): return # Resampling is not needed else: log(f'Resampling {path} from {wf.getframerate()} to {rate}.') TEMP_RESAMPLED = '/dev/shm/temp_resampled.wav' retcode = os.system(f'sox {path} -c 1 -r {rate} {TEMP_RESAMPLED}') if retcode != 0: raise RuntimeError(f'Failed to convert {path} to samplerate {rate}') os.remove(path) os.rename(TEMP_RESAMPLED, path) def join_host_and_path(hostname: str, path): if not hostname.startswith("http://") and not hostname.startswith("https://"): hostname = "http://" + hostname if not hostname.endswith("/"): hostname = hostname + "/" if path.startswith("/"): path = path[1:] return hostname + path # Prepare audio reference for playing. Generates silence prefix & suffix, merges them with audio itself. # Resamples everything to 48K and stereo (currently it is required ) def prepare_reference_file(fname: str, silence_prefix_length: float, silence_suffix_length: float, output_fname: str): tfm = sox.Transformer() tfm.rate(44100) tfm.channels(2) tfm.pad(start_duration=silence_prefix_length, end_duration=silence_suffix_length) tfm.build_file(input_filepath=fname, output_filepath=output_fname) def is_raspberrypi(): try: with io.open('/sys/firmware/devicetree/base/model', 'r') as m: if 'raspberry pi' in m.read().lower(): return True except Exception: pass return False