Coverage for src/lib/scheduler.py: 52%

50 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-03-09 17:37 +0000

1 

2import datetime as dt 

3from logging import getLogger 

4from lib.task import Task 

5from asyncio import sleep as asleep 

6 

7 

8class Scheduler(): 

9 _running: bool 

10 _tasks: list 

11 MAX_TIME = 0.2 

12 SLEEP_TIME = 0.3 

13 IDLE_MULTIPLIER = 1.5 

14 MAX_SLEEP_TIME = 10 

15 

16 def __init__(self): 

17 self._running = False 

18 self._tasks = [] 

19 

20 self._logger = getLogger('app.scheduler') 

21 self._logger.info('init()') 

22 

23 def __del__(self): 

24 self._logger.info('__del__()') 

25 

26 def add_task(self, execfunc, interval: dt.timedelta = None, one_shot: bool = False): 

27 self._logger.debug('add_task(%s, %s)', execfunc, interval) 

28 task = Task(execfunc, interval, one_shot) 

29 self._tasks.append(task) 

30 

31 async def run(self, max_cycles: int = None): 

32 self._logger.info('run()') 

33 self._running = True 

34 _sleep_time = self.SLEEP_TIME 

35 

36 _cycle = 0 

37 while (self._running and max_cycles is None) or (max_cycles is not None and _cycle < max_cycles): 

38 _start = dt.datetime.now(dt.UTC) 

39 

40 tasks_running = 0 

41 for task in self._tasks: 

42 # self._logger.debug('run task %s', task) 

43 was_running = task.run() 

44 if was_running: 

45 tasks_running += 1 

46 if task.is_one_shot: 

47 # self._logger.debug('removing one shot task') 

48 self._tasks.remove(task) 

49 

50 _diff = dt.datetime.now(dt.UTC) - _start 

51 if _diff > dt.timedelta(seconds=self.MAX_TIME): 

52 # self._logger.debug('run() exceeded max time') 

53 break 

54 

55 if tasks_running == 0: 

56 _sleep_time = _sleep_time * self.IDLE_MULTIPLIER 

57 if _sleep_time > self.MAX_SLEEP_TIME: 

58 _sleep_time = self.MAX_SLEEP_TIME 

59 # self._logger.debug('_sleep_time', _sleep_time) 

60 else: 

61 _sleep_time = self.SLEEP_TIME 

62 

63 #self._logger.debug('sleeping: {}'.format(_sleep_time)) 

64 await asleep(_sleep_time) 

65 

66 _cycle += 1 

67 

68 self._logger.info('run() finished after %s cycles', _cycle) 

69 

70 def shutdown(self, reason: str = None): 

71 self._logger.info('shutdown: %s', reason) 

72 self._running = False