import re import socket import threads import xml.dom.minidom import thread import chakana.error import chakana.event import chakana.monitor import chakana.threads import chakana.utils from chakana.debug import * class Shepherd: """The shepherd is a singleton class that manages the connection to COOJA and keeps track of user monitor threads. The user initialises a debugging scenario by creating a Shepherd object, and then calls the newMonitor method to create Monitor objects, one for each causal path through the system that he wishes to monitor. The monitor threads automatically register with the shepherd. The user can use the debugger instances to probe variables or create eventpoints. When the monitor thread routine decides that is ready for simulation to proceed, it calls the waitFor method to block until one of the eventpoints occur. When a Monitor object enters waitFor, it informs shepherd about the eventpoints that the user is waiting for, and waits for the eventpoints to happen. When all threads enter waiting state, the shepherd asks COOJA to resume simulation. When an eventpoint is hit, the shepherd wakes the Monitor that is waiting for that eventpoint. """ def __init__(self, threadManager, port, coojaHost = "localhost"): debug(MajorEvent, "Creating shepherd") self._pollConnLock = threading.Lock() self._threadManager = threadManager self._port = port self._monitors = [] self._waitingMonitors = [] # Only basic eventpoints in these lists self._eventpoints = [] self._waitingEventpoints = [] self._coojaContinueEvent = threading.Event() self._waitMap = {} self._coojaHost = coojaHost self._coojaConnection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) debug(MajorEvent, "Connecting to COOJA at " + coojaHost + ":" + str(self._port)) self._connectionLock = threading.Lock() self._coojaConnection.connect((self._coojaHost, self._port)) self._coojaStream = self._coojaConnection.makefile() self._coojaConnection.close() del self._coojaConnection self._connectionLock.acquire() hello = self.readFromCooja() debug(Debug, hello.toprettyxml()) self._connectionLock.release() self._shepherdThread = threads.ManagedThread( self._threadManager, target = self._shepherdMainLoop) self._shepherdThread.start() debug(Debug, "Shepherd created:\n" + repr(self)) def _shepherdMainLoop(self): debug(Event, "Starting shepherd main loop") while 1: debug(Debug, "Waiting for eventpoints to enter wait state") debug(Debug2, repr(self)) self._coojaContinueEvent.wait() self._coojaContinueEvent.clear() if self._coojaStream.closed: debug(Event, "Terminating shepherd main loop") return debug(Debug, "Resuming COOJA") assert(len(self._eventpoints) == len(self._waitingEventpoints)) response = self.runCommand("CONTROL_SIM", { "RESUME" : None }) id = int(response.documentElement.childNodes[0].nodeValue) debug(MinorEvent, "Eventpoint " + str(id) + " was triggered") # Wake up triggered eventpoint self._waitMap[id][1].set() # Allow new main loop startups self._pollConnLock.release() def quit(self): """Send a termination request to the debugger broker, and discard all debuggers.""" debug(Information, "Terminating the shepherd session") self.runCommand("EXIT_COOJA") self._connectionLock.acquire() self._coojaStream.close() self._connectionLock.release() self._coojaContinueEvent.set() debug(Information, "Number of PID errors reported: " + str(self.threadManager().nrPIDerrors())) for (eventpoint, event) in self._waitMap.values(): debug(Debug2, "Awakening eventpoint at exit: " + repr(eventpoint)) event.set() def loadConfiguration(self, fileName): return self.runCommand( "CREATE_SIM", xmlContent = chakana.utils.readFile(fileName)) def loadConfigurationXML(self, xmlContent): return self.runCommand( "CREATE_SIM", xmlContent = xmlContent) def setConfigurationXML(self, xmlContent): return self.runCommand( "CONF_SIM", xmlContent = xmlContent) def setPluginsXML(self, xmlContent): return self.runCommand( "CONF_PLUGINS", xmlContent = xmlContent) def newMonitor(self, routine, * args, ** kwArgs): return self.newCustomMonitor( chakana.monitor.Monitor, routine, * args, ** kwArgs) def newCustomMonitor(self, Type, routine = None, * args, ** kwArgs): """Start a new monitor thread running routine, with * args and ** kwArgs as arguments.""" debug(Debug, "Creating new monitor of type " + Type.__name__) debug(Debug3, "Routine: " + repr(routine) + ", arguments: " + repr(args) + ", keyword arguments: " + repr(kwArgs)) thread = Type(self, routine, * args, ** kwArgs) debug(MinorEvent, "New monitor thread created: " + repr(thread)) return thread def newTimeoutMonitor(self, eventpoint, timeout, routine = None): return self.newCustomMonitor( chakana.monitor.TimeoutWrapperMonitor, routine, eventpoint = eventpoint, timeout = timeout) def threadManager(self): return self._threadManager def registerMonitor(self, monitor): debug(Debug, "Registering monitor: " + repr(monitor)) assert(isinstance(monitor, chakana.monitor.Monitor)) assert(not (monitor in self._monitors)) assert(not (monitor in self._waitingMonitors)) self._monitors.append(monitor) self._monitors.sort() debug(Debug2, repr(self)) def unregisterMonitor(self, monitor): debug(Debug, "Unregistering monitor: " + repr(monitor)) assert(isinstance(monitor, chakana.monitor.Monitor)) assert(monitor in self._monitors) assert(not (monitor in self._waitingMonitors)) self._monitors.remove(monitor) def registerWaitingMonitor(self, monitor): debug(Debug, "Registering waiting monitor: " + repr(monitor)) assert(isinstance(monitor, chakana.monitor.Monitor)) assert(monitor in self._monitors) assert(not (monitor in self._waitingMonitors)) self._waitingMonitors.append(monitor) self._waitingMonitors.sort() debug(Debug2, repr(self)) def unregisterWaitingMonitor(self, monitor): debug(Debug, "Unregistering waiting monitor: " + repr(monitor)) assert(isinstance(monitor, chakana.monitor.Monitor)) assert(monitor in self._monitors) assert(monitor in self._waitingMonitors) self._waitingMonitors.remove(monitor) debug(Debug2, repr(self)) def registerBasicEventpoint(self, eventpoint): debug(Debug2, "Registering basic eventpoint: " + repr(eventpoint)) assert(isinstance(eventpoint, chakana.event.BasicEventpoint)) assert(not (eventpoint in self._eventpoints)) self._eventpoints.append(eventpoint) self._eventpoints.sort() debug(Debug2, repr(self)) def unregisterBasicEventpoint(self, eventpoint): debug(Debug2, "Unregistering basic eventpoint: " + repr(eventpoint)) assert(not (eventpoint in self._waitingEventpoints)) assert(eventpoint in self._eventpoints) self._eventpoints.remove(eventpoint) debug(Debug2, repr(self)) def _registerWaitingEventpoint(self, eventpoint): debug(Debug2, "Registering waiting eventpoint: " + repr(eventpoint)) assert(not (eventpoint in self._waitingEventpoints)) assert(eventpoint in self._eventpoints) self._waitingEventpoints.append(eventpoint) self._waitingEventpoints.sort() def _unregisterWaitingEventpoint(self, eventpoint): debug(Debug2, "Unregistering waiting eventpoint: " + repr(eventpoint)) assert(eventpoint in self._waitingEventpoints) assert(eventpoint in self._eventpoints) self._waitingEventpoints.remove(eventpoint) debug(Debug2, repr(self)) def pollContinuation(self): self._pollConnLock.acquire() if len(self._waitingMonitors) != len(self._monitors): self._pollConnLock.release() return for monitor in self._monitors: if not monitor._ready: self._pollConnLock.release() return for monitor in self._monitors: assert(not monitor._discarded) for eventpoint in self._eventpoints: assert(not eventpoint._discarded) assert(self._waitingMonitors == self._monitors) assert(self._waitingEventpoints == self._eventpoints) debug(MinorEvent, "All monitors are waiting, activating COOJA") assert(not self._coojaContinueEvent.isSet()) self._coojaContinueEvent.set() def waitForCoojaEventpoint(self, eventpoint): response = self.runCommand("ADD_EVENTPOINT", eventpoint.coojaArguments()) id = int(response.documentElement.childNodes[0].nodeValue) debug(Debug, "Waiting for COOJA eventpoint " + str(id)) resourceAllocated = 0 while resourceAllocated == 0: try: self._waitMap[id] = (eventpoint, threading.Event()) resourceAllocated = 1 except thread.error: resourceAllocated = 0 self.threadManager().registerPIDerror() self._registerWaitingEventpoint(eventpoint) eventpoint._isWaitingEvent.set() resourceAllocated = 0 while resourceAllocated == 0: try: self._waitMap[id][1].wait() resourceAllocated = 1 except thread.error: resourceAllocated = 0 self.threadManager().registerPIDerror() del self._waitMap[id] self._unregisterWaitingEventpoint(eventpoint) try: self.runCommand("DELETE_EVENTPOINT", { "id" : id }) debug(Debug, "COOJA event " + str(id) + " has occurred") except chakana.error.CoojaExit: pass except socket.error: pass def runCommand(self, name, args = None, xmlContent = ""): debug(Debug3, "Running cooja command " + name + ", args: " + repr(args) + " xml content: " + repr(xmlContent)) if args is None: args = {} self._connectionLock.acquire() try: if self._coojaStream.closed: raise chakana.error.CoojaExit() command = [''] for (key, value) in args.items(): if value is None: command.append('<' + key + '/>') else: command.append('<' + key + '>' + str(value) + '') command.append(xmlContent + '') commandStr = "\n".join(command + ['\n']) debug(MinorEvent, '--> ' + commandStr) self._coojaStream.write(commandStr) self._coojaStream.flush() response = self.readFromCooja() debug(Debug, response.toprettyxml()) if response.documentElement.tagName == 'error': raise chakana.error.CoojaError(response) return response except socket.error: debug(MajorEvent, 'Socket error catched') finally: self._connectionLock.release() def readFromCooja(self): # XXX: Assume message ends with a newline debug(Debug, "Reading message from COOJA") responseLines = [self._coojaStream.readline()] debug(Debug2, "First line: " + repr(responseLines[0])) (rootElement, slash) = re.match(r"^\s*<(\w+)(/?)>", responseLines[0]).groups() debug(Debug3, "Root element: " + rootElement) if slash != "/": while 1: if re.search("$", responseLines[-1], re.M): break responseLines.append(self._coojaStream.readline()) debug(Debug3, "Read line: " + repr(responseLines[-1])) result = "".join(responseLines) debug(MinorEvent, '<-- ' + result) return xml.dom.minidom.parseString(result) def readVariable(self, variable, ** kwArgs): response = self.readMemory( type = "variable", variable = variable, ** kwArgs) arrayString = response.documentElement.childNodes[0].nodeValue.split() bytes = map(eval, arrayString) debug(Debug, "Read variable " + variable + " as byte array: " + repr(bytes)) return bytes def readInt(self, ** kwArgs): response = self.readMemory(type = "int", ** kwArgs) return eval(response.documentElement.childNodes[0].nodeValue) def readMemory(self, ** kwArgs): return self.runCommand("READ_MEMORY", kwArgs) def killNodesInInterval(self, ** kwArgs): return self.runCommand("KILL_NODES", kwArgs) def sendCustomCommand(self, ** kwArgs): return self.runCommand("CUSTOM_COMMAND", kwArgs) def getSimulationInfo(self, ** kwArgs): response = self.runCommand("GET_INFO", kwArgs) return eval(response.documentElement.childNodes[0].nodeValue) def __repr__(self): return "\n ".join([ "Shepherd:", "Port: " + repr(self._port), "Monitors: " + repr(self._monitors), "Waiting monitors: " + repr(self._waitingMonitors), "Basic eventpoints: [" + "\n".join(map(repr, self._eventpoints)) + "]", "Waiting eventpoints: [" + "\n".join( map(repr, self._waitingEventpoints)) + "]", "Wait map: " + repr(self._waitMap), ])