Module controlpi_plugins.util

Provide utility plugins for all kinds of systems.

  • Log logs messages on stdout.
  • Init sends list of messages on startup and on demand.
  • Alias translates messages to an alias.
>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [{"sender": {"const": "Test Alias"}}]},
...      "Test Init": {"plugin": "Init",
...                    "messages": [{"id": 42, "content": "Test Message"}]},
...      "Test Alias": {"plugin": "Alias",
...                     "from": {"sender": {"const": "Test Init"},
...                              "id": {"const": 42}},
...                     "to": {"id": "translated"}}}, []))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Log', 'plugin': 'Log',
         'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}}],
         'receives': [{'target': {'const': 'Test Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'sender': {'const': 'Test Init'},
                       'id': {'const': 42}}]}
test(): {'sender': 'Test Init', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message'}
Test Log: {'sender': 'Test Alias', 'id': 'translated',
           'content': 'Test Message'}
Expand source code
"""Provide utility plugins for all kinds of systems.

- Log logs messages on stdout.
- Init sends list of messages on startup and on demand.
- Alias translates messages to an alias.

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [{"sender": {"const": "Test Alias"}}]},
...      "Test Init": {"plugin": "Init",
...                    "messages": [{"id": 42, "content": "Test Message"}]},
...      "Test Alias": {"plugin": "Alias",
...                     "from": {"sender": {"const": "Test Init"},
...                              "id": {"const": 42}},
...                     "to": {"id": "translated"}}}, []))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Log', 'plugin': 'Log',
         'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}}],
         'receives': [{'target': {'const': 'Test Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'sender': {'const': 'Test Init'},
                       'id': {'const': 42}}]}
test(): {'sender': 'Test Init', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message'}
Test Log: {'sender': 'Test Alias', 'id': 'translated',
           'content': 'Test Message'}
"""
import asyncio

from controlpi import BasePlugin, Message, MessageTemplate


class Log(BasePlugin):
    """Log messages on stdout.

    The "filter" configuration key gets a list of message templates defining
    the messages that should be logged by the plugin instance.

    In the following example the first and third message match the given
    template and are logged by the instance "Test Log", while the second
    message does not match and is only logged by the test, but not by the
    Log instance:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log",
    ...                   "filter": [{"id": {"const": 42}}]}},
    ...     [{"id": 42, "message": "Test Message"},
    ...      {"id": 42.42, "message": "Second Message"},
    ...      {"id": 42, "message": "Third Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Log', 'plugin': 'Log',
             'sends': [], 'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
    test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}

    The "filter" key is required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log"}}, []))
    'filter' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'filter': {'items': {'type': 'object'},
                                   'type': 'array'}},
         'required': ['filter']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Log'}
    Configuration for 'Test Log' is not valid.

    The "filter" key has to contain a list of message templates, i.e.,
    JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log",
    ...                   "filter": [42]}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['filter']['items']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['filter'][0]:
        42
    Configuration for 'Test Log' is not valid.
    """

    CONF_SCHEMA = {'properties': {'filter': {'type': 'array',
                                             'items': {'type': 'object'}}},
                   'required': ['filter']}
    """Schema for Log plugin configuration.

    Required configuration key:

    - 'filter': list of message templates to be logged.
    """

    async def log(self, message: Message) -> None:
        """Log received message on stdout using own name as prefix."""
        print(f"{self.name}: {message}")

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)

    async def run(self) -> None:
        """Run no code proactively."""
        pass


class Init(BasePlugin):
    """Send list of messages on startup and on demand.

    The "messages" configuration key gets a list of messages to be sent on
    startup. The same list is sent in reaction to a message with
    "target": <name> and "command": "execute".

    In the example, the two configured messages are sent twice, once at
    startup and a second time in reaction to the "execute" command sent by
    the test:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init",
    ...                    "messages": [{"id": 42,
    ...                                  "content": "Test Message"},
    ...                                 {"id": 42.42,
    ...                                  "content": "Second Message"}]}},
    ...     [{"target": "Test Init", "command": "execute"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Init', 'plugin': 'Init',
             'sends': [{'id': {'const': 42},
                        'content': {'const': 'Test Message'}},
                       {'id': {'const': 42.42},
                        'content': {'const': 'Second Message'}}],
             'receives': [{'target': {'const': 'Test Init'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
    test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}

    The "messages" key is required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init"}}, []))
    'messages' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'messages': {'items': {'type': 'object'},
                                     'type': 'array'}},
         'required': ['messages']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Init'}
    Configuration for 'Test Init' is not valid.

    The "messages" key has to contain a list of (partial) messages, i.e.,
    JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init",
    ...                    "messages": [42]}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['messages']['items']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['messages'][0]:
        42
    Configuration for 'Test Init' is not valid.
    """

    CONF_SCHEMA = {'properties': {'messages': {'type': 'array',
                                               'items': {'type': 'object'}}},
                   'required': ['messages']}
    """Schema for Init plugin configuration.

    Required configuration key:

    - 'messages': list of messages to be sent.
    """

    async def execute(self, message: Message) -> None:
        """Send configured messages."""
        for message in self.conf['messages']:
            await self.bus.send(Message(self.name, message))
            # Give immediate reactions to messages opportunity to happen:
            await asyncio.sleep(0)

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        receives = [MessageTemplate({'target': {'const': self.name},
                                     'command': {'const': 'execute'}})]
        sends = [MessageTemplate.from_message(message)
                 for message in self.conf['messages']]
        self.bus.register(self.name, 'Init', sends, receives, self.execute)

    async def run(self) -> None:
        """Send configured messages on startup."""
        for message in self.conf['messages']:
            await self.bus.send(Message(self.name, message))


class Execute(BasePlugin):
    """Send configurable list of messages on demand.

    An Execute plugin instance receives two kinds of commands.
    The "set messages" command has a "messages" key with a list of (partial)
    messages, which are sent by the Execute instance in reaction to an
    "execute" command.

    In the example, the first command sent by the test sets two messages,
    which are then sent in reaction to the second command sent by the test:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Execute": {"plugin": "Execute"}},
    ...     [{"target": "Test Execute", "command": "set messages",
    ...       "messages": [{"id": 42, "content": "Test Message"},
    ...                    {"id": 42.42, "content": "Second Message"}]},
    ...      {"target": "Test Execute", "command": "execute"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Execute', 'plugin': 'Execute',
             'sends': [{}],
             'receives': [{'target': {'const': 'Test Execute'},
                           'command': {'const': 'set messages'},
                           'messages': {'type': 'array',
                                        'items': {'type': 'object'}}},
                          {'target': {'const': 'Test Execute'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'test()', 'target': 'Test Execute',
             'command': 'set messages',
             'messages': [{'id': 42, 'content': 'Test Message'},
                          {'id': 42.42, 'content': 'Second Message'}]}
    test(): {'sender': 'test()', 'target': 'Test Execute',
             'command': 'execute'}
    test(): {'sender': 'Test Execute', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Execute', 'id': 42.42,
             'content': 'Second Message'}
    """

    CONF_SCHEMA = True
    """Schema for Execute plugin configuration.

    There are no required or optional configuration keys.
    """

    async def execute(self, message: Message) -> None:
        """Set or send configured messages."""
        if message['command'] == 'set messages':
            assert isinstance(message['messages'], list)
            self.messages = list(message['messages'])
        elif message['command'] == 'execute':
            for message in self.messages:
                await self.bus.send(Message(self.name, message))
                # Give immediate reactions to messages opportunity to happen:
                await asyncio.sleep(0)

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        self.messages = []
        receives = [MessageTemplate({'target': {'const': self.name},
                                     'command': {'const': 'set messages'},
                                     'messages':
                                     {'type': 'array',
                                      'items': {'type': 'object'}}}),
                    MessageTemplate({'target': {'const': self.name},
                                     'command': {'const': 'execute'}})]
        sends = [MessageTemplate()]
        self.bus.register(self.name, 'Execute', sends, receives, self.execute)

    async def run(self) -> None:
        """Run no code proactively."""
        pass


class Alias(BasePlugin):
    """Translate messages to an alias.

    The "from" configuration key gets a message template and the
    configuration key "to" a (partial) message. All messages matching the
    template are received by the Alias instance and a message translated by
    removing the keys of the "from" template and adding the keys and values
    of the "to" message is sent. Keys that are neither in "from" nor in "to"
    are retained.

    In the example, the two messages sent by the test are translated by the
    Alias instance and the translated messages are sent by it preserving
    the "content" keys:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": {"id": {"const": 42}},
    ...                     "to": {"id": "translated"}}},
    ...     [{"id": 42, "content": "Test Message"},
    ...      {"id": 42, "content": "Second Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Alias', 'plugin': 'Alias',
             'sends': [{'id': {'const': 'translated'}}],
             'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Second Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Second Message'}

    The "from" and "to" keys are required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias"}}, []))
    'from' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    'to' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    Configuration for 'Test Alias' is not valid.

    The "from" key has to contain a message template and the "to" key a
    (partial) message, i.e., both have to be JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": 42,
    ...                     "to": 42}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['from']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['from']:
        42
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['to']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['to']:
        42
    Configuration for 'Test Alias' is not valid.
    """

    CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
                                  'to': {'type': 'object'}},
                   'required': ['from', 'to']}
    """Schema for Alias plugin configuration.

    Required configuration keys:

    - 'from': template of messages to be translated.
    - 'to': translated message to be sent.
    """

    async def alias(self, message: Message) -> None:
        """Translate and send message."""
        alias_message = Message(self.name)
        alias_message.update(self.conf['to'])
        for key in message:
            if key != 'sender' and key not in self.conf['from']:
                alias_message[key] = message[key]
        await self.bus.send(alias_message)

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        self.bus.register(self.name, 'Alias',
                          [MessageTemplate.from_message(self.conf['to'])],
                          [self.conf['from']],
                          self.alias)

    async def run(self) -> None:
        """Run no code proactively."""
        pass

Classes

class Log (bus: MessageBus, name: str, conf: Dict[str, Any])

Log messages on stdout.

The "filter" configuration key gets a list of message templates defining the messages that should be logged by the plugin instance.

In the following example the first and third message match the given template and are logged by the instance "Test Log", while the second message does not match and is only logged by the test, but not by the Log instance:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [{"id": {"const": 42}}]}},
...     [{"id": 42, "message": "Test Message"},
...      {"id": 42.42, "message": "Second Message"},
...      {"id": 42, "message": "Third Message"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Log', 'plugin': 'Log',
         'sends': [], 'receives': [{'id': {'const': 42}}]}
test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}

The "filter" key is required:

>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log"}}, []))
'filter' is a required property
<BLANKLINE>
Failed validating 'required' in schema:
    {'properties': {'filter': {'items': {'type': 'object'},
                               'type': 'array'}},
     'required': ['filter']}
<BLANKLINE>
On instance:
    {'plugin': 'Log'}
Configuration for 'Test Log' is not valid.

The "filter" key has to contain a list of message templates, i.e., JSON objects:

>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [42]}}, []))
42 is not of type 'object'
<BLANKLINE>
Failed validating 'type' in schema['properties']['filter']['items']:
    {'type': 'object'}
<BLANKLINE>
On instance['filter'][0]:
    42
Configuration for 'Test Log' is not valid.
Expand source code
class Log(BasePlugin):
    """Log messages on stdout.

    The "filter" configuration key gets a list of message templates defining
    the messages that should be logged by the plugin instance.

    In the following example the first and third message match the given
    template and are logged by the instance "Test Log", while the second
    message does not match and is only logged by the test, but not by the
    Log instance:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log",
    ...                   "filter": [{"id": {"const": 42}}]}},
    ...     [{"id": 42, "message": "Test Message"},
    ...      {"id": 42.42, "message": "Second Message"},
    ...      {"id": 42, "message": "Third Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Log', 'plugin': 'Log',
             'sends': [], 'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
    test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}

    The "filter" key is required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log"}}, []))
    'filter' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'filter': {'items': {'type': 'object'},
                                   'type': 'array'}},
         'required': ['filter']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Log'}
    Configuration for 'Test Log' is not valid.

    The "filter" key has to contain a list of message templates, i.e.,
    JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log",
    ...                   "filter": [42]}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['filter']['items']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['filter'][0]:
        42
    Configuration for 'Test Log' is not valid.
    """

    CONF_SCHEMA = {'properties': {'filter': {'type': 'array',
                                             'items': {'type': 'object'}}},
                   'required': ['filter']}
    """Schema for Log plugin configuration.

    Required configuration key:

    - 'filter': list of message templates to be logged.
    """

    async def log(self, message: Message) -> None:
        """Log received message on stdout using own name as prefix."""
        print(f"{self.name}: {message}")

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)

    async def run(self) -> None:
        """Run no code proactively."""
        pass

Ancestors

Class variables

var CONF_SCHEMA

Schema for Log plugin configuration.

Required configuration key:

  • 'filter': list of message templates to be logged.

Methods

async def log(self, message: Message) ‑> NoneType

Log received message on stdout using own name as prefix.

Expand source code
async def log(self, message: Message) -> None:
    """Log received message on stdout using own name as prefix."""
    print(f"{self.name}: {message}")
def process_conf(self) ‑> NoneType

Register plugin as bus client.

Expand source code
def process_conf(self) -> None:
    """Register plugin as bus client."""
    self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)
async def run(self) ‑> NoneType

Run no code proactively.

Expand source code
async def run(self) -> None:
    """Run no code proactively."""
    pass
class Init (bus: MessageBus, name: str, conf: Dict[str, Any])

Send list of messages on startup and on demand.

The "messages" configuration key gets a list of messages to be sent on startup. The same list is sent in reaction to a message with "target": and "command": "execute".

In the example, the two configured messages are sent twice, once at startup and a second time in reaction to the "execute" command sent by the test:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Init": {"plugin": "Init",
...                    "messages": [{"id": 42,
...                                  "content": "Test Message"},
...                                 {"id": 42.42,
...                                  "content": "Second Message"}]}},
...     [{"target": "Test Init", "command": "execute"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}},
                   {'id': {'const': 42.42},
                    'content': {'const': 'Second Message'}}],
         'receives': [{'target': {'const': 'Test Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}

The "messages" key is required:

>>> asyncio.run(controlpi.test(
...     {"Test Init": {"plugin": "Init"}}, []))
'messages' is a required property
<BLANKLINE>
Failed validating 'required' in schema:
    {'properties': {'messages': {'items': {'type': 'object'},
                                 'type': 'array'}},
     'required': ['messages']}
<BLANKLINE>
On instance:
    {'plugin': 'Init'}
Configuration for 'Test Init' is not valid.

The "messages" key has to contain a list of (partial) messages, i.e., JSON objects:

>>> asyncio.run(controlpi.test(
...     {"Test Init": {"plugin": "Init",
...                    "messages": [42]}}, []))
42 is not of type 'object'
<BLANKLINE>
Failed validating 'type' in schema['properties']['messages']['items']:
    {'type': 'object'}
<BLANKLINE>
On instance['messages'][0]:
    42
Configuration for 'Test Init' is not valid.
Expand source code
class Init(BasePlugin):
    """Send list of messages on startup and on demand.

    The "messages" configuration key gets a list of messages to be sent on
    startup. The same list is sent in reaction to a message with
    "target": <name> and "command": "execute".

    In the example, the two configured messages are sent twice, once at
    startup and a second time in reaction to the "execute" command sent by
    the test:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init",
    ...                    "messages": [{"id": 42,
    ...                                  "content": "Test Message"},
    ...                                 {"id": 42.42,
    ...                                  "content": "Second Message"}]}},
    ...     [{"target": "Test Init", "command": "execute"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Init', 'plugin': 'Init',
             'sends': [{'id': {'const': 42},
                        'content': {'const': 'Test Message'}},
                       {'id': {'const': 42.42},
                        'content': {'const': 'Second Message'}}],
             'receives': [{'target': {'const': 'Test Init'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
    test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}

    The "messages" key is required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init"}}, []))
    'messages' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'messages': {'items': {'type': 'object'},
                                     'type': 'array'}},
         'required': ['messages']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Init'}
    Configuration for 'Test Init' is not valid.

    The "messages" key has to contain a list of (partial) messages, i.e.,
    JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init",
    ...                    "messages": [42]}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['messages']['items']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['messages'][0]:
        42
    Configuration for 'Test Init' is not valid.
    """

    CONF_SCHEMA = {'properties': {'messages': {'type': 'array',
                                               'items': {'type': 'object'}}},
                   'required': ['messages']}
    """Schema for Init plugin configuration.

    Required configuration key:

    - 'messages': list of messages to be sent.
    """

    async def execute(self, message: Message) -> None:
        """Send configured messages."""
        for message in self.conf['messages']:
            await self.bus.send(Message(self.name, message))
            # Give immediate reactions to messages opportunity to happen:
            await asyncio.sleep(0)

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        receives = [MessageTemplate({'target': {'const': self.name},
                                     'command': {'const': 'execute'}})]
        sends = [MessageTemplate.from_message(message)
                 for message in self.conf['messages']]
        self.bus.register(self.name, 'Init', sends, receives, self.execute)

    async def run(self) -> None:
        """Send configured messages on startup."""
        for message in self.conf['messages']:
            await self.bus.send(Message(self.name, message))

Ancestors

Class variables

var CONF_SCHEMA

Schema for Init plugin configuration.

Required configuration key:

  • 'messages': list of messages to be sent.

Methods

async def execute(self, message: Message) ‑> NoneType

Send configured messages.

Expand source code
async def execute(self, message: Message) -> None:
    """Send configured messages."""
    for message in self.conf['messages']:
        await self.bus.send(Message(self.name, message))
        # Give immediate reactions to messages opportunity to happen:
        await asyncio.sleep(0)
def process_conf(self) ‑> NoneType

Register plugin as bus client.

Expand source code
def process_conf(self) -> None:
    """Register plugin as bus client."""
    receives = [MessageTemplate({'target': {'const': self.name},
                                 'command': {'const': 'execute'}})]
    sends = [MessageTemplate.from_message(message)
             for message in self.conf['messages']]
    self.bus.register(self.name, 'Init', sends, receives, self.execute)
async def run(self) ‑> NoneType

Send configured messages on startup.

Expand source code
async def run(self) -> None:
    """Send configured messages on startup."""
    for message in self.conf['messages']:
        await self.bus.send(Message(self.name, message))
class Execute (bus: MessageBus, name: str, conf: Dict[str, Any])

Send configurable list of messages on demand.

An Execute plugin instance receives two kinds of commands. The "set messages" command has a "messages" key with a list of (partial) messages, which are sent by the Execute instance in reaction to an "execute" command.

In the example, the first command sent by the test sets two messages, which are then sent in reaction to the second command sent by the test:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Execute": {"plugin": "Execute"}},
...     [{"target": "Test Execute", "command": "set messages",
...       "messages": [{"id": 42, "content": "Test Message"},
...                    {"id": 42.42, "content": "Second Message"}]},
...      {"target": "Test Execute", "command": "execute"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Execute', 'plugin': 'Execute',
         'sends': [{}],
         'receives': [{'target': {'const': 'Test Execute'},
                       'command': {'const': 'set messages'},
                       'messages': {'type': 'array',
                                    'items': {'type': 'object'}}},
                      {'target': {'const': 'Test Execute'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': 'test()', 'target': 'Test Execute',
         'command': 'set messages',
         'messages': [{'id': 42, 'content': 'Test Message'},
                      {'id': 42.42, 'content': 'Second Message'}]}
test(): {'sender': 'test()', 'target': 'Test Execute',
         'command': 'execute'}
test(): {'sender': 'Test Execute', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Execute', 'id': 42.42,
         'content': 'Second Message'}
Expand source code
class Execute(BasePlugin):
    """Send configurable list of messages on demand.

    An Execute plugin instance receives two kinds of commands.
    The "set messages" command has a "messages" key with a list of (partial)
    messages, which are sent by the Execute instance in reaction to an
    "execute" command.

    In the example, the first command sent by the test sets two messages,
    which are then sent in reaction to the second command sent by the test:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Execute": {"plugin": "Execute"}},
    ...     [{"target": "Test Execute", "command": "set messages",
    ...       "messages": [{"id": 42, "content": "Test Message"},
    ...                    {"id": 42.42, "content": "Second Message"}]},
    ...      {"target": "Test Execute", "command": "execute"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Execute', 'plugin': 'Execute',
             'sends': [{}],
             'receives': [{'target': {'const': 'Test Execute'},
                           'command': {'const': 'set messages'},
                           'messages': {'type': 'array',
                                        'items': {'type': 'object'}}},
                          {'target': {'const': 'Test Execute'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'test()', 'target': 'Test Execute',
             'command': 'set messages',
             'messages': [{'id': 42, 'content': 'Test Message'},
                          {'id': 42.42, 'content': 'Second Message'}]}
    test(): {'sender': 'test()', 'target': 'Test Execute',
             'command': 'execute'}
    test(): {'sender': 'Test Execute', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Execute', 'id': 42.42,
             'content': 'Second Message'}
    """

    CONF_SCHEMA = True
    """Schema for Execute plugin configuration.

    There are no required or optional configuration keys.
    """

    async def execute(self, message: Message) -> None:
        """Set or send configured messages."""
        if message['command'] == 'set messages':
            assert isinstance(message['messages'], list)
            self.messages = list(message['messages'])
        elif message['command'] == 'execute':
            for message in self.messages:
                await self.bus.send(Message(self.name, message))
                # Give immediate reactions to messages opportunity to happen:
                await asyncio.sleep(0)

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        self.messages = []
        receives = [MessageTemplate({'target': {'const': self.name},
                                     'command': {'const': 'set messages'},
                                     'messages':
                                     {'type': 'array',
                                      'items': {'type': 'object'}}}),
                    MessageTemplate({'target': {'const': self.name},
                                     'command': {'const': 'execute'}})]
        sends = [MessageTemplate()]
        self.bus.register(self.name, 'Execute', sends, receives, self.execute)

    async def run(self) -> None:
        """Run no code proactively."""
        pass

Ancestors

Class variables

var CONF_SCHEMA

Schema for Execute plugin configuration.

There are no required or optional configuration keys.

Methods

async def execute(self, message: Message) ‑> NoneType

Set or send configured messages.

Expand source code
async def execute(self, message: Message) -> None:
    """Set or send configured messages."""
    if message['command'] == 'set messages':
        assert isinstance(message['messages'], list)
        self.messages = list(message['messages'])
    elif message['command'] == 'execute':
        for message in self.messages:
            await self.bus.send(Message(self.name, message))
            # Give immediate reactions to messages opportunity to happen:
            await asyncio.sleep(0)
def process_conf(self) ‑> NoneType

Register plugin as bus client.

Expand source code
def process_conf(self) -> None:
    """Register plugin as bus client."""
    self.messages = []
    receives = [MessageTemplate({'target': {'const': self.name},
                                 'command': {'const': 'set messages'},
                                 'messages':
                                 {'type': 'array',
                                  'items': {'type': 'object'}}}),
                MessageTemplate({'target': {'const': self.name},
                                 'command': {'const': 'execute'}})]
    sends = [MessageTemplate()]
    self.bus.register(self.name, 'Execute', sends, receives, self.execute)
async def run(self) ‑> NoneType

Run no code proactively.

Expand source code
async def run(self) -> None:
    """Run no code proactively."""
    pass
class Alias (bus: MessageBus, name: str, conf: Dict[str, Any])

Translate messages to an alias.

The "from" configuration key gets a message template and the configuration key "to" a (partial) message. All messages matching the template are received by the Alias instance and a message translated by removing the keys of the "from" template and adding the keys and values of the "to" message is sent. Keys that are neither in "from" nor in "to" are retained.

In the example, the two messages sent by the test are translated by the Alias instance and the translated messages are sent by it preserving the "content" keys:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": {"id": {"const": 42}},
...                     "to": {"id": "translated"}}},
...     [{"id": 42, "content": "Test Message"},
...      {"id": 42, "content": "Second Message"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'id': {'const': 42}}]}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message'}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Second Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Second Message'}

The "from" and "to" keys are required:

>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias"}}, []))
'from' is a required property
<BLANKLINE>
Failed validating 'required' in schema:
    {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
     'required': ['from', 'to']}
<BLANKLINE>
On instance:
    {'plugin': 'Alias'}
'to' is a required property
<BLANKLINE>
Failed validating 'required' in schema:
    {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
     'required': ['from', 'to']}
<BLANKLINE>
On instance:
    {'plugin': 'Alias'}
Configuration for 'Test Alias' is not valid.

The "from" key has to contain a message template and the "to" key a (partial) message, i.e., both have to be JSON objects:

>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": 42,
...                     "to": 42}}, []))
42 is not of type 'object'
<BLANKLINE>
Failed validating 'type' in schema['properties']['from']:
    {'type': 'object'}
<BLANKLINE>
On instance['from']:
    42
42 is not of type 'object'
<BLANKLINE>
Failed validating 'type' in schema['properties']['to']:
    {'type': 'object'}
<BLANKLINE>
On instance['to']:
    42
Configuration for 'Test Alias' is not valid.
Expand source code
class Alias(BasePlugin):
    """Translate messages to an alias.

    The "from" configuration key gets a message template and the
    configuration key "to" a (partial) message. All messages matching the
    template are received by the Alias instance and a message translated by
    removing the keys of the "from" template and adding the keys and values
    of the "to" message is sent. Keys that are neither in "from" nor in "to"
    are retained.

    In the example, the two messages sent by the test are translated by the
    Alias instance and the translated messages are sent by it preserving
    the "content" keys:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": {"id": {"const": 42}},
    ...                     "to": {"id": "translated"}}},
    ...     [{"id": 42, "content": "Test Message"},
    ...      {"id": 42, "content": "Second Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Alias', 'plugin': 'Alias',
             'sends': [{'id': {'const': 'translated'}}],
             'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Second Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Second Message'}

    The "from" and "to" keys are required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias"}}, []))
    'from' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    'to' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    Configuration for 'Test Alias' is not valid.

    The "from" key has to contain a message template and the "to" key a
    (partial) message, i.e., both have to be JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": 42,
    ...                     "to": 42}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['from']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['from']:
        42
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['to']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['to']:
        42
    Configuration for 'Test Alias' is not valid.
    """

    CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
                                  'to': {'type': 'object'}},
                   'required': ['from', 'to']}
    """Schema for Alias plugin configuration.

    Required configuration keys:

    - 'from': template of messages to be translated.
    - 'to': translated message to be sent.
    """

    async def alias(self, message: Message) -> None:
        """Translate and send message."""
        alias_message = Message(self.name)
        alias_message.update(self.conf['to'])
        for key in message:
            if key != 'sender' and key not in self.conf['from']:
                alias_message[key] = message[key]
        await self.bus.send(alias_message)

    def process_conf(self) -> None:
        """Register plugin as bus client."""
        self.bus.register(self.name, 'Alias',
                          [MessageTemplate.from_message(self.conf['to'])],
                          [self.conf['from']],
                          self.alias)

    async def run(self) -> None:
        """Run no code proactively."""
        pass

Ancestors

Class variables

var CONF_SCHEMA

Schema for Alias plugin configuration.

Required configuration keys:

  • 'from': template of messages to be translated.
  • 'to': translated message to be sent.

Methods

async def alias(self, message: Message) ‑> NoneType

Translate and send message.

Expand source code
async def alias(self, message: Message) -> None:
    """Translate and send message."""
    alias_message = Message(self.name)
    alias_message.update(self.conf['to'])
    for key in message:
        if key != 'sender' and key not in self.conf['from']:
            alias_message[key] = message[key]
    await self.bus.send(alias_message)
def process_conf(self) ‑> NoneType

Register plugin as bus client.

Expand source code
def process_conf(self) -> None:
    """Register plugin as bus client."""
    self.bus.register(self.name, 'Alias',
                      [MessageTemplate.from_message(self.conf['to'])],
                      [self.conf['from']],
                      self.alias)
async def run(self) ‑> NoneType

Run no code proactively.

Expand source code
async def run(self) -> None:
    """Run no code proactively."""
    pass