controlpi

Provide the infrastructure for the ControlPi system.

The infrastructure consists of the message bus from module messagebus, the plugin registry from module pluginregistry and the abstract base plugin from module baseplugin.

The package combines them in its run function, which is used by __main__.py to run a ControlPi system based on a configuration file indefinitely.

The test function is a utility function to test plugins with minimal boilerplate code.

  1"""Provide the infrastructure for the ControlPi system.
  2
  3The infrastructure consists of the message bus from module messagebus, the
  4plugin registry from module pluginregistry and the abstract base plugin from
  5module baseplugin.
  6
  7The package combines them in its run function, which is used by __main__.py
  8to run a ControlPi system based on a configuration file indefinitely.
  9
 10The test function is a utility function to test plugins with minimal
 11boilerplate code.
 12"""
 13
 14import asyncio
 15import fastjsonschema
 16
 17from controlpi.messagebus import MessageBus, Message, MessageTemplate
 18from controlpi.pluginregistry import PluginRegistry
 19from controlpi.baseplugin import BasePlugin, PluginConf, ConfException
 20
 21from typing import Dict, List, Coroutine, Any
 22
 23
 24CONF_SCHEMA = {"type": "object", "patternProperties": {".*": {"type": "object"}}}
 25
 26
 27def _process_conf(
 28    message_bus: MessageBus, conf: Dict[str, PluginConf]
 29) -> List[Coroutine]:
 30    try:
 31        conf = fastjsonschema.validate(CONF_SCHEMA, conf)
 32    except fastjsonschema.JsonSchemaException as e:
 33        print(f"Configuration not valid:\n{e}")
 34        return []
 35    plugins = PluginRegistry("controlpi_plugins", BasePlugin)
 36    coroutines = [message_bus.run()]
 37    for instance_name in conf:
 38        instance_conf = conf[instance_name]
 39        if "plugin" not in instance_conf:
 40            print(f"No plugin implementation specified for instance '{instance_name}'.")
 41            continue
 42        plugin_name = instance_conf["plugin"]
 43        if plugin_name not in plugins:
 44            print(
 45                f"No implementation found for plugin '{plugin_name}'"
 46                f" (specified for instance '{instance_name}')."
 47            )
 48            continue
 49        plugin = plugins[plugin_name]
 50        try:
 51            instance = plugin(message_bus, instance_name, instance_conf)
 52            coroutines.append(instance.run())
 53        except ConfException as e:
 54            print(e)
 55            continue
 56    return coroutines
 57
 58
 59async def run(conf: Dict[str, PluginConf]) -> None:
 60    """Run the ControlPi system based on a configuration.
 61
 62    Setup message bus, process given configuration, and run message bus and
 63    plugins concurrently and indefinitely.
 64
 65    This function is mainly used by __main__.py to run a ControlPi system
 66    based on a configuration loaded from a configuration JSON file on disk.
 67
 68    >>> async def test_coroutine():
 69    ...     conf = {"Example Init":
 70    ...             {"plugin": "Init",
 71    ...              "messages": [{"id": 42,
 72    ...                            "content": "Test Message"},
 73    ...                           {"id": 42.42,
 74    ...                            "content": "Second Message"}]},
 75    ...             "Example Log":
 76    ...             {"plugin": "Log",
 77    ...              "filter": [{"sender": {"const": "Example Init"}}]}}
 78    ...     run_task = asyncio.create_task(run(conf))
 79    ...     await asyncio.sleep(0.1)
 80    ...     run_task.cancel()
 81    ...     try:
 82    ...         await run_task
 83    ...     except asyncio.exceptions.CancelledError:
 84    ...         pass
 85    >>> asyncio.run(test_coroutine())  # doctest: +NORMALIZE_WHITESPACE
 86    Example Log: {'sender': 'Example Init',
 87                  'id': 42, 'content': 'Test Message'}
 88    Example Log: {'sender': 'Example Init',
 89                  'id': 42.42, 'content': 'Second Message'}
 90    """
 91    message_bus = MessageBus()
 92    coroutines = _process_conf(message_bus, conf)
 93    try:
 94        await asyncio.gather(*coroutines)
 95    except asyncio.CancelledError:
 96        pass
 97
 98
 99async def test(
100    conf: Dict[str, PluginConf], messages: List[Dict[str, Any]], wait: float = 0.0
101) -> None:
102    """Test configuration of ControlPi system.
103
104    Setup message bus, process given configuration, run message bus and
105    plugins concurrently, send given messages on message bus and print all
106    messages on message bus. Terminate when queue of message bus is empty.
107
108    This function allows to test single plugins or small plugin
109    configurations with minimal boilerplate code:
110    >>> asyncio.run(test(
111    ...     {"Example Init": {"plugin": "Init",
112    ...                       "messages": [{"id": 42,
113    ...                                     "content": "Test Message"},
114    ...                                    {"id": 42.42,
115    ...                                     "content": "Second Message"}]}},
116    ...     [{"target": "Example Init",
117    ...       "command": "execute"}]))  # doctest: +NORMALIZE_WHITESPACE
118    test(): {'sender': '', 'event': 'registered',
119             'client': 'Example Init', 'plugin': 'Init',
120             'sends': [{'id': {'const': 42},
121                        'content': {'const': 'Test Message'}},
122                       {'id': {'const': 42.42},
123                        'content': {'const': 'Second Message'}}],
124             'receives': [{'target': {'const': 'Example Init'},
125                           'command': {'const': 'execute'}}]}
126    test(): {'sender': 'Example Init',
127             'id': 42, 'content': 'Test Message'}
128    test(): {'sender': 'Example Init',
129             'id': 42.42, 'content': 'Second Message'}
130    test(): {'sender': 'test()', 'target': 'Example Init',
131             'command': 'execute'}
132    test(): {'sender': 'Example Init',
133             'id': 42, 'content': 'Test Message'}
134    test(): {'sender': 'Example Init',
135             'id': 42.42, 'content': 'Second Message'}
136
137    Similar functionality could be reached by using the Log and Init plugins
138    to print messages and send some messages on the bus, but these would
139    clutter the test configuration and code to stop the indefinitely running
140    bus would have to be added to each and every test.
141
142    Incorrect plugin configurations can also be tested by this:
143    >>> asyncio.run(test(
144    ...     {"Example Init": {"plugin": "Init"}}, []))
145    data must contain ['messages'] properties
146    Configuration for 'Example Init' is not valid.
147    """
148    message_bus = MessageBus()
149
150    async def log(message):
151        if (
152            "sender" in message
153            and message["sender"] == ""
154            and "event" in message
155            and message["event"] == "registered"
156            and "client" in message
157            and message["client"] == "test()"
158        ):
159            # Do not log own registration of 'test()':
160            return
161        print(f"test(): {message}")
162
163    message_bus.register(
164        "test()", "Test", [MessageTemplate()], [([MessageTemplate()], log)]
165    )
166
167    coroutines = _process_conf(message_bus, conf)
168    background_tasks = set()
169    for coroutine in coroutines:
170        task = asyncio.create_task(coroutine)
171        background_tasks.add(task)
172        task.add_done_callback(background_tasks.discard)
173        # Give the created task opportunity to run:
174        await asyncio.sleep(0)
175    for message in messages:
176        await message_bus.send(Message("test()", message))
177        # Give immediate reactions to messages opportunity to happen:
178        await asyncio.sleep(0)
179    await asyncio.sleep(wait)
180    await message_bus._queue.join()
# JSON schema for a complete configuration: a JSON object in which every
# property (the pattern ".*" matches any name) must itself be an object.
CONF_SCHEMA = {
    "type": "object",
    "patternProperties": {
        ".*": {"type": "object"},
    },
}
async def run(conf: Dict[str, Dict[str, Any]]) -> None:
async def run(conf: Dict[str, PluginConf]) -> None:
    """Run a ControlPi system described by a configuration.

    A fresh message bus is created, the configuration is processed and
    everything resulting from that (message bus and plugin instances) is
    awaited concurrently. A CancelledError raised while waiting is
    swallowed, so the coroutine then returns normally.

    The main user of this function is __main__.py, which runs a ControlPi
    system based on a configuration loaded from a JSON file on disk.

    >>> async def test_coroutine():
    ...     conf = {"Example Init":
    ...             {"plugin": "Init",
    ...              "messages": [{"id": 42,
    ...                            "content": "Test Message"},
    ...                           {"id": 42.42,
    ...                            "content": "Second Message"}]},
    ...             "Example Log":
    ...             {"plugin": "Log",
    ...              "filter": [{"sender": {"const": "Example Init"}}]}}
    ...     run_task = asyncio.create_task(run(conf))
    ...     await asyncio.sleep(0.1)
    ...     run_task.cancel()
    ...     try:
    ...         await run_task
    ...     except asyncio.exceptions.CancelledError:
    ...         pass
    >>> asyncio.run(test_coroutine())  # doctest: +NORMALIZE_WHITESPACE
    Example Log: {'sender': 'Example Init',
                  'id': 42, 'content': 'Test Message'}
    Example Log: {'sender': 'Example Init',
                  'id': 42.42, 'content': 'Second Message'}
    """
    bus = MessageBus()
    pending = _process_conf(bus, conf)
    try:
        await asyncio.gather(*pending)
    except asyncio.CancelledError:
        # Cancellation is the regular way to stop the system.
        return

Run the ControlPi system based on a configuration.

Set up the message bus, process the given configuration, and run the message bus and plugins concurrently and indefinitely.

This function is mainly used by __main__.py to run a ControlPi system based on a configuration loaded from a configuration JSON file on disk.

>>> async def test_coroutine():
...     conf = {"Example Init":
...             {"plugin": "Init",
...              "messages": [{"id": 42,
...                            "content": "Test Message"},
...                           {"id": 42.42,
...                            "content": "Second Message"}]},
...             "Example Log":
...             {"plugin": "Log",
...              "filter": [{"sender": {"const": "Example Init"}}]}}
...     run_task = asyncio.create_task(run(conf))
...     await asyncio.sleep(0.1)
...     run_task.cancel()
...     try:
...         await run_task
...     except asyncio.exceptions.CancelledError:
...         pass
>>> asyncio.run(test_coroutine())  # doctest: +NORMALIZE_WHITESPACE
Example Log: {'sender': 'Example Init',
              'id': 42, 'content': 'Test Message'}
Example Log: {'sender': 'Example Init',
              'id': 42.42, 'content': 'Second Message'}
async def test( conf: Dict[str, Dict[str, Any]], messages: List[Dict[str, Any]], wait: float = 0.0) -> None:
async def test(
    conf: Dict[str, PluginConf], messages: List[Dict[str, Any]], wait: float = 0.0
) -> None:
    """Exercise a ControlPi configuration and print the bus traffic.

    A message bus is created and a client named 'test()' is registered on
    it that prints every message it receives, except the event announcing
    its own registration. The configuration is processed, everything
    resulting from that is started as a background task, and the given
    messages are sent with 'test()' as sender. After sleeping for the given
    wait time, the function returns once the queue of the message bus has
    been worked off.

    This keeps tests of single plugins or small plugin configurations
    free of boilerplate code:
    >>> asyncio.run(test(
    ...     {"Example Init": {"plugin": "Init",
    ...                       "messages": [{"id": 42,
    ...                                     "content": "Test Message"},
    ...                                    {"id": 42.42,
    ...                                     "content": "Second Message"}]}},
    ...     [{"target": "Example Init",
    ...       "command": "execute"}]))  # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Example Init', 'plugin': 'Init',
             'sends': [{'id': {'const': 42},
                        'content': {'const': 'Test Message'}},
                       {'id': {'const': 42.42},
                        'content': {'const': 'Second Message'}}],
             'receives': [{'target': {'const': 'Example Init'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'Example Init',
             'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Example Init',
             'id': 42.42, 'content': 'Second Message'}
    test(): {'sender': 'test()', 'target': 'Example Init',
             'command': 'execute'}
    test(): {'sender': 'Example Init',
             'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Example Init',
             'id': 42.42, 'content': 'Second Message'}

    The Log and Init plugins could be used to the same end, but they would
    clutter the test configuration, and every single test would need extra
    code to stop the indefinitely running bus.

    Invalid plugin configurations can be tested in the same way:
    >>> asyncio.run(test(
    ...     {"Example Init": {"plugin": "Init"}}, []))
    data must contain ['messages'] properties
    Configuration for 'Example Init' is not valid.
    """
    bus = MessageBus()
    # Key/value pairs that identify the registration event of 'test()':
    own_registration = (
        ("sender", ""),
        ("event", "registered"),
        ("client", "test()"),
    )

    async def log(message):
        is_own_registration = all(
            key in message and message[key] == value
            for key, value in own_registration
        )
        # The registration of 'test()' itself is not logged:
        if not is_own_registration:
            print(f"test(): {message}")

    bus.register(
        "test()", "Test", [MessageTemplate()], [([MessageTemplate()], log)]
    )

    # Keep references to the tasks until they are done:
    running = set()
    for coroutine in _process_conf(bus, conf):
        started = asyncio.create_task(coroutine)
        running.add(started)
        started.add_done_callback(running.discard)
        # Let the new task run before the next one is started:
        await asyncio.sleep(0)
    for payload in messages:
        await bus.send(Message("test()", payload))
        # Let immediate reactions to the message happen:
        await asyncio.sleep(0)
    await asyncio.sleep(wait)
    await bus._queue.join()

Test configuration of ControlPi system.

Set up the message bus, process the given configuration, run the message bus and plugins concurrently, send the given messages on the message bus, and print all messages on the message bus. Terminate when the queue of the message bus is empty.

This function allows testing single plugins or small plugin configurations with minimal boilerplate code:

>>> asyncio.run(test(
...     {"Example Init": {"plugin": "Init",
...                       "messages": [{"id": 42,
...                                     "content": "Test Message"},
...                                    {"id": 42.42,
...                                     "content": "Second Message"}]}},
...     [{"target": "Example Init",
...       "command": "execute"}]))  # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Example Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}},
                   {'id': {'const': 42.42},
                    'content': {'const': 'Second Message'}}],
         'receives': [{'target': {'const': 'Example Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': 'Example Init',
         'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Example Init',
         'id': 42.42, 'content': 'Second Message'}
test(): {'sender': 'test()', 'target': 'Example Init',
         'command': 'execute'}
test(): {'sender': 'Example Init',
         'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Example Init',
         'id': 42.42, 'content': 'Second Message'}

Similar functionality could be achieved by using the Log and Init plugins to print messages and send some messages on the bus, but these would clutter the test configuration, and code to stop the indefinitely running bus would have to be added to each and every test.

Incorrect plugin configurations can also be tested by this:

>>> asyncio.run(test(
...     {"Example Init": {"plugin": "Init"}}, []))
data must contain ['messages'] properties
Configuration for 'Example Init' is not valid.