Package message_channel

Expand source code
from .channel import Channel, Reader
from .router import Predicator

__version__ = "0.0.0"


__all__ = [
    "Channel",
    "Reader",
    "Predicator",
]

Sub-modules

message_channel.channel
message_channel.exceptions
message_channel.router

Classes

class Channel (reader: Callable[[], Awaitable[~T]], writer: Optional[Callable[[~T], Awaitable[NoneType]]] = None)

Channel distribute messages into sub-channels and itself

Expand source code
class Channel(Generic[T]):
    """Channel distribute messages into sub-channels and itself"""

    _router: Router[T]
    _messages: Queue[T]
    _listener: Optional[Task[None]]

    def __init__(self, reader: Reader[T], writer: Optional[Writer[T]] = None) -> None:
        self._reader = reader
        self._writer = writer
        self._closed = Event()
        self._router = Router()
        self._messages = Queue()
        self._listener = None

    async def __aenter__(self) -> "Channel[T]":
        self.open()
        return self

    async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
        await self.close()

    def open(self) -> None:
        """Open the channel"""
        if self._listener:
            raise exceptions.ChannelAlreadyOpenedError("the channel is already opened")
        self._listener = asyncio.create_task(self._start_listener())

    async def _start_listener(self) -> None:
        async def waiter_handler() -> None:
            await self._closed.wait()

        async def consumer_handler() -> None:
            router = self._router
            messages = self._messages

            while True:
                logger.debug(f"Receive message [pre ] ({id(self)})")
                m = await self._reader()
                logger.debug(f"Receive message [post] ({id(self)}): {str(m)}")
                if router.distribute(m):
                    logger.debug(f"The message is distributed ({id(self)})")
                    continue
                logger.debug(f"The message is queued ({id(self)})")
                # The message is not distributed so put in this channel
                messages.put_nowait(m)

        logger.debug(f"Open listener ({id(self)})")
        waiter = asyncio.create_task(waiter_handler())
        consumer = asyncio.create_task(consumer_handler())
        done, pending = await asyncio.wait(
            [waiter, consumer],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
        for task in done:
            task.result()
        logger.debug(f"Close listener ({id(self)})")

    async def recv(self) -> T:
        """Receive a message which is not distributed to subchannels

        It raises ChannelClosedError when the channel is closed and no
        residual messages exist in the internal message queue.
        """
        if self._listener is None and self._messages.empty():
            raise exceptions.ChannelClosedError(
                "the channel is closed and no residual message exist"
            )

        loop = asyncio.get_event_loop()
        waiter: Future[T] = loop.create_future()

        def _release_waiter(f: Union[Future[T], Future[None]]) -> None:
            if waiter.done():
                return
            elif (exc := f.exception()) :
                waiter.set_exception(exc)
            elif (res := f.result()) :
                waiter.set_result(res)
            else:
                waiter.set_exception(
                    exceptions.ChannelClosedError("the channel is closed")
                )

        if self._listener:
            self._listener.add_done_callback(_release_waiter)
            waiter.add_done_callback(
                partial(self._listener.remove_done_callback, _release_waiter)
            )

        getter = asyncio.create_task(self._messages.get())
        getter.add_done_callback(_release_waiter)
        try:
            return await waiter
        finally:
            getter.cancel()

    async def send(self, message: T) -> None:
        """Send a message through the writer

        It raises ChannelNoWriterError when no writer had specified to
        the channel constructor.
        """
        if self._writer is None:
            raise exceptions.ChannelNoWriterError("the channel does not have writer")
        await self._writer(message)

    async def close(self) -> None:
        """Close the channel"""
        if self._listener is None:
            raise exceptions.ChannelClosedError("the channel is already closed")
        self._closed.set()
        await self._listener
        self._listener = None

    def split(self, predicator: Predicator[T]) -> "Subchannel[T]":
        """Split the channel by the predicator and return subchannel"""
        if self._listener is None:
            raise exceptions.ChannelClosedError("the channel is not opened yet")
        return Subchannel(self, predicator)

Ancestors

  • typing.Generic

Subclasses

Methods

async def close(self) ‑> NoneType

Close the channel

Expand source code
async def close(self) -> None:
    """Close the channel"""
    if self._listener is None:
        raise exceptions.ChannelClosedError("the channel is already closed")
    self._closed.set()
    await self._listener
    self._listener = None
def open(self) ‑> NoneType

Open the channel

Expand source code
def open(self) -> None:
    """Open the channel"""
    if self._listener:
        raise exceptions.ChannelAlreadyOpenedError("the channel is already opened")
    self._listener = asyncio.create_task(self._start_listener())
async def recv(self) ‑> ~T

Receive a message which is not distributed to subchannels

It raises ChannelClosedError when the channel is closed and no residual messages exist in the internal message queue.

Expand source code
async def recv(self) -> T:
    """Receive a message which is not distributed to subchannels

    It raises ChannelClosedError when the channel is closed and no
    residual messages exist in the internal message queue.
    """
    if self._listener is None and self._messages.empty():
        raise exceptions.ChannelClosedError(
            "the channel is closed and no residual message exist"
        )

    loop = asyncio.get_event_loop()
    waiter: Future[T] = loop.create_future()

    def _release_waiter(f: Union[Future[T], Future[None]]) -> None:
        if waiter.done():
            return
        elif (exc := f.exception()) :
            waiter.set_exception(exc)
        elif (res := f.result()) :
            waiter.set_result(res)
        else:
            waiter.set_exception(
                exceptions.ChannelClosedError("the channel is closed")
            )

    if self._listener:
        self._listener.add_done_callback(_release_waiter)
        waiter.add_done_callback(
            partial(self._listener.remove_done_callback, _release_waiter)
        )

    getter = asyncio.create_task(self._messages.get())
    getter.add_done_callback(_release_waiter)
    try:
        return await waiter
    finally:
        getter.cancel()
async def send(self, message: ~T) ‑> NoneType

Send a message through the writer

It raises ChannelNoWriterError when no writer had specified to the channel constructor.

Expand source code
async def send(self, message: T) -> None:
    """Send a message through the writer

    It raises ChannelNoWriterError when no writer had specified to
    the channel constructor.
    """
    if self._writer is None:
        raise exceptions.ChannelNoWriterError("the channel does not have writer")
    await self._writer(message)
def split(self, predicator: Predicator[~T]) ‑> Subchannel[~T]

Split the channel by the predicator and return subchannel

Expand source code
def split(self, predicator: Predicator[T]) -> "Subchannel[T]":
    """Split the channel by the predicator and return subchannel"""
    if self._listener is None:
        raise exceptions.ChannelClosedError("the channel is not opened yet")
    return Subchannel(self, predicator)
class Predicator

Predicator

Expand source code
class Predicator(Generic[T]):
    """Predicator"""

    def __call__(self, message: T) -> bool:
        """Return True if the message is predicated"""
        ...

Ancestors

  • typing.Generic