an early version of chakana, an automated test framework used with COOJA
This commit is contained in:
parent
9be473e4b9
commit
65533c0c00
29 changed files with 4983 additions and 0 deletions
736
tools/chakana/utils.py
Normal file
736
tools/chakana/utils.py
Normal file
|
@ -0,0 +1,736 @@
|
|||
|
||||
# Copyright (C) 2003-2007 Swedish Institute of Computer Science.
|
||||
#
|
||||
# Please refer to the file named LICENSE in the same directory as this
|
||||
# file for licensing information.
|
||||
#
|
||||
|
||||
|
||||
# $Id: utils.py,v 1.1 2007/08/21 14:41:01 fros4943 Exp $
|
||||
|
||||
import errno
|
||||
import math
|
||||
import operator
|
||||
import os
|
||||
import popen2
|
||||
import re
|
||||
import string
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import traceback
|
||||
import types
|
||||
|
||||
import chakana.command
|
||||
import chakana.error
|
||||
from chakana.debug import *
|
||||
|
||||
try:
|
||||
global False
|
||||
global True
|
||||
_ = False
|
||||
except NameError:
|
||||
False = 0
|
||||
True = 1
|
||||
|
||||
class DefaultDict(dict):
|
||||
def __init__(self, defaultCreator, initialiser = (), inject = 0):
|
||||
dict.__init__(self, initialiser)
|
||||
if callable(defaultCreator):
|
||||
self._defaultCreator = defaultCreator
|
||||
else:
|
||||
self._defaultCreator = lambda k: defaultCreator
|
||||
self._inject = inject
|
||||
|
||||
def __getitem__(self, key):
|
||||
try:
|
||||
return dict.__getitem__(self, key)
|
||||
except KeyError, err:
|
||||
value = self._defaultCreator(key)
|
||||
if self._inject:
|
||||
self[key] = value
|
||||
return self[key]
|
||||
else:
|
||||
return value
|
||||
|
||||
class UnbufferedFile:
|
||||
def __init__(self, fileObj):
|
||||
self._fileObj = fileObj
|
||||
|
||||
def __getattr__(self, attribute):
|
||||
return getattr(self._fileObj, attribute)
|
||||
|
||||
def write(self, what):
|
||||
self._fileObj.write(what)
|
||||
self._fileObj.flush()
|
||||
|
||||
def copyFile(src, dest):
|
||||
debug(Event, "cp " + src + " " + dest)
|
||||
srcFile = file(src)
|
||||
destFile = file(dest, "w")
|
||||
destFile.write(srcFile.read())
|
||||
srcFile.close()
|
||||
destFile.close()
|
||||
|
||||
def unique(list):
|
||||
ret = []
|
||||
inList = {}
|
||||
for elem in list:
|
||||
if not elem in inList:
|
||||
ret.append(elem)
|
||||
inList[elem] = 1
|
||||
return ret
|
||||
|
||||
def unorderedEqual(listOne, listTwo, predicate = lambda x, y: x == y):
|
||||
if len(listOne) != len(listTwo):
|
||||
return 0
|
||||
list1 = list(listOne)
|
||||
list2 = list(listTwo)
|
||||
while list1 != []:
|
||||
found = 0
|
||||
for index in range(len(list2)):
|
||||
if predicate(list1[0], list2[index]):
|
||||
del list1[0]
|
||||
del list2[index]
|
||||
found = 1
|
||||
break
|
||||
if not found:
|
||||
return 0
|
||||
return 1
|
||||
|
||||
def reduceSequences(tupleSeq, oper = operator.add):
|
||||
"""Return a list, where the item at position i is the reduced value
|
||||
of all items at position i."""
|
||||
|
||||
if len(tupleSeq) == 0:
|
||||
return []
|
||||
return [reduce(oper, [s[i] for s in tupleSeq])
|
||||
for i in range(len(tupleSeq[0]))]
|
||||
|
||||
def listRemove(theList, predicate):
|
||||
"""Remove all items that match predicate in theList."""
|
||||
if not callable(predicate):
|
||||
return listRemove(theList, lambda x: x == predicate)
|
||||
else:
|
||||
index = 0
|
||||
while index < len(theList):
|
||||
if predicate(theList[index]):
|
||||
del theList[index]
|
||||
else:
|
||||
index += 1
|
||||
return theList
|
||||
|
||||
def logn(n, x):
|
||||
return math.log(x) / math.log(n)
|
||||
|
||||
def longRange(* args):
|
||||
"""As range, but works with long integers."""
|
||||
if len(args) == 1:
|
||||
start = 0L
|
||||
stop = args[0]
|
||||
step = 1L
|
||||
elif len(args) == 2:
|
||||
(start, stop) = args
|
||||
step = 1L
|
||||
else:
|
||||
(start, stop, step) = args
|
||||
if step == 0:
|
||||
raise ValueError("longRange() arg 3 must not be zero")
|
||||
ret = []
|
||||
while cmp(start, stop) == cmp(0, step):
|
||||
ret.append(start)
|
||||
start += step
|
||||
return ret
|
||||
|
||||
integerSuffixes = ("", "K", "M", "G", "T")
|
||||
|
||||
def abbreviateInteger(i, factor = 1024, minimise = 0):
|
||||
suf = 0
|
||||
while suf < len(integerSuffixes) - 1 and \
|
||||
((i > 9999) or (minimise and (i != 0) and (i % factor == 0))):
|
||||
suf += 1
|
||||
i /= factor
|
||||
return str(i) + integerSuffixes[suf]
|
||||
|
||||
def readSuffixedInt(intString, factor = 1024):
|
||||
if intString[-1].isalpha():
|
||||
return int(intString[: -1]) * (
|
||||
factor ** list(integerSuffixes).index(intString[-1].upper()))
|
||||
else:
|
||||
return int(intString)
|
||||
|
||||
def normaliseIndex(index, seq):
|
||||
"""Return a non-negative index relating to a sequence, or negative if
|
||||
index < - len(seq)."""
|
||||
|
||||
if index < 0:
|
||||
return len(seq) + index
|
||||
else:
|
||||
return index
|
||||
|
||||
def truncateIndex(index, seq):
|
||||
"""Return a non-negative index relating to a sequence, less than or equal
|
||||
to the length of the sequence."""
|
||||
ret = normaliseIndex(index, seq)
|
||||
if ret < 0:
|
||||
return 0
|
||||
length = len(seq)
|
||||
if ret > length:
|
||||
return length
|
||||
return ret
|
||||
|
||||
class LazySlice:
|
||||
"""Representation of slice into other sequence."""
|
||||
def __init__(self, seq, start = None, stop = None, step = None):
|
||||
self._sequence = seq
|
||||
if step is None:
|
||||
self._step = 1
|
||||
else:
|
||||
self._step = step
|
||||
if self._step == 0:
|
||||
# Provoke ValueError
|
||||
[][0:1:0]
|
||||
|
||||
if self._step > 0:
|
||||
startDefault = 0
|
||||
stopDefault = len(seq)
|
||||
indexShift = 0
|
||||
else:
|
||||
startDefault = len(seq) - 1
|
||||
stopDefault = -1
|
||||
indexShift = 1
|
||||
|
||||
if start is None:
|
||||
self._start = startDefault
|
||||
else:
|
||||
self._start = truncateIndex(start + indexShift, seq) - indexShift
|
||||
if stop is None:
|
||||
self._stop = stopDefault
|
||||
else:
|
||||
self._stop = truncateIndex(stop + indexShift, seq) - indexShift
|
||||
|
||||
if (self._step > 0) != (self._start < self._stop):
|
||||
self._stop = self._start
|
||||
|
||||
def __getitem__(self, key):
|
||||
# debug(Debug2, "LS getitem " + str(key) + ": " + repr(self))
|
||||
if type(key) == types.SliceType:
|
||||
ret = LazySlice(self, key.start, key.stop, key.step)
|
||||
#debug(Debug2, "LS getitem " + str(key) + " = " +
|
||||
# repr(ret) + ", LS: " + repr(self))
|
||||
return ret
|
||||
index = normaliseIndex(key, self)
|
||||
if index < 0:
|
||||
# Provoke IndexError
|
||||
[][1]
|
||||
seqIndex = self._start + index * self._step
|
||||
if self._step > 0:
|
||||
if seqIndex >= self._stop:
|
||||
[][1]
|
||||
elif seqIndex <= self._stop:
|
||||
[][1]
|
||||
ret = self._sequence[seqIndex]
|
||||
#debug(Debug2, "LS getitem " + str(key) + " (" + str(seqIndex) + ") = " +
|
||||
# str(ret) + ", LS: " + repr(self))
|
||||
return ret
|
||||
|
||||
def __len__(self):
|
||||
return (abs(self._stop - self._start) + abs(self._step) - 1) / \
|
||||
abs(self._step)
|
||||
|
||||
def __str__(self):
|
||||
return str(list(self))
|
||||
|
||||
def __repr__(self):
|
||||
return "LazySlice(start = " + str(self._start) + ", stop = " + \
|
||||
str(self._stop) + ", step = " + str(self._step) + ",\n " + \
|
||||
repr(self._sequence) + ")"
|
||||
|
||||
def getItem(sequence, key):
|
||||
"""Obtain key in sequence. Key may be an integer or a slice object."""
|
||||
if type(key) == types.SliceType:
|
||||
if key.step is None:
|
||||
return sequence[key.start : key.stop]
|
||||
return sequence[key.start : key.stop : key.step]
|
||||
return sequence[key]
|
||||
|
||||
def binarySearch(list, value, start = 0, end = None):
|
||||
"Return the position where value should be inserted in a sorted list."
|
||||
if end is None:
|
||||
end = len(list)
|
||||
if start == end:
|
||||
return end
|
||||
middle = start + (end - start) / 2
|
||||
if list[middle] < value:
|
||||
return binarySearch(list, value, middle + 1, end)
|
||||
else:
|
||||
return binarySearch(list, value, start, middle)
|
||||
|
||||
def binaryFind(list, value, *args, ** kwArgs):
|
||||
"Return position of value in sorted list, or None if not found."
|
||||
position = binarySearch(list, value, *args, ** kwArgs)
|
||||
if (position == len(list)) or (list[position] != value):
|
||||
return None
|
||||
else:
|
||||
return position
|
||||
|
||||
def compactIntListRepr(input, sort = 0):
|
||||
if sort:
|
||||
list = input[:]
|
||||
list.sort()
|
||||
else:
|
||||
list = input
|
||||
if __debug__:
|
||||
tmp = list[:]
|
||||
tmp.sort()
|
||||
assert(tmp == list)
|
||||
index = 0
|
||||
ret = "["
|
||||
sep = ""
|
||||
compact = lambda l, i: (i + 1 < len(l)) and (l[i] + 1 == l[i + 1])
|
||||
while index < len(list):
|
||||
ret += sep + str(list[index])
|
||||
if compact(list, index):
|
||||
ret += "-"
|
||||
while compact(list, index):
|
||||
index += 1
|
||||
ret += str(list[index])
|
||||
index += 1
|
||||
sep = ","
|
||||
return ret + "]"
|
||||
|
||||
def predicatedIndex(sequence, predicate):
|
||||
for i in range(len(sequence)):
|
||||
if predicate(sequence[i]):
|
||||
return i
|
||||
raise ValueError()
|
||||
|
||||
class ArgumentBinder:
|
||||
def __init__(self, function, argument, position = 0):
|
||||
self.__function = function
|
||||
self.__argument = argument
|
||||
self.__position = position
|
||||
|
||||
def __call__(self, *args, **keywords):
|
||||
newArgs = list(args)
|
||||
newArgs[self.__position : self.__position] = [self.__argument]
|
||||
return self.__function(*newArgs, **keywords)
|
||||
|
||||
class ArgumentTupleBinder:
|
||||
def __init__(self, function, arguments):
|
||||
self._function = function
|
||||
self._arguments = arguments
|
||||
|
||||
def __call__(self, * args, ** kwArgs):
|
||||
return self._function(* (self._arguments + args), ** kwArgs)
|
||||
|
||||
class KeywordArgumentBinder:
|
||||
def __init__(self, function, ** keywords):
|
||||
self._function = function
|
||||
self._keywords = keywords
|
||||
|
||||
def __call__(self, * args, ** kwArgs):
|
||||
kw = self._keywords
|
||||
kw.update(kwArgs)
|
||||
return self._function(* args, ** kw)
|
||||
|
||||
cppKeywords = ["asm", "do", "inline", "short", "typeid", "auto",
|
||||
"double", "int", "signed", "typename", "bool",
|
||||
"dynamic_cast", "long", "sizeof", "union", "break",
|
||||
"else", "mutable", "static", "unsigned", "case",
|
||||
"enum", "namespace", "static_cast", "using", "catch",
|
||||
"explicit", "new", "struct", "virtual", "char",
|
||||
"extern", "operator", "switch", "void", "class",
|
||||
"false", "private", "template", "volatile", "const",
|
||||
"float", "protected", "this", "wchar_t", "const_cast",
|
||||
"for", "public", "throw", "while", "continue",
|
||||
"friend", "register", "true", "default", "goto",
|
||||
"reinterpret_cast", "try", "delete", "if", "return",
|
||||
"typedef"]
|
||||
|
||||
def isCWord(str):
|
||||
return not re.match("^[a-zA-Z_][a-zA-Z0-9_]*$", str) is None
|
||||
|
||||
def isCppIdentifier(str):
|
||||
return isCWord(str) and (str not in cppKeywords)
|
||||
|
||||
def find(seq, predicate = lambda x: x):
|
||||
for item in seq:
|
||||
if predicate(item):
|
||||
return item
|
||||
return None
|
||||
|
||||
def allBest(seq, comparator = cmp):
|
||||
if len(seq) < 2:
|
||||
return seq
|
||||
seq = seq[:]
|
||||
ret = [seq.pop()]
|
||||
for item in seq:
|
||||
c = comparator(item, ret[0])
|
||||
if c < 0:
|
||||
ret = [item]
|
||||
elif c == 0:
|
||||
ret.append(item)
|
||||
return ret
|
||||
|
||||
def readFile(fileName):
|
||||
f = open(fileName)
|
||||
try:
|
||||
ret = f.read()
|
||||
return ret
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
def writeFile(fileName, contents):
|
||||
f = open(fileName, "w")
|
||||
try:
|
||||
f.write(contents)
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
def writeFileAtomic(fileName, contents, suffix = ".tmp"):
|
||||
tmpFile = fileName + suffix
|
||||
writeFile(tmpFile, contents)
|
||||
try:
|
||||
os.rename(tmpFile, fileName)
|
||||
except:
|
||||
if os.path.exists(fileName) and os.path.exists(tmpFile):
|
||||
os.remove(tmpFile)
|
||||
raise
|
||||
|
||||
|
||||
class LineIndexed:
|
||||
"""Read-only file object indexed by line number. The file object must
|
||||
support seeking."""
|
||||
|
||||
def __init__(self, fileName, offset = 0):
|
||||
if type(fileName) == type(""):
|
||||
self._file = file(fileName)
|
||||
else:
|
||||
self._file = fileName
|
||||
self._offset = offset
|
||||
self._length = None
|
||||
self._lineOffsets = [0]
|
||||
|
||||
def close(self):
|
||||
self._file.close()
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
def _seekForward(self, index):
|
||||
if self._length == 0:
|
||||
raise IndexError()
|
||||
for index in range(len(self._lineOffsets), index + 1):
|
||||
if self._lineOffsets[-1] == self._length:
|
||||
raise IndexError()
|
||||
self._file.seek(self._lineOffsets[-1] + self._offset)
|
||||
line = self._file.readline()
|
||||
if line == "":
|
||||
self._length = self._lineOffsets[-1]
|
||||
else:
|
||||
self._lineOffsets.append(self._lineOffsets[-1] + len(line))
|
||||
|
||||
def __getitem__(self, key):
|
||||
if type(key) == types.SliceType:
|
||||
ret = LazySlice(self, key.start, key.stop, key.step)
|
||||
return ret
|
||||
if key < 0:
|
||||
positiveLength = len(self) + key
|
||||
if positiveLength < 0:
|
||||
raise IndexError()
|
||||
return self[positiveLength]
|
||||
self._seekForward(key)
|
||||
offset = self._lineOffsets[key]
|
||||
if offset == self._length:
|
||||
raise IndexError()
|
||||
self._file.seek(offset + self._offset)
|
||||
return self._file.readline()
|
||||
|
||||
def __len__(self):
|
||||
if self._length == 0:
|
||||
return 0
|
||||
while self._lineOffsets[-1] != self._length:
|
||||
self._seekForward(len(self._lineOffsets))
|
||||
return len(self._lineOffsets) - 1
|
||||
|
||||
def lineOffset(self, index):
|
||||
self._seekForward(index)
|
||||
return self._lineOffsets[index]
|
||||
|
||||
def __str__(self):
|
||||
return str(list(self))
|
||||
|
||||
def __repr__(self):
|
||||
return "LineIndexed(offset = " + str(self._offset) + ", length = " + \
|
||||
str(self._length) + ", lineOffsets: " + \
|
||||
str(self._lineOffsets) + ")"
|
||||
|
||||
def getLine(fileName, count):
|
||||
fileObj = file(fileName)
|
||||
if count < 0:
|
||||
return fileObj.readlines()[count]
|
||||
for i in range(count + 1):
|
||||
ret = fileObj.readline()
|
||||
if ret == "":
|
||||
raise IndexError()
|
||||
return ret
|
||||
|
||||
def conditionalJoin(list, separator = " ", predicate = lambda e: e != ""):
|
||||
if len(list) == 0:
|
||||
return ""
|
||||
rest = conditionalJoin(list[1:], separator, predicate)
|
||||
if not predicate(rest):
|
||||
return list[0]
|
||||
if not predicate(list[0]):
|
||||
return rest
|
||||
else:
|
||||
return list[0] + separator + rest
|
||||
|
||||
def pathComponents(path):
|
||||
(head, tail) = os.path.split(path)
|
||||
if tail == "":
|
||||
if head == "":
|
||||
return []
|
||||
if head == "/":
|
||||
return ["/"]
|
||||
else:
|
||||
return pathComponents(head)
|
||||
else:
|
||||
return pathComponents(head) + [tail]
|
||||
|
||||
def commonFirstElements(listOfLists):
|
||||
if len(listOfLists) == 0:
|
||||
return []
|
||||
if len(listOfLists) == 1:
|
||||
return listOfLists[0]
|
||||
if (len(listOfLists[0]) == 0) or (len(listOfLists[1]) == 0) or \
|
||||
(listOfLists[0][0] != listOfLists[1][0]):
|
||||
return []
|
||||
else:
|
||||
return commonFirstElements([
|
||||
[listOfLists[0][0]] +
|
||||
commonFirstElements([listOfLists[0][1:], listOfLists[1][1:]])]
|
||||
+ listOfLists[2:])
|
||||
|
||||
def commonPrefix(pathList):
|
||||
"""Similar to os.path.commonprefix, but works on path components instead
|
||||
of individual characters."""
|
||||
|
||||
assert(not isinstance(pathList, str))
|
||||
components = commonFirstElements(map(pathComponents, pathList))
|
||||
if components == []:
|
||||
return ""
|
||||
else:
|
||||
return os.path.join(* components)
|
||||
|
||||
def pathIsBelow(path, directory):
|
||||
"""Check if path lies below directory."""
|
||||
return commonPrefix([path, directory]) == directory
|
||||
|
||||
def realPath(path):
|
||||
"""Similar to os.path.realpath, but handles amd gracefully."""
|
||||
ret = os.path.realpath(path)
|
||||
for binDir in os.environ["PATH"].split(":") + ["/usr/sbin", "/sbin"]:
|
||||
amq = os.path.join(binDir, "amq")
|
||||
if os.path.isfile(amq):
|
||||
try:
|
||||
output = chakana.command.output(amq)
|
||||
except chakana.error.CommandFailed, err:
|
||||
debug(Debug, str(err))
|
||||
# Assume amd is not running
|
||||
return ret
|
||||
for line in output.splitlines():
|
||||
amdDir = line.split()[0]
|
||||
mountDir = line.split()[-1]
|
||||
if mountDir[0] == "/":
|
||||
match = re.match('^(' + re.escape(line) + ')(/$)', ret)
|
||||
if match:
|
||||
return amdDir + ret[len(line) :]
|
||||
return ret
|
||||
return ret
|
||||
|
||||
def removeTree(path, ignoreErrors = False):
|
||||
debug(Debug, "rm -rf " + path)
|
||||
if os.path.isdir(path):
|
||||
shutil.rmtree(path, ignore_errors = ignoreErrors)
|
||||
|
||||
def makeSymlink(value, dest, force = True, debugLevel = MinorEvent,
|
||||
dryRun = False):
|
||||
if os.path.islink(dest):
|
||||
if os.readlink(dest) == value:
|
||||
debug(Debug, "Link " + dest + " already points to " + value)
|
||||
return
|
||||
elif force:
|
||||
debug(debugLevel, "Removing " + dest)
|
||||
if not dryRun:
|
||||
os.remove(dest)
|
||||
else:
|
||||
raise OSError((errno.EEXIST, "Link already exists", dest))
|
||||
absValue = os.path.join(os.path.dirname(dest), value)
|
||||
if (not dryRun) and (not force) and (not os.path.isfile(absValue)):
|
||||
raise OSError((errno.ENOENT, "Link destination does not exist", absDest))
|
||||
debug(debugLevel, "Linking " + dest + " to " + value)
|
||||
if not dryRun:
|
||||
os.symlink(value, dest)
|
||||
|
||||
def copyTree(src, dst, symlinks = False, predicate = lambda path: 1,
|
||||
preserveTimes = False):
|
||||
"""Similar to shutil.copytree, but allows existing destination and
|
||||
passes exceptions."""
|
||||
names = filter(predicate, os.listdir(src))
|
||||
if not os.path.isdir(dst):
|
||||
os.mkdir(dst)
|
||||
for name in names:
|
||||
srcname = os.path.join(src, name)
|
||||
dstname = os.path.join(dst, name)
|
||||
try:
|
||||
if symlinks and os.path.islink(srcname):
|
||||
linkto = os.readlink(srcname)
|
||||
os.symlink(linkto, dstname)
|
||||
elif os.path.isdir(srcname):
|
||||
copyTree(srcname, dstname, symlinks, predicate, preserveTimes)
|
||||
else:
|
||||
if preserveTimes:
|
||||
shutil.copy2(srcname, dstname)
|
||||
else:
|
||||
shutil.copy(srcname, dstname)
|
||||
# XXX What about devices, sockets etc.?
|
||||
except (IOError, os.error), why:
|
||||
debug(Error, "Can't copy %s to %s: %s" %
|
||||
(`srcname`, `dstname`, str(why)))
|
||||
raise
|
||||
|
||||
def makeDirsSafe(dir, mode = 0777):
|
||||
"""Similar to os.makedirs, but does not fail if another process is making
|
||||
the directory simultaneously."""
|
||||
while not os.path.isdir(dir):
|
||||
try:
|
||||
os.makedirs(dir, mode)
|
||||
except OSError, err:
|
||||
if err.errno != errno.EEXIST:
|
||||
raise
|
||||
|
||||
def typeAssert(variable, type):
|
||||
assert(isinstance(variable, type))
|
||||
|
||||
def intervalsOverlap(range1, range2):
|
||||
if range1 > range2:
|
||||
return intervalsOverlap(range2, range1)
|
||||
if (range1[0] == range1[1]) or (range2[0] == range2[1]):
|
||||
return 0
|
||||
return range2[0] < range1[1]
|
||||
|
||||
class LexicalDistance:
|
||||
def __init__(self, caseChange = 1, whiteSpaceChange = 2,
|
||||
whiteSpaceRemoval = 3, ampersand = 5, other = 10,
|
||||
whiteSpace = " _-'\""):
|
||||
self.__caseChange = caseChange
|
||||
self.__whiteSpaceChange = whiteSpaceChange
|
||||
self.__whiteSpaceRemoval = whiteSpaceRemoval
|
||||
self.__ampersand = ampersand
|
||||
self.__other = other
|
||||
self.__whiteSpace = whiteSpace
|
||||
|
||||
def removeWhiteSpace(self, str1, str2, limit):
|
||||
if str1[0] in self.__whiteSpace:
|
||||
if limit > self.__whiteSpaceRemoval:
|
||||
return self(str1[1:], str2, limit - self.__whiteSpaceRemoval) + \
|
||||
self.__whiteSpaceRemoval
|
||||
return limit
|
||||
|
||||
def changeWhiteSpace(self, str1, str2, limit):
|
||||
if (str1[0] in self.__whiteSpace) and (str2[0] in self.__whiteSpace):
|
||||
if limit > self.__whiteSpaceChange:
|
||||
return self(str1[1:], str2[1:], limit - self.__whiteSpaceChange) + \
|
||||
self.__whiteSpaceChange
|
||||
return limit
|
||||
|
||||
def changeCase(self, str1, str2, limit):
|
||||
if str1[0].upper() == str2[0].upper():
|
||||
if limit > self.__caseChange:
|
||||
return self(str1[1:], str2[1:], limit - self.__caseChange) + \
|
||||
self.__caseChange
|
||||
return limit
|
||||
|
||||
def changeAmpersand(self, str1, str2, limit):
|
||||
if (str1[0] == "&") and (str2[:3].lower() == "and"):
|
||||
if limit > self.__ampersand:
|
||||
return self(str1[1:], str2[3:], limit - self.__ampersand) + \
|
||||
self.__ampersand
|
||||
return limit
|
||||
|
||||
def removeOther(self, str1, str2, limit):
|
||||
if limit > self.__other:
|
||||
return self(str1[1:], str2, limit - self.__other) + self.__other
|
||||
return limit
|
||||
|
||||
def changeOther(self, str1, str2, limit):
|
||||
if limit > self.__other:
|
||||
return self(str1[1:], str2[1:], limit - self.__other) + self.__other
|
||||
return limit
|
||||
|
||||
def __call__(self, str1, str2, limit = 50):
|
||||
ret = limit
|
||||
if str1 == "":
|
||||
if str2 == "":
|
||||
return 0
|
||||
ret = self.removeWhiteSpace(str2, str1, ret)
|
||||
ret = self.removeOther(str2, str1, ret)
|
||||
elif str2 == "":
|
||||
ret = self(str2, str1, ret)
|
||||
else:
|
||||
if str1[0] == str2[0]:
|
||||
ret = self(str1[1:], str2[1:], ret)
|
||||
else:
|
||||
ret = self.removeWhiteSpace(str1, str2, ret)
|
||||
ret = self.removeWhiteSpace(str2, str1, ret)
|
||||
ret = self.changeWhiteSpace(str1, str2, ret)
|
||||
ret = self.changeCase(str1, str2, ret)
|
||||
ret = self.changeAmpersand(str1, str2, ret)
|
||||
ret = self.changeAmpersand(str2, str1, ret)
|
||||
ret = self.changeOther(str1, str2, ret)
|
||||
ret = self.removeOther(str1, str2, ret)
|
||||
ret = self.removeOther(str2, str1, ret)
|
||||
assert(ret <= limit)
|
||||
return ret
|
||||
|
||||
class TryOperation:
|
||||
def __init__(self, operation, description = None):
|
||||
self.__operation = operation
|
||||
if description is None:
|
||||
self.__description = str(operation)
|
||||
else:
|
||||
self.__description = description
|
||||
|
||||
def __call__(self, * args, ** kwargs):
|
||||
try:
|
||||
self.__operation(* args, ** kwargs)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except:
|
||||
import debug as d
|
||||
d.debug(d.Error, self.__description + " failed\n" +
|
||||
d.exceptionDump(sys.exc_info()))
|
||||
|
||||
def lineDirective():
|
||||
fileInfo = traceback.extract_stack(None, 2)[0]
|
||||
return "#line " + repr(fileInfo[1] + 1) + " \"" + fileInfo[0] + "\""
|
||||
|
||||
try:
|
||||
mkstemp = tempfile.mkstemp
|
||||
except AttributeError:
|
||||
def mkstemp(suffix = "", prefix = None, dir = None, text = 0):
|
||||
if prefix is None:
|
||||
prefix = tempfile.gettempprefix()
|
||||
fileName = tempfile.mktemp(suffix)
|
||||
if dir is None:
|
||||
dir = os.path.dirname(fileName)
|
||||
newFileName = os.path.join(dir, prefix + os.path.basename(fileName))
|
||||
if text:
|
||||
flags = "w"
|
||||
else:
|
||||
flags = "wb"
|
||||
return (file(newFileName, flags), newFileName)
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue