




对于一个概率密度函数为$p(\mathbb x)$的分布,我们知道其均值和方差分别可以由以下公式计算: $ E(f(\mathbb x)) = \int f(\mathbb x) p(\mathbb x) d \mathbb x \ Var{f(\mathbb x)} = E(f(\mathbb x) – E(f(\mathbb x))^2 = \int (f(\mathbb x) – E(f(\mathbb x))^2p(\mathbb x) d\mathbb x $ 根据基本统计知识有,对一批数据${\mathbb y_i}$计算其均值和方差的方法为: $ \bar {\mathbb y} = \frac{1}{n} \Sigma_{i=1}^n \mathbb y_i $

$ Var(\mathbb y) = \frac{1}{n} \Sigma_{i=1}^n(\mathbb y_i – \bar {\mathbb y})^2 $



  1. t=0时刻,粒子初始化,根据初始的均值和方差生成一批例子。
  2. for t > 0:
  3. ​ a. 预测。根据系统模型,对输入数据加上过程噪声,预测粒子的下一状态,统计更新后的粒子均值和方差得到预测结果。
  4. ​ b. 更新。根据观测值更新噪声。测量观测值和粒子的观测值的残差,计算残差关于采样噪声的概率值,作为权重。
  5. ​ c. 重采样。对粒子群重新采样,剔除一些权重值特别小的粒子,对权重值比较高的点多采样。
  6. ​ d. 估计值。根据粒子的状态和权重值,重新计算均值和方差。





resample(n, particles, weights): Neff = 0 for i from 1 to n: sum_weight = weights[i] * weights[i] if 1. / Neff < N_TH: wcum = [0] * n new_particles = {} new_weights = [0] * n for i from 1 to n: wcum[i] = wcum[i-1] + weight[i] for i from 1 to n: p = random.uniform(0, 1) find j, wcum[j] < p <= wcum[j+1] copy particle j to new_particles new_weights[i] = weights[j] particles = new_particles weights = new_weights normalize(weights) return particles, weights 




class ParticleFilter(object): def __init__(self, initial_X, initial_std, number_particles): self.np = number_particles self.particles = self._initial_prarticles(initial_X, initial_std, number_particles) self.weights = np.matrix(np.zeros((number_particles, 1))) + 1.0 / number_particles self.NTH = number_particles * 1 / 4 print(type(self.weights)) def _initial_prarticles(self, initial_X, initial_std, number_particles): particles = np.matrix(np.zeros((initial_X.shape[0], number_particles))) particles[0, :] = initial_X[0,0] + (np.random.randn(number_particles)) * initial_std[0, 0] particles[1, :] = initial_X[1,0] + (np.random.randn(number_particles)) * initial_std[1, 0] particles[2, :] = initial_X[2, 0] + (np.random.randn(number_particles)) * initial_std[2, 0] particles[2, :] %= (2 * np.pi) particles[3, :] = initial_X[3, 0] + (np.random.randn(number_particles)) * initial_std[3, 0] return particles def _move_motion(self, X, u, dt): F = np.matrix([[1., 0., 0., dt * math.cos(X[2])], [0., 1., 0., dt * math.sin(X[2])], [0., 0., 1., 0.], [0., 0., 0., 1.]]) B = np.matrix([[0, 0.], [0, 0.], [0., dt], [dt, 0.]]) X_pred = F * X + B * u return X_pred def calc_convariance(self, X_mean): covariance = np.zeros((4, 4)) for i in range(self.np): dx = self.particles[:, i:i+1] - X_mean covariance += self.weights[i, 0] * dx * dx.T return covariance def predict(self, u, Q, dt=0.1): for i in range(self.np): u_p = np.zeros((2, 1)) u_p[0, 0] = u[0,0] + np.random.randn() * Q[0, 0] u_p[1, 0] = u[1,0] + np.random.randn() * Q[1, 1] self.particles[:, i:i+1] = self._move_motion(self.particles[:, i:i+1], u_p, dt) X = self.particles * self.weights P = self.calc_convariance(X) return X, P def gauss_likelihood(self, x, sigma): p = 1.0 / math.sqrt(2.0 * math.pi * sigma ** 2) * math.exp(-x **2/(2 * sigma ** 2)) return p def update(self, Z_observed, R): for i in range(self.np): X = self.particles[:, i] # w = self.weights[i, 0] w = 1.0 for z_j, lx_j, ly_j in Z_observed: dx = lx_j - X[0] dy = ly_j - X[1] z_pre = math.sqrt(dx * dx + dy * dy) dz = z_j - z_pre w = w * self.gauss_likelihood(dz, math.sqrt(R[0, 0])) self.weights[i, 0] = w self.weights += 1.e-300 self.weights = self.weights / self.weights.sum() # try to resample self.resample() X = self.particles * self.weights P = self.calc_convariance(X) return X, P def random(self): resampleid = np.zeros((1, self.np)) for i in range(self.np): resampleid[0, i] = random.uniform(0, 1.0) # base = np.cumsum(self.weights * 0.0 + 1 / self.np) - 1 / self.np # resampleid = base + np.random.randn(base.shape[1]) / self.np return resampleid def resample(self): self.weights += 1.e-300 Neff = 1.0 / (self.weights.T * self.weights)[0, 0] if Neff < self.NTH: wcum = np.cumsum(self.weights, axis=0) wcum[-1, 0] = 1. indexes = [] resampleid = self.random() for i in range(self.np): ind = 0 while wcum[ind, 0] < resampleid[0, i]: ind += 1 indexes.append(ind) self.particles[:, :] = self.particles[:, indexes] self.weights[:, 0] = self.weights[indexes, 0] self.weights /= np.sum(self.weights) def get_particles(self): return self.particles 


免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/71290.html




您的电子邮箱地址不会被公开。 必填项已用 * 标注
