supcon.core module
This module provides the implementation of the abstract class supcon.intf.Node.
# -*- coding: utf-8 -*-
"""
This module provides the implementation of abstract class `supcon.intf.Node`.
"""

import sys
import traceback
import uuid
import struct

import msgpack

import twisted.internet.defer as defer
import twisted.internet.protocol
import twisted.internet.endpoints
import twisted.application.service
import twisted.application.internet

import supcon.intf


def _printArgs(*args):
    """Print all positional arguments on one line, separated by ', '."""
    print(', '.join(['{}'.format(i) for i in args]))


# Interface that is served at path '/' of the own node for local consumers.
LocalIntf = supcon.intf.DInterface.load({
    'name': 'supcon.Local',
    'events': [
        {
            'name': 'node',
            'args': [
                {'name': 'node', 'description': 'The node'},
            ],
        },
        {
            'name': 'nodeLost',
            'args': [
                {'name': 'node', 'description': 'The lost node'},
            ],
        },
        {
            'name': 'path',
            'args': [
                {'name': 'node', 'description': 'The node'},
                {'name': 'path', 'description': 'The path'},
            ],
        },
        {
            'name': 'pathLost',
            'args': [
                {'name': 'node', 'description': 'The node'},
                {'name': 'path', 'description': 'The path'},
            ],
        },
        {
            'name': 'intf',
            'args': [
                {'name': 'node', 'description': 'The node'},
                {'name': 'path', 'description': 'The path'},
                {'name': 'intf', 'description': 'The interface'},
            ],
        },
        {
            'name': 'intfLost',
            'args': [
                {'name': 'node', 'description': 'The node'},
                {'name': 'path', 'description': 'The path'},
                {'name': 'intf', 'description': 'The interface'},
            ],
        },
    ],
    'methods': [
        {
            'name': 'nodes',
            'outArgs': [
                {'name': 'nodes', 'description': 'A datastructure about all known nodes, paths and interfaces'},
            ],
        },
    ],
    'description': 'All methods and events are only locally available'
})

# Interface that nodes use between each other at path '/'.
RemoteIntf = supcon.intf.DInterface.load({
    'name': 'supcon.Remote',
    'events': [
        {
            'name': 'intf',
            'args': [
                {'name': 'path', 'description': 'The path'},
                {'name': 'intf', 'description': 'The interface'},
            ],
        },
        {
            'name': 'intfLost',
            'args': [
                {'name': 'path', 'description': 'The path'},
                {'name': 'intf', 'description': 'The interface'},
            ],
        },
    ],
    'methods': [
        {
            'name': 'node',
            'outArgs': [
                {'name': 'node', 'description': 'A datastructure about all paths and interfaces of the node'},
            ],
        },
        {
            'name': 'on',
            'inArgs': [
                {'name': 'path', 'description': 'The path'},
                {'name': 'intf', 'description': 'The interface'},
                {'name': 'event', 'description': 'The event'},
            ],
        },
        {
            'name': 'off',
            'inArgs': [
                {'name': 'path', 'description': 'The path'},
                {'name': 'intf', 'description': 'The interface'},
                {'name': 'event', 'description': 'The event'},
            ],
        },
    ],
    'description': 'All methods and events are only used internally'
})


class _Protocol(twisted.internet.protocol.Protocol):
    """This class handles handshake and message encoding between nodes on the bus.

    Messages are msgpack encoded and framed with a 4 byte little-endian length
    prefix. The first message in each direction is a handshake that carries the
    name of the sending node.

    This class is just an implementation detail of the class Node.
    """

    def __init__(self):
        super().__init__()
        self.factory = None
        self.__data = bytearray()
        self.__name = None
        # True only if factory.addNode() accepted this connection for __name
        self.__registered = False

    def connectionMade(self):
        """Sends the handshake with the own node name."""
        self.sendMesg({
            'type': 'handshake',
            'name': self.factory.name
        })

    def connectionLost(self, reason):
        """Unregisters the remote node, if this connection registered it."""
        # A connection whose addNode() was rejected (duplicate name) must not
        # remove the registration that belongs to another connection.
        if not self.__registered:
            return
        self.__registered = False
        self.factory.delNode(self.__name)

    def dataReceived(self, data):
        """Buffers incoming bytes and dispatches every complete message."""
        self.__data.extend(data)

        while len(self.__data) >= 4:
            size = struct.unpack_from('<L', self.__data)[0]
            if len(self.__data) < size + 4:
                break

            mesg = bytes(self.__data[4:size + 4])
            del self.__data[:size + 4]

            try:
                # raw=False decodes msgpack strings to str; it replaces the
                # encoding='utf-8' keyword that was removed in msgpack 1.0.
                mesg = msgpack.unpackb(mesg, raw=False)
            except Exception:
                # The stream is corrupt, there is no point in parsing further.
                self.transport.loseConnection()
                return

            self.recvMesg(mesg)

    def recvMesg(self, mesg):
        """Handles a decoded message.

        The first message must be a handshake; all later messages are handed to
        the factory together with the name of the remote node.
        """
        if not self.__name:
            name = mesg.get('name') if isinstance(mesg, dict) else None
            if mesg.get('type') != 'handshake' if isinstance(mesg, dict) else True:
                self.transport.loseConnection()
                return
            if not isinstance(name, str) or not name:
                # a handshake without a usable node name can not be registered
                self.transport.loseConnection()
                return
            self.__name = name
            self.__registered = bool(self.factory.addNode(self.__name, self))
        else:
            self.factory.recvMesg(self.__name, mesg)

    def sendMesg(self, mesg):
        """Encodes the message with msgpack and writes it length-prefixed."""
        data = msgpack.packb(mesg, use_bin_type=True)
        size = struct.pack('<L', len(data))
        self.transport.write(size)
        self.transport.write(data)


class _Factory(twisted.internet.protocol.Factory):
    """This class creates _Protocol instances for connections to remote nodes.

    This class is just an implementation detail of the class Node and
    encapsulates methods to create the _Protocol instances resulting from calls
    to Node.connect() and Node.listen().
    """
    protocol = _Protocol

    def __init__(self, name, addNode, delNode, recvMesg):
        """
        Args:
            name: The name of the own node, sent in the handshake
            addNode: Called with (name, protocol) after a received handshake
            delNode: Called with (name) when a registered connection is lost
            recvMesg: Called with (name, mesg) for every further message
        """
        self.name = name
        self.addNode = addNode
        self.delNode = delNode
        self.recvMesg = recvMesg


class _KnownMgr(object):
    """Keeps track of all known nodes, paths and interfaces.

    Every change is reported through the fire callback given to the
    constructor as fire(event, args).
    """

    def __init__(self, fire):
        self.__fire = fire
        self.__nodes = supcon.intf.DNodes()

    def addNode(self, node: str) -> bool:
        """Adds the node and fires 'node', unless it is already known."""
        if node in self.__nodes:
            return True

        self.__nodes = self.__nodes.addNode(node)
        self.__fire('node', {'node': node})

        return True

    def delNode(self, node: str) -> bool:
        """Removes the node with all its interfaces and fires 'nodeLost'."""
        if node not in self.__nodes:
            return True

        for path in self.__nodes[node]:
            for intf in self.__nodes[node][path]:
                self.delIntf(node, path, self.__nodes[node][path][intf])

        self.__nodes = self.__nodes.delNode(node)
        self.__fire('nodeLost', {'node': node})

        return True

    def addIntf(self, node: str, path: str, interface: supcon.intf.DInterface) -> bool:
        """Adds the interface at the path of the node.

        Fires 'path' if the path is new and 'intf' if the interface is new.

        Returns:
            bool: False if the node is unknown, True otherwise
        """
        #_printArgs('_KnownMgr.addIntf', node, path, interface)
        if node not in self.__nodes:
            return False

        if path not in self.__nodes[node]:
            self.__nodes = self.__nodes.addPath(node, path)
            self.__fire('path', {'node': node, 'path': path})

        if interface.name in self.__nodes[node][path]:
            return True

        self.__nodes = self.__nodes.addInterface(node, path, interface)
        self.__fire('intf', {'node': node, 'path': path, 'intf': interface.dump()})

        return True

    def delIntf(self, node: str, path: str, interface: supcon.intf.DInterface) -> bool:
        """Removes the interface from the path of the node.

        Fires 'intfLost' and, if the path has no interfaces left, 'pathLost'.
        """
        #_printArgs('_KnownMgr.delIntf', node, path, interface)
        if not self.__nodes.hasIntf(node, path, interface.name):
            return True

        self.__nodes = self.__nodes.delInterface(node, path, interface)
        self.__fire('intfLost', {'node': node, 'path': path, 'intf': interface.dump()})

        if not self.__nodes[node][path]:
            self.__nodes = self.__nodes.delPath(node, path)
            self.__fire('pathLost', {'node': node, 'path': path})

        return True

    def getNode(self, node: str) -> supcon.intf.DNode:
        """Returns the description of the node or None if it is unknown."""
        if node in self.__nodes:
            return self.__nodes[node]
        return None

    def setNode(self, newNode: supcon.intf.DNode):
        """Synchronizes the known interfaces of a node with the given description.

        Interfaces missing in newNode are removed, interfaces only present in
        newNode are added. Nothing happens if the node is not known (anymore).
        """
        #_printArgs('_KnownMgr.setNode', newNode.dump())
        oldNode = self.getNode(newNode.name)
        if oldNode is None:
            # The node was lost before its description arrived.
            return

        for oldPath in oldNode.values():
            for oldIntf in oldPath.values():
                if not newNode.hasIntf(oldPath.name, oldIntf.name):
                    self.delIntf(oldNode.name, oldPath.name, oldIntf)

        for newPath in newNode.values():
            for newIntf in newPath.values():
                if not oldNode.hasIntf(newPath.name, newIntf.name):
                    self.addIntf(newNode.name, newPath.name, newIntf)

    def getNodes(self) -> supcon.intf.DNodes:
        """Returns the description of all known nodes."""
        return self.__nodes


class _InformMgr(object):
    """Remembers which remote nodes want to be informed about which events."""

    def __init__(self):
        self.__keys = {}   # node -> {(path, intf, event): True}
        self.__nodes = {}  # (path, intf, event) -> {node: True}

    def on(self, path, intf, event, node):
        """Registers the node for the event."""
        key = (path, intf, event)

        if key not in self.__nodes:
            self.__nodes[key] = {}
        self.__nodes[key][node] = True

        if node not in self.__keys:
            self.__keys[node] = {}
        self.__keys[node][key] = True

    def off(self, path, intf, event, node):
        """Unregisters the node from the event; unknown registrations are ignored."""
        key = (path, intf, event)

        if key not in self.__nodes or node not in self.__nodes[key]:
            return

        del self.__nodes[key][node]
        if not self.__nodes[key]:
            del self.__nodes[key]

        del self.__keys[node][key]
        if not self.__keys[node]:
            del self.__keys[node]

    def nodes(self, path, intf, event):
        """Returns a list of all nodes registered for the event."""
        key = (path, intf, event)

        if key not in self.__nodes:
            return []
        # return a copy, so callers never iterate over the internal dict
        return list(self.__nodes[key])

    def delNode(self, node):
        """Removes all registrations of the node."""
        if node not in self.__keys:
            return

        for key in self.__keys[node]:
            del self.__nodes[key][node]
            if not self.__nodes[key]:
                del self.__nodes[key]

        del self.__keys[node]


class _DeferredMgr(object):
    """This class manages Deferreds of pending calls by a unique id."""

    def __init__(self):
        self.__infos = {}  # pid -> (Deferred, data)

    def create(self, data=None) -> (str, defer.Deferred):
        """Creates a Deferred with an info.

        Args:
            data: Some additional data

        Returns:
            (str, defer.Deferred): A Tuple of an unique id and a Deferred
        """
        pid = str(uuid.uuid4())

        def canceller(_d):
            self.__infos.pop(pid, None)

        d = defer.Deferred(canceller)
        self.__infos[pid] = (d, data)

        return (pid, d)

    def succeed(self, value, pid):
        """Succeeds the Deferred identified by the given unique id with the given
        response. Unknown ids are ignored.

        Args:
            value (mixed): The value to succeed the Deferred with
            pid (str): A unique id of a Deferred created with _DeferredMgr.create()
        """
        if pid not in self.__infos:
            return

        d = self.__infos[pid][0]
        del self.__infos[pid]
        d.callback(value)

    def fail(self, reason, pid):
        """Fail the Deferred identified by the given unique id with the given
        reason. Unknown ids are ignored.

        Args:
            reason (Exception): The reason to fail the Deferred with
            pid (str): A unique id of a Deferred created with _DeferredMgr.create()
        """
        if pid not in self.__infos:
            return

        # index 0 is the Deferred, index 1 is the additional data
        d = self.__infos[pid][0]
        del self.__infos[pid]
        d.errback(reason)

    def failAll(self, reason, predicate):
        """Fail all Deferred for which predicate(pid, data) returns true.

        Args:
            reason (Exception): The reason to fail the Deferred with
            predicate (function): A predicate called with the unique id and the
                additional data of each Deferred
        """
        for pid, info in self.__infos.copy().items():
            if predicate(pid, info[1]):
                del self.__infos[pid]
                info[0].errback(reason)


class _CallbackMgr(object):
    """Manages callbacks that are registered for arbitrary hashable keys."""

    def __init__(self):
        self.__cbs = {}  # key -> {id(cb): cb}

    def on(self, key, cb):
        """Registers the callback.

        Returns:
            bool: True if it is the first callback for the key
        """
        first = False
        if key not in self.__cbs:
            self.__cbs[key] = {}
            first = True
        self.__cbs[key][id(cb)] = cb
        return first

    def off(self, key, cb):
        """Unregisters the callback.

        Returns:
            bool: True if the last callback for the key was removed
        """
        if key not in self.__cbs or id(cb) not in self.__cbs[key]:
            return False
        del self.__cbs[key][id(cb)]
        if not self.__cbs[key]:
            del self.__cbs[key]
            return True
        return False

    def fire(self, key, args):
        """Calls cb(args, key) for all callbacks of the key.

        Exceptions raised by a callback are printed and do not stop the others.
        """
        #_printArgs('_CallbackMgr.fire', key, args)
        if key not in self.__cbs:
            return
        # iterate over a copy: callbacks may call on() / off() for this key
        for cb in list(self.__cbs[key].values()):
            try:
                cb(args, key)
            except Exception:
                traceback.print_exc()

    def keys(self, predicate):
        """Returns all keys with callbacks for which predicate(key) is true."""
        return [key for key in self.__cbs if predicate(key)]


class Node(supcon.intf.Node):
    """Implementation of `supcon.intf.Node` on top of twisted.

    A node exchanges calls, results, errors and events with other nodes over
    connections that are created by connect() and listen().
    """

    def __init__(self, name):
        super().__init__(name)

        self.__impls = {}      # path -> {intf name: {'impl': ..., 'fire': ...}}
        self.__protocols = {}  # node name -> _Protocol

        self.__factory = _Factory(self.name, self.__addNode, self.__delNode, self.__recvMesg)

        self.__service = twisted.application.service.MultiService()
        self.__service.startService()

        def fire(event, args):
            self.__fireEventLocal(self.name, '/', LocalIntf.name, event, args)

        self.__knownMgr = _KnownMgr(fire)
        self.__informMgr = _InformMgr()
        self.__callbackMgr = _CallbackMgr()
        self.__deferredMgr = _DeferredMgr()

        self.__knownMgr.addNode(self.name)

    def connect(self, endpoint):
        """Connects to a remote node through a ClientService for the endpoint."""
        service = twisted.application.internet.ClientService(endpoint, self.__factory)
        self.__service.addService(service)

    def listen(self, endpoint):
        """Listens for connections of remote nodes on the endpoint."""
        endpoint.listen(self.__factory)

    def __addNode(self, node: str, protocol: _Protocol):
        """Gets called by _Protocol after the handshake of a remote node.

        Returns:
            bool: False if a node with that name is already connected
        """
        if node in self.__protocols:
            return False

        self.__protocols[node] = protocol
        self.__knownMgr.addNode(node)

        # reestablish existing callbacks by calls to supcon.Remote.on
        for key in self.__callbackMgr.keys(lambda key: key[0] == node):
            self.__callRemote(node, '/', RemoteIntf.name, 'on', {
                'path': key[1],
                'intf': key[2],
                'event': key[3]
            })

        self.__callRemote(node, '/', RemoteIntf.name, 'node', {}).addCallback(
            lambda args: self.__knownMgr.setNode(supcon.intf.DNode.load(args['node']))
        ).addErrback(_printArgs)

        return True

    def __delNode(self, node):
        """Gets called by _Protocol when the connection to a node is lost.

        Forgets the node and fails all pending calls to it.
        """
        if node not in self.__protocols:
            return False

        del self.__protocols[node]
        self.__knownMgr.delNode(node)
        self.__informMgr.delNode(node)

        reason = RuntimeError('node {} lost'.format(node))
        predicate = lambda pid, data: data[0] == node
        self.__deferredMgr.failAll(reason, predicate)

        return True

    def nodes(self):
        """Returns the description of all known nodes."""
        return self.__knownMgr.getNodes()

    def register(self, path: str, impl: supcon.intf.Implementation):
        """Registers the implementation at the path and announces its interface.

        Raises:
            ValueError: If the interface is already registered at the path
        """
        path = supcon.intf.DPath.toName(path)

        if path not in self.__impls:
            self.__impls[path] = {}

        if impl.intf in self.__impls[path]:
            raise ValueError('interface {} already registered at path {}'.format(impl.intf, path))

        self.__impls[path][impl.intf] = {
            'impl': impl,
            'fire': lambda event, args: self.__fireImplEvent(path, impl.intf, event, args)
        }
        impl.addFireCb(self.__impls[path][impl.intf]['fire'])

        self.__knownMgr.addIntf(self.name, path, impl.interface)
        self.__broadcastEvent('intf', {'path': path, 'intf': impl.interface.dump()})

    def unregister(self, path: str, impl: supcon.intf.Implementation):
        """Unregisters the implementation from the path and announces the loss.

        Raises:
            ValueError: If the path, the interface or the implementation is not
                registered
        """
        path = supcon.intf.DPath.toName(path)

        if path not in self.__impls:
            raise ValueError('unknown path {}'.format(path))

        if impl.intf not in self.__impls[path]:
            raise ValueError('unknown interface {} at path {}'.format(impl.intf, path))

        if impl != self.__impls[path][impl.intf]['impl']:
            raise ValueError('unknown implementation for interface {} at path {}'.format(impl.intf, path))

        impl.delFireCb(self.__impls[path][impl.intf]['fire'])
        del self.__impls[path][impl.intf]

        self.__knownMgr.delIntf(self.name, path, impl.interface)
        self.__broadcastEvent('intfLost', {'path': path, 'intf': impl.interface.dump()})

    def on(self, node, path, intf, event, cb):
        """Registers the callback for the event.

        The first callback for an event of a connected remote node subscribes
        at that node by a call to supcon.Remote.on.
        """
        if self.__callbackMgr.on((node, path, intf, event), cb):
            if node != self.name and node in self.__protocols:
                self.__callRemote(node, '/', RemoteIntf.name, 'on', {
                    'path': path, 'intf': intf, 'event': event
                })

    def off(self, node, path, intf, event, cb):
        """Unregisters the callback from the event.

        Removing the last callback for an event of a connected remote node
        unsubscribes at that node by a call to supcon.Remote.off.
        """
        if self.__callbackMgr.off((node, path, intf, event), cb):
            if node != self.name and node in self.__protocols:
                self.__callRemote(node, '/', RemoteIntf.name, 'off', {
                    'path': path, 'intf': intf, 'event': event
                })

    def call(self, node, path, intf, method, args) -> defer.Deferred:
        """Calls the method on the own node or on a remote node.

        Returns:
            defer.Deferred: Fires with the result of the call
        """
        # TODO: don't raise Exceptions
        assert path != '/' or intf != RemoteIntf.name, \
            'unable to call: method {} of interface {} at path /'.format(method, RemoteIntf.name)

        if node == self.name:
            if path == '/' and intf == LocalIntf.name:
                return self.__callLocal(method, args)
            return self.__callImpl(path, intf, method, args)
        return self.__callRemote(node, path, intf, method, args)

    def __callImpl(self, path, intf, method, args) -> defer.Deferred:
        """Calls the method of a registered implementation.

        Errors are not raised but returned as a failed Deferred.
        """
        # TODO: don't raise Exception
        assert path != '/' or intf != LocalIntf.name, \
            'unable to call impl: method {} of interface {} at path /'.format(method, LocalIntf.name)
        assert path != '/' or intf != RemoteIntf.name, \
            'unable to call impl: method {} of interface {} at path /'.format(method, RemoteIntf.name)

        try:
            if path not in self.__impls:
                raise ValueError('unknown path')  # TODO: nicer error message
            if intf not in self.__impls[path]:
                raise ValueError('unknown intf')  # TODO: nicer error message
            if method not in self.__impls[path][intf]['impl'].interface.methods:
                raise ValueError('unknown method')  # TODO: nicer error message

            d = self.__impls[path][intf]['impl'].call(method, args)
            if not isinstance(d, defer.Deferred):
                d = defer.succeed(d)
        except Exception as e:
            d = defer.fail(e)

        return d

    def __callLocal(self, method, args) -> defer.Deferred:
        """Calls a method of the interface supcon.Local of the own node."""
        # TODO: don't raise Exception
        assert method in LocalIntf.methods, \
            '{} is not an method of interface {} at path /'.format(method, LocalIntf.name)
        LocalIntf.methods[method].validateInArgs(args)

        if method == 'nodes':
            return defer.succeed({'nodes': self.__knownMgr.getNodes().dump()})

        return defer.fail(RuntimeError('method {} not yet implemented'.format(method)))

    def __callRemote(self, node, path, intf, method, args) -> defer.Deferred:
        """Sends a call to a remote node.

        Returns:
            defer.Deferred: Fires with the response of the remote node; fails if
                the call could not be sent or the node gets lost
        """
        # TODO: don't raise Exception
        assert path != '/' or intf != LocalIntf.name, \
            'unable to call remote: method {} of interface {} at path /'.format(method, LocalIntf.name)

        # TODO: validate args

        (cid, p) = self.__deferredMgr.create((node, path, intf, method))
        if not self.__sendCall(node, path, intf, method, args, cid):
            # nobody will ever answer, so do not leave the Deferred pending
            self.__deferredMgr.fail(RuntimeError('unable to send call to node {}'.format(node)), cid)
        return p

    def __fireImplEvent(self, path, intf, event, args):
        """Gets called by a registered implementation to fire an event."""
        assert path != '/' or intf != LocalIntf.name, \
            'unable to fire impl: event {} of interface {} at path /'.format(event, LocalIntf.name)
        assert path != '/' or intf != RemoteIntf.name, \
            'unable to fire impl: event {} of interface {} at path /'.format(event, RemoteIntf.name)

        self.__fireEventLocal(self.name, path, intf, event, args)
        self.__fireEventRemote(path, intf, event, args)

    def __fireEventLocal(self, node: str, path: str, intf: str, event: str, args: dict):
        """Fires the event to all locally registered callbacks."""
        assert path != '/' or intf != RemoteIntf.name, \
            'unable to fire local: event {} of interface {} at path /'.format(event, RemoteIntf.name)
        #_printArgs('Node.__fireEventLocal', node, path, intf, event, args)

        # TODO: validate args

        self.__callbackMgr.fire((node, path, intf, event), args)

    def __fireEventRemote(self, path: str, intf: str, event: str, args: dict):
        """Sends the event to all remote nodes that subscribed to it."""
        assert path != '/' or intf != LocalIntf.name, \
            'unable to fire remote: event {} of interface {} at path /'.format(event, LocalIntf.name)

        for node in self.__informMgr.nodes(path, intf, event):
            self.__sendEvent(node, path, intf, event, args)

    def __broadcastEvent(self, event: str, args: dict):
        """Sends an event of the interface supcon.Remote to all connected nodes."""
        try:
            assert event in RemoteIntf.events, \
                '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name)
            RemoteIntf.events[event].validateArgs(args)
        except Exception:
            traceback.print_exc()
            return

        for node in self.__protocols:
            self.__sendEvent(node, '/', RemoteIntf.name, event, args)

    def __recvMesg(self, node: str, mesg: dict):
        """Gets called by _Protocol on new Message"""
        if node not in self.__protocols:
            return False

        if mesg['type'] == 'call':
            self.__recvCall(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'], mesg['id'])
        elif mesg['type'] == 'error':
            self.__recvError(node, mesg['args'], mesg['id'])
        elif mesg['type'] == 'response':
            self.__recvResult(node, mesg['args'], mesg['id'])
        elif mesg['type'] == 'event':
            self.__recvEvent(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'])
        else:
            return False

        return True

    def __recvCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str):
        """Executes a call of a remote node and sends back the result or error."""
        try:
            assert path != '/' or intf != LocalIntf.name, \
                'unable to recv call: method {} of interface {} at path / from node {}'.format(method, intf, node)

            if path == '/' and intf == RemoteIntf.name:
                assert method in RemoteIntf.methods, \
                    '{} is not an method of interface {} at path /'.format(method, RemoteIntf.name)
                RemoteIntf.methods[method].validateInArgs(args)

                if method == 'node':
                    d = defer.succeed({'node': self.__knownMgr.getNode(self.name).dump()})
                elif method == 'on':
                    self.__informMgr.on(args['path'], args['intf'], args['event'], node)
                    d = defer.succeed({})
                elif method == 'off':
                    self.__informMgr.off(args['path'], args['intf'], args['event'], node)
                    d = defer.succeed({})
                else:
                    raise ValueError('method {} is not yet implemented'.format(method))
            else:
                d = self.__callImpl(path, intf, method, args)
        except Exception as e:
            traceback.print_exc()
            d = defer.fail(e)

        d.addCallbacks(
            lambda result: self.__sendResult(node, result, cid),
            lambda reason: self.__sendError(node, repr(reason), cid)
        ).addErrback(_printArgs)

    def __recvError(self, _node: str, reason: str, cid: str):
        """Fails the pending call with the id cid."""
        try:
            self.__deferredMgr.fail(RuntimeError(reason), cid)
        except Exception:
            traceback.print_exc()

    def __recvResult(self, _node: str, result: dict, cid: str):
        """Succeeds the pending call with the id cid."""
        try:
            self.__deferredMgr.succeed(result, cid)
        except Exception:
            traceback.print_exc()

    def __recvEvent(self, node: str, path: str, intf: str, event: str, args: dict):
        """Handles an event that was sent by a remote node."""
        try:
            assert path != '/' or intf != LocalIntf.name, \
                'unable to recv event: event {} of interface {} at path /'.format(event, LocalIntf.name)

            if path == '/' and intf == RemoteIntf.name:
                assert event in RemoteIntf.events, \
                    '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name)
                RemoteIntf.events[event].validateArgs(args)
                self.__recvEventRemote(node, event, args)
            else:
                self.__fireEventLocal(node, path, intf, event, args)
        except Exception:
            traceback.print_exc()

    def __recvEventRemote(self, node, event, args):
        """Handles an event of the interface supcon.Remote of a remote node."""
        if event == 'intf':
            path = args['path']
            interface = supcon.intf.DInterface.load(args['intf'])

            self.__knownMgr.addIntf(node, path, interface)

        elif event == 'intfLost':
            path = args['path']
            interface = supcon.intf.DInterface.load(args['intf'])

            self.__knownMgr.delIntf(node, path, interface)

            # pending calls to the lost interface will never be answered
            reason = RuntimeError('interface {} at path {} on node {} lost'.format(interface.name, path, node))
            predicate = lambda pid, data: data[0] == node and data[1] == path and data[2] == interface.name
            self.__deferredMgr.failAll(reason, predicate)

        else:
            raise ValueError('event {} not yet implemented'.format(event))

    def __sendCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str) -> bool:
        """Sends a call message; returns False if it could not be sent."""
        try:
            assert path != '/' or intf != LocalIntf.name, \
                'unable to send call: method {} of interface {} at path /'.format(method, LocalIntf.name)

            if node not in self.__protocols:
                raise RuntimeError('unknown node {}'.format(node))

            self.__protocols[node].sendMesg({
                'type': 'call',
                'path': path,
                'intf': intf,
                'name': method,
                'args': args,
                'id': cid
            })
            return True
        except Exception:
            traceback.print_exc()
            return False

    def __sendError(self, node: str, reason: str, cid: str) -> bool:
        """Sends an error message; returns False if it could not be sent."""
        try:
            if node not in self.__protocols:
                raise RuntimeError('unknown node {}'.format(node))

            self.__protocols[node].sendMesg({
                'type': 'error',
                'args': reason,
                'id': cid
            })
            return True
        except Exception:
            traceback.print_exc()
            return False

    def __sendResult(self, node: str, result: dict, cid: str) -> bool:
        """Sends a response message; returns False if it could not be sent."""
        try:
            if node not in self.__protocols:
                raise RuntimeError('unknown node {}'.format(node))

            self.__protocols[node].sendMesg({
                'type': 'response',
                'args': result,
                'id': cid
            })
            return True
        except Exception:
            traceback.print_exc()
            return False

    def __sendEvent(self, node: str, path: str, intf: str, event: str, args: dict) -> bool:
        """Sends an event message; returns False if it could not be sent."""
        try:
            assert path != '/' or intf != LocalIntf.name, \
                'unable to send event: event {} of interface {} at path /'.format(event, LocalIntf.name)

            if node not in self.__protocols:
                traceback.print_stack()
                raise RuntimeError('unknown node {}'.format(node))

            self.__protocols[node].sendMesg({
                'type': 'event',
                'path': path,
                'intf': intf,
                'name': event,
                'args': args
            })
            return True
        except Exception:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return False
Module variables
var LocalIntf
var RemoteIntf
Classes
class Node
The Node Interface. This class defines the methods that participants can use to access the supcon bus.
class Node(supcon.intf.Node): def __init__(self, name): super().__init__(name) self.__impls = {} self.__protocols = {} self.__factory = _Factory(self.name, self.__addNode, self.__delNode, self.__recvMesg) self.__service = twisted.application.service.MultiService() self.__service.startService() fire = lambda event, args: \ self.__fireEventLocal(self.name, '/', LocalIntf.name, event, args) self.__knownMgr = _KnownMgr(fire) self.__informMgr = _InformMgr() self.__callbackMgr = _CallbackMgr() self.__deferredMgr = _DeferredMgr() self.__knownMgr.addNode(self.name) def connect(self, endpoint): service = twisted.application.internet.ClientService(endpoint, self.__factory) self.__service.addService(service) def listen(self, endpoint): endpoint.listen(self.__factory) def __addNode(self, node: str, protocol: _Protocol): if node in self.__protocols: return False self.__protocols[node] = protocol self.__knownMgr.addNode(node) # reestablish existing callbacks by calls to supcon.Remote.on for key in self.__callbackMgr.keys(lambda key: key[0] == node): self.__callRemote(node, '/', RemoteIntf.name, 'on', { 'path': key[1], 'intf': key[2], 'event': key[3] }) self.__callRemote(node, '/', RemoteIntf.name, 'node', {}).addCallback( lambda args: self.__knownMgr.setNode(supcon.intf.DNode.load(args['node'])) ).addErrback(_printArgs) return True def __delNode(self, node): if node not in self.__protocols: return False del self.__protocols[node] self.__knownMgr.delNode(node) self.__informMgr.delNode(node) reason = RuntimeError('node {} lost'.format(node)) predicate = lambda pid, data: data[0] == node self.__deferredMgr.failAll(reason, predicate) return True def nodes(self): return self.__knownMgr.getNodes() def register(self, path: str, impl: supcon.intf.Implementation): path = supcon.intf.DPath.toName(path) if path not in self.__impls: self.__impls[path] = {} if impl.intf in self.__impls[path]: raise ValueError('interface {} already registered at path {}'.format(impl.intf, path)) 
self.__impls[path][impl.intf] = { 'impl': impl, 'fire': lambda event, args: self.__fireImplEvent(path, impl.intf, event, args) } impl.addFireCb(self.__impls[path][impl.intf]['fire']) self.__knownMgr.addIntf(self.name, path, impl.interface) self.__broadcastEvent('intf', {'path': path, 'intf': impl.interface.dump()}) def unregister(self, path: str, impl: supcon.intf.Implementation): path = supcon.intf.DPath.toName(path) if path not in self.__impls: raise ValueError('unknown path {}'.format(path)) if impl.intf not in self.__impls[path]: raise ValueError('unknown interface {} at path {}'.format(impl.intf, path)) if impl != self.__impls[path][impl.intf]['impl']: raise ValueError('unknown implementation for interface {} at path {}'.format(impl.intf, path)) impl.delFireCb(self.__impls[path][impl.intf]['fire']) del self.__impls[path][impl.intf] self.__knownMgr.delIntf(self.name, path, impl.interface) self.__broadcastEvent('intfLost', {'path': path, 'intf': impl.interface.dump()}) def on(self, node, path, intf, event, cb): if self.__callbackMgr.on((node, path, intf, event), cb): if node != self.name and node in self.__protocols: self.__callRemote(node, '/', RemoteIntf.name, 'on', { 'path': path, 'intf': intf, 'event': event }) def off(self, node, path, intf, event, cb): if self.__callbackMgr.off((node, path, intf, event), cb): if node != self.name and node in self.__protocols: self.__callRemote(node, '/', RemoteIntf.name, 'off', { 'path': path, 'intf': intf, 'event': event }) def call(self, node, path, intf, method, args) -> defer.Deferred: # TODO: don't raise Exceptions assert path != '/' or intf != RemoteIntf.name, \ 'unable to call: method {} of interface {} at path /'.format(method, RemoteIntf.name) if node == self.name: if path == '/' and intf == LocalIntf.name: return self.__callLocal(method, args) return self.__callImpl(path, intf, method, args) return self.__callRemote(node, path, intf, method, args) def __callImpl(self, path, intf, method, args) -> defer.Deferred: 
# TODO: don't raise Exception assert path != '/' or intf != LocalIntf.name, \ 'unable to call impl: method {} of interface {} at path /'.format(method, LocalIntf.name) assert path != '/' or intf != RemoteIntf.name, \ 'unable to call impl: method {} of interface {} at path /'.format(method, RemoteIntf.name) try: if path not in self.__impls: raise ValueError('unknown path') # TODO: nicer error message if intf not in self.__impls[path]: raise ValueError('unknown intf') # TODO: nicer error message if method not in self.__impls[path][intf]['impl'].interface.methods: raise ValueError('unknown method') # TODO: nicer error message d = self.__impls[path][intf]['impl'].call(method, args) if not isinstance(d, defer.Deferred): d = defer.succeed(d) except BaseException as e: d = defer.fail(e) return d def __callLocal(self, method, args) -> defer.Deferred: # TODO: don't raise Exception assert method in LocalIntf.methods, \ '{} is not an method of interface {} at path /'.format(method, RemoteIntf.name) LocalIntf.methods[method].validateInArgs(args) if method == 'nodes': return defer.succeed({'nodes': self.__knownMgr.getNodes().dump()}) return defer.fail(RuntimeError('method {} not yet implemented'.format(method))) def __callRemote(self, node, path, intf, method, args) -> defer.Deferred: # TODO: don't raise Exception assert path != '/' or intf != LocalIntf.name, \ 'unable to call remote: method {} of interface {} at path /'.format(method, LocalIntf.name) # TODO: validate args (cid, p) = self.__deferredMgr.create((node, path, intf, method)) self.__sendCall(node, path, intf, method, args, cid) return p def __fireImplEvent(self, path, intf, event, args): assert path != '/' or intf != LocalIntf.name, \ 'unable to fire impl: event {} of interface {} at path /'.format(event, RemoteIntf.name) assert path != '/' or intf != RemoteIntf.name, \ 'unable to fire impl: event {} of interface {} at path /'.format(event, RemoteIntf.name) self.__fireEventLocal(self.name, path, intf, event, args) 
self.__fireEventRemote(path, intf, event, args) def __fireEventLocal(self, node: str, path: str, intf: str, event: str, args: dict): assert path != '/' or intf != RemoteIntf.name, \ 'unable to fire local: event {} of interface {} at path /'.format(event, RemoteIntf.name) #_printArgs('Node.__fireEventLocal', node, path, intf, event, args) # TODO: validate args self.__callbackMgr.fire((node, path, intf, event), args) def __fireEventRemote(self, path: str, intf: str, event: str, args: dict): assert path != '/' or intf != LocalIntf.name, \ 'unable to fire remote: event {} of interface {} at path /'.format(event, LocalIntf.name) for node in self.__informMgr.nodes(path, intf, event): self.__sendEvent(node, path, intf, event, args) def __broadcastEvent(self, event: str, args: dict): try: assert event in RemoteIntf.events, \ '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name) RemoteIntf.events[event].validateArgs(args) except BaseException as _: traceback.print_exc() return for node in self.__protocols: self.__sendEvent(node, '/', RemoteIntf.name, event, args) def __recvMesg(self, node: str, mesg: dict): """Gets called by _Protocol on new Message""" if node not in self.__protocols: return False if mesg['type'] == 'call': self.__recvCall(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args'], mesg['id']) elif mesg['type'] == 'error': self.__recvError(node, mesg['args'], mesg['id']) elif mesg['type'] == 'response': self.__recvResult(node, mesg['args'], mesg['id']) elif mesg['type'] == 'event': self.__recvEvent(node, mesg['path'], mesg['intf'], mesg['name'], mesg['args']) else: return False return True def __recvCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str): try: assert path != '/' or intf != LocalIntf.name, \ 'unable to recv call: method {} of interface {} at path / from node {}'.format(method, intf, node) if path == '/' and intf == RemoteIntf.name: assert method in RemoteIntf.methods, \ '{} is not an method of 
interface {} at path /'.format(method, RemoteIntf.name) RemoteIntf.methods[method].validateInArgs(args) if method == 'node': d = defer.succeed({'node': self.__knownMgr.getNode(self.name).dump()}) elif method == 'on': self.__informMgr.on(args['path'], args['intf'], args['event'], node) d = defer.succeed({}) elif method == 'off': self.__informMgr.on(args['path'], args['intf'], args['event'], node) d = defer.succeed({}) else: raise ValueError('method {} is not yet implemented'.format(method)) else: d = self.__callImpl(path, intf, method, args) except BaseException as e: traceback.print_exc() d = defer.fail(e) d.addCallbacks( lambda result: self.__sendResult(node, result, cid), lambda reason: self.__sendError(node, repr(reason), cid) ).addErrback(_printArgs) def __recvError(self, _node: str, reason: str, cid: str): try: self.__deferredMgr.fail(RuntimeError(reason), cid) except BaseException as _: traceback.print_exc() def __recvResult(self, _node: str, result: dict, cid: str): try: self.__deferredMgr.succeed(result, cid) except BaseException as _: traceback.print_exc() def __recvEvent(self, node: str, path: str, intf: str, event: str, args: dict): try: assert path != '/' or intf != LocalIntf.name, \ 'unable to recv event: event {} of interface {} at path /'.format(event, LocalIntf.name) if path == '/' and intf == RemoteIntf.name: assert event in RemoteIntf.events, \ '{} is not an event of interface {} at path /'.format(event, RemoteIntf.name) RemoteIntf.events[event].validateArgs(args) self.__recvEventRemote(node, event, args) else: self.__fireEventLocal(node, path, intf, event, args) except BaseException as _: traceback.print_exc() def __recvEventRemote(self, node, event, args): if event == 'intf': path = args['path'] interface = supcon.intf.DInterface.load(args['intf']) self.__knownMgr.addIntf(node, path, interface) elif event == 'intfLost': path = args['path'] interface = supcon.intf.DInterface.load(args['intf']) self.__knownMgr.delIntf(node, path, interface) reason 
= RuntimeError('interface {} at path {} on node {} lost'.format(node, path, interface.name)) predicate = lambda pid, data: data[0] == node and data[1] == path and data[2] == interface.name self.__deferredMgr.failAll(reason, predicate) else: raise ValueError('event {} not yet implemented'.format(event)) def __sendCall(self, node: str, path: str, intf: str, method: str, args: dict, cid: str) -> bool: try: assert path != '/' or intf != LocalIntf.name, \ 'unable to send call: method {} of interface {} at path /'.format(method, LocalIntf.name) if node not in self.__protocols: raise RuntimeError('unknown node {}'.format(node)) self.__protocols[node].sendMesg({ 'type': 'call', 'path': path, 'intf': intf, 'name': method, 'args': args, 'id': cid }) return True except BaseException as _: traceback.print_exc() return False def __sendError(self, node: str, reason: str, cid: str) -> bool: try: if node not in self.__protocols: raise RuntimeError('unknown node {}'.format(node)) self.__protocols[node].sendMesg({ 'type': 'error', 'args': reason, 'id': cid }) return True except BaseException as _: traceback.print_exc() return False def __sendResult(self, node: str, result: dict, cid: str) -> bool: try: if node not in self.__protocols: raise RuntimeError('unknown node {}'.format(node)) self.__protocols[node].sendMesg({ 'type': 'response', 'args': result, 'id': cid }) return True except BaseException as _: traceback.print_exc() return False def __sendEvent(self, node: str, path: str, intf: str, event: str, args: dict) -> bool: try: assert path != '/' or intf != LocalIntf.name, \ 'unable to send event: event {} of interface {} at path /'.format(event, LocalIntf.name) if node not in self.__protocols: traceback.print_stack() raise RuntimeError('unknown node {}'.format(node)) self.__protocols[node].sendMesg({ 'type': 'event', 'path': path, 'intf': intf, 'name': event, 'args': args }) return True except BaseException as _: exc_info = sys.exc_info() traceback.print_exception(*exc_info) 
return False
Ancestors (in MRO)
- Node
- supcon.intf.Node
- abc.ABC
- builtins.object
Class Methods
def toName(
cls, value)
@classmethod
def toName(cls, value) -> str:
    """Return the name that ``DNode.toName`` produces for ``value``.

    This is a thin delegation; which inputs are accepted and how they are
    normalised is decided entirely by ``DNode.toName``.

    Args:
        value: the value to turn into a name

    Returns:
        str: whatever ``DNode.toName(value)`` returns
    """
    name = DNode.toName(value)
    return name
Instance variables
var name
str: The name of the node on the bus
Methods
def __init__(
self, name)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self, name):
    """Create the node's internal state and start its service container.

    Args:
        name: the name of this node; handed to the base class initializer
    """
    super().__init__(name)

    # Implementations added through register(): path -> interface -> entry.
    self.__impls = {}
    # Protocol objects keyed by node name; consulted when sending messages.
    self.__protocols = {}

    # The factory receives this node's name and the three callbacks it
    # uses to report back to the node.
    self.__factory = _Factory(
        self.name,
        self.__addNode,
        self.__delNode,
        self.__recvMesg,
    )

    # Parent service that connect() adds its client services to.
    self.__service = twisted.application.service.MultiService()
    self.__service.startService()

    def fireLocal(event, args):
        # Everything the known-nodes manager reports is emitted as an event
        # of LocalIntf at path '/' of this very node.
        return self.__fireEventLocal(self.name, '/', LocalIntf.name, event, args)

    self.__knownMgr = _KnownMgr(fireLocal)
    self.__informMgr = _InformMgr()
    self.__callbackMgr = _CallbackMgr()
    self.__deferredMgr = _DeferredMgr()

    # Done last: all managers exist by the time the node announces itself.
    self.__knownMgr.addNode(self.name)
def call(
self, node, path, intf, method, args)
Calls a method on the bus
Args: node (str): a node on the bus path (str): a path on the given node intf (str): an interface at the given path method (str): a method of the given interface args (dict): a dict of method arguments
Returns: defer.Deferred:
def call(self, node, path, intf, method, args) -> defer.Deferred:
    """Call a method on the bus and route it to the right handler.

    Calls addressed to another node are sent remotely. Calls addressed to
    this node go to the local interface handler when they target
    ``LocalIntf`` at path ``/``, and to a registered implementation
    otherwise.

    Args:
        node (str): a node on the bus
        path (str): a path on the given node
        intf (str): an interface at the given path
        method (str): a method of the given interface
        args (dict): a dict of method arguments

    Returns:
        defer.Deferred: the result of the selected call helper

    Raises:
        AssertionError: if the call targets ``RemoteIntf`` at path ``/``
    """
    # TODO: don't raise Exceptions
    assert not (path == '/' and intf == RemoteIntf.name), \
        'unable to call: method {} of interface {} at path /'.format(method, RemoteIntf.name)

    if node != self.name:
        return self.__callRemote(node, path, intf, method, args)

    if path == '/' and intf == LocalIntf.name:
        return self.__callLocal(method, args)

    return self.__callImpl(path, intf, method, args)
def connect(
self, endpoint)
Connects the node to the given endpoint.
If the connection fails or closes, it is re-established with an exponential backoff of up to two minutes.
Args: endpoint (twisted.internet.interfaces.IStreamClientEndpoint):
def connect(self, endpoint):
    """Connect the node to the given endpoint.

    A ``ClientService`` is created for the endpoint with this node's
    factory and attached to the node's service container.

    Args:
        endpoint (twisted.internet.interfaces.IStreamClientEndpoint):
            the endpoint to connect to
    """
    clientService = twisted.application.internet.ClientService(
        endpoint,
        self.__factory,
    )
    self.__service.addService(clientService)
def listen(
self, endpoint)
Listens at the given endpoint for incoming connections
Args: endpoint (twisted.internet.interfaces.IStreamServerEndpoint):
def listen(self, endpoint):
    """Listen at the given endpoint for incoming connections.

    The node's factory is handed to ``endpoint.listen``. Whatever that call
    returns is discarded; this method itself returns ``None``.

    Args:
        endpoint (twisted.internet.interfaces.IStreamServerEndpoint):
            the endpoint to listen at
    """
    factory = self.__factory
    endpoint.listen(factory)
def nodes(
self)
list[str]: The currently connected nodes
def nodes(self):
    """Return the nodes reported by the known-nodes manager.

    Returns:
        The result of the manager's ``getNodes()``; the module documentation
        describes it as the list of currently connected nodes.
    """
    known = self.__knownMgr
    return known.getNodes()
def off(
self, node, path, intf, event, cb)
Unregisters a callback for an event on the bus
Args: node (str): a node on the bus path (str): a path on the given node intf (str): an interface at the given path event (str): an event of the given interface cb (callable): a callable that gets called with a dict of event arguments
def off(self, node, path, intf, event, cb):
    """Unregister a callback for an event on the bus.

    The callback is removed from the local callback manager under the key
    ``(node, path, intf, event)``. Only if the manager returns a truthy
    value, the node is not this node, and a protocol for it is known, is
    the remote node additionally asked (method ``off`` of ``RemoteIntf`` at
    path ``/``) to stop reporting the event.

    Args:
        node (str): a node on the bus
        path (str): a path on the given node
        intf (str): an interface at the given path
        event (str): an event of the given interface
        cb (callable): the callback to remove
    """
    key = (node, path, intf, event)
    if not self.__callbackMgr.off(key, cb):
        return

    # Nothing to tell a peer about for local events or unconnected nodes.
    if node == self.name or node not in self.__protocols:
        return

    remoteIntf = RemoteIntf.name
    self.__callRemote(node, '/', remoteIntf, 'off', {
        'path': path,
        'intf': intf,
        'event': event
    })
def on(
self, node, path, intf, event, cb)
Registers a callback for an event on the bus
Args: node (str): a node on the bus path (str): a path on the given node intf (str): an interface at the given path event (str): an event of the given interface cb (callable): a callable that gets called with a dict of event arguments
def on(self, node, path, intf, event, cb):
    """Register a callback for an event on the bus.

    The callback is stored in the local callback manager under the key
    ``(node, path, intf, event)``. Only if the manager returns a truthy
    value, the node is not this node, and a protocol for it is known, is
    the remote node additionally asked (method ``on`` of ``RemoteIntf`` at
    path ``/``) to report the event.

    Args:
        node (str): a node on the bus
        path (str): a path on the given node
        intf (str): an interface at the given path
        event (str): an event of the given interface
        cb (callable): a callable that gets called with a dict of event
            arguments
    """
    key = (node, path, intf, event)
    if not self.__callbackMgr.on(key, cb):
        return

    # Nothing to tell a peer about for local events or unconnected nodes.
    if node == self.name or node not in self.__protocols:
        return

    remoteIntf = RemoteIntf.name
    self.__callRemote(node, '/', remoteIntf, 'on', {
        'path': path,
        'intf': intf,
        'event': event
    })
def register(
self, path, impl)
Registers an implementation with the node
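Args: path (str): the path at which the implementation is registered impl (supcon.intf.Implementation): the implementation to register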
def register(self, path: str, impl: supcon.intf.Implementation):
    """Register an implementation with the node.

    The path is normalised with ``supcon.intf.DPath.toName``. The
    implementation is stored under its interface at that path, hooked up so
    that events it fires reach the node, added to the known-nodes manager,
    and announced to connected nodes with an ``intf`` event.

    Args:
        path (str): the path to register the implementation at
        impl (supcon.intf.Implementation): the implementation to register

    Raises:
        ValueError: if the interface is already registered at the path
    """
    path = supcon.intf.DPath.toName(path)

    byIntf = self.__impls.setdefault(path, {})
    if impl.intf in byIntf:
        raise ValueError('interface {} already registered at path {}'.format(impl.intf, path))

    def fire(event, args):
        # Forward events fired by the implementation, tagged with the
        # location it is registered at.
        return self.__fireImplEvent(path, impl.intf, event, args)

    # The callback is kept next to the implementation so that unregister()
    # can detach exactly this function again.
    entry = {
        'impl': impl,
        'fire': fire
    }
    byIntf[impl.intf] = entry
    impl.addFireCb(entry['fire'])

    self.__knownMgr.addIntf(self.name, path, impl.interface)
    self.__broadcastEvent('intf', {'path': path, 'intf': impl.interface.dump()})
def unregister(
self, path, impl)
Removes an implementation from the node
Args: path (str): the path at which the implementation was registered impl (supcon.intf.Implementation): the implementation to remove
def unregister(self, path: str, impl: supcon.intf.Implementation):
    """Remove an implementation from the node.

    The path is normalised with ``supcon.intf.DPath.toName``. The
    implementation's fire callback is detached, its registry entry is
    deleted, the interface is removed from the known-nodes manager, and an
    ``intfLost`` event is announced to connected nodes.

    Args:
        path (str): the path the implementation was registered at
        impl (supcon.intf.Implementation): the implementation to remove

    Raises:
        ValueError: if the path is unknown, the interface is not registered
            at the path, or a different implementation is registered there
    """
    path = supcon.intf.DPath.toName(path)

    registry = self.__impls
    if path not in registry:
        raise ValueError('unknown path {}'.format(path))

    byIntf = registry[path]
    if impl.intf not in byIntf:
        raise ValueError('unknown interface {} at path {}'.format(impl.intf, path))

    entry = byIntf[impl.intf]
    if impl != entry['impl']:
        raise ValueError('unknown implementation for interface {} at path {}'.format(impl.intf, path))

    # Detach the exact callback that register() attached.
    impl.delFireCb(entry['fire'])
    del byIntf[impl.intf]

    self.__knownMgr.delIntf(self.name, path, impl.interface)
    self.__broadcastEvent('intfLost', {'path': path, 'intf': impl.interface.dump()})