import time import pexpect import subprocess import sys class BluetoothctlError(Exception): """This exception is raised, when bluetoothctl fails to start.""" pass class Bluetoothctl: """A wrapper for bluetoothctl utility.""" def __init__(self): out = subprocess.check_output("rfkill unblock bluetooth", shell = True) # print("Bluetoothctl") self.child = pexpect.spawn("bluetoothctl", echo = False) def get_output(self, command, pause = 0): """Run a command in bluetoothctl prompt, return output as a list of lines.""" self.child.send(command + "\n") time.sleep(pause) start_failed = self.child.expect(["[.]*", pexpect.TIMEOUT, pexpect.EOF]) if start_failed: raise BluetoothctlError("Bluetoothctl failed after running " + command) t = self.child.before return t.decode('utf-8').split("\r\n") def start_scan(self): """Start bluetooth scanning process.""" try: out = self.get_output("scan on") except BluetoothctlError as e: print(e) return None def make_discoverable(self): """Make device discoverable.""" try: out = self.get_output("discoverable on") except BluetoothctlError as e: print(e) return None def parse_device_info(self, info_string): """Parse a string corresponding to a device.""" device = {} block_list = ["[\x1b[0;", "removed"] string_valid = not any(keyword in info_string for keyword in block_list) if string_valid: try: device_position = info_string.index("Device") except ValueError: pass else: if device_position > -1: attribute_list = info_string[device_position:].split(" ", 2) device = { "mac_address": attribute_list[1], "name": attribute_list[2] } return device def get_available_devices(self): """Return a list of tuples of paired and discoverable devices.""" try: out = self.get_output("devices") except BluetoothctlError as e: print(e) return None else: available_devices = [] for line in out: device = self.parse_device_info(line) if device: available_devices.append(device) return available_devices def get_paired_devices(self): """Return a list of tuples of paired devices.""" try: out = self.get_output("paired-devices") except BluetoothctlError as e: print(e) return None else: paired_devices = [] for line in out: device = self.parse_device_info(line) if device: paired_devices.append(device) return paired_devices def get_discoverable_devices(self): """Filter paired devices out of available.""" available = self.get_available_devices() paired = self.get_paired_devices() return [d for d in available if d not in paired] def get_device_info(self, mac_address): """Get device info by mac address.""" try: out = self.get_output("info " + mac_address) except BluetoothctlError as e: print(e) return None else: return out def pair(self, mac_address): """Try to pair with a device by mac address.""" try: out = self.get_output("pair " + mac_address, 4) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF]) success = True if res == 1 else False return success def remove(self, mac_address): """Remove paired device by mac address, return success of the operation.""" try: out = self.get_output("remove " + mac_address, 3) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["not available", "Device has been removed", pexpect.EOF]) success = True if res == 1 else False return success def connect(self, mac_address): """Try to connect to a device by mac address.""" try: out = self.get_output("connect " + mac_address, 2) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF]) success = True if res == 1 else False return success def disconnect(self, mac_address): """Try to disconnect to a device by mac address.""" try: out = self.get_output("disconnect " + mac_address, 2) except BluetoothctlError as e: print(e) return None else: res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF]) success = True if res == 1 else False return success