controlpi
Provide the infrastructure for the ControlPi system.
The infrastructure consists of the message bus from module messagebus, the plugin registry from module pluginregistry and the abstract base plugin from module baseplugin.
The package combines them in its run function, which is used by __main__.py to run a ControlPi system based on a configuration file indefinitely.
The test function is a utility function to test plugins with minimal boilerplate code.
"""Provide the infrastructure for the ControlPi system.

The infrastructure consists of the message bus from module messagebus, the
plugin registry from module pluginregistry and the abstract base plugin from
module baseplugin.

The package combines them in its run function, which is used by __main__.py
to run a ControlPi system based on a configuration file indefinitely.

The test function is a utility function to test plugins with minimal
boilerplate code.
"""

import asyncio
import fastjsonschema

from controlpi.messagebus import MessageBus, Message, MessageTemplate
from controlpi.pluginregistry import PluginRegistry
from controlpi.baseplugin import BasePlugin, PluginConf, ConfException

from typing import Dict, List, Coroutine, Any


# Top-level configuration: an object mapping arbitrary instance names to
# objects (the per-instance plugin configurations).
CONF_SCHEMA = {"type": "object", "patternProperties": {".*": {"type": "object"}}}


def _process_conf(
    message_bus: MessageBus, conf: Dict[str, PluginConf]
) -> List[Coroutine]:
    """Turn a configuration into the list of coroutines to be run.

    The configuration is validated against CONF_SCHEMA first. If it is not
    valid, the validation error is printed and an empty list is returned.

    Otherwise the returned list starts with the run coroutine of the given
    message bus, followed by the run coroutine of every plugin instance
    that could be created. Instances without a "plugin" key, with an unknown
    plugin name or whose creation raises a ConfException are reported on
    standard output and skipped.
    """
    try:
        conf = fastjsonschema.validate(CONF_SCHEMA, conf)
    except fastjsonschema.JsonSchemaException as error:
        print(f"Configuration not valid:\n{error}")
        return []
    registry = PluginRegistry("controlpi_plugins", BasePlugin)
    # The message bus itself is always the first coroutine in the list.
    runnables = [message_bus.run()]
    for name, settings in conf.items():
        if "plugin" not in settings:
            print(f"No plugin implementation specified for instance '{name}'.")
            continue
        kind = settings["plugin"]
        if kind in registry:
            implementation = registry[kind]
            try:
                created = implementation(message_bus, name, settings)
                runnables.append(created.run())
            except ConfException as error:
                # Report the broken instance and go on with the others.
                print(error)
        else:
            print(
                f"No implementation found for plugin '{kind}'"
                f" (specified for instance '{name}')."
            )
    return runnables


async def run(conf: Dict[str, PluginConf]) -> None:
    """Run the ControlPi system based on a configuration.

    Set up the message bus, process the given configuration, and run the
    message bus and the plugins concurrently and indefinitely.

    This function is mainly used by __main__.py to run a ControlPi system
    based on a configuration loaded from a configuration JSON file on disk.

    >>> async def test_coroutine():
    ...     conf = {"Example Init":
    ...             {"plugin": "Init",
    ...              "messages": [{"id": 42,
    ...                            "content": "Test Message"},
    ...                           {"id": 42.42,
    ...                            "content": "Second Message"}]},
    ...             "Example Log":
    ...             {"plugin": "Log",
    ...              "filter": [{"sender": {"const": "Example Init"}}]}}
    ...     run_task = asyncio.create_task(run(conf))
    ...     await asyncio.sleep(0.1)
    ...     run_task.cancel()
    ...     try:
    ...         await run_task
    ...     except asyncio.exceptions.CancelledError:
    ...         pass
    >>> asyncio.run(test_coroutine())  # doctest: +NORMALIZE_WHITESPACE
    Example Log: {'sender': 'Example Init',
                  'id': 42, 'content': 'Test Message'}
    Example Log: {'sender': 'Example Init',
                  'id': 42.42, 'content': 'Second Message'}
    """
    bus = MessageBus()
    runnables = _process_conf(bus, conf)
    try:
        await asyncio.gather(*runnables)
    except asyncio.CancelledError:
        # Cancellation of the gathered coroutines ends the run silently.
        return


async def test(
    conf: Dict[str, PluginConf], messages: List[Dict[str, Any]], wait: float = 0.0
) -> None:
    """Test configuration of ControlPi system.

    Set up the message bus, process the given configuration, run the
    message bus and the plugins concurrently, send the given messages on the
    message bus and print all messages on the message bus. Terminate when
    the queue of the message bus is empty.

    This function allows testing single plugins or small plugin
    configurations with minimal boilerplate code:
    >>> asyncio.run(test(
    ...     {"Example Init": {"plugin": "Init",
    ...                       "messages": [{"id": 42,
    ...                                     "content": "Test Message"},
    ...                                    {"id": 42.42,
    ...                                     "content": "Second Message"}]}},
    ...     [{"target": "Example Init",
    ...       "command": "execute"}]))  # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Example Init', 'plugin': 'Init',
             'sends': [{'id': {'const': 42},
                        'content': {'const': 'Test Message'}},
                       {'id': {'const': 42.42},
                        'content': {'const': 'Second Message'}}],
             'receives': [{'target': {'const': 'Example Init'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'Example Init',
             'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Example Init',
             'id': 42.42, 'content': 'Second Message'}
    test(): {'sender': 'test()', 'target': 'Example Init',
             'command': 'execute'}
    test(): {'sender': 'Example Init',
             'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Example Init',
             'id': 42.42, 'content': 'Second Message'}

    Similar functionality could be achieved by using the Log and Init
    plugins to print messages and send some messages on the bus, but these
    would clutter the test configuration, and code to stop the indefinitely
    running bus would have to be added to each and every test.

    Incorrect plugin configurations can also be tested by this:
    >>> asyncio.run(test(
    ...     {"Example Init": {"plugin": "Init"}}, []))
    data must contain ['messages'] properties
    Configuration for 'Example Init' is not valid.
    """
    bus = MessageBus()

    # Key/value pairs identifying the registration message of 'test()'.
    own_registration = {"sender": "", "event": "registered", "client": "test()"}

    async def log(message):
        is_own_registration = all(
            key in message and message[key] == value
            for key, value in own_registration.items()
        )
        # Do not log own registration of 'test()':
        if not is_own_registration:
            print(f"test(): {message}")

    sends = [MessageTemplate()]
    receives = [([MessageTemplate()], log)]
    bus.register("test()", "Test", sends, receives)

    # Keep references to the tasks so that they are not garbage collected
    # while running; finished tasks remove themselves from the set.
    running = set()
    for runnable in _process_conf(bus, conf):
        started = asyncio.create_task(runnable)
        running.add(started)
        started.add_done_callback(running.discard)
        # Give the created task opportunity to run:
        await asyncio.sleep(0)
    for payload in messages:
        await bus.send(Message("test()", payload))
        # Give immediate reactions to messages opportunity to happen:
        await asyncio.sleep(0)
    await asyncio.sleep(wait)
    await bus._queue.join()
async def run(conf: Dict[str, PluginConf]) -> None:
    """Run the ControlPi system based on a configuration.

    Set up the message bus, process the given configuration, and run the
    message bus and the plugins concurrently and indefinitely.

    This function is mainly used by __main__.py to run a ControlPi system
    based on a configuration loaded from a configuration JSON file on disk.

    >>> async def test_coroutine():
    ...     conf = {"Example Init":
    ...             {"plugin": "Init",
    ...              "messages": [{"id": 42,
    ...                            "content": "Test Message"},
    ...                           {"id": 42.42,
    ...                            "content": "Second Message"}]},
    ...             "Example Log":
    ...             {"plugin": "Log",
    ...              "filter": [{"sender": {"const": "Example Init"}}]}}
    ...     run_task = asyncio.create_task(run(conf))
    ...     await asyncio.sleep(0.1)
    ...     run_task.cancel()
    ...     try:
    ...         await run_task
    ...     except asyncio.exceptions.CancelledError:
    ...         pass
    >>> asyncio.run(test_coroutine())  # doctest: +NORMALIZE_WHITESPACE
    Example Log: {'sender': 'Example Init',
                  'id': 42, 'content': 'Test Message'}
    Example Log: {'sender': 'Example Init',
                  'id': 42.42, 'content': 'Second Message'}
    """
    bus = MessageBus()
    runnables = _process_conf(bus, conf)
    try:
        await asyncio.gather(*runnables)
    except asyncio.CancelledError:
        # Cancellation of the gathered coroutines ends the run silently.
        return
Run the ControlPi system based on a configuration.
Set up the message bus, process the given configuration, and run the message bus and plugins concurrently and indefinitely.
This function is mainly used by __main__.py to run a ControlPi system based on a configuration loaded from a configuration JSON file on disk.
>>> async def test_coroutine():
... conf = {"Example Init":
... {"plugin": "Init",
... "messages": [{"id": 42,
... "content": "Test Message"},
... {"id": 42.42,
... "content": "Second Message"}]},
... "Example Log":
... {"plugin": "Log",
... "filter": [{"sender": {"const": "Example Init"}}]}}
... run_task = asyncio.create_task(run(conf))
... await asyncio.sleep(0.1)
... run_task.cancel()
... try:
... await run_task
... except asyncio.exceptions.CancelledError:
... pass
>>> asyncio.run(test_coroutine()) # doctest: +NORMALIZE_WHITESPACE
Example Log: {'sender': 'Example Init',
'id': 42, 'content': 'Test Message'}
Example Log: {'sender': 'Example Init',
'id': 42.42, 'content': 'Second Message'}
async def test(
    conf: Dict[str, PluginConf], messages: List[Dict[str, Any]], wait: float = 0.0
) -> None:
    """Test configuration of ControlPi system.

    Set up the message bus, process the given configuration, run the
    message bus and the plugins concurrently, send the given messages on the
    message bus and print all messages on the message bus. Terminate when
    the queue of the message bus is empty.

    This function allows testing single plugins or small plugin
    configurations with minimal boilerplate code:
    >>> asyncio.run(test(
    ...     {"Example Init": {"plugin": "Init",
    ...                       "messages": [{"id": 42,
    ...                                     "content": "Test Message"},
    ...                                    {"id": 42.42,
    ...                                     "content": "Second Message"}]}},
    ...     [{"target": "Example Init",
    ...       "command": "execute"}]))  # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Example Init', 'plugin': 'Init',
             'sends': [{'id': {'const': 42},
                        'content': {'const': 'Test Message'}},
                       {'id': {'const': 42.42},
                        'content': {'const': 'Second Message'}}],
             'receives': [{'target': {'const': 'Example Init'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'Example Init',
             'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Example Init',
             'id': 42.42, 'content': 'Second Message'}
    test(): {'sender': 'test()', 'target': 'Example Init',
             'command': 'execute'}
    test(): {'sender': 'Example Init',
             'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Example Init',
             'id': 42.42, 'content': 'Second Message'}

    Similar functionality could be achieved by using the Log and Init
    plugins to print messages and send some messages on the bus, but these
    would clutter the test configuration, and code to stop the indefinitely
    running bus would have to be added to each and every test.

    Incorrect plugin configurations can also be tested by this:
    >>> asyncio.run(test(
    ...     {"Example Init": {"plugin": "Init"}}, []))
    data must contain ['messages'] properties
    Configuration for 'Example Init' is not valid.
    """
    bus = MessageBus()

    # Key/value pairs identifying the registration message of 'test()'.
    own_registration = {"sender": "", "event": "registered", "client": "test()"}

    async def log(message):
        is_own_registration = all(
            key in message and message[key] == value
            for key, value in own_registration.items()
        )
        # Do not log own registration of 'test()':
        if not is_own_registration:
            print(f"test(): {message}")

    sends = [MessageTemplate()]
    receives = [([MessageTemplate()], log)]
    bus.register("test()", "Test", sends, receives)

    # Keep references to the tasks so that they are not garbage collected
    # while running; finished tasks remove themselves from the set.
    running = set()
    for runnable in _process_conf(bus, conf):
        started = asyncio.create_task(runnable)
        running.add(started)
        started.add_done_callback(running.discard)
        # Give the created task opportunity to run:
        await asyncio.sleep(0)
    for payload in messages:
        await bus.send(Message("test()", payload))
        # Give immediate reactions to messages opportunity to happen:
        await asyncio.sleep(0)
    await asyncio.sleep(wait)
    await bus._queue.join()
Test configuration of ControlPi system.
Set up the message bus, process the given configuration, run the message bus and plugins concurrently, send the given messages on the message bus and print all messages on the message bus. Terminate when the queue of the message bus is empty.
This function allows testing single plugins or small plugin configurations with minimal boilerplate code:
>>> asyncio.run(test(
... {"Example Init": {"plugin": "Init",
... "messages": [{"id": 42,
... "content": "Test Message"},
... {"id": 42.42,
... "content": "Second Message"}]}},
... [{"target": "Example Init",
... "command": "execute"}])) # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Example Init', 'plugin': 'Init',
'sends': [{'id': {'const': 42},
'content': {'const': 'Test Message'}},
{'id': {'const': 42.42},
'content': {'const': 'Second Message'}}],
'receives': [{'target': {'const': 'Example Init'},
'command': {'const': 'execute'}}]}
test(): {'sender': 'Example Init',
'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Example Init',
'id': 42.42, 'content': 'Second Message'}
test(): {'sender': 'test()', 'target': 'Example Init',
'command': 'execute'}
test(): {'sender': 'Example Init',
'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Example Init',
'id': 42.42, 'content': 'Second Message'}
Similar functionality could be achieved by using the Log and Init plugins to print messages and send some messages on the bus, but these would clutter the test configuration, and code to stop the indefinitely running bus would have to be added to each and every test.
Incorrect plugin configurations can also be tested by this:
>>> asyncio.run(test(
... {"Example Init": {"plugin": "Init"}}, []))
data must contain ['messages'] properties
Configuration for 'Example Init' is not valid.