Temp eye tracking
This commit is contained in:
parent 0085ad63b7
commit e0e7d006bd
2 changed files with 295 additions and 50 deletions
Pipfile | 1 (1 addition)

@@ -9,6 +9,7 @@ verify_ssl = true
 face-alignment = "~=1.3.3"
 opencv-python = "~=4.5.1"
 numpy = "~=1.20.2"
+numba = "*"
 
 [requires]
 python_version = "3.7"
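Aside on the one-line change: every other dependency here is pinned compatible-release style, while the new numba entry floats at "*". Since the @jit(nopython=True, cache=True, fastmath=True) decorators added to main.py below are sensitive to the numba release, pinning it the same way, e.g. numba = "~=0.53" (the series contemporary with numpy 1.20; the exact floor is a guess, check the installed version), would keep the build reproducible.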
main.py | 344 (294 additions, 50 deletions)

@@ -1,40 +1,243 @@
 import socket
 from threading import Thread
 from queue import Queue, Full as FullException, Empty as EmptyException
+from enum import Enum, auto
+import math
 
 import face_alignment
 import face_alignment.detection.blazeface
 import cv2
 import numpy as np
+from numba import jit
 
 from contrib.head_pose_estimation.pose_estimator import PoseEstimator
 from contrib.GazeTracking.gaze_tracking.eye import Eye
 from contrib.GazeTracking.gaze_tracking.calibration import Calibration
+# from contrib.GazeTracking.gaze_tracking.eye import Eye
+# from contrib.GazeTracking.gaze_tracking.calibration import Calibration
 
 
+# class GazeTracker:
+#     def __init__(self, margin):
+#         self.calibration = Calibration()
+#         self.margin = margin
+#
+#     def get_gaze(self, frame, landmarks):
+#         frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+#         eye_left = Eye(frame, landmarks, self.margin, 0, self.calibration)
+#         eye_right = Eye(frame, landmarks, self.margin, 1, self.calibration)
+#         try:
+#             left_horz_ratio = eye_left.pupil.x / (eye_left.center[0] * 2 - (self.margin//2))
+#             left_vert_ratio = eye_left.pupil.y / (eye_left.center[1] * 2 - (self.margin//2))
+#             right_horz_ratio = eye_right.pupil.x / (eye_right.center[0] * 2 - (self.margin//2))
+#             right_vert_ratio = eye_right.pupil.y / (eye_right.center[1] * 2 - (self.margin//2))
+#         except TypeError:
+#             return None
+#
+#         horz_average = (left_horz_ratio + right_horz_ratio) / 2
+#         vert_average = (left_vert_ratio + right_vert_ratio) / 2
+#
+#         return (eye_left.origin[0] + eye_left.pupil.x, eye_left.origin[1] + eye_left.pupil.y, eye_left.blinking), \
+#                (eye_right.origin[0] + eye_right.pupil.x, eye_right.origin[1] + eye_right.pupil.y, eye_right.blinking), \
+#                horz_average, vert_average
+
+
+class Eye:
+    class SIDE(Enum):
+        LEFT = auto()
+        RIGHT = auto()
+
+    _SIDE_SLICES = {
+        SIDE.LEFT: slice(36, 42),
+        SIDE.RIGHT: slice(42, 48),
+    }
+
+    def __init__(self, frame, landmarks, margin, side):
+        self._margin = margin
+        self._max_iterations = 100
+        self._ascent_distance = 8
+
+        self._pupil_x = None
+        self._pupil_y = None
+
+        landmarks_for_eye = landmarks[Eye._SIDE_SLICES[side]].astype(np.int32)
+
+        self._frame, (self.min_x, self.min_y, self.width, self.height) = self._crop(frame, landmarks_for_eye)
+
+        self.blinking = self._get_blink_ratio(landmarks_for_eye)
+
+        if self.blinking is not None and self.blinking < 1:
+            pass
+            # pupil = self._find_pupils(self._frame)
+            # if pupil is not None:
+            #     pupil_x, pupil_y = pupil
+            #     self._pupil_x = round(pupil_x)
+            #     self._pupil_y = round(pupil_y)
+
+    @property
+    def frame(self):
+        return self._frame
+
+    @property
+    def pupil(self):
+        return self._pupil_x, self._pupil_y
+
+    @property
+    def origin(self):
+        return self.min_x, self.min_y
+
+    @property
+    def center(self):
+        return self.min_x + (self.width // 2), self.min_y + (self.height // 2)
+
+    def _crop(self, frame, landmarks):
+        # FIXME: My spidey sense says this is a poor way to crop, but it works for now
+        min_x = np.min(landmarks[:, 0]) - self._margin
+        max_x = np.max(landmarks[:, 0]) + self._margin
+        min_y = np.min(landmarks[:, 1]) - self._margin
+        max_y = np.max(landmarks[:, 1]) + self._margin
+
+        cropped = frame[min_y:max_y, min_x:max_x]
+
+        height, width = cropped.shape[:2]
+
+        return cropped, (min_x, min_y, width, height)
+
+    def _get_blink_ratio(self, landmarks):
+        left = landmarks[0]
+        right = landmarks[3]
+        top = np.mean(landmarks[1:3], axis=0)
+        bottom = np.mean(landmarks[4:6], axis=0)
+
+        eye_width = math.hypot((left[0] - right[0]), (left[1] - right[1]))
+        eye_height = math.hypot((top[0] - bottom[0]), (top[1] - bottom[1]))
+
+        try:
+            ratio = eye_width / eye_height
+        except ZeroDivisionError:
+            ratio = None
+
+        return ratio
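To make the new Eye class easier to review: _get_blink_ratio is a width-over-height measure on the six eye landmarks, so it rises as the lid closes, and an open eye already sits well above 1; the `blinking < 1` guard in __init__ therefore appears to keep the (already commented-out) pupil search disabled in this temp commit. A self-contained sketch with made-up landmark coordinates (points and values are illustrative only):

import math
import numpy as np

def blink_ratio(eye_landmarks):
    # eye_landmarks: six (x, y) points in dlib/face-alignment order
    # (0: outer corner, 1-2: upper lid, 3: inner corner, 4-5: lower lid)
    left, right = eye_landmarks[0], eye_landmarks[3]
    top = np.mean(eye_landmarks[1:3], axis=0)
    bottom = np.mean(eye_landmarks[4:6], axis=0)
    width = math.hypot(left[0] - right[0], left[1] - right[1])
    height = math.hypot(top[0] - bottom[0], top[1] - bottom[1])
    return width / height if height else None

open_eye = np.array([(0, 5), (8, 0), (16, 0), (24, 5), (16, 10), (8, 10)])
closed_eye = np.array([(0, 5), (8, 4), (16, 4), (24, 5), (16, 6), (8, 6)])
print(blink_ratio(open_eye))    # 2.4  -> eye open
print(blink_ratio(closed_eye))  # 12.0 -> eye (nearly) shut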
+
+    # MAGIC: Get pupils via Fabian Timm gradient localization
+    # See https://www.inb.uni-luebeck.de/fileadmin/files/PUBPDFS/TiBa11b.pdf
+    # and https://thume.ca/projects/2012/11/04/simple-accurate-eye-center-tracking-in-opencv/
+    #
+    # Mostly copied from https://github.com/Kryolyz/Pupil_Tracker/blob/master/Pupil_Tracker.py
+    @staticmethod
+    @jit(nopython=True, cache=True, fastmath=True)
+    def _gradx(pic, grad, mean, std):
+        if len(grad[:, :]) > 0:
+            for y in range(len(grad[:, 0])):
+                for x in range(len(grad[0, :]) - 2):
+                    grad[y, x+1] = (float(pic[y, x+2]) - float(pic[y, x])) / 2.0
+
+            mean = np.mean(grad)
+            std = np.std(grad)
+
+            for y in range(len(grad[:, 0])):
+                for x in range(len(grad[0, :])):
+                    if grad[y, x] < 0.3 * mean + std and grad[y, x] > -0.3 * mean - std:
+                        grad[y, x] = 0
+                    if grad[y, x] > 0:
+                        grad[y, x] = 1
+                    elif grad[y, x] < 0:
+                        grad[y, x] = -1
+        return grad
+
+    @staticmethod
+    @jit(nopython=True, cache=True, fastmath=True)
+    def _evaluate(x, y, gradix, gradiy, func):
+        if len(gradix[:, :]) > 0:
+            for cy in range(len(gradix[:, 0])):
+                for cx in range(len(gradix[0, :])):
+                    if y != cy and x != cx:
+                        dy = float(cy - y)
+                        dx = float(cx - x)
+                        norm = np.linalg.norm(np.array([dx, dy]))
+                        dy = dy/norm
+                        dx = dx/norm
+                        func[cy, cx] = gradiy[cy, cx] * dy + gradix[cy, cx] * dx
+                        if func[cy, cx] < 0:
+                            func[cy, cx] = 0
+        return np.mean(func)
+
+    def _find_pupils(self, frame):
+        mean = 0
+        std = 0
+
+        gradix = np.zeros_like(frame, dtype=float)
+        self._gradx(frame, gradix, mean, std)
+
+        gradiy = np.zeros_like(np.transpose(frame), dtype=float)
+        self._gradx(np.transpose(frame), gradiy, mean, std)
+        gradiy = np.transpose(gradiy)
+
+        func = np.zeros_like(frame, dtype=float)
+        means = np.zeros_like(frame, dtype=float)
+        y = int(self.height / 2)
+        x = int(self.width / 2)
+        iterations = 0
+
+        while iterations < self._max_iterations:
+            ymin = max(y - self._ascent_distance, 0)
+            ymax = min(y + self._ascent_distance, self.height)
+            xmin = max(x - self._ascent_distance, 0)
+            xmax = min(x + self._ascent_distance, self.width)
+
+            should_continue = 0
+            for i in np.arange(ymin, ymax):
+                for j in np.arange(xmin, xmax):
+                    if means[i, j] < 10:
+                        means[i, j] = (255 - frame[i, j]) * self._evaluate(j, i, gradix, gradiy, func)
+                    if means[i, j] > means[y, x]:
+                        should_continue = 1
+                        y = i
+                        x = j
+            iterations += 1
+            if should_continue == 0:
+                break
+
+        return x, y
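For reviewers unfamiliar with the linked paper: Timm-Barth scores a candidate centre by how consistently the displacement vectors from it line up with the image gradients at all other pixels; the pupil, being a dark disc, maximizes that agreement. A brute-force restatement of the score that _evaluate plus the `(255 - frame[i, j])` darkness weight climb above (my paraphrase of this code under its quantized gradients, not the paper's exact formulation):

import numpy as np

def center_score(cx, cy, grad_x, grad_y, intensity):
    # Mean positive dot product between the unit displacement
    # (pixel - candidate) and the (quantized) gradient there,
    # weighted by how dark the candidate pixel is.
    h, w = intensity.shape
    total = 0.0
    for py in range(h):
        for px in range(w):
            if px == cx and py == cy:
                continue
            dx, dy = px - cx, py - cy
            norm = np.hypot(dx, dy)
            total += max((grad_x[py, px] * dx + grad_y[py, px] * dy) / norm, 0.0)
    return (255.0 - intensity[cy, cx]) * total / (h * w)

Scanning every candidate this way is quadratic per pixel; the local hill-climb bounded by _ascent_distance and _max_iterations above is the repo's cheaper approximation.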
+
+
 class GazeTracker:
     def __init__(self, margin):
         self.calibration = Calibration()
         self.margin = margin
 
     def get_gaze(self, frame, landmarks):
         frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
-        eye_left = Eye(frame, landmarks, self.margin, 0, self.calibration)
-        eye_right = Eye(frame, landmarks, self.margin, 1, self.calibration)
-        try:
-            left_horz_ratio = eye_left.pupil.x / (eye_left.center[0] * 2 - (self.margin//2))
-            left_vert_ratio = eye_left.pupil.y / (eye_left.center[1] * 2 - (self.margin//2))
-            right_horz_ratio = eye_right.pupil.x / (eye_right.center[0] * 2 - (self.margin//2))
-            right_vert_ratio = eye_right.pupil.y / (eye_right.center[1] * 2 - (self.margin//2))
-        except TypeError:
-            return None
-
-        horz_average = (left_horz_ratio + right_horz_ratio) / 2
-        vert_average = (left_vert_ratio + right_vert_ratio) / 2
-
-        return (eye_left.origin[0] + eye_left.pupil.x, eye_left.origin[1] + eye_left.pupil.y, eye_left.blinking), \
-               (eye_right.origin[0] + eye_right.pupil.x, eye_right.origin[1] + eye_right.pupil.y, eye_right.blinking), \
-               horz_average, vert_average
+        eye_left = Eye(frame, landmarks, self.margin, Eye.SIDE.LEFT)
+        eye_right = Eye(frame, landmarks, self.margin, Eye.SIDE.RIGHT)
+
+        # left_debug = left_eye.frame.copy()
+        # left_x, left_y = left_eye.pupil_pos
+        # if left_x and left_y:
+        #     color = (255, 0, 0)
+        #     cv2.line(left_debug, (left_x - 5, left_y), (left_x + 5, left_y), color)
+        #     cv2.line(left_debug, (left_x, left_y - 5), (left_x, left_y + 5), color)
+        # cv2.imshow('eye test l', left_debug)
+        #
+        # right_debug = right_eye.frame.copy()
+        # right_x, right_y = right_eye.pupil_pos
+        # if right_x and right_y:
+        #     color = (255, 0, 0)
+        #     cv2.line(right_debug, (right_x - 5, right_y), (right_x + 5, right_y), color)
+        #     cv2.line(right_debug, (right_x, right_y - 5), (right_x, right_y + 5), color)
+        # cv2.imshow('eye test r', right_debug)
+        #
+        # try:
+        #     left_horz_ratio = eye_left.pupil[0] / (eye_left.center[0] * 2 - (self.margin//2))
+        #     left_vert_ratio = eye_left.pupil[1] / (eye_left.center[1] * 2 - (self.margin//2))
+        #     right_horz_ratio = eye_right.pupil[0] / (eye_right.center[0] * 2 - (self.margin//2))
+        #     right_vert_ratio = eye_right.pupil[1] / (eye_right.center[1] * 2 - (self.margin//2))
+        # except TypeError:
+        #     return None
+        #
+        # horz_average = (left_horz_ratio + right_horz_ratio) / 2
+        # vert_average = (left_vert_ratio + right_vert_ratio) / 2
+
+        # return (eye_left.origin[0] + eye_left.pupil[0], eye_left.origin[1] + eye_left.pupil[1], eye_left.blinking), \
+        #        (eye_right.origin[0] + eye_right.pupil[0], eye_right.origin[1] + eye_right.pupil[1], eye_right.blinking), \
+        #        horz_average, vert_average
+
+        return eye_left.blinking, eye_right.blinking
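Note the temporarily narrowed contract: get_gaze now hands back just (left_blink, right_blink), or None when no ratio could be computed, while the pupil and gaze-ratio outputs stay commented out. A hedged caller sketch against this interim shape, using the names from the surrounding code (the 4.5 closed-eye cutoff is an illustrative guess, not a value from this commit):

result = gaze_tracker.get_gaze(frame, landmarks)
if result is not None:
    left_blink, right_blink = result
    # blink ratios grow as the lids close
    BLINK_THRESHOLD = 4.5  # hypothetical tuning value
    left_closed = left_blink is not None and left_blink > BLINK_THRESHOLD
    right_closed = right_blink is not None and right_blink > BLINK_THRESHOLD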
 
 
 class FaceDetector(Thread):
@@ -68,8 +271,19 @@ class FaceDetector(Thread):
             face_box = self.find_face(newest_frame)
             if face_box is None:
                 continue
 
+            # cropped = newest_frame.copy()[face_box[1]:face_box[3], face_box[0]:face_box[2]]
+            # print(face_box)
+            # print(newest_frame.shape)
+            # print(cropped.shape)
+            # cv2.imshow('crop', cropped)
+            # cv2.imshow('old', newest_frame)
+            # ch = cv2.waitKey(1)
+            # if ch == 27:
+            #     break
+
             landmarks = self.get_landmarks(newest_frame, face_box)
-            if face_box is None:
+            if landmarks is None:
                 continue
             self._track_queue.put((landmarks, newest_frame))
             self._track_queue.join()
@@ -160,7 +374,11 @@ class Interpolator(Thread):
 
 
 class FaceTracker3000:
-    def __init__(self, fps, width, height, bufferlen, remote_addr):
+    class CODEC(Enum):
+        # FOURCC codes as OpenCV reports them (floats); 1196444237.0 is VideoWriter_fourcc(*'MJPG')
+        MJPEG = 1196444237.0
+
+    def __init__(self, cam_index, codec, fps, width, height, bufferlen, remote_addr):
         self.frame_queue = Queue(maxsize=bufferlen)
         self.track_queue = Queue()
         self.output_queue = Queue(maxsize=bufferlen)
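About the CODEC mystery constant: a FOURCC is the four characters of the codec name packed into a little-endian 32-bit integer, and OpenCV's property interface traffics in floats, which is where 1196444237.0 comes from. A quick round-trip check using the standard OpenCV helper:

import cv2

fourcc = cv2.VideoWriter_fourcc(*'MJPG')
print(fourcc)  # 1196444237
print(''.join(chr((fourcc >> (8 * i)) & 0xFF) for i in range(4)))  # MJPG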
@@ -168,16 +386,20 @@ class FaceTracker3000:
         self.remote_addr = remote_addr
 
         # Setup
-        self._cap = cv2.VideoCapture(0, cv2.CAP_V4L)
+        self._cap = cv2.VideoCapture(cam_index, cv2.CAP_V4L)
+        self._cap.set(cv2.CAP_PROP_FOURCC, codec.value)
         self._cap.set(cv2.CAP_PROP_FPS, fps)
         self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
         self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
+        self._cap.set(cv2.CAP_PROP_SHARPNESS, 0)
+        self._cap.set(cv2.CAP_PROP_GAMMA, 106)
 
-        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        if remote_addr:
+            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
         self._pose_estimator = PoseEstimator(img_size=(height, width))
         # TODO: Margin should be configurable
-        self._gaze_estimator = GazeTracker(10)
+        self._gaze_estimator = GazeTracker(8)
 
         # ml thread
         self._face_detector_thread = FaceDetector(self.frame_queue, self.track_queue)
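One caveat with the new capture settings: VideoCapture.set returns False when the backend rejects a property, and V4L cameras differ in what they expose, so the SHARPNESS and GAMMA calls can silently no-op (and some drivers report success regardless). A defensive sketch; the helper name and warning format are mine:

import cv2

def set_or_warn(cap, prop, value, name):
    # cap.set returns False if the driver/backend refused the property
    if not cap.set(prop, value):
        print('warning: could not set %s=%s' % (name, value))

cap = cv2.VideoCapture(0, cv2.CAP_V4L)
set_or_warn(cap, cv2.CAP_PROP_SHARPNESS, 0, 'sharpness')
set_or_warn(cap, cv2.CAP_PROP_GAMMA, 106, 'gamma')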
@@ -187,10 +409,19 @@ class FaceTracker3000:
         self._interpolator_thread = Interpolator(self.frame_queue, self.track_queue, self.output_queue)
 
     def run(self):
+        # Initialize some variables
+        mar = 0
+        mdst = 0
+        volume = 0
+        look_horiz, look_vert = (0.5,) * 2
+        left_blink, right_blink = (1,) * 2
+        left_x, left_y, right_x, right_y = (None,) * 4
+
         # Start our other threads (will read from our queues)
         self._face_detector_thread.start()
         self._interpolator_thread.start()
-        # self._socket.connect(self.remote_addr)
+        if self.remote_addr:
+            self._socket.connect(self.remote_addr)
         try:
             while True:
                 # get frame
@@ -215,34 +446,34 @@ class FaceTracker3000:
 
                 track_results = self._gaze_estimator.get_gaze(frame, landmarks)
                 if track_results is not None:
-                    (left_x, left_y, left_blink), (right_x, right_y, right_blink), look_horiz, look_vert = track_results
+                    # (left_x, left_y, left_blink), (right_x, right_y, right_blink), look_horiz, look_vert = track_results
+                    left_blink, right_blink = track_results
 
-                mar = 0
-                mdst = 0
-                volume = 0
+                mar = self.mouth_aspect_ratio(landmarks)
+                # mdst = self.mouth_distance(landmarks, facebox)
 
-                msg = '%.4f %.4f %.4f %.4f %.4f %.4f %.4f' % \
-                      (roll, pitch, yaw, look_horiz, look_vert, left_blink, right_blink)
-                print(msg)
-                # self._socket.send(bytes(msg, "utf-8"))
+                msg = '%.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f' % \
+                      (roll, pitch, yaw, look_horiz, look_vert, left_blink, right_blink, mar, mdst)
+                if self.remote_addr:
+                    self._socket.send(bytes(msg, "utf-8"))
 
-                mark_color = (255, 255, 255) if interpolated else (0, 255, 0)
-                for mark in landmarks:
-                    cv2.circle(frame, (int(mark[0]), int(mark[1])), 1, mark_color, -1, cv2.LINE_AA)
-
-                if left_x and left_y and right_x and right_y:
-                    color = (255, 0, 0)
-                    cv2.line(frame, (left_x - 5, left_y), (left_x + 5, left_y), color)
-                    cv2.line(frame, (left_x, left_y - 5), (left_x, left_y + 5), color)
-                    cv2.line(frame, (right_x - 5, right_y), (right_x + 5, right_y), color)
-                    cv2.line(frame, (right_x, right_y - 5), (right_x, right_y + 5), color)
-
-                self._pose_estimator.draw_annotation_box(frame, rotation_vector, translation_vector)
-
-                cv2.imshow('tracking debug', frame)
-                ch = cv2.waitKey(1)
-                if ch == 27:
-                    break
+                # mark_color = (255, 255, 255) if interpolated else (0, 255, 0)
+                # for mark in landmarks:
+                #     cv2.circle(frame, (int(mark[0]), int(mark[1])), 1, mark_color, -1, cv2.LINE_AA)
+                #
+                # if left_x and left_y and right_x and right_y:
+                #     color = (255, 0, 0)
+                #     cv2.line(frame, (left_x - 5, left_y), (left_x + 5, left_y), color)
+                #     cv2.line(frame, (left_x, left_y - 5), (left_x, left_y + 5), color)
+                #     cv2.line(frame, (right_x - 5, right_y), (right_x + 5, right_y), color)
+                #     cv2.line(frame, (right_x, right_y - 5), (right_x, right_y + 5), color)
+                #
+                # self._pose_estimator.draw_annotation_box(frame, rotation_vector, translation_vector)
+                #
+                # cv2.imshow('tracking debug', frame)
+                # ch = cv2.waitKey(1)
+                # if ch == 27:
+                #     break
 
         # TODO: Doesn't exit cleanly; an error is printed and Python has to kill the daemon thread
         except Exception as e:
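The wire format is now nine space-separated floats per update: roll, pitch, yaw, look_horiz, look_vert, left_blink, right_blink, mar, mdst (field order taken from the send site above). A receiving-end sketch under that assumption; note the sender adds no delimiter between messages, so back-to-back packets can run together on a TCP stream:

import socket

FIELDS = ('roll', 'pitch', 'yaw', 'look_horiz', 'look_vert',
          'left_blink', 'right_blink', 'mar', 'mdst')

srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind(('127.0.0.1', 5066))
srv.listen(1)
conn, _ = srv.accept()
values = conn.recv(1024).decode('utf-8').split()
packet = dict(zip(FIELDS, map(float, values[:9])))
print(packet['left_blink'], packet['mar'])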
@@ -257,8 +488,21 @@ class FaceTracker3000:
         finally:
             cv2.destroyAllWindows()
 
+    @staticmethod
+    def mouth_distance(landmarks, facebox):
+        mouth = landmarks[60:68]
+        return np.linalg.norm(mouth[0]-mouth[4]) / (facebox[2]-facebox[0])
+
+    @staticmethod
+    def mouth_aspect_ratio(landmarks):
+        mouth = landmarks[60:68]
+        mar = np.linalg.norm(mouth[1]-mouth[7]) + np.linalg.norm(mouth[2]-mouth[6]) + np.linalg.norm(mouth[3]-mouth[5])
+        mar /= (2 * np.linalg.norm(mouth[0]-mouth[4]) + 1e-6)
+        return mar
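mouth_aspect_ratio is the usual MAR over the inner-lip landmarks 60-67: the summed vertical openings against twice the horizontal span, with the 1e-6 guarding the degenerate case. A worked check on a toy mouth (coordinates invented for illustration):

import numpy as np

mouth = np.array([(0, 0), (2, -1), (4, -2), (6, -1), (8, 0), (6, 2), (4, 3), (2, 2)], dtype=float)
vertical = (np.linalg.norm(mouth[1] - mouth[7])
            + np.linalg.norm(mouth[2] - mouth[6])
            + np.linalg.norm(mouth[3] - mouth[5]))
mar = vertical / (2 * np.linalg.norm(mouth[0] - mouth[4]) + 1e-6)
print(mar)  # ~0.687 here; near 0 for a closed mouth, larger as it opens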
 
 
 if __name__ == '__main__':
     # TODO: should come from a configuration file
-    ft = FaceTracker3000(60, 640, 480, 30, ('127.0.0.1', 5066))
+    # ft = FaceTracker3000(0, 60, 640, 480, 30, ('127.0.0.1', 5066))
+    ft = FaceTracker3000(0, FaceTracker3000.CODEC.MJPEG, 30, 1920, 1080, 30, ('127.0.0.1', 5066))
     ft.run()