Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F124136315
boat
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, Jul 31, 15:19
Size
4 KB
Mime Type
text/x-python
Expires
Sat, Aug 2, 15:19 (2 d)
Engine
blob
Format
Raw Data
Handle
27807964
Attached To
R890 mux0183
boat
View Options
#!/usr/bin/env python3
import
sys
,
datetime
sys
.
path
.
append
(
'../lib'
)
from
nmeaTransceiver
import
*
from
geoCalc
import
*
from
nmea0183
import
RMB
,
RMC
,
HDT
class
TestBoat
:
"Test boat that receives RMBs and sends RMCs"
__NT
=
None
__RMB
=
None
__RMC
=
None
__HDT
=
None
__GC
=
None
__nbOfLegs
=
None
# Starting position
__lat
=
'4301.000'
__latNS
=
'N'
__lon
=
'00617.000'
__lonEW
=
'E'
# Current destination information
bearing
=
0
__RMB_lat
=
None
__RMB_latNS
=
None
__RMB_lon
=
None
__RMB_lonEW
=
None
__RMB_text
=
None
__RMB_iteration
=
None
__RMB_waypoints
=
None
def
__RMB_reader
(
self
,
nmeaSentence
):
print
(
"RMB received from EC talker: %s"
%
nmeaSentence
)
rmbData
=
self
.
__RMB
.
decode
(
nmeaSentence
)
# move boat to the next Waypoint
rmb_text
=
rmbData
[
'DestinationWaypointLatitude'
]
+
rmbData
[
'DestinationWaypointLatitudeNS'
]
+
rmbData
[
'DestinationWaypointLongitude'
]
+
rmbData
[
'DestinationWaypointLongitudeEW'
]
if
rmb_text
!=
self
.
__RMB_text
:
print
(
"New destination detected!"
)
# New destination
self
.
__RMB_text
=
rmb_text
self
.
__RMB_lat
=
rmbData
[
'DestinationWaypointLatitude'
]
self
.
__RMB_latNS
=
rmbData
[
'DestinationWaypointLatitudeNS'
]
self
.
__RMB_lon
=
rmbData
[
'DestinationWaypointLongitude'
]
self
.
__RMB_lonEW
=
rmbData
[
'DestinationWaypointLongitudeEW'
]
self
.
__RMB_iteration
=
0
# Compute intermediate waypoints
currentPosition
=
NmeaGeoPoint
(
self
.
__lat
,
self
.
__latNS
,
self
.
__lon
,
self
.
__lonEW
)
destinationPosition
=
NmeaGeoPoint
(
rmbData
[
'DestinationWaypointLatitude'
],
rmbData
[
'DestinationWaypointLatitudeNS'
],
rmbData
[
'DestinationWaypointLongitude'
],
rmbData
[
'DestinationWaypointLongitudeEW'
])
self
.
__RMB_waypoints
=
self
.
__GC
.
getIntermediateWaypoints
(
currentPosition
,
destinationPosition
,
self
.
__nbOfLegs
)[
'waypoints'
]
def
__RMC_sender
(
self
):
# Move to the next intermediate waypoint
self
.
bearing
=
0
if
self
.
__RMB_iteration
!=
None
:
n
=
self
.
__RMB_iteration
if
n
<
self
.
__nbOfLegs
:
print
(
"Moving to waypoint: #"
,
n
+
1
,
"/"
,
len
(
self
.
__RMB_waypoints
))
targetPoint
=
GeoPoint
(
self
.
__RMB_waypoints
[
n
][
'latitude'
],
self
.
__RMB_waypoints
[
n
][
'longitude'
])
print
(
str
(
targetPoint
))
self
.
bearing
=
self
.
__RMB_waypoints
[
n
][
'bearing'
]
self
.
__lat
,
self
.
__latNS
,
self
.
__lon
,
self
.
__lonEW
=
targetPoint
.
toNMEA
()
self
.
__RMB_iteration
+=
1
else
:
self
.
__RMB_iteration
=
None
now
=
datetime
.
datetime
.
now
()
return
self
.
__RMC
.
encodeDict
(
{
'UtcOfPositionFix'
:
now
.
strftime
(
'%H%M%S'
),
'Status'
:
'A'
,
'Latitude'
:
self
.
__lat
,
'LatitudeNS'
:
self
.
__latNS
,
'Longitude'
:
self
.
__lon
,
'LongitudeEW'
:
self
.
__lonEW
,
'SpeedOverGround'
:
'4.6'
,
'CourseOverGround'
:
str
(
self
.
bearing
),
'Date'
:
now
.
strftime
(
'%d%m%y'
),
'MagneticVariation'
:
'0.0'
,
'ModeIndicator'
:
'E'
}
)
def
__HDT_sender
(
self
):
nmea
=
self
.
__HDT
.
encodeDict
(
{
'Heading'
:
str
(
self
.
bearing
)
}
)
print
(
nmea
)
return
nmea
def
__init__
(
self
,
nbOfLegs
=
1
):
# Set default parameters
if
nbOfLegs
<
1
:
self
.
__nbOfLegs
=
1
else
:
self
.
__nbOfLegs
=
nbOfLegs
# Create NMEA messages handlers
self
.
__RMB
=
RMB
(
RMB
.
nmeaAttributes
)
self
.
__RMC
=
RMC
(
RMC
.
nmeaAttributes
,
'GP'
)
self
.
__HDT
=
HDT
(
HDT
.
nmeaAttributes
,
'GP'
)
# Create transceiver
self
.
__NT
=
NmeaTransceiver
(
sys
.
argv
)
# Create GeoCal
self
.
__GC
=
GeoCalc
()
# Add a read callback, called when a RMB is received (only from EC talker)
self
.
__NT
.
addNmeaFilteringReader
(
self
.
__RMB_reader
,
'RMB'
,
'EC'
)
# Call RMC sender ASAP (when transceiver is started) and repeat every 3 seconds
self
.
__NT
.
callSoon
(
self
.
__RMC_sender
,
1
)
self
.
__NT
.
callSoon
(
self
.
__HDT_sender
,
1
)
def
start
(
self
):
self
.
__NT
.
startTransceiver
()
#
# Main
#
TB
=
TestBoat
(
60
)
TB
.
start
()
# EOF
Event Timeline
Log In to Comment