- today's changes

This commit is contained in:
Dmytro Bogovych 2023-09-04 16:04:03 +03:00
parent a069b5c471
commit d044aaaa1a
7 changed files with 167 additions and 916 deletions

View File

@ -15,7 +15,7 @@ export DBUS_SESSION_BUS_ADDRESS
pkill pulseaudio pkill pulseaudio
# Ensure BT stack is here # Ensure BT stack is here
python3 -u $SCRIPT_DIR/src/bt_preconnect.py $SCRIPT_DIR/config/agent.yaml # python3 -u $SCRIPT_DIR/src/bt_preconnect.py $SCRIPT_DIR/config/agent.yaml
#!/bin/bash #!/bin/bash
if [ -n "$1" ] if [ -n "$1" ]

View File

@ -6,37 +6,25 @@ import argparse
import sys import sys
import uuid import uuid
import json import json
import multiprocessing
import signal
import atexit
from pathlib import Path
import utils_qualtest import utils_qualtest
import utils_sevana import utils_sevana
import utils_mcon
import utils_logcat
import utils import utils
import utils_cache import utils_cache
from utils_types import (EXIT_ERROR, EXIT_OK)
from agent_config import AgentConfig
from bt_controller import Bluetoothctl from bt_controller import Bluetoothctl
import bt_signal import bt_signal
from bt_signal import SignalBoundaries from bt_signal import SignalBoundaries
import bt_call_controller import bt_call_controller
import multiprocessing
import signal
import yaml
from pathlib import Path
# Name of intermediary file with audio recorded from the GSM phone CONFIG = AgentConfig()
RECORD_FILE = "/dev/shm/qualtest_recorded.wav"
# Backend instance
BackendServer : utils_qualtest.QualtestBackend = None
# Reference audio to play
REFERENCE_AUDIO = "/dev/shm/reference_original.wav"
# Loaded reference audio (from backend)
LOADED_AUDIO = Path("/dev/shm/loaded_audio.wav")
# Script to exec after mobile call answering
EXEC_SCRIPT = None
# Current task name. # Current task name.
CURRENT_TASK = None CURRENT_TASK = None
@ -55,23 +43,10 @@ DIR_THIS = Path(__file__).resolve().parent
DIR_PROJECT = DIR_THIS.parent DIR_PROJECT = DIR_THIS.parent
# Backup directory (to run without internet) # Backup directory (to run without internet)
DIR_CACHE = None
CACHE = utils_cache.InfoCache(None) CACHE = utils_cache.InfoCache(None)
# PID file name # Backend instance
QUALTEST_PID = "/dev/shm/qualtest.pid" BACKEND : utils_qualtest.QualtestBackend = None
# Should the first task run immediately ?
FORCE_RUN = False
# Exit codes
EXIT_OK = 0
EXIT_ERROR = 1
# Use silence eraser or not (speech detector is used in this case)
USE_SILENCE_ERASER = True
LOG_FILEPATH = None
def remove_oldest_log_audio(): def remove_oldest_log_audio():
list_of_files = os.listdir(LOG_AUDIO_DIR) list_of_files = os.listdir(LOG_AUDIO_DIR)
@ -83,10 +58,10 @@ def remove_oldest_log_audio():
def detect_degraded_signal(file_test: Path, file_reference: Path) -> SignalBoundaries: def detect_degraded_signal(file_test: Path, file_reference: Path) -> SignalBoundaries:
global USE_SILENCE_ERASER, LOG_AUDIO_DIR, BackendServer global USE_SILENCE_ERASER, LOG_AUDIO_DIR, BACKEND
is_caller : bool = 'caller' in BackendServer.phone.role is_caller : bool = 'caller' in BACKEND.phone.role
is_answerer : bool = 'answer' in BackendServer.phone.role is_answerer : bool = 'answer' in BACKEND.phone.role
if utils.get_wav_length(file_test) < utils.get_wav_length(file_reference): if utils.get_wav_length(file_test) < utils.get_wav_length(file_reference):
# Seems some problem with recording, return zero boundaries # Seems some problem with recording, return zero boundaries
@ -124,7 +99,7 @@ def upload_results():
utils.log_error(f'Error when processing {path_report.name}') utils.log_error(f'Error when processing {path_report.name}')
continue continue
upload_id, success = BackendServer.upload_report(report, cache=None) upload_id, success = BACKEND.upload_report(report, cache=None)
if success: if success:
utils.log(f'Report {upload_id} is uploaded ok.') utils.log(f'Report {upload_id} is uploaded ok.')
@ -136,7 +111,7 @@ def upload_results():
# path_audio = path_audio.rename(path_audio_fixed) # path_audio = path_audio.rename(path_audio_fixed)
utils.log(f'Uploading {path_audio.name} file...') utils.log(f'Uploading {path_audio.name} file...')
# Upload recorded audio # Upload recorded audio
upload_result = BackendServer.upload_audio(upload_id, path_audio) upload_result = BACKEND.upload_audio(upload_id, path_audio)
if upload_result: if upload_result:
utils.log(f' Recorded audio {upload_id}.wav is uploaded ok.') utils.log(f' Recorded audio {upload_id}.wav is uploaded ok.')
os.remove(path_report) os.remove(path_report)
@ -157,14 +132,19 @@ def run_analyze(file_test: str, file_reference: str, number: str) -> bool:
time.sleep(5.0) time.sleep(5.0)
# Check how long audio file is # Check how long audio file is
audio_length = utils.get_wav_length(file_test) test_audio_length = round(utils.get_wav_length(file_test), 3)
ref_audio_length = round(utils.get_wav_length(file_reference), 3)
is_caller : bool = 'caller' in BackendServer.phone.role is_caller : bool = 'caller' in BACKEND.phone.role
is_answerer : bool = 'answer' in BackendServer.phone.role is_answerer : bool = 'answer' in BACKEND.phone.role
utils.log(f'Recorded audio call duration: {test_audio_length}s, reference audio length: {ref_audio_length}s')
utils.log(f'Recorded audio call duration: {round(audio_length, 3)}s')
# Check if audio length is strange - skip such calls. Usually this is missed call. # Check if audio length is strange - skip such calls. Usually this is missed call.
if (is_caller and audio_length >= utils_mcon.TIME_LIMIT_CALL) or (is_answerer and audio_length >= utils_mcon.TIME_LIMIT_CALL * 1.2): is_caller_audio_big = is_caller and test_audio_length > ref_audio_length * 1.5
is_answerer_audio_big = is_answerer and test_audio_length > ref_audio_length * 1.5
if is_caller_audio_big or is_answerer_audio_big:
utils.log_error(f'Recorded call is too big - looks like mobile operator prompt, skipping analysis') utils.log_error(f'Recorded call is too big - looks like mobile operator prompt, skipping analysis')
return False return False
@ -206,23 +186,23 @@ def run_analyze(file_test: str, file_reference: str, number: str) -> bool:
r['report_pvqa'] = pvqa_report r['report_pvqa'] = pvqa_report
r['report_aqua'] = aqua_report r['report_aqua'] = aqua_report
r['r_factor'] = pvqa_rfactor r['r_factor'] = pvqa_rfactor
r["percents_aqua"] = aqua_percents r['percents_aqua'] = aqua_percents
r['error'] = '' r['error'] = ''
r['target'] = number r['target'] = number
r['audio_id'] = 0 r['audio_id'] = 0
r['phone_id'] = BackendServer.phone.identifier r['phone_id'] = BACKEND.phone.identifier
r['phone_name'] = '' r['phone_name'] = ''
r['task_id'] = 0 r['task_id'] = 0
r['task_name'] = CURRENT_TASK r['task_name'] = CURRENT_TASK
# Upload report # Upload report
upload_id, success = BackendServer.upload_report(r, cache=CACHE) upload_id, success = BACKEND.upload_report(r, cache=CACHE)
if success: if success:
utils.log('Report is uploaded ok.') utils.log('Report is uploaded ok.')
# Upload recorded audio # Upload recorded audio
upload_result = BackendServer.upload_audio(r['id'], file_test) upload_result = BACKEND.upload_audio(r['id'], file_test)
if upload_result: if upload_result:
utils.log('Recorded audio is uploaded ok.') utils.log('Recorded audio is uploaded ok.')
@ -250,29 +230,27 @@ def run_error(error_message: str):
def make_call(target: str): def make_call(target: str):
global REFERENCE_AUDIO
# Remove old recorded file # Remove old recorded file
record_file = '/dev/shm/bt_record.wav' if CONFIG.RecordFile.exists():
# if Path(record_file).exists(): os.remove(CONFIG.RecordFile)
# os.remove(record_file)
# Add prefix and suffix silence for reference to give a chance to record all the file # Add prefix and suffix silence for reference to give a chance to record all the file
utils.log(f'Preparing reference file...') utils.log(f'Preparing reference file...')
reference_filename = Path('/dev/shm/reference_built.wav') if CONFIG.PreparedReferenceAudio.exists():
if reference_filename.exists(): os.remove(CONFIG.PreparedReferenceAudio)
os.remove(reference_filename) utils.prepare_reference_file(fname=str(CONFIG.ReferenceAudio),
utils.prepare_reference_file(fname=str(REFERENCE_AUDIO), silence_prefix_length=10.0, silence_suffix_length=5.0, output_fname=str(reference_filename)) silence_prefix_length=10.0, silence_suffix_length=5.0,
output_fname=str(CONFIG.PreparedReferenceAudio))
# Find duration of prepared reference file # Find duration of prepared reference file
reference_length = int(utils.get_wav_length(reference_filename)) reference_length = round(utils.get_wav_length(CONFIG.PreparedReferenceAudio), 3)
utils.log(f' Done.') utils.log(f' Done. Length of prepared reference audio file: {reference_length}s')
# Compose a command # Compose a command
# utils.close_log_file() # utils.close_log_file()
try: try:
bt_call_controller.run(play_file=reference_filename, record_file=record_file, timelimit_seconds=reference_length, target=target) bt_call_controller.run(play_file=CONFIG.PreparedReferenceAudio, record_file=CONFIG.RecordFile, timelimit_seconds=reference_length, target=target)
run_analyze(record_file, REFERENCE_AUDIO, target) run_analyze(CONFIG.RecordFile, CONFIG.ReferenceAudio, target)
except Exception as e: except Exception as e:
utils.log_error(f'BT I/O failed finally. Error: {str(e)}') utils.log_error(f'BT I/O failed finally. Error: {str(e)}')
@ -281,25 +259,30 @@ def perform_answerer():
global CALL_LIMIT global CALL_LIMIT
# Get reference audio duration in seconds # Get reference audio duration in seconds
reference_length = utils.get_wav_length(REFERENCE_AUDIO) ref_time_length = round(utils.get_wav_length(CONFIG.ReferenceAudio), 3)
# Setup analyzer script # Setup analyzer script
# Run answering script # Run answering script
while True: while True:
# Remove old recording # Remove old recording
record_file = f'/dev/shm/bt_record.wav' if CONFIG.RecordFile.exists():
os.remove(CONFIG.RecordFile)
try: try:
bt_call_controller.run(play_file=REFERENCE_AUDIO, record_file=record_file, timelimit_seconds=int(reference_length), target=None) bt_call_controller.run(play_file=CONFIG.ReferenceAudio,
record_file=CONFIG.RecordFile,
timelimit_seconds=int(ref_time_length),
target=None)
except Exception as e: except Exception as e:
utils.log(f'BT I/O failed, exiting. Error: {str(e)}') utils.log(f'BT I/O failed, exiting. Error: {str(e)}')
break break
# Call analyzer script # Call analyzer script
run_analyze(record_file, REFERENCE_AUDIO, '') run_analyze(CONFIG.RecordFile, CONFIG.ReferenceAudio, '')
def run_caller_task(t): def run_caller_task(t):
global CURRENT_TASK, LOADED_AUDIO, REFERENCE_AUDIO global CURRENT_TASK
utils.log("Running task:" + str(t)) utils.log("Running task:" + str(t))
@ -311,23 +294,23 @@ def run_caller_task(t):
task_name = t['name'].strip() task_name = t['name'].strip()
# Load reference audio # Load reference audio
if LOADED_AUDIO.exists(): if CONFIG.LoadedAudio.exists():
os.remove(LOADED_AUDIO) os.remove(CONFIG.LoadedAudio)
audio_id = t["audio_id"] audio_id = t["audio_id"]
if CACHE.get_reference_audio(audio_id, LOADED_AUDIO): if CACHE.get_reference_audio(audio_id, CONFIG.LoadedAudio):
utils.log(f'Reference audio {audio_id} found in cache.') utils.log(f'Reference audio {audio_id} found in cache.')
else: else:
utils.log(f'Reference audio {audio_id} not found in cache.') utils.log(f'Reference audio {audio_id} not found in cache.')
if not BackendServer.load_audio(audio_id, LOADED_AUDIO): if not BACKEND.load_audio(audio_id, CONFIG.LoadedAudio):
utils.log_error(f'Failed to load reference audio with ID {audio_id}.') utils.log_error(f'Failed to load reference audio with ID {audio_id}.')
raise RuntimeError(f'Reference audio (ID: {audio_id}) is not available.') raise RuntimeError(f'Reference audio (ID: {audio_id}) is not available.')
# Cache loaded audio # Cache loaded audio
CACHE.add_reference_audio(audio_id, LOADED_AUDIO) CACHE.add_reference_audio(audio_id, CONFIG.LoadedAudio)
# Use loaded audio as reference # Use loaded audio as reference
REFERENCE_AUDIO = str(LOADED_AUDIO) CONFIG.ReferenceAudio = str(CONFIG.LoadedAudio)
CURRENT_TASK = task_name CURRENT_TASK = task_name
@ -352,22 +335,22 @@ def run_caller_task(t):
# Runs caller probe - load task list and perform calls # Runs caller probe - load task list and perform calls
def run_probe(): def run_probe():
global TASK_LIST, REFERENCE_AUDIO, LOADED_AUDIO, CURRENT_TASK, FORCE_RUN global TASK_LIST, CURRENT_TASK
while True: while True:
# Get task list update # Get task list update
new_tasks = BackendServer.load_tasks() new_tasks = BACKEND.load_tasks()
if new_tasks is None: if new_tasks is None:
# Check in cache # Check in cache
utils.log('Checking for task list in cache...') utils.log('Checking for task list in cache...')
new_tasks = CACHE.get_tasks(BackendServer.phone.name) new_tasks = CACHE.get_tasks(BACKEND.phone.name)
# Did we fetch anything ? # Did we fetch anything ?
if new_tasks: if new_tasks:
utils.log(f' Task list found.') utils.log(f' Task list found.')
# Merge with existing ones. Some tasks can be removed, some can be add. # Merge with existing ones. Some tasks can be removed, some can be add.
changed = TASK_LIST.merge_with(incoming_tasklist = new_tasks) changed = TASK_LIST.merge_with(incoming_tasklist = new_tasks)
CACHE.put_tasks(name=BackendServer.phone.name, tasks=TASK_LIST) CACHE.put_tasks(name=BACKEND.phone.name, tasks=TASK_LIST)
else: else:
utils.log(' Task isn\'t found in cache.') utils.log(' Task isn\'t found in cache.')
raise RuntimeError(f'No task list found, exiting.') raise RuntimeError(f'No task list found, exiting.')
@ -382,7 +365,7 @@ def run_probe():
utils.log_verbose(f"Resulting task list: {TASK_LIST.tasks}") utils.log_verbose(f"Resulting task list: {TASK_LIST.tasks}")
if FORCE_RUN and len(TASK_LIST.tasks) > 0: if CONFIG.ForceRun and len(TASK_LIST.tasks) > 0:
run_caller_task(TASK_LIST.tasks[0]) run_caller_task(TASK_LIST.tasks[0])
break break
@ -419,179 +402,123 @@ def run_probe():
time.sleep(60) time.sleep(60)
def remove_pid_on_exit():
if CONFIG.QualtestPID:
if CONFIG.QualtestPID.exists():
os.remove(CONFIG.QualtestPID)
def receive_signal(signal_number, frame): def receive_signal(signal_number, frame):
# Delete PID file # Delete PID file
if os.path.exists(QUALTEST_PID): remove_pid_on_exit()
os.remove(QUALTEST_PID)
# Debugging info # Debugging info
print(f'Got signal {signal_number} from {frame}') print(f'Got signal {signal_number} from {frame}')
# Stop GSM call
utils_mcon.gsm_stop_call()
# Exit # Exit
raise SystemExit('Exiting') raise SystemExit('Exiting')
return return
# Check if Python version is ok # Check if Python version is ok
assert sys.version_info >= (3, 6) assert sys.version_info >= (3, 6)
# Use later configuration files # Use later configuration files
# https://stackoverflow.com/questions/3609852/which-is-the-best-way-to-allow-configuration-options-be-overridden-at-the-comman # https://stackoverflow.com/questions/3609852/which-is-the-best-way-to-allow-configuration-options-be-overridden-at-the-comman
if __name__ == '__main__':
parser = argparse.ArgumentParser() CONFIG = AgentConfig()
parser.add_argument("--config", help="Path to config file, see config.in.yaml.")
parser.add_argument("--check-pid-file", action="store_true", help="Check if .pid file exists and exit if yes. Useful for using with .service files")
parser.add_argument("--test", action="store_true", help="Run the first task immediately. Useful for testing.")
# Parse arguments
args = parser.parse_args()
# Show help and exit if required
if len(sys.argv) < 2: if len(sys.argv) < 2:
parser.print_help() CONFIG.parser().print_help()
sys.exit(EXIT_OK) sys.exit(EXIT_OK)
if args.test: if CONFIG.QualtestPID.exists() and CONFIG.CheckPIDFile:
FORCE_RUN = True print(f'File {CONFIG.QualtestPID} exists, seems another instance of script is running. Please delete {CONFIG.QualtestPID} to allow the start.')
if Path(QUALTEST_PID).exists() and args.check_pid_file:
print(f'File {QUALTEST_PID} exists, seems another instance of script is running. Please delete {QUALTEST_PID} to allow the start.')
sys.exit(EXIT_OK) sys.exit(EXIT_OK)
# Check if config file exists # Remove PID file on exit (if needed)
config = None atexit.register(remove_pid_on_exit)
config_path = 'config.yaml'
if args.config:
config_path = args.config
with open(config_path, 'r') as stream:
config = yaml.safe_load(stream)
# register the signals to be caught # register the signals to be caught
signal.signal(signal.SIGINT, receive_signal) signal.signal(signal.SIGINT, receive_signal)
signal.signal(signal.SIGQUIT, receive_signal) signal.signal(signal.SIGQUIT, receive_signal)
# signal.signal(signal.SIGTERM, receive_signal)
# SIGTERM is sent from utils_mcon as well (multiprocessing?)
# Override default audio samplerate if needed # Preconnect the phone
if 'samplerate' in config['audio']: if CONFIG.BT_MAC:
if config['audio']['samplerate']:
utils_mcon.SAMPLERATE = int(config['audio']['samplerate'])
if config['force_task']:
FORCE_RUN = True
if 'speech_detector' in config:
if config['speech_detector']:
USE_SILENCE_ERASER = False
if 'bluetooth_mac' in config['audio']:
bt_mac = config['audio']['bluetooth_mac']
if len(bt_mac) > 0:
# Connect to phone before # Connect to phone before
utils.log(f'Connecting to BT MAC {CONFIG.BT_MAC} ...')
bt_ctl = Bluetoothctl() bt_ctl = Bluetoothctl()
bt_ctl.connect(bt_mac) bt_ctl.connect(CONFIG.BT_MAC)
utils.log(f' Done.')
# Logging settings # Logging settings
utils.verbose_logging = config['log']['verbose'] utils.verbose_logging = CONFIG.Verbose
if config['log']['path']: if CONFIG.LogPath:
LOG_FILEPATH = config['log']['path'] utils.open_log_file(CONFIG.LogPath, 'at')
utils.open_log_file(LOG_FILEPATH, 'at')
# Use native ALSA utilities on RPi
if utils.is_raspberrypi():
utils.log('RPi detected, using alsa-utils player & recorded')
utils_mcon.USE_ALSA_AUDIO = True
if 'ALSA' in config['audio']:
if config['audio']['ALSA']:
utils_mcon.USE_ALSA_AUDIO = True
if 'adb' in config['log']: if CONFIG.CacheDir:
if config['log']['adb']: CACHE = utils_cache.InfoCache(dir=CONFIG.CacheDir)
utils_mcon.VERBOSE_ADB = True
utils.log('Enabled adb logcat output')
# Audio directories
if 'cache_dir' in config:
DIR_CACHE = Path(config['cache_dir'])
if not DIR_CACHE.is_absolute():
DIR_CACHE = DIR_THIS.parent / config['cache_dir']
CACHE = utils_cache.InfoCache(dir=DIR_CACHE)
# Update path to pvqa/aqua-wb # Update path to pvqa/aqua-wb
utils_sevana.find_binaries(DIR_PROJECT / 'bin') utils_sevana.find_binaries(DIR_PROJECT / 'bin')
# Load latest licenses & configs - this requires utils_sevana.find_binaries() to be called before # Load latest licenses & configs - this requires utils_sevana.find_binaries() to be called before
utils_sevana.load_config_and_licenses(config['backend']) # utils_sevana.load_config_and_licenses(config['backend'])
# Audio devices
if 'record_device' in config['audio'] and 'play_device' in config['audio']:
utils_mcon.AUDIO_DEV_RECORD = config['audio']['record_device']
utils_mcon.AUDIO_DEV_PLAY = config['audio']['play_device']
# Limit number of calls # Limit number of calls
if config['task_limit']: if CONFIG.TaskLimit:
CALL_LIMIT = config['task_limit'] utils.log(f'Limiting number of calls to {CONFIG.TaskLimit}')
utils.log(f'Limiting number of calls to {CALL_LIMIT}')
# Reset task list # Reset task list
utils_qualtest.TASK_LIST = [] utils_qualtest.TASK_LIST = []
# Init backend server # Init backend server
BackendServer = utils_qualtest.QualtestBackend() BACKEND = utils_qualtest.QualtestBackend()
BackendServer.instance = config['name'] BACKEND.instance = CONFIG.Name
BackendServer.address = config['backend'] BACKEND.address = CONFIG.Backend
# Write pid file to current working directory # Write pid file to current working directory
with open(QUALTEST_PID, "w") as f: if CONFIG.QualtestPID:
with open(CONFIG.QualtestPID, 'w') as f:
f.write(str(os.getpid())) f.write(str(os.getpid()))
f.close() f.close()
try: try:
# Load information about phone # Load information about phone
utils.log(f'Loading information about the node {BackendServer.instance} from {BackendServer.address}') utils.log(f'Loading information about the node {BACKEND.instance} from {BACKEND.address}')
BackendServer.preload(CACHE) BACKEND.preload(CACHE)
if BackendServer.phone is None: if BACKEND.phone is None:
utils.log_error(f'Failed to obtain information about {BackendServer.instance}. Exiting.') utils.log_error(f'Failed to obtain information about {BACKEND.instance}. Exiting.')
exit(EXIT_ERROR) exit(EXIT_ERROR)
# Cache information # Cache phone information
CACHE.put_phone(BackendServer.phone) CACHE.put_phone(BACKEND.phone)
# Upload results which were remaining in cache # Upload results which were remaining in cache
upload_results() upload_results()
if 'answerer' in BackendServer.phone.role: if 'answerer' in BACKEND.phone.role:
# Check if task name is specified # Check if task name is specified
if not config['task']: if CONFIG.TaskName is None:
utils.log_error('Please specify task value in config file.') utils.log_error('Please specify task value in config file.')
if os.path.exists(QUALTEST_PID): sys.exit(EXIT_ERROR)
os.remove(QUALTEST_PID)
sys.exit(utils_mcon.EXIT_ERROR)
# Save current task name # Save current task name
CURRENT_TASK = config['task'] CURRENT_TASK = CONFIG.TaskName
# Load reference audio # Load reference audio
utils.log('Loading reference audio...') utils.log('Loading reference audio...')
if BackendServer.load_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO): if BACKEND.load_audio(BACKEND.phone.audio_id, CONFIG.ReferenceAudio):
CACHE.add_reference_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO) CACHE.add_reference_audio(BACKEND.phone.audio_id, CONFIG.ReferenceAudio)
else: else:
utils.log_error('Audio is not available online.') utils.log_error('Audio is not available online.')
if not CACHE.get_reference_audio(BackendServer.phone.audio_id, REFERENCE_AUDIO): if not CACHE.get_reference_audio(BACKEND.phone.audio_id, CONFIG.REFERENCE_AUDIO):
utils.log_error('Reference audio is not cached, sorry. Exiting.') utils.log_error('Reference audio is not cached, sorry. Exiting.')
sys.exit(EXIT_ERROR) sys.exit(EXIT_ERROR)
@ -599,7 +526,7 @@ try:
utils.log('Running answering loop...') utils.log('Running answering loop...')
perform_answerer() perform_answerer()
elif 'caller' in BackendServer.phone.role: elif 'caller' in BACKEND.phone.role:
utils.log('Running caller...') utils.log('Running caller...')
run_probe() run_probe()
@ -609,8 +536,4 @@ except Exception as e:
# Close log file # Close log file
utils.close_log_file() utils.close_log_file()
# Exit with success code
if os.path.exists(QUALTEST_PID):
os.remove(QUALTEST_PID)
sys.exit(EXIT_OK) sys.exit(EXIT_OK)

View File

@ -63,10 +63,11 @@ def set_headset_mic_volume(vol: float):
# Function to get the phone stream index to capture the downlink. # Function to get the phone stream index to capture the downlink.
def get_headset_spk_idx(): def get_headset_spk_idx(timeout: int = 10):
utils.log('Waiting for phone stream index (please ensure all PA Bluetooth modules are loaded before)... ') utils.log('Waiting for phone stream index (please ensure all PA Bluetooth modules are loaded before)... ')
phoneIdx = '' phoneIdx = ''
while phoneIdx == '': startTime = time.time()
while phoneIdx == '' and (time.time() - startTime < timeout):
time.sleep(1) time.sleep(1)
# grep 1-4 digit # grep 1-4 digit
phoneIdx = os.popen('pacmd list-sink-inputs | grep -B5 alsa_output | grep index | grep -oP "[0-9]{1,4}"').read() phoneIdx = os.popen('pacmd list-sink-inputs | grep -B5 alsa_output | grep index | grep -oP "[0-9]{1,4}"').read()
@ -116,6 +117,9 @@ def answer_call(play_file: str):
# Record downlink. # Record downlink.
def capture_phone_alsaoutput(output_path: str): def capture_phone_alsaoutput(output_path: str):
default_output = get_headset_spk_idx().rstrip('\n') default_output = get_headset_spk_idx().rstrip('\n')
if default_output == '':
return None
cmd = f'parec --monitor-stream={default_output} --file-format=wav {output_path}' cmd = f'parec --monitor-stream={default_output} --file-format=wav {output_path}'
utils.log(cmd) utils.log(cmd)
# Example: parec --monitor-stream=34 --file-format=wav sample1.wav # Example: parec --monitor-stream=34 --file-format=wav sample1.wav
@ -281,6 +285,11 @@ def run(play_file: str, record_file: str, timelimit_seconds: int, target: str):
# Start recording # Start recording
utils.log(f'Start recording with ALSA to {record_file}') utils.log(f'Start recording with ALSA to {record_file}')
process_recording = capture_phone_alsaoutput(record_file) process_recording = capture_phone_alsaoutput(record_file)
if process_recording is None:
utils.log_error(f'Failed to start recording downlink, exiting.')
cleanup()
return
utils.log(f'Main loop PID: {os.getpid()}, TID: {threading.get_ident()}') utils.log(f'Main loop PID: {os.getpid()}, TID: {threading.get_ident()}')
# Wait until call is finished # Wait until call is finished

View File

@ -1,38 +0,0 @@
#!/usr/bin/python3
import os
import sys
import yaml
import subprocess
import utils_bt_audio
import utils
from bt_controller import Bluetoothctl
if __name__ == '__main__':
if len(sys.argv) < 2:
print(f'Usage: bt_preconnect.py <path to config file>')
exit(0)
with open(sys.argv[1], 'r') as config_stream:
config = yaml.safe_load(config_stream)
if 'bluetooth_mac' in config['audio'] and 'bluetooth' in config['audio']:
use_bt = config['audio']['bluetooth']
bt_mac = config['audio']['bluetooth_mac']
if use_bt and len(bt_mac) > 0:
if not utils_bt_audio.start_PA():
print('Exiting')
exit(1)
# Connect to phone
utils.log(f'Connecting to {bt_mac} ...')
bt_ctl = Bluetoothctl()
status = bt_ctl.connect(bt_mac)
if status:
utils.log(f' Connected ok.')
else:
utils.log_error(f' Not connected, sorry.')
else:
utils.log_error('BT config not found.')
exit(0)

View File

@ -1,648 +0,0 @@
#!/usr/bin/python3
# coding: utf-8
import argparse
from multiprocessing.synchronize import Event
import os
import sys
import traceback
import time
import subprocess
import multiprocessing
import signal
import enum
import utils
import utils_logcat
import utils_rabbitmq
import utils_event
from enum import Enum
from multiprocessing import Value
import utils_alsa
# if not utils.is_raspberrypi():
# import utils_audio
# import uiautomator2 as u2
# This script is a bridge between android phone & audio recording & mobile helper app (Qualtest GSM)
ADB = utils_logcat.ADB
# This script version number
MCON_VERSION = "1.2.7"
# Audio devices to play & record
AUDIO_DEV_PLAY = None
AUDIO_DEV_RECORD = None
# Files to play & record
FILE_PLAY = None
FILE_RECORD = None
# Exit codes
EXIT_SUCCESS = 0
EXIT_ERROR = 1
# Time limitation for monitoring function
TIME_LIMIT_MONITORING = 86400*10000
# Subprocesses
PROCESS_MONITOR : multiprocessing.Process = None
# PROCESS_RECORD : multiprocessing.Process = None
# PROCESS_PLAY : multiprocessing.Process = None
# Log ADB messages in verbose mode ?
VERBOSE_ADB = False
# Call time limit (in seconds)
TIME_LIMIT_CALL = 120
# Silence suffix length (in seconds)
SILENCE_SUFFIX_LENGTH = 30
# Silence prefix length (in seconds)
SILENCE_PREFIX_LENGTH = 15
# Override samplerate if needed
SAMPLERATE: int = 48000
# Processing script
PROCESSING_SCRIPT = None
# Nr of processed calls
PROCESSED_CALLS: Value = Value('i', 0)
# Number of calls todo
LIMIT_CALLS: Value = Value('i', 0)
# Use aplay / arecord from alsa-utils to play&capture an audio
USE_ALSA_AUDIO: bool = False
# Stop notification. Put it to non-zero when script has to be stopped.
STOP_FLAG = multiprocessing.Value('i', 0)
RABBITMQ_CONNECTION = None
RABBITMQ_EXCHANGE = None
RABBITMQ_QUEUE = None
RABBITMQ_SESSIONID = None
# Can be 'caller' or 'answerer'
class Role(Enum):
Caller = 1
Answerer = 2
ROLE = None
def signal_handler(signum, frame):
print(f'Signal handler with code {signum}')
if PROCESS_MONITOR:
if PROCESS_MONITOR.is_alive:
print('Finishing the monitoring process...')
try:
if PROCESS_MONITOR._popen is not None:
PROCESS_MONITOR.terminate()
except Exception:
traceback.print_exc()
print('Signal handler exit.')
exit(0)
def start_gsm_app():
cmdline = f'{ADB} shell am start -n biz.sevana.qualtestgsm/.MainActivity'
retcode = os.system(cmdline)
if retcode != 0:
raise IOError()
# Initiates file playing and wait for finish (optionally)
def play_file(path: str, wait: bool, device: str, samplerate: int = None):
path_to_player = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'audio_play.py')
cmdline = f'python3 {path_to_player} --device "{device}" --input "{path}"'
if samplerate:
cmdline = cmdline + f' --samplerate {samplerate}'
utils.log_verbose(cmdline)
if wait:
os.system(cmdline)
else:
p = subprocess.Popen(cmdline, stdout=subprocess.PIPE, shell=True)
return p
# Initiates file playing and wait for finish (optionally)
def record_file(path: str, wait: bool, device: str, time_limit: int = 10, samplerate: int = None):
path_to_recorder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'audio_record.py')
# Please be aware - macOS prohibits recording from microphone by default. When debugging under VSCode please ensure it has permission to record audio.
cmdline = f'python3 {path_to_recorder} --device "{device}" --output "{path}" --limit {time_limit}'
if samplerate:
cmdline = cmdline + f' --samplerate {samplerate}'
utils.log_verbose(cmdline)
if wait:
os.system(cmdline)
else:
p = subprocess.Popen(cmdline, stdout=subprocess.PIPE, shell=True)
return p
# Accept incoming GSM call
def gsm_accept_incoming():
os.system(f"{ADB} shell input keyevent 5")
# Reject incoming GSM call
def gsm_reject_incoming():
os.system(f"{ADB} shell input keyevent 6")
# Initiate new GSM call
def gsm_make_call(target: str):
os.system(f"{ADB} shell am start -a android.intent.action.CALL -d tel:{target}")
# End current GSM call
def gsm_stop_call():
# os.system(f"{ADB} shell input keyevent 6")
# utils.log_verbose('GSM call stop keyevent is sent.')
pass
def gsm_send_digit(digit: str):
os.system(f"{ADB} shell input KEYCODE_{digit}")
#def gsm_attach_automator():
# # Run stock dialer as way to preload automator stack
# utils.log("Connecting to device...")
# d = u2.connect()
# # Preload GSM helper app
# utils.log("Preloading GSM helper app")
# d.app_start("biz.sevana.qualtestgsm")
# # Wait timeout for UI element is 60.0s
# d.implicitly_wait(60.0)
# # Preload stock dialer
# # utils.log("Preloading stock dialer")
# # d.app_start("com.skype.raider")
# return d
def gsm_switch_to_dtmf_panel(d):
# As stub for now - use Skype Contact click
# d(resourceId="com.skype.raider:id/vm_name", text=contact_name).click()
return None
def run_shell_script(file_recorded: str, file_played: str, number: str):
global PROCESSED_CALLS
# Log about passed parameters
utils.log_verbose(f'Running shell script with variables: recorded - {file_recorded}, played - {file_played}, number - {number}')
utils.log_verbose(f'Template: {PROCESSING_SCRIPT}')
# Prepare command line
cmdline = PROCESSING_SCRIPT.replace('$RECORDED', file_recorded).replace('$PLAYED', file_played).replace('$NUMBER', number)
utils.log_verbose(cmdline)
# Run script
retcode = os.system(cmdline)
if retcode != 0:
utils.log_error(f'Processing script call \'{cmdline}\' returned exit code {retcode}')
PROCESSED_CALLS.value = PROCESSED_CALLS.value + 1
return True
def run_error_handler(error_message):
global PROCESSED_CALLS
utils.log_error(f'Processing script call ended with problem: {error_message}')
# Increase counter of processed calls to allow script to exit
PROCESSED_CALLS.value = PROCESSED_CALLS.value + 1
class CallState(enum.Enum):
IDLE = 0
INCOMING = 1
ESTABLISHED = 2
# Monitor logcat output and tell about events
# on_start is lambda with 3 parameters (file_test, file_reference, phone_number)
# on_finish is lambda with 3 parameters (file_test, file_reference, phone_number)
PREPARED_REFERENCE_AUDIO = '/dev/shm/built_reference.wav'
def gsm_monitor(file_to_play: str, file_to_record: str, on_start, on_finish, on_error):
global PREPARED_REFERENCE_AUDIO, STOP_FLAG, USE_ALSA_AUDIO, AUDIO_DEV_RECORD, AUDIO_DEV_PLAY
utils.log_verbose(f'File to play: {file_to_play}, file to record: {file_to_record}')
utils.log_verbose(f'on_start: {on_start}, on_finish: {on_finish}, on_error: {on_error}')
# Reset stop flag
STOP_FLAG.value = 0
# Prepare reference audio for RPi
utils.prepare_reference_file(fname=file_to_play,
silence_prefix_length=SILENCE_PREFIX_LENGTH,
silence_suffix_length=SILENCE_SUFFIX_LENGTH,
output_fname=PREPARED_REFERENCE_AUDIO)
# Create event queue
event_queue = multiprocessing.Queue()
# Logcat event source
logcat = utils_logcat.LogcatEventSource()
logcat.queue = event_queue
logcat.open()
# RabbitMQ event source
rabbitmq = utils_rabbitmq.RabbitMQServer()
rabbitmq.event_queue = event_queue
rabbitmq.queue_name = RABBITMQ_QUEUE
rabbitmq.exchange_name = RABBITMQ_EXCHANGE
rabbitmq.url = RABBITMQ_CONNECTION
rabbitmq.open()
# Audio related processes and poll objects
audio_player = None
audio_recorder = None
# Ensure audio devices are recognized
if USE_ALSA_AUDIO:
if AUDIO_DEV_RECORD == 'auto':
AUDIO_DEV_RECORD = utils_alsa.AlsaRecorder.find_default()
utils.log(f'Recording device resolved to {AUDIO_DEV_RECORD}')
if AUDIO_DEV_PLAY == 'auto':
AUDIO_DEV_PLAY = utils_alsa.AlsaPlayer.find_default()
utils.log(f'Playing device resolved to {AUDIO_DEV_PLAY}')
# Monitoring start time
timestamp_start = utils.get_monotonic_time()
# Call start time
timestamp_call = None
if ROLE == Role.Caller:
timestamp_call = utils.get_monotonic_time()
# Should call to be stopped ?
force_call_stop = False
call_state : CallState = CallState.IDLE
# Read logcat output line by line
while True:
# Check if time limit is hit
if utils.get_monotonic_time() - timestamp_start > TIME_LIMIT_MONITORING:
break
# Check if limit of calls hit
if LIMIT_CALLS.value != 0 and PROCESSED_CALLS.value >= LIMIT_CALLS.value:
break
# Check if call hit maximum length - smth goes weird, exit from the script
if timestamp_call:
if util.get_monotonic_time() - timestamp_call > TIME_LIMIT_CALL:
utils.log_verbose(f'Call time limit ({TIME_LIMIT_CALL}s). Stop the call.')
timestamp_call = None
# Try to end mobile call twice. Sometimes first attempt fails (observed on Galaxy M11).
gsm_stop_call()
gsm_stop_call()
if ROLE == Role.Caller:
# Treat call as stopped
# Exit from loop
utils.log_verbose(f'Exit from the processing loop as call time limit hit; smth goes wrong, exit from the script.')
# Signal to caller to stop processing outer script
STOP_FLAG.value = 1
# Exit
exit(1)
# break
# Next event ?
event: utils_event.CallEvent = None
try:
event = event_queue.get(timeout = 1.0)
except:
# No event available
continue
if event is None:
continue
if len(event.session_id) > 0 and event.session_id != RABBITMQ_SESSIONID:
utils.log_verbose(f'Skip event from old session')
continue
# Process events
if event.name == utils_event.EVENT_IDLE:
idle_detected = True
elif event.name == utils_event.EVENT_CALL_INCOMING:
if call_state != CallState.IDLE:
utils.log(f'Duplicate event {event}, ignoring.')
continue
call_state = CallState.INCOMING
# Accept incoming call
utils.log_verbose(f'Detected Incoming call notification (number {event.number}) from mobile helper app.')
# Double accept - sometimes phones ignore the first attempts
gsm_accept_incoming()
gsm_accept_incoming()
utils.log_verbose(f'Incoming call accepted.')
elif event.name == utils_event.EVENT_CALL_FINISHED:
if call_state != CallState.ESTABLISHED:
utils.log(f'Duplicate event {event}, ignoring.')
call_state = CallState.IDLE
utils.log_verbose(f'Detected call stop notification from the mobile helper app')
# Reset counter of call length
timestamp_call = None
# Stop playing & capturing
utils.log_verbose(f'Call from {event.number} finished.')
if audio_recorder:
audio_recorder.stop_recording()
audio_recorder.close()
audio_recorder = None
if audio_player:
audio_player.stop_playing()
audio_player.close()
audio_player = None
# Restart audio - lot of debugging output from ALSA libraries can be here. It is a known problem of ALSA libraries.
if USE_ALSA_AUDIO:
utils_alsa.restart_audio()
else:
utils_audio.restart_audio()
# Here recording finished, call script to process
if on_finish:
if os.path.exists(file_to_record):
utils.log(f'Recorded file: {file_to_record}')
# Call handler
if on_finish(file_to_record, file_to_play, event.permissions) in [False, None] :
utils.log_error(f'Analyzer routine returned negative result, exiting.')
# Signal to caller to stop processing outer script
STOP_FLAG.value = 1
sys.exit(EXIT_ERROR)
# Remove processed file before writing the next one
# if os.path.exists(file_to_record):
# os.remove(file_to_record)
else:
utils.log_error(f'Smth wrong - no recorded file {file_to_record}')
if not on_finish(None, file_to_play, None):
# Signal to caller to stop processing outer script
STOP_FLAG.value = 1
sys.exit(EXIT_ERROR)
elif event.name == utils_event.EVENT_CALL_ESTABLISHED:
if call_state == CallState.ESTABLISHED:
utils.log(f'Duplicate event {event}, ignoring.')
continue
call_state = CallState.ESTABLISHED
utils.log_verbose(f'Detected call start notification from the mobile helper app, trying to start audio.')
# Save call start time
timestamp_call = utils.get_monotonic_time()
# Is audio failed
audio_failed = False
# Start playing
utils.log_verbose(f'Call with {event.number} is established.')
if file_to_play:
if not USE_ALSA_AUDIO:
device_index, device_rate = utils_audio.get_output_device_index(AUDIO_DEV_PLAY)
if SAMPLERATE:
device_rate = SAMPLERATE
utils.resample_to(file_to_play, int(device_rate))
utils.log_verbose(f'Playing file: {file_to_play}')
try:
if USE_ALSA_AUDIO:
audio_player = utils_alsa.AlsaPlayer(device_name=AUDIO_DEV_PLAY, channels=2, rate=48000, fname=PREPARED_REFERENCE_AUDIO)
else:
audio_player = utils_audio.Player(device_index=device_index).open(fname=file_to_play,
silence_prefix=SILENCE_PREFIX_LENGTH, silence_suffix=SILENCE_SUFFIX_LENGTH)
audio_player.start_playing()
except Exception as e:
utils.log_error(e)
audio_failed = True
# Start capturing
if file_to_record and not audio_failed:
utils.log_verbose(f'Recording file: {file_to_record}')
# Remove old file if needed
if os.path.exists(file_to_record):
os.remove(file_to_record)
if not USE_ALSA_AUDIO:
device_index, device_rate = utils_audio.get_input_device_index(AUDIO_DEV_RECORD)
if SAMPLERATE:
device_rate = SAMPLERATE
try:
if USE_ALSA_AUDIO:
audio_recorder = utils_alsa.AlsaRecorder(device_name=AUDIO_DEV_RECORD, rate=int(device_rate), fname=file_to_record)
else:
audio_recorder = utils_audio.Recorder(device_index=device_index, rate=int(device_rate)).open(fname=file_to_record)
audio_recorder.start_recording()
except Exception as e:
utils.log_error(e)
audio_failed = True
if audio_failed:
gsm_stop_call()
gsm_stop_call()
if on_error:
on_error('Audio failed.')
elif on_start:
on_start(file_to_record, file_to_play, event.number)
def make_call(target: str):
global ROLE, PROCESS_MONITOR, STOP_FLAG, PROCESSED_CALLS
ROLE = Role.Caller
# Start subprocess to monitor events from Qualtest GSM
finish_handler = lambda file_record, file_play, number: run_shell_script(file_record, file_play, number)
error_handler = lambda error_message: run_error_handler(error_message)
PROCESS_MONITOR = multiprocessing.Process(target=gsm_monitor, args=(FILE_PLAY, FILE_RECORD, None, finish_handler, error_handler))
PROCESS_MONITOR.start()
# Initiate GSM phone call via adb
gsm_make_call(target)
# Log
utils.log_verbose('Call is initiated, processing...')
# Wait for call finish with some timeout. Kill monitoring process on finish.
while True and STOP_FLAG.value != 1 and PROCESSED_CALLS.value == 0:
time.sleep(0.5)
# Kill the monitoring process - this will send SIGTERM signal. It is a cause why agent_gsm doesn't handle SIGTERM
PROCESS_MONITOR.terminate()
return None
def answer_calls():
global ROLE, PROCESS_MONITOR, STOP_FLAG, PROCESSED_CALLS
ROLE = Role.Answerer
# Start subprocess to monitor events from Qualtest GSM.
finish_handler = lambda file_record, file_play, number: run_shell_script(file_record, file_play, number)
error_handler = lambda error_message: run_error_handler(error_message)
PROCESS_MONITOR = multiprocessing.Process(target=gsm_monitor, args=(FILE_PLAY, FILE_RECORD, None, finish_handler, error_handler))
PROCESS_MONITOR.start()
# Indefinite loop. Exit is in signal handler
while True and STOP_FLAG.value != 1 and PROCESSED_CALLS.value == 0:
time.sleep(0.5)
# Kill the monitoring process - this will send SIGTERM signal. It is a cause why agent_gsm doesn't handle SIGTERM
PROCESS_MONITOR.terminate()
return None
if __name__ == '__main__':
# Default exit code
retcode = EXIT_SUCCESS
# Handle signals
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Command line parameters
parser = argparse.ArgumentParser()
parser.add_argument("--play-device", help="Output audio device name. Used to play reference audio to mobile call. Example (for ALSA): hw:2,0")
parser.add_argument("--record-device", help="Input device name. Used to record audio received from the mobile call.")
# parser.add_argument("--show-devices", help="list available output audio devices.", action="store_true")
parser.add_argument("--make-call", help="Target number as is. Usuall smth like +XYZ. Initiate a call to target number invoking the call on mobile phone and playing/recording audio to/from the call. Otherwise script will run expecting for incoming call.")
parser.add_argument("--play-file", help="Path to played (reference) audio. On RPi platform this should be 48KHz stereo audio.")
parser.add_argument("--record-file", help="Path to recorded audio (received from mobile call). On RPi platform it will be 48KHz mono audio.")
parser.add_argument("--exec", help="Path to postprocessing script. Postprocessing script will be run after the call finish with path to recorded audio as parameter. This should be a string like /home/user/postprocessing.sh $RECORDED. Substring $RECORDED will be replaced with actual path to recorded audio.")
# parser.add_argument("--adb-path", help="Path to adb utility. This must be set to work with mobile phone!")
parser.add_argument("--call-timelimit", help="Number of seconds. Call will be ended after specified timeout. Default value is 0 - no timeout.")
parser.add_argument("--test-play", help="Play test audio file. Useful when testing configuration. However this will not work on RPi.", action="store_true")
parser.add_argument("--test-record", help="Record test audio file for 10 seconds. Useful when testing configuration. However this will not work on RPi.", action="store_true")
parser.add_argument("--silence-prefix", help="Number of seconds. Adds silence before played audio. Default value is 10 (seconds)")
parser.add_argument("--silence-suffix", help="Number of seconds. Adds silence after played audio. Default value is 10 (seconds)")
parser.add_argument("--verbose", help="Run in verbose mode. It doesn't generate too much data, recommended to set.", action="store_true")
parser.add_argument("--verbose-adb", help="Log ADB messages when running in verbose mode. This can generate a lot of data, please be aware.", action="store_true")
parser.add_argument("--log-file", help="Path to log file. By default log is sent to console.")
parser.add_argument("--version", help="Show version number & exit", action="store_true")
parser.add_argument("--alsa-audio", help="Use ALSA audio instead of PyAudio (portaudio)", action="store_true")
parser.add_argument("--rabbitmq-connection")
parser.add_argument("--rabbitmq-exchange")
parser.add_argument("--rabbitmq-queue")
parser.add_argument("--rabbitmq-sessionid")
# parser.add_argument("--dtmf", help="Send DTMF string after call establishing and finish a call. Helper tool for some cases.")
# parser.add_argument("--samplerate", help="<audio samplerate>. Overrides default audio samplerate.")
args = parser.parse_args()
if args.version:
print(f"Version: {MCON_VERSION}")
sys.exit(0)
RABBITMQ_CONNECTION = args.rabbitmq_connection
RABBITMQ_EXCHANGE = args.rabbitmq_exchange
RABBITMQ_QUEUE = args.rabbitmq_queue
RABBITMQ_SESSIONID = args.rabbitmq_sessionid
# ALSA audio ? Required on RPi
USE_ALSA_AUDIO = args.alsa_audio
# Open log file if needed
VERBOSE_ADB = args.verbose_adb
utils.verbose_logging = args.verbose
if args.log_file:
utils.open_log_file(args.log_file, "at")
utils.log(f"mcon version: {MCON_VERSION}")
if args.call_timelimit:
TIME_LIMIT_CALL = int(args.call_timelimit)
elif args.play_file:
TIME_LIMIT_CALL = utils.get_wav_length(args.play_file)
utils.log(f'Limiting call time to {TIME_LIMIT_CALL}')
# Save audio devices
if args.play_device:
AUDIO_DEV_PLAY = args.play_device
if args.record_device:
AUDIO_DEV_RECORD = args.record_device
# Save files to play & record
if args.play_file:
FILE_PLAY = args.play_file
if args.record_file:
FILE_RECORD = args.record_file
# Processing script
if args.exec:
PROCESSING_SCRIPT = args.exec
# Should we make test here ?
if args.test_play:
if FILE_PLAY:
utils.log(f"Start test playing {FILE_PLAY} to {AUDIO_DEV_PLAY}")
play_file(FILE_PLAY, device=AUDIO_DEV_PLAY, wait=True)
else:
utils.log_error("File to play is not specified, exiting.")
retcode = EXIT_ERROR
sys.exit(retcode)
if args.test_record:
if FILE_RECORD:
utils.log(f"Start test recording from {AUDIO_DEV_RECORD} to {FILE_RECORD}")
record_file(FILE_RECORD, device=AUDIO_DEV_RECORD, wait=True)
else:
utils.log_error("File to record is not specified, exiting")
retcode = EXIT_ERROR
sys.exit(retcode)
# Check if we have to make a call
try:
if args.make_call:
make_call(args.make_call)
else:
answer_calls()
except Exception as e:
utils.log_error(e)
# Close log file
utils.close_log_file()
# Exit code 0 (success)
sys.exit(retcode)

View File

@ -7,6 +7,11 @@ import json
from crontab import CronTab from crontab import CronTab
# Exit codes
EXIT_OK = 0
EXIT_ERROR = 1
class Phone: class Phone:
identifier: int = 0 identifier: int = 0
name: str = "" name: str = ""