Graph-IT

supcon.core module

This module provides an implementation of the abstract class supcon.intf.Node.

# -*- coding: utf-8 -*-
"""
This module provides the implementation of abstract class `supcon.intf.Node`.
"""

import sys
import traceback

import uuid
import struct
import msgpack

import twisted.internet.defer as defer
import twisted.internet.protocol
import twisted.internet.endpoints

import twisted.application.service
import twisted.application.internet

import supcon.intf

def _printArgs(*args):
  """Prints all given arguments on a single line, separated by ', '."""
  print(', '.join(map('{}'.format, args)))

# Description of the interface 'supcon.Local'. Its events announce changes of
# the known nodes, paths and interfaces; its method 'nodes' returns all of them.
LocalIntf = supcon.intf.DInterface.load({
  'name': 'supcon.Local',
  'events': [
    # a node appeared / disappeared
    {'name': 'node', 'args': [
      {'name': 'node', 'description': 'The node'},
    ]},
    {'name': 'nodeLost', 'args': [
      {'name': 'node', 'description': 'The lost node'},
    ]},
    # a path on a node appeared / disappeared
    {'name': 'path', 'args': [
      {'name': 'node', 'description': 'The node'},
      {'name': 'path', 'description': 'The path'},
    ]},
    {'name': 'pathLost', 'args': [
      {'name': 'node', 'description': 'The node'},
      {'name': 'path', 'description': 'The path'},
    ]},
    # an interface at a path on a node appeared / disappeared
    {'name': 'intf', 'args': [
      {'name': 'node', 'description': 'The node'},
      {'name': 'path', 'description': 'The path'},
      {'name': 'intf', 'description': 'The interface'},
    ]},
    {'name': 'intfLost', 'args': [
      {'name': 'node', 'description': 'The node'},
      {'name': 'path', 'description': 'The path'},
      {'name': 'intf', 'description': 'The interface'},
    ]},
  ],
  'methods': [
    {'name': 'nodes', 'outArgs': [
      {'name': 'nodes', 'description': 'A datastructure about all known nodes, paths and interfaces'},
    ]},
  ],
  'description': 'All methods and events are only locally available'
})

# Description of the interface 'supcon.Remote'. Nodes use it among each other
# to exchange their interfaces and to subscribe to / unsubscribe from events.
RemoteIntf = supcon.intf.DInterface.load({
  'name': 'supcon.Remote',
  'events': [
    # an interface at a path of the sending node appeared / disappeared
    {'name': 'intf', 'args': [
      {'name': 'path', 'description': 'The path'},
      {'name': 'intf', 'description': 'The interface'},
    ]},
    {'name': 'intfLost', 'args': [
      {'name': 'path', 'description': 'The path'},
      {'name': 'intf', 'description': 'The interface'},
    ]},
  ],
  'methods': [
    {'name': 'node', 'outArgs': [
      {'name': 'node', 'description': 'A datastructure about all paths and interfaces of the node'},
    ]},
    # subscribe to an event
    {'name': 'on', 'inArgs': [
      {'name': 'path', 'description': 'The path'},
      {'name': 'intf', 'description': 'The interface'},
      {'name': 'event', 'description': 'The event'},
    ]},
    # unsubscribe from an event
    {'name': 'off', 'inArgs': [
      {'name': 'path', 'description': 'The path'},
      {'name': 'intf', 'description': 'The interface'},
      {'name': 'event', 'description': 'The event'},
    ]},
  ],
  'description': 'All methods and events are only used internally'
})

class _Protocol(twisted.internet.protocol.Protocol):
  """This class handles handshake and message encoding between nodes on the bus.

  Every message on the wire is a msgpack encoded value that is prefixed with its
  size in bytes as a little endian unsigned 32 bit integer. The first message in
  each direction is a handshake carrying the name of the sending node. All
  following messages are handed to the factory together with that name.

  This class is just an implementation detail of the class Node.
  """
  def __init__(self):
    super().__init__()
    self.factory = None

    # bytes received so far that do not yet form a complete message
    self.__data = bytearray()
    # name of the remote node, None until its handshake was received
    self.__name = None

  def connectionMade(self):
    """Sends the handshake with the name of the local node."""
    self.sendMesg({
      'type': 'handshake',
      'name': self.factory.name
    })

  def connectionLost(self, reason):
    """Unregisters the remote node if it completed the handshake before."""
    # NOTE(review): the name is kept even when factory.addNode() rejected this
    # connection (Node returns False for an already known name), so losing such
    # a connection also unregisters the other connection of that name - confirm
    # whether that is intended.
    if not self.__name:
      return
    self.factory.delNode(self.__name)

  def dataReceived(self, data):
    """Buffers the received bytes and dispatches every complete message.

    Args:
      data (bytes): The bytes received from the transport
    """
    self.__data.extend(data)

    while len(self.__data) > 4:

      size = struct.unpack_from('<L', self.__data)[0]
      if len(self.__data) < size + 4:
        # the message is not yet complete, wait for more data
        break

      mesg = bytes(self.__data[4:size + 4])
      del self.__data[:size + 4]

      try:
        # raw=False decodes msgpack strings as utf-8 str; the former keyword
        # argument encoding='utf-8' is not accepted by msgpack >= 1.0 anymore
        mesg = msgpack.unpackb(mesg, raw=False)
      except Exception:
        # the peer violates the protocol: close the connection and stop
        # processing whatever else it has sent
        self.transport.loseConnection()
        return

      self.recvMesg(mesg)

  def recvMesg(self, mesg):
    """Handles a decoded message.

    Before the handshake is done only a handshake message with a non empty name
    is accepted; afterwards every message is forwarded to the factory.

    Args:
      mesg (dict): The decoded message
    """
    if not isinstance(mesg, dict) or 'type' not in mesg:
      self.transport.loseConnection()
      return

    if not self.__name:
      # an empty name would leave this protocol in the handshake state forever
      if mesg['type'] != 'handshake' or not mesg.get('name'):
        self.transport.loseConnection()
        return

      self.__name = mesg['name']
      self.factory.addNode(self.__name, self)
    else:
      self.factory.recvMesg(self.__name, mesg)

  def sendMesg(self, mesg):
    """Encodes the message and writes it size prefixed to the transport.

    Args:
      mesg (dict): The message to send
    """
    data = msgpack.packb(mesg, use_bin_type=True)
    size = struct.pack('<L', len(data))

    self.transport.write(size + data)

class _Factory(twisted.internet.protocol.Factory):
  """Factory for the _Protocol instances of connections to remote nodes.

  This class is just an implementation detail of the class Node. It carries the
  name of the local node and the three callbacks the _Protocol instances
  resulting from calls to Node.connect() and Node.listen() report to.
  """
  protocol = _Protocol

  def __init__(self, name, addNode, delNode, recvMesg):
    # name of the local node, sent by every protocol in its handshake
    self.name = name
    # called with (node, mesg) for every message after the handshake
    self.recvMesg = recvMesg
    # called with (node, protocol) after a handshake and with (node) when the
    # connection of a node that did the handshake is lost
    self.addNode, self.delNode = addNode, delNode

class _KnownMgr(object):
  """Keeps track of all known nodes and their paths and interfaces.

  Every change is reported by calling the callback given to the constructor with
  the name of an event ('node', 'nodeLost', 'path', 'pathLost', 'intf' or
  'intfLost') and a dict with the arguments of that event.
  """

  def __init__(self, fire):
    """
    Args:
      fire (callable): Gets called as fire(event, args) for every change
    """
    self.__fire = fire

    self.__nodes = supcon.intf.DNodes()

  def addNode(self, node: str) -> bool:
    """Adds the node if it is not yet known and fires 'node'.

    Args:
      node (str): The name of the node

    Returns:
      bool: Always True
    """
    if node in self.__nodes:
      return True

    self.__nodes = self.__nodes.addNode(node)

    self.__fire('node', {'node': node})
    return True

  def delNode(self, node: str) -> bool:
    """Removes all interfaces and paths of the node, then the node itself.

    Fires 'intfLost' and 'pathLost' for everything that gets removed and finally
    'nodeLost'.

    Args:
      node (str): The name of the node

    Returns:
      bool: Always True
    """
    if node not in self.__nodes:
      return True

    # NOTE(review): delIntf() rebinds self.__nodes while these loops are
    # running; this presumably relies on the supcon.intf containers returning
    # new objects instead of mutating themselves - confirm.
    for path in self.__nodes[node]:
      for intf in self.__nodes[node][path]:
        self.delIntf(node, path, self.__nodes[node][path][intf])

    self.__nodes = self.__nodes.delNode(node)
    self.__fire('nodeLost', {'node': node})
    return True

  def addIntf(self, node: str, path: str, interface: supcon.intf.DInterface) -> bool:
    """Adds the interface at the path of the node.

    Fires 'path' if the path was not yet known and 'intf' if the interface was
    not yet known at that path.

    Args:
      node (str): The name of the node
      path (str): The path
      interface (supcon.intf.DInterface): The interface

    Returns:
      bool: False if the node is unknown, True otherwise
    """
    #_printArgs('_KnownMgr.addIntf', node, path, interface)
    if node not in self.__nodes:
      return False

    if path not in self.__nodes[node]:
      self.__nodes = self.__nodes.addPath(node, path)
      self.__fire('path', {'node': node, 'path': path})

    if interface.name in self.__nodes[node][path]:
      return True

    self.__nodes = self.__nodes.addInterface(node, path, interface)
    self.__fire('intf', {'node': node, 'path': path, 'intf': interface.dump()})
    return True

  def delIntf(self, node: str, path: str, interface: supcon.intf.DInterface) -> bool:
    """Removes the interface from the path of the node.

    Fires 'intfLost' and, if the path has no interfaces left and gets removed
    too, 'pathLost'.

    Args:
      node (str): The name of the node
      path (str): The path
      interface (supcon.intf.DInterface): The interface

    Returns:
      bool: Always True
    """
    #_printArgs('_KnownMgr.delIntf', node, path, interface)
    if not self.__nodes.hasIntf(node, path, interface.name):
      return True

    self.__nodes = self.__nodes.delInterface(node, path, interface)
    self.__fire('intfLost', {'node': node, 'path': path, 'intf': interface.dump()})

    if not self.__nodes[node][path]:
      self.__nodes = self.__nodes.delPath(node, path)
      self.__fire('pathLost', {'node': node, 'path': path})

    return True

  def getNode(self, node: str) -> supcon.intf.DNode:
    """Returns the known paths and interfaces of the node.

    Args:
      node (str): The name of the node

    Returns:
      supcon.intf.DNode: The node or None if the node is unknown
    """
    if node in self.__nodes:
      return self.__nodes[node]
    return None

  def setNode(self, newNode: supcon.intf.DNode):
    """Replaces the known state of a node by the given one.

    Interfaces that are known but missing in newNode get removed, interfaces
    of newNode that are not yet known get added; both is done by delIntf() and
    addIntf() and so fires the matching events.

    Args:
      newNode (supcon.intf.DNode): The new state of an already known node

    Raises:
      ValueError: If no node with the name of newNode is known
    """
    #_printArgs('_KnownMgr.setNode', newNode.dump())
    oldNode = self.getNode(newNode.name)
    if oldNode is None:
      # getNode() returns None for unknown nodes; fail with a clear message
      # instead of an AttributeError on None
      raise ValueError('unknown node {}'.format(newNode.name))

    for oldPath in oldNode.values():
      for oldIntf in oldPath.values():
        if not newNode.hasIntf(oldPath.name, oldIntf.name):
          self.delIntf(oldNode.name, oldPath.name, oldIntf)

    for newPath in newNode.values():
      for newIntf in newPath.values():
        if not oldNode.hasIntf(newPath.name, newIntf.name):
          self.addIntf(newNode.name, newPath.name, newIntf)

  def getNodes(self) -> supcon.intf.DNodes:
    """Returns everything that is known about all nodes."""
    return self.__nodes

class _InformMgr(object):
  """Remembers which nodes want to be informed about which events.

  An event is identified by the key (path, intf, event). The registrations are
  indexed in both directions so that the nodes of an event as well as all
  registrations of a node can be found directly.
  """

  def __init__(self):
    # node -> {key: True}
    self.__keys = {}
    # key -> {node: True}
    self.__nodes = {}

  def on(self, path, intf, event, node):
    """Registers the node for the event. Registering twice has no effect.

    Args:
      path: The path of the event
      intf: The interface of the event
      event: The name of the event
      node: The node to inform
    """
    key = (path, intf, event)

    self.__nodes.setdefault(key, {})[node] = True
    self.__keys.setdefault(node, {})[key] = True

  def off(self, path, intf, event, node):
    """Removes the registration of the node for the event, if there is one.

    Args:
      path: The path of the event
      intf: The interface of the event
      event: The name of the event
      node: The node that should not be informed anymore
    """
    key = (path, intf, event)
    if key not in self.__nodes or node not in self.__nodes[key]:
      return

    del self.__nodes[key][node]
    if not self.__nodes[key]:
      del self.__nodes[key]

    del self.__keys[node][key]
    if not self.__keys[node]:
      del self.__keys[node]

  def nodes(self, path, intf, event):
    """Returns the nodes that are registered for the event.

    Args:
      path: The path of the event
      intf: The interface of the event
      event: The name of the event

    Returns:
      list: A new list of the nodes in the order of their registration. It is a
        snapshot, so registrations may change while the caller iterates it.
    """
    return list(self.__nodes.get((path, intf, event), ()))

  def delNode(self, node):
    """Removes all registrations of the node.

    Args:
      node: The node
    """
    if node not in self.__keys:
      return
    for key in self.__keys[node]:
      del self.__nodes[key][node]
      if not self.__nodes[key]:
        del self.__nodes[key]
    del self.__keys[node]

class _DeferredMgr(object):
  """This class manages pending Deferreds.

  Every Deferred is stored under a unique id together with some additional
  data until it is succeeded, failed or cancelled.
  """

  def __init__(self):
    # unique id -> (Deferred, data)
    self.__infos = {}

  # The return annotation is quoted: it documents the shape of the result
  # without being evaluated when the method is defined.
  def create(self, data=None) -> 'typing.Tuple[str, defer.Deferred]':
    """Creates a Deferred with an info.

    Args:
      data: Some additional data

    Returns:
      (str, defer.Deferred): A Tuple of an unique id and a Deferred
    """
    pid = str(uuid.uuid4())

    def canceller(_d):
      # forget a cancelled Deferred; it may already be gone
      self.__infos.pop(pid, None)

    d = defer.Deferred(canceller)
    self.__infos[pid] = (d, data)

    return (pid, d)

  def succeed(self, value, pid):
    """Succeeds the Deferred identified by the given unique id with the given
    response. Unknown ids are ignored.

    Args:
      value (mixed): The value to succeed the Deferred with
      pid (str): A unique id of a Deferred created with _DeferredMgr.create()
    """
    info = self.__infos.pop(pid, None)
    if info is None:
      return
    info[0].callback(value)

  def fail(self, reason, pid):
    """Fail the Deferred identified by the given unique id with the given
    reason. Unknown ids are ignored.

    Args:
      reason (Exception): The reason to fail the Deferred with
      pid (str): A unique id of a Deferred created with _DeferredMgr.create()
    """
    info = self.__infos.pop(pid, None)
    if info is None:
      return
    # info is (Deferred, data): the Deferred is the first element
    info[0].errback(reason)

  def failAll(self, reason, predicate):
    """Fail all Deferred for which predicate(pid, data) returns true.

    Args:
      reason (Exception): The reason to fail the Deferred with
      predicate (function): Gets called with the unique id and the additional
        data of each pending Deferred
    """
    for pid, info in list(self.__infos.items()):
      # an errback of an earlier Deferred may already have removed this one
      if pid not in self.__infos:
        continue
      if predicate(pid, info[1]):
        del self.__infos[pid]
        info[0].errback(reason)

class _CallbackMgr(object):
  """Manages callbacks that are registered for keys.

  Callbacks are compared by equality, so a bound method that was registered
  with on() can be removed with off() although each access to the method
  creates a new bound method object.
  """

  def __init__(self):
    # key -> list of callbacks in the order of their registration
    self.__cbs = {}

  def on(self, key, cb):
    """Registers the callback for the key. Registering twice has no effect.

    Args:
      key: The key, must be hashable
      cb (callable): Gets called as cb(args, key) by fire()

    Returns:
      bool: True if there was no callback registered for the key before
    """
    cbs = self.__cbs.get(key)
    if cbs is None:
      self.__cbs[key] = [cb]
      return True
    if cb not in cbs:
      cbs.append(cb)
    return False

  def off(self, key, cb):
    """Removes the callback from the key.

    Args:
      key: The key
      cb (callable): A callback registered with on()

    Returns:
      bool: True if the last callback of the key was removed
    """
    cbs = self.__cbs.get(key)
    if cbs is None or cb not in cbs:
      return False
    cbs.remove(cb)
    if not cbs:
      del self.__cbs[key]
      return True
    return False

  def fire(self, key, args):
    """Calls all callbacks registered for the key with (args, key).

    Exceptions raised by a callback are printed and do not prevent the
    remaining callbacks from being called.

    Args:
      key: The key
      args: The first argument for the callbacks
    """
    #_printArgs('_CallbackMgr.fire', key, args)
    # iterate a snapshot: callbacks may call on() and off() while being fired
    for cb in list(self.__cbs.get(key, ())):
      try:
        cb(args, key)
      except Exception:
        traceback.print_exc()

  def keys(self, predicate):
    """Returns a list of all keys with callbacks for which predicate(key) is
    true."""
    return [key for key in self.__cbs if predicate(key)]

class Node(supcon.intf.Node):
  """Implementation of the abstract class `supcon.intf.Node` based on twisted.

  The node keeps track of the nodes, paths and interfaces it knows about and
  routes calls and events between local callers, the implementations registered
  at this node and the remote nodes it is connected to.
  """

  def __init__(self, name):
    """
    Args:
      name (str): The name of the node
    """
    super().__init__(name)

    # path -> interface -> {'impl': the implementation, 'fire': its callback}
    self.__impls = {}
    # name of a connected node -> _Protocol of the connection to that node
    self.__protocols = {}

    self.__factory = _Factory(self.name, self.__addNode, self.__delNode, self.__recvMesg)

    self.__service = twisted.application.service.MultiService()
    self.__service.startService()

    # events of the known manager are events of supcon.Local at path '/'
    fire = lambda event, args: \
      self.__fireEventLocal(self.name, '/', LocalIntf.name, event, args)

    self.__knownMgr = _KnownMgr(fire)
    self.__informMgr = _InformMgr()
    self.__callbackMgr = _CallbackMgr()
    self.__deferredMgr = _DeferredMgr()

    self.__knownMgr.addNode(self.name)

  def connect(self, endpoint):
    """Connects to a remote node by a twisted ClientService for the endpoint.

    Args:
      endpoint: A twisted client endpoint
    """
    service = twisted.application.internet.ClientService(endpoint, self.__factory)
    self.__service.addService(service)

  def listen(self, endpoint):
    """Accepts connections of remote nodes at the endpoint.

    Args:
      endpoint: A twisted server endpoint
    """
    endpoint.listen(self.__factory)

  def __addNode(self, node: str, protocol: _Protocol):
    """Gets called by _Protocol when a remote node finished its handshake.

    Args:
      node (str): The name of the remote node
      protocol (_Protocol): The protocol of the connection to the node

    Returns:
      bool: False if a node of that name is already connected, True otherwise
    """
    if node in self.__protocols:
      return False
    self.__protocols[node] = protocol

    self.__knownMgr.addNode(node)

    # reestablish existing callbacks by calls to supcon.Remote.on
    for key in self.__callbackMgr.keys(lambda key: key[0] == node):
      self.__callRemote(node, '/', RemoteIntf.name, 'on', {
        'path': key[1],
        'intf': key[2],
        'event': key[3]
      })

    # ask the node for its paths and interfaces
    self.__callRemote(node, '/', RemoteIntf.name, 'node', {}).addCallback(
      lambda args: self.__knownMgr.setNode(supcon.intf.DNode.load(args['node']))
    ).addErrback(_printArgs)

    return True

  def __delNode(self, node):
    """Gets called by _Protocol when the connection to a remote node is lost.

    Forgets the node and its event registrations and fails all pending calls
    to it.

    Args:
      node (str): The name of the remote node

    Returns:
      bool: False if the node is not connected, True otherwise
    """
    if node not in self.__protocols:
      return False
    del self.__protocols[node]

    self.__knownMgr.delNode(node)
    self.__informMgr.delNode(node)

    reason = RuntimeError('node {} lost'.format(node))
    predicate = lambda pid, data: data[0] == node
    self.__deferredMgr.failAll(reason, predicate)

    return True

  def nodes(self):
    """Returns everything that is known about all nodes."""
    return self.__knownMgr.getNodes()

  def register(self, path: str, impl: supcon.intf.Implementation):
    """Registers an implementation at the path and announces its interface.

    Args:
      path (str): The path
      impl (supcon.intf.Implementation): The implementation

    Raises:
      ValueError: If the interface is already registered at the path
    """
    path = supcon.intf.DPath.toName(path)

    if path not in self.__impls:
      self.__impls[path] = {}

    if impl.intf in self.__impls[path]:
      raise ValueError('interface {} already registered at path {}'.format(impl.intf, path))

    # the callback is stored to be able to remove it again in unregister()
    self.__impls[path][impl.intf] = {
      'impl': impl,
      'fire': lambda event, args: self.__fireImplEvent(path, impl.intf, event, args)
    }
    impl.addFireCb(self.__impls[path][impl.intf]['fire'])

    self.__knownMgr.addIntf(self.name, path, impl.interface)
    self.__broadcastEvent('intf', {'path': path, 'intf': impl.interface.dump()})

  def unregister(self, path: str, impl: supcon.intf.Implementation):
    """Removes an implementation from the path and announces the lost
    interface.

    Args:
      path (str): The path
      impl (supcon.intf.Implementation): The implementation

    Raises:
      ValueError: If the path, the interface at the path or the implementation
        of the interface at the path is unknown
    """
    path = supcon.intf.DPath.toName(path)

    if path not in self.__impls:
      raise ValueError('unknown path {}'.format(path))

    if impl.intf not in self.__impls[path]:
      raise ValueError('unknown interface {} at path {}'.format(impl.intf, path))

    if impl != self.__impls[path][impl.intf]['impl']:
      raise ValueError('unknown implementation for interface {} at path {}'.format(impl.intf, path))

    impl.delFireCb(self.__impls[path][impl.intf]['fire'])
    del self.__impls[path][impl.intf]

    self.__knownMgr.delIntf(self.name, path, impl.interface)
    self.__broadcastEvent('intfLost', {'path': path, 'intf': impl.interface.dump()})

  def on(self, node, path, intf, event, cb):
    """Registers a callback for an event.

    The first callback for an event of a connected remote node subscribes to
    the event at that node.

    Args:
      node (str): The node that fires the event
      path (str): The path of the event
      intf (str): The interface of the event
      event (str): The name of the event
      cb (callable): Gets called as cb(args, (node, path, intf, event))
    """
    if self.__callbackMgr.on((node, path, intf, event), cb):
      if node != self.name and node in self.__protocols:
        self.__callRemote(node, '/', RemoteIntf.name, 'on', {
          'path': path, 'intf': intf, 'event': event
        })

  def off(self, node, path, intf, event, cb):
    """Removes a callback from an event.

    Removing the last callback of an event of a connected remote node
    unsubscribes from the event at that node.

    Args:
      node (str): The node that fires the event
      path (str): The path of the event
      intf (str): The interface of the event
      event (str): The name of the event
      cb (callable): A callback registered with on()
    """
    if self.__callbackMgr.off((node, path, intf, event), cb):
      if node != self.name and node in self.__protocols:
        self.__callRemote(node, '/', RemoteIntf.name, 'off', {
          'path': path, 'intf': intf, 'event': event
        })

  def call(self, node, path, intf, method, args) -> defer.Deferred:
    """Calls a method of an interface at a path of a node.

    Args:
      node (str): The node, may be this node
      path (str): The path
      intf (str): The interface
      method (str): The name of the method
      args (dict): The arguments of the call

    Returns:
      defer.Deferred: Fires with the result of the call
    """
    # TODO: don't raise Exceptions
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to call: method {} of interface {} at path /'.format(method, RemoteIntf.name)

    if node == self.name:
      if path == '/' and intf == LocalIntf.name:
        return self.__callLocal(method, args)
      return self.__callImpl(path, intf, method, args)
    return self.__callRemote(node, path, intf, method, args)

  def __callImpl(self, path, intf, method, args) -> defer.Deferred:
    """Calls a method of an implementation registered at this node.

    Unknown paths, interfaces and methods as well as exceptions raised by the
    implementation result in a failed Deferred.
    """
    # TODO: don't raise Exception
    assert path != '/' or intf != LocalIntf.name, \
      'unable to call impl: method {} of interface {} at path /'.format(method, LocalIntf.name)
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to call impl: method {} of interface {} at path /'.format(method, RemoteIntf.name)

    try:
      if path not in self.__impls:
        raise ValueError('unknown path') # TODO: nicer error message
      if intf not in self.__impls[path]:
        raise ValueError('unknown intf') # TODO: nicer error message
      if method not in self.__impls[path][intf]['impl'].interface.methods:
        raise ValueError('unknown method') # TODO: nicer error message

      d = self.__impls[path][intf]['impl'].call(method, args)
      # implementations may return plain values
      if not isinstance(d, defer.Deferred):
        d = defer.succeed(d)
    except BaseException as e:
      d = defer.fail(e)

    return d

  def __callLocal(self, method, args) -> defer.Deferred:
    """Calls a method of the interface supcon.Local of this node."""
    # TODO: don't raise Exception
    assert method in LocalIntf.methods, \
      '{} is not an method of interface {} at path /'.format(method, LocalIntf.name)
    LocalIntf.methods[method].validateInArgs(args)

    if method == 'nodes':
      return defer.succeed({'nodes': self.__knownMgr.getNodes().dump()})

    return defer.fail(RuntimeError('method {} not yet implemented'.format(method)))

  def __callRemote(self, node, path, intf, method, args) -> defer.Deferred:
    """Sends a call to a remote node.

    Returns:
      defer.Deferred: Fires with the response of the node. It fails if the call
        could not be sent, the node answers with an error or the node gets lost.
    """
    # TODO: don't raise Exception
    assert path != '/' or intf != LocalIntf.name, \
      'unable to call remote: method {} of interface {} at path /'.format(method, LocalIntf.name)

    # TODO: validate args

    (cid, p) = self.__deferredMgr.create((node, path, intf, method))
    if not self.__sendCall(node, path, intf, method, args, cid):
      # nobody will ever answer a call that was not sent, so fail exactly this
      # Deferred instead of leaving it pending forever
      reason = RuntimeError('unable to send call to node {}'.format(node))
      self.__deferredMgr.failAll(reason, lambda pid, _data: pid == cid)
    return p

  def __fireImplEvent(self, path, intf, event, args):
    """Fires an event of a registered implementation locally and to all remote
    nodes that subscribed to it."""
    assert path != '/' or intf != LocalIntf.name, \
      'unable to fire impl: event {} of interface {} at path /'.format(event, LocalIntf.name)
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to fire impl: event {} of interface {} at path /'.format(event, RemoteIntf.name)

    self.__fireEventLocal(self.name, path, intf, event, args)
    self.__fireEventRemote(path, intf, event, args)

  def __fireEventLocal(self, node: str, path: str, intf: str, event: str, args: dict):
    """Calls the callbacks registered with on() for the event."""
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to fire local: event {} of interface {} at path /'.format(event, RemoteIntf.name)
    #_printArgs('Node.__fireEventLocal', node, path, intf, event, args)

    # TODO: validate args

    self.__callbackMgr.fire((node, path, intf, event), args)

  def __fireEventRemote(self, path: str, intf: str, event: str, args: dict):
    """Sends the event to all remote nodes that subscribed to it."""
    assert path != '/' or intf != LocalIntf.name, \
      'unable to fire remote: event {} of interface {} at path /'.format(event, LocalIntf.name)

    for node in self.__informMgr.nodes(path, intf, event):
      self.__sendEvent(node, path, intf, event, args)

  def __broadcastEvent(self, event: str, args: dict):
    """Sends an event of the interface supcon.Remote to all connected nodes.

    Invalid events are printed and not sent.
    """
    try:
      assert event in RemoteIntf.events, \
        '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name)
      RemoteIntf.events[event].validateArgs(args)
    except BaseException as _:
      traceback.print_exc()
      return

    for node in self.__protocols:
      self.__sendEvent(node, '/', RemoteIntf.name, event, args)

  def __recvMesg(self, node: str, mesg: dict):
    """Gets called by _Protocol on new Message

    Returns:
      bool: False if the node is not connected or the type of the message is
        unknown, True otherwise
    """
    if node not in self.__protocols:
      return False

    if mesg['type'] == 'call':
      self.__recvCall(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'], mesg['id'])
    elif mesg['type'] == 'error':
      self.__recvError(node, mesg['args'], mesg['id'])
    elif mesg['type'] == 'response':
      self.__recvResult(node, mesg['args'], mesg['id'])
    elif mesg['type'] == 'event':
      self.__recvEvent(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'])
    else:
      return False

    return True

  def __recvCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str):
    """Handles a call of a remote node and sends the result or error back.

    Calls to supcon.Remote at path '/' are handled by the node itself, all
    other calls are passed to the registered implementations.
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to recv call: method {} of interface {} at path / from node {}'.format(method, intf, node)

      if path == '/' and intf == RemoteIntf.name:
        assert method in RemoteIntf.methods, \
          '{} is not an method of interface {} at path /'.format(method, RemoteIntf.name)
        RemoteIntf.methods[method].validateInArgs(args)

        if method == 'node':
          d = defer.succeed({'node': self.__knownMgr.getNode(self.name).dump()})
        elif method == 'on':
          self.__informMgr.on(args['path'], args['intf'], args['event'], node)
          d = defer.succeed({})
        elif method == 'off':
          # unsubscribe the node; this has to undo what 'on' did
          self.__informMgr.off(args['path'], args['intf'], args['event'], node)
          d = defer.succeed({})
        else:
          raise ValueError('method {} is not yet implemented'.format(method))
      else:
        d = self.__callImpl(path, intf, method, args)
    except BaseException as e:
      traceback.print_exc()
      d = defer.fail(e)

    d.addCallbacks(
      lambda result: self.__sendResult(node, result, cid),
      lambda reason: self.__sendError(node, repr(reason), cid)
    ).addErrback(_printArgs)

  def __recvError(self, _node: str, reason: str, cid: str):
    """Fails the pending call with the given id."""
    try:
      self.__deferredMgr.fail(RuntimeError(reason), cid)
    except BaseException as _:
      traceback.print_exc()

  def __recvResult(self, _node: str, result: dict, cid: str):
    """Succeeds the pending call with the given id."""
    try:
      self.__deferredMgr.succeed(result, cid)
    except BaseException as _:
      traceback.print_exc()

  def __recvEvent(self, node: str, path: str, intf: str, event: str, args: dict):
    """Handles an event of a remote node.

    Events of supcon.Remote at path '/' are handled by the node itself, all
    other events are passed to the callbacks registered with on().
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to recv event: event {} of interface {} at path /'.format(event, LocalIntf.name)

      if path == '/' and intf == RemoteIntf.name:
        assert event in RemoteIntf.events, \
          '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name)
        RemoteIntf.events[event].validateArgs(args)

        self.__recvEventRemote(node, event, args)
      else:
        self.__fireEventLocal(node, path, intf, event, args)
    except BaseException as _:
      traceback.print_exc()

  def __recvEventRemote(self, node, event, args):
    """Handles an event of the interface supcon.Remote of a remote node.

    Raises:
      ValueError: If the event is not handled
    """
    if event == 'intf':
      path = args['path']
      interface = supcon.intf.DInterface.load(args['intf'])

      self.__knownMgr.addIntf(node, path, interface)

    elif event == 'intfLost':
      path = args['path']
      interface = supcon.intf.DInterface.load(args['intf'])

      self.__knownMgr.delIntf(node, path, interface)

      # pending calls to the lost interface will never be answered
      reason = RuntimeError('interface {} at path {} on node {} lost'.format(interface.name, path, node))
      predicate = lambda pid, data: data[0] == node and data[1] == path and data[2] == interface.name
      self.__deferredMgr.failAll(reason, predicate)

    else:
      raise ValueError('event {} not yet implemented'.format(event))

  def __sendCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str) -> bool:
    """Sends a call to a connected node.

    Returns:
      bool: True if the message was handed to the protocol, False (after
        printing the error) otherwise
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to send call: method {} of interface {} at path /'.format(method, LocalIntf.name)
      if node not in self.__protocols:
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'call',
        'path': path,
        'intf': intf,
        'name': method,
        'args': args,
        'id': cid
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

  def __sendError(self, node: str, reason: str, cid: str) -> bool:
    """Sends the error of a call to a connected node.

    Returns:
      bool: True if the message was handed to the protocol, False (after
        printing the error) otherwise
    """
    try:
      if node not in self.__protocols:
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'error',
        'args': reason,
        'id': cid
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

  def __sendResult(self, node: str, result: dict, cid: str) -> bool:
    """Sends the result of a call to a connected node.

    Returns:
      bool: True if the message was handed to the protocol, False (after
        printing the error) otherwise
    """
    try:
      if node not in self.__protocols:
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'response',
        'args': result,
        'id': cid
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

  def __sendEvent(self, node: str, path: str, intf: str, event: str, args: dict) -> bool:
    """Sends an event to a connected node.

    Returns:
      bool: True if the message was handed to the protocol, False (after
        printing the error) otherwise
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to send event: event {} of interface {} at path /'.format(event, LocalIntf.name)
      if node not in self.__protocols:
        # additionally print who tried to send to the unknown node
        traceback.print_stack()
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'event',
        'path': path,
        'intf': intf,
        'name': event,
        'args': args
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

Module variables

var LocalIntf

var RemoteIntf

Classes

class Node

The Node Interface. This class defines the methods that participants can use to access the supcon bus.

class Node(supcon.intf.Node):
  """Implementation of `supcon.intf.Node` on top of twisted.

  The node keeps one protocol per connected peer node, dispatches the incoming
  message types 'call', 'error', 'response' and 'event' and sends events of
  locally registered implementations to the nodes that asked for them via
  the method 'on' of the interface RemoteIntf.
  """

  def __init__(self, name):
    """Initializes the node.

    Args:
      name (str): the name of the node on the bus
    """
    super().__init__(name)

    # path -> interface name -> {'impl': implementation, 'fire': fire callback}
    self.__impls = {}
    # node name -> protocol of the connection to that node
    self.__protocols = {}

    self.__factory = _Factory(self.name, self.__addNode, self.__delNode, self.__recvMesg)

    self.__service = twisted.application.service.MultiService()
    self.__service.startService()

    # events of LocalIntf at path / are only delivered to local callbacks
    fire = lambda event, args: \
      self.__fireEventLocal(self.name, '/', LocalIntf.name, event, args)

    self.__knownMgr = _KnownMgr(fire)
    self.__informMgr = _InformMgr()
    self.__callbackMgr = _CallbackMgr()
    self.__deferredMgr = _DeferredMgr()

    self.__knownMgr.addNode(self.name)

  def connect(self, endpoint):
    """Connects the node to the given endpoint.

    The connection is handled by a twisted ClientService that is added to the
    MultiService of this node.

    Args:
      endpoint (twisted.internet.interfaces.IStreamClientEndpoint):
    """
    service = twisted.application.internet.ClientService(endpoint, self.__factory)
    self.__service.addService(service)

  def listen(self, endpoint):
    """Listens at the given endpoint for incoming connections.

    Args:
      endpoint (twisted.internet.interfaces.IStreamServerEndpoint):
    """
    endpoint.listen(self.__factory)

  def __addNode(self, node: str, protocol: _Protocol):
    """Registers the protocol of a newly connected node.

    Gets passed to _Factory as callback.

    Returns:
      bool: False if a protocol for the node is already registered, else True
    """
    if node in self.__protocols:
      return False
    self.__protocols[node] = protocol

    self.__knownMgr.addNode(node)

    # reestablish existing callbacks by calls to supcon.Remote.on
    for key in self.__callbackMgr.keys(lambda key: key[0] == node):
      self.__callRemote(node, '/', RemoteIntf.name, 'on', {
        'path': key[1],
        'intf': key[2],
        'event': key[3]
      })

    # ask the new node for its description
    self.__callRemote(node, '/', RemoteIntf.name, 'node', {}).addCallback(
      lambda args: self.__knownMgr.setNode(supcon.intf.DNode.load(args['node']))
    ).addErrback(_printArgs)

    return True

  def __delNode(self, node):
    """Removes a node whose connection got lost.

    Gets passed to _Factory as callback. All pending calls to the node get
    failed.

    Returns:
      bool: False if no protocol for the node is registered, else True
    """
    if node not in self.__protocols:
      return False
    del self.__protocols[node]

    self.__knownMgr.delNode(node)
    self.__informMgr.delNode(node)

    reason = RuntimeError('node {} lost'.format(node))
    predicate = lambda pid, data: data[0] == node
    self.__deferredMgr.failAll(reason, predicate)

    return True

  def nodes(self):
    """Returns the known nodes as provided by the internal _KnownMgr."""
    return self.__knownMgr.getNodes()

  def register(self, path: str, impl: supcon.intf.Implementation):
    """Registers an implementation with the node.

    Args:
      path (str): the path the implementation gets registered at
      impl (supcon.intf.Implementation): the implementation

    Raises:
      ValueError: if the interface of impl is already registered at path
    """
    path = supcon.intf.DPath.toName(path)

    if path not in self.__impls:
      self.__impls[path] = {}

    if impl.intf in self.__impls[path]:
      raise ValueError('interface {} already registered at path {}'.format(impl.intf, path))

    # the fire callback is stored so that unregister can remove the same object
    self.__impls[path][impl.intf] = {
      'impl': impl,
      'fire': lambda event, args: self.__fireImplEvent(path, impl.intf, event, args)
    }
    impl.addFireCb(self.__impls[path][impl.intf]['fire'])

    self.__knownMgr.addIntf(self.name, path, impl.interface)
    self.__broadcastEvent('intf', {'path': path, 'intf': impl.interface.dump()})

  def unregister(self, path: str, impl: supcon.intf.Implementation):
    """Removes an implementation from the node.

    Args:
      path (str): the path the implementation is registered at
      impl (supcon.intf.Implementation): the implementation

    Raises:
      ValueError: if the path is unknown, the interface of impl is not
        registered at path or another implementation is registered for it
    """
    path = supcon.intf.DPath.toName(path)

    if path not in self.__impls:
      raise ValueError('unknown path {}'.format(path))

    if impl.intf not in self.__impls[path]:
      raise ValueError('unknown interface {} at path {}'.format(impl.intf, path))

    if impl != self.__impls[path][impl.intf]['impl']:
      raise ValueError('unknown implementation for interface {} at path {}'.format(impl.intf, path))

    impl.delFireCb(self.__impls[path][impl.intf]['fire'])
    del self.__impls[path][impl.intf]

    self.__knownMgr.delIntf(self.name, path, impl.interface)
    self.__broadcastEvent('intfLost', {'path': path, 'intf': impl.interface.dump()})

  def on(self, node, path, intf, event, cb):
    """Registers a callback for an event on the bus.

    Args:
      node (str): a node on the bus
      path (str): a path on the given node
      intf (str): an interface at the given path
      event (str): an event of the given interface
      cb (callable): a callable that gets called with a dict of event arguments
    """
    if self.__callbackMgr.on((node, path, intf, event), cb):
      # a connected remote node must be told to send the event to this node
      if node != self.name and node in self.__protocols:
        self.__callRemote(node, '/', RemoteIntf.name, 'on', {
          'path': path, 'intf': intf, 'event': event
        })

  def off(self, node, path, intf, event, cb):
    """Unregisters a callback for an event on the bus.

    Args:
      node (str): a node on the bus
      path (str): a path on the given node
      intf (str): an interface at the given path
      event (str): an event of the given interface
      cb (callable): the callable that was registered with on()
    """
    if self.__callbackMgr.off((node, path, intf, event), cb):
      # a connected remote node must be told to stop sending the event
      if node != self.name and node in self.__protocols:
        self.__callRemote(node, '/', RemoteIntf.name, 'off', {
          'path': path, 'intf': intf, 'event': event
        })

  def call(self, node, path, intf, method, args) -> defer.Deferred:
    """Calls a method on the bus.

    Calls to this node are answered locally, all other calls are sent to the
    given node.

    Args:
      node (str): a node on the bus
      path (str): a path on the given node
      intf (str): an interface at the given path
      method (str): a method of the given interface
      args (dict): a dict of method arguments

    Returns:
      defer.Deferred:
    """
    # TODO: don't raise Exceptions
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to call: method {} of interface {} at path /'.format(method, RemoteIntf.name)

    if node == self.name:
      if path == '/' and intf == LocalIntf.name:
        return self.__callLocal(method, args)
      return self.__callImpl(path, intf, method, args)
    return self.__callRemote(node, path, intf, method, args)

  def __callImpl(self, path, intf, method, args) -> defer.Deferred:
    """Calls a method of a locally registered implementation.

    Unknown paths, interfaces and methods as well as exceptions raised by the
    implementation result in a failed Deferred.
    """
    # TODO: don't raise Exception
    assert path != '/' or intf != LocalIntf.name, \
      'unable to call impl: method {} of interface {} at path /'.format(method, LocalIntf.name)
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to call impl: method {} of interface {} at path /'.format(method, RemoteIntf.name)

    try:
      if path not in self.__impls:
        raise ValueError('unknown path') # TODO: nicer error message
      if intf not in self.__impls[path]:
        raise ValueError('unknown intf') # TODO: nicer error message
      if method not in self.__impls[path][intf]['impl'].interface.methods:
        raise ValueError('unknown method') # TODO: nicer error message

      d = self.__impls[path][intf]['impl'].call(method, args)
      # implementations may answer synchronously
      if not isinstance(d, defer.Deferred):
        d = defer.succeed(d)
    except BaseException as e:
      d = defer.fail(e)

    return d

  def __callLocal(self, method, args) -> defer.Deferred:
    """Calls a method of the interface LocalIntf at path / of this node."""
    # TODO: don't raise Exception
    # the message names LocalIntf, the interface that is actually checked
    assert method in LocalIntf.methods, \
      '{} is not an method of interface {} at path /'.format(method, LocalIntf.name)
    LocalIntf.methods[method].validateInArgs(args)

    if method == 'nodes':
      return defer.succeed({'nodes': self.__knownMgr.getNodes().dump()})

    return defer.fail(RuntimeError('method {} not yet implemented'.format(method)))

  def __callRemote(self, node, path, intf, method, args) -> defer.Deferred:
    """Sends a call to another node.

    Returns:
      defer.Deferred: the Deferred created by the internal _DeferredMgr for
        this call; it gets failed right away if the call could not be sent
    """
    # TODO: don't raise Exception
    assert path != '/' or intf != LocalIntf.name, \
      'unable to call remote: method {} of interface {} at path /'.format(method, LocalIntf.name)

    # TODO: validate args

    (cid, p) = self.__deferredMgr.create((node, path, intf, method))
    if not self.__sendCall(node, path, intf, method, args, cid):
      # The call never left this node, so neither a response nor an error
      # message can arrive for cid. Fail the Deferred now instead of leaving
      # it pending forever.
      try:
        self.__deferredMgr.fail(
          RuntimeError('unable to send call of method {} to node {}'.format(method, node)), cid
        )
      except Exception:
        traceback.print_exc()
    return p

  def __fireImplEvent(self, path, intf, event, args):
    """Fires an event of a locally registered implementation.

    The event gets delivered to the local callbacks and is sent to the nodes
    returned by the internal _InformMgr for the event.
    """
    assert path != '/' or intf != LocalIntf.name, \
      'unable to fire impl: event {} of interface {} at path /'.format(event, LocalIntf.name)
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to fire impl: event {} of interface {} at path /'.format(event, RemoteIntf.name)

    self.__fireEventLocal(self.name, path, intf, event, args)
    self.__fireEventRemote(path, intf, event, args)

  def __fireEventLocal(self, node: str, path: str, intf: str, event: str, args: dict):
    """Delivers an event to the callbacks registered with on()."""
    assert path != '/' or intf != RemoteIntf.name, \
      'unable to fire local: event {} of interface {} at path /'.format(event, RemoteIntf.name)
    #_printArgs('Node.__fireEventLocal', node, path, intf, event, args)

    # TODO: validate args

    self.__callbackMgr.fire((node, path, intf, event), args)

  def __fireEventRemote(self, path: str, intf: str, event: str, args: dict):
    """Sends an event to every node the internal _InformMgr returns for it."""
    assert path != '/' or intf != LocalIntf.name, \
      'unable to fire remote: event {} of interface {} at path /'.format(event, LocalIntf.name)

    for node in self.__informMgr.nodes(path, intf, event):
      self.__sendEvent(node, path, intf, event, args)

  def __broadcastEvent(self, event: str, args: dict):
    """Sends an event of the interface RemoteIntf at path / to all connected nodes.

    Invalid events or arguments are reported by a printed traceback and
    nothing is sent.
    """
    try:
      assert event in RemoteIntf.events, \
        '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name)
      RemoteIntf.events[event].validateArgs(args)
    except BaseException as _:
      traceback.print_exc()
      return

    for node in self.__protocols:
      self.__sendEvent(node, '/', RemoteIntf.name, event, args)

  def __recvMesg(self, node: str, mesg: dict):
    """Gets called by _Protocol on new Message

    Returns:
      bool: False if the node is unknown or the message type is not
        supported, else True
    """
    if node not in self.__protocols:
      return False

    if mesg['type'] == 'call':
      self.__recvCall(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'], mesg['id'])
    elif mesg['type'] == 'error':
      self.__recvError(node, mesg['args'], mesg['id'])
    elif mesg['type'] == 'response':
      self.__recvResult(node, mesg['args'], mesg['id'])
    elif mesg['type'] == 'event':
      self.__recvEvent(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'])
    else:
      return False

    return True

  def __recvCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str):
    """Handles a call received from another node.

    Calls to the interface RemoteIntf at path / are answered by the node
    itself, all other calls are passed to the registered implementations.
    The outcome is sent back to the caller as 'response' or 'error' message.
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to recv call: method {} of interface {} at path / from node {}'.format(method, intf, node)

      if path == '/' and intf == RemoteIntf.name:
        assert method in RemoteIntf.methods, \
          '{} is not an method of interface {} at path /'.format(method, RemoteIntf.name)
        RemoteIntf.methods[method].validateInArgs(args)

        if method == 'node':
          d = defer.succeed({'node': self.__knownMgr.getNode(self.name).dump()})
        elif method == 'on':
          self.__informMgr.on(args['path'], args['intf'], args['event'], node)
          d = defer.succeed({})
        elif method == 'off':
          # 'off' must undo 'on'; registering again would keep sending the
          # event to a node that asked to stop receiving it
          self.__informMgr.off(args['path'], args['intf'], args['event'], node)
          d = defer.succeed({})
        else:
          raise ValueError('method {} is not yet implemented'.format(method))
      else:
        d = self.__callImpl(path, intf, method, args)
    except BaseException as e:
      traceback.print_exc()
      d = defer.fail(e)

    d.addCallbacks(
      lambda result: self.__sendResult(node, result, cid),
      lambda reason: self.__sendError(node, repr(reason), cid)
    ).addErrback(_printArgs)

  def __recvError(self, _node: str, reason: str, cid: str):
    """Fails the pending call with id `cid`; problems are only printed."""
    try:
      self.__deferredMgr.fail(RuntimeError(reason), cid)
    except BaseException as _:
      traceback.print_exc()

  def __recvResult(self, _node: str, result: dict, cid: str):
    """Succeeds the pending call with id `cid`; problems are only printed."""
    try:
      self.__deferredMgr.succeed(result, cid)
    except BaseException as _:
      traceback.print_exc()

  def __recvEvent(self, node: str, path: str, intf: str, event: str, args: dict):
    """Handles an event received from another node.

    Events of the interface RemoteIntf at path / are processed by the node
    itself, all other events are delivered to the local callbacks. Problems
    are only printed.
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to recv event: event {} of interface {} at path /'.format(event, LocalIntf.name)

      if path == '/' and intf == RemoteIntf.name:
        assert event in RemoteIntf.events, \
          '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name)
        RemoteIntf.events[event].validateArgs(args)

        self.__recvEventRemote(node, event, args)
      else:
        self.__fireEventLocal(node, path, intf, event, args)
    except BaseException as _:
      traceback.print_exc()

  def __recvEventRemote(self, node, event, args):
    """Processes an event of the interface RemoteIntf at path / of `node`.

    Raises:
      ValueError: if the event is not handled
    """
    if event == 'intf':
      path = args['path']
      interface = supcon.intf.DInterface.load(args['intf'])

      self.__knownMgr.addIntf(node, path, interface)

    elif event == 'intfLost':
      path = args['path']
      interface = supcon.intf.DInterface.load(args['intf'])

      self.__knownMgr.delIntf(node, path, interface)

      # pending calls to the lost interface can not be answered anymore
      reason = RuntimeError('interface {} at path {} on node {} lost'.format(interface.name, path, node))
      predicate = lambda pid, data: data[0] == node and data[1] == path and data[2] == interface.name
      self.__deferredMgr.failAll(reason, predicate)

    else:
      raise ValueError('event {} not yet implemented'.format(event))

  def __sendCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str) -> bool:
    """Sends a 'call' message to `node`.

    Returns:
      bool: True if the message was handed to the protocol of the node, else
        False (the traceback is printed)
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to send call: method {} of interface {} at path /'.format(method, LocalIntf.name)
      if node not in self.__protocols:
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'call',
        'path': path,
        'intf': intf,
        'name': method,
        'args': args,
        'id': cid
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

  def __sendError(self, node: str, reason: str, cid: str) -> bool:
    """Sends an 'error' message for the call id `cid` to `node`.

    Returns:
      bool: True if the message was handed to the protocol of the node, else
        False (the traceback is printed)
    """
    try:
      if node not in self.__protocols:
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'error',
        'args': reason,
        'id': cid
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

  def __sendResult(self, node: str, result: dict, cid: str) -> bool:
    """Sends a 'response' message for the call id `cid` to `node`.

    Returns:
      bool: True if the message was handed to the protocol of the node, else
        False (the traceback is printed)
    """
    try:
      if node not in self.__protocols:
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'response',
        'args': result,
        'id': cid
      })
      return True
    except BaseException as _:
      traceback.print_exc()
      return False

  def __sendEvent(self, node: str, path: str, intf: str, event: str, args: dict) -> bool:
    """Sends an 'event' message to `node`.

    Returns:
      bool: True if the message was handed to the protocol of the node, else
        False (the traceback is printed)
    """
    try:
      assert path != '/' or intf != LocalIntf.name, \
        'unable to send event: event {} of interface {} at path /'.format(event, LocalIntf.name)
      if node not in self.__protocols:
        # additionally show who tried to reach the unknown node
        traceback.print_stack()
        raise RuntimeError('unknown node {}'.format(node))

      self.__protocols[node].sendMesg({
        'type': 'event',
        'path': path,
        'intf': intf,
        'name': event,
        'args': args
      })
      return True
    except BaseException as _:
      exc_info = sys.exc_info()
      traceback.print_exception(*exc_info)
      return False

Ancestors (in MRO)

  • Node
  • supcon.intf.Node
  • abc.ABC
  • builtins.object

Class Methods

def toName(

cls, value)

@classmethod
def toName(cls, value) -> str:
  """Returns the name for `value` as computed by `DNode.toName`."""
  name = DNode.toName(value)
  return name

Instance variables

var name

str: The name of the node on the bus

Methods

def __init__(

self, name)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, name):
  """Initializes the node.

  Args:
    name (str): the name of the node on the bus
  """
  super().__init__(name)

  def fire(event, args):
    # events of LocalIntf at path / are only delivered to local callbacks
    return self.__fireEventLocal(self.name, '/', LocalIntf.name, event, args)

  # registered implementations and the protocols of the connected nodes
  self.__impls = {}
  self.__protocols = {}

  # shared by outgoing and incoming connections
  self.__factory = _Factory(self.name, self.__addNode, self.__delNode, self.__recvMesg)

  # parent service of the client services created by connect()
  self.__service = twisted.application.service.MultiService()
  self.__service.startService()

  self.__knownMgr = _KnownMgr(fire)
  self.__informMgr = _InformMgr()
  self.__callbackMgr = _CallbackMgr()
  self.__deferredMgr = _DeferredMgr()

  # the node itself is the first known node
  self.__knownMgr.addNode(self.name)

def call(

self, node, path, intf, method, args)

Calls a method on the bus

Args: node (str): a node on the bus path (str): a path on the given node intf (str): an interface at the given path method (str): a method of the given interface args (dict): a dict of method arguments

Returns: defer.Deferred:

def call(self, node, path, intf, method, args) -> defer.Deferred:
  """Calls a method on the bus.

  Calls to this node are answered locally, all other calls are sent to the
  given node.

  Args:
    node (str): a node on the bus
    path (str): a path on the given node
    intf (str): an interface at the given path
    method (str): a method of the given interface
    args (dict): a dict of method arguments

  Returns:
    defer.Deferred:
  """
  # TODO: don't raise Exceptions
  atRoot = path == '/'
  assert not (atRoot and intf == RemoteIntf.name), \
    'unable to call: method {} of interface {} at path /'.format(method, RemoteIntf.name)

  if node != self.name:
    return self.__callRemote(node, path, intf, method, args)

  if atRoot and intf == LocalIntf.name:
    return self.__callLocal(method, args)

  return self.__callImpl(path, intf, method, args)

def connect(

self, endpoint)

Connects the node to the given endpoint.

If the connection fails or closes, the connection gets reestablished with an exponential timeout up to two minutes.

Args: endpoint (twisted.internet.interfaces.IStreamClientEndpoint):

def connect(self, endpoint):
  """Connects the node to the given endpoint.

  The connection is handled by a twisted ClientService that is added to the
  MultiService of this node.

  Args:
    endpoint (twisted.internet.interfaces.IStreamClientEndpoint):
  """
  self.__service.addService(
    twisted.application.internet.ClientService(endpoint, self.__factory)
  )

def listen(

self, endpoint)

Listens at the given endpoint for incoming connections

Args: endpoint (twisted.internet.interfaces.IStreamServerEndpoint):

def listen(self, endpoint):
  """Listens at the given endpoint for incoming connections.

  Args:
    endpoint (twisted.internet.interfaces.IStreamServerEndpoint):
  """
  factory = self.__factory
  endpoint.listen(factory)

def nodes(

self)

list[str]: The currently connected nodes

def nodes(self):
  """Returns the known nodes as provided by the internal _KnownMgr."""
  known = self.__knownMgr
  return known.getNodes()

def off(

self, node, path, intf, event, cb)

Unregisters a callback for an event on the bus

Args: node (str): a node on the bus path (str): a path on the given node intf (str): an interface at the given path event (str): an event of the given interface cb (callable): a callable that gets called with a dict of event arguments

def off(self, node, path, intf, event, cb):
  """Unregisters a callback for an event on the bus.

  Args:
    node (str): a node on the bus
    path (str): a path on the given node
    intf (str): an interface at the given path
    event (str): an event of the given interface
    cb (callable): the callable that was registered with on()
  """
  if not self.__callbackMgr.off((node, path, intf, event), cb):
    return

  # only a connected remote node has to be told to stop sending the event
  if node == self.name or node not in self.__protocols:
    return

  self.__callRemote(node, '/', RemoteIntf.name, 'off', {
    'path': path, 'intf': intf, 'event': event
  })

def on(

self, node, path, intf, event, cb)

Registers a callback for an event on the bus

Args: node (str): a node on the bus path (str): a path on the given node intf (str): an interface at the given path event (str): an event of the given interface cb (callable): a callable that gets called with a dict of event arguments

def on(self, node, path, intf, event, cb):
  """Registers a callback for an event on the bus.

  Args:
    node (str): a node on the bus
    path (str): a path on the given node
    intf (str): an interface at the given path
    event (str): an event of the given interface
    cb (callable): a callable that gets called with a dict of event arguments
  """
  if not self.__callbackMgr.on((node, path, intf, event), cb):
    return

  # only a connected remote node has to be told to send the event
  if node == self.name or node not in self.__protocols:
    return

  self.__callRemote(node, '/', RemoteIntf.name, 'on', {
    'path': path, 'intf': intf, 'event': event
  })

def register(

self, path, impl)

Registers an implementation with the node

Args: impl (Implementation):

def register(self, path: str, impl: supcon.intf.Implementation):
  """Registers an implementation with the node.

  Args:
    path (str): the path the implementation gets registered at
    impl (supcon.intf.Implementation): the implementation

  Raises:
    ValueError: if the interface of impl is already registered at path
  """
  path = supcon.intf.DPath.toName(path)

  registered = self.__impls.setdefault(path, {})
  if impl.intf in registered:
    raise ValueError('interface {} already registered at path {}'.format(impl.intf, path))

  def fire(event, args):
    return self.__fireImplEvent(path, impl.intf, event, args)

  # the fire callback is stored so that unregister can remove the same object
  registered[impl.intf] = {
    'impl': impl,
    'fire': fire
  }
  impl.addFireCb(fire)

  # announce the interface locally and to all connected nodes
  self.__knownMgr.addIntf(self.name, path, impl.interface)
  self.__broadcastEvent('intf', {'path': path, 'intf': impl.interface.dump()})

def unregister(

self, path, impl)

Removes an implementation from the node

Args: impl (supcon.intf.Implementation):

def unregister(self, path: str, impl: supcon.intf.Implementation):
  """Removes an implementation from the node.

  Args:
    path (str): the path the implementation is registered at
    impl (supcon.intf.Implementation): the implementation

  Raises:
    ValueError: if the path is unknown, the interface of impl is not
      registered at path or another implementation is registered for it
  """
  path = supcon.intf.DPath.toName(path)

  if path not in self.__impls:
    raise ValueError('unknown path {}'.format(path))
  registered = self.__impls[path]

  if impl.intf not in registered:
    raise ValueError('unknown interface {} at path {}'.format(impl.intf, path))
  entry = registered[impl.intf]

  if impl != entry['impl']:
    raise ValueError('unknown implementation for interface {} at path {}'.format(impl.intf, path))

  # detach the fire callback that register() attached
  impl.delFireCb(entry['fire'])
  del registered[impl.intf]

  # announce the loss locally and to all connected nodes
  self.__knownMgr.delIntf(self.name, path, impl.interface)
  self.__broadcastEvent('intfLost', {'path': path, 'intf': impl.interface.dump()})