Compare commits
5 Commits
master...efficientd
| Author | SHA1 | Date |
|---|---|---|
| yinguobing | 48de8cfd8e | |
| yinguobing | 0a6bac609e | |
| yinguobing | 1a3eaaf464 | |
| yinguobing | ecb46c7e14 | |
| yinguobing | ec62053728 | |
File diff suppressed because it is too large
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,16 +1,23 @@
"""Demo code shows how to estimate human head pose.

Currently, human face is detected by a detector from an OpenCV DNN module.
Then the face box is modified a little to suit the need of landmark
detection. The facial landmark detection is done by a custom Convolutional
Neural Network trained with TensorFlow. After that, head pose is estimated
by solving a PnP problem.

Three major steps for this code.

Step 1: face detection. The human faces are detected by a deep learning face
detector. Then the face boxes are modified a little to suit the need of
landmark detection.
Step 2: facial landmark detection. This is done by a custom Convolutional
Neural Network trained with TensorFlow.
Step 3: head pose estimation. The pose is estimated by solving a PnP problem.

All models and training code are available at: https://github.com/yinguobing/head-pose-estimation
"""
from argparse import ArgumentParser
from multiprocessing import Process, Queue

import cv2
import numpy as np
import tensorflow as tf

from face_detector import FaceDetector
from mark_detector import MarkDetector
from os_detector import detect_os
from pose_estimator import PoseEstimator
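The new docstring describes a three-step pipeline: detect the face, detect the 68 landmarks on the face crop, then solve a PnP problem for the head pose. A minimal single-image sketch of that flow, using the `FaceDetector`/`MarkDetector`/`PoseEstimator` API exactly as it appears in this diff; the image file name is a placeholder, the model paths are the ones used by the demo script.

```python
# Hedged sketch of the three-step pipeline on one still image.
import cv2

from face_detector import FaceDetector
from mark_detector import MarkDetector
from pose_estimator import PoseEstimator

frame = cv2.imread("face.jpg")                      # placeholder input image
height, width = frame.shape[:2]

# Step 1: face detection with the EfficientDet-based detector.
face_detector = FaceDetector("assets/face_model")
boxes, scores, classes = face_detector.predict(face_detector.preprocess(frame), 0.5)
boxes = face_detector.transform_to_square(boxes, scale=1.22, offset=(0, 0.13))
boxes, _ = face_detector.clip_boxes(boxes, (0, 0, height, width))

if boxes.size > 0:
    top, left, bottom, right = [int(x) for x in boxes[0]]

    # Step 2: facial landmark detection on the cropped face.
    mark_detector = MarkDetector("assets/mark_model")
    face_image = mark_detector.preprocess(frame[top:bottom, left:right])
    marks = mark_detector.predict(face_image)
    marks *= (right - left)
    marks[:, 0] += left
    marks[:, 1] += top

    # Step 3: head pose from the 68 landmarks via PnP.
    pose = PoseEstimator(img_size=(height, width)).solve_pose_by_68_points(marks)
    print(pose)
```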
@@ -18,12 +25,11 @@ from stabilizer import Stabilizer

print("OpenCV version: {}".format(cv2.__version__))

# multiprocessing may not work on Windows and macOS, check OS for safety.
detect_os()
devices = tf.config.list_physical_devices('GPU')
for device in devices:
    tf.config.experimental.set_memory_growth(device, True)

CNN_INPUT_SIZE = 128

# Take arguments from user input.
# Parse arguments from user inputs.
parser = ArgumentParser()
parser.add_argument("--video", type=str, default=None,
                    help="Video file to be processed.")
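The loop over physical GPUs above turns on memory growth, so TensorFlow claims GPU memory on demand instead of reserving all of it when the first SavedModel loads. A minimal standalone sketch of the same setting; it has to run before any model is loaded, otherwise TensorFlow raises a RuntimeError.

```python
import tensorflow as tf

# Enable on-demand GPU memory allocation; call this before building or loading models.
for device in tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(device, True)
```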
@@ -32,41 +38,36 @@ parser.add_argument("--cam", type=int, default=None,
args = parser.parse_args()


def get_face(detector, img_queue, box_queue):
    """Get face from image queue. This function is used for multiprocessing"""
    while True:
        image = img_queue.get()
        box = detector.extract_cnn_facebox(image)
        box_queue.put(box)


def main():
    """MAIN"""
    # Video source from webcam or video file.
    """Run human head pose estimation from video files."""

    # What is the threshold value for face detection.
    threshold = 0.5

    # Setup the video source. If no video file provided, the default webcam will
    # be used.
    video_src = args.cam if args.cam is not None else args.video
    if video_src is None:
        print("Warning: video source not assigned, default webcam will be used.")
        video_src = 0

    cap = cv2.VideoCapture(video_src)

    # If reading frames from a webcam, try setting the camera resolution.
    if video_src == 0:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    _, sample_frame = cap.read()

    # Introduce mark_detector to detect landmarks.
    mark_detector = MarkDetector()
    # Get the real frame resolution.
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Setup process and queues for multiprocessing.
    img_queue = Queue()
    box_queue = Queue()
    img_queue.put(sample_frame)
    box_process = Process(target=get_face, args=(
        mark_detector, img_queue, box_queue,))
    box_process.start()
    # Introduce a mark detector to detect face marks.
    detector_mark = MarkDetector("assets/mark_model")

    # Introduce pose estimator to solve pose. Get one frame to setup the
    # estimator according to the image size.
    height, width = sample_frame.shape[:2]
    # Introduce a face detector to detect human faces.
    detector_face = FaceDetector("assets/face_model")

    # Introduce pose estimator to solve pose.
    pose_estimator = PoseEstimator(img_size=(height, width))

    # Introduce scalar stabilizers for pose.
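The scalar stabilizers created here (their `cov_process`/`cov_measure` arguments appear in the next hunk) smooth six pose scalars, presumably the three rotation-vector and three translation-vector components returned by the PnP solver. The `Stabilizer` class itself is not part of this diff; as a rough, simplified stand-in for the idea, an exponential moving average over six scalars would look like this:

```python
import numpy as np


class ScalarSmoother:
    """Toy stand-in for the repo's Stabilizer: exponential moving average of one scalar."""

    def __init__(self, alpha=0.5):
        self.alpha = alpha      # smaller alpha -> stronger smoothing
        self.state = None

    def update(self, measurement):
        if self.state is None:
            self.state = float(measurement)
        else:
            self.state = self.alpha * float(measurement) + (1 - self.alpha) * self.state
        return self.state


# One smoother per pose scalar: presumably 3 rotation + 3 translation components.
pose_smoothers = [ScalarSmoother(alpha=0.5) for _ in range(6)]

raw_pose = np.array([0.1, -0.2, 0.05, 10.0, -3.0, 250.0])   # illustrative rvec + tvec values
steady = [s.update(v) for s, v in zip(pose_smoothers, raw_pose)]
print(steady)
```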
@@ -76,9 +77,13 @@ def main():
        cov_process=0.1,
        cov_measure=0.1) for _ in range(6)]

    # Introduce a meter to measure the FPS.
    tm = cv2.TickMeter()

    # Loop through the video frames.
    while True:
        tm.start()

        # Read frame, crop it, flip it, suit your needs.
        frame_got, frame = cap.read()
        if frame_got is False:
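`cv2.TickMeter`, created just above, accumulates time across start/stop pairs and later reports the average rate through `getFPS()` for the on-screen FPS text. A tiny self-contained sketch of the same pattern, assuming an OpenCV build that provides `TickMeter.getFPS` as this diff does:

```python
import time

import cv2

tm = cv2.TickMeter()
for _ in range(10):
    tm.start()
    time.sleep(0.02)          # stand-in for grabbing and processing one frame
    tm.stop()

# Average over all measured start/stop intervals.
print("FPS: {:.0f}".format(tm.getFPS()))
```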
@@ -91,38 +96,40 @@ def main():
        if video_src == 0:
            frame = cv2.flip(frame, 2)

        # Pose estimation by 3 steps:
        # 1. detect face;
        # 2. detect landmarks;
        # 3. estimate pose
        # Preprocess the input image.
        _image = detector_face.preprocess(frame)

        # Feed frame to image queue.
        img_queue.put(frame)
        # Run the model
        boxes, scores, classes = detector_face.predict(_image, threshold)

        # Get face from box queue.
        facebox = box_queue.get()
        # Transform the boxes into squares.
        boxes = detector_face.transform_to_square(
            boxes, scale=1.22, offset=(0, 0.13))

        if facebox is not None:
            # Detect landmarks from image of 128x128.
            face_img = frame[facebox[1]: facebox[3],
                             facebox[0]: facebox[2]]
            face_img = cv2.resize(face_img, (CNN_INPUT_SIZE, CNN_INPUT_SIZE))
            face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
        # Clip the boxes if they cross the image boundaries.
        boxes, _ = detector_face.clip_boxes(boxes, (0, 0, height, width))

            tm.start()
            marks = mark_detector.detect_marks(face_img)
            tm.stop()
        # Detect facial marks.
        if boxes.size > 0:
            # Get one face image.
            facebox = boxes[0]
            top, left, bottom, right = [int(x) for x in facebox]
            face_image = frame[top:bottom, left:right]

            # Run detection.
            face_image = detector_mark.preprocess(face_image)
            marks = detector_mark.predict(face_image)

            # Convert the marks locations from local CNN to global image.
            marks *= (facebox[2] - facebox[0])
            marks[:, 0] += facebox[0]
            marks[:, 1] += facebox[1]

            # Uncomment following line to show raw marks.
            # mark_detector.draw_marks(frame, marks, color=(0, 255, 0))
            marks *= (right - left)
            marks[:, 0] += left
            marks[:, 1] += top

            # Uncomment following line to show facebox.
            # mark_detector.draw_box(frame, [facebox])
            # detector_face.draw_box(frame, facebox, scores[0])

            # Uncomment following line to show raw marks.
            detector_mark.draw_marks(frame, marks, color=(0, 255, 0))

            # Try pose estimation with 68 points.
            pose = pose_estimator.solve_pose_by_68_points(marks)
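The mark model returns landmark coordinates normalized to the face crop, so the new code scales them by the crop's side length and shifts them by the crop's top-left corner before solving the pose. A small worked example of that mapping, with illustrative values:

```python
import numpy as np

# Two illustrative landmarks in crop-normalized coordinates (0..1).
marks = np.array([[0.25, 0.50],
                  [0.75, 0.40]])

# Square face crop taken from the frame: top-left (left, top), side length right - left.
top, left, bottom, right = 120, 200, 320, 400      # a 200-pixel square crop

marks *= (right - left)   # scale to crop pixels: [[50, 100], [150, 80]]
marks[:, 0] += left       # shift x into frame coordinates
marks[:, 1] += top        # shift y into frame coordinates
print(marks)              # [[250. 220.] [350. 200.]]
```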
@@ -141,20 +148,22 @@ def main():

            # Uncomment following line to draw stable pose annotation on frame.
            pose_estimator.draw_annotation_box(
                frame, steady_pose[0], steady_pose[1], color=(128, 255, 128))

            # Uncomment following line to draw head axes on frame.
            # pose_estimator.draw_axes(frame, steady_pose[0], steady_pose[1])
            pose_estimator.draw_axes(frame, steady_pose[0], steady_pose[1])

        tm.stop()

        # Draw FPS on the screen's top left corner.
        cv2.putText(frame, "FPS: {:.0f}".format(tm.getFPS()), (24, 24),
                    cv2.FONT_HERSHEY_DUPLEX, 0.8, (255, 255, 255), 1, cv2.LINE_AA)

        # Show preview.
        cv2.imshow("Preview", frame)
        if cv2.waitKey(10) == 27:
        if cv2.waitKey(1) == 27:
            break

    # Clean up the multiprocessing process.
    box_process.terminate()
    box_process.join()


if __name__ == '__main__':
    main()
@@ -0,0 +1,161 @@
"""Object detector based on EfficientDet model.

This module supports inferencing with the official EfficientDet model.
For more details: https://github.com/yinguobing/efficientdet-runner
"""
import cv2
import numpy as np
import tensorflow as tf


class FaceDetector(object):
    """Mini module to run the EfficientDet model."""

    def __init__(self, saved_model):
        """Build an EfficientDet model runner.

        Args:
            saved_model: the string path to the SavedModel.
        """
        self.scale_width = 0
        self.scale_height = 0
        self.input_size = 512

        # Load the SavedModel object.
        imported = tf.saved_model.load(saved_model)
        self._predict_fn = imported.signatures["serving_default"]

        # To avoid being garbage collected by Python, see TensorFlow issue 37615.
        self._predict_fn._backref_to_saved_model = imported

    def preprocess(self, image):
        """Preprocess the input image."""

        # Scale the image first.
        height, width, _ = image.shape
        self.ratio = self.input_size / max(height, width)
        image = cv2.resize(
            image, (int(self.ratio * width), int(self.ratio * height)))

        # Convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Then pad the image to input size.
        self.padding_h = self.input_size - int(self.ratio * width)
        self.padding_v = self.input_size - int(self.ratio * height)
        image = cv2.copyMakeBorder(
            image, 0, self.padding_v, 0, self.padding_h, cv2.BORDER_CONSTANT,
            value=(0, 0, 0))

        return image
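`preprocess` letterboxes the frame: it scales the longer side to 512, converts BGR to RGB, then pads the right and bottom to a 512x512 square while remembering the ratio and padding so `predict` can map boxes back and drop the padded area. A worked example with a 1280x720 frame, following the arithmetic above:

```python
import cv2
import numpy as np

input_size = 512
height, width = 720, 1280

ratio = input_size / max(height, width)                        # 512 / 1280 = 0.4
scaled_w, scaled_h = int(ratio * width), int(ratio * height)   # 512 x 288

padding_h = input_size - scaled_w                              # 0 pixels on the right
padding_v = input_size - scaled_h                              # 224 pixels on the bottom

image = np.zeros((height, width, 3), dtype=np.uint8)           # dummy frame
image = cv2.resize(image, (scaled_w, scaled_h))
image = cv2.copyMakeBorder(image, 0, padding_v, 0, padding_h,
                           cv2.BORDER_CONSTANT, value=(0, 0, 0))
print(image.shape)   # (512, 512, 3)
```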
    def __filter(self, detections, threshold):
        """Filter the detection results by score threshold."""
        # Get the detection results.
        boxes = detections['output_0'].numpy()[0]
        scores = detections['output_1'].numpy()[0]
        classes = detections['output_2'].numpy()[0]

        # Filter out the results by score threshold.
        mask = scores > threshold
        boxes = boxes[mask]
        scores = scores[mask]
        classes = classes[mask]

        return boxes, scores, classes

    @tf.function
    def _predict(self, images):
        return self._predict_fn(images)

    def predict(self, image, threshold):
        """Run inference with image inputs.

        Args:
            image: a numpy array as an input image.
            threshold: the score threshold for filtering detections.

        Returns:
            boxes, scores and classes of the filtered detections.
        """
        frame_tensor = tf.constant(image, dtype=tf.uint8)
        frame_tensor = tf.expand_dims(frame_tensor, axis=0)
        detections = self._predict(frame_tensor)
        boxes, scores, classes = self.__filter(detections, threshold)

        # Scale the boxes back to the original image size.
        boxes /= self.ratio

        # Crop out the padding area.
        boxes[:, 2] = np.minimum(
            boxes[:, 2], (self.input_size - self.padding_v)/self.ratio)
        boxes[:, 3] = np.minimum(
            boxes[:, 3], (self.input_size - self.padding_h)/self.ratio)

        return boxes, scores, classes
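`__filter` keeps only detections whose score clears the threshold by applying a single boolean mask to the three parallel output arrays. A tiny standalone illustration of that masking; the arrays here are made up:

```python
import numpy as np

boxes = np.array([[10, 20, 110, 120],
                  [30, 40, 130, 140],
                  [50, 60, 150, 160]], dtype=np.float32)
scores = np.array([0.92, 0.31, 0.77], dtype=np.float32)
classes = np.array([1, 1, 1], dtype=np.float32)

threshold = 0.5
mask = scores > threshold                      # [True, False, True]
boxes, scores, classes = boxes[mask], scores[mask], classes[mask]
print(boxes.shape, scores)                     # (2, 4) [0.92 0.77]
```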
    def transform_to_square(self, boxes, scale=1.0, offset=(0, 0)):
        """Get the square bounding boxes.

        Args:
            boxes: input boxes [[ymin, xmin, ymax, xmax], ...]
            scale: ratio to scale the boxes
            offset: a tuple of offset to move the boxes (x, y)

        Returns:
            square boxes.
        """
        ymins, xmins, ymaxs, xmaxs = np.split(boxes, 4, 1)
        width = xmaxs - xmins
        height = ymaxs - ymins

        # How much to move.
        offset_x = offset[0] * width
        offset_y = offset[1] * height

        # Where is the center location.
        center_x = np.floor_divide(xmins + xmaxs, 2) + offset_x
        center_y = np.floor_divide(ymins + ymaxs, 2) + offset_y

        # Make them squares.
        margin = np.floor_divide(np.maximum(height, width) * scale, 2)
        boxes = np.concatenate((center_y-margin, center_x-margin,
                                center_y+margin, center_x+margin), axis=1)

        return boxes

    def clip_boxes(self, boxes, margins):
        """Clip the boxes to the safe margins.

        Args:
            boxes: input boxes [[ymin, xmin, ymax, xmax], ...].
            margins: a tuple of 4 int (top, left, bottom, right) as safe margins.

        Returns:
            boxes: clipped boxes.
            clip_mark: the mark of clipped sides.
        """
        top, left, bottom, right = margins

        clip_mark = (boxes[:, 0] < top, boxes[:, 1] < left,
                     boxes[:, 2] > bottom, boxes[:, 3] > right)

        boxes[:, 0] = np.maximum(boxes[:, 0], top)
        boxes[:, 1] = np.maximum(boxes[:, 1], left)
        boxes[:, 2] = np.minimum(boxes[:, 2], bottom)
        boxes[:, 3] = np.minimum(boxes[:, 3], right)

        return boxes, clip_mark

    def draw_box(self, image, box, score, color=(0, 255, 0)):
        """Draw the bounding box.

        Args:
            box: the face box.
            score: detection score.
            color: the color of the box.
        """
        y0, x0, y1, x1 = [int(b) for b in box]
        cv2.rectangle(image, (x0, y0), (x1, y1), color, 2)
        cv2.putText(image, "Face:{:.2f}".format(score),
                    (x0, y0-7), cv2.FONT_HERSHEY_DUPLEX, 0.5, color,
                    1, cv2.LINE_AA)
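`transform_to_square` turns each box into a square of side roughly `max(width, height) * scale`, optionally shifting the center by a fraction of the box size; the demo uses `scale=1.22, offset=(0, 0.13)` to grow the face box and push it down toward the chin. A worked example, with the method's arithmetic reproduced as a standalone function so it runs on its own:

```python
import numpy as np


def transform_to_square(boxes, scale=1.0, offset=(0, 0)):
    # Same arithmetic as the method above, copied here for a self-contained example.
    ymins, xmins, ymaxs, xmaxs = np.split(boxes, 4, 1)
    width = xmaxs - xmins
    height = ymaxs - ymins
    offset_x = offset[0] * width
    offset_y = offset[1] * height
    center_x = np.floor_divide(xmins + xmaxs, 2) + offset_x
    center_y = np.floor_divide(ymins + ymaxs, 2) + offset_y
    margin = np.floor_divide(np.maximum(height, width) * scale, 2)
    return np.concatenate((center_y - margin, center_x - margin,
                           center_y + margin, center_x + margin), axis=1)


# One 100x120 face box in [ymin, xmin, ymax, xmax] order.
box = np.array([[100.0, 200.0, 220.0, 300.0]])
print(transform_to_square(box, scale=1.22, offset=(0, 0.13)))
# ~[[102.6 177.  248.6 323. ]]: a 146-pixel square whose center moved down by 0.13 * 120 = 15.6
```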
mark_detector.py
@@ -5,163 +5,42 @@ import tensorflow as tf
from tensorflow import keras


class FaceDetector:
    """Detect human face from image"""

    def __init__(self,
                 dnn_proto_text='assets/deploy.prototxt',
                 dnn_model='assets/res10_300x300_ssd_iter_140000.caffemodel'):
        """Initialization"""
        self.face_net = cv2.dnn.readNetFromCaffe(dnn_proto_text, dnn_model)
        self.detection_result = None

    def get_faceboxes(self, image, threshold=0.5):
        """
        Get the bounding box of faces in image using dnn.
        """
        rows, cols, _ = image.shape

        confidences = []
        faceboxes = []

        self.face_net.setInput(cv2.dnn.blobFromImage(
            image, 1.0, (300, 300), (104.0, 177.0, 123.0), False, False))
        detections = self.face_net.forward()

        for result in detections[0, 0, :, :]:
            confidence = result[2]
            if confidence > threshold:
                x_left_bottom = int(result[3] * cols)
                y_left_bottom = int(result[4] * rows)
                x_right_top = int(result[5] * cols)
                y_right_top = int(result[6] * rows)
                confidences.append(confidence)
                faceboxes.append(
                    [x_left_bottom, y_left_bottom, x_right_top, y_right_top])

        self.detection_result = [faceboxes, confidences]

        return confidences, faceboxes

    def draw_all_result(self, image):
        """Draw the detection result on image"""
        for facebox, conf in self.detection_result:
            cv2.rectangle(image, (facebox[0], facebox[1]),
                          (facebox[2], facebox[3]), (0, 255, 0))
            label = "face: %.4f" % conf
            label_size, base_line = cv2.getTextSize(
                label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)

            cv2.rectangle(image, (facebox[0], facebox[1] - label_size[1]),
                          (facebox[0] + label_size[0],
                           facebox[1] + base_line),
                          (0, 255, 0), cv2.FILLED)
            cv2.putText(image, label, (facebox[0], facebox[1]),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0))


class MarkDetector:
    """Facial landmark detector by Convolutional Neural Network"""

    def __init__(self, saved_model='assets/pose_model'):
    def __init__(self, saved_model):
        """Initialization"""
        # A face detector is required for mark detection.
        self.face_detector = FaceDetector()
        self.input_size = 128

        self.cnn_input_size = 128
        self.marks = None
        # Load the SavedModel object.
        imported = tf.saved_model.load(saved_model)
        self._predict_fn = imported.signatures["serving_default"]
        self._predict_fn._backref_to_saved_model = imported

        # Restore model from the saved_model file.
        self.model = keras.models.load_model(saved_model)
    def preprocess(self, image):
        """Preprocess the input images."""
        image = cv2.resize(image, (self.input_size, self.input_size))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    @staticmethod
    def draw_box(image, boxes, box_color=(255, 255, 255)):
        """Draw square boxes on image"""
        for box in boxes:
            cv2.rectangle(image,
                          (box[0], box[1]),
                          (box[2], box[3]), box_color, 3)
        return image

    @staticmethod
    def move_box(box, offset):
        """Move the box to direction specified by vector offset"""
        left_x = box[0] + offset[0]
        top_y = box[1] + offset[1]
        right_x = box[2] + offset[0]
        bottom_y = box[3] + offset[1]
        return [left_x, top_y, right_x, bottom_y]
    @tf.function
    def _predict(self, images):
        return self._predict_fn(image_input=images)

    @staticmethod
    def get_square_box(box):
        """Get a square box out of the given box, by expanding it."""
        left_x = box[0]
        top_y = box[1]
        right_x = box[2]
        bottom_y = box[3]

        box_width = right_x - left_x
        box_height = bottom_y - top_y

        # Check if box is already a square. If not, make it a square.
        diff = box_height - box_width
        delta = int(abs(diff) / 2)

        if diff == 0:  # Already a square.
            return box
        elif diff > 0:  # Height > width, a slim box.
            left_x -= delta
            right_x += delta
            if diff % 2 == 1:
                right_x += 1
        else:  # Width > height, a short box.
            top_y -= delta
            bottom_y += delta
            if diff % 2 == 1:
                bottom_y += 1

        # Make sure box is always square.
        assert ((right_x - left_x) == (bottom_y - top_y)), 'Box is not square.'

        return [left_x, top_y, right_x, bottom_y]

    @staticmethod
    def box_in_image(box, image):
        """Check if the box is in image"""
        rows = image.shape[0]
        cols = image.shape[1]
        return box[0] >= 0 and box[1] >= 0 and box[2] <= cols and box[3] <= rows

    def extract_cnn_facebox(self, image):
        """Extract face area from image."""
        _, raw_boxes = self.face_detector.get_faceboxes(
            image=image, threshold=0.9)

        for box in raw_boxes:
            # Move box down.
            offset_y = int(abs((box[3] - box[1]) * 0.12))
            box_moved = self.move_box(box, [0, offset_y])

            # Make box square.
            facebox = self.get_square_box(box_moved)

            if self.box_in_image(facebox, image):
                return facebox

        return None

    def detect_marks(self, images):
    def predict(self, image):
        """Detect marks from images"""

        # # Actual detection.
        marks = self.model.predict(tf.expand_dims(images, axis=0))
        # Actual detection.
        marks = self._predict(tf.convert_to_tensor([image], dtype=tf.float32))

        # Convert predictions to landmarks.
        marks = np.reshape(marks, (-1, 2))
        marks = np.reshape(marks['dense_1'].numpy(), (-1, 2))

        return marks

    @staticmethod
    def draw_marks(image, marks, color=(255, 255, 255)):
    def draw_marks(self, image, marks, color=(255, 255, 255)):
        """Draw mark points on image"""
        for mark in marks:
            cv2.circle(image, (int(mark[0]), int(
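The rewritten MarkDetector wraps a TensorFlow SavedModel signature instead of a Keras model and drops the built-in OpenCV DNN face detector, so the caller must now supply a face crop. A hedged usage sketch that mirrors the calls made from the demo script in this diff; the image file is a placeholder, and the view above is cut off before `preprocess` returns, so the full module is assumed to return the processed crop as the demo script expects:

```python
import cv2

from mark_detector import MarkDetector

detector = MarkDetector("assets/mark_model")     # SavedModel path as used in this diff

face_crop = cv2.imread("face_crop.jpg")          # placeholder: a roughly square face crop
crop_size = face_crop.shape[0]

# Same call pattern as the demo script: preprocess, predict, rescale to crop pixels.
marks = detector.predict(detector.preprocess(face_crop))
marks *= crop_size

detector.draw_marks(face_crop, marks, color=(0, 255, 0))
cv2.imwrite("face_crop_with_marks.jpg", face_crop)
```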