controlpi_plugins.wait

Provide waiting/sleeping plugins for all kinds of systems.

  • Wait waits for time defined in configuration and sends "finished" event.
  • GenericWait waits for time defined in "wait" command and sends "finished" event with "id" string defined in "wait" command.
>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test Wait": {"plugin": "Wait", "seconds": 0.01},
...      "Test GenericWait": {"plugin": "GenericWait"}},
...     [{"target": "Test GenericWait", "command": "wait",
...       "seconds": 0.02, "id": "Long Wait"},
...      {"target": "Test Wait", "command": "wait"}], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test Wait', 'plugin': 'Wait',
         'sends': [{'event': {'const': 'finished'}}],
         'receives': [{'target': {'const': 'Test Wait'},
                       'command': {'const': 'wait'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Test GenericWait', 'plugin': 'GenericWait',
         'sends': [{'event': {'const': 'finished'},
                    'id': {'type': 'string'}}],
         'receives': [{'target': {'const': 'Test GenericWait'},
                       'command': {'const': 'wait'},
                       'seconds': {'type': 'number'},
                       'id': {'type': 'string'}}]}
test(): {'sender': 'test()', 'target': 'Test GenericWait',
         'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
test(): {'sender': 'test()', 'target': 'Test Wait', 'command': 'wait'}
test(): {'sender': 'Test Wait', 'event': 'finished'}
test(): {'sender': 'Test GenericWait', 'event': 'finished',
         'id': 'Long Wait'}
  1"""Provide waiting/sleeping plugins for all kinds of systems.
  2
  3- Wait waits for time defined in configuration and sends "finished" event.
  4- GenericWait waits for time defined in "wait" command and sends "finished"
  5  event with "id" string defined in "wait" command.
  6
  7>>> import controlpi
  8>>> asyncio.run(controlpi.test(
  9...     {"Test Wait": {"plugin": "Wait", "seconds": 0.01},
 10...      "Test GenericWait": {"plugin": "GenericWait"}},
 11...     [{"target": "Test GenericWait", "command": "wait",
 12...       "seconds": 0.02, "id": "Long Wait"},
 13...      {"target": "Test Wait", "command": "wait"}], 0.025))
 14... # doctest: +NORMALIZE_WHITESPACE
 15test(): {'sender': '', 'event': 'registered',
 16         'client': 'Test Wait', 'plugin': 'Wait',
 17         'sends': [{'event': {'const': 'finished'}}],
 18         'receives': [{'target': {'const': 'Test Wait'},
 19                       'command': {'const': 'wait'}}]}
 20test(): {'sender': '', 'event': 'registered',
 21         'client': 'Test GenericWait', 'plugin': 'GenericWait',
 22         'sends': [{'event': {'const': 'finished'},
 23                    'id': {'type': 'string'}}],
 24         'receives': [{'target': {'const': 'Test GenericWait'},
 25                       'command': {'const': 'wait'},
 26                       'seconds': {'type': 'number'},
 27                       'id': {'type': 'string'}}]}
 28test(): {'sender': 'test()', 'target': 'Test GenericWait',
 29         'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
 30test(): {'sender': 'test()', 'target': 'Test Wait', 'command': 'wait'}
 31test(): {'sender': 'Test Wait', 'event': 'finished'}
 32test(): {'sender': 'Test GenericWait', 'event': 'finished',
 33         'id': 'Long Wait'}
 34"""
 35
 36import asyncio
 37
 38from controlpi import BasePlugin, Message, MessageTemplate
 39
 40
 41class Wait(BasePlugin):
 42    """Wait for time defined in configuration.
 43
 44    The "seconds" configuration key gets the number of seconds to wait after
 45    receiving a "wait" command before sending the "finished" event:
 46    >>> import controlpi
 47    >>> asyncio.run(controlpi.test(
 48    ...     {"Long Wait": {"plugin": "Wait", "seconds": 0.02},
 49    ...      "Short Wait": {"plugin": "Wait", "seconds": 0.01}},
 50    ...     [{"target": "Long Wait", "command": "wait"},
 51    ...      {"target": "Short Wait", "command": "wait"}], 0.025))
 52    ... # doctest: +NORMALIZE_WHITESPACE
 53    test(): {'sender': '', 'event': 'registered',
 54             'client': 'Long Wait', 'plugin': 'Wait',
 55             'sends': [{'event': {'const': 'finished'}}],
 56             'receives': [{'target': {'const': 'Long Wait'},
 57                           'command': {'const': 'wait'}}]}
 58    test(): {'sender': '', 'event': 'registered',
 59             'client': 'Short Wait', 'plugin': 'Wait',
 60             'sends': [{'event': {'const': 'finished'}}],
 61             'receives': [{'target': {'const': 'Short Wait'},
 62                           'command': {'const': 'wait'}}]}
 63    test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'}
 64    test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'}
 65    test(): {'sender': 'Short Wait', 'event': 'finished'}
 66    test(): {'sender': 'Long Wait', 'event': 'finished'}
 67    """
 68
 69    CONF_SCHEMA = {
 70        "properties": {"seconds": {"type": "number"}},
 71        "required": ["seconds"],
 72    }
 73    """Schema for Wait plugin configuration.
 74
 75    Required configuration key:
 76
 77    - 'seconds': number of seconds to wait.
 78    """
 79
 80    def process_conf(self) -> None:
 81        """Register plugin as bus client."""
 82        self._tasks = set()
 83        self.bus.register(
 84            self.name,
 85            "Wait",
 86            [MessageTemplate({"event": {"const": "finished"}})],
 87            [
 88                (
 89                    [
 90                        MessageTemplate(
 91                            {
 92                                "target": {"const": self.name},
 93                                "command": {"const": "wait"},
 94                            }
 95                        )
 96                    ],
 97                    self._wait,
 98                )
 99            ],
100        )
101
102    async def _wait(self, message: Message) -> None:
103        async def wait_coroutine():
104            await asyncio.sleep(self.conf["seconds"])
105            await self.bus.send(Message(self.name, {"event": "finished"}))
106
107        # Done in separate task to not block queue awaiting this callback:
108        task = asyncio.create_task(wait_coroutine())
109        self._tasks.add(task)
110        task.add_done_callback(self._tasks.discard)
111
112    async def run(self) -> None:
113        """Run no code proactively."""
114        pass
115
116
117class GenericWait(BasePlugin):
118    """Wait for time defined in "wait" command.
119
120    The "wait" command has message keys "seconds" defining the seconds to
121    wait and "id" defining a string to be sent back in the "finished" event
122    after the wait:
123    >>> import controlpi
124    >>> asyncio.run(controlpi.test(
125    ...     {"Test GenericWait": {"plugin": "GenericWait"}},
126    ...     [{"target": "Test GenericWait", "command": "wait",
127    ...       "seconds": 0.02, "id": "Long Wait"},
128    ...      {"target": "Test GenericWait", "command": "wait",
129    ...       "seconds": 0.01, "id": "Short Wait"}], 0.025))
130    ... # doctest: +NORMALIZE_WHITESPACE
131    test(): {'sender': '', 'event': 'registered',
132             'client': 'Test GenericWait', 'plugin': 'GenericWait',
133             'sends': [{'event': {'const': 'finished'},
134                        'id': {'type': 'string'}}],
135             'receives': [{'target': {'const': 'Test GenericWait'},
136                           'command': {'const': 'wait'},
137                           'seconds': {'type': 'number'},
138                           'id': {'type': 'string'}}]}
139    test(): {'sender': 'test()', 'target': 'Test GenericWait',
140             'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
141    test(): {'sender': 'test()', 'target': 'Test GenericWait',
142             'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'}
143    test(): {'sender': 'Test GenericWait', 'event': 'finished',
144             'id': 'Short Wait'}
145    test(): {'sender': 'Test GenericWait', 'event': 'finished',
146             'id': 'Long Wait'}
147    """
148
149    CONF_SCHEMA = True
150    """Schema for GenericWait plugin configuration.
151
152    There are no required or optional configuration keys.
153    """
154
155    def process_conf(self) -> None:
156        """Register plugin as bus client."""
157        self._tasks = set()
158        self.bus.register(
159            self.name,
160            "GenericWait",
161            [
162                MessageTemplate(
163                    {"event": {"const": "finished"}, "id": {"type": "string"}}
164                )
165            ],
166            [
167                (
168                    [
169                        MessageTemplate(
170                            {
171                                "target": {"const": self.name},
172                                "command": {"const": "wait"},
173                                "seconds": {"type": "number"},
174                                "id": {"type": "string"},
175                            }
176                        )
177                    ],
178                    self._wait,
179                )
180            ],
181        )
182
183    async def _wait(self, message: Message) -> None:
184        async def wait_coroutine():
185            assert isinstance(message["seconds"], float) or isinstance(
186                message["seconds"], int
187            )
188            await asyncio.sleep(message["seconds"])
189            await self.bus.send(
190                Message(self.name, {"event": "finished", "id": message["id"]})
191            )
192
193        # Done in separate task to not block queue awaiting this callback:
194        task = asyncio.create_task(wait_coroutine())
195        self._tasks.add(task)
196        task.add_done_callback(self._tasks.discard)
197
198    async def run(self) -> None:
199        """Run no code proactively."""
200        pass
201
202
203class Timer(BasePlugin):
204    """Timer that can be started and cancelled.
205
206    The "seconds" configuration key gets the number of seconds to wait after
207    receiving a "start" command before sending the "finished" event.
208    The "cancel" command cancels all outstanding "finished" events and sends
209    a corresponding "cancelled" event:
210    >>> import controlpi
211    >>> asyncio.run(controlpi.test(
212    ...     {"Timer": {"plugin": "Timer", "seconds": 0.01}},
213    ...     [{"target": "Timer", "command": "start"},
214    ...      {"target": "Timer", "command": "start"},
215    ...      {"target": "Timer", "command": "cancel"},
216    ...      {"target": "Timer", "command": "start"},
217    ...      {"target": "Timer", "command": "start"}], 0.015))
218    ... # doctest: +NORMALIZE_WHITESPACE
219    test(): {'sender': '', 'event': 'registered',
220             'client': 'Timer', 'plugin': 'Timer',
221             'sends': [{'event': {'const': 'finished'}},
222                       {'event': {'const': 'cancelled'}}],
223             'receives': [{'target': {'const': 'Timer'},
224                           'command': {'const': 'start'}},
225                          {'target': {'const': 'Timer'},
226                           'command': {'const': 'cancel'}}]}
227    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
228    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
229    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'}
230    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
231    test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2}
232    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
233    test(): {'sender': 'Timer', 'event': 'finished'}
234    test(): {'sender': 'Timer', 'event': 'finished'}
235    """
236
237    CONF_SCHEMA = {
238        "properties": {"seconds": {"type": "number"}},
239        "required": ["seconds"],
240    }
241    """Schema for Timer plugin configuration.
242
243    Required configuration key:
244
245    - 'seconds': number of seconds to wait.
246    """
247
248    def process_conf(self) -> None:
249        """Register plugin as bus client."""
250        self._tasks = set()
251        self.started = 0
252        self.cancelled = 0
253        self.bus.register(
254            self.name,
255            "Timer",
256            [
257                MessageTemplate({"event": {"const": "finished"}}),
258                MessageTemplate({"event": {"const": "cancelled"}}),
259            ],
260            [
261                (
262                    [
263                        MessageTemplate(
264                            {
265                                "target": {"const": self.name},
266                                "command": {"const": "start"},
267                            }
268                        )
269                    ],
270                    self._start,
271                ),
272                (
273                    [
274                        MessageTemplate(
275                            {
276                                "target": {"const": self.name},
277                                "command": {"const": "cancel"},
278                            }
279                        )
280                    ],
281                    self._cancel,
282                ),
283            ],
284        )
285
286    async def _start(self, message: Message) -> None:
287        self.started += 1
288
289        async def wait_coroutine():
290            await asyncio.sleep(self.conf["seconds"])
291            if self.cancelled > 0:
292                self.cancelled -= 1
293                self.started -= 1
294            elif self.started > 0:
295                self.started -= 1
296                await self.bus.send(Message(self.name, {"event": "finished"}))
297
298        # Done in separate task to not block queue awaiting this callback:
299        task = asyncio.create_task(wait_coroutine())
300        self._tasks.add(task)
301        task.add_done_callback(self._tasks.discard)
302
303    async def _cancel(self, message: Message) -> None:
304        if self.cancelled < self.started:
305            cancel = self.started - self.cancelled
306            self.cancelled = self.started
307            await self.bus.send(
308                Message(self.name, {"event": "cancelled", "count": cancel})
309            )
310
311    async def run(self) -> None:
312        """Run no code proactively."""
313        pass
314
315
316class Periodic(BasePlugin):
317    """Send message periodically.
318
319    The "seconds" configuration key is the period of the repetition:
320    receiving a "wait" command before sending the "finished" event:
321    >>> import controlpi
322    >>> asyncio.run(controlpi.test(
323    ...     {"Loop": {"plugin": "Periodic", "seconds": 0.01,
324    ...               "message": {"key": "value"}}},
325    ...     [], 0.025))
326    ... # doctest: +NORMALIZE_WHITESPACE
327    test(): {'sender': '', 'event': 'registered',
328             'client': 'Loop', 'plugin': 'Periodic',
329             'sends': [{'key': {'const': 'value'}}], 'receives': []}
330    test(): {'sender': 'Loop', 'key': 'value'}
331    test(): {'sender': 'Loop', 'key': 'value'}
332    """
333
334    CONF_SCHEMA = {
335        "properties": {"seconds": {"type": "number"}, "message": {"type": "object"}},
336        "required": ["seconds", "message"],
337    }
338    """Schema for Wait plugin configuration.
339
340    Required configuration key:
341
342    - 'seconds': period of repetition in seconds.
343    - 'message': message to send periodically.
344    """
345
346    def process_conf(self) -> None:
347        """Register plugin as bus client."""
348        self.bus.register(
349            self.name,
350            "Periodic",
351            [MessageTemplate.from_message(self.conf["message"])],
352            [],
353        )
354
355    async def run(self) -> None:
356        """Run periodic loop."""
357        while True:
358            await asyncio.sleep(self.conf["seconds"])
359            await self.bus.send(Message(self.name, self.conf["message"]))
class Wait(controlpi.baseplugin.BasePlugin):
 42class Wait(BasePlugin):
 43    """Wait for time defined in configuration.
 44
 45    The "seconds" configuration key gets the number of seconds to wait after
 46    receiving a "wait" command before sending the "finished" event:
 47    >>> import controlpi
 48    >>> asyncio.run(controlpi.test(
 49    ...     {"Long Wait": {"plugin": "Wait", "seconds": 0.02},
 50    ...      "Short Wait": {"plugin": "Wait", "seconds": 0.01}},
 51    ...     [{"target": "Long Wait", "command": "wait"},
 52    ...      {"target": "Short Wait", "command": "wait"}], 0.025))
 53    ... # doctest: +NORMALIZE_WHITESPACE
 54    test(): {'sender': '', 'event': 'registered',
 55             'client': 'Long Wait', 'plugin': 'Wait',
 56             'sends': [{'event': {'const': 'finished'}}],
 57             'receives': [{'target': {'const': 'Long Wait'},
 58                           'command': {'const': 'wait'}}]}
 59    test(): {'sender': '', 'event': 'registered',
 60             'client': 'Short Wait', 'plugin': 'Wait',
 61             'sends': [{'event': {'const': 'finished'}}],
 62             'receives': [{'target': {'const': 'Short Wait'},
 63                           'command': {'const': 'wait'}}]}
 64    test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'}
 65    test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'}
 66    test(): {'sender': 'Short Wait', 'event': 'finished'}
 67    test(): {'sender': 'Long Wait', 'event': 'finished'}
 68    """
 69
 70    CONF_SCHEMA = {
 71        "properties": {"seconds": {"type": "number"}},
 72        "required": ["seconds"],
 73    }
 74    """Schema for Wait plugin configuration.
 75
 76    Required configuration key:
 77
 78    - 'seconds': number of seconds to wait.
 79    """
 80
 81    def process_conf(self) -> None:
 82        """Register plugin as bus client."""
 83        self._tasks = set()
 84        self.bus.register(
 85            self.name,
 86            "Wait",
 87            [MessageTemplate({"event": {"const": "finished"}})],
 88            [
 89                (
 90                    [
 91                        MessageTemplate(
 92                            {
 93                                "target": {"const": self.name},
 94                                "command": {"const": "wait"},
 95                            }
 96                        )
 97                    ],
 98                    self._wait,
 99                )
100            ],
101        )
102
103    async def _wait(self, message: Message) -> None:
104        async def wait_coroutine():
105            await asyncio.sleep(self.conf["seconds"])
106            await self.bus.send(Message(self.name, {"event": "finished"}))
107
108        # Done in separate task to not block queue awaiting this callback:
109        task = asyncio.create_task(wait_coroutine())
110        self._tasks.add(task)
111        task.add_done_callback(self._tasks.discard)
112
113    async def run(self) -> None:
114        """Run no code proactively."""
115        pass

Wait for time defined in configuration.

The "seconds" configuration key gets the number of seconds to wait after receiving a "wait" command before sending the "finished" event:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Long Wait": {"plugin": "Wait", "seconds": 0.02},
...      "Short Wait": {"plugin": "Wait", "seconds": 0.01}},
...     [{"target": "Long Wait", "command": "wait"},
...      {"target": "Short Wait", "command": "wait"}], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Long Wait', 'plugin': 'Wait',
         'sends': [{'event': {'const': 'finished'}}],
         'receives': [{'target': {'const': 'Long Wait'},
                       'command': {'const': 'wait'}}]}
test(): {'sender': '', 'event': 'registered',
         'client': 'Short Wait', 'plugin': 'Wait',
         'sends': [{'event': {'const': 'finished'}}],
         'receives': [{'target': {'const': 'Short Wait'},
                       'command': {'const': 'wait'}}]}
test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'}
test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'}
test(): {'sender': 'Short Wait', 'event': 'finished'}
test(): {'sender': 'Long Wait', 'event': 'finished'}
CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}}, 'required': ['seconds']}

Schema for Wait plugin configuration.

Required configuration key:

  • 'seconds': number of seconds to wait.
def process_conf(self) -> None:
 81    def process_conf(self) -> None:
 82        """Register plugin as bus client."""
 83        self._tasks = set()
 84        self.bus.register(
 85            self.name,
 86            "Wait",
 87            [MessageTemplate({"event": {"const": "finished"}})],
 88            [
 89                (
 90                    [
 91                        MessageTemplate(
 92                            {
 93                                "target": {"const": self.name},
 94                                "command": {"const": "wait"},
 95                            }
 96                        )
 97                    ],
 98                    self._wait,
 99                )
100            ],
101        )

Register plugin as bus client.

async def run(self) -> None:
113    async def run(self) -> None:
114        """Run no code proactively."""
115        pass

Run no code proactively.

class GenericWait(controlpi.baseplugin.BasePlugin):
118class GenericWait(BasePlugin):
119    """Wait for time defined in "wait" command.
120
121    The "wait" command has message keys "seconds" defining the seconds to
122    wait and "id" defining a string to be sent back in the "finished" event
123    after the wait:
124    >>> import controlpi
125    >>> asyncio.run(controlpi.test(
126    ...     {"Test GenericWait": {"plugin": "GenericWait"}},
127    ...     [{"target": "Test GenericWait", "command": "wait",
128    ...       "seconds": 0.02, "id": "Long Wait"},
129    ...      {"target": "Test GenericWait", "command": "wait",
130    ...       "seconds": 0.01, "id": "Short Wait"}], 0.025))
131    ... # doctest: +NORMALIZE_WHITESPACE
132    test(): {'sender': '', 'event': 'registered',
133             'client': 'Test GenericWait', 'plugin': 'GenericWait',
134             'sends': [{'event': {'const': 'finished'},
135                        'id': {'type': 'string'}}],
136             'receives': [{'target': {'const': 'Test GenericWait'},
137                           'command': {'const': 'wait'},
138                           'seconds': {'type': 'number'},
139                           'id': {'type': 'string'}}]}
140    test(): {'sender': 'test()', 'target': 'Test GenericWait',
141             'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
142    test(): {'sender': 'test()', 'target': 'Test GenericWait',
143             'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'}
144    test(): {'sender': 'Test GenericWait', 'event': 'finished',
145             'id': 'Short Wait'}
146    test(): {'sender': 'Test GenericWait', 'event': 'finished',
147             'id': 'Long Wait'}
148    """
149
150    CONF_SCHEMA = True
151    """Schema for GenericWait plugin configuration.
152
153    There are no required or optional configuration keys.
154    """
155
156    def process_conf(self) -> None:
157        """Register plugin as bus client."""
158        self._tasks = set()
159        self.bus.register(
160            self.name,
161            "GenericWait",
162            [
163                MessageTemplate(
164                    {"event": {"const": "finished"}, "id": {"type": "string"}}
165                )
166            ],
167            [
168                (
169                    [
170                        MessageTemplate(
171                            {
172                                "target": {"const": self.name},
173                                "command": {"const": "wait"},
174                                "seconds": {"type": "number"},
175                                "id": {"type": "string"},
176                            }
177                        )
178                    ],
179                    self._wait,
180                )
181            ],
182        )
183
184    async def _wait(self, message: Message) -> None:
185        async def wait_coroutine():
186            assert isinstance(message["seconds"], float) or isinstance(
187                message["seconds"], int
188            )
189            await asyncio.sleep(message["seconds"])
190            await self.bus.send(
191                Message(self.name, {"event": "finished", "id": message["id"]})
192            )
193
194        # Done in separate task to not block queue awaiting this callback:
195        task = asyncio.create_task(wait_coroutine())
196        self._tasks.add(task)
197        task.add_done_callback(self._tasks.discard)
198
199    async def run(self) -> None:
200        """Run no code proactively."""
201        pass

Wait for time defined in "wait" command.

The "wait" command has message keys "seconds" defining the seconds to wait and "id" defining a string to be sent back in the "finished" event after the wait:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Test GenericWait": {"plugin": "GenericWait"}},
...     [{"target": "Test GenericWait", "command": "wait",
...       "seconds": 0.02, "id": "Long Wait"},
...      {"target": "Test GenericWait", "command": "wait",
...       "seconds": 0.01, "id": "Short Wait"}], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Test GenericWait', 'plugin': 'GenericWait',
         'sends': [{'event': {'const': 'finished'},
                    'id': {'type': 'string'}}],
         'receives': [{'target': {'const': 'Test GenericWait'},
                       'command': {'const': 'wait'},
                       'seconds': {'type': 'number'},
                       'id': {'type': 'string'}}]}
test(): {'sender': 'test()', 'target': 'Test GenericWait',
         'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
test(): {'sender': 'test()', 'target': 'Test GenericWait',
         'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'}
test(): {'sender': 'Test GenericWait', 'event': 'finished',
         'id': 'Short Wait'}
test(): {'sender': 'Test GenericWait', 'event': 'finished',
         'id': 'Long Wait'}
CONF_SCHEMA = True

Schema for GenericWait plugin configuration.

There are no required or optional configuration keys.

def process_conf(self) -> None:
156    def process_conf(self) -> None:
157        """Register plugin as bus client."""
158        self._tasks = set()
159        self.bus.register(
160            self.name,
161            "GenericWait",
162            [
163                MessageTemplate(
164                    {"event": {"const": "finished"}, "id": {"type": "string"}}
165                )
166            ],
167            [
168                (
169                    [
170                        MessageTemplate(
171                            {
172                                "target": {"const": self.name},
173                                "command": {"const": "wait"},
174                                "seconds": {"type": "number"},
175                                "id": {"type": "string"},
176                            }
177                        )
178                    ],
179                    self._wait,
180                )
181            ],
182        )

Register plugin as bus client.

async def run(self) -> None:
199    async def run(self) -> None:
200        """Run no code proactively."""
201        pass

Run no code proactively.

class Timer(controlpi.baseplugin.BasePlugin):
204class Timer(BasePlugin):
205    """Timer that can be started and cancelled.
206
207    The "seconds" configuration key gets the number of seconds to wait after
208    receiving a "start" command before sending the "finished" event.
209    The "cancel" command cancels all outstanding "finished" events and sends
210    a corresponding "cancelled" event:
211    >>> import controlpi
212    >>> asyncio.run(controlpi.test(
213    ...     {"Timer": {"plugin": "Timer", "seconds": 0.01}},
214    ...     [{"target": "Timer", "command": "start"},
215    ...      {"target": "Timer", "command": "start"},
216    ...      {"target": "Timer", "command": "cancel"},
217    ...      {"target": "Timer", "command": "start"},
218    ...      {"target": "Timer", "command": "start"}], 0.015))
219    ... # doctest: +NORMALIZE_WHITESPACE
220    test(): {'sender': '', 'event': 'registered',
221             'client': 'Timer', 'plugin': 'Timer',
222             'sends': [{'event': {'const': 'finished'}},
223                       {'event': {'const': 'cancelled'}}],
224             'receives': [{'target': {'const': 'Timer'},
225                           'command': {'const': 'start'}},
226                          {'target': {'const': 'Timer'},
227                           'command': {'const': 'cancel'}}]}
228    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
229    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
230    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'}
231    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
232    test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2}
233    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
234    test(): {'sender': 'Timer', 'event': 'finished'}
235    test(): {'sender': 'Timer', 'event': 'finished'}
236    """
237
238    CONF_SCHEMA = {
239        "properties": {"seconds": {"type": "number"}},
240        "required": ["seconds"],
241    }
242    """Schema for Timer plugin configuration.
243
244    Required configuration key:
245
246    - 'seconds': number of seconds to wait.
247    """
248
249    def process_conf(self) -> None:
250        """Register plugin as bus client."""
251        self._tasks = set()
252        self.started = 0
253        self.cancelled = 0
254        self.bus.register(
255            self.name,
256            "Timer",
257            [
258                MessageTemplate({"event": {"const": "finished"}}),
259                MessageTemplate({"event": {"const": "cancelled"}}),
260            ],
261            [
262                (
263                    [
264                        MessageTemplate(
265                            {
266                                "target": {"const": self.name},
267                                "command": {"const": "start"},
268                            }
269                        )
270                    ],
271                    self._start,
272                ),
273                (
274                    [
275                        MessageTemplate(
276                            {
277                                "target": {"const": self.name},
278                                "command": {"const": "cancel"},
279                            }
280                        )
281                    ],
282                    self._cancel,
283                ),
284            ],
285        )
286
287    async def _start(self, message: Message) -> None:
288        self.started += 1
289
290        async def wait_coroutine():
291            await asyncio.sleep(self.conf["seconds"])
292            if self.cancelled > 0:
293                self.cancelled -= 1
294                self.started -= 1
295            elif self.started > 0:
296                self.started -= 1
297                await self.bus.send(Message(self.name, {"event": "finished"}))
298
299        # Done in separate task to not block queue awaiting this callback:
300        task = asyncio.create_task(wait_coroutine())
301        self._tasks.add(task)
302        task.add_done_callback(self._tasks.discard)
303
304    async def _cancel(self, message: Message) -> None:
305        if self.cancelled < self.started:
306            cancel = self.started - self.cancelled
307            self.cancelled = self.started
308            await self.bus.send(
309                Message(self.name, {"event": "cancelled", "count": cancel})
310            )
311
312    async def run(self) -> None:
313        """Run no code proactively."""
314        pass

Timer that can be started and cancelled.

The "seconds" configuration key gets the number of seconds to wait after receiving a "start" command before sending the "finished" event. The "cancel" command cancels all outstanding "finished" events and sends a corresponding "cancelled" event:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Timer": {"plugin": "Timer", "seconds": 0.01}},
...     [{"target": "Timer", "command": "start"},
...      {"target": "Timer", "command": "start"},
...      {"target": "Timer", "command": "cancel"},
...      {"target": "Timer", "command": "start"},
...      {"target": "Timer", "command": "start"}], 0.015))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Timer', 'plugin': 'Timer',
         'sends': [{'event': {'const': 'finished'}},
                   {'event': {'const': 'cancelled'}}],
         'receives': [{'target': {'const': 'Timer'},
                       'command': {'const': 'start'}},
                      {'target': {'const': 'Timer'},
                       'command': {'const': 'cancel'}}]}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'Timer', 'event': 'finished'}
test(): {'sender': 'Timer', 'event': 'finished'}
CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}}, 'required': ['seconds']}

Schema for Timer plugin configuration.

Required configuration key:

  • 'seconds': number of seconds to wait.
def process_conf(self) -> None:
249    def process_conf(self) -> None:
250        """Register plugin as bus client."""
251        self._tasks = set()
252        self.started = 0
253        self.cancelled = 0
254        self.bus.register(
255            self.name,
256            "Timer",
257            [
258                MessageTemplate({"event": {"const": "finished"}}),
259                MessageTemplate({"event": {"const": "cancelled"}}),
260            ],
261            [
262                (
263                    [
264                        MessageTemplate(
265                            {
266                                "target": {"const": self.name},
267                                "command": {"const": "start"},
268                            }
269                        )
270                    ],
271                    self._start,
272                ),
273                (
274                    [
275                        MessageTemplate(
276                            {
277                                "target": {"const": self.name},
278                                "command": {"const": "cancel"},
279                            }
280                        )
281                    ],
282                    self._cancel,
283                ),
284            ],
285        )

Register plugin as bus client.

async def run(self) -> None:
312    async def run(self) -> None:
313        """Run no code proactively."""
314        pass

Run no code proactively.

class Periodic(controlpi.baseplugin.BasePlugin):
317class Periodic(BasePlugin):
318    """Send message periodically.
319
320    The "seconds" configuration key is the period of the repetition:
321    receiving a "wait" command before sending the "finished" event:
322    >>> import controlpi
323    >>> asyncio.run(controlpi.test(
324    ...     {"Loop": {"plugin": "Periodic", "seconds": 0.01,
325    ...               "message": {"key": "value"}}},
326    ...     [], 0.025))
327    ... # doctest: +NORMALIZE_WHITESPACE
328    test(): {'sender': '', 'event': 'registered',
329             'client': 'Loop', 'plugin': 'Periodic',
330             'sends': [{'key': {'const': 'value'}}], 'receives': []}
331    test(): {'sender': 'Loop', 'key': 'value'}
332    test(): {'sender': 'Loop', 'key': 'value'}
333    """
334
335    CONF_SCHEMA = {
336        "properties": {"seconds": {"type": "number"}, "message": {"type": "object"}},
337        "required": ["seconds", "message"],
338    }
339    """Schema for Wait plugin configuration.
340
341    Required configuration key:
342
343    - 'seconds': period of repetition in seconds.
344    - 'message': message to send periodically.
345    """
346
347    def process_conf(self) -> None:
348        """Register plugin as bus client."""
349        self.bus.register(
350            self.name,
351            "Periodic",
352            [MessageTemplate.from_message(self.conf["message"])],
353            [],
354        )
355
356    async def run(self) -> None:
357        """Run periodic loop."""
358        while True:
359            await asyncio.sleep(self.conf["seconds"])
360            await self.bus.send(Message(self.name, self.conf["message"]))

Send message periodically.

The "seconds" configuration key is the period of the repetition: receiving a "wait" command before sending the "finished" event:

>>> import controlpi
>>> asyncio.run(controlpi.test(
...     {"Loop": {"plugin": "Periodic", "seconds": 0.01,
...               "message": {"key": "value"}}},
...     [], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
         'client': 'Loop', 'plugin': 'Periodic',
         'sends': [{'key': {'const': 'value'}}], 'receives': []}
test(): {'sender': 'Loop', 'key': 'value'}
test(): {'sender': 'Loop', 'key': 'value'}
CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}, 'message': {'type': 'object'}}, 'required': ['seconds', 'message']}

Schema for Wait plugin configuration.

Required configuration key:

  • 'seconds': period of repetition in seconds.
  • 'message': message to send periodically.
def process_conf(self) -> None:
347    def process_conf(self) -> None:
348        """Register plugin as bus client."""
349        self.bus.register(
350            self.name,
351            "Periodic",
352            [MessageTemplate.from_message(self.conf["message"])],
353            [],
354        )

Register plugin as bus client.

async def run(self) -> None:
356    async def run(self) -> None:
357        """Run periodic loop."""
358        while True:
359            await asyncio.sleep(self.conf["seconds"])
360            await self.bus.send(Message(self.name, self.conf["message"]))

Run periodic loop.