diff --git a/src/utils_cache.py b/src/utils_cache.py index 4ccc2af..735b36c 100644 --- a/src/utils_cache.py +++ b/src/utils_cache.py @@ -1,6 +1,6 @@ #!/usr/bin/python3 from pathlib import Path -from utils_qualtest import Phone, TaskList +from utils_types import Phone, TaskList import os import shutil import json diff --git a/src/utils_qualtest.py b/src/utils_qualtest.py index 334a541..5a28d14 100644 --- a/src/utils_qualtest.py +++ b/src/utils_qualtest.py @@ -18,9 +18,7 @@ from socket import timeout from crontab import CronTab from pathlib import Path from utils_cache import InfoCache - -start_system_time = time.time() -start_monotonic_time = time.monotonic() +from utils_types import Phone, TaskList # Error report produced by this function has to be updated with 'task_name' & 'phone_name' keys def build_error_report(endtime: int, reason: str): @@ -37,94 +35,6 @@ def build_error_report(endtime: int, reason: str): return r -class TaskList: - tasks: list = [] - - def __init__(self): - self.tasks = [] - - - # Merges incoming task list to existing one - # It preserves existing schedules - # New items are NOT scheduled automatically - def merge_with(self, tasklist) -> bool: - changed = False - if tasklist.tasks is None: - return True - - # Iterate all tasks, see if task with the same name exists already - # Copy all keys, but keep existing ones - for new_task in tasklist.tasks: - # Find if this task exists already - existing_task = self.find_task_by_name(new_task["name"]) - - # If task is found - copy all items to it. - # It is required as task can hold schedule items already - # Bad idea to copy tasks itself. - if existing_task is not None: - # Check if scheduled time point has to be removed (if cron string changed) - if new_task["schedule"] != existing_task["schedule"] and "scheduled_time" in existing_task: - del existing_task["scheduled_time"] - - - # Finally copy new values - for key, value in new_task.items(): - if existing_task[key] != value: - existing_task[key] = value - changed = True - else: - # Copy new task to list - self.tasks.extend([new_task]) - changed = True - - # Check if old tasks are here... And delete them - for existing_task in self.tasks: - new_task = self.find_task_by_name(existing_task["name"]) - if new_task is None: - self.tasks.remove(existing_task) - changed = True - - return changed - - - def schedule(self): - # Remove items without schedule before - self.tasks = [t for t in self.tasks if len(t['schedule']) > 0] - - # https://crontab.guru is good for crontab strings generation - # Use monotonic time source! - current_time = time.monotonic() - for task in self.tasks: - if 'scheduled_time' not in task and 'schedule' in task: - # No schedule flag, so time to schedule - try: - cron_string = task['schedule'].strip() - if cron_string == '* * * * *': - task['scheduled_time'] = time.monotonic() - 0.001 # To ensure further comparison will not be affected by precision errors - else: - cron = CronTab(task['schedule']) - task['scheduled_time'] = current_time + cron.next(default_utc=True) - - # Just to help in further log reading & debugging - show the scheduled time in readable form - task['scheduled_time_str'] = time.ctime(task['scheduled_time'] - start_monotonic_time + start_system_time) - except: - utils.log_error("Error", sys.exc_info()[0]) - - # Remove non scheduled items - self.tasks = [t for t in self.tasks if 'scheduled_time' in t] - - # Sort everything - self.tasks = sorted(self.tasks, key=lambda t: t["scheduled_time"]) - - - # Returns None if failed - def find_task_by_name(self, name): - for t in self.tasks: - if t["name"] == name: - return t - - return None - def ParseAttributes(t: str) -> dict: result: dict = dict() @@ -136,46 +46,6 @@ def ParseAttributes(t: str) -> dict: return result -class Phone: - identifier: int = 0 - name: str = "" - role: str = "" - attributes: dict = "" - audio_id: int = 0 - - def __init__(self): - self.identifier = 0 - self.name = "" - self.role = "" - self.attributes = dict() - self.audio_id = 0 - - def to_dict(self) -> dict: - return { - 'id': self.identifier, - 'name': self.name, - 'role': self.role, - 'attr': self.attributes, - 'audio_id': self.audio_id - } - - def make(d: dict): - r = Phone() - - r.identifier = d['id'] - r.name = d['name'] - r.role = d['role'] - if 'attr' in d: - r.attr = d['attr'] - else: - r.attr = None - - if 'audio_id' in d: - r.audio_id = d['audio_id'] - else: - r.audio_id = None - - return r class QualtestBackend: address: str diff --git a/src/utils_types.py b/src/utils_types.py new file mode 100644 index 0000000..49d8d65 --- /dev/null +++ b/src/utils_types.py @@ -0,0 +1,137 @@ +#!/usr/bin/python3 + +import time + +start_system_time = time.time() +start_monotonic_time = time.monotonic() + + +class Phone: + identifier: int = 0 + name: str = "" + role: str = "" + attributes: dict = "" + audio_id: int = 0 + + def __init__(self): + self.identifier = 0 + self.name = "" + self.role = "" + self.attributes = dict() + self.audio_id = 0 + + def to_dict(self) -> dict: + return { + 'id': self.identifier, + 'name': self.name, + 'role': self.role, + 'attr': self.attributes, + 'audio_id': self.audio_id + } + + def make(d: dict): + r = Phone() + + r.identifier = d['id'] + r.name = d['name'] + r.role = d['role'] + if 'attr' in d: + r.attr = d['attr'] + else: + r.attr = None + + if 'audio_id' in d: + r.audio_id = d['audio_id'] + else: + r.audio_id = None + + return r + + +class TaskList: + tasks: list = [] + + def __init__(self): + self.tasks = [] + + + # Merges incoming task list to existing one + # It preserves existing schedules + # New items are NOT scheduled automatically + def merge_with(self, tasklist) -> bool: + changed = False + if tasklist.tasks is None: + return True + + # Iterate all tasks, see if task with the same name exists already + # Copy all keys, but keep existing ones + for new_task in tasklist.tasks: + # Find if this task exists already + existing_task = self.find_task_by_name(new_task["name"]) + + # If task is found - copy all items to it. + # It is required as task can hold schedule items already + # Bad idea to copy tasks itself. + if existing_task is not None: + # Check if scheduled time point has to be removed (if cron string changed) + if new_task["schedule"] != existing_task["schedule"] and "scheduled_time" in existing_task: + del existing_task["scheduled_time"] + + + # Finally copy new values + for key, value in new_task.items(): + if existing_task[key] != value: + existing_task[key] = value + changed = True + else: + # Copy new task to list + self.tasks.extend([new_task]) + changed = True + + # Check if old tasks are here... And delete them + for existing_task in self.tasks: + new_task = self.find_task_by_name(existing_task["name"]) + if new_task is None: + self.tasks.remove(existing_task) + changed = True + + return changed + + + def schedule(self): + # Remove items without schedule before + self.tasks = [t for t in self.tasks if len(t['schedule']) > 0] + + # https://crontab.guru is good for crontab strings generation + # Use monotonic time source! + current_time = time.monotonic() + for task in self.tasks: + if 'scheduled_time' not in task and 'schedule' in task: + # No schedule flag, so time to schedule + try: + cron_string = task['schedule'].strip() + if cron_string == '* * * * *': + task['scheduled_time'] = time.monotonic() - 0.001 # To ensure further comparison will not be affected by precision errors + else: + cron = CronTab(task['schedule']) + task['scheduled_time'] = current_time + cron.next(default_utc=True) + + # Just to help in further log reading & debugging - show the scheduled time in readable form + task['scheduled_time_str'] = time.ctime(task['scheduled_time'] - start_monotonic_time + start_system_time) + except: + utils.log_error("Error", sys.exc_info()[0]) + + # Remove non scheduled items + self.tasks = [t for t in self.tasks if 'scheduled_time' in t] + + # Sort everything + self.tasks = sorted(self.tasks, key=lambda t: t["scheduled_time"]) + + + # Returns None if failed + def find_task_by_name(self, name): + for t in self.tasks: + if t["name"] == name: + return t + + return None