path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/gps/rockwell.py
diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/gps/rockwell.py')
1 files changed, 0 insertions, 268 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/gps/rockwell.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/gps/rockwell.py
deleted file mode 100755
index 7c1d2adc..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/gps/rockwell.py
+++ /dev/null
@@ -1,268 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-"""Rockwell Semiconductor Zodiac Serial Protocol
-Coded from official protocol specs (Order No. GPS-25, 09/24/1996, Revision 11)
-Maintainer: Bob Ippolito
-The following Rockwell Zodiac messages are currently understood::
- EARTHA\\r\\n (a hack to "turn on" a DeLorme Earthmate)
- 1000 (Geodesic Position Status Output)
- 1002 (Channel Summary)
- 1003 (Visible Satellites)
- 1011 (Receiver ID)
-The following Rockwell Zodiac messages require implementation::
- None really, the others aren't quite so useful and require bidirectional communication w/ the device
-Other desired features::
- - Compatability with the DeLorme Tripmate and other devices with this chipset (?)
-import struct, operator, math
-from twisted.internet import protocol
-from twisted.python import log
-DEBUG = 1
-class ZodiacParseError(ValueError):
- pass
-class Zodiac(protocol.Protocol):
- dispatch = {
- # Output Messages (* means they get sent by the receiver by default periodically)
- 1000: 'fix', # *Geodesic Position Status Output
- 1001: 'ecef', # ECEF Position Status Output
- 1002: 'channels', # *Channel Summary
- 1003: 'satellites', # *Visible Satellites
- 1005: 'dgps', # Differential GPS Status
- 1007: 'channelmeas', # Channel Measurement
- 1011: 'id', # *Receiver ID
- 1012: 'usersettings', # User-Settings Output
- 1100: 'testresults', # Built-In Test Results
- 1102: 'meastimemark', # Measurement Time Mark
- 1108: 'utctimemark', # UTC Time Mark Pulse Output
- 1130: 'serial', # Serial Port Communication Parameters In Use
- 1135: 'eepromupdate', # EEPROM Update
- 1136: 'eepromstatus', # EEPROM Status
- }
- # these aren't used for anything yet, just sitting here for reference
- messages = {
- # Input Messages
- 'fix': 1200, # Geodesic Position and Velocity Initialization
- 'udatum': 1210, # User-Defined Datum Definition
- 'mdatum': 1211, # Map Datum Select
- 'smask': 1212, # Satellite Elevation Mask Control
- 'sselect': 1213, # Satellite Candidate Select
- 'dgpsc': 1214, # Differential GPS Control
- 'startc': 1216, # Cold Start Control
- 'svalid': 1217, # Solution Validity Control
- 'antenna': 1218, # Antenna Type Select
- 'altinput': 1219, # User-Entered Altitude Input
- 'appctl': 1220, # Application Platform Control
- 'navcfg': 1221, # Nav Configuration
- 'test': 1300, # Perform Built-In Test Command
- 'restart': 1303, # Restart Command
- 'serial': 1330, # Serial Port Communications Parameters
- 'msgctl': 1331, # Message Protocol Control
- 'dgpsd': 1351, # Raw DGPS RTCM SC-104 Data
- }
- MAX_LENGTH = 296
- allow_earthmate_hack = 1
- recvd = ""
- def dataReceived(self, recd):
- self.recvd = self.recvd + recd
- while len(self.recvd) >= 10:
- # hack for DeLorme EarthMate
- if self.recvd[:8] == 'EARTHA\r\n':
- if self.allow_earthmate_hack:
- self.allow_earthmate_hack = 0
- self.transport.write('EARTHA\r\n')
- self.recvd = self.recvd[8:]
- continue
- if self.recvd[0:2] != '\xFF\x81':
- if DEBUG:
- raise ZodiacParseError('Invalid Sync %r' % self.recvd)
- else:
- raise ZodiacParseError
- sync, msg_id, length, acknak, checksum = struct.unpack('<HHHHh', self.recvd[:10])
- # verify checksum
- cksum = -(reduce(operator.add, (sync, msg_id, length, acknak)) & 0xFFFF)
- cksum, = struct.unpack('<h', struct.pack('<h', cksum))
- if cksum != checksum:
- if DEBUG:
- raise ZodiacParseError('Invalid Header Checksum %r != %r %r' % (checksum, cksum, self.recvd[:8]))
- else:
- raise ZodiacParseError
- # length was in words, now it's bytes
- length = length * 2
- # do we need more data ?
- neededBytes = 10
- if length:
- neededBytes += length + 2
- if len(self.recvd) < neededBytes:
- break
- if neededBytes > self.MAX_LENGTH:
- raise ZodiacParseError("Invalid Header??")
- # empty messages pass empty strings
- message = ''
- # does this message have data ?
- if length:
- message, checksum = self.recvd[10:10+length], struct.unpack('<h', self.recvd[10+length:neededBytes])[0]
- cksum = 0x10000 - (reduce(operator.add, struct.unpack('<%dH' % (length/2), message)) & 0xFFFF)
- cksum, = struct.unpack('<h', struct.pack('<h', cksum))
- if cksum != checksum:
- if DEBUG:
- log.dmsg('msg_id = %r length = %r' % (msg_id, length), debug=True)
- raise ZodiacParseError('Invalid Data Checksum %r != %r %r' % (checksum, cksum, message))
- else:
- raise ZodiacParseError
- # discard used buffer, dispatch message
- self.recvd = self.recvd[neededBytes:]
- self.receivedMessage(msg_id, message, acknak)
- def receivedMessage(self, msg_id, message, acknak):
- dispatch = self.dispatch.get(msg_id, None)
- if not dispatch:
- raise ZodiacParseError('Unknown msg_id = %r' % msg_id)
- handler = getattr(self, 'handle_%s' % dispatch, None)
- decoder = getattr(self, 'decode_%s' % dispatch, None)
- if not (handler and decoder):
- # missing handler or decoder
- #if DEBUG:
- # log.msg('MISSING HANDLER/DECODER PAIR FOR: %r' % (dispatch,), debug=True)
- return
- decoded = decoder(message)
- return handler(*decoded)
- def decode_fix(self, message):
- assert len(message) == 98, "Geodesic Position Status Output should be 55 words total (98 byte message)"
- (ticks, msgseq, satseq, navstatus, navtype, nmeasure, polar, gpswk, gpses, gpsns, utcdy, utcmo, utcyr, utchr, utcmn, utcsc, utcns, latitude, longitude, height, geoidalsep, speed, course, magvar, climb, mapdatum, exhposerr, exvposerr, extimeerr, exphvelerr, clkbias, clkbiasdev, clkdrift, clkdriftdev) = struct.unpack('<LhhHHHHHLLHHHHHHLlllhLHhhHLLLHllll', message)
- # there's a lot of shit in here..
- # I'll just snag the important stuff and spit it out like my NMEA decoder
- utc = (utchr * 3600.0) + (utcmn * 60.0) + utcsc + (float(utcns) * 0.000000001)
- log.msg('utchr, utcmn, utcsc, utcns = ' + repr((utchr, utcmn, utcsc, utcns)), debug=True)
- latitude = float(latitude) * 0.00000180 / math.pi
- longitude = float(longitude) * 0.00000180 / math.pi
- posfix = not (navstatus & 0x001c)
- satellites = nmeasure
- hdop = float(exhposerr) * 0.01
- altitude = float(height) * 0.01, 'M'
- geoid = float(geoidalsep) * 0.01, 'M'
- dgps = None
- return (
- # seconds since 00:00 UTC
- utc,
- # latitude (degrees)
- latitude,
- # longitude (degrees)
- longitude,
- # position fix status (invalid = False, valid = True)
- posfix,
- # number of satellites [measurements] used for fix 0 <= satellites <= 12
- satellites,
- # horizontal dilution of precision
- hdop,
- # (altitude according to WGS-84 ellipsoid, units (always 'M' for meters))
- altitude,
- # (geoid separation according to WGS-84 ellipsoid, units (always 'M' for meters))
- geoid,
- # None, for compatability w/ NMEA code
- dgps,
- )
- def decode_id(self, message):
- assert len(message) == 106, "Receiver ID Message should be 59 words total (106 byte message)"
- ticks, msgseq, channels, software_version, software_date, options_list, reserved = struct.unpack('<Lh20s20s20s20s20s', message)
- channels, software_version, software_date, options_list = map(lambda s: s.split('\0')[0], (channels, software_version, software_date, options_list))
- software_version = float(software_version)
- channels = int(channels) # 0-12 .. but ALWAYS 12, so we ignore.
- options_list = int(options_list[:4], 16) # only two bitflags, others are reserved
- minimize_rom = (options_list & 0x01) > 0
- minimize_ram = (options_list & 0x02) > 0
- # (version info), (options info)
- return ((software_version, software_date), (minimize_rom, minimize_ram))
- def decode_channels(self, message):
- assert len(message) == 90, "Channel Summary Message should be 51 words total (90 byte message)"
- ticks, msgseq, satseq, gpswk, gpsws, gpsns = struct.unpack('<LhhHLL', message[:18])
- channels = []
- message = message[18:]
- for i in range(12):
- flags, prn, cno = struct.unpack('<HHH', message[6 * i:6 * (i + 1)])
- # measurement used, ephemeris available, measurement valid, dgps corrections available
- flags = (flags & 0x01, flags & 0x02, flags & 0x04, flags & 0x08)
- channels.append((flags, prn, cno))
- # ((flags, satellite PRN, C/No in dbHz)) for 12 channels
- # satellite message sequence number
- # gps week number, gps seconds in week (??), gps nanoseconds from Epoch
- return (tuple(channels),) #, satseq, (gpswk, gpsws, gpsns))
- def decode_satellites(self, message):
- assert len(message) == 90, "Visible Satellites Message should be 51 words total (90 byte message)"
- ticks, msgseq, gdop, pdop, hdop, vdop, tdop, numsatellites = struct.unpack('<LhhhhhhH', message[:18])
- gdop, pdop, hdop, vdop, tdop = map(lambda n: float(n) * 0.01, (gdop, pdop, hdop, vdop, tdop))
- satellites = []
- message = message[18:]
- for i in range(numsatellites):
- prn, azi, elev = struct.unpack('<Hhh', message[6 * i:6 * (i + 1)])
- azi, elev = map(lambda n: (float(n) * 0.0180 / math.pi), (azi, elev))
- satellites.push((prn, azi, elev))
- # ((PRN [0, 32], azimuth +=[0.0, 180.0] deg, elevation +-[0.0, 90.0] deg)) satellite info (0-12)
- # (geometric, position, horizontal, vertical, time) dilution of precision
- return (tuple(satellites), (gdop, pdop, hdop, vdop, tdop))
- def decode_dgps(self, message):
- assert len(message) == 38, "Differential GPS Status Message should be 25 words total (38 byte message)"
- raise NotImplementedError
- def decode_ecef(self, message):
- assert len(message) == 96, "ECEF Position Status Output Message should be 54 words total (96 byte message)"
- raise NotImplementedError
- def decode_channelmeas(self, message):
- assert len(message) == 296, "Channel Measurement Message should be 154 words total (296 byte message)"
- raise NotImplementedError
- def decode_usersettings(self, message):
- assert len(message) == 32, "User-Settings Output Message should be 22 words total (32 byte message)"
- raise NotImplementedError
- def decode_testresults(self, message):
- assert len(message) == 28, "Built-In Test Results Message should be 20 words total (28 byte message)"
- raise NotImplementedError
- def decode_meastimemark(self, message):
- assert len(message) == 494, "Measurement Time Mark Message should be 253 words total (494 byte message)"
- raise NotImplementedError
- def decode_utctimemark(self, message):
- assert len(message) == 28, "UTC Time Mark Pulse Output Message should be 20 words total (28 byte message)"
- raise NotImplementedError
- def decode_serial(self, message):
- assert len(message) == 30, "Serial Port Communication Paramaters In Use Message should be 21 words total (30 byte message)"
- raise NotImplementedError
- def decode_eepromupdate(self, message):
- assert len(message) == 8, "EEPROM Update Message should be 10 words total (8 byte message)"
- raise NotImplementedError
- def decode_eepromstatus(self, message):
- assert len(message) == 24, "EEPROM Status Message should be 18 words total (24 byte message)"
- raise NotImplementedError