Files
agent_gsm/src/utils_logcat.py
2023-08-22 18:55:00 +03:00

107 lines
2.8 KiB
Python

import os
import utils
import multiprocessing
import subprocess
import select
import time
import utils
import utils_event
# adb utility location
ADB = '/usr/bin/adb'
class LogcatEventSource(multiprocessing.Process):
terminate_flag: multiprocessing.Value = multiprocessing.Value('b')
# Monitoring time limit (in seconds)
timelimit: float = 300.0
# Please set this value before opening the logcat
queue: multiprocessing.Queue
def __init__(self):
super().__init__()
return
# def __repr__(self) -> str:
# return ''
def run(self):
# Clear from old logcat output
os.system(f'{ADB} logcat -c')
# Open adb logcat - show only messages from QualTestGSM
cmdline = f'{ADB} logcat QualTestGSM:D *.S'
utils.log_verbose(f'ADB command line: {cmdline}')
process_logcat = subprocess.Popen(cmdline, stdout=subprocess.PIPE, shell=True)
process_poll = select.poll()
process_poll.register(process_logcat.stdout, select.POLLIN)
# Monitoring start time
current_timestamp = utils.get_monotonic_time()
# Read logcat output line by line
while self.terminate_flag.value == 0:
# Check if time limit is hit
if utils.get_monotonic_time() - current_timestamp > self.timelimit:
break
current_timestamp = utils.get_monotonic_time()
# Look for available data on stdout
try:
if not process_poll.poll(1):
continue
except:
break
current_line = None
try:
current_line = process_logcat.stdout.readline().decode()
except:
continue
if not current_line:
break
# Log read line
if 'QualTestGSM' in current_line:
utils.log_verbose(current_line.strip())
# Reset event name
event = utils_event.CallEvent.parse_unified(current_line)
if event is None:
# This line is not event description
continue
if self.queue is not None:
utils.log_verbose(f'Logcat event: {event}')
self.queue.put_nowait(event)
return
def open(self):
if self.is_alive():
return
# Reset terminate flag
self.terminate_flag.value = 0
# Start worker process
self.start()
# self.worker_process = multiprocessing.Process(target=self.worker, args=(self))
return
def close(self):
if not self.is_alive():
return
self.terminate_flag.value = 1
self.join()
return