Coverage for src/lib/task.py: 100%
26 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-09 17:37 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-09 17:37 +0000
2from typing import Callable
3import datetime as dt
6class Task():
7 _execfunc: Callable
8 _interval: dt.timedelta
9 _last_run: dt.datetime
10 is_one_shot: bool
12 def __init__(self, execfunc, interval: dt.timedelta = None, one_shot: bool = False):
13 self._execfunc = execfunc
14 self._interval = interval
15 self._last_run = None
16 self.is_one_shot = one_shot
18 if self._interval is not None and one_shot:
19 self._last_run = dt.datetime.now(dt.UTC)
21 def __str__(self): # pragma: no cover
22 return 'Task({})'.format(self._execfunc)
24 def __repr__(self): # pragma: no cover
25 return self.__str__()
27 def run(self) -> bool:
28 should_run = False
29 if self._last_run is None:
30 should_run = True
31 else:
32 diff = dt.datetime.now(dt.UTC) - self._last_run
33 if diff > self._interval:
34 should_run = True
36 if should_run:
37 exec_res = self._execfunc()
38 self._last_run = dt.datetime.now(dt.UTC)
39 return exec_res
41 return False