Improve track interpolation by using two poles and linear fallback

This commit is contained in:
Derek Schmidt 2021-04-14 23:57:14 -07:00
parent fe1d1add35
commit 18b710d189

136
main.py
View file

@ -77,10 +77,94 @@ class FaceDetector(Thread):
self._track_queue.join()
class Interpolator(Thread):
def __init__(self, frame_queue, track_queue, output_queue):
super().__init__()
self._frame_queue = frame_queue
self._track_queue = track_queue
self._output_queue = output_queue
# Lucas-Kanade sparse optical flow paramaters
# TODO: learn what these should do and make them easier to configure
self._lk_params = dict( winSize = (8,8),
maxLevel = 2,
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
self._lk_error_thresh = 10
def run(self):
landmarks_from = None
landmarks_from_frame = None
frames_to_interpolate = None
while True:
# Get landmarked frame from the ML thread
landmarks_to, landmarks_to_frame = self._track_queue.get()
# Empty the frame queue
new_frames = list()
while not self._frame_queue.empty():
try:
new_frame_raw = self._frame_queue.get_nowait()
new_frames.append(new_frame_raw)
except EmptyException:
break
# Let the ML thread know its time to analyze a new frame
self._track_queue.task_done()
if landmarks_from is not None:
self._output_queue.put((landmarks_from_frame, landmarks_from, False))
# Interpolate frames from last loop
for frame, interpolated_landmarks in self.interpolate(landmarks_from, landmarks_from_frame, landmarks_to, frames_to_interpolate):
try:
self._output_queue.put_nowait((frame, interpolated_landmarks, True))
except FullException:
break
# Save the current landmarks for use in the next loop
landmarks_from = landmarks_to
landmarks_from_frame = landmarks_to_frame
frames_to_interpolate = new_frames
# Interpolate frames using a mixture of methods
def interpolate(self, landmarks_from, landmarks_from_frame, landmarks_to, frames):
buffer_length = len(frames)
lk_interp = self._interpolate_lk(landmarks_from, landmarks_from_frame, frames)
lin_interp = self._interpolate_lin(landmarks_from, landmarks_to, buffer_length)
for frame_index, (frame, lk_result, lin_landmarks) in enumerate(zip(frames, lk_interp, lin_interp)):
# Use lk landmarks as the base, fall back on linear interpolation when tracking failed
lk_landmarks, lk_statuses, lk_errors = lk_result
lk_criteria_mask = np.logical_or(lk_statuses == 0, lk_errors > self._lk_error_thresh)
lk_criteria_mask = np.repeat(lk_criteria_mask, 2, axis=1)
np.putmask(lk_landmarks, lk_criteria_mask, lin_landmarks)
# Mix the linear interpolation results in towards the end of the buffer, to prevent jitter
combined = np.stack((lk_landmarks, lin_landmarks), axis=-1)
mix_factor = (frame_index / buffer_length) ** 2
weights = np.full(combined.shape, (1-mix_factor, mix_factor))
mixed_landmarks = np.average(combined, axis=-1, weights=weights)
yield frame, lk_landmarks
# Interpolates linearly
def _interpolate_lin(self, landmarks_from, landmarks_to, points):
return np.linspace(landmarks_from, landmarks_to, points)[:-1]
# Interpolates using Lucas-Kanade sparse optical flow
def _interpolate_lk(self, landmarks, landmarks_frame, frames):
old_frame = cv2.cvtColor(landmarks_frame, cv2.COLOR_BGR2GRAY)
old_points = landmarks
for frame in frames:
new_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
new_points, statuses, errors = cv2.calcOpticalFlowPyrLK(old_frame, new_frame, old_points, None, **self._lk_params)
yield new_points, statuses, errors
old_points = new_points
old_frame = new_frame
class FaceTracker3000:
def __init__(self, fps, width, height, bufferlen, remote_addr):
self.frame_queue = Queue(maxsize=bufferlen)
self.track_queue = Queue(maxsize=bufferlen)
self.track_queue = Queue()
self.output_queue = Queue(maxsize=bufferlen)
self.remote_addr = remote_addr
@ -97,20 +181,18 @@ class FaceTracker3000:
# TODO: Margin should be configurable
self._gaze_estimator = GazeTracker(10)
# secondary ml thread
# ml thread
self._face_detector_thread = FaceDetector(self.frame_queue, self.track_queue)
self._face_detector_thread.daemon = True
# Lucas-Kanade sparse optical flow paramaters
# TODO: learn what these should do and make them easier to configure
self._lk_params = dict( winSize = (10,10),
maxLevel = 2,
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
self._lk_error_thresh = 20
# interpolator thread
self._interpolator_thread = Interpolator(self.frame_queue, self.track_queue, self.output_queue)
def run(self):
# Start thread for intensive ML jobs (will read from frame_queue)
# Start our other threads (will read from our queues)
self._face_detector_thread.start()
self._interpolator_thread.start()
self._socket.connect(self.remote_addr)
try:
while True:
@ -124,21 +206,6 @@ class FaceTracker3000:
except FullException:
pass
# If we have some landmarks from the ML thread
while not self.track_queue.empty():
# Get the landmarks and place them in the output queue
landmarks, landmark_frame = self.track_queue.get()
self.output_queue.put((landmark_frame, landmarks, False))
# Interpolate the rest of our buffer
for frame, interpolated_landmarks in self.interpolate_from(landmarks, landmark_frame, **self._lk_params):
try:
self.output_queue.put_nowait((frame, interpolated_landmarks, True))
except FullException:
break
# Let the ML thread know its ok to choose another frame (we've emptied the queue)
self.track_queue.task_done()
# Display the frames we have ready to go
if not self.output_queue.empty():
frame, landmarks, interpolated = self.output_queue.get_nowait()
@ -193,27 +260,6 @@ class FaceTracker3000:
finally:
cv2.destroyAllWindows()
# Uses Lucas-Kanade sparse optical flow to attempt to interpolate tracking data
def interpolate_from(self, landmarks, landmark_frame, **lk_params):
old_frame = cv2.cvtColor(landmark_frame, cv2.COLOR_BGR2GRAY)
old_points = landmarks
while not self.frame_queue.empty():
try:
new_frame_raw = self.frame_queue.get_nowait()
except EmptyException:
break
new_frame = cv2.cvtColor(new_frame_raw, cv2.COLOR_BGR2GRAY)
new_points, statuses, errors = cv2.calcOpticalFlowPyrLK(old_frame, new_frame, old_points, None, **lk_params)
if any(point_status == 0 or point_error > self._lk_error_thresh for point_status, point_error in zip(statuses, errors)):
yield new_frame_raw, None
else:
yield new_frame_raw, new_points
old_points = new_points
old_frame = new_frame
if __name__ == '__main__':
# TODO: should come from a configuration file