t | import asyncio | t | import asyncio |
| from collections import deque | | from collections import deque |
| | | |
class FilterQueue(asyncio.Queue):
    """An ``asyncio.Queue`` that can be peeked, rotated and fetched by predicate.

    On top of the regular queue API it offers:

    * ``predicate in queue`` -- true if any queued item satisfies ``predicate``;
    * ``window`` -- the head item, without removing it;
    * ``later()`` -- move the head item to the back of the queue;
    * ``get(filter_func)`` -- remove and return the first item satisfying
      ``filter_func``.
    """

    def __init__(self, maxsize=0):
        """Create the queue; ``maxsize`` is forwarded to ``asyncio.Queue``."""
        super().__init__(maxsize)
        # Not read by any method of this class; retained so that existing
        # attribute access on instances keeps working.
        self._window = None

    def __contains__(self, filter_func):
        """Return True if ``filter_func(item)`` is truthy for any queued item.

        Note that the operand of ``in`` is a predicate, not an item.
        """
        return any(filter_func(item) for item in self._queue)

    @property
    def window(self):
        """The item at the head of the queue, or ``None`` if the queue is empty."""
        return self._queue[0] if not self.empty() else None

    def later(self):
        """Move the head item to the back of the queue.

        Raises:
            asyncio.QueueEmpty: if the queue holds no items.
        """
        if self.empty():
            raise asyncio.QueueEmpty
        # Equivalent to popleft() followed by append(), in one deque operation.
        self._queue.rotate(-1)

    async def get(self, filter_func=None):
        """Remove and return an item from the queue.

        Args:
            filter_func: optional predicate. When given, the first queued item
                (in FIFO order) for which it returns a truthy value is removed
                and returned; the relative order of the remaining items is
                left unchanged.

        Returns:
            The selected item. Without ``filter_func``, or when no queued item
            satisfies it, this behaves like ``asyncio.Queue.get``: the head
            item is returned, waiting for one to arrive if the queue is empty.
        """
        if filter_func is None:
            return await super().get()

        # Locate the match without mutating the queue, so that a predicate
        # which raises leaves the queue exactly as it was.
        match_index = next(
            (
                index
                for index, item in enumerate(self._queue)
                if filter_func(item)
            ),
            None,
        )
        if match_index is None:
            # No queued item matches: fall back to a plain get().
            return await super().get()

        # Bring the match to the head, take it through the base-class get()
        # (which also wakes a waiting putter), then undo the rotation so the
        # other items keep their FIFO order. The queue is non-empty here, so
        # the base get() returns without waiting.
        self._queue.rotate(-match_index)
        try:
            return await super().get()
        finally:
            self._queue.rotate(match_index)