22
33"""
44
5- __authors__ = "Ashwin Kanhere"
6- __date__ = "25 Januray 2020"
5+ __authors__ = "Ashwin Kanhere, Derek Knowles"
6+ __date__ = "25 Jan 2020"
7+
8+ import warnings
79
810import numpy as np
911
12+ from gnss_lib_py .parsers .navdata import NavData
13+ from gnss_lib_py .algorithms .snapshot import solve_wls
14+ from gnss_lib_py .utils .coordinates import ecef_to_geodetic
1015from gnss_lib_py .utils .filters import BaseExtendedKalmanFilter
1116
17+ def solve_gnss_ekf (measurements , init_dict = None ,
18+ params_dict = None ):
19+ """Runs a GNSS Extended Kalman Filter across each timestep.
20+
21+ Runs an Extended Kalman Filter across each timestep and adds a new
22+ row for the receiver's position and clock bias.
23+
24+ Parameters
25+ ----------
26+ measurements : gnss_lib_py.parsers.navdata.NavData
27+ Instance of the NavData class
28+ init_dict : dict
29+ Initialization dict with initial states and covariances.
30+ params_dict : dict
31+ Dictionary of parameters for GNSS EKF.
32+
33+ Returns
34+ -------
35+ state_estimate : gnss_lib_py.parsers.navdata.NavData
36+ Estimated receiver position in ECEF frame in meters and the
37+ estimated receiver clock bias also in meters as an instance of
38+ the NavData class with shape (4 x # unique timesteps) and
39+ the following rows: x_rx_m, y_rx_m, z_rx_m, b_rx_m.
40+
41+ """
42+
43+ # check that all necessary rows exist
44+ measurements .in_rows (["gps_millis" ,"corr_pr_m" ,
45+ "x_sv_m" ,"y_sv_m" ,"z_sv_m" ,
46+ ])
47+
48+ if init_dict is None :
49+ init_dict = {}
50+
51+ if "state_0" not in init_dict :
52+ pos_0 = None
53+ for _ , _ , measurement_subset in measurements .loop_time ("gps_millis" ):
54+ pos_0 = solve_wls (measurement_subset )
55+ if pos_0 is not None :
56+ break
57+
58+ state_0 = np .zeros ((7 ,1 ))
59+ if pos_0 is not None :
60+ state_0 [:3 ,0 ] = pos_0 [["x_rx_m" ,"y_rx_m" ,"z_rx_m" ]]
61+ state_0 [6 ,0 ] = pos_0 [["b_rx_m" ]]
62+
63+ init_dict ["state_0" ] = state_0
64+
65+ if "sigma_0" not in init_dict :
66+ sigma_0 = np .eye (init_dict ["state_0" ].size )
67+ init_dict ["sigma_0" ] = sigma_0
68+
69+ if "Q" not in init_dict :
70+ process_noise = np .eye (init_dict ["state_0" ].size )
71+ init_dict ["Q" ] = process_noise
72+
73+ if "R" not in init_dict :
74+ measurement_noise = np .eye (1 ) # gets overwritten
75+ init_dict ["R" ] = measurement_noise
76+
77+ # initialize parameter dictionary
78+ if params_dict is None :
79+ params_dict = {}
80+
81+ if "motion_type" not in params_dict :
82+ params_dict ["motion_type" ] = "constant_velocity"
83+
84+ if "measure_type" not in params_dict :
85+ params_dict ["measure_type" ] = "pseudorange"
86+
87+ # create initialization parameters.
88+ gnss_ekf = GNSSEKF (init_dict , params_dict )
89+
90+ states = []
91+
92+ for timestamp , delta_t , measurement_subset in measurements .loop_time ("gps_millis" ):
93+ pos_sv_m = measurement_subset [["x_sv_m" ,"y_sv_m" ,"z_sv_m" ]].T
94+ pos_sv_m = np .atleast_2d (pos_sv_m )
95+
96+ corr_pr_m = measurement_subset ["corr_pr_m" ].reshape (- 1 ,1 )
97+
98+ # remove NaN indexes
99+ not_nan_indexes = ~ np .isnan (pos_sv_m ).any (axis = 1 )
100+ pos_sv_m = pos_sv_m [not_nan_indexes ]
101+ corr_pr_m = corr_pr_m [not_nan_indexes ]
102+
103+ # prediction step
104+ predict_dict = {"delta_t" : delta_t }
105+ gnss_ekf .predict (predict_dict = predict_dict )
106+
107+ # update step
108+ update_dict = {"pos_sv_m" : pos_sv_m .T }
109+ update_dict ["measurement_noise" ] = np .eye (pos_sv_m .shape [0 ])
110+ gnss_ekf .update (corr_pr_m , update_dict = update_dict )
111+
112+ states .append ([timestamp ] + np .squeeze (gnss_ekf .state ).tolist ())
113+
114+ states = np .array (states )
115+
116+ if states .size == 0 :
117+ warnings .warn ("No valid state estimate computed in solve_gnss_ekf, " \
118+ + "returning None." , RuntimeWarning )
119+ return None
120+
121+ state_estimate = NavData ()
122+ state_estimate ["gps_millis" ] = states [:,0 ]
123+ state_estimate ["x_rx_m" ] = states [:,1 ]
124+ state_estimate ["y_rx_m" ] = states [:,2 ]
125+ state_estimate ["z_rx_m" ] = states [:,3 ]
126+ state_estimate ["vx_rx_mps" ] = states [:,4 ]
127+ state_estimate ["vy_rx_mps" ] = states [:,5 ]
128+ state_estimate ["vz_rx_mps" ] = states [:,6 ]
129+ state_estimate ["b_rx_m" ] = states [:,7 ]
130+
131+ lat ,lon ,alt = ecef_to_geodetic (state_estimate [["x_rx_m" ,"y_rx_m" ,
132+ "z_rx_m" ]].reshape (3 ,- 1 ))
133+ state_estimate ["lat_rx_deg" ] = lat
134+ state_estimate ["lon_rx_deg" ] = lon
135+ state_estimate ["alt_rx_deg" ] = alt
136+
137+ return state_estimate
138+
12139class GNSSEKF (BaseExtendedKalmanFilter ):
13140 """GNSS-only EKF implementation.
14141
@@ -17,42 +144,43 @@ class GNSSEKF(BaseExtendedKalmanFilter):
17144
18145 Attributes
19146 ----------
20- dt : float
147+ params_dict : dict
148+ Dictionary of parameters that may include the following.
149+ delta_t : float
21150 Time between prediction instances
22151 motion_type : string
23- Type of motion (stationary or constant velocity )
152+ Type of motion (`` stationary`` or ``constant_velocity`` )
24153 measure_type : string
25- NavData types (pseudoranges )
154+ NavData types (pseudorange )
26155 """
27156 def __init__ (self , init_dict , params_dict ):
28157 super ().__init__ (init_dict , params_dict )
29- self .dt = params_dict ['dt' ]
30- try :
31- self .motion_type = params_dict ['motion_type' ]
32- except KeyError :
33- self .motion_type = 'stationary'
34- try :
35- self .measure_type = params_dict ['measure_type' ]
36- except KeyError :
37- self .measure_type = 'pseudoranges'
158+
159+ self .delta_t = params_dict .get ('dt' ,1.0 )
160+ self .motion_type = params_dict .get ('motion_type' ,'stationary' )
161+ self .measure_type = params_dict .get ('measure_type' ,'pseudorange' )
38162
39163 def dyn_model (self , u , predict_dict = None ):
40- """Non linear dynamics
164+ """Nonlinear dynamics
41165
42166 Parameters
43167 ----------
44168 u : np.ndarray
45169 Control signal, not used for propagation
46- predict_dict : Dict
47- Additional prediction parameters, not used currently
170+ predict_dict : dict
171+ Additional prediction parameters, including ``delta_t``
172+ updates.
48173
49174 Returns
50175 -------
51176 new_x : np.ndarray
52177 Propagated state
53178 """
54- A = self .linearize_dynamics ()
55- new_x = A @ self .x
179+ if predict_dict is None : #pragma: no cover
180+ predict_dict = {}
181+
182+ A = self .linearize_dynamics (predict_dict )
183+ new_x = A @ self .state
56184 return new_x
57185
58186 def measure_model (self , update_dict ):
@@ -62,10 +190,15 @@ def measure_model(self, update_dict):
62190 :math:`\\ rho = \\ sqrt{(x-x_{sv})^2 + (y-y_{sv})^2 + (z-z_{sv})^2} + b`.
63191 See [1]_ for more details and models.
64192
193+ ``pos_sv_m`` must be a key in update_dict and must be an array
194+ of shape [3 x N] with rows of x_sv_m, y_sv_m, and z_sv_m in that
195+ order.
196+
65197 Parameters
66198 ----------
67- update_dict : Dict
68- Update dictionary containing satellite positions with key 'sv_pos'
199+ update_dict : dict
200+ Update dictionary containing satellite positions with key
201+ ``pos_sv_m``.
69202
70203 Returns
71204 -------
@@ -80,13 +213,13 @@ def measure_model(self, update_dict):
80213 applications. John Wiley & Sons, 2021.
81214 """
82215 if self .measure_type == 'pseudorange' :
83- sv_pos = update_dict ['sv_pos ' ]
84- pseudo = np .sqrt ((self .x [0 ] - sv_pos [0 , :])** 2
85- + (self .x [1 ] - sv_pos [1 , :])** 2
86- + (self .x [2 ] - sv_pos [2 , :])** 2 ) \
87- + self .x [6 ]
216+ pos_sv_m = update_dict ['pos_sv_m ' ]
217+ pseudo = np .sqrt ((self .state [0 ] - pos_sv_m [0 , :])** 2
218+ + (self .state [1 ] - pos_sv_m [1 , :])** 2
219+ + (self .state [2 ] - pos_sv_m [2 , :])** 2 ) \
220+ + self .state [6 ]
88221 z = np .reshape (pseudo , [- 1 , 1 ])
89- else :
222+ else : #pragma: no cover
90223 raise NotImplementedError
91224 return z
92225
@@ -95,20 +228,30 @@ def linearize_dynamics(self, predict_dict=None):
95228
96229 Parameters
97230 ----------
98- predict_dict : Dict
231+ predict_dict : dict
99232 Additional predict parameters, not used in current implementation
100233
101234 Returns
102235 -------
103236 A : np.ndarray
104237 Linear dynamics model depending on motion_type
238+ predict_dict : dict
239+ Dictionary of prediction parameters.
105240 """
241+
242+ if predict_dict is None : # pragma: no cover
243+ predict_dict = {}
244+
245+ # uses delta_t from predict_dict if exists, otherwise delta_t
246+ # from the class initialization.
247+ delta_t = predict_dict .get ('delta_t' , self .delta_t )
248+
106249 if self .motion_type == 'stationary' :
107250 A = np .eye (7 )
108251 elif self .motion_type == 'constant_velocity' :
109252 A = np .eye (7 )
110- A [:3 , - 4 :- 1 ] = self . dt * np .eye (3 )
111- else :
253+ A [:3 , - 4 :- 1 ] = delta_t * np .eye (3 )
254+ else : # pragma: no cover
112255 raise NotImplementedError
113256 return A
114257
@@ -117,22 +260,24 @@ def linearize_measurements(self, update_dict):
117260
118261 Parameters
119262 ----------
120- update_dict : Dict
121- Update dictionary containing satellite positions with key 'sv_pos'
263+ update_dict : dict
264+ Update dictionary containing satellite positions with key
265+ ``pos_sv_m``.
122266
123267 Returns
124268 -------
125269 H : np.ndarray
126- Jacobian of measurement model, dimension M x N
270+ Jacobian of measurement model, of dimension
271+ #measurements x #states
127272 """
128273 if self .measure_type == 'pseudorange' :
129- sv_pos = update_dict ['sv_pos ' ]
130- m = np .shape (sv_pos )[1 ]
131- H = np .zeros ([m , self .x_dim ])
274+ pos_sv_m = update_dict ['pos_sv_m ' ]
275+ m = np .shape (pos_sv_m )[1 ]
276+ H = np .zeros ([m , self .state_dim ])
132277 pseudo_expect = self .measure_model (update_dict )
133- rx_pos = np .reshape (self .x [:3 ], [- 1 , 1 ])
134- H [:, :3 ] = (rx_pos - sv_pos ).T / pseudo_expect
278+ rx_pos = np .reshape (self .state [:3 ], [- 1 , 1 ])
279+ H [:, :3 ] = (rx_pos - pos_sv_m ).T / pseudo_expect
135280 H [:, 6 ] = 1
136- else :
281+ else : # pragma: no cover
137282 raise NotImplementedError
138283 return H
0 commit comments