clash-telegram-bot/utils/timer.py

61 lines
1.5 KiB
Python
Raw Normal View History

2022-03-18 22:25:50 +03:00
import asyncio
class KickTimer:
    """One-shot asyncio timer that awaits a callback after a delay.

    At most one task is kept alive per timer: scheduling a new run
    (``start`` or ``execute_now``) cancels the previously scheduled one, so
    the callback cannot fire twice for the same timer.

    Args:
        timeout: Delay passed to ``asyncio.sleep`` before the callback runs.
        callback: Callable returning an awaitable; invoked as
            ``await callback(*args)``.
        args: Iterable of positional arguments unpacked into ``callback``.
    """

    def __init__(self, timeout, callback, args):
        self._timeout = timeout
        self._callback = callback
        self._args = args
        # Task currently scheduled for this timer; None until started.
        self._task = None

    async def _job(self):
        """Wait for the configured timeout, then run the callback."""
        await asyncio.sleep(self._timeout)
        await self._run_now()

    async def _run_now(self):
        """Await the callback, pause five seconds, then cancel the own task."""
        await self._callback(*self._args)
        # NOTE(review): the trailing pause and self-cancel are kept from the
        # original implementation; their purpose is not visible from this
        # file. Confirm with callers before removing them.
        await asyncio.sleep(5)
        self._task.cancel()

    def _cancel_pending(self):
        """Cancel the currently scheduled task, if there is one."""
        if self._task is not None:
            self._task.cancel()

    def start(self):
        """Schedule the delayed callback, replacing any earlier schedule."""
        self._cancel_pending()
        self._task = asyncio.ensure_future(self._job())

    def execute_now(self):
        """Schedule the callback without the delay.

        The pending delayed task is cancelled first; otherwise it would
        still wake up after ``timeout`` and await the callback a second time.
        """
        self._cancel_pending()
        self._task = asyncio.ensure_future(self._run_now())

    def cancel(self):
        """Cancel the scheduled task; a no-op if the timer never started."""
        self._cancel_pending()
class TimerManager:
    """Registry that keeps one timer and one message per user id.

    ``timers`` maps a user id to a two-item list ``[timer, message]``.
    """

    def __init__(self):
        self.timers = {}

    def add_timer(self, user_id: int, message, timer: KickTimer):
        """Start ``timer`` and register it with ``message`` for ``user_id``.

        Nothing happens when the user id already has an entry: the new
        timer is neither started nor stored.
        """
        if user_id in self.timers:
            return
        timer.start()
        self.timers[user_id] = [timer, message]

    def cancel_timer(self, user_id):
        """Cancel and forget the timer registered for ``user_id``.

        Returns:
            The message stored with the timer, or ``None`` when the user id
            has no entry.
        """
        if user_id not in self.timers:
            return None
        entry = self.timers[user_id]
        entry[0].cancel()
        stored_message = entry[1]
        del self.timers[user_id]
        print(f"Timer for user {user_id} canceled")
        return stored_message

    def run_now(self, user_id):
        """Trigger the timer registered for ``user_id`` immediately and forget it.

        Returns:
            The message stored with the timer, or ``None`` when the user id
            has no entry.
        """
        if user_id not in self.timers:
            return None
        entry = self.timers[user_id]
        entry[0].execute_now()
        stored_message = entry[1]
        del self.timers[user_id]
        return stored_message
# Module-level TimerManager instance, created once when this module is imported.
timer_manager = TimerManager()