Coverage for src/lib/task.py: 100%

26 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2025-03-09 17:37 +0000

1 

2from typing import Callable 

3import datetime as dt 

4 

5 

6class Task(): 

7 _execfunc: Callable 

8 _interval: dt.timedelta 

9 _last_run: dt.datetime 

10 is_one_shot: bool 

11 

12 def __init__(self, execfunc, interval: dt.timedelta = None, one_shot: bool = False): 

13 self._execfunc = execfunc 

14 self._interval = interval 

15 self._last_run = None 

16 self.is_one_shot = one_shot 

17 

18 if self._interval is not None and one_shot: 

19 self._last_run = dt.datetime.now(dt.UTC) 

20 

21 def __str__(self): # pragma: no cover 

22 return 'Task({})'.format(self._execfunc) 

23 

24 def __repr__(self): # pragma: no cover 

25 return self.__str__() 

26 

27 def run(self) -> bool: 

28 should_run = False 

29 if self._last_run is None: 

30 should_run = True 

31 else: 

32 diff = dt.datetime.now(dt.UTC) - self._last_run 

33 if diff > self._interval: 

34 should_run = True 

35 

36 if should_run: 

37 exec_res = self._execfunc() 

38 self._last_run = dt.datetime.now(dt.UTC) 

39 return exec_res 

40 

41 return False