Module message_channel.router

Expand source code
from asyncio import Queue
from dataclasses import dataclass
from typing import Generic, List, TypeVar

T = TypeVar("T")


class Predicator(Generic[T]):
    """Predicator"""

    def __call__(self, message: T) -> bool:
        """Return True if the message is predicated"""
        ...


@dataclass(frozen=True)
class Route(Generic[T]):
    """Route which combine predicator and predicated messages"""

    messages: Queue[T]
    predicator: Predicator[T]


class Router(Generic[T]):
    """Router distribute messages to registered routes by predicators"""

    routes: List[Route[T]]

    def __init__(self) -> None:
        self.routes = []

    def distribute(self, message: T) -> bool:
        """Distribute message to routes"""
        for route in self.routes:
            if route.predicator(message):
                route.messages.put_nowait(message)
                return True
        return False

Classes

class Predicator

Predicator

Expand source code
class Predicator(Generic[T]):
    """Predicator"""

    def __call__(self, message: T) -> bool:
        """Return True if the message is predicated"""
        ...

Ancestors

  • typing.Generic
class Route (messages: asyncio.queues.Queue, predicator: Predicator[~T])

Route which combine predicator and predicated messages

Expand source code
@dataclass(frozen=True)
class Route(Generic[T]):
    """Route which combine predicator and predicated messages"""

    messages: Queue[T]
    predicator: Predicator[T]

Ancestors

  • typing.Generic

Class variables

var messages : asyncio.queues.Queue
var predicatorPredicator[~T]
class Router

Router distribute messages to registered routes by predicators

Expand source code
class Router(Generic[T]):
    """Router distribute messages to registered routes by predicators"""

    routes: List[Route[T]]

    def __init__(self) -> None:
        self.routes = []

    def distribute(self, message: T) -> bool:
        """Distribute message to routes"""
        for route in self.routes:
            if route.predicator(message):
                route.messages.put_nowait(message)
                return True
        return False

Ancestors

  • typing.Generic

Class variables

var routes : List[Route[~T]]

Methods

def distribute(self, message: ~T) ‑> bool

Distribute message to routes

Expand source code
def distribute(self, message: T) -> bool:
    """Distribute message to routes"""
    for route in self.routes:
        if route.predicator(message):
            route.messages.put_nowait(message)
            return True
    return False