#!/usr/bin/env python3
# agent_gsm/src/utils_qualtest.py
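"""Client helpers for the Qualtest backend.

Covers uploading probe reports and recorded audio, and fetching task, phone
and audio definitions, with a sys.settrace-based watchdog that bounds the
wall-clock time of long transfers.
"""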
import csv
import json
import os
import platform
import re
import subprocess
import sys
import time
import typing
import urllib.parse
import urllib.request
import uuid
from pathlib import Path
from socket import timeout

import requests
from crontab import CronTab

import utils
from utils_cache import InfoCache
from utils_types import Phone, TaskList


# The error report produced by this function must later be completed with the
# 'task_name' and 'phone_name' keys before it is uploaded.
def build_error_report(endtime: int, reason: str) -> dict:
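    """Build a stub probe report that records a failed measurement."""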
    r = dict()
    r["id"] = uuid.uuid1().urn[9:]  # strip the leading 'urn:uuid:' prefix
    r["duration"] = 0
    r["endtime"] = endtime
    r["mos_pvqa"] = 0.0
    r["mos_aqua"] = 0.0
    r["mos_network"] = 0.0
    r["r_factor"] = 0
    r["percents_aqua"] = 0.0
    r["error"] = reason
    return r


def ParseAttributes(t: str) -> dict:
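    """Parse 'key=value' lines into a dict, ignoring malformed lines."""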
    result: dict = dict()
    for l in t.split('\n'):
        tokens = l.strip().split('=')
        if len(tokens) == 2:
            result[tokens[0].strip()] = tokens[1].strip()
    return result


# Time of operation start
TRACE_START_TIME = None
# Total timeout, in seconds, for a traced I/O operation
TRACE_TOTAL_TIMEOUT = 30


# This function serves as a "hook" that executes for each Python statement
# down the road. There may be some performance penalty, but as downloading
# a webpage is mostly I/O bound, it is not going to be significant.
def trace_function(frame, event, arg):
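    """Abort the currently traced operation once TRACE_TOTAL_TIMEOUT has elapsed."""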
    global TRACE_START_TIME
    if time.time() - TRACE_START_TIME > TRACE_TOTAL_TIMEOUT:
        raise TimeoutError('Timed out!')
    # Return itself so 'line' events are traced inside nested frames as well.
    return trace_function


class QualtestBackend:
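    """Thin HTTP client for a single Qualtest backend instance."""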

    address: str
    instance: str

    def __init__(self):
        self.address = ""
        self.instance = ""
        self.__phone = None

    @property
    def phone(self) -> Phone:
        return self.__phone

    def preload(self, cache: InfoCache):
        self.__phone = self.load_phone(cache)

    def upload_report(self, report: dict, cache: InfoCache) -> typing.Tuple[str, bool]:
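        """Upload a probe report; on failure, fall back to the local cache.

        Returns a (probe_id, uploaded) tuple, or (None, None) when the upload
        failed and no cache was supplied.
        """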
        # (probe UUID, uploaded) tuple as result
        result = (None, False)
        # Log the upload attempt
        utils.log_verbose(f'Uploading to {self.address} report with audio duration: {report["duration"]}s, AQuA MOS: {round(report["mos_aqua"], 3)}')
        url = utils.join_host_and_path(self.address, "/probes/")
        try:
            r = requests.post(url=url, json=report, timeout=utils.NETWORK_TIMEOUT)
            utils.log_verbose(f"Upload report finished. Response (probe ID): {r.content}")
            if r.status_code != 200:
                if r.status_code == 500 and 'Duplicate entry' in r.content.decode():
                    # Treat a duplicate entry as a successful upload
                    result = (report['id'], True)
                else:
                    raise RuntimeError(f'Server returned code {r.status_code}')
            else:
                result = (r.content.decode().strip('" '), True)
        except Exception as e:
            utils.log_error(f'Upload report to {self.address} finished with error: {str(e)}')
            # Back up the probe report locally
            if cache is not None:
                probe_id = cache.add_report(report)
                utils.log(f' {probe_id}.json report was put to the cache.')
                result = (probe_id, False)
            else:
                return (None, None)
        return result

    def upload_audio(self, probe_id, path_recorded: Path) -> bool:
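        """Upload a recorded audio file for an already-uploaded probe.

        The POST is bounded both by the requests timeout and by the
        sys.settrace watchdog (TRACE_TOTAL_TIMEOUT).
        """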
        global TRACE_START_TIME
        result = False
        # Log the upload attempt
        utils.log_verbose(f"Uploading to {self.address} audio {path_recorded}")
        if not path_recorded.exists():
            utils.log_error(" File doesn't exist, skip.")
            return False
        # Find the URL for uploading
        url = utils.join_host_and_path(self.address, "/upload_audio/")
        try:
            with open(path_recorded, 'rb') as audio_file:
                files = {'file': (os.path.basename(path_recorded), audio_file),
                         'probe_id': (None, probe_id),
                         'audio_kind': (None, '1'),
                         'audio_name': (None, os.path.basename(path_recorded))}
                try:
                    # Limit the POST time to TRACE_TOTAL_TIMEOUT seconds
                    TRACE_START_TIME = time.time()
                    sys.settrace(trace_function)
                    response = requests.post(url, files=files, timeout=utils.NETWORK_TIMEOUT)
                finally:
                    sys.settrace(None)
            if response.status_code != 200:
                utils.log_error(f"Upload audio to {self.address} finished with error {response.status_code}", None)
            else:
                utils.log_verbose(f"Upload audio finished. Response (audio ID): {response.text}")
                result = True
        except Exception as e:
            utils.log_error(f"Upload audio to {self.address} finished with error.", err=e)
        return result

    def load_tasks(self) -> TaskList:
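        """Fetch the task list assigned to this phone instance.

        Returns a TaskList, or None when the backend cannot be reached.
        """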
        try:
            # Build the query for both the V1 & V2 API
            instance = urllib.parse.urlencode({"phone_id": self.instance, "phone_name": self.instance})
            # Find the URL
            url = utils.join_host_and_path(self.address, "/tasks/?") + instance
            # Get the response from the server
            response = urllib.request.urlopen(url, timeout=utils.NETWORK_TIMEOUT)
            if response.getcode() != 200:
                raise RuntimeError(f'Failed to get task list. Error code: {response.getcode()}')
            result = TaskList()
            response_content = response.read().decode()
            result.tasks = json.loads(response_content)
            return result
        except Exception as err:
            utils.log_error(f'Error when fetching task list from backend: {str(err)}')
            return None

    def load_phone(self, cache: InfoCache) -> Phone:
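        """Load this instance's phone definition from the backend.

        Falls back to the cached definition when the backend is unreachable;
        returns None when no definition can be found at all.
        """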
        result = None
        try:
            # Build the query for both the V1 & V2 API
            instance = urllib.parse.urlencode({"phone_id": self.instance, "phone_name": self.instance})
            # Find the URL
            url = utils.join_host_and_path(self.address, "/phones/?") + instance
            # Get the response from the server
            try:
                response = urllib.request.urlopen(url, timeout=utils.NETWORK_TIMEOUT)
                if response.getcode() != 200:
                    raise RuntimeError(f'Failed to load phone definition from server. Error code: {response.getcode()}')
            except Exception as e:
                utils.log_error(f'Problem when loading the phone definition from backend. Error: {str(e)}')
                r = cache.get_phone(self.instance)
                if r is None:
                    raise RuntimeError('No cached phone definition.')
                utils.log(' Found phone definition in cache.')
                return r
            # Get the possible list of phones
            phones = json.loads(response.read().decode())
            if len(phones) == 0:
                return None
            # But use only the first one
            phone = phones[0]
            attr_dict = ParseAttributes(phone['attributes'])
            # Fix up the received attributes
            if 'stun_server' in attr_dict:
                attr_dict['sip_stunserver'] = attr_dict.pop('stun_server')
            if 'transport' in attr_dict:
                attr_dict['sip_transport'] = attr_dict.pop('transport')
            if 'sip_secure' not in attr_dict:
                attr_dict['sip_secure'] = False
            if 'sip_useproxy' not in attr_dict:
                attr_dict['sip_useproxy'] = True
            result = Phone()
            result.attributes = attr_dict
            result.identifier = phone['id']
            result.name = phone['instance']
            result.role = phone['type']
            result.audio_id = phone['audio_id']
            return result
        except Exception as err:
            utils.log_error(f"Exception loading phone information: {str(err)}")
            return None

    def load_audio(self, audio_id: int, output_path: Path) -> bool:
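        """Download an audio file by ID and store it at output_path."""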
        global TRACE_START_TIME
        utils.log(f'Loading audio with ID: {audio_id} ...')
        TRACE_START_TIME = time.time()
        try:
            # Build the query for both the V1 & V2 API
            params = urllib.parse.urlencode({"audio_id": audio_id})
            # Find the URL
            url = utils.join_host_and_path(self.address, "/play_audio/?") + params
            sys.settrace(trace_function)
            try:
                # Get the response from the server; (connect, read) timeouts
                response = requests.get(url, timeout=(utils.NETWORK_TIMEOUT, 5))
            finally:
                sys.settrace(None)
            if response.status_code != 200:
                utils.log_error(f' Failed to get audio. Error code: {response.status_code}, msg: {response.content}')
                return False
            with open(output_path, 'wb') as f:
                f.write(response.content)
            utils.log(' Done.')
            return True
        except Exception as err:
            utils.log_error(f' Exception when fetching audio: {str(err)}')
            return False

    def load_task(self, task_name: str) -> dict:
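        """Fetch a single task definition by name, or None on failure."""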
        try:
            params = urllib.parse.urlencode({'task_name': task_name})
            url = utils.join_host_and_path(self.address, "/tasks/?" + params)
            response = urllib.request.urlopen(url, timeout=utils.NETWORK_TIMEOUT)
            if response.getcode() != 200:
                utils.log_error(f'Failed to get task info. Error code: {response.getcode()}')
                return None
            task_list = json.loads(response.read().decode())
            if len(task_list) > 0:
                return task_list[0]
            else:
                return None
        except Exception as err:
            utils.log_error(f'Exception when fetching task info: {err}')
            return None