Module message_channel.router
Expand source code
from asyncio import Queue
from dataclasses import dataclass
from typing import Generic, List, TypeVar
T = TypeVar("T")
class Predicator(Generic[T]):
"""Predicator"""
def __call__(self, message: T) -> bool:
"""Return True if the message is predicated"""
...
@dataclass(frozen=True)
class Route(Generic[T]):
"""Route which combine predicator and predicated messages"""
messages: Queue[T]
predicator: Predicator[T]
class Router(Generic[T]):
"""Router distribute messages to registered routes by predicators"""
routes: List[Route[T]]
def __init__(self) -> None:
self.routes = []
def distribute(self, message: T) -> bool:
"""Distribute message to routes"""
for route in self.routes:
if route.predicator(message):
route.messages.put_nowait(message)
return True
return False
Classes
class Predicator
-
Predicator
Expand source code
class Predicator(Generic[T]): """Predicator""" def __call__(self, message: T) -> bool: """Return True if the message is predicated""" ...
Ancestors
- typing.Generic
class Route (messages: asyncio.queues.Queue, predicator: Predicator[~T])
-
Route which combine predicator and predicated messages
Expand source code
@dataclass(frozen=True) class Route(Generic[T]): """Route which combine predicator and predicated messages""" messages: Queue[T] predicator: Predicator[T]
Ancestors
- typing.Generic
Class variables
var messages : asyncio.queues.Queue
var predicator : Predicator[~T]
class Router
-
Router distribute messages to registered routes by predicators
Expand source code
class Router(Generic[T]): """Router distribute messages to registered routes by predicators""" routes: List[Route[T]] def __init__(self) -> None: self.routes = [] def distribute(self, message: T) -> bool: """Distribute message to routes""" for route in self.routes: if route.predicator(message): route.messages.put_nowait(message) return True return False
Ancestors
- typing.Generic
Class variables
var routes : List[Route[~T]]
Methods
def distribute(self, message: ~T) ‑> bool
-
Distribute message to routes
Expand source code
def distribute(self, message: T) -> bool: """Distribute message to routes""" for route in self.routes: if route.predicator(message): route.messages.put_nowait(message) return True return False