Package message_channel
Expand source code
from .channel import Channel, Reader
from .router import Predicator
__version__ = "0.0.0"
__all__ = [
"Channel",
"Reader",
"Predicator",
]
Sub-modules
message_channel.channel
message_channel.exceptions
message_channel.router
Classes
class Channel (reader: Callable[[], Awaitable[~T]], writer: Optional[Callable[[~T], Awaitable[NoneType]]] = None)
-
Channel distribute messages into sub-channels and itself
Expand source code
class Channel(Generic[T]): """Channel distribute messages into sub-channels and itself""" _router: Router[T] _messages: Queue[T] _listener: Optional[Task[None]] def __init__(self, reader: Reader[T], writer: Optional[Writer[T]] = None) -> None: self._reader = reader self._writer = writer self._closed = Event() self._router = Router() self._messages = Queue() self._listener = None async def __aenter__(self) -> "Channel[T]": self.open() return self async def __aexit__(self, *args: Any, **kwargs: Any) -> None: await self.close() def open(self) -> None: """Open the channel""" if self._listener: raise exceptions.ChannelAlreadyOpenedError("the channel is already opened") self._listener = asyncio.create_task(self._start_listener()) async def _start_listener(self) -> None: async def waiter_handler() -> None: await self._closed.wait() async def consumer_handler() -> None: router = self._router messages = self._messages while True: logger.debug(f"Receive message [pre ] ({id(self)})") m = await self._reader() logger.debug(f"Receive message [post] ({id(self)}): {str(m)}") if router.distribute(m): logger.debug(f"The message is distributed ({id(self)})") continue logger.debug(f"The message is queued ({id(self)})") # The message is not distributed so put in this channel messages.put_nowait(m) logger.debug(f"Open listener ({id(self)})") waiter = asyncio.create_task(waiter_handler()) consumer = asyncio.create_task(consumer_handler()) done, pending = await asyncio.wait( [waiter, consumer], return_when=asyncio.FIRST_COMPLETED, ) for task in pending: task.cancel() for task in done: task.result() logger.debug(f"Close listener ({id(self)})") async def recv(self) -> T: """Receive a message which is not distributed to subchannels It raises ChannelClosedError when the channel is closed and no residual messages exist in the internal message queue. """ if self._listener is None and self._messages.empty(): raise exceptions.ChannelClosedError( "the channel is closed and no residual message exist" ) loop = asyncio.get_event_loop() waiter: Future[T] = loop.create_future() def _release_waiter(f: Union[Future[T], Future[None]]) -> None: if waiter.done(): return elif (exc := f.exception()) : waiter.set_exception(exc) elif (res := f.result()) : waiter.set_result(res) else: waiter.set_exception( exceptions.ChannelClosedError("the channel is closed") ) if self._listener: self._listener.add_done_callback(_release_waiter) waiter.add_done_callback( partial(self._listener.remove_done_callback, _release_waiter) ) getter = asyncio.create_task(self._messages.get()) getter.add_done_callback(_release_waiter) try: return await waiter finally: getter.cancel() async def send(self, message: T) -> None: """Send a message through the writer It raises ChannelNoWriterError when no writer had specified to the channel constructor. """ if self._writer is None: raise exceptions.ChannelNoWriterError("the channel does not have writer") await self._writer(message) async def close(self) -> None: """Close the channel""" if self._listener is None: raise exceptions.ChannelClosedError("the channel is already closed") self._closed.set() await self._listener self._listener = None def split(self, predicator: Predicator[T]) -> "Subchannel[T]": """Split the channel by the predicator and return subchannel""" if self._listener is None: raise exceptions.ChannelClosedError("the channel is not opened yet") return Subchannel(self, predicator)
Ancestors
- typing.Generic
Subclasses
Methods
async def close(self) ‑> NoneType
-
Close the channel
Expand source code
async def close(self) -> None: """Close the channel""" if self._listener is None: raise exceptions.ChannelClosedError("the channel is already closed") self._closed.set() await self._listener self._listener = None
def open(self) ‑> NoneType
-
Open the channel
Expand source code
def open(self) -> None: """Open the channel""" if self._listener: raise exceptions.ChannelAlreadyOpenedError("the channel is already opened") self._listener = asyncio.create_task(self._start_listener())
async def recv(self) ‑> ~T
-
Receive a message which is not distributed to subchannels
It raises ChannelClosedError when the channel is closed and no residual messages exist in the internal message queue.
Expand source code
async def recv(self) -> T: """Receive a message which is not distributed to subchannels It raises ChannelClosedError when the channel is closed and no residual messages exist in the internal message queue. """ if self._listener is None and self._messages.empty(): raise exceptions.ChannelClosedError( "the channel is closed and no residual message exist" ) loop = asyncio.get_event_loop() waiter: Future[T] = loop.create_future() def _release_waiter(f: Union[Future[T], Future[None]]) -> None: if waiter.done(): return elif (exc := f.exception()) : waiter.set_exception(exc) elif (res := f.result()) : waiter.set_result(res) else: waiter.set_exception( exceptions.ChannelClosedError("the channel is closed") ) if self._listener: self._listener.add_done_callback(_release_waiter) waiter.add_done_callback( partial(self._listener.remove_done_callback, _release_waiter) ) getter = asyncio.create_task(self._messages.get()) getter.add_done_callback(_release_waiter) try: return await waiter finally: getter.cancel()
async def send(self, message: ~T) ‑> NoneType
-
Send a message through the writer
It raises ChannelNoWriterError when no writer had specified to the channel constructor.
Expand source code
async def send(self, message: T) -> None: """Send a message through the writer It raises ChannelNoWriterError when no writer had specified to the channel constructor. """ if self._writer is None: raise exceptions.ChannelNoWriterError("the channel does not have writer") await self._writer(message)
def split(self, predicator: Predicator[~T]) ‑> Subchannel[~T]
-
Split the channel by the predicator and return subchannel
Expand source code
def split(self, predicator: Predicator[T]) -> "Subchannel[T]": """Split the channel by the predicator and return subchannel""" if self._listener is None: raise exceptions.ChannelClosedError("the channel is not opened yet") return Subchannel(self, predicator)
class Predicator
-
Predicator
Expand source code
class Predicator(Generic[T]): """Predicator""" def __call__(self, message: T) -> bool: """Return True if the message is predicated""" ...
Ancestors
- typing.Generic