controlpi_plugins.wait
Provide waiting/sleeping plugins for all kinds of systems.
- Wait waits for time defined in configuration and sends "finished" event.
- GenericWait waits for time defined in "wait" command and sends "finished" event with "id" string defined in "wait" command.
>>> import controlpi
>>> asyncio.run(controlpi.test(
... {"Test Wait": {"plugin": "Wait", "seconds": 0.01},
... "Test GenericWait": {"plugin": "GenericWait"}},
... [{"target": "Test GenericWait", "command": "wait",
... "seconds": 0.02, "id": "Long Wait"},
... {"target": "Test Wait", "command": "wait"}], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Test Wait', 'plugin': 'Wait',
'sends': [{'event': {'const': 'finished'}}],
'receives': [{'target': {'const': 'Test Wait'},
'command': {'const': 'wait'}}]}
test(): {'sender': '', 'event': 'registered',
'client': 'Test GenericWait', 'plugin': 'GenericWait',
'sends': [{'event': {'const': 'finished'},
'id': {'type': 'string'}}],
'receives': [{'target': {'const': 'Test GenericWait'},
'command': {'const': 'wait'},
'seconds': {'type': 'number'},
'id': {'type': 'string'}}]}
test(): {'sender': 'test()', 'target': 'Test GenericWait',
'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
test(): {'sender': 'test()', 'target': 'Test Wait', 'command': 'wait'}
test(): {'sender': 'Test Wait', 'event': 'finished'}
test(): {'sender': 'Test GenericWait', 'event': 'finished',
'id': 'Long Wait'}
1"""Provide waiting/sleeping plugins for all kinds of systems. 2 3- Wait waits for time defined in configuration and sends "finished" event. 4- GenericWait waits for time defined in "wait" command and sends "finished" 5 event with "id" string defined in "wait" command. 6 7>>> import controlpi 8>>> asyncio.run(controlpi.test( 9... {"Test Wait": {"plugin": "Wait", "seconds": 0.01}, 10... "Test GenericWait": {"plugin": "GenericWait"}}, 11... [{"target": "Test GenericWait", "command": "wait", 12... "seconds": 0.02, "id": "Long Wait"}, 13... {"target": "Test Wait", "command": "wait"}], 0.025)) 14... # doctest: +NORMALIZE_WHITESPACE 15test(): {'sender': '', 'event': 'registered', 16 'client': 'Test Wait', 'plugin': 'Wait', 17 'sends': [{'event': {'const': 'finished'}}], 18 'receives': [{'target': {'const': 'Test Wait'}, 19 'command': {'const': 'wait'}}]} 20test(): {'sender': '', 'event': 'registered', 21 'client': 'Test GenericWait', 'plugin': 'GenericWait', 22 'sends': [{'event': {'const': 'finished'}, 23 'id': {'type': 'string'}}], 24 'receives': [{'target': {'const': 'Test GenericWait'}, 25 'command': {'const': 'wait'}, 26 'seconds': {'type': 'number'}, 27 'id': {'type': 'string'}}]} 28test(): {'sender': 'test()', 'target': 'Test GenericWait', 29 'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'} 30test(): {'sender': 'test()', 'target': 'Test Wait', 'command': 'wait'} 31test(): {'sender': 'Test Wait', 'event': 'finished'} 32test(): {'sender': 'Test GenericWait', 'event': 'finished', 33 'id': 'Long Wait'} 34""" 35 36import asyncio 37 38from controlpi import BasePlugin, Message, MessageTemplate 39 40 41class Wait(BasePlugin): 42 """Wait for time defined in configuration. 43 44 The "seconds" configuration key gets the number of seconds to wait after 45 receiving a "wait" command before sending the "finished" event: 46 >>> import controlpi 47 >>> asyncio.run(controlpi.test( 48 ... {"Long Wait": {"plugin": "Wait", "seconds": 0.02}, 49 ... "Short Wait": {"plugin": "Wait", "seconds": 0.01}}, 50 ... [{"target": "Long Wait", "command": "wait"}, 51 ... {"target": "Short Wait", "command": "wait"}], 0.025)) 52 ... # doctest: +NORMALIZE_WHITESPACE 53 test(): {'sender': '', 'event': 'registered', 54 'client': 'Long Wait', 'plugin': 'Wait', 55 'sends': [{'event': {'const': 'finished'}}], 56 'receives': [{'target': {'const': 'Long Wait'}, 57 'command': {'const': 'wait'}}]} 58 test(): {'sender': '', 'event': 'registered', 59 'client': 'Short Wait', 'plugin': 'Wait', 60 'sends': [{'event': {'const': 'finished'}}], 61 'receives': [{'target': {'const': 'Short Wait'}, 62 'command': {'const': 'wait'}}]} 63 test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'} 64 test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'} 65 test(): {'sender': 'Short Wait', 'event': 'finished'} 66 test(): {'sender': 'Long Wait', 'event': 'finished'} 67 """ 68 69 CONF_SCHEMA = { 70 "properties": {"seconds": {"type": "number"}}, 71 "required": ["seconds"], 72 } 73 """Schema for Wait plugin configuration. 74 75 Required configuration key: 76 77 - 'seconds': number of seconds to wait. 78 """ 79 80 def process_conf(self) -> None: 81 """Register plugin as bus client.""" 82 self._tasks = set() 83 self.bus.register( 84 self.name, 85 "Wait", 86 [MessageTemplate({"event": {"const": "finished"}})], 87 [ 88 ( 89 [ 90 MessageTemplate( 91 { 92 "target": {"const": self.name}, 93 "command": {"const": "wait"}, 94 } 95 ) 96 ], 97 self._wait, 98 ) 99 ], 100 ) 101 102 async def _wait(self, message: Message) -> None: 103 async def wait_coroutine(): 104 await asyncio.sleep(self.conf["seconds"]) 105 await self.bus.send(Message(self.name, {"event": "finished"})) 106 107 # Done in separate task to not block queue awaiting this callback: 108 task = asyncio.create_task(wait_coroutine()) 109 self._tasks.add(task) 110 task.add_done_callback(self._tasks.discard) 111 112 async def run(self) -> None: 113 """Run no code proactively.""" 114 pass 115 116 117class GenericWait(BasePlugin): 118 """Wait for time defined in "wait" command. 119 120 The "wait" command has message keys "seconds" defining the seconds to 121 wait and "id" defining a string to be sent back in the "finished" event 122 after the wait: 123 >>> import controlpi 124 >>> asyncio.run(controlpi.test( 125 ... {"Test GenericWait": {"plugin": "GenericWait"}}, 126 ... [{"target": "Test GenericWait", "command": "wait", 127 ... "seconds": 0.02, "id": "Long Wait"}, 128 ... {"target": "Test GenericWait", "command": "wait", 129 ... "seconds": 0.01, "id": "Short Wait"}], 0.025)) 130 ... # doctest: +NORMALIZE_WHITESPACE 131 test(): {'sender': '', 'event': 'registered', 132 'client': 'Test GenericWait', 'plugin': 'GenericWait', 133 'sends': [{'event': {'const': 'finished'}, 134 'id': {'type': 'string'}}], 135 'receives': [{'target': {'const': 'Test GenericWait'}, 136 'command': {'const': 'wait'}, 137 'seconds': {'type': 'number'}, 138 'id': {'type': 'string'}}]} 139 test(): {'sender': 'test()', 'target': 'Test GenericWait', 140 'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'} 141 test(): {'sender': 'test()', 'target': 'Test GenericWait', 142 'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'} 143 test(): {'sender': 'Test GenericWait', 'event': 'finished', 144 'id': 'Short Wait'} 145 test(): {'sender': 'Test GenericWait', 'event': 'finished', 146 'id': 'Long Wait'} 147 """ 148 149 CONF_SCHEMA = True 150 """Schema for GenericWait plugin configuration. 151 152 There are no required or optional configuration keys. 153 """ 154 155 def process_conf(self) -> None: 156 """Register plugin as bus client.""" 157 self._tasks = set() 158 self.bus.register( 159 self.name, 160 "GenericWait", 161 [ 162 MessageTemplate( 163 {"event": {"const": "finished"}, "id": {"type": "string"}} 164 ) 165 ], 166 [ 167 ( 168 [ 169 MessageTemplate( 170 { 171 "target": {"const": self.name}, 172 "command": {"const": "wait"}, 173 "seconds": {"type": "number"}, 174 "id": {"type": "string"}, 175 } 176 ) 177 ], 178 self._wait, 179 ) 180 ], 181 ) 182 183 async def _wait(self, message: Message) -> None: 184 async def wait_coroutine(): 185 assert isinstance(message["seconds"], float) or isinstance( 186 message["seconds"], int 187 ) 188 await asyncio.sleep(message["seconds"]) 189 await self.bus.send( 190 Message(self.name, {"event": "finished", "id": message["id"]}) 191 ) 192 193 # Done in separate task to not block queue awaiting this callback: 194 task = asyncio.create_task(wait_coroutine()) 195 self._tasks.add(task) 196 task.add_done_callback(self._tasks.discard) 197 198 async def run(self) -> None: 199 """Run no code proactively.""" 200 pass 201 202 203class Timer(BasePlugin): 204 """Timer that can be started and cancelled. 205 206 The "seconds" configuration key gets the number of seconds to wait after 207 receiving a "start" command before sending the "finished" event. 208 The "cancel" command cancels all outstanding "finished" events and sends 209 a corresponding "cancelled" event: 210 >>> import controlpi 211 >>> asyncio.run(controlpi.test( 212 ... {"Timer": {"plugin": "Timer", "seconds": 0.01}}, 213 ... [{"target": "Timer", "command": "start"}, 214 ... {"target": "Timer", "command": "start"}, 215 ... {"target": "Timer", "command": "cancel"}, 216 ... {"target": "Timer", "command": "start"}, 217 ... {"target": "Timer", "command": "start"}], 0.015)) 218 ... # doctest: +NORMALIZE_WHITESPACE 219 test(): {'sender': '', 'event': 'registered', 220 'client': 'Timer', 'plugin': 'Timer', 221 'sends': [{'event': {'const': 'finished'}}, 222 {'event': {'const': 'cancelled'}}], 223 'receives': [{'target': {'const': 'Timer'}, 224 'command': {'const': 'start'}}, 225 {'target': {'const': 'Timer'}, 226 'command': {'const': 'cancel'}}]} 227 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 228 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 229 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'} 230 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 231 test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2} 232 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 233 test(): {'sender': 'Timer', 'event': 'finished'} 234 test(): {'sender': 'Timer', 'event': 'finished'} 235 """ 236 237 CONF_SCHEMA = { 238 "properties": {"seconds": {"type": "number"}}, 239 "required": ["seconds"], 240 } 241 """Schema for Timer plugin configuration. 242 243 Required configuration key: 244 245 - 'seconds': number of seconds to wait. 246 """ 247 248 def process_conf(self) -> None: 249 """Register plugin as bus client.""" 250 self._tasks = set() 251 self.started = 0 252 self.cancelled = 0 253 self.bus.register( 254 self.name, 255 "Timer", 256 [ 257 MessageTemplate({"event": {"const": "finished"}}), 258 MessageTemplate({"event": {"const": "cancelled"}}), 259 ], 260 [ 261 ( 262 [ 263 MessageTemplate( 264 { 265 "target": {"const": self.name}, 266 "command": {"const": "start"}, 267 } 268 ) 269 ], 270 self._start, 271 ), 272 ( 273 [ 274 MessageTemplate( 275 { 276 "target": {"const": self.name}, 277 "command": {"const": "cancel"}, 278 } 279 ) 280 ], 281 self._cancel, 282 ), 283 ], 284 ) 285 286 async def _start(self, message: Message) -> None: 287 self.started += 1 288 289 async def wait_coroutine(): 290 await asyncio.sleep(self.conf["seconds"]) 291 if self.cancelled > 0: 292 self.cancelled -= 1 293 self.started -= 1 294 elif self.started > 0: 295 self.started -= 1 296 await self.bus.send(Message(self.name, {"event": "finished"})) 297 298 # Done in separate task to not block queue awaiting this callback: 299 task = asyncio.create_task(wait_coroutine()) 300 self._tasks.add(task) 301 task.add_done_callback(self._tasks.discard) 302 303 async def _cancel(self, message: Message) -> None: 304 if self.cancelled < self.started: 305 cancel = self.started - self.cancelled 306 self.cancelled = self.started 307 await self.bus.send( 308 Message(self.name, {"event": "cancelled", "count": cancel}) 309 ) 310 311 async def run(self) -> None: 312 """Run no code proactively.""" 313 pass 314 315 316class Periodic(BasePlugin): 317 """Send message periodically. 318 319 The "seconds" configuration key is the period of the repetition: 320 receiving a "wait" command before sending the "finished" event: 321 >>> import controlpi 322 >>> asyncio.run(controlpi.test( 323 ... {"Loop": {"plugin": "Periodic", "seconds": 0.01, 324 ... "message": {"key": "value"}}}, 325 ... [], 0.025)) 326 ... # doctest: +NORMALIZE_WHITESPACE 327 test(): {'sender': '', 'event': 'registered', 328 'client': 'Loop', 'plugin': 'Periodic', 329 'sends': [{'key': {'const': 'value'}}], 'receives': []} 330 test(): {'sender': 'Loop', 'key': 'value'} 331 test(): {'sender': 'Loop', 'key': 'value'} 332 """ 333 334 CONF_SCHEMA = { 335 "properties": {"seconds": {"type": "number"}, "message": {"type": "object"}}, 336 "required": ["seconds", "message"], 337 } 338 """Schema for Wait plugin configuration. 339 340 Required configuration key: 341 342 - 'seconds': period of repetition in seconds. 343 - 'message': message to send periodically. 344 """ 345 346 def process_conf(self) -> None: 347 """Register plugin as bus client.""" 348 self.bus.register( 349 self.name, 350 "Periodic", 351 [MessageTemplate.from_message(self.conf["message"])], 352 [], 353 ) 354 355 async def run(self) -> None: 356 """Run periodic loop.""" 357 while True: 358 await asyncio.sleep(self.conf["seconds"]) 359 await self.bus.send(Message(self.name, self.conf["message"]))
42class Wait(BasePlugin): 43 """Wait for time defined in configuration. 44 45 The "seconds" configuration key gets the number of seconds to wait after 46 receiving a "wait" command before sending the "finished" event: 47 >>> import controlpi 48 >>> asyncio.run(controlpi.test( 49 ... {"Long Wait": {"plugin": "Wait", "seconds": 0.02}, 50 ... "Short Wait": {"plugin": "Wait", "seconds": 0.01}}, 51 ... [{"target": "Long Wait", "command": "wait"}, 52 ... {"target": "Short Wait", "command": "wait"}], 0.025)) 53 ... # doctest: +NORMALIZE_WHITESPACE 54 test(): {'sender': '', 'event': 'registered', 55 'client': 'Long Wait', 'plugin': 'Wait', 56 'sends': [{'event': {'const': 'finished'}}], 57 'receives': [{'target': {'const': 'Long Wait'}, 58 'command': {'const': 'wait'}}]} 59 test(): {'sender': '', 'event': 'registered', 60 'client': 'Short Wait', 'plugin': 'Wait', 61 'sends': [{'event': {'const': 'finished'}}], 62 'receives': [{'target': {'const': 'Short Wait'}, 63 'command': {'const': 'wait'}}]} 64 test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'} 65 test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'} 66 test(): {'sender': 'Short Wait', 'event': 'finished'} 67 test(): {'sender': 'Long Wait', 'event': 'finished'} 68 """ 69 70 CONF_SCHEMA = { 71 "properties": {"seconds": {"type": "number"}}, 72 "required": ["seconds"], 73 } 74 """Schema for Wait plugin configuration. 75 76 Required configuration key: 77 78 - 'seconds': number of seconds to wait. 79 """ 80 81 def process_conf(self) -> None: 82 """Register plugin as bus client.""" 83 self._tasks = set() 84 self.bus.register( 85 self.name, 86 "Wait", 87 [MessageTemplate({"event": {"const": "finished"}})], 88 [ 89 ( 90 [ 91 MessageTemplate( 92 { 93 "target": {"const": self.name}, 94 "command": {"const": "wait"}, 95 } 96 ) 97 ], 98 self._wait, 99 ) 100 ], 101 ) 102 103 async def _wait(self, message: Message) -> None: 104 async def wait_coroutine(): 105 await asyncio.sleep(self.conf["seconds"]) 106 await self.bus.send(Message(self.name, {"event": "finished"})) 107 108 # Done in separate task to not block queue awaiting this callback: 109 task = asyncio.create_task(wait_coroutine()) 110 self._tasks.add(task) 111 task.add_done_callback(self._tasks.discard) 112 113 async def run(self) -> None: 114 """Run no code proactively.""" 115 pass
Wait for time defined in configuration.
The "seconds" configuration key gets the number of seconds to wait after receiving a "wait" command before sending the "finished" event:
>>> import controlpi
>>> asyncio.run(controlpi.test(
... {"Long Wait": {"plugin": "Wait", "seconds": 0.02},
... "Short Wait": {"plugin": "Wait", "seconds": 0.01}},
... [{"target": "Long Wait", "command": "wait"},
... {"target": "Short Wait", "command": "wait"}], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Long Wait', 'plugin': 'Wait',
'sends': [{'event': {'const': 'finished'}}],
'receives': [{'target': {'const': 'Long Wait'},
'command': {'const': 'wait'}}]}
test(): {'sender': '', 'event': 'registered',
'client': 'Short Wait', 'plugin': 'Wait',
'sends': [{'event': {'const': 'finished'}}],
'receives': [{'target': {'const': 'Short Wait'},
'command': {'const': 'wait'}}]}
test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'}
test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'}
test(): {'sender': 'Short Wait', 'event': 'finished'}
test(): {'sender': 'Long Wait', 'event': 'finished'}
Schema for Wait plugin configuration.
Required configuration key:
- 'seconds': number of seconds to wait.
81 def process_conf(self) -> None: 82 """Register plugin as bus client.""" 83 self._tasks = set() 84 self.bus.register( 85 self.name, 86 "Wait", 87 [MessageTemplate({"event": {"const": "finished"}})], 88 [ 89 ( 90 [ 91 MessageTemplate( 92 { 93 "target": {"const": self.name}, 94 "command": {"const": "wait"}, 95 } 96 ) 97 ], 98 self._wait, 99 ) 100 ], 101 )
Register plugin as bus client.
Inherited Members
118class GenericWait(BasePlugin): 119 """Wait for time defined in "wait" command. 120 121 The "wait" command has message keys "seconds" defining the seconds to 122 wait and "id" defining a string to be sent back in the "finished" event 123 after the wait: 124 >>> import controlpi 125 >>> asyncio.run(controlpi.test( 126 ... {"Test GenericWait": {"plugin": "GenericWait"}}, 127 ... [{"target": "Test GenericWait", "command": "wait", 128 ... "seconds": 0.02, "id": "Long Wait"}, 129 ... {"target": "Test GenericWait", "command": "wait", 130 ... "seconds": 0.01, "id": "Short Wait"}], 0.025)) 131 ... # doctest: +NORMALIZE_WHITESPACE 132 test(): {'sender': '', 'event': 'registered', 133 'client': 'Test GenericWait', 'plugin': 'GenericWait', 134 'sends': [{'event': {'const': 'finished'}, 135 'id': {'type': 'string'}}], 136 'receives': [{'target': {'const': 'Test GenericWait'}, 137 'command': {'const': 'wait'}, 138 'seconds': {'type': 'number'}, 139 'id': {'type': 'string'}}]} 140 test(): {'sender': 'test()', 'target': 'Test GenericWait', 141 'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'} 142 test(): {'sender': 'test()', 'target': 'Test GenericWait', 143 'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'} 144 test(): {'sender': 'Test GenericWait', 'event': 'finished', 145 'id': 'Short Wait'} 146 test(): {'sender': 'Test GenericWait', 'event': 'finished', 147 'id': 'Long Wait'} 148 """ 149 150 CONF_SCHEMA = True 151 """Schema for GenericWait plugin configuration. 152 153 There are no required or optional configuration keys. 154 """ 155 156 def process_conf(self) -> None: 157 """Register plugin as bus client.""" 158 self._tasks = set() 159 self.bus.register( 160 self.name, 161 "GenericWait", 162 [ 163 MessageTemplate( 164 {"event": {"const": "finished"}, "id": {"type": "string"}} 165 ) 166 ], 167 [ 168 ( 169 [ 170 MessageTemplate( 171 { 172 "target": {"const": self.name}, 173 "command": {"const": "wait"}, 174 "seconds": {"type": "number"}, 175 "id": {"type": "string"}, 176 } 177 ) 178 ], 179 self._wait, 180 ) 181 ], 182 ) 183 184 async def _wait(self, message: Message) -> None: 185 async def wait_coroutine(): 186 assert isinstance(message["seconds"], float) or isinstance( 187 message["seconds"], int 188 ) 189 await asyncio.sleep(message["seconds"]) 190 await self.bus.send( 191 Message(self.name, {"event": "finished", "id": message["id"]}) 192 ) 193 194 # Done in separate task to not block queue awaiting this callback: 195 task = asyncio.create_task(wait_coroutine()) 196 self._tasks.add(task) 197 task.add_done_callback(self._tasks.discard) 198 199 async def run(self) -> None: 200 """Run no code proactively.""" 201 pass
Wait for time defined in "wait" command.
The "wait" command has message keys "seconds" defining the seconds to wait and "id" defining a string to be sent back in the "finished" event after the wait:
>>> import controlpi
>>> asyncio.run(controlpi.test(
... {"Test GenericWait": {"plugin": "GenericWait"}},
... [{"target": "Test GenericWait", "command": "wait",
... "seconds": 0.02, "id": "Long Wait"},
... {"target": "Test GenericWait", "command": "wait",
... "seconds": 0.01, "id": "Short Wait"}], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Test GenericWait', 'plugin': 'GenericWait',
'sends': [{'event': {'const': 'finished'},
'id': {'type': 'string'}}],
'receives': [{'target': {'const': 'Test GenericWait'},
'command': {'const': 'wait'},
'seconds': {'type': 'number'},
'id': {'type': 'string'}}]}
test(): {'sender': 'test()', 'target': 'Test GenericWait',
'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
test(): {'sender': 'test()', 'target': 'Test GenericWait',
'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'}
test(): {'sender': 'Test GenericWait', 'event': 'finished',
'id': 'Short Wait'}
test(): {'sender': 'Test GenericWait', 'event': 'finished',
'id': 'Long Wait'}
Schema for GenericWait plugin configuration.
There are no required or optional configuration keys.
156 def process_conf(self) -> None: 157 """Register plugin as bus client.""" 158 self._tasks = set() 159 self.bus.register( 160 self.name, 161 "GenericWait", 162 [ 163 MessageTemplate( 164 {"event": {"const": "finished"}, "id": {"type": "string"}} 165 ) 166 ], 167 [ 168 ( 169 [ 170 MessageTemplate( 171 { 172 "target": {"const": self.name}, 173 "command": {"const": "wait"}, 174 "seconds": {"type": "number"}, 175 "id": {"type": "string"}, 176 } 177 ) 178 ], 179 self._wait, 180 ) 181 ], 182 )
Register plugin as bus client.
Inherited Members
204class Timer(BasePlugin): 205 """Timer that can be started and cancelled. 206 207 The "seconds" configuration key gets the number of seconds to wait after 208 receiving a "start" command before sending the "finished" event. 209 The "cancel" command cancels all outstanding "finished" events and sends 210 a corresponding "cancelled" event: 211 >>> import controlpi 212 >>> asyncio.run(controlpi.test( 213 ... {"Timer": {"plugin": "Timer", "seconds": 0.01}}, 214 ... [{"target": "Timer", "command": "start"}, 215 ... {"target": "Timer", "command": "start"}, 216 ... {"target": "Timer", "command": "cancel"}, 217 ... {"target": "Timer", "command": "start"}, 218 ... {"target": "Timer", "command": "start"}], 0.015)) 219 ... # doctest: +NORMALIZE_WHITESPACE 220 test(): {'sender': '', 'event': 'registered', 221 'client': 'Timer', 'plugin': 'Timer', 222 'sends': [{'event': {'const': 'finished'}}, 223 {'event': {'const': 'cancelled'}}], 224 'receives': [{'target': {'const': 'Timer'}, 225 'command': {'const': 'start'}}, 226 {'target': {'const': 'Timer'}, 227 'command': {'const': 'cancel'}}]} 228 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 229 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 230 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'} 231 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 232 test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2} 233 test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} 234 test(): {'sender': 'Timer', 'event': 'finished'} 235 test(): {'sender': 'Timer', 'event': 'finished'} 236 """ 237 238 CONF_SCHEMA = { 239 "properties": {"seconds": {"type": "number"}}, 240 "required": ["seconds"], 241 } 242 """Schema for Timer plugin configuration. 243 244 Required configuration key: 245 246 - 'seconds': number of seconds to wait. 247 """ 248 249 def process_conf(self) -> None: 250 """Register plugin as bus client.""" 251 self._tasks = set() 252 self.started = 0 253 self.cancelled = 0 254 self.bus.register( 255 self.name, 256 "Timer", 257 [ 258 MessageTemplate({"event": {"const": "finished"}}), 259 MessageTemplate({"event": {"const": "cancelled"}}), 260 ], 261 [ 262 ( 263 [ 264 MessageTemplate( 265 { 266 "target": {"const": self.name}, 267 "command": {"const": "start"}, 268 } 269 ) 270 ], 271 self._start, 272 ), 273 ( 274 [ 275 MessageTemplate( 276 { 277 "target": {"const": self.name}, 278 "command": {"const": "cancel"}, 279 } 280 ) 281 ], 282 self._cancel, 283 ), 284 ], 285 ) 286 287 async def _start(self, message: Message) -> None: 288 self.started += 1 289 290 async def wait_coroutine(): 291 await asyncio.sleep(self.conf["seconds"]) 292 if self.cancelled > 0: 293 self.cancelled -= 1 294 self.started -= 1 295 elif self.started > 0: 296 self.started -= 1 297 await self.bus.send(Message(self.name, {"event": "finished"})) 298 299 # Done in separate task to not block queue awaiting this callback: 300 task = asyncio.create_task(wait_coroutine()) 301 self._tasks.add(task) 302 task.add_done_callback(self._tasks.discard) 303 304 async def _cancel(self, message: Message) -> None: 305 if self.cancelled < self.started: 306 cancel = self.started - self.cancelled 307 self.cancelled = self.started 308 await self.bus.send( 309 Message(self.name, {"event": "cancelled", "count": cancel}) 310 ) 311 312 async def run(self) -> None: 313 """Run no code proactively.""" 314 pass
Timer that can be started and cancelled.
The "seconds" configuration key gets the number of seconds to wait after receiving a "start" command before sending the "finished" event. The "cancel" command cancels all outstanding "finished" events and sends a corresponding "cancelled" event:
>>> import controlpi
>>> asyncio.run(controlpi.test(
... {"Timer": {"plugin": "Timer", "seconds": 0.01}},
... [{"target": "Timer", "command": "start"},
... {"target": "Timer", "command": "start"},
... {"target": "Timer", "command": "cancel"},
... {"target": "Timer", "command": "start"},
... {"target": "Timer", "command": "start"}], 0.015))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Timer', 'plugin': 'Timer',
'sends': [{'event': {'const': 'finished'}},
{'event': {'const': 'cancelled'}}],
'receives': [{'target': {'const': 'Timer'},
'command': {'const': 'start'}},
{'target': {'const': 'Timer'},
'command': {'const': 'cancel'}}]}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2}
test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
test(): {'sender': 'Timer', 'event': 'finished'}
test(): {'sender': 'Timer', 'event': 'finished'}
Schema for Timer plugin configuration.
Required configuration key:
- 'seconds': number of seconds to wait.
249 def process_conf(self) -> None: 250 """Register plugin as bus client.""" 251 self._tasks = set() 252 self.started = 0 253 self.cancelled = 0 254 self.bus.register( 255 self.name, 256 "Timer", 257 [ 258 MessageTemplate({"event": {"const": "finished"}}), 259 MessageTemplate({"event": {"const": "cancelled"}}), 260 ], 261 [ 262 ( 263 [ 264 MessageTemplate( 265 { 266 "target": {"const": self.name}, 267 "command": {"const": "start"}, 268 } 269 ) 270 ], 271 self._start, 272 ), 273 ( 274 [ 275 MessageTemplate( 276 { 277 "target": {"const": self.name}, 278 "command": {"const": "cancel"}, 279 } 280 ) 281 ], 282 self._cancel, 283 ), 284 ], 285 )
Register plugin as bus client.
Inherited Members
317class Periodic(BasePlugin): 318 """Send message periodically. 319 320 The "seconds" configuration key is the period of the repetition: 321 receiving a "wait" command before sending the "finished" event: 322 >>> import controlpi 323 >>> asyncio.run(controlpi.test( 324 ... {"Loop": {"plugin": "Periodic", "seconds": 0.01, 325 ... "message": {"key": "value"}}}, 326 ... [], 0.025)) 327 ... # doctest: +NORMALIZE_WHITESPACE 328 test(): {'sender': '', 'event': 'registered', 329 'client': 'Loop', 'plugin': 'Periodic', 330 'sends': [{'key': {'const': 'value'}}], 'receives': []} 331 test(): {'sender': 'Loop', 'key': 'value'} 332 test(): {'sender': 'Loop', 'key': 'value'} 333 """ 334 335 CONF_SCHEMA = { 336 "properties": {"seconds": {"type": "number"}, "message": {"type": "object"}}, 337 "required": ["seconds", "message"], 338 } 339 """Schema for Wait plugin configuration. 340 341 Required configuration key: 342 343 - 'seconds': period of repetition in seconds. 344 - 'message': message to send periodically. 345 """ 346 347 def process_conf(self) -> None: 348 """Register plugin as bus client.""" 349 self.bus.register( 350 self.name, 351 "Periodic", 352 [MessageTemplate.from_message(self.conf["message"])], 353 [], 354 ) 355 356 async def run(self) -> None: 357 """Run periodic loop.""" 358 while True: 359 await asyncio.sleep(self.conf["seconds"]) 360 await self.bus.send(Message(self.name, self.conf["message"]))
Send message periodically.
The "seconds" configuration key is the period of the repetition: receiving a "wait" command before sending the "finished" event:
>>> import controlpi
>>> asyncio.run(controlpi.test(
... {"Loop": {"plugin": "Periodic", "seconds": 0.01,
... "message": {"key": "value"}}},
... [], 0.025))
... # doctest: +NORMALIZE_WHITESPACE
test(): {'sender': '', 'event': 'registered',
'client': 'Loop', 'plugin': 'Periodic',
'sends': [{'key': {'const': 'value'}}], 'receives': []}
test(): {'sender': 'Loop', 'key': 'value'}
test(): {'sender': 'Loop', 'key': 'value'}
Schema for Wait plugin configuration.
Required configuration key:
- 'seconds': period of repetition in seconds.
- 'message': message to send periodically.
347 def process_conf(self) -> None: 348 """Register plugin as bus client.""" 349 self.bus.register( 350 self.name, 351 "Periodic", 352 [MessageTemplate.from_message(self.conf["message"])], 353 [], 354 )
Register plugin as bus client.
356 async def run(self) -> None: 357 """Run periodic loop.""" 358 while True: 359 await asyncio.sleep(self.conf["seconds"]) 360 await self.bus.send(Message(self.name, self.conf["message"]))
Run periodic loop.