osd-contiki/tools/sky/serial/serialjava.py

197 lines
7 KiB
Python

#!jython
#module for serial IO for Jython and JavaComm
#see __init__.py
#
#(C) 2002 Chris Liechti <cliechti@gmx.net>
# this is distributed under a free software license, see license.txt
import sys, os, string, javax.comm
import serialutil
# Module version, extracted from the CVS "$Revision$" keyword expansion.
# Uses the str.split method rather than the deprecated string.split function.
VERSION = "$Revision: 1.1 $".split()[1]

# Selector values accepted by Serial(parity=...).
PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = (0, 1, 2, 3, 4)

# Selector values accepted by Serial(stopbits=...); 3 stands for 1.5 stop bits.
STOPBITS_ONE, STOPBITS_TWO, STOPBITS_ONE_HALVE = (1, 2, 3)

# Selector values accepted by Serial(bytesize=...): number of data bits.
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)

# Shared exception instance raised by Serial methods once the port is closed.
portNotOpenError = ValueError('port not open')
def device(portnumber):
    """Return the port identifier of the serial port at index *portnumber*.

    All port identifiers reported by javax.comm are scanned in enumeration
    order; only those whose port type is PORT_SERIAL are kept, and the
    resulting list is indexed with *portnumber*.  An index outside the list
    raises IndexError from the list lookup.
    """
    identifiers = javax.comm.CommPortIdentifier.getPortIdentifiers()
    serial_ports = []
    while identifiers.hasMoreElements():
        candidate = identifiers.nextElement()
        if candidate.getPortType() != javax.comm.CommPortIdentifier.PORT_SERIAL:
            # parallel (or any other non-serial) port: not counted
            continue
        serial_ports.append(candidate)
    return serial_ports[portnumber]
class Serial(serialutil.FileLike):
    """Serial port object for Jython, implemented on top of javax.comm.

    The port is opened and configured by the constructor.  Every I/O and
    control-line method raises the module-level ``portNotOpenError`` once the
    port has been closed.
    """

    def __init__(self,
                 port,                  #number of device, numbering starts at
                                        #zero. if everything fails, the user
                                        #can specify a device string, note
                                        #that this isn't portable anymore
                 baudrate=9600,         #baudrate
                 bytesize=EIGHTBITS,    #number of databits
                 parity=PARITY_NONE,    #enable parity checking
                 stopbits=STOPBITS_ONE, #number of stopbits
                 timeout=None,          #set a timeout value, None for waiting forever
                 xonxoff=0,             #enable software flow control
                 rtscts=0,              #enable RTS/CTS flow control
                 ):
        """Open and configure the serial port.

        port     -- index into the list of serial ports (see device()), or a
                    port name string that is handed to javax.comm unchanged
        baudrate -- line speed passed to setSerialPortParams
        bytesize -- one of FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS
        parity   -- one of the PARITY_* constants
        stopbits -- one of the STOPBITS_* constants
        timeout  -- read timeout in seconds; None (or a negative value)
                    disables the receive timeout so reads wait forever
        xonxoff  -- true to enable XON/XOFF flow control
        rtscts   -- true to enable RTS/CTS flow control

        Raises ValueError for an unsupported bytesize, stopbits or parity
        value, and serialutil.SerialException if the port cannot be opened.
        """
        SerialPort = javax.comm.SerialPort

        # Validate the framing arguments BEFORE touching the hardware, so a
        # bad argument can never leave an opened port behind.
        self.databits = self._lookup_setting((
            (FIVEBITS, SerialPort.DATABITS_5),
            (SIXBITS, SerialPort.DATABITS_6),
            (SEVENBITS, SerialPort.DATABITS_7),
            (EIGHTBITS, SerialPort.DATABITS_8),
        ), bytesize, "unsupported bytesize")
        self.jstopbits = self._lookup_setting((
            (STOPBITS_ONE, SerialPort.STOPBITS_1),
            (STOPBITS_ONE_HALVE, SerialPort.STOPBITS_1_5),
            (STOPBITS_TWO, SerialPort.STOPBITS_2),
        ), stopbits, "unsupported number of stopbits")
        self.jparity = self._lookup_setting((
            (PARITY_NONE, SerialPort.PARITY_NONE),
            (PARITY_EVEN, SerialPort.PARITY_EVEN),
            (PARITY_ODD, SerialPort.PARITY_ODD),
            (PARITY_MARK, SerialPort.PARITY_MARK),
            (PARITY_SPACE, SerialPort.PARITY_SPACE),
        ), parity, "unsupported parity type")

        # Flow control mode is a bit mask of the IN and OUT flags.
        flowmode = 0
        if rtscts:
            flowmode = (flowmode
                        | SerialPort.FLOWCONTROL_RTSCTS_IN
                        | SerialPort.FLOWCONTROL_RTSCTS_OUT)
        if xonxoff:
            flowmode = (flowmode
                        | SerialPort.FLOWCONTROL_XONXOFF_IN
                        | SerialPort.FLOWCONTROL_XONXOFF_OUT)

        if isinstance(port, type('')):  #strings are taken directly
            portId = javax.comm.CommPortIdentifier.getPortIdentifier(port)
        else:
            portId = device(port)       #numbers are transformed to a comportid obj
        self.portstr = portId.getName()

        try:
            self.sPort = portId.open("python serial module", 10)
        except Exception:
            # sys.exc_info() instead of "except Exception, msg" keeps this
            # valid on every Python/Jython syntax level.
            msg = sys.exc_info()[1]
            self.sPort = None
            raise serialutil.SerialException("could not open port: %s" % msg)

        # From here on the port is held by us: if any configuration step
        # fails, release it again before the exception reaches the caller
        # (who would otherwise have no object to call close() on).
        configured = 0
        try:
            self.instream = self.sPort.getInputStream()
            self.outstream = self.sPort.getOutputStream()
            self.sPort.enableReceiveTimeout(30)
            self.sPort.setSerialPortParams(baudrate, self.databits,
                                           self.jstopbits, self.jparity)
            self.sPort.setFlowControlMode(flowmode)
            self.timeout = timeout
            if timeout is not None and timeout >= 0:
                # seconds -> milliseconds; int() because the Java method
                # takes an int and timeout may be a float such as 0.5
                self.sPort.enableReceiveTimeout(int(timeout * 1000))
            else:
                self.sPort.disableReceiveTimeout()
            configured = 1
        finally:
            if not configured:
                self.sPort.close()
                self.sPort = None

    def _lookup_setting(self, table, key, message):
        """Return the value paired with *key* in *table*.

        *table* is a sequence of (selector, value) pairs compared with ==.
        Raises ValueError(message) when no selector matches.
        """
        for selector, value in table:
            if key == selector:
                return value
        raise ValueError(message)

    def close(self):
        """Close the streams and the port; does nothing if already closed."""
        if self.sPort:
            try:
                self.instream.close()
                self.outstream.close()
            finally:
                # release the port even if closing a stream failed
                self.sPort.close()
                self.sPort = None

    def setBaudrate(self, baudrate):
        """change baudrate after port is open"""
        if not self.sPort: raise portNotOpenError
        self.sPort.setSerialPortParams(baudrate, self.databits,
                                       self.jstopbits, self.jparity)

    def inWaiting(self):
        """Return the input stream's available() count."""
        if not self.sPort: raise portNotOpenError
        return self.instream.available()

    def write(self, data):
        """Write *data* to the port's output stream."""
        if not self.sPort: raise portNotOpenError
        self.outstream.write(data)

    def read(self, size=1):
        """Read up to *size* characters and return them as a string.

        With a timeout configured, the read stops early as soon as the
        input stream reports -1, so fewer than *size* characters (possibly
        an empty string) may be returned.  Without a timeout the method
        keeps polling until *size* characters have arrived.  A *size* of
        zero or less returns an empty string.
        """
        if not self.sPort: raise portNotOpenError
        received = []
        while len(received) < size:
            x = self.instream.read()
            if x == -1:
                if self.timeout is not None and self.timeout >= 0:
                    break
            else:
                received.append(chr(x))
        return ''.join(received)

    def flushInput(self):
        """Discard everything currently waiting in the input stream."""
        if not self.sPort: raise portNotOpenError
        self.instream.skip(self.instream.available())

    def flushOutput(self):
        """Flush the output stream."""
        if not self.sPort: raise portNotOpenError
        self.outstream.flush()

    def sendBreak(self):
        """Ask the underlying port to send a break."""
        if not self.sPort: raise portNotOpenError
        self.sPort.sendBreak()

    def getDSR(self):
        """Return the DSR line state as reported by the port's isDSR()."""
        if not self.sPort: raise portNotOpenError
        return self.sPort.isDSR()

    def getCD(self):
        """Return the CD line state as reported by the port's isCD()."""
        if not self.sPort: raise portNotOpenError
        return self.sPort.isCD()

    def getRI(self):
        """Return the RI line state as reported by the port's isRI()."""
        if not self.sPort: raise portNotOpenError
        return self.sPort.isRI()

    def getCTS(self):
        """Return the CTS line state as reported by the port's isCTS()."""
        if not self.sPort: raise portNotOpenError
        return self.sPort.isCTS()

    def setDTR(self,on=1):
        """Set the DTR line to *on* (default: asserted)."""
        if not self.sPort: raise portNotOpenError
        self.sPort.setDTR(on)

    def setRTS(self,on=1):
        """Set the RTS line to *on* (default: asserted)."""
        if not self.sPort: raise portNotOpenError
        self.sPort.setRTS(on)
if __name__ == '__main__':
    # Manual self-test: open the first serial port, assert RTS and DTR,
    # write 'hello' and print whatever comes back within the timeout.
    s = Serial(0,
               baudrate=19200,        #baudrate
               bytesize=EIGHTBITS,    #number of databits
               parity=PARITY_EVEN,    #enable parity checking
               stopbits=STOPBITS_ONE, #number of stopbits
               timeout=3,             #set a timeout value, None for waiting forever
               xonxoff=0,             #enable software flow control
               rtscts=0,              #enable RTS/CTS flow control
               )
    try:
        s.setRTS(1)
        s.setDTR(1)
        s.flushInput()
        s.flushOutput()
        s.write('hello')
        print(repr(s.read(5)))
        print(s.inWaiting())
    finally:
        # "del s" only drops the name; close explicitly so the port is
        # released even when one of the calls above raises.
        s.close()