#from mpmath import * from matrices import matrix, eye, zeros import math import rand import random # Matrices are labelled in the notation from # http://en.wikipedia.org/wiki/Kalman_filter#Underlying_dynamic_system_model # If using a implementing a non-linear motion or observation model, then for those # functions which return a matrix, you'll need to find a linear approximation to the # true model. This linear approximation is usually found by using the tangent line to # the non-linear function. The slope of a tangent line to a function is the derivative of # the function at that point. # See http://en.wikipedia.org/wiki/Kalman_filter#Extended_Kalman_filter or, for example, # http://www.acsu.buffalo.edu/~terejanu/files/tutorialEKF.pdf # For a system with more than one dimension, this means: # def getStateDimensions(): return 3 def getObservationDimensions(): return 3 def rot2(theta): "Return a 2D rotation matrix." return matrix([[math.cos(theta), -math.sin(theta)],[math.sin(theta), math.cos(theta)]]) def rot3(theta): "Return a rotation matrix for the first 2 dimensions of 3." return matrix([[math.cos(theta), -math.sin(theta), 0.0],[math.sin(theta), math.cos(theta), 0.0], [0.0, 0.0, 1.0]]) def motionMatrixF(state, action): "Return the F matrix (using wikipedia notation)" theta = state[2] return matrix([[1, 0, -math.sin(theta)*action[0] - math.cos(theta)*action[1]], [0, 1, math.cos(theta)*action[0] - math.sin(theta)*action[1]], [0, 0, 1]]) def motionMatrixB(state, action): "Return the B matrix (using wikipedia notation)" return rot3(state[2]) def motionNoiseCoVar(state, action): "Return the Q matrix (using wikipedia notation)" r = rot3(state[2]) # convert standard deviations into variances va = 0.01 + 0.1*abs(action[0]) va = va*va vb = 0.01 + 0.15*abs(action[1]) vb = vb*vb vc = 5/180.0*math.pi + 0.1*abs(action[2]) vc = vc*vc c = matrix([[va, 0, 0], [0, vb, 0], [0, 0, vc]]) return r*c*r.T def expectedMotionResult(state, action): # for a linear filter # return F.x + B.u return state+rot3(state[2])*action def motionSample(state, action): "Return a sample from the motion model if the agent started in the supplied state and performed the supplied action" delta = action.copy() delta[0] = random.gauss(action[0], 0.01 + 0.1*abs(action[0])) delta[1] = random.gauss(action[1], 0.01 + 0.15*abs(action[1])) delta[2] = random.gauss(action[2], 5/180.0*math.pi + 0.1*abs(action[2])) result = state+rot3(state[2])*delta return normaliseStateAngles(result) def getBeaconLoc(obs): if abs(obs[0] - 1) < 0.25: return matrix([-2,2]) elif abs(obs[0] - 2) < 0.25: return matrix([2,2]) elif abs(obs[0] - 3) < 0.25: return matrix([-2,-2]) elif abs(obs[0] - 4) < 0.25: return matrix([2,-2]) return None def observationMatrix(state, obs): "Return the H matrix (using wikipedia notation)" beacon = getBeaconLoc(obs) dx = beacon[0]-state[0] dy = beacon[1]-state[1] dsqr = dx*dx + dy*dy dist = math.sqrt(dsqr) angle = math.atan2(dy,dx) - state[2] return matrix([[0, 0, 0],[-dx/dist, -dy/dist, 0], [dy/dsqr, -dx/dsqr, -1]]) def expectedObservation(state, obs): # for a linear filter # return Ho beacon = getBeaconLoc(obs) dx = beacon[0]-state[0] dy = beacon[1]-state[1] dsqr = dx*dx + dy*dy dist = math.sqrt(dsqr) angle = math.atan2(dy,dx) - state[2] return matrix([obs[0], dist, angle]) def observationNoiseCoVar(state, obs): "Return the R matrix (using wikipedia notation)" beacon = getBeaconLoc(obs) dx = beacon[0]-state[0] dy = beacon[1]-state[1] dist = math.sqrt(dx*dx + dy*dy) angle = math.atan2(dy,dx) - state[2] # convert standard deviations into variances va = 0.025+0.01*dist va = va*va vb = 5/180.0*math.pi + 0.1*abs(angle) vb = vb*vb return matrix([[1, 0, 0], [0, va, 0], [0, 0, vb]]) def observationProbability(state, obs): "Return the probability, or probability density as appropriate, of the supplied observation assuming we're in the supplied state" beacon = getBeaconLoc(obs) dx = beacon[0]-state[0] dy = beacon[1]-state[1] dist = math.sqrt(dx*dx + dy*dy) angle = math.atan2(dy,dx) - state[2] # convert standard deviations into variances va = 0.025+0.01*dist va = va*va vb = 5/180.0*math.pi + 0.1*abs(angle) vb = vb*vb return rand.mv_normal_val(matrix([dist, angle]), matrix[[va, 0],[0, vb]], matrix([obs[1], obs[2]])) def observationSample(state): "Return a sampled observation" beaconList = [matrix([-2,2]), matrix([2,2]), matrix([-2,-2]), matrix([2,-2])] bestDist = 0 bestAngle = 10*math.pi bestBeaconID = 0 bestBeacon = None for b in range(4): beacon = beaconList[b] dx = beacon[0]-state[0] dy = beacon[1]-state[1] dist = math.sqrt(dx*dx + dy*dy) angle = math.atan2(dy,dx) - state[2] if abs(angle) < abs(bestAngle): bestAngle = angle bestDist = dist bestBeaconID = b+1 bestBeacon = beacon if bestBeaconID < 0.5: print "No best beacon:", state obsMean = matrix([bestBeaconID, bestDist, bestAngle]) # print "Observed beacon at:", bestBeacon, bestBeaconID # print "ObsMean:", obsMean # print "observationNoiseCoVar:", observationNoiseCoVar(state, obsMean) obs = rand.mv_normal_sample(obsMean, observationNoiseCoVar(state, obsMean)) obs[0] = bestBeaconID # make sure this didn't get weirdly sampled return obs def normaliseStateAngles(state): "Make sure any angles in the state are between -pi and +pi" for i in [2]: # list angles in the state here while state[i] > math.pi: state[i] = state[i] - 2*math.pi while state[i] < -math.pi: state[i] = state[i] + 2*math.pi return state def normaliseObsAngles(obs): "Make sure any angles in the observation are between -pi and +pi" for i in []: # list angles in the observation here while obs[i] > math.pi: obs[i] = obs[i] - 2*math.pi while obs[i] < -math.pi: obs[i] = obs[i] + 2*math.pi return obs