agent_gsm/src/bt_call_controller.py

335 lines
9.8 KiB
Python

#!/usr/bin/python3
import signal
import subprocess
import os
import time
import dbus
import tempfile
import argparse
import threading
import multiprocessing
import soundfile
import utils
import utils_bt_audio
import bt_phone
from bt_controller import Bluetoothctl
# Current call path
CALL_PATH = ''
CALL_ADDED = multiprocessing.Value('b', False)
CALL_REMOVED = multiprocessing.Value('b', False)
CALL_LOCK = threading.Lock()
# Call state change event
class CallState(bt_phone.Observer):
def update(self, call_object, event_type):
global CALL_PATH, CALL_LOCK, CALL_ADDED, CALL_REMOVED
utils.log(f'Call path: {call_object}, event: {event_type}. PID: {os.getpid()}, TID: {threading.get_ident()}')
if event_type == bt_phone.EVENT_CALL_REMOVE:
CALL_PATH = None
CALL_REMOVED.value = True
utils.log('Set CALL_REMOVED = True')
elif event_type == bt_phone.EVENT_CALL_ADD:
CALL_PATH = str(call_object)
CALL_REMOVED.value = False
CALL_ADDED.value = True
# Listen to call changes
CALL_STATE_EVENT = CallState()
PHONE = bt_phone.Phone()
PHONE.addObserver(CALL_STATE_EVENT)
# virtualmic module
PA_MODULE_IDX = -1
# Set volume 0..100%
def set_headset_spk_volume(vol: float):
cmd = f'pacmd set-sink-volume 0 0x {format(vol*100)}'
ret = os.popen(cmd).read()
return ret
def set_headset_mic_volume(vol: float):
cmd = f'pacmd set-source-volume 0 0x {format(vol*100)}'
ret = os.popen(cmd).read()
return ret
# Function to get the phone stream index to capture the downlink.
def get_headset_spk_idx():
utils.log('Waiting for phone stream index (please ensure all PA Bluetooth modules are loaded before)... ')
phoneIdx = ''
while phoneIdx == '':
time.sleep(1)
# grep 1-4 digit
phoneIdx = os.popen('pacmd list-sink-inputs | grep -B5 alsa_output | grep index | grep -oP "[0-9]{1,4}"').read()
return phoneIdx
# Start a call
def dial_number(number: str, play_file: str):
global CALL_PATH, CALL_LOCK, CALL_ADDED, CALL_REMOVED
if CALL_PATH is not None and len(CALL_PATH) > 0:
utils.log('Call exists already')
return
# Start audio inject
utils.log(f'Inject to uplink {play_file}')
inject_to_uplink(play_file)
# Initiate a call
utils.log(f'Initiate call to {number}')
PHONE.call_number(number)
# Answer the call
def answer_call(play_file: str):
global CALL_PATH, CALL_LOCK, CALL_ADDED
utils.log('Waiting for incoming call...')
# Wait for incoming call
while not CALL_ADDED.value:
time.sleep(0.1)
utils.log(f'Found incoming call {CALL_PATH}')
# CALL_LOCK.release()
# Start audio inject
inject_to_uplink(play_file)
# Answer the call
utils.log(f'Accepting the call {CALL_PATH}')
# Accept the call
PHONE.answer_call(CALL_PATH)
# Record downlink.
def capture_phone_alsaoutput(output_path: str):
default_output = get_headset_spk_idx().rstrip('\n')
cmd = f'parec --monitor-stream={default_output} --file-format=wav {output_path}'
utils.log(cmd)
# Example: parec --monitor-stream=34 --file-format=wav sample1.wav
parec_process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
utils.log('Start recording downlink.')
return parec_process
# Cleanup
def cleanup():
global PA_MODULE_IDX, CALL_PATH, CALL_LOCK
utils.log(f'Cleaning call {CALL_PATH}...')
if PA_MODULE_IDX != -1:
cmd = f'pactl unload-module {PA_MODULE_IDX}'
utils.log(f'Unloading PulseAudio module... {cmd}')
p = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE)
# Wait process to terminate to prevent hang the ssh session
(err, out) = p.communicate()
utils.log(f'PulseAudio module is unloaded.')
PA_MODULE_IDX = -1
# Stop the call itself
stop_call()
PHONE.quit_dbus_loop()
utils.log(f'Cleanup is finished: PID: {os.getpid()}')
# Function to inject to the uplink.
# Note: This function must run prior to the dial_number.
def inject_to_uplink(input_filename: str, verbose: bool = True):
global PA_MODULE_IDX
source_name = 'virtualmic'
default = '1'
format = 's16le'
rate = '44100'
channels = '1'
# Generate name for pipe
pipe_filename = tempfile.NamedTemporaryFile().name
cmd = f'pactl load-module module-pipe-source source_name={source_name} file={pipe_filename} format={format} rate={rate} channels={channels}'
utils.log(cmd)
# Create source
try:
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
outdata = p.stdout.read()
PA_MODULE_IDX = int( outdata.decode('utf8').rstrip("\n") )
if verbose:
utils.log(f'PulseAudio module index: {PA_MODULE_IDX}')
if default != '':
cmd = f'pactl set-default-source {source_name}'
utils.log(cmd)
p = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE)
outdata = p.stdout.read()
# print(outdata)
except Exception as e:
print('Failed to inject audio to uplink')
pass
# Send file to pipe - use ffmpeg
cmd = f'ffmpeg -hide_banner -loglevel error -re -i {input_filename} -f {format} -ar {rate} -ac {channels} - > {pipe_filename}'
utils.log(cmd)
p = subprocess.Popen(cmd, shell=True,stdout=subprocess.PIPE)
# (err, out) = p.communicate()
utils.log('Audio is injecting to uplink')
# Connect Rpi to phone as headset.
def connect_to_phone():
utils.log("Init bluetooth...")
bl = Bluetoothctl()
utils.log('BT control ready.')
devices = bl.get_paired_devices()
utils.log(f'List BT devices: {devices}')
if devices != None:
# dev = bl.get_device_info( devices[0].get('mac_address') )
utils.log(devices)
# disconnect before connect
bl.disconnect( devices[0].get('mac_address') )
ret = bl.connect(devices[0].get('mac_address'))
if ret == False:
utils.log( 'Connect to %s:%s failed' % ( devices[0].get('name'),devices[0].get('mac_address') ) )
return False
else:
utils.log( 'Connect to %s:%s success' % ( devices[0].get('name'), devices[0].get('mac_address') ) )
return True
else:
utils.log("no bluetooth device")
return False
# Function to stop the call once timing is expired.
def stop_call():
utils.log('Stopping all calls...')
PHONE.hangup_call()
# Returns pid of specified process
def get_pid(name):
return int(subprocess(["pidof","-s",name]))
def main(args: dict):
global CALL_PATH, CALL_LOCK, CALL_ADDED, CALL_REMOVED
# Ensure Ctrl-C handler is default
# signal.signal(signal.SIGINT, signal.SIG_DFL)
# Check if input file exists
if not os.path.exists(args['play_file']):
utils.log(f'Problem: file to play ({args["play_file"]}) doesn\'t exists.')
exit(os.EX_DATAERR)
# Duration in seconds
watchdog_timeout = int(args['timelimit'])
if watchdog_timeout == 0:
# Use duration of played file
audio_file = soundfile.SoundFile(args['play_file'])
watchdog_timeout = int(audio_file.frames / audio_file.samplerate + 0.5)
utils.log(f'Play timeout is set to {watchdog_timeout} seconds')
# Empty call path means 'no call started'
# CALL_LOCK.acquire()
CALL_PATH = ''
CALL_ADDED.value = False
CALL_REMOVED.value = False
# CALL_LOCK.release()
# This is done in preconnect script
# Ensure PulseAudio is running
# if not utils_bt_audio.start_PA():
# utils.log('Exiting.')
# exit(1)
# Attach to DBus (detach will happen in cleanup() function)
PHONE.setup_dbus_loop()
# Start call
if 'target' in args:
target_number = args['target']
if target_number is not None and len(target_number) > 0:
# Make a call
dial_number(target_number, args['play_file'])
else:
answer_call(args['play_file'])
else:
answer_call(args['play_file'])
# Don't make volume 100% - that's too much
audio_volume = 50
utils.log(f'Adjust speaker and microphone volume to {audio_volume}%')
set_headset_spk_volume(audio_volume)
set_headset_mic_volume(audio_volume)
# Start recording
utils.log(f'Start recording with ALSA to {args["record_file"]}')
process_recording = capture_phone_alsaoutput(args['record_file'])
utils.log(f'Main loop PID: {os.getpid()}, TID: {threading.get_ident()}')
# Wait until call is finished
time_start = time.time()
while not CALL_REMOVED.value and time_start + watchdog_timeout > time.time():
time.sleep(0.5)
utils.log(f'Call {CALL_PATH} finished.')
process_recording.kill()
cleanup()
retcode = os.system('pkill parec')
if retcode != 0:
print(f'Failed to terminate parec, exit code {retcode}')
utils.log('Exit')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Raspberry Pi headset.')
parser.add_argument('--play-file', help='File to play.', required=True)
parser.add_argument('--record-file', help='File to record.', default='bt_recorded.wav', required=True)
parser.add_argument('--timelimit', help='Call duration.', default=0, type=int, required=True)
parser.add_argument('--target', help='Phone number to dial. If missed - try to answer the call.', type=str)
args = vars(parser.parse_args())
retcode = 0
try:
main(args)
except KeyboardInterrupt as e:
print('Ctrl-C pressed, exiting')
cleanup()
retcode = 130 # From http://tldp.org/LDP/abs/html/exitcodes.html
except Exception as e:
print(e)
print('Finalizing...')
cleanup()
retcode = 1
print(f'Call controller exits with return code {retcode}')
exit(retcode)