Graph-IT

supcon.switch module

# -*- coding: utf-8 -*-

import pigpio
import supcon.util
import supcon.intf

""" the interface com.screwerk.Sensor """
SensorIntf = supcon.intf.DInterface.load({
  'name':
  'com.screwerk.Sensor',
  'events': [{
    'name': 'changed',
    'args': [
      {
        'name': 'value', 'description': 'The new sensor value'
      }
    ],
    'description': 'Fires when the sensor value changes'
  }],
  'methods': [{
    'name': 'value',
    'outArgs': [
      {
        'name': 'value', 'description': 'The sensor value'
      }
    ],
    'description': 'Returns the sensor value'
  }]
})

""" the interface com.screwerk.Switch """
SwitchIntf = supcon.intf.DInterface.load({
  'name': 'com.screwerk.Switch',
  'methods': [
    {
      'name': 'on', 'description': 'Toggles the switch on'
    }, {
      'name': 'off', 'description': 'Toggles the switch off'
    }, {
      'name': 'toggle', 'description': 'Toggles the switch'
    }
  ]
})

def returnTrue(*args): return True
def returnNone(*args): return None
def returnFirst(*args): return args[0]

class SoftSensor(object):
  def __init__(self, value=None, guard=None, effect=None, convert=None):

    self.__guard = guard or returnTrue
    self.__effect = effect or returnNone
    self.__convert = convert or returnFirst

    self.__value = self.__convert(value)

    self.__sensor = supcon.intf.Implementation(SensorIntf)
    self.__sensor.setCallCb('value', lambda: {'value': self.__value})

  @property
  def value(self):
    return self.__value

  @property
  def sensor(self):
    return self.__sensor

  def update(self, value):
    value = self.__convert(value)
    if self.__value == value:
      return False
    if not self.__guard(value, self):
      return False

    self.__value = value
    self.__effect(value, self)
    self.__sensor.fire('changed', {'value': value})
    return True

class SoftSwitch(SoftSensor):
  def __init__(self, value=False, guard=None, effect=None, convert=None):
    boolConvert = (lambda value: bool(convert(value))) if convert else bool
    SoftSensor.__init__(self, value, guard, effect, boolConvert)

    update = lambda value: {} if self.update(value) else {}
    self.__switch = supcon.intf.Implementation(SwitchIntf)
    self.__switch.setCallCb('on', lambda: update(True))
    self.__switch.setCallCb('off', lambda: update(False))
    self.__switch.setCallCb('toggle', lambda: update(not self.value))

  @property
  def switch(self):
    return self.__switch

class PiGPIOSensor(object):
  def __init__(self, reactor, gpio):
    self.__reactor = reactor
    self.__gpio = gpio

    self.__pi = pigpio.pi()
    self.__pi.set_mode(self.__gpio, pigpio.INPUT)
    self.__pi.set_glitch_filter(self.__gpio, 50000)
    self.__pi.set_pull_up_down(self.__gpio, pigpio.PUD_UP)
    self.__pi.callback(self.__gpio, pigpio.EITHER_EDGE, self.__callback)

    self.__value = bool(self.__pi.read(self.__gpio))

    self.__sensor = supcon.intf.Implementation(SensorIntf)
    self.__sensor.setCallCb('value', lambda: {'value': self.__value})

  @property
  def sensor(self):
    return self.__sensor

  def __callback(self, gpio, level, tick):
    if level >= 2:
      return

    self.__reactor.callFromThread(self.__update, bool(level))

  def __update(self, value):
    if self.__value == bool(value):
      return

    self.__value = bool(value)
    self.__sensor.fire('changed', {'value': self.__value})

class PiGPIOSwitch(SoftSwitch):
  def __init__(self, gpio, guard=None, effect=None, convert=None):
    self.__gpio = gpio

    self.__pi = pigpio.pi()
    self.__pi.set_mode(self.__gpio, pigpio.OUTPUT)

    value = self.__pi.read(self.__gpio)

    def writeEffect(value, switch):
      self.__pi.write(self.__gpio, int(value))
      if callable(effect):
        effect(value, switch)

    super().__init__(value, guard, writeEffect, convert)

class NodeSensor(supcon.util.EventEmitter):

  def __init__(self, local: supcon.intf.Node, node: str, path: str):
    supcon.util.EventEmitter.__init__(self)

    self.__local = local
    self.__node = node
    self.__path = path

    self.__value = None

    self.__local.on(self.__local.name, '/', 'supcon.Local', 'intf', self.__onIntf)
    self.__local.on(self.__local.name, '/', 'supcon.Local', 'intfLost', self.__onIntfLost)
    self.__local.on(self.node, self.path, 'com.screwerk.Sensor', 'changed', self.__onChanged)

    if self.__local.nodes().hasIntf(self.node, self.path, 'com.screwerk.com'):
      self.__fetchValue()

  def __del__(self):
    self.__local.off(self.__local.name, '/', 'supcon.Local', 'intf', self.__onIntf)
    self.__local.off(self.__local.name, '/', 'supcon.Local', 'intfLost', self.__onIntfLost)
    self.__local.off(self.node, self.path, 'com.screwerk.Sensor', 'changed', self.__onChanged)

  @property
  def node(self):
    return self.__node

  @property
  def path(self):
    return self.__path

  @property
  def value(self):
    return self.__value

  def __fetchValue(self):
    self.__local.call(self.node, self.path, 'com.screwerk.Sensor', 'value', {}).addCallbacks(
      lambda args: self.__update(args['value']),
      lambda reason: self.__update(None)
    )

  def __update(self, value):
    if self.__value == value:
      return

    self.__value = value
    self.emit('changed', self.__value)

  def __onIntf(self, args, event):
    if (args.node == self.node and args.path == self.path and args.intf.name == 'com.screwerk.Sensor'):
      self.__fetchValue()

  def __onIntfLost(self, args, event):
    if (args.node == self.node and args.path == self.path and args.intf.name == 'com.screwerk.Sensor'):
      self.__update(None)

  def __onChanged(self, args, event):
       self.__update(args['value'])

Module variables

var SensorIntf

the interface com.screwerk.Switch

var SwitchIntf

Functions

def returnFirst(

*args)

def returnFirst(*args): return args[0]

def returnNone(

*args)

def returnNone(*args): return None

def returnTrue(

*args)

def returnTrue(*args): return True

Classes

class NodeSensor

class NodeSensor(supcon.util.EventEmitter):

  def __init__(self, local: supcon.intf.Node, node: str, path: str):
    supcon.util.EventEmitter.__init__(self)

    self.__local = local
    self.__node = node
    self.__path = path

    self.__value = None

    self.__local.on(self.__local.name, '/', 'supcon.Local', 'intf', self.__onIntf)
    self.__local.on(self.__local.name, '/', 'supcon.Local', 'intfLost', self.__onIntfLost)
    self.__local.on(self.node, self.path, 'com.screwerk.Sensor', 'changed', self.__onChanged)

    if self.__local.nodes().hasIntf(self.node, self.path, 'com.screwerk.com'):
      self.__fetchValue()

  def __del__(self):
    self.__local.off(self.__local.name, '/', 'supcon.Local', 'intf', self.__onIntf)
    self.__local.off(self.__local.name, '/', 'supcon.Local', 'intfLost', self.__onIntfLost)
    self.__local.off(self.node, self.path, 'com.screwerk.Sensor', 'changed', self.__onChanged)

  @property
  def node(self):
    return self.__node

  @property
  def path(self):
    return self.__path

  @property
  def value(self):
    return self.__value

  def __fetchValue(self):
    self.__local.call(self.node, self.path, 'com.screwerk.Sensor', 'value', {}).addCallbacks(
      lambda args: self.__update(args['value']),
      lambda reason: self.__update(None)
    )

  def __update(self, value):
    if self.__value == value:
      return

    self.__value = value
    self.emit('changed', self.__value)

  def __onIntf(self, args, event):
    if (args.node == self.node and args.path == self.path and args.intf.name == 'com.screwerk.Sensor'):
      self.__fetchValue()

  def __onIntfLost(self, args, event):
    if (args.node == self.node and args.path == self.path and args.intf.name == 'com.screwerk.Sensor'):
      self.__update(None)

  def __onChanged(self, args, event):
       self.__update(args['value'])

Ancestors (in MRO)

  • NodeSensor
  • supcon.util.EventEmitter
  • builtins.object

Instance variables

var node

var path

var value

Methods

def __init__(

self, local, node, path)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, local: supcon.intf.Node, node: str, path: str):
  supcon.util.EventEmitter.__init__(self)
  self.__local = local
  self.__node = node
  self.__path = path
  self.__value = None
  self.__local.on(self.__local.name, '/', 'supcon.Local', 'intf', self.__onIntf)
  self.__local.on(self.__local.name, '/', 'supcon.Local', 'intfLost', self.__onIntfLost)
  self.__local.on(self.node, self.path, 'com.screwerk.Sensor', 'changed', self.__onChanged)
  if self.__local.nodes().hasIntf(self.node, self.path, 'com.screwerk.com'):
    self.__fetchValue()

def emit(

self, event, *args)

Calls all callbacks for the given event with the given arguments

def emit(self, event, *args):
  """
  Calls all callbacks for the given event with the given arguments
  """
  if event not in self.__callbacks:
    return
  for callback in self.__callbacks[event].values():
    schedule(callback(*args))

def off(

self, event, callback)

Unregisters the given callback for the given event

def off(self, event, callback):
  """
  Unregisters the given callback for the given event
  """
  if event not in self.__callbacks:
    return
  if id(callback) not in self.__callbacks[event]:
    return
  del self.__callbacks[event][id(callback)]

def on(

self, event, callback)

Registers the given callback for the given event

def on(self, event, callback):
  """
  Registers the given callback for the given event
  """
  if type not in self.__callbacks:
    self.__callbacks[event] = {}
  self.__callbacks[event][id(callback)] = callback
  return lambda: self.off(event, callback)

class PiGPIOSensor

class PiGPIOSensor(object):
  def __init__(self, reactor, gpio):
    self.__reactor = reactor
    self.__gpio = gpio

    self.__pi = pigpio.pi()
    self.__pi.set_mode(self.__gpio, pigpio.INPUT)
    self.__pi.set_glitch_filter(self.__gpio, 50000)
    self.__pi.set_pull_up_down(self.__gpio, pigpio.PUD_UP)
    self.__pi.callback(self.__gpio, pigpio.EITHER_EDGE, self.__callback)

    self.__value = bool(self.__pi.read(self.__gpio))

    self.__sensor = supcon.intf.Implementation(SensorIntf)
    self.__sensor.setCallCb('value', lambda: {'value': self.__value})

  @property
  def sensor(self):
    return self.__sensor

  def __callback(self, gpio, level, tick):
    if level >= 2:
      return

    self.__reactor.callFromThread(self.__update, bool(level))

  def __update(self, value):
    if self.__value == bool(value):
      return

    self.__value = bool(value)
    self.__sensor.fire('changed', {'value': self.__value})

Ancestors (in MRO)

Instance variables

var sensor

Methods

def __init__(

self, reactor, gpio)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, reactor, gpio):
  self.__reactor = reactor
  self.__gpio = gpio
  self.__pi = pigpio.pi()
  self.__pi.set_mode(self.__gpio, pigpio.INPUT)
  self.__pi.set_glitch_filter(self.__gpio, 50000)
  self.__pi.set_pull_up_down(self.__gpio, pigpio.PUD_UP)
  self.__pi.callback(self.__gpio, pigpio.EITHER_EDGE, self.__callback)
  self.__value = bool(self.__pi.read(self.__gpio))
  self.__sensor = supcon.intf.Implementation(SensorIntf)
  self.__sensor.setCallCb('value', lambda: {'value': self.__value})

class PiGPIOSwitch

class PiGPIOSwitch(SoftSwitch):
  def __init__(self, gpio, guard=None, effect=None, convert=None):
    self.__gpio = gpio

    self.__pi = pigpio.pi()
    self.__pi.set_mode(self.__gpio, pigpio.OUTPUT)

    value = self.__pi.read(self.__gpio)

    def writeEffect(value, switch):
      self.__pi.write(self.__gpio, int(value))
      if callable(effect):
        effect(value, switch)

    super().__init__(value, guard, writeEffect, convert)

Ancestors (in MRO)

Instance variables

var sensor

Inheritance: SoftSwitch.sensor

var switch

var value

Methods

def __init__(

self, gpio, guard=None, effect=None, convert=None)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, gpio, guard=None, effect=None, convert=None):
  self.__gpio = gpio
  self.__pi = pigpio.pi()
  self.__pi.set_mode(self.__gpio, pigpio.OUTPUT)
  value = self.__pi.read(self.__gpio)
  def writeEffect(value, switch):
    self.__pi.write(self.__gpio, int(value))
    if callable(effect):
      effect(value, switch)
  super().__init__(value, guard, writeEffect, convert)

def update(

self, value)

def update(self, value):
  value = self.__convert(value)
  if self.__value == value:
    return False
  if not self.__guard(value, self):
    return False
  self.__value = value
  self.__effect(value, self)
  self.__sensor.fire('changed', {'value': value})
  return True

class SoftSensor

class SoftSensor(object):
  def __init__(self, value=None, guard=None, effect=None, convert=None):

    self.__guard = guard or returnTrue
    self.__effect = effect or returnNone
    self.__convert = convert or returnFirst

    self.__value = self.__convert(value)

    self.__sensor = supcon.intf.Implementation(SensorIntf)
    self.__sensor.setCallCb('value', lambda: {'value': self.__value})

  @property
  def value(self):
    return self.__value

  @property
  def sensor(self):
    return self.__sensor

  def update(self, value):
    value = self.__convert(value)
    if self.__value == value:
      return False
    if not self.__guard(value, self):
      return False

    self.__value = value
    self.__effect(value, self)
    self.__sensor.fire('changed', {'value': value})
    return True

Ancestors (in MRO)

Instance variables

var sensor

var value

Methods

def __init__(

self, value=None, guard=None, effect=None, convert=None)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, value=None, guard=None, effect=None, convert=None):
  self.__guard = guard or returnTrue
  self.__effect = effect or returnNone
  self.__convert = convert or returnFirst
  self.__value = self.__convert(value)
  self.__sensor = supcon.intf.Implementation(SensorIntf)
  self.__sensor.setCallCb('value', lambda: {'value': self.__value})

def update(

self, value)

def update(self, value):
  value = self.__convert(value)
  if self.__value == value:
    return False
  if not self.__guard(value, self):
    return False
  self.__value = value
  self.__effect(value, self)
  self.__sensor.fire('changed', {'value': value})
  return True

class SoftSwitch

class SoftSwitch(SoftSensor):
  def __init__(self, value=False, guard=None, effect=None, convert=None):
    boolConvert = (lambda value: bool(convert(value))) if convert else bool
    SoftSensor.__init__(self, value, guard, effect, boolConvert)

    update = lambda value: {} if self.update(value) else {}
    self.__switch = supcon.intf.Implementation(SwitchIntf)
    self.__switch.setCallCb('on', lambda: update(True))
    self.__switch.setCallCb('off', lambda: update(False))
    self.__switch.setCallCb('toggle', lambda: update(not self.value))

  @property
  def switch(self):
    return self.__switch

Ancestors (in MRO)

Instance variables

var sensor

Inheritance: SoftSensor.sensor

var switch

var value

Methods

def __init__(

self, value=False, guard=None, effect=None, convert=None)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, value=False, guard=None, effect=None, convert=None):
  boolConvert = (lambda value: bool(convert(value))) if convert else bool
  SoftSensor.__init__(self, value, guard, effect, boolConvert)
  update = lambda value: {} if self.update(value) else {}
  self.__switch = supcon.intf.Implementation(SwitchIntf)
  self.__switch.setCallCb('on', lambda: update(True))
  self.__switch.setCallCb('off', lambda: update(False))
  self.__switch.setCallCb('toggle', lambda: update(not self.value))

def update(

self, value)

def update(self, value):
  value = self.__convert(value)
  if self.__value == value:
    return False
  if not self.__guard(value, self):
    return False
  self.__value = value
  self.__effect(value, self)
  self.__sensor.fire('changed', {'value': value})
  return True