Module controlpi_plugins.util
Provide utility plugins for all kinds of systems.
- Log logs messages on stdout.
- Init sends list of messages on startup and on demand.
- Execute sends configurable list of messages on demand.
- Alias translates messages to an alias.
>>> import controlpi
>>> asyncio.run(controlpi.test(
... {"Test Log": {"plugin": "Log",
... "filter": [{"sender": {"const": "Test Alias"}}]},
... "Test Init": {"plugin": "Init",
... "messages": [{"id": 42, "content": "Test Message"}]},
... "Test Alias": {"plugin": "Alias",
... "from": {"sender": {"const": "Test Init"},
... "id": {"const": 42}},
... "to": {"id": "translated"}}}, []))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Test Log', 'plugin': 'Log',
'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
test(): {'sender': '', 'event': 'registered',
'client': 'Test Init', 'plugin': 'Init',
'sends': [{'id': {'const': 42},
'content': {'const': 'Test Message'}}],
'receives': [{'target': {'const': 'Test Init'},
'command': {'const': 'execute'}}]}
test(): {'sender': '', 'event': 'registered',
'client': 'Test Alias', 'plugin': 'Alias',
'sends': [{'id': {'const': 'translated'}}],
'receives': [{'sender': {'const': 'Test Init'},
'id': {'const': 42}}]}
test(): {'sender': 'Test Init', 'id': 42,
'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
'content': 'Test Message'}
Test Log: {'sender': 'Test Alias', 'id': 'translated',
'content': 'Test Message'}
Expand source code
"""Provide utility plugins for all kinds of systems.

- Log logs messages on stdout.
- Init sends list of messages on startup and on demand.
- Execute sends configurable list of messages on demand.
- Alias translates messages to an alias.

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [{"sender": {"const": "Test Alias"}}]},
...      "Test Init": {"plugin": "Init",
...                    "messages": [{"id": 42, "content": "Test Message"}]},
...      "Test Alias": {"plugin": "Alias",
...                     "from": {"sender": {"const": "Test Init"},
...                              "id": {"const": 42}},
...                     "to": {"id": "translated"}}}, []))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Log', 'plugin': 'Log',
         'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}}],
         'receives': [{'target': {'const': 'Test Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'sender': {'const': 'Test Init'},
                       'id': {'const': 42}}]}
test(): {'sender': 'Test Init', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message'}
Test Log: {'sender': 'Test Alias', 'id': 'translated',
           'content': 'Test Message'}
"""
import asyncio
from controlpi import BasePlugin, Message, MessageTemplate
class Log(BasePlugin):
    """Print selected bus messages on stdout.

    The plugin takes a single configuration key, "filter", holding a list
    of message templates. These templates are handed to the bus as the
    "receives" list of the instance, and every message delivered to the
    instance is printed prefixed with the instance name.

    In the following example the first and third message match the given
    template and are logged by the instance "Test Log", while the second
    message does not match and is only logged by the test, but not by the
    Log instance:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log",
    ...                   "filter": [{"id": {"const": 42}}]}},
    ...     [{"id": 42, "message": "Test Message"},
    ...      {"id": 42.42, "message": "Second Message"},
    ...      {"id": 42, "message": "Third Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Log', 'plugin': 'Log',
             'sends': [], 'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
    test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}

    The "filter" key is required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log"}}, []))
    'filter' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'filter': {'items': {'type': 'object'},
                                   'type': 'array'}},
         'required': ['filter']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Log'}
    Configuration for 'Test Log' is not valid.

    The "filter" key has to contain a list of message templates, i.e.,
    JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Log": {"plugin": "Log",
    ...                   "filter": [42]}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['filter']['items']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['filter'][0]:
        42
    Configuration for 'Test Log' is not valid.
    """

    CONF_SCHEMA = {
        'properties': {
            'filter': {'type': 'array', 'items': {'type': 'object'}},
        },
        'required': ['filter'],
    }
    """Schema for Log plugin configuration.

    Required configuration key:

    - 'filter': list of message templates to be logged.
    """

    async def log(self, message: Message) -> None:
        """Print the received message, prefixed with the instance name."""
        line = f"{self.name}: {message}"
        print(line)

    def process_conf(self) -> None:
        """Register the instance on the bus with its configured filter."""
        # A Log instance never sends anything; it only receives messages
        # matching the configured templates.
        sends = []
        receives = self.conf['filter']
        self.bus.register(self.name, 'Log', sends, receives, self.log)

    async def run(self) -> None:
        """Do nothing proactively; the plugin only reacts to messages."""
class Init(BasePlugin):
    """Send a configured list of messages on startup and on demand.

    The "messages" configuration key gets a list of messages to be sent on
    startup. The same list is sent in reaction to a message with
    "target": <name> and "command": "execute".

    In the example, the two configured messages are sent twice, once at
    startup and a second time in reaction to the "execute" command sent by
    the test:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init",
    ...                    "messages": [{"id": 42,
    ...                                  "content": "Test Message"},
    ...                                 {"id": 42.42,
    ...                                  "content": "Second Message"}]}},
    ...     [{"target": "Test Init", "command": "execute"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Init', 'plugin': 'Init',
             'sends': [{'id': {'const': 42},
                        'content': {'const': 'Test Message'}},
                       {'id': {'const': 42.42},
                        'content': {'const': 'Second Message'}}],
             'receives': [{'target': {'const': 'Test Init'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
    test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}

    The "messages" key is required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init"}}, []))
    'messages' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'messages': {'items': {'type': 'object'},
                                     'type': 'array'}},
         'required': ['messages']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Init'}
    Configuration for 'Test Init' is not valid.

    The "messages" key has to contain a list of (partial) messages, i.e.,
    JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Init": {"plugin": "Init",
    ...                    "messages": [42]}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['messages']['items']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['messages'][0]:
        42
    Configuration for 'Test Init' is not valid.
    """

    CONF_SCHEMA = {
        'properties': {
            'messages': {'type': 'array', 'items': {'type': 'object'}},
        },
        'required': ['messages'],
    }
    """Schema for Init plugin configuration.

    Required configuration key:

    - 'messages': list of messages to be sent.
    """

    async def execute(self, message: Message) -> None:
        """Send the configured messages in reaction to an "execute" command.

        The content of the triggering message is not inspected.
        """
        for configured in self.conf['messages']:
            await self.bus.send(Message(self.name, configured))
            # Give immediate reactions to messages opportunity to happen.
            # NOTE(review): the flattened source does not show whether this
            # yield sits inside or after the loop; per-message is assumed.
            await asyncio.sleep(0)

    def process_conf(self) -> None:
        """Register the instance on the bus.

        One send template is derived from each configured message; the
        only receive template is the "execute" command addressed to this
        instance.
        """
        execute_command = MessageTemplate({'target': {'const': self.name},
                                           'command': {'const': 'execute'}})
        receives = [execute_command]
        sends = list(map(MessageTemplate.from_message,
                         self.conf['messages']))
        self.bus.register(self.name, 'Init', sends, receives, self.execute)

    async def run(self) -> None:
        """Send the configured messages once on startup."""
        for configured in self.conf['messages']:
            await self.bus.send(Message(self.name, configured))
class Execute(BasePlugin):
    """Send a list of messages, settable at runtime, on demand.

    An Execute plugin instance receives two kinds of commands.
    The "set messages" command has a "messages" key with a list of (partial)
    messages, which are sent by the Execute instance in reaction to an
    "execute" command.

    In the example, the first command sent by the test sets two messages,
    which are then sent in reaction to the second command sent by the test:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Execute": {"plugin": "Execute"}},
    ...     [{"target": "Test Execute", "command": "set messages",
    ...       "messages": [{"id": 42, "content": "Test Message"},
    ...                    {"id": 42.42, "content": "Second Message"}]},
    ...      {"target": "Test Execute", "command": "execute"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Execute', 'plugin': 'Execute',
             'sends': [{}],
             'receives': [{'target': {'const': 'Test Execute'},
                           'command': {'const': 'set messages'},
                           'messages': {'type': 'array',
                                        'items': {'type': 'object'}}},
                          {'target': {'const': 'Test Execute'},
                           'command': {'const': 'execute'}}]}
    test(): {'sender': 'test()', 'target': 'Test Execute',
             'command': 'set messages',
             'messages': [{'id': 42, 'content': 'Test Message'},
                          {'id': 42.42, 'content': 'Second Message'}]}
    test(): {'sender': 'test()', 'target': 'Test Execute',
             'command': 'execute'}
    test(): {'sender': 'Test Execute', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Execute', 'id': 42.42,
             'content': 'Second Message'}
    """

    CONF_SCHEMA = True
    """Schema for Execute plugin configuration.

    There are no required or optional configuration keys.
    """

    async def execute(self, message: Message) -> None:
        """Handle a "set messages" or an "execute" command.

        "set messages" replaces the stored list with a copy of the list
        under the "messages" key; "execute" sends every stored message
        with this instance as sender. Any other command is ignored.
        """
        command = message['command']
        if command == 'set messages':
            replacement = message['messages']
            assert isinstance(replacement, list)
            # Copy so that the stored list is independent of the list
            # object carried by the received message.
            self.messages = list(replacement)
            return
        if command != 'execute':
            return
        for stored in self.messages:
            await self.bus.send(Message(self.name, stored))
            # Give immediate reactions to messages opportunity to happen.
            # NOTE(review): the flattened source does not show whether this
            # yield sits inside or after the loop; per-message is assumed.
            await asyncio.sleep(0)

    def process_conf(self) -> None:
        """Start with an empty message list and register on the bus."""
        self.messages = []
        set_command = MessageTemplate(
            {'target': {'const': self.name},
             'command': {'const': 'set messages'},
             'messages': {'type': 'array',
                          'items': {'type': 'object'}}})
        execute_command = MessageTemplate(
            {'target': {'const': self.name},
             'command': {'const': 'execute'}})
        receives = [set_command, execute_command]
        # The messages to send are only known at runtime, so a single
        # empty template is registered for sending.
        sends = [MessageTemplate()]
        self.bus.register(self.name, 'Execute', sends, receives, self.execute)

    async def run(self) -> None:
        """Do nothing proactively; the plugin only reacts to commands."""
class Alias(BasePlugin):
    """Translate messages to an alias.

    The "from" configuration key gets a message template and the
    configuration key "to" a (partial) message. All messages matching the
    template are received by the Alias instance and a message translated by
    removing the keys of the "from" template and adding the keys and values
    of the "to" message is sent. Keys that are neither in "from" nor in "to"
    are retained.

    A key that occurs both in the received message and in "to" always
    carries the value configured in "to", so every translated message
    agrees with the "sends" template derived from "to" at registration.

    In the example, the two messages sent by the test are translated by the
    Alias instance and the translated messages are sent by it preserving
    the "content" keys:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": {"id": {"const": 42}},
    ...                     "to": {"id": "translated"}}},
    ...     [{"id": 42, "content": "Test Message"},
    ...      {"id": 42, "content": "Second Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Alias', 'plugin': 'Alias',
             'sends': [{'id': {'const': 'translated'}}],
             'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Second Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Second Message'}

    The "from" and "to" keys are required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias"}}, []))
    'from' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    'to' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'}, 'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    Configuration for 'Test Alias' is not valid.

    The "from" key has to contain a message template and the "to" key a
    (partial) message, i.e., both have to be JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": 42,
    ...                     "to": 42}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['from']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['from']:
        42
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['to']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['to']:
        42
    Configuration for 'Test Alias' is not valid.
    """

    CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
                                  'to': {'type': 'object'}},
                   'required': ['from', 'to']}
    """Schema for Alias plugin configuration.

    Required configuration keys:

    - 'from': template of messages to be translated.
    - 'to': translated message to be sent.
    """

    async def alias(self, message: Message) -> None:
        """Translate the received message and send the result.

        The translated message has this instance as sender, carries all
        keys and values of the configured "to" message, and retains those
        keys of the received message that are neither "sender" nor present
        in "from" or "to".
        """
        from_template = self.conf['from']
        to_message = self.conf['to']
        alias_message = Message(self.name)
        alias_message.update(to_message)
        for key in message:
            if key == 'sender' or key in from_template:
                continue
            # A key configured in "to" keeps its configured value; copying
            # the incoming value here would override the translation and
            # contradict the const template registered in process_conf.
            if key in to_message:
                continue
            alias_message[key] = message[key]
        await self.bus.send(alias_message)

    def process_conf(self) -> None:
        """Register the instance on the bus.

        The "to" message is turned into the single send template and the
        "from" template is the single receive template.
        """
        sends = [MessageTemplate.from_message(self.conf['to'])]
        receives = [self.conf['from']]
        self.bus.register(self.name, 'Alias', sends, receives, self.alias)

    async def run(self) -> None:
        """Do nothing proactively; the plugin only reacts to messages."""
Classes
class Log (bus: MessageBus, name: str, conf: Dict[str, Any])-
Log messages on stdout.
The "filter" configuration key gets a list of message templates defining the messages that should be logged by the plugin instance.
In the following example the first and third message match the given template and are logged by the instance "Test Log", while the second message does not match and is only logged by the test, but not by the Log instance:
>>> import controlpi >>> asyncio.run(controlpi.test( ... {"Test Log": {"plugin": "Log", ... "filter": [{"id": {"const": 42}}]}}, ... [{"id": 42, "message": "Test Message"}, ... {"id": 42.42, "message": "Second Message"}, ... {"id": 42, "message": "Third Message"}])) ... # doctest: +NORMALIZE_WHITESPACE test(): {'sender': '', 'event': 'registered', 'client': 'Test Log', 'plugin': 'Log', 'sends': [], 'receives': [{'id': {'const': 42}}]} test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'} Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'} test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'} test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'} Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}The "filter" key is required:
>>> asyncio.run(controlpi.test( ... {"Test Log": {"plugin": "Log"}}, [])) 'filter' is a required property <BLANKLINE> Failed validating 'required' in schema: {'properties': {'filter': {'items': {'type': 'object'}, 'type': 'array'}}, 'required': ['filter']} <BLANKLINE> On instance: {'plugin': 'Log'} Configuration for 'Test Log' is not valid.The "filter" key has to contain a list of message templates, i.e., JSON objects:
>>> asyncio.run(controlpi.test( ... {"Test Log": {"plugin": "Log", ... "filter": [42]}}, [])) 42 is not of type 'object' <BLANKLINE> Failed validating 'type' in schema['properties']['filter']['items']: {'type': 'object'} <BLANKLINE> On instance['filter'][0]: 42 Configuration for 'Test Log' is not valid.Expand source code
class Log(BasePlugin): """Log messages on stdout. The "filter" configuration key gets a list of message templates defining the messages that should be logged by the plugin instance. In the following example the first and third message match the given template and are logged by the instance "Test Log", while the second message does not match and is only logged by the test, but not by the Log instance: >>> import controlpi >>> asyncio.run(controlpi.test( ... {"Test Log": {"plugin": "Log", ... "filter": [{"id": {"const": 42}}]}}, ... [{"id": 42, "message": "Test Message"}, ... {"id": 42.42, "message": "Second Message"}, ... {"id": 42, "message": "Third Message"}])) ... # doctest: +NORMALIZE_WHITESPACE test(): {'sender': '', 'event': 'registered', 'client': 'Test Log', 'plugin': 'Log', 'sends': [], 'receives': [{'id': {'const': 42}}]} test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'} Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'} test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'} test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'} Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'} The "filter" key is required: >>> asyncio.run(controlpi.test( ... {"Test Log": {"plugin": "Log"}}, [])) 'filter' is a required property <BLANKLINE> Failed validating 'required' in schema: {'properties': {'filter': {'items': {'type': 'object'}, 'type': 'array'}}, 'required': ['filter']} <BLANKLINE> On instance: {'plugin': 'Log'} Configuration for 'Test Log' is not valid. The "filter" key has to contain a list of message templates, i.e., JSON objects: >>> asyncio.run(controlpi.test( ... {"Test Log": {"plugin": "Log", ... "filter": [42]}}, [])) 42 is not of type 'object' <BLANKLINE> Failed validating 'type' in schema['properties']['filter']['items']: {'type': 'object'} <BLANKLINE> On instance['filter'][0]: 42 Configuration for 'Test Log' is not valid. 
""" CONF_SCHEMA = {'properties': {'filter': {'type': 'array', 'items': {'type': 'object'}}}, 'required': ['filter']} """Schema for Log plugin configuration. Required configuration key: - 'filter': list of message templates to be logged. """ async def log(self, message: Message) -> None: """Log received message on stdout using own name as prefix.""" print(f"{self.name}: {message}") def process_conf(self) -> None: """Register plugin as bus client.""" self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log) async def run(self) -> None: """Run no code proactively.""" passAncestors
- BasePlugin
- abc.ABC
Class variables
var CONF_SCHEMA-
Schema for Log plugin configuration.
Required configuration key:
- 'filter': list of message templates to be logged.
Methods
async def log(self, message: Message) ‑> NoneType-
Log received message on stdout using own name as prefix.
Expand source code
async def log(self, message: Message) -> None: """Log received message on stdout using own name as prefix.""" print(f"{self.name}: {message}") def process_conf(self) ‑> NoneType-
Register plugin as bus client.
Expand source code
def process_conf(self) -> None: """Register plugin as bus client.""" self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log) async def run(self) ‑> NoneType-
Run no code proactively.
Expand source code
async def run(self) -> None: """Run no code proactively.""" pass
class Init (bus: MessageBus, name: str, conf: Dict[str, Any])-
Send list of messages on startup and on demand.
The "messages" configuration key gets a list of messages to be sent on startup. The same list is sent in reaction to a message with "target": <name> and "command": "execute".
In the example, the two configured messages are sent twice, once at startup and a second time in reaction to the "execute" command sent by the test:
>>> import controlpi >>> asyncio.run(controlpi.test( ... {"Test Init": {"plugin": "Init", ... "messages": [{"id": 42, ... "content": "Test Message"}, ... {"id": 42.42, ... "content": "Second Message"}]}}, ... [{"target": "Test Init", "command": "execute"}])) ... # doctest: +NORMALIZE_WHITESPACE test(): {'sender': '', 'event': 'registered', 'client': 'Test Init', 'plugin': 'Init', 'sends': [{'id': {'const': 42}, 'content': {'const': 'Test Message'}}, {'id': {'const': 42.42}, 'content': {'const': 'Second Message'}}], 'receives': [{'target': {'const': 'Test Init'}, 'command': {'const': 'execute'}}]} test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'} test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'} test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'} test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'} test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}The "messages" key is required:
>>> asyncio.run(controlpi.test( ... {"Test Init": {"plugin": "Init"}}, [])) 'messages' is a required property <BLANKLINE> Failed validating 'required' in schema: {'properties': {'messages': {'items': {'type': 'object'}, 'type': 'array'}}, 'required': ['messages']} <BLANKLINE> On instance: {'plugin': 'Init'} Configuration for 'Test Init' is not valid.The "messages" key has to contain a list of (partial) messages, i.e., JSON objects:
>>> asyncio.run(controlpi.test( ... {"Test Init": {"plugin": "Init", ... "messages": [42]}}, [])) 42 is not of type 'object' <BLANKLINE> Failed validating 'type' in schema['properties']['messages']['items']: {'type': 'object'} <BLANKLINE> On instance['messages'][0]: 42 Configuration for 'Test Init' is not valid.Expand source code
class Init(BasePlugin): """Send list of messages on startup and on demand. The "messages" configuration key gets a list of messages to be sent on startup. The same list is sent in reaction to a message with "target": <name> and "command": "execute". In the example, the two configured messages are sent twice, once at startup and a second time in reaction to the "execute" command sent by the test: >>> import controlpi >>> asyncio.run(controlpi.test( ... {"Test Init": {"plugin": "Init", ... "messages": [{"id": 42, ... "content": "Test Message"}, ... {"id": 42.42, ... "content": "Second Message"}]}}, ... [{"target": "Test Init", "command": "execute"}])) ... # doctest: +NORMALIZE_WHITESPACE test(): {'sender': '', 'event': 'registered', 'client': 'Test Init', 'plugin': 'Init', 'sends': [{'id': {'const': 42}, 'content': {'const': 'Test Message'}}, {'id': {'const': 42.42}, 'content': {'const': 'Second Message'}}], 'receives': [{'target': {'const': 'Test Init'}, 'command': {'const': 'execute'}}]} test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'} test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'} test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'} test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'} test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'} The "messages" key is required: >>> asyncio.run(controlpi.test( ... {"Test Init": {"plugin": "Init"}}, [])) 'messages' is a required property <BLANKLINE> Failed validating 'required' in schema: {'properties': {'messages': {'items': {'type': 'object'}, 'type': 'array'}}, 'required': ['messages']} <BLANKLINE> On instance: {'plugin': 'Init'} Configuration for 'Test Init' is not valid. The "messages" key has to contain a list of (partial) messages, i.e., JSON objects: >>> asyncio.run(controlpi.test( ... {"Test Init": {"plugin": "Init", ... 
"messages": [42]}}, [])) 42 is not of type 'object' <BLANKLINE> Failed validating 'type' in schema['properties']['messages']['items']: {'type': 'object'} <BLANKLINE> On instance['messages'][0]: 42 Configuration for 'Test Init' is not valid. """ CONF_SCHEMA = {'properties': {'messages': {'type': 'array', 'items': {'type': 'object'}}}, 'required': ['messages']} """Schema for Init plugin configuration. Required configuration key: - 'messages': list of messages to be sent. """ async def execute(self, message: Message) -> None: """Send configured messages.""" for message in self.conf['messages']: await self.bus.send(Message(self.name, message)) # Give immediate reactions to messages opportunity to happen: await asyncio.sleep(0) def process_conf(self) -> None: """Register plugin as bus client.""" receives = [MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'execute'}})] sends = [MessageTemplate.from_message(message) for message in self.conf['messages']] self.bus.register(self.name, 'Init', sends, receives, self.execute) async def run(self) -> None: """Send configured messages on startup.""" for message in self.conf['messages']: await self.bus.send(Message(self.name, message))Ancestors
- BasePlugin
- abc.ABC
Class variables
var CONF_SCHEMA-
Schema for Init plugin configuration.
Required configuration key:
- 'messages': list of messages to be sent.
Methods
async def execute(self, message: Message) ‑> NoneType-
Send configured messages.
Expand source code
async def execute(self, message: Message) -> None: """Send configured messages.""" for message in self.conf['messages']: await self.bus.send(Message(self.name, message)) # Give immediate reactions to messages opportunity to happen: await asyncio.sleep(0) def process_conf(self) ‑> NoneType-
Register plugin as bus client.
Expand source code
def process_conf(self) -> None: """Register plugin as bus client.""" receives = [MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'execute'}})] sends = [MessageTemplate.from_message(message) for message in self.conf['messages']] self.bus.register(self.name, 'Init', sends, receives, self.execute) async def run(self) ‑> NoneType-
Send configured messages on startup.
Expand source code
async def run(self) -> None: """Send configured messages on startup.""" for message in self.conf['messages']: await self.bus.send(Message(self.name, message))
class Execute (bus: MessageBus, name: str, conf: Dict[str, Any])-
Send configurable list of messages on demand.
An Execute plugin instance receives two kinds of commands. The "set messages" command has a "messages" key with a list of (partial) messages, which are sent by the Execute instance in reaction to an "execute" command.
In the example, the first command sent by the test sets two messages, which are then sent in reaction to the second command sent by the test:
>>> import controlpi >>> asyncio.run(controlpi.test( ... {"Test Execute": {"plugin": "Execute"}}, ... [{"target": "Test Execute", "command": "set messages", ... "messages": [{"id": 42, "content": "Test Message"}, ... {"id": 42.42, "content": "Second Message"}]}, ... {"target": "Test Execute", "command": "execute"}])) ... # doctest: +NORMALIZE_WHITESPACE test(): {'sender': '', 'event': 'registered', 'client': 'Test Execute', 'plugin': 'Execute', 'sends': [{}], 'receives': [{'target': {'const': 'Test Execute'}, 'command': {'const': 'set messages'}, 'messages': {'type': 'array', 'items': {'type': 'object'}}}, {'target': {'const': 'Test Execute'}, 'command': {'const': 'execute'}}]} test(): {'sender': 'test()', 'target': 'Test Execute', 'command': 'set messages', 'messages': [{'id': 42, 'content': 'Test Message'}, {'id': 42.42, 'content': 'Second Message'}]} test(): {'sender': 'test()', 'target': 'Test Execute', 'command': 'execute'} test(): {'sender': 'Test Execute', 'id': 42, 'content': 'Test Message'} test(): {'sender': 'Test Execute', 'id': 42.42, 'content': 'Second Message'}Expand source code
class Execute(BasePlugin): """Send configurable list of messages on demand. An Execute plugin instance receives two kinds of commands. The "set messages" command has a "messages" key with a list of (partial) messages, which are sent by the Execute instance in reaction to an "execute" command. In the example, the first command sent by the test sets two messages, which are then sent in reaction to the second command sent by the test: >>> import controlpi >>> asyncio.run(controlpi.test( ... {"Test Execute": {"plugin": "Execute"}}, ... [{"target": "Test Execute", "command": "set messages", ... "messages": [{"id": 42, "content": "Test Message"}, ... {"id": 42.42, "content": "Second Message"}]}, ... {"target": "Test Execute", "command": "execute"}])) ... # doctest: +NORMALIZE_WHITESPACE test(): {'sender': '', 'event': 'registered', 'client': 'Test Execute', 'plugin': 'Execute', 'sends': [{}], 'receives': [{'target': {'const': 'Test Execute'}, 'command': {'const': 'set messages'}, 'messages': {'type': 'array', 'items': {'type': 'object'}}}, {'target': {'const': 'Test Execute'}, 'command': {'const': 'execute'}}]} test(): {'sender': 'test()', 'target': 'Test Execute', 'command': 'set messages', 'messages': [{'id': 42, 'content': 'Test Message'}, {'id': 42.42, 'content': 'Second Message'}]} test(): {'sender': 'test()', 'target': 'Test Execute', 'command': 'execute'} test(): {'sender': 'Test Execute', 'id': 42, 'content': 'Test Message'} test(): {'sender': 'Test Execute', 'id': 42.42, 'content': 'Second Message'} """ CONF_SCHEMA = True """Schema for Execute plugin configuration. There are no required or optional configuration keys. 
""" async def execute(self, message: Message) -> None: """Set or send configured messages.""" if message['command'] == 'set messages': assert isinstance(message['messages'], list) self.messages = list(message['messages']) elif message['command'] == 'execute': for message in self.messages: await self.bus.send(Message(self.name, message)) # Give immediate reactions to messages opportunity to happen: await asyncio.sleep(0) def process_conf(self) -> None: """Register plugin as bus client.""" self.messages = [] receives = [MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'set messages'}, 'messages': {'type': 'array', 'items': {'type': 'object'}}}), MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'execute'}})] sends = [MessageTemplate()] self.bus.register(self.name, 'Execute', sends, receives, self.execute) async def run(self) -> None: """Run no code proactively.""" passAncestors
- BasePlugin
- abc.ABC
Class variables
var CONF_SCHEMA-
Schema for Execute plugin configuration.
There are no required or optional configuration keys.
Methods
async def execute(self, message: Message) ‑> NoneType-
Set or send configured messages.
Expand source code
async def execute(self, message: Message) -> None: """Set or send configured messages.""" if message['command'] == 'set messages': assert isinstance(message['messages'], list) self.messages = list(message['messages']) elif message['command'] == 'execute': for message in self.messages: await self.bus.send(Message(self.name, message)) # Give immediate reactions to messages opportunity to happen: await asyncio.sleep(0) def process_conf(self) ‑> NoneType-
Register plugin as bus client.
Expand source code
def process_conf(self) -> None: """Register plugin as bus client.""" self.messages = [] receives = [MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'set messages'}, 'messages': {'type': 'array', 'items': {'type': 'object'}}}), MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'execute'}})] sends = [MessageTemplate()] self.bus.register(self.name, 'Execute', sends, receives, self.execute) async def run(self) ‑> NoneType-
Run no code proactively.
Expand source code
async def run(self) -> None: """Run no code proactively.""" pass
class Alias (bus: MessageBus, name: str, conf: Dict[str, Any])-
Translate messages to an alias.
The "from" configuration key gets a message template and the configuration key "to" a (partial) message. All messages matching the template are received by the Alias instance and a message translated by removing the keys of the "from" template and adding the keys and values of the "to" message is sent. Keys that are neither in "from" nor in "to" are retained.
In the example, the two messages sent by the test are translated by the Alias instance and the translated messages are sent by it preserving the "content" keys:
>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": {"id": {"const": 42}},
...                     "to": {"id": "translated"}}},
...     [{"id": 42, "content": "Test Message"},
...      {"id": 42, "content": "Second Message"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'id': {'const': 42}}]}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message'}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Second Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Second Message'}

The "from" and "to" keys are required:
>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias"}}, []))
'from' is a required property
<BLANKLINE>
Failed validating 'required' in schema:
    {'properties': {'from': {'type': 'object'},
                    'to': {'type': 'object'}},
     'required': ['from', 'to']}
<BLANKLINE>
On instance:
    {'plugin': 'Alias'}
'to' is a required property
<BLANKLINE>
Failed validating 'required' in schema:
    {'properties': {'from': {'type': 'object'},
                    'to': {'type': 'object'}},
     'required': ['from', 'to']}
<BLANKLINE>
On instance:
    {'plugin': 'Alias'}
Configuration for 'Test Alias' is not valid.

The "from" key has to contain a message template and the "to" key a (partial) message, i.e., both have to be JSON objects:
>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": 42,
...                     "to": 42}}, []))
42 is not of type 'object'
<BLANKLINE>
Failed validating 'type' in schema['properties']['from']:
    {'type': 'object'}
<BLANKLINE>
On instance['from']:
    42
42 is not of type 'object'
<BLANKLINE>
Failed validating 'type' in schema['properties']['to']:
    {'type': 'object'}
<BLANKLINE>
On instance['to']:
    42
Configuration for 'Test Alias' is not valid.
Expand source code
class Alias(BasePlugin):
    """Translate messages to an alias.

    The "from" configuration key gets a message template and the
    configuration key "to" a (partial) message. All messages matching the
    template are received by the Alias instance and a message translated by
    removing the keys of the "from" template and adding the keys and values
    of the "to" message is sent. Keys that are neither in "from" nor in "to"
    are retained. Keys given in "to" always carry the configured value,
    even if the received message contains the same key.

    In the example, the two messages sent by the test are translated by the
    Alias instance and the translated messages are sent by it preserving
    the "content" keys:
    >>> import controlpi
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": {"id": {"const": 42}},
    ...                     "to": {"id": "translated"}}},
    ...     [{"id": 42, "content": "Test Message"},
    ...      {"id": 42, "content": "Second Message"}]))
    ... # doctest: +NORMALIZE_WHITESPACE
    test(): {'sender': '', 'event': 'registered',
             'client': 'Test Alias', 'plugin': 'Alias',
             'sends': [{'id': {'const': 'translated'}}],
             'receives': [{'id': {'const': 42}}]}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Test Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Test Message'}
    test(): {'sender': 'test()', 'id': 42,
             'content': 'Second Message'}
    test(): {'sender': 'Test Alias', 'id': 'translated',
             'content': 'Second Message'}

    The "from" and "to" keys are required:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias"}}, []))
    'from' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'},
                        'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    'to' is a required property
    <BLANKLINE>
    Failed validating 'required' in schema:
        {'properties': {'from': {'type': 'object'},
                        'to': {'type': 'object'}},
         'required': ['from', 'to']}
    <BLANKLINE>
    On instance:
        {'plugin': 'Alias'}
    Configuration for 'Test Alias' is not valid.

    The "from" key has to contain a message template and the "to" key a
    (partial) message, i.e., both have to be JSON objects:
    >>> asyncio.run(controlpi.test(
    ...     {"Test Alias": {"plugin": "Alias",
    ...                     "from": 42,
    ...                     "to": 42}}, []))
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['from']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['from']:
        42
    42 is not of type 'object'
    <BLANKLINE>
    Failed validating 'type' in schema['properties']['to']:
        {'type': 'object'}
    <BLANKLINE>
    On instance['to']:
        42
    Configuration for 'Test Alias' is not valid.
    """

    CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
                                  'to': {'type': 'object'}},
                   'required': ['from', 'to']}
    """Schema for Alias plugin configuration.

    Required configuration keys:

    - 'from': template of messages to be translated.
    - 'to': translated message to be sent.
    """

    async def alias(self, message: Message) -> None:
        """Translate and send message.

        The translated message starts out with the keys and values of the
        configured "to" message. Keys of the received message are then
        copied over unless they are the "sender" key, a key of the "from"
        template or a key already provided by "to".
        """
        alias_message = Message(self.name)
        alias_message.update(self.conf['to'])
        for key in message:
            # Keys configured in "to" keep their configured values. If a
            # received value replaced them, the message sent would no
            # longer fit the const template built from "to" that is
            # registered in process_conf.
            if (key != 'sender' and key not in self.conf['from']
                    and key not in self.conf['to']):
                alias_message[key] = message[key]
        await self.bus.send(alias_message)

    def process_conf(self) -> None:
        """Register plugin as bus client.

        The plugin announces the template derived from the "to" message as
        the messages it sends and the "from" template as the messages it
        receives; received messages are handled by alias().
        """
        self.bus.register(self.name, 'Alias',
                          [MessageTemplate.from_message(self.conf['to'])],
                          [self.conf['from']],
                          self.alias)

    async def run(self) -> None:
        """Run no code proactively."""
        pass
Ancestors
- BasePlugin
- abc.ABC
Class variables
var CONF_SCHEMA-
Schema for Alias plugin configuration.
Required configuration keys:
- 'from': template of messages to be translated.
- 'to': translated message to be sent.
Methods
async def alias(self, message: Message) ‑> NoneType-
Translate and send message.
Expand source code
async def alias(self, message: Message) -> None: """Translate and send message.""" alias_message = Message(self.name) alias_message.update(self.conf['to']) for key in message: if key != 'sender' and key not in self.conf['from']: alias_message[key] = message[key] await self.bus.send(alias_message)
def process_conf(self) ‑> NoneType-
Register plugin as bus client.
Expand source code
def process_conf(self) -> None: """Register plugin as bus client.""" self.bus.register(self.name, 'Alias', [MessageTemplate.from_message(self.conf['to'])], [self.conf['from']], self.alias)
async def run(self) ‑> NoneType-
Run no code proactively.
Expand source code
async def run(self) -> None:
    """Perform no proactive work."""
    # Nothing to do here: the coroutine finishes immediately.
    return None