344 lines
13 KiB
Python
344 lines
13 KiB
Python
import re
|
|
import socket
|
|
import threads
|
|
import xml.dom.minidom
|
|
import thread
|
|
|
|
import chakana.error
|
|
import chakana.event
|
|
import chakana.monitor
|
|
import chakana.threads
|
|
import chakana.utils
|
|
from chakana.debug import *
|
|
|
|
class Shepherd:
|
|
"""The shepherd is a singleton class that manages the connection to COOJA
|
|
and keeps track of user monitor threads. The user initialises a
|
|
debugging scenario by creating a Shepherd object, and then calls the
|
|
newMonitor method to create Monitor objects, one for each causal
|
|
path through the system that he wishes to monitor.
|
|
|
|
The monitor threads automatically register with the shepherd. The user
|
|
can use the debugger instances to probe variables or create eventpoints.
|
|
When the monitor thread routine decides that is ready for simulation to
|
|
proceed, it calls the waitFor method to block until one of the
|
|
eventpoints occur.
|
|
|
|
When a Monitor object enters waitFor, it informs shepherd about the
|
|
eventpoints that the user is waiting for, and waits for the eventpoints
|
|
to happen. When all threads enter waiting state, the shepherd asks COOJA
|
|
to resume simulation. When an eventpoint is hit, the shepherd wakes the
|
|
Monitor that is waiting for that eventpoint.
|
|
|
|
"""
|
|
|
|
def __init__(self, threadManager, port, coojaHost = "localhost"):
|
|
debug(MajorEvent, "Creating shepherd")
|
|
self._pollConnLock = threading.Lock()
|
|
self._threadManager = threadManager
|
|
self._port = port
|
|
self._monitors = []
|
|
self._waitingMonitors = []
|
|
# Only basic eventpoints in these lists
|
|
self._eventpoints = []
|
|
self._waitingEventpoints = []
|
|
self._coojaContinueEvent = threading.Event()
|
|
self._waitMap = {}
|
|
self._coojaHost = coojaHost
|
|
self._coojaConnection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
debug(MajorEvent, "Connecting to COOJA at " + coojaHost + ":" +
|
|
str(self._port))
|
|
self._connectionLock = threading.Lock()
|
|
self._coojaConnection.connect((self._coojaHost, self._port))
|
|
self._coojaStream = self._coojaConnection.makefile()
|
|
self._coojaConnection.close()
|
|
del self._coojaConnection
|
|
self._connectionLock.acquire()
|
|
hello = self.readFromCooja()
|
|
debug(Debug, hello.toprettyxml())
|
|
self._connectionLock.release()
|
|
self._shepherdThread = threads.ManagedThread(
|
|
self._threadManager, target = self._shepherdMainLoop)
|
|
self._shepherdThread.start()
|
|
debug(Debug, "Shepherd created:\n" + repr(self))
|
|
|
|
def _shepherdMainLoop(self):
|
|
debug(Event, "Starting shepherd main loop")
|
|
while 1:
|
|
debug(Debug, "Waiting for eventpoints to enter wait state")
|
|
debug(Debug2, repr(self))
|
|
self._coojaContinueEvent.wait()
|
|
self._coojaContinueEvent.clear()
|
|
if self._coojaStream.closed:
|
|
debug(Event, "Terminating shepherd main loop")
|
|
return
|
|
debug(Debug, "Resuming COOJA")
|
|
assert(len(self._eventpoints) == len(self._waitingEventpoints))
|
|
response = self.runCommand("CONTROL_SIM", { "RESUME" : None })
|
|
id = int(response.documentElement.childNodes[0].nodeValue)
|
|
debug(MinorEvent, "Eventpoint " + str(id) + " was triggered")
|
|
|
|
# Wake up triggered eventpoint
|
|
self._waitMap[id][1].set()
|
|
|
|
# Allow new main loop startups
|
|
self._pollConnLock.release()
|
|
|
|
|
|
def quit(self):
|
|
"""Send a termination request to the debugger broker, and discard all
|
|
debuggers."""
|
|
|
|
debug(Information, "Terminating the shepherd session")
|
|
self.runCommand("EXIT_COOJA")
|
|
self._connectionLock.acquire()
|
|
self._coojaStream.close()
|
|
self._connectionLock.release()
|
|
self._coojaContinueEvent.set()
|
|
debug(Information, "Number of PID errors reported: " + str(self.threadManager().nrPIDerrors()))
|
|
|
|
for (eventpoint, event) in self._waitMap.values():
|
|
debug(Debug2, "Awakening eventpoint at exit: " + repr(eventpoint))
|
|
event.set()
|
|
|
|
def loadConfiguration(self, fileName):
|
|
return self.runCommand(
|
|
"CREATE_SIM", xmlContent = chakana.utils.readFile(fileName))
|
|
|
|
def loadConfigurationXML(self, xmlContent):
|
|
return self.runCommand(
|
|
"CREATE_SIM", xmlContent = xmlContent)
|
|
|
|
def setConfigurationXML(self, xmlContent):
|
|
return self.runCommand(
|
|
"CONF_SIM", xmlContent = xmlContent)
|
|
|
|
def setPluginsXML(self, xmlContent):
|
|
return self.runCommand(
|
|
"CONF_PLUGINS", xmlContent = xmlContent)
|
|
|
|
def newMonitor(self, routine, * args, ** kwArgs):
|
|
return self.newCustomMonitor(
|
|
chakana.monitor.Monitor, routine, * args, ** kwArgs)
|
|
|
|
def newCustomMonitor(self, Type, routine = None, * args, ** kwArgs):
|
|
"""Start a new monitor thread running routine, with * args and ** kwArgs
|
|
as arguments."""
|
|
debug(Debug, "Creating new monitor of type " + Type.__name__)
|
|
debug(Debug3, "Routine: " + repr(routine) + ", arguments: " + repr(args) +
|
|
", keyword arguments: " + repr(kwArgs))
|
|
thread = Type(self, routine, * args, ** kwArgs)
|
|
debug(MinorEvent, "New monitor thread created: " + repr(thread))
|
|
return thread
|
|
|
|
def newTimeoutMonitor(self, eventpoint, timeout, routine = None):
|
|
return self.newCustomMonitor(
|
|
chakana.monitor.TimeoutWrapperMonitor, routine, eventpoint = eventpoint, timeout = timeout)
|
|
|
|
def threadManager(self):
|
|
return self._threadManager
|
|
|
|
def registerMonitor(self, monitor):
|
|
debug(Debug, "Registering monitor: " + repr(monitor))
|
|
assert(isinstance(monitor, chakana.monitor.Monitor))
|
|
assert(not (monitor in self._monitors))
|
|
assert(not (monitor in self._waitingMonitors))
|
|
self._monitors.append(monitor)
|
|
self._monitors.sort()
|
|
debug(Debug2, repr(self))
|
|
|
|
def unregisterMonitor(self, monitor):
|
|
debug(Debug, "Unregistering monitor: " + repr(monitor))
|
|
assert(isinstance(monitor, chakana.monitor.Monitor))
|
|
assert(monitor in self._monitors)
|
|
assert(not (monitor in self._waitingMonitors))
|
|
self._monitors.remove(monitor)
|
|
|
|
def registerWaitingMonitor(self, monitor):
|
|
debug(Debug, "Registering waiting monitor: " + repr(monitor))
|
|
assert(isinstance(monitor, chakana.monitor.Monitor))
|
|
assert(monitor in self._monitors)
|
|
assert(not (monitor in self._waitingMonitors))
|
|
self._waitingMonitors.append(monitor)
|
|
self._waitingMonitors.sort()
|
|
debug(Debug2, repr(self))
|
|
|
|
def unregisterWaitingMonitor(self, monitor):
|
|
debug(Debug, "Unregistering waiting monitor: " + repr(monitor))
|
|
assert(isinstance(monitor, chakana.monitor.Monitor))
|
|
assert(monitor in self._monitors)
|
|
assert(monitor in self._waitingMonitors)
|
|
self._waitingMonitors.remove(monitor)
|
|
debug(Debug2, repr(self))
|
|
|
|
def registerBasicEventpoint(self, eventpoint):
|
|
debug(Debug2, "Registering basic eventpoint: " + repr(eventpoint))
|
|
assert(isinstance(eventpoint, chakana.event.BasicEventpoint))
|
|
assert(not (eventpoint in self._eventpoints))
|
|
self._eventpoints.append(eventpoint)
|
|
self._eventpoints.sort()
|
|
debug(Debug2, repr(self))
|
|
|
|
def unregisterBasicEventpoint(self, eventpoint):
|
|
debug(Debug2, "Unregistering basic eventpoint: " + repr(eventpoint))
|
|
assert(not (eventpoint in self._waitingEventpoints))
|
|
assert(eventpoint in self._eventpoints)
|
|
self._eventpoints.remove(eventpoint)
|
|
debug(Debug2, repr(self))
|
|
|
|
def _registerWaitingEventpoint(self, eventpoint):
|
|
debug(Debug2, "Registering waiting eventpoint: " + repr(eventpoint))
|
|
assert(not (eventpoint in self._waitingEventpoints))
|
|
assert(eventpoint in self._eventpoints)
|
|
self._waitingEventpoints.append(eventpoint)
|
|
self._waitingEventpoints.sort()
|
|
|
|
def _unregisterWaitingEventpoint(self, eventpoint):
|
|
debug(Debug2, "Unregistering waiting eventpoint: " + repr(eventpoint))
|
|
assert(eventpoint in self._waitingEventpoints)
|
|
assert(eventpoint in self._eventpoints)
|
|
self._waitingEventpoints.remove(eventpoint)
|
|
debug(Debug2, repr(self))
|
|
|
|
def pollContinuation(self):
|
|
self._pollConnLock.acquire()
|
|
if len(self._waitingMonitors) != len(self._monitors):
|
|
self._pollConnLock.release()
|
|
return
|
|
for monitor in self._monitors:
|
|
if not monitor._ready:
|
|
self._pollConnLock.release()
|
|
return
|
|
|
|
for monitor in self._monitors:
|
|
assert(not monitor._discarded)
|
|
|
|
for eventpoint in self._eventpoints:
|
|
assert(not eventpoint._discarded)
|
|
|
|
assert(self._waitingMonitors == self._monitors)
|
|
assert(self._waitingEventpoints == self._eventpoints)
|
|
debug(MinorEvent, "All monitors are waiting, activating COOJA")
|
|
assert(not self._coojaContinueEvent.isSet())
|
|
self._coojaContinueEvent.set()
|
|
|
|
def waitForCoojaEventpoint(self, eventpoint):
|
|
response = self.runCommand("ADD_EVENTPOINT", eventpoint.coojaArguments())
|
|
id = int(response.documentElement.childNodes[0].nodeValue)
|
|
debug(Debug, "Waiting for COOJA eventpoint " + str(id))
|
|
|
|
resourceAllocated = 0
|
|
while resourceAllocated == 0:
|
|
try:
|
|
self._waitMap[id] = (eventpoint, threading.Event())
|
|
resourceAllocated = 1
|
|
except thread.error:
|
|
resourceAllocated = 0
|
|
self.threadManager().registerPIDerror()
|
|
|
|
self._registerWaitingEventpoint(eventpoint)
|
|
eventpoint._isWaitingEvent.set()
|
|
|
|
resourceAllocated = 0
|
|
while resourceAllocated == 0:
|
|
try:
|
|
self._waitMap[id][1].wait()
|
|
resourceAllocated = 1
|
|
except thread.error:
|
|
resourceAllocated = 0
|
|
self.threadManager().registerPIDerror()
|
|
|
|
del self._waitMap[id]
|
|
self._unregisterWaitingEventpoint(eventpoint)
|
|
try:
|
|
self.runCommand("DELETE_EVENTPOINT", { "id" : id })
|
|
debug(Debug, "COOJA event " + str(id) + " has occurred")
|
|
except chakana.error.CoojaExit:
|
|
pass
|
|
except socket.error:
|
|
pass
|
|
|
|
def runCommand(self, name, args = None, xmlContent = ""):
|
|
debug(Debug3, "Running cooja command " + name + ", args: " + repr(args) +
|
|
" xml content: " + repr(xmlContent))
|
|
if args is None:
|
|
args = {}
|
|
self._connectionLock.acquire()
|
|
try:
|
|
if self._coojaStream.closed:
|
|
raise chakana.error.CoojaExit()
|
|
command = ['<command value="' + name + '">']
|
|
for (key, value) in args.items():
|
|
if value is None:
|
|
command.append('<' + key + '/>')
|
|
else:
|
|
command.append('<' + key + '>' + str(value) + '</' + key + '>')
|
|
command.append(xmlContent + '</command>')
|
|
commandStr = "\n".join(command + ['\n'])
|
|
debug(MinorEvent, '--> ' + commandStr)
|
|
self._coojaStream.write(commandStr)
|
|
self._coojaStream.flush()
|
|
response = self.readFromCooja()
|
|
debug(Debug, response.toprettyxml())
|
|
if response.documentElement.tagName == 'error':
|
|
raise chakana.error.CoojaError(response)
|
|
return response
|
|
finally:
|
|
self._connectionLock.release()
|
|
|
|
def readFromCooja(self):
|
|
# XXX: Assume message ends with a newline
|
|
debug(Debug, "Reading message from COOJA")
|
|
responseLines = [self._coojaStream.readline()]
|
|
debug(Debug2, "First line: " + repr(responseLines[0]))
|
|
(rootElement, slash) = re.match(r"^\s*<(\w+)(/?)>",
|
|
responseLines[0]).groups()
|
|
debug(Debug3, "Root element: " + rootElement)
|
|
if slash != "/":
|
|
while 1:
|
|
if re.search("</" + re.escape(rootElement) + ">$", responseLines[-1],
|
|
re.M):
|
|
break
|
|
responseLines.append(self._coojaStream.readline())
|
|
debug(Debug3, "Read line: " + repr(responseLines[-1]))
|
|
result = "".join(responseLines)
|
|
debug(MinorEvent, '<-- ' + result)
|
|
return xml.dom.minidom.parseString(result)
|
|
|
|
def readVariable(self, variable, ** kwArgs):
|
|
response = self.readMemory(
|
|
type = "variable", variable = variable, ** kwArgs)
|
|
arrayString = response.documentElement.childNodes[0].nodeValue.split()
|
|
bytes = map(eval, arrayString)
|
|
debug(Debug, "Read variable " + variable + " as byte array: " +
|
|
repr(bytes))
|
|
return bytes
|
|
|
|
def readInt(self, ** kwArgs):
|
|
response = self.readMemory(type = "int", ** kwArgs)
|
|
return eval(response.documentElement.childNodes[0].nodeValue)
|
|
|
|
def readMemory(self, ** kwArgs):
|
|
return self.runCommand("READ_MEMORY", kwArgs)
|
|
|
|
def killNodesInInterval(self, ** kwArgs):
|
|
return self.runCommand("KILL_NODES", kwArgs)
|
|
|
|
def sendCustomCommand(self, ** kwArgs):
|
|
return self.runCommand("CUSTOM_COMMAND", kwArgs)
|
|
|
|
def getSimulationInfo(self, ** kwArgs):
|
|
response = self.runCommand("GET_INFO", kwArgs)
|
|
return eval(response.documentElement.childNodes[0].nodeValue)
|
|
|
|
def __repr__(self):
|
|
return "\n ".join([
|
|
"Shepherd:",
|
|
"Port: " + repr(self._port),
|
|
"Monitors: " + repr(self._monitors),
|
|
"Waiting monitors: " + repr(self._waitingMonitors),
|
|
"Basic eventpoints: [" + "\n".join(map(repr, self._eventpoints)) + "]",
|
|
"Waiting eventpoints: [" + "\n".join(
|
|
map(repr, self._waitingEventpoints)) + "]",
|
|
"Wait map: " + repr(self._waitMap),
|
|
])
|