controlpi_plugins.util

Provide utility plugins for all kinds of systems.

  • Log logs messages on stdout.
  • Init sends list of messages on startup and on demand.
  • Execute sends configurable list of messages on demand.
  • Alias translates messages to an alias.
>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [{"sender": {"const": "Test Alias"}}]},
...      "Test Init": {"plugin": "Init",
...                    "messages": [{"id": 42, "content": "Test Message"}]},
...      "Test Alias": {"plugin": "Alias",
...                     "from": {"sender": {"const": "Test Init"},
...                              "id": {"const": 42}},
...                     "to": {"id": "translated"}}}, []))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Log', 'plugin': 'Log',
         'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}}],
         'receives': [{'target': {'const': 'Test Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'sender': {'const': 'Test Init'},
                       'id': {'const': 42}}]}
test(): {'sender': 'Test Init', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message'}
Test Log: {'sender': 'Test Alias', 'id': 'translated',
           'content': 'Test Message'}
  1"""Provide utility plugins for all kinds of systems.
  2
  3- Log logs messages on stdout.
  4- Init sends list of messages on startup and on demand.
  5- Execute sends configurable list of messages on demand.
  6- Alias translates messages to an alias.
  7
  8>>> import controlpi
  9>>> asyncio.run(controlpi.test(
 10...     {"Test Log": {"plugin": "Log",
 11...                   "filter": [{"sender": {"const": "Test Alias"}}]},
 12...      "Test Init": {"plugin": "Init",
 13...                    "messages": [{"id": 42, "content": "Test Message"}]},
 14...      "Test Alias": {"plugin": "Alias",
 15...                     "from": {"sender": {"const": "Test Init"},
 16...                              "id": {"const": 42}},
 17...                     "to": {"id": "translated"}}}, []))
 18... # doctest: +NORMALIZE_WHITESPACE
 19test(): {'sender': '', 'event': 'registered',
 20         'client': 'Test Log', 'plugin': 'Log',
 21         'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
 22test(): {'sender': '', 'event': 'registered',
 23         'client': 'Test Init', 'plugin': 'Init',
 24         'sends': [{'id': {'const': 42},
 25                    'content': {'const': 'Test Message'}}],
 26         'receives': [{'target': {'const': 'Test Init'},
 27                       'command': {'const': 'execute'}}]}
 28test(): {'sender': '', 'event': 'registered',
 29         'client': 'Test Alias', 'plugin': 'Alias',
 30         'sends': [{'id': {'const': 'translated'}}],
 31         'receives': [{'sender': {'const': 'Test Init'},
 32                       'id': {'const': 42}}]}
 33test(): {'sender': 'Test Init', 'id': 42,
 34         'content': 'Test Message'}
 35test(): {'sender': 'Test Alias', 'id': 'translated',
 36         'content': 'Test Message'}
 37Test Log: {'sender': 'Test Alias', 'id': 'translated',
 38           'content': 'Test Message'}
 39"""
 40
 41import asyncio
 42from datetime import datetime
 43
 44from controlpi import BasePlugin, Message, MessageTemplate
 45
 46from typing import List
 47
 48
 49class Log(BasePlugin):
 50    """Log messages on stdout.
 51
 52    The "filter" configuration key gets a list of message templates defining
 53    the messages that should be logged by the plugin instance.
 54
 55    In the following example the first and third message match the given
 56    template and are logged by the instance "Test Log", while the second
 57    message does not match and is only logged by the test, but not by the
 58    Log instance:
 59    >>> import controlpi
 60    >>> asyncio.run(controlpi.test(
 61    ...     {"Test Log": {"plugin": "Log",
 62    ...                   "filter": [{"id": {"const": 42}}]}},
 63    ...     [{"id": 42, "message": "Test Message"},
 64    ...      {"id": 42.42, "message": "Second Message"},
 65    ...      {"id": 42, "message": "Third Message"}]))
 66    ... # doctest: +NORMALIZE_WHITESPACE
 67    test(): {'sender': '', 'event': 'registered',
 68             'client': 'Test Log', 'plugin': 'Log',
 69             'sends': [], 'receives': [{'id': {'const': 42}}]}
 70    test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
 71    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
 72    test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
 73    test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
 74    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
 75
 76    The "filter" key is required:
 77    >>> asyncio.run(controlpi.test(
 78    ...     {"Test Log": {"plugin": "Log"}}, []))
 79    data must contain ['filter'] properties
 80    Configuration for 'Test Log' is not valid.
 81
 82    The "filter" key has to contain a list of message templates, i.e.,
 83    JSON objects:
 84    >>> asyncio.run(controlpi.test(
 85    ...     {"Test Log": {"plugin": "Log",
 86    ...                   "filter": [42]}}, []))
 87    data.filter[0] must be object
 88    Configuration for 'Test Log' is not valid.
 89    """
 90
 91    CONF_SCHEMA = {
 92        "properties": {"filter": {"type": "array", "items": {"type": "object"}}},
 93        "required": ["filter"],
 94    }
 95    """Schema for Log plugin configuration.
 96
 97    Required configuration key:
 98
 99    - 'filter': list of message templates to be logged.
100    """
101
102    def process_conf(self) -> None:
103        """Register plugin as bus client."""
104        self.bus.register(self.name, "Log", [], [(self.conf["filter"], self._log)])
105
106    async def _log(self, message: Message) -> None:
107        print(f"{self.name}: {message}")
108
109    async def run(self) -> None:
110        """Run no code proactively."""
111        pass
112
113
114class Init(BasePlugin):
115    """Send list of messages on startup and on demand.
116
117    The "messages" configuration key gets a list of messages to be sent on
118    startup. The same list is sent in reaction to a message with
119    "target": NAME and "command": "execute".
120
121    In the example, the two configured messages are sent twice, once at
122    startup and a second time in reaction to the "execute" command sent by
123    the test:
124    >>> import controlpi
125    >>> asyncio.run(controlpi.test(
126    ...     {"Test Init": {"plugin": "Init",
127    ...                    "messages": [{"id": 42,
128    ...                                  "content": "Test Message"},
129    ...                                 {"id": 42.42,
130    ...                                  "content": "Second Message"}]}},
131    ...     [{"target": "Test Init", "command": "execute"}]))
132    ... # doctest: +NORMALIZE_WHITESPACE
133    test(): {'sender': '', 'event': 'registered',
134             'client': 'Test Init', 'plugin': 'Init',
135             'sends': [{'id': {'const': 42},
136                        'content': {'const': 'Test Message'}},
137                       {'id': {'const': 42.42},
138                        'content': {'const': 'Second Message'}}],
139             'receives': [{'target': {'const': 'Test Init'},
140                           'command': {'const': 'execute'}}]}
141    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
142    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
143    test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
144    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
145    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
146
147    The "messages" key is required:
148    >>> asyncio.run(controlpi.test(
149    ...     {"Test Init": {"plugin": "Init"}}, []))
150    data must contain ['messages'] properties
151    Configuration for 'Test Init' is not valid.
152
153    The "messages" key has to contain a list of (partial) messages, i.e.,
154    JSON objects:
155    >>> asyncio.run(controlpi.test(
156    ...     {"Test Init": {"plugin": "Init",
157    ...                    "messages": [42]}}, []))
158    data.messages[0] must be object
159    Configuration for 'Test Init' is not valid.
160    """
161
162    CONF_SCHEMA = {
163        "properties": {"messages": {"type": "array", "items": {"type": "object"}}},
164        "required": ["messages"],
165    }
166    """Schema for Init plugin configuration.
167
168    Required configuration key:
169
170    - 'messages': list of messages to be sent.
171    """
172
173    def process_conf(self) -> None:
174        """Register plugin as bus client."""
175        self.bus.register(
176            self.name,
177            "Init",
178            [
179                MessageTemplate.from_message(message)
180                for message in self.conf["messages"]
181            ],
182            [
183                (
184                    [
185                        MessageTemplate(
186                            {
187                                "target": {"const": self.name},
188                                "command": {"const": "execute"},
189                            }
190                        )
191                    ],
192                    self._execute,
193                )
194            ],
195        )
196
197    async def _execute(self, message: Message) -> None:
198        for message in self.conf["messages"]:
199            await self.bus.send(Message(self.name, message))
200            # Give immediate reactions to messages opportunity to happen:
201            await asyncio.sleep(0)
202
203    async def run(self) -> None:
204        """Send configured messages on startup."""
205        for message in self.conf["messages"]:
206            await self.bus.send(Message(self.name, message))
207
208
209class Execute(BasePlugin):
210    """Send configurable list of messages on demand.
211
212    An Execute plugin instance receives two kinds of commands.
213    The "set messages" command has a "messages" key with a list of (partial)
214    messages, which are sent by the Execute instance in reaction to an
215    "execute" command.
216
217    In the example, the first command sent by the test sets two messages,
218    which are then sent in reaction to the second command sent by the test:
219    >>> import controlpi
220    >>> asyncio.run(controlpi.test(
221    ...     {"Test Execute": {"plugin": "Execute"}},
222    ...     [{"target": "Test Execute", "command": "set messages",
223    ...       "messages": [{"id": 42, "content": "Test Message"},
224    ...                    {"id": 42.42, "content": "Second Message"}]},
225    ...      {"target": "Test Execute", "command": "execute"}]))
226    ... # doctest: +NORMALIZE_WHITESPACE
227    test(): {'sender': '', 'event': 'registered',
228             'client': 'Test Execute', 'plugin': 'Execute',
229             'sends': [{}],
230             'receives': [{'target': {'const': 'Test Execute'},
231                           'command': {'const': 'set messages'},
232                           'messages': {'type': 'array',
233                                        'items': {'type': 'object'}}},
234                          {'target': {'const': 'Test Execute'},
235                           'command': {'const': 'execute'}}]}
236    test(): {'sender': 'test()', 'target': 'Test Execute',
237             'command': 'set messages',
238             'messages': [{'id': 42, 'content': 'Test Message'},
239                          {'id': 42.42, 'content': 'Second Message'}]}
240    test(): {'sender': 'test()', 'target': 'Test Execute',
241             'command': 'execute'}
242    test(): {'sender': 'Test Execute', 'id': 42,
243             'content': 'Test Message'}
244    test(): {'sender': 'Test Execute', 'id': 42.42,
245             'content': 'Second Message'}
246    """
247
248    CONF_SCHEMA = True
249    """Schema for Execute plugin configuration.
250
251    There are no required or optional configuration keys.
252    """
253
254    def process_conf(self) -> None:
255        """Register plugin as bus client."""
256        self.messages: List[Message] = []
257        self.bus.register(
258            self.name,
259            "Execute",
260            [MessageTemplate()],
261            [
262                (
263                    [
264                        MessageTemplate(
265                            {
266                                "target": {"const": self.name},
267                                "command": {"const": "set messages"},
268                                "messages": {
269                                    "type": "array",
270                                    "items": {"type": "object"},
271                                },
272                            }
273                        )
274                    ],
275                    self._set_messages,
276                ),
277                (
278                    [
279                        MessageTemplate(
280                            {
281                                "target": {"const": self.name},
282                                "command": {"const": "execute"},
283                            }
284                        )
285                    ],
286                    self._execute,
287                ),
288            ],
289        )
290
291    async def _set_messages(self, message: Message) -> None:
292        assert isinstance(message["messages"], list)
293        self.messages = list(message["messages"])
294
295    async def _execute(self, message: Message) -> None:
296        for message in self.messages:
297            await self.bus.send(Message(self.name, message))
298            # Give immediate reactions to messages opportunity to happen:
299            await asyncio.sleep(0)
300
301    async def run(self) -> None:
302        """Run no code proactively."""
303        pass
304
305
306class Alias(BasePlugin):
307    """Translate messages to an alias.
308
309    The "from" configuration key gets a message template and the
310    configuration key "to" a (partial) message. The "translate"
311    configuration key contains pairs of message keys, where the "from"
312    message key is translated to the "to" message key if present in the
313    message.
314
315    All messages matching the "from" template are received by the Alias
316    instance and a message translated by adding the keys and values of the
317    "to" message and the translated key-value pairs according to
318    "translate" is sent. Keys that are not "sender" and not modified by
319    "to" or "translate" are retained.
320
321    In the example, the two messages sent by the test are translated by the
322    Alias instance and the translated messages are sent by it preserving
323    the "content" keys:
324    >>> import controlpi
325    >>> asyncio.run(controlpi.test(
326    ...     {"Test Alias": {"plugin": "Alias",
327    ...                     "from": {"id": {"const": 42}},
328    ...                     "to": {"id": "translated"},
329    ...                     "translate": [{'from': "old", "to": "new"}]}},
330    ...     [{"id": 42, "content": "Test Message", "old": "content"},
331    ...      {"id": 42, "content": "Second Message", "old": "content"}]))
332    ... # doctest: +NORMALIZE_WHITESPACE
333    test(): {'sender': '', 'event': 'registered',
334             'client': 'Test Alias', 'plugin': 'Alias',
335             'sends': [{'id': {'const': 'translated'}}],
336             'receives': [{'id': {'const': 42}}]}
337    test(): {'sender': 'test()', 'id': 42,
338             'content': 'Test Message', 'old': 'content'}
339    test(): {'sender': 'test()', 'id': 42,
340             'content': 'Second Message', 'old': 'content'}
341    test(): {'sender': 'Test Alias', 'id': 'translated',
342             'content': 'Test Message', 'old': 'content', 'new': 'content'}
343    test(): {'sender': 'Test Alias', 'id': 'translated',
344             'content': 'Second Message', 'old': 'content', 'new': 'content'}
345
346    An Alias instance can also translate to a list of messages instead of
347    a single message:
348    >>> asyncio.run(controlpi.test(
349    ...     {"Test Alias": {"plugin": "Alias",
350    ...                     "from": {"id": {"const": 42}},
351    ...                     "to": [{"id": "first"}, {"id": "second"}],
352    ...                     "translate": [{'from': "old", "to": "new"}]}},
353    ...     [{"id": 42, "content": "Test Message", "old": "content"}]))
354    ... # doctest: +NORMALIZE_WHITESPACE
355    test(): {'sender': '', 'event': 'registered',
356             'client': 'Test Alias', 'plugin': 'Alias',
357             'sends': [{'id': {'const': 'first'}},
358                       {'id': {'const': 'second'}}],
359             'receives': [{'id': {'const': 42}}]}
360    test(): {'sender': 'test()', 'id': 42,
361             'content': 'Test Message', 'old': 'content'}
362    test(): {'sender': 'Test Alias', 'id': 'first',
363             'content': 'Test Message', 'old': 'content', 'new': 'content'}
364    test(): {'sender': 'Test Alias', 'id': 'second',
365             'content': 'Test Message', 'old': 'content', 'new': 'content'}
366
367    The "from" key is required:
368    >>> asyncio.run(controlpi.test(
369    ...     {"Test Alias": {"plugin": "Alias"}}, []))
370    data must contain ['from'] properties
371    Configuration for 'Test Alias' is not valid.
372
373    The "from" key has to contain a message template and the "to" key a
374    (partial) message, i.e., both have to be JSON objects:
375    >>> asyncio.run(controlpi.test(
376    ...     {"Test Alias": {"plugin": "Alias",
377    ...                     "from": 42,
378    ...                     "to": 42}}, []))
379    data.from must be object
380    Configuration for 'Test Alias' is not valid.
381    >>> asyncio.run(controlpi.test(
382    ...     {"Test Alias": {"plugin": "Alias",
383    ...                     "from": {"id": {"const": 42}},
384    ...                     "to": 42}}, []))
385    data.to cannot be validated by any definition
386    Configuration for 'Test Alias' is not valid.
387    """
388
389    CONF_SCHEMA = {
390        "properties": {
391            "from": {"type": "object"},
392            "to": {
393                "anyOf": [
394                    {"type": "object"},
395                    {"type": "array", "items": {"type": "object"}},
396                ]
397            },
398            "translate": {
399                "type": "array",
400                "items": {
401                    "type": "object",
402                    "properties": {
403                        "from": {"type": "string"},
404                        "to": {"type": "string"},
405                    },
406                },
407            },
408        },
409        "required": ["from"],
410    }
411    """Schema for Alias plugin configuration.
412
413    Required configuration keys:
414
415    - 'from': template of messages to be translated.
416
417    Optional configuration keys:
418
419    - 'to': translated message(s) to be sent.
420    - 'translate': array of pairs of keys to be translated.
421    """
422
423    def process_conf(self) -> None:
424        """Register plugin as bus client."""
425        sends = []
426        self._to = []
427        if "to" in self.conf:
428            if isinstance(self.conf["to"], list):
429                self._to = self.conf["to"]
430                for to in self.conf["to"]:
431                    sends.append(MessageTemplate.from_message(to))
432            else:
433                self._to = [self.conf["to"]]
434                sends.append(MessageTemplate.from_message(self.conf["to"]))
435        self._translate = {}
436        if "translate" in self.conf:
437            for pair in self.conf["translate"]:
438                self._translate[pair["from"]] = pair["to"]
439        self.bus.register(
440            self.name, "Alias", sends, [([self.conf["from"]], self._alias)]
441        )
442
443    async def _alias(self, message: Message) -> None:
444        # Prevent endless loop:
445        if message["sender"] != self.name:
446            for to in self._to:
447                alias_message = Message(self.name, message)
448                alias_message.update(to)
449                for key in self._translate:
450                    if key in message:
451                        alias_message[self._translate[key]] = message[key]
452                await self.bus.send(alias_message)
453
454    async def run(self) -> None:
455        """Run no code proactively."""
456        pass
457
458
459class Counter(BasePlugin):
460    """Count messages confirming to a given template.
461
462    The plugin counts messages confirming to the given template. The
463    counter can be queried and reset by commands. The 'reset' command also
464    queries the last count before the reset:
465    >>> import controlpi
466    >>> asyncio.run(controlpi.test(
467    ...     {"Test Counter": {"plugin": "Counter",
468    ...                       "count": {"id": {"const": 42}}}},
469    ...     [{"target": "Test Counter", "command": "get count"},
470    ...      {"id": 42}, {"id": 42}, {"id": 49},
471    ...      {"target": "Test Counter", "command": "get count"},
472    ...      {"id": 42}, {"id": 42}, {"id": 42},
473    ...      {"target": "Test Counter", "command": "reset"},
474    ...      {"target": "Test Counter", "command": "get count"},
475    ...      {"id": 42}, {"id": 42}, {"id": 42},
476    ...      {"target": "Test Counter", "command": "get count"}]))
477    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
478    test(): {'sender': '', 'event': 'registered',
479             'client': 'Test Counter', 'plugin': 'Counter',
480             'sends': [{'count': {'type': 'integer'}}],
481             'receives': [{'id': {'const': 42}},
482                          {'target': {'const': 'Test Counter'},
483                           'command': {'const': 'get count'}},
484                          {'target': {'const': 'Test Counter'},
485                           'command': {'const': 'reset'}}]}
486    test(): {'sender': 'test()', 'target': 'Test Counter',
487             'command': 'get count'}
488    test(): {'sender': 'test()', 'id': 42}
489    test(): {'sender': 'Test Counter', 'count': 0,
490             'since': ..., 'until': ...}
491    test(): {'sender': 'test()', 'id': 42}
492    test(): {'sender': 'test()', 'id': 49}
493    test(): {'sender': 'test()', 'target': 'Test Counter',
494             'command': 'get count'}
495    test(): {'sender': 'test()', 'id': 42}
496    test(): {'sender': 'Test Counter', 'count': 2,
497             'since': ..., 'until': ...}
498    test(): {'sender': 'test()', 'id': 42}
499    test(): {'sender': 'test()', 'id': 42}
500    test(): {'sender': 'test()', 'target': 'Test Counter',
501             'command': 'reset'}
502    test(): {'sender': 'test()', 'target': 'Test Counter',
503             'command': 'get count'}
504    test(): {'sender': 'Test Counter', 'count': 5,
505             'since': ..., 'until': ...}
506    test(): {'sender': 'test()', 'id': 42}
507    test(): {'sender': 'Test Counter', 'count': 0,
508             'since': ..., 'until': ...}
509    test(): {'sender': 'test()', 'id': 42}
510    test(): {'sender': 'test()', 'id': 42}
511    test(): {'sender': 'test()', 'target': 'Test Counter',
512             'command': 'get count'}
513    test(): {'sender': 'Test Counter', 'count': 3,
514             'since': ..., 'until': ...}
515    """
516
517    CONF_SCHEMA = {
518        "properties": {
519            "count": {"type": "object"},
520            "date format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"},
521        },
522        "required": ["count"],
523    }
524    """Schema for Counter plugin configuration.
525
526    Required configuration key:
527
528    - 'count': template of messages to be counted.
529    """
530
531    def process_conf(self) -> None:
532        """Register plugin as bus client."""
533        self._since = datetime.now().strftime(self.conf["date format"])
534        self._counter = 0
535        self.bus.register(
536            self.name,
537            "Counter",
538            [MessageTemplate({"count": {"type": "integer"}})],
539            [
540                ([MessageTemplate(self.conf["count"])], self._count),
541                (
542                    [
543                        MessageTemplate(
544                            {
545                                "target": {"const": self.name},
546                                "command": {"const": "get count"},
547                            }
548                        )
549                    ],
550                    self._get_count,
551                ),
552                (
553                    [
554                        MessageTemplate(
555                            {
556                                "target": {"const": self.name},
557                                "command": {"const": "reset"},
558                            }
559                        )
560                    ],
561                    self._reset,
562                ),
563            ],
564        )
565
566    async def _count(self, message: Message) -> None:
567        self._counter += 1
568
569    async def _get_count(self, message: Message) -> None:
570        now = datetime.now().strftime(self.conf["date format"])
571        await self.bus.send(
572            Message(
573                self.name, {"count": self._counter, "since": self._since, "until": now}
574            )
575        )
576
577    async def _reset(self, message: Message) -> None:
578        now = datetime.now().strftime(self.conf["date format"])
579        counter = self._counter
580        self._counter = 0
581        await self.bus.send(
582            Message(self.name, {"count": counter, "since": self._since, "until": now})
583        )
584        self._since = now
585
586    async def run(self) -> None:
587        """Run no code proactively."""
588        pass
589
590
591class Date(BasePlugin):
592    """Send message with current date.
593
594    The plugin reacts to 'get date' commands by sending messages with
595    a 'date' key:
596    >>> import controlpi
597    >>> asyncio.run(controlpi.test(
598    ...     {"Test Date": {"plugin": "Date"}},
599    ...     [{"target": "Test Date", "command": "get date"}]))
600    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
601    test(): {'sender': '', 'event': 'registered',
602             'client': 'Test Date', 'plugin': 'Date',
603             'sends': [{'date': {'type': 'string'}}],
604             'receives': [{'target': {'const': 'Test Date'},
605                           'command': {'const': 'get date'}}]}
606    test(): {'sender': 'test()', 'target': 'Test Date',
607             'command': 'get date'}
608    test(): {'sender': 'Test Date', 'date': ...}
609
610    The format of the date can be configured with the 'format'
611    configuration key:
612    >>> asyncio.run(controlpi.test(
613    ...     {"Test Date": {"plugin": "Date",
614    ...                    "format": "%Y%m%d%H%M%S%f"}},
615    ...     [{"target": "Test Date", "command": "get date"}]))
616    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
617    test(): {'sender': '', 'event': 'registered',
618             'client': 'Test Date', 'plugin': 'Date',
619             'sends': [{'date': {'type': 'string'}}],
620             'receives': [{'target': {'const': 'Test Date'},
621                           'command': {'const': 'get date'}}]}
622    test(): {'sender': 'test()', 'target': 'Test Date',
623             'command': 'get date'}
624    test(): {'sender': 'Test Date', 'date': ...}
625    """
626
627    CONF_SCHEMA = {
628        "properties": {"format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"}}
629    }
630    """Schema for Date plugin configuration.
631
632    Optional configuration key:
633
634    - 'format': format for the sent datetime string.
635                Default: '%Y-%m-%d %H:%M:%S'
636    """
637
638    def process_conf(self) -> None:
639        """Register plugin as bus client."""
640        self.bus.register(
641            self.name,
642            "Date",
643            [MessageTemplate({"date": {"type": "string"}})],
644            [
645                (
646                    [
647                        MessageTemplate(
648                            {
649                                "target": {"const": self.name},
650                                "command": {"const": "get date"},
651                            }
652                        )
653                    ],
654                    self._date,
655                )
656            ],
657        )
658
659    async def _date(self, message: Message) -> None:
660        date = datetime.now().strftime(self.conf["format"])
661        await self.bus.send(Message(self.name, {"date": date}))
662
663    async def run(self) -> None:
664        """Run no code proactively."""
665        pass
class Log(controlpi.baseplugin.BasePlugin):
 50class Log(BasePlugin):
 51    """Log messages on stdout.
 52
 53    The "filter" configuration key gets a list of message templates defining
 54    the messages that should be logged by the plugin instance.
 55
 56    In the following example the first and third message match the given
 57    template and are logged by the instance "Test Log", while the second
 58    message does not match and is only logged by the test, but not by the
 59    Log instance:
 60    >>> import controlpi
 61    >>> asyncio.run(controlpi.test(
 62    ...     {"Test Log": {"plugin": "Log",
 63    ...                   "filter": [{"id": {"const": 42}}]}},
 64    ...     [{"id": 42, "message": "Test Message"},
 65    ...      {"id": 42.42, "message": "Second Message"},
 66    ...      {"id": 42, "message": "Third Message"}]))
 67    ... # doctest: +NORMALIZE_WHITESPACE
 68    test(): {'sender': '', 'event': 'registered',
 69             'client': 'Test Log', 'plugin': 'Log',
 70             'sends': [], 'receives': [{'id': {'const': 42}}]}
 71    test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
 72    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
 73    test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
 74    test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
 75    Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
 76
 77    The "filter" key is required:
 78    >>> asyncio.run(controlpi.test(
 79    ...     {"Test Log": {"plugin": "Log"}}, []))
 80    data must contain ['filter'] properties
 81    Configuration for 'Test Log' is not valid.
 82
 83    The "filter" key has to contain a list of message templates, i.e.,
 84    JSON objects:
 85    >>> asyncio.run(controlpi.test(
 86    ...     {"Test Log": {"plugin": "Log",
 87    ...                   "filter": [42]}}, []))
 88    data.filter[0] must be object
 89    Configuration for 'Test Log' is not valid.
 90    """
 91
 92    CONF_SCHEMA = {
 93        "properties": {"filter": {"type": "array", "items": {"type": "object"}}},
 94        "required": ["filter"],
 95    }
 96    """Schema for Log plugin configuration.
 97
 98    Required configuration key:
 99
100    - 'filter': list of message templates to be logged.
101    """
102
103    def process_conf(self) -> None:
104        """Register plugin as bus client."""
105        self.bus.register(self.name, "Log", [], [(self.conf["filter"], self._log)])
106
107    async def _log(self, message: Message) -> None:
108        print(f"{self.name}: {message}")
109
110    async def run(self) -> None:
111        """Run no code proactively."""
112        pass

Log messages on stdout.

The "filter" configuration key gets a list of message templates defining the messages that should be logged by the plugin instance.

In the following example the first and third message match the given template and are logged by the instance "Test Log", while the second message does not match and is only logged by the test, but not by the Log instance:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [{"id": {"const": 42}}]}},
...     [{"id": 42, "message": "Test Message"},
...      {"id": 42.42, "message": "Second Message"},
...      {"id": 42, "message": "Third Message"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Log', 'plugin': 'Log',
         'sends': [], 'receives': [{'id': {'const': 42}}]}
test(): {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
Test Log: {'sender': 'test()', 'id': 42, 'message': 'Test Message'}
test(): {'sender': 'test()', 'id': 42.42, 'message': 'Second Message'}
test(): {'sender': 'test()', 'id': 42, 'message': 'Third Message'}
Test Log: {'sender': 'test()', 'id': 42, 'message': 'Third Message'}

The "filter" key is required:

>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log"}}, []))
data must contain ['filter'] properties
Configuration for 'Test Log' is not valid.

The "filter" key has to contain a list of message templates, i.e., JSON objects:

>>> asyncio.run(controlpi.test(
...     {"Test Log": {"plugin": "Log",
...                   "filter": [42]}}, []))
data.filter[0] must be object
Configuration for 'Test Log' is not valid.
CONF_SCHEMA = {'properties': {'filter': {'type': 'array', 'items': {'type': 'object'}}}, 'required': ['filter']}

Schema for Log plugin configuration.

Required configuration key:

  • 'filter': list of message templates to be logged.
def process_conf(self) -> None:
103    def process_conf(self) -> None:
104        """Register plugin as bus client."""
105        self.bus.register(self.name, "Log", [], [(self.conf["filter"], self._log)])

Register plugin as bus client.

async def run(self) -> None:
110    async def run(self) -> None:
111        """Run no code proactively."""
112        pass

Run no code proactively.

class Init(controlpi.baseplugin.BasePlugin):
115class Init(BasePlugin):
116    """Send list of messages on startup and on demand.
117
118    The "messages" configuration key gets a list of messages to be sent on
119    startup. The same list is sent in reaction to a message with
120    "target": NAME and "command": "execute".
121
122    In the example, the two configured messages are sent twice, once at
123    startup and a second time in reaction to the "execute" command sent by
124    the test:
125    >>> import controlpi
126    >>> asyncio.run(controlpi.test(
127    ...     {"Test Init": {"plugin": "Init",
128    ...                    "messages": [{"id": 42,
129    ...                                  "content": "Test Message"},
130    ...                                 {"id": 42.42,
131    ...                                  "content": "Second Message"}]}},
132    ...     [{"target": "Test Init", "command": "execute"}]))
133    ... # doctest: +NORMALIZE_WHITESPACE
134    test(): {'sender': '', 'event': 'registered',
135             'client': 'Test Init', 'plugin': 'Init',
136             'sends': [{'id': {'const': 42},
137                        'content': {'const': 'Test Message'}},
138                       {'id': {'const': 42.42},
139                        'content': {'const': 'Second Message'}}],
140             'receives': [{'target': {'const': 'Test Init'},
141                           'command': {'const': 'execute'}}]}
142    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
143    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
144    test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
145    test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
146    test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
147
148    The "messages" key is required:
149    >>> asyncio.run(controlpi.test(
150    ...     {"Test Init": {"plugin": "Init"}}, []))
151    data must contain ['messages'] properties
152    Configuration for 'Test Init' is not valid.
153
154    The "messages" key has to contain a list of (partial) messages, i.e.,
155    JSON objects:
156    >>> asyncio.run(controlpi.test(
157    ...     {"Test Init": {"plugin": "Init",
158    ...                    "messages": [42]}}, []))
159    data.messages[0] must be object
160    Configuration for 'Test Init' is not valid.
161    """
162
163    CONF_SCHEMA = {
164        "properties": {"messages": {"type": "array", "items": {"type": "object"}}},
165        "required": ["messages"],
166    }
167    """Schema for Init plugin configuration.
168
169    Required configuration key:
170
171    - 'messages': list of messages to be sent.
172    """
173
174    def process_conf(self) -> None:
175        """Register plugin as bus client."""
176        self.bus.register(
177            self.name,
178            "Init",
179            [
180                MessageTemplate.from_message(message)
181                for message in self.conf["messages"]
182            ],
183            [
184                (
185                    [
186                        MessageTemplate(
187                            {
188                                "target": {"const": self.name},
189                                "command": {"const": "execute"},
190                            }
191                        )
192                    ],
193                    self._execute,
194                )
195            ],
196        )
197
198    async def _execute(self, message: Message) -> None:
199        for message in self.conf["messages"]:
200            await self.bus.send(Message(self.name, message))
201            # Give immediate reactions to messages opportunity to happen:
202            await asyncio.sleep(0)
203
204    async def run(self) -> None:
205        """Send configured messages on startup."""
206        for message in self.conf["messages"]:
207            await self.bus.send(Message(self.name, message))

Send list of messages on startup and on demand.

The "messages" configuration key gets a list of messages to be sent on startup. The same list is sent in reaction to a message with "target": NAME and "command": "execute".

In the example, the two configured messages are sent twice, once at startup and a second time in reaction to the "execute" command sent by the test:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Init": {"plugin": "Init",
...                    "messages": [{"id": 42,
...                                  "content": "Test Message"},
...                                 {"id": 42.42,
...                                  "content": "Second Message"}]}},
...     [{"target": "Test Init", "command": "execute"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Init', 'plugin': 'Init',
         'sends': [{'id': {'const': 42},
                    'content': {'const': 'Test Message'}},
                   {'id': {'const': 42.42},
                    'content': {'const': 'Second Message'}}],
         'receives': [{'target': {'const': 'Test Init'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}
test(): {'sender': 'test()', 'target': 'Test Init', 'command': 'execute'}
test(): {'sender': 'Test Init', 'id': 42, 'content': 'Test Message'}
test(): {'sender': 'Test Init', 'id': 42.42, 'content': 'Second Message'}

The "messages" key is required:

>>> asyncio.run(controlpi.test(
...     {"Test Init": {"plugin": "Init"}}, []))
data must contain ['messages'] properties
Configuration for 'Test Init' is not valid.

The "messages" key has to contain a list of (partial) messages, i.e., JSON objects:

>>> asyncio.run(controlpi.test(
...     {"Test Init": {"plugin": "Init",
...                    "messages": [42]}}, []))
data.messages[0] must be object
Configuration for 'Test Init' is not valid.
CONF_SCHEMA = {'properties': {'messages': {'type': 'array', 'items': {'type': 'object'}}}, 'required': ['messages']}

Schema for Init plugin configuration.

Required configuration key:

  • 'messages': list of messages to be sent.
def process_conf(self) -> None:
174    def process_conf(self) -> None:
175        """Register plugin as bus client."""
176        self.bus.register(
177            self.name,
178            "Init",
179            [
180                MessageTemplate.from_message(message)
181                for message in self.conf["messages"]
182            ],
183            [
184                (
185                    [
186                        MessageTemplate(
187                            {
188                                "target": {"const": self.name},
189                                "command": {"const": "execute"},
190                            }
191                        )
192                    ],
193                    self._execute,
194                )
195            ],
196        )

Register plugin as bus client.

async def run(self) -> None:
204    async def run(self) -> None:
205        """Send configured messages on startup."""
206        for message in self.conf["messages"]:
207            await self.bus.send(Message(self.name, message))

Send configured messages on startup.

class Execute(controlpi.baseplugin.BasePlugin):
210class Execute(BasePlugin):
211    """Send configurable list of messages on demand.
212
213    An Execute plugin instance receives two kinds of commands.
214    The "set messages" command has a "messages" key with a list of (partial)
215    messages, which are sent by the Execute instance in reaction to an
216    "execute" command.
217
218    In the example, the first command sent by the test sets two messages,
219    which are then sent in reaction to the second command sent by the test:
220    >>> import controlpi
221    >>> asyncio.run(controlpi.test(
222    ...     {"Test Execute": {"plugin": "Execute"}},
223    ...     [{"target": "Test Execute", "command": "set messages",
224    ...       "messages": [{"id": 42, "content": "Test Message"},
225    ...                    {"id": 42.42, "content": "Second Message"}]},
226    ...      {"target": "Test Execute", "command": "execute"}]))
227    ... # doctest: +NORMALIZE_WHITESPACE
228    test(): {'sender': '', 'event': 'registered',
229             'client': 'Test Execute', 'plugin': 'Execute',
230             'sends': [{}],
231             'receives': [{'target': {'const': 'Test Execute'},
232                           'command': {'const': 'set messages'},
233                           'messages': {'type': 'array',
234                                        'items': {'type': 'object'}}},
235                          {'target': {'const': 'Test Execute'},
236                           'command': {'const': 'execute'}}]}
237    test(): {'sender': 'test()', 'target': 'Test Execute',
238             'command': 'set messages',
239             'messages': [{'id': 42, 'content': 'Test Message'},
240                          {'id': 42.42, 'content': 'Second Message'}]}
241    test(): {'sender': 'test()', 'target': 'Test Execute',
242             'command': 'execute'}
243    test(): {'sender': 'Test Execute', 'id': 42,
244             'content': 'Test Message'}
245    test(): {'sender': 'Test Execute', 'id': 42.42,
246             'content': 'Second Message'}
247    """
248
249    CONF_SCHEMA = True
250    """Schema for Execute plugin configuration.
251
252    There are no required or optional configuration keys.
253    """
254
255    def process_conf(self) -> None:
256        """Register plugin as bus client."""
257        self.messages: List[Message] = []
258        self.bus.register(
259            self.name,
260            "Execute",
261            [MessageTemplate()],
262            [
263                (
264                    [
265                        MessageTemplate(
266                            {
267                                "target": {"const": self.name},
268                                "command": {"const": "set messages"},
269                                "messages": {
270                                    "type": "array",
271                                    "items": {"type": "object"},
272                                },
273                            }
274                        )
275                    ],
276                    self._set_messages,
277                ),
278                (
279                    [
280                        MessageTemplate(
281                            {
282                                "target": {"const": self.name},
283                                "command": {"const": "execute"},
284                            }
285                        )
286                    ],
287                    self._execute,
288                ),
289            ],
290        )
291
292    async def _set_messages(self, message: Message) -> None:
293        assert isinstance(message["messages"], list)
294        self.messages = list(message["messages"])
295
296    async def _execute(self, message: Message) -> None:
297        for message in self.messages:
298            await self.bus.send(Message(self.name, message))
299            # Give immediate reactions to messages opportunity to happen:
300            await asyncio.sleep(0)
301
302    async def run(self) -> None:
303        """Run no code proactively."""
304        pass

Send configurable list of messages on demand.

An Execute plugin instance receives two kinds of commands. The "set messages" command has a "messages" key with a list of (partial) messages, which are sent by the Execute instance in reaction to an "execute" command.

In the example, the first command sent by the test sets two messages, which are then sent in reaction to the second command sent by the test:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Execute": {"plugin": "Execute"}},
...     [{"target": "Test Execute", "command": "set messages",
...       "messages": [{"id": 42, "content": "Test Message"},
...                    {"id": 42.42, "content": "Second Message"}]},
...      {"target": "Test Execute", "command": "execute"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Execute', 'plugin': 'Execute',
         'sends': [{}],
         'receives': [{'target': {'const': 'Test Execute'},
                       'command': {'const': 'set messages'},
                       'messages': {'type': 'array',
                                    'items': {'type': 'object'}}},
                      {'target': {'const': 'Test Execute'},
                       'command': {'const': 'execute'}}]}
test(): {'sender': 'test()', 'target': 'Test Execute',
         'command': 'set messages',
         'messages': [{'id': 42, 'content': 'Test Message'},
                      {'id': 42.42, 'content': 'Second Message'}]}
test(): {'sender': 'test()', 'target': 'Test Execute',
         'command': 'execute'}
test(): {'sender': 'Test Execute', 'id': 42,
         'content': 'Test Message'}
test(): {'sender': 'Test Execute', 'id': 42.42,
         'content': 'Second Message'}
CONF_SCHEMA = True

Schema for Execute plugin configuration.

There are no required or optional configuration keys.

def process_conf(self) -> None:
255    def process_conf(self) -> None:
256        """Register plugin as bus client."""
257        self.messages: List[Message] = []
258        self.bus.register(
259            self.name,
260            "Execute",
261            [MessageTemplate()],
262            [
263                (
264                    [
265                        MessageTemplate(
266                            {
267                                "target": {"const": self.name},
268                                "command": {"const": "set messages"},
269                                "messages": {
270                                    "type": "array",
271                                    "items": {"type": "object"},
272                                },
273                            }
274                        )
275                    ],
276                    self._set_messages,
277                ),
278                (
279                    [
280                        MessageTemplate(
281                            {
282                                "target": {"const": self.name},
283                                "command": {"const": "execute"},
284                            }
285                        )
286                    ],
287                    self._execute,
288                ),
289            ],
290        )

Register plugin as bus client.

async def run(self) -> None:
302    async def run(self) -> None:
303        """Run no code proactively."""
304        pass

Run no code proactively.

class Alias(controlpi.baseplugin.BasePlugin):
307class Alias(BasePlugin):
308    """Translate messages to an alias.
309
310    The "from" configuration key gets a message template and the
311    configuration key "to" a (partial) message. The "translate"
312    configuration key contains pairs of message keys, where the "from"
313    message key is translated to the "to" message key if present in the
314    message.
315
316    All messages matching the "from" template are received by the Alias
317    instance and a message translated by adding the keys and values of the
318    "to" message and the translated key-value pairs according to
319    "translate" is sent. Keys that are not "sender" and not modified by
320    "to" or "translate" are retained.
321
322    In the example, the two messages sent by the test are translated by the
323    Alias instance and the translated messages are sent by it preserving
324    the "content" keys:
325    >>> import controlpi
326    >>> asyncio.run(controlpi.test(
327    ...     {"Test Alias": {"plugin": "Alias",
328    ...                     "from": {"id": {"const": 42}},
329    ...                     "to": {"id": "translated"},
330    ...                     "translate": [{'from': "old", "to": "new"}]}},
331    ...     [{"id": 42, "content": "Test Message", "old": "content"},
332    ...      {"id": 42, "content": "Second Message", "old": "content"}]))
333    ... # doctest: +NORMALIZE_WHITESPACE
334    test(): {'sender': '', 'event': 'registered',
335             'client': 'Test Alias', 'plugin': 'Alias',
336             'sends': [{'id': {'const': 'translated'}}],
337             'receives': [{'id': {'const': 42}}]}
338    test(): {'sender': 'test()', 'id': 42,
339             'content': 'Test Message', 'old': 'content'}
340    test(): {'sender': 'test()', 'id': 42,
341             'content': 'Second Message', 'old': 'content'}
342    test(): {'sender': 'Test Alias', 'id': 'translated',
343             'content': 'Test Message', 'old': 'content', 'new': 'content'}
344    test(): {'sender': 'Test Alias', 'id': 'translated',
345             'content': 'Second Message', 'old': 'content', 'new': 'content'}
346
347    An Alias instance can also translate to a list of messages instead of
348    a single message:
349    >>> asyncio.run(controlpi.test(
350    ...     {"Test Alias": {"plugin": "Alias",
351    ...                     "from": {"id": {"const": 42}},
352    ...                     "to": [{"id": "first"}, {"id": "second"}],
353    ...                     "translate": [{'from': "old", "to": "new"}]}},
354    ...     [{"id": 42, "content": "Test Message", "old": "content"}]))
355    ... # doctest: +NORMALIZE_WHITESPACE
356    test(): {'sender': '', 'event': 'registered',
357             'client': 'Test Alias', 'plugin': 'Alias',
358             'sends': [{'id': {'const': 'first'}},
359                       {'id': {'const': 'second'}}],
360             'receives': [{'id': {'const': 42}}]}
361    test(): {'sender': 'test()', 'id': 42,
362             'content': 'Test Message', 'old': 'content'}
363    test(): {'sender': 'Test Alias', 'id': 'first',
364             'content': 'Test Message', 'old': 'content', 'new': 'content'}
365    test(): {'sender': 'Test Alias', 'id': 'second',
366             'content': 'Test Message', 'old': 'content', 'new': 'content'}
367
368    The "from" key is required:
369    >>> asyncio.run(controlpi.test(
370    ...     {"Test Alias": {"plugin": "Alias"}}, []))
371    data must contain ['from'] properties
372    Configuration for 'Test Alias' is not valid.
373
374    The "from" key has to contain a message template and the "to" key a
375    (partial) message, i.e., both have to be JSON objects:
376    >>> asyncio.run(controlpi.test(
377    ...     {"Test Alias": {"plugin": "Alias",
378    ...                     "from": 42,
379    ...                     "to": 42}}, []))
380    data.from must be object
381    Configuration for 'Test Alias' is not valid.
382    >>> asyncio.run(controlpi.test(
383    ...     {"Test Alias": {"plugin": "Alias",
384    ...                     "from": {"id": {"const": 42}},
385    ...                     "to": 42}}, []))
386    data.to cannot be validated by any definition
387    Configuration for 'Test Alias' is not valid.
388    """
389
390    CONF_SCHEMA = {
391        "properties": {
392            "from": {"type": "object"},
393            "to": {
394                "anyOf": [
395                    {"type": "object"},
396                    {"type": "array", "items": {"type": "object"}},
397                ]
398            },
399            "translate": {
400                "type": "array",
401                "items": {
402                    "type": "object",
403                    "properties": {
404                        "from": {"type": "string"},
405                        "to": {"type": "string"},
406                    },
407                },
408            },
409        },
410        "required": ["from"],
411    }
412    """Schema for Alias plugin configuration.
413
414    Required configuration keys:
415
416    - 'from': template of messages to be translated.
417
418    Optional configuration keys:
419
420    - 'to': translated message(s) to be sent.
421    - 'translate': array of pairs of keys to be translated.
422    """
423
424    def process_conf(self) -> None:
425        """Register plugin as bus client."""
426        sends = []
427        self._to = []
428        if "to" in self.conf:
429            if isinstance(self.conf["to"], list):
430                self._to = self.conf["to"]
431                for to in self.conf["to"]:
432                    sends.append(MessageTemplate.from_message(to))
433            else:
434                self._to = [self.conf["to"]]
435                sends.append(MessageTemplate.from_message(self.conf["to"]))
436        self._translate = {}
437        if "translate" in self.conf:
438            for pair in self.conf["translate"]:
439                self._translate[pair["from"]] = pair["to"]
440        self.bus.register(
441            self.name, "Alias", sends, [([self.conf["from"]], self._alias)]
442        )
443
444    async def _alias(self, message: Message) -> None:
445        # Prevent endless loop:
446        if message["sender"] != self.name:
447            for to in self._to:
448                alias_message = Message(self.name, message)
449                alias_message.update(to)
450                for key in self._translate:
451                    if key in message:
452                        alias_message[self._translate[key]] = message[key]
453                await self.bus.send(alias_message)
454
455    async def run(self) -> None:
456        """Run no code proactively."""
457        pass

Translate messages to an alias.

The "from" configuration key gets a message template and the configuration key "to" a (partial) message. The "translate" configuration key contains pairs of message keys, where the "from" message key is translated to the "to" message key if present in the message.

All messages matching the "from" template are received by the Alias instance and a message translated by adding the keys and values of the "to" message and the translated key-value pairs according to "translate" is sent. Keys that are not "sender" and not modified by "to" or "translate" are retained.

In the example, the two messages sent by the test are translated by the Alias instance and the translated messages are sent by it preserving the "content" keys:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": {"id": {"const": 42}},
...                     "to": {"id": "translated"},
...                     "translate": [{'from': "old", "to": "new"}]}},
...     [{"id": 42, "content": "Test Message", "old": "content"},
...      {"id": 42, "content": "Second Message", "old": "content"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'translated'}}],
         'receives': [{'id': {'const': 42}}]}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Test Message', 'old': 'content'}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Second Message', 'old': 'content'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Test Message', 'old': 'content', 'new': 'content'}
test(): {'sender': 'Test Alias', 'id': 'translated',
         'content': 'Second Message', 'old': 'content', 'new': 'content'}

An Alias instance can also translate to a list of messages instead of a single message:

>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": {"id": {"const": 42}},
...                     "to": [{"id": "first"}, {"id": "second"}],
...                     "translate": [{'from': "old", "to": "new"}]}},
...     [{"id": 42, "content": "Test Message", "old": "content"}]))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Alias', 'plugin': 'Alias',
         'sends': [{'id': {'const': 'first'}},
                   {'id': {'const': 'second'}}],
         'receives': [{'id': {'const': 42}}]}
test(): {'sender': 'test()', 'id': 42,
         'content': 'Test Message', 'old': 'content'}
test(): {'sender': 'Test Alias', 'id': 'first',
         'content': 'Test Message', 'old': 'content', 'new': 'content'}
test(): {'sender': 'Test Alias', 'id': 'second',
         'content': 'Test Message', 'old': 'content', 'new': 'content'}

The "from" key is required:

>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias"}}, []))
data must contain ['from'] properties
Configuration for 'Test Alias' is not valid.

The "from" key has to contain a message template and the "to" key a (partial) message, i.e., both have to be JSON objects:

>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": 42,
...                     "to": 42}}, []))
data.from must be object
Configuration for 'Test Alias' is not valid.
>>> asyncio.run(controlpi.test(
...     {"Test Alias": {"plugin": "Alias",
...                     "from": {"id": {"const": 42}},
...                     "to": 42}}, []))
data.to cannot be validated by any definition
Configuration for 'Test Alias' is not valid.
CONF_SCHEMA = {'properties': {'from': {'type': 'object'}, 'to': {'anyOf': [{'type': 'object'}, {'type': 'array', 'items': {'type': 'object'}}]}, 'translate': {'type': 'array', 'items': {'type': 'object', 'properties': {'from': {'type': 'string'}, 'to': {'type': 'string'}}}}}, 'required': ['from']}

Schema for Alias plugin configuration.

Required configuration keys:

  • 'from': template of messages to be translated.

Optional configuration keys:

  • 'to': translated message(s) to be sent.
  • 'translate': array of pairs of keys to be translated.
def process_conf(self) -> None:
424    def process_conf(self) -> None:
425        """Register plugin as bus client."""
426        sends = []
427        self._to = []
428        if "to" in self.conf:
429            if isinstance(self.conf["to"], list):
430                self._to = self.conf["to"]
431                for to in self.conf["to"]:
432                    sends.append(MessageTemplate.from_message(to))
433            else:
434                self._to = [self.conf["to"]]
435                sends.append(MessageTemplate.from_message(self.conf["to"]))
436        self._translate = {}
437        if "translate" in self.conf:
438            for pair in self.conf["translate"]:
439                self._translate[pair["from"]] = pair["to"]
440        self.bus.register(
441            self.name, "Alias", sends, [([self.conf["from"]], self._alias)]
442        )

Register plugin as bus client.

async def run(self) -> None:
455    async def run(self) -> None:
456        """Run no code proactively."""
457        pass

Run no code proactively.

class Counter(controlpi.baseplugin.BasePlugin):
460class Counter(BasePlugin):
461    """Count messages confirming to a given template.
462
463    The plugin counts messages confirming to the given template. The
464    counter can be queried and reset by commands. The 'reset' command also
465    queries the last count before the reset:
466    >>> import controlpi
467    >>> asyncio.run(controlpi.test(
468    ...     {"Test Counter": {"plugin": "Counter",
469    ...                       "count": {"id": {"const": 42}}}},
470    ...     [{"target": "Test Counter", "command": "get count"},
471    ...      {"id": 42}, {"id": 42}, {"id": 49},
472    ...      {"target": "Test Counter", "command": "get count"},
473    ...      {"id": 42}, {"id": 42}, {"id": 42},
474    ...      {"target": "Test Counter", "command": "reset"},
475    ...      {"target": "Test Counter", "command": "get count"},
476    ...      {"id": 42}, {"id": 42}, {"id": 42},
477    ...      {"target": "Test Counter", "command": "get count"}]))
478    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
479    test(): {'sender': '', 'event': 'registered',
480             'client': 'Test Counter', 'plugin': 'Counter',
481             'sends': [{'count': {'type': 'integer'}}],
482             'receives': [{'id': {'const': 42}},
483                          {'target': {'const': 'Test Counter'},
484                           'command': {'const': 'get count'}},
485                          {'target': {'const': 'Test Counter'},
486                           'command': {'const': 'reset'}}]}
487    test(): {'sender': 'test()', 'target': 'Test Counter',
488             'command': 'get count'}
489    test(): {'sender': 'test()', 'id': 42}
490    test(): {'sender': 'Test Counter', 'count': 0,
491             'since': ..., 'until': ...}
492    test(): {'sender': 'test()', 'id': 42}
493    test(): {'sender': 'test()', 'id': 49}
494    test(): {'sender': 'test()', 'target': 'Test Counter',
495             'command': 'get count'}
496    test(): {'sender': 'test()', 'id': 42}
497    test(): {'sender': 'Test Counter', 'count': 2,
498             'since': ..., 'until': ...}
499    test(): {'sender': 'test()', 'id': 42}
500    test(): {'sender': 'test()', 'id': 42}
501    test(): {'sender': 'test()', 'target': 'Test Counter',
502             'command': 'reset'}
503    test(): {'sender': 'test()', 'target': 'Test Counter',
504             'command': 'get count'}
505    test(): {'sender': 'Test Counter', 'count': 5,
506             'since': ..., 'until': ...}
507    test(): {'sender': 'test()', 'id': 42}
508    test(): {'sender': 'Test Counter', 'count': 0,
509             'since': ..., 'until': ...}
510    test(): {'sender': 'test()', 'id': 42}
511    test(): {'sender': 'test()', 'id': 42}
512    test(): {'sender': 'test()', 'target': 'Test Counter',
513             'command': 'get count'}
514    test(): {'sender': 'Test Counter', 'count': 3,
515             'since': ..., 'until': ...}
516    """
517
518    CONF_SCHEMA = {
519        "properties": {
520            "count": {"type": "object"},
521            "date format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"},
522        },
523        "required": ["count"],
524    }
525    """Schema for Counter plugin configuration.
526
527    Required configuration key:
528
529    - 'count': template of messages to be counted.
530    """
531
532    def process_conf(self) -> None:
533        """Register plugin as bus client."""
534        self._since = datetime.now().strftime(self.conf["date format"])
535        self._counter = 0
536        self.bus.register(
537            self.name,
538            "Counter",
539            [MessageTemplate({"count": {"type": "integer"}})],
540            [
541                ([MessageTemplate(self.conf["count"])], self._count),
542                (
543                    [
544                        MessageTemplate(
545                            {
546                                "target": {"const": self.name},
547                                "command": {"const": "get count"},
548                            }
549                        )
550                    ],
551                    self._get_count,
552                ),
553                (
554                    [
555                        MessageTemplate(
556                            {
557                                "target": {"const": self.name},
558                                "command": {"const": "reset"},
559                            }
560                        )
561                    ],
562                    self._reset,
563                ),
564            ],
565        )
566
567    async def _count(self, message: Message) -> None:
568        self._counter += 1
569
570    async def _get_count(self, message: Message) -> None:
571        now = datetime.now().strftime(self.conf["date format"])
572        await self.bus.send(
573            Message(
574                self.name, {"count": self._counter, "since": self._since, "until": now}
575            )
576        )
577
578    async def _reset(self, message: Message) -> None:
579        now = datetime.now().strftime(self.conf["date format"])
580        counter = self._counter
581        self._counter = 0
582        await self.bus.send(
583            Message(self.name, {"count": counter, "since": self._since, "until": now})
584        )
585        self._since = now
586
587    async def run(self) -> None:
588        """Run no code proactively."""
589        pass

Count messages confirming to a given template.

The plugin counts messages confirming to the given template. The counter can be queried and reset by commands. The 'reset' command also queries the last count before the reset:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Counter": {"plugin": "Counter",
...                       "count": {"id": {"const": 42}}}},
...     [{"target": "Test Counter", "command": "get count"},
...      {"id": 42}, {"id": 42}, {"id": 49},
...      {"target": "Test Counter", "command": "get count"},
...      {"id": 42}, {"id": 42}, {"id": 42},
...      {"target": "Test Counter", "command": "reset"},
...      {"target": "Test Counter", "command": "get count"},
...      {"id": 42}, {"id": 42}, {"id": 42},
...      {"target": "Test Counter", "command": "get count"}]))
... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Counter', 'plugin': 'Counter',
         'sends': [{'count': {'type': 'integer'}}],
         'receives': [{'id': {'const': 42}},
                      {'target': {'const': 'Test Counter'},
                       'command': {'const': 'get count'}},
                      {'target': {'const': 'Test Counter'},
                       'command': {'const': 'reset'}}]}
test(): {'sender': 'test()', 'target': 'Test Counter',
         'command': 'get count'}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'Test Counter', 'count': 0,
         'since': ..., 'until': ...}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'test()', 'id': 49}
test(): {'sender': 'test()', 'target': 'Test Counter',
         'command': 'get count'}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'Test Counter', 'count': 2,
         'since': ..., 'until': ...}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'test()', 'target': 'Test Counter',
         'command': 'reset'}
test(): {'sender': 'test()', 'target': 'Test Counter',
         'command': 'get count'}
test(): {'sender': 'Test Counter', 'count': 5,
         'since': ..., 'until': ...}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'Test Counter', 'count': 0,
         'since': ..., 'until': ...}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'test()', 'id': 42}
test(): {'sender': 'test()', 'target': 'Test Counter',
         'command': 'get count'}
test(): {'sender': 'Test Counter', 'count': 3,
         'since': ..., 'until': ...}
CONF_SCHEMA = {'properties': {'count': {'type': 'object'}, 'date format': {'type': 'string', 'default': '%Y-%m-%d %H:%M:%S'}}, 'required': ['count']}

Schema for Counter plugin configuration.

Required configuration key:

  • 'count': template of messages to be counted.
def process_conf(self) -> None:
532    def process_conf(self) -> None:
533        """Register plugin as bus client."""
534        self._since = datetime.now().strftime(self.conf["date format"])
535        self._counter = 0
536        self.bus.register(
537            self.name,
538            "Counter",
539            [MessageTemplate({"count": {"type": "integer"}})],
540            [
541                ([MessageTemplate(self.conf["count"])], self._count),
542                (
543                    [
544                        MessageTemplate(
545                            {
546                                "target": {"const": self.name},
547                                "command": {"const": "get count"},
548                            }
549                        )
550                    ],
551                    self._get_count,
552                ),
553                (
554                    [
555                        MessageTemplate(
556                            {
557                                "target": {"const": self.name},
558                                "command": {"const": "reset"},
559                            }
560                        )
561                    ],
562                    self._reset,
563                ),
564            ],
565        )

Register plugin as bus client.

async def run(self) -> None:
587    async def run(self) -> None:
588        """Run no code proactively."""
589        pass

Run no code proactively.

class Date(controlpi.baseplugin.BasePlugin):
592class Date(BasePlugin):
593    """Send message with current date.
594
595    The plugin reacts to 'get date' commands by sending messages with
596    a 'date' key:
597    >>> import controlpi
598    >>> asyncio.run(controlpi.test(
599    ...     {"Test Date": {"plugin": "Date"}},
600    ...     [{"target": "Test Date", "command": "get date"}]))
601    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
602    test(): {'sender': '', 'event': 'registered',
603             'client': 'Test Date', 'plugin': 'Date',
604             'sends': [{'date': {'type': 'string'}}],
605             'receives': [{'target': {'const': 'Test Date'},
606                           'command': {'const': 'get date'}}]}
607    test(): {'sender': 'test()', 'target': 'Test Date',
608             'command': 'get date'}
609    test(): {'sender': 'Test Date', 'date': ...}
610
611    The format of the date can be configured with the 'format'
612    configuration key:
613    >>> asyncio.run(controlpi.test(
614    ...     {"Test Date": {"plugin": "Date",
615    ...                    "format": "%Y%m%d%H%M%S%f"}},
616    ...     [{"target": "Test Date", "command": "get date"}]))
617    ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
618    test(): {'sender': '', 'event': 'registered',
619             'client': 'Test Date', 'plugin': 'Date',
620             'sends': [{'date': {'type': 'string'}}],
621             'receives': [{'target': {'const': 'Test Date'},
622                           'command': {'const': 'get date'}}]}
623    test(): {'sender': 'test()', 'target': 'Test Date',
624             'command': 'get date'}
625    test(): {'sender': 'Test Date', 'date': ...}
626    """
627
628    CONF_SCHEMA = {
629        "properties": {"format": {"type": "string", "default": "%Y-%m-%d %H:%M:%S"}}
630    }
631    """Schema for Date plugin configuration.
632
633    Optional configuration key:
634
635    - 'format': format for the sent datetime string.
636                Default: '%Y-%m-%d %H:%M:%S'
637    """
638
639    def process_conf(self) -> None:
640        """Register plugin as bus client."""
641        self.bus.register(
642            self.name,
643            "Date",
644            [MessageTemplate({"date": {"type": "string"}})],
645            [
646                (
647                    [
648                        MessageTemplate(
649                            {
650                                "target": {"const": self.name},
651                                "command": {"const": "get date"},
652                            }
653                        )
654                    ],
655                    self._date,
656                )
657            ],
658        )
659
660    async def _date(self, message: Message) -> None:
661        date = datetime.now().strftime(self.conf["format"])
662        await self.bus.send(Message(self.name, {"date": date}))
663
664    async def run(self) -> None:
665        """Run no code proactively."""
666        pass

Send message with current date.

The plugin reacts to 'get date' commands by sending messages with a 'date' key:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Date": {"plugin": "Date"}},
...     [{"target": "Test Date", "command": "get date"}]))
... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Date', 'plugin': 'Date',
         'sends': [{'date': {'type': 'string'}}],
         'receives': [{'target': {'const': 'Test Date'},
                       'command': {'const': 'get date'}}]}
test(): {'sender': 'test()', 'target': 'Test Date',
         'command': 'get date'}
test(): {'sender': 'Test Date', 'date': ...}

The format of the date can be configured with the 'format' configuration key:

>>> asyncio.run(controlpi.test(
...     {"Test Date": {"plugin": "Date",
...                    "format": "%Y%m%d%H%M%S%f"}},
...     [{"target": "Test Date", "command": "get date"}]))
... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Date', 'plugin': 'Date',
         'sends': [{'date': {'type': 'string'}}],
         'receives': [{'target': {'const': 'Test Date'},
                       'command': {'const': 'get date'}}]}
test(): {'sender': 'test()', 'target': 'Test Date',
         'command': 'get date'}
test(): {'sender': 'Test Date', 'date': ...}
CONF_SCHEMA = {'properties': {'format': {'type': 'string', 'default': '%Y-%m-%d %H:%M:%S'}}}

Schema for Date plugin configuration.

Optional configuration key:

  • 'format': format for the sent datetime string. Default: '%Y-%m-%d %H:%M:%S'
def process_conf(self) -> None:
639    def process_conf(self) -> None:
640        """Register plugin as bus client."""
641        self.bus.register(
642            self.name,
643            "Date",
644            [MessageTemplate({"date": {"type": "string"}})],
645            [
646                (
647                    [
648                        MessageTemplate(
649                            {
650                                "target": {"const": self.name},
651                                "command": {"const": "get date"},
652                            }
653                        )
654                    ],
655                    self._date,
656                )
657            ],
658        )

Register plugin as bus client.

async def run(self) -> None:
664    async def run(self) -> None:
665        """Run no code proactively."""
666        pass

Run no code proactively.