- initial import
This commit is contained in:
163
src/bt_controller.py
Normal file
163
src/bt_controller.py
Normal file
@@ -0,0 +1,163 @@
|
||||
import time
|
||||
import pexpect
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
class BluetoothctlError(Exception):
|
||||
"""This exception is raised, when bluetoothctl fails to start."""
|
||||
pass
|
||||
|
||||
|
||||
class Bluetoothctl:
|
||||
"""A wrapper for bluetoothctl utility."""
|
||||
|
||||
def __init__(self):
|
||||
out = subprocess.check_output("rfkill unblock bluetooth", shell = True)
|
||||
# print("Bluetoothctl")
|
||||
self.child = pexpect.spawn("bluetoothctl", echo = False)
|
||||
|
||||
def get_output(self, command, pause = 0):
|
||||
"""Run a command in bluetoothctl prompt, return output as a list of lines."""
|
||||
self.child.send(command + "\n")
|
||||
time.sleep(pause)
|
||||
start_failed = self.child.expect(["[.]*", pexpect.TIMEOUT, pexpect.EOF])
|
||||
|
||||
if start_failed:
|
||||
raise BluetoothctlError("Bluetoothctl failed after running " + command)
|
||||
|
||||
t = self.child.before
|
||||
return t.decode('utf-8').split("\r\n")
|
||||
|
||||
def start_scan(self):
|
||||
"""Start bluetooth scanning process."""
|
||||
try:
|
||||
out = self.get_output("scan on")
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
def make_discoverable(self):
|
||||
"""Make device discoverable."""
|
||||
try:
|
||||
out = self.get_output("discoverable on")
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
def parse_device_info(self, info_string):
|
||||
"""Parse a string corresponding to a device."""
|
||||
device = {}
|
||||
block_list = ["[\x1b[0;", "removed"]
|
||||
string_valid = not any(keyword in info_string for keyword in block_list)
|
||||
|
||||
if string_valid:
|
||||
try:
|
||||
device_position = info_string.index("Device")
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
if device_position > -1:
|
||||
attribute_list = info_string[device_position:].split(" ", 2)
|
||||
device = {
|
||||
"mac_address": attribute_list[1],
|
||||
"name": attribute_list[2]
|
||||
}
|
||||
|
||||
return device
|
||||
|
||||
def get_available_devices(self):
|
||||
"""Return a list of tuples of paired and discoverable devices."""
|
||||
try:
|
||||
out = self.get_output("devices")
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
available_devices = []
|
||||
for line in out:
|
||||
device = self.parse_device_info(line)
|
||||
if device:
|
||||
available_devices.append(device)
|
||||
|
||||
return available_devices
|
||||
|
||||
def get_paired_devices(self):
|
||||
"""Return a list of tuples of paired devices."""
|
||||
try:
|
||||
out = self.get_output("paired-devices")
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
paired_devices = []
|
||||
for line in out:
|
||||
device = self.parse_device_info(line)
|
||||
if device:
|
||||
paired_devices.append(device)
|
||||
|
||||
return paired_devices
|
||||
|
||||
def get_discoverable_devices(self):
|
||||
"""Filter paired devices out of available."""
|
||||
available = self.get_available_devices()
|
||||
paired = self.get_paired_devices()
|
||||
|
||||
return [d for d in available if d not in paired]
|
||||
|
||||
def get_device_info(self, mac_address):
|
||||
"""Get device info by mac address."""
|
||||
try:
|
||||
out = self.get_output("info " + mac_address)
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
return out
|
||||
|
||||
def pair(self, mac_address):
|
||||
"""Try to pair with a device by mac address."""
|
||||
try:
|
||||
out = self.get_output("pair " + mac_address, 4)
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF])
|
||||
success = True if res == 1 else False
|
||||
return success
|
||||
|
||||
def remove(self, mac_address):
|
||||
"""Remove paired device by mac address, return success of the operation."""
|
||||
try:
|
||||
out = self.get_output("remove " + mac_address, 3)
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
res = self.child.expect(["not available", "Device has been removed", pexpect.EOF])
|
||||
success = True if res == 1 else False
|
||||
return success
|
||||
|
||||
def connect(self, mac_address):
|
||||
"""Try to connect to a device by mac address."""
|
||||
try:
|
||||
out = self.get_output("connect " + mac_address, 2)
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF])
|
||||
success = True if res == 1 else False
|
||||
return success
|
||||
|
||||
def disconnect(self, mac_address):
|
||||
"""Try to disconnect to a device by mac address."""
|
||||
try:
|
||||
out = self.get_output("disconnect " + mac_address, 2)
|
||||
except BluetoothctlError as e:
|
||||
print(e)
|
||||
return None
|
||||
else:
|
||||
res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF])
|
||||
success = True if res == 1 else False
|
||||
return success
|
||||
Reference in New Issue
Block a user