Update workouts_monitoring solution (#16706)
Co-authored-by: UltralyticsAssistant <web@ultralytics.com> Co-authored-by: Glenn Jocher <glenn.jocher@ultralytics.com>
This commit is contained in:
parent
c17ddcdf70
commit
73e6861d95
7 changed files with 162 additions and 245 deletions
|
|
@ -1,127 +1,79 @@
|
|||
# Ultralytics YOLO 🚀, AGPL-3.0 license
|
||||
|
||||
import cv2
|
||||
|
||||
from ultralytics.utils.checks import check_imshow
|
||||
from ultralytics.solutions.solutions import BaseSolution # Import a parent class
|
||||
from ultralytics.utils.plotting import Annotator
|
||||
|
||||
|
||||
class AIGym:
|
||||
class AIGym(BaseSolution):
|
||||
"""A class to manage the gym steps of people in a real-time video stream based on their poses."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
kpts_to_check,
|
||||
line_thickness=2,
|
||||
view_img=False,
|
||||
pose_up_angle=145.0,
|
||||
pose_down_angle=90.0,
|
||||
pose_type="pullup",
|
||||
):
|
||||
def __init__(self, **kwargs):
|
||||
"""Initialization function for AiGYM class, a child class of BaseSolution class, can be used for workouts
|
||||
monitoring.
|
||||
"""
|
||||
Initializes the AIGym class with the specified parameters.
|
||||
# Check if the model name ends with '-pose'
|
||||
if "model" in kwargs and "-pose" not in kwargs["model"]:
|
||||
kwargs["model"] = "yolo11n-pose.pt"
|
||||
elif "model" not in kwargs:
|
||||
kwargs["model"] = "yolo11n-pose.pt"
|
||||
|
||||
super().__init__(**kwargs)
|
||||
self.count = [] # List for counts, necessary where there are multiple objects in frame
|
||||
self.angle = [] # List for angle, necessary where there are multiple objects in frame
|
||||
self.stage = [] # List for stage, necessary where there are multiple objects in frame
|
||||
|
||||
# Extract details from CFG single time for usage later
|
||||
self.initial_stage = None
|
||||
self.up_angle = float(self.CFG["up_angle"]) # Pose up predefined angle to consider up pose
|
||||
self.down_angle = float(self.CFG["down_angle"]) # Pose down predefined angle to consider down pose
|
||||
self.kpts = self.CFG["kpts"] # User selected kpts of workouts storage for further usage
|
||||
self.lw = self.CFG["line_width"] # Store line_width for usage
|
||||
|
||||
def monitor(self, im0):
|
||||
"""
|
||||
Monitor the workouts using Ultralytics YOLOv8 Pose Model: https://docs.ultralytics.com/tasks/pose/.
|
||||
|
||||
Args:
|
||||
kpts_to_check (list): Indices of keypoints to check.
|
||||
line_thickness (int, optional): Thickness of the lines drawn. Defaults to 2.
|
||||
view_img (bool, optional): Flag to display the image. Defaults to False.
|
||||
pose_up_angle (float, optional): Angle threshold for the 'up' pose. Defaults to 145.0.
|
||||
pose_down_angle (float, optional): Angle threshold for the 'down' pose. Defaults to 90.0.
|
||||
pose_type (str, optional): Type of pose to detect ('pullup', 'pushup', 'abworkout'). Defaults to "pullup".
|
||||
im0 (ndarray): The input image that will be used for processing
|
||||
Returns
|
||||
im0 (ndarray): The processed image for more usage
|
||||
"""
|
||||
# Image and line thickness
|
||||
self.im0 = None
|
||||
self.tf = line_thickness
|
||||
# Extract tracks
|
||||
tracks = self.model.track(source=im0, persist=True, classes=self.CFG["classes"])[0]
|
||||
|
||||
# Keypoints and count information
|
||||
self.keypoints = None
|
||||
self.poseup_angle = pose_up_angle
|
||||
self.posedown_angle = pose_down_angle
|
||||
self.threshold = 0.001
|
||||
if tracks.boxes.id is not None:
|
||||
# Extract and check keypoints
|
||||
if len(tracks) > len(self.count):
|
||||
new_human = len(tracks) - len(self.count)
|
||||
self.angle += [0] * new_human
|
||||
self.count += [0] * new_human
|
||||
self.stage += ["-"] * new_human
|
||||
|
||||
# Store stage, count and angle information
|
||||
self.angle = None
|
||||
self.count = None
|
||||
self.stage = None
|
||||
self.pose_type = pose_type
|
||||
self.kpts_to_check = kpts_to_check
|
||||
# Initialize annotator
|
||||
self.annotator = Annotator(im0, line_width=self.lw)
|
||||
|
||||
# Visual Information
|
||||
self.view_img = view_img
|
||||
self.annotator = None
|
||||
# Enumerate over keypoints
|
||||
for ind, k in enumerate(reversed(tracks.keypoints.data)):
|
||||
# Get keypoints and estimate the angle
|
||||
kpts = [k[int(self.kpts[i])].cpu() for i in range(3)]
|
||||
self.angle[ind] = self.annotator.estimate_pose_angle(*kpts)
|
||||
im0 = self.annotator.draw_specific_points(k, self.kpts, radius=self.lw * 3)
|
||||
|
||||
# Check if environment supports imshow
|
||||
self.env_check = check_imshow(warn=True)
|
||||
self.count = []
|
||||
self.angle = []
|
||||
self.stage = []
|
||||
|
||||
def start_counting(self, im0, results):
|
||||
"""
|
||||
Function used to count the gym steps.
|
||||
|
||||
Args:
|
||||
im0 (ndarray): Current frame from the video stream.
|
||||
results (list): Pose estimation data.
|
||||
"""
|
||||
self.im0 = im0
|
||||
|
||||
if not len(results[0]):
|
||||
return self.im0
|
||||
|
||||
if len(results[0]) > len(self.count):
|
||||
new_human = len(results[0]) - len(self.count)
|
||||
self.count += [0] * new_human
|
||||
self.angle += [0] * new_human
|
||||
self.stage += ["-"] * new_human
|
||||
|
||||
self.keypoints = results[0].keypoints.data
|
||||
self.annotator = Annotator(im0, line_width=self.tf)
|
||||
|
||||
for ind, k in enumerate(reversed(self.keypoints)):
|
||||
# Estimate angle and draw specific points based on pose type
|
||||
if self.pose_type in {"pushup", "pullup", "abworkout", "squat"}:
|
||||
self.angle[ind] = self.annotator.estimate_pose_angle(
|
||||
k[int(self.kpts_to_check[0])].cpu(),
|
||||
k[int(self.kpts_to_check[1])].cpu(),
|
||||
k[int(self.kpts_to_check[2])].cpu(),
|
||||
)
|
||||
self.im0 = self.annotator.draw_specific_points(k, self.kpts_to_check, shape=(640, 640), radius=10)
|
||||
|
||||
# Check and update pose stages and counts based on angle
|
||||
if self.pose_type in {"abworkout", "pullup"}:
|
||||
if self.angle[ind] > self.poseup_angle:
|
||||
self.stage[ind] = "down"
|
||||
if self.angle[ind] < self.posedown_angle and self.stage[ind] == "down":
|
||||
self.stage[ind] = "up"
|
||||
self.count[ind] += 1
|
||||
|
||||
elif self.pose_type in {"pushup", "squat"}:
|
||||
if self.angle[ind] > self.poseup_angle:
|
||||
self.stage[ind] = "up"
|
||||
if self.angle[ind] < self.posedown_angle and self.stage[ind] == "up":
|
||||
self.stage[ind] = "down"
|
||||
# Determine stage and count logic based on angle thresholds
|
||||
if self.angle[ind] < self.down_angle:
|
||||
if self.stage[ind] == "up":
|
||||
self.count[ind] += 1
|
||||
self.stage[ind] = "down"
|
||||
elif self.angle[ind] > self.up_angle:
|
||||
self.stage[ind] = "up"
|
||||
|
||||
# Display angle, count, and stage text
|
||||
self.annotator.plot_angle_and_count_and_stage(
|
||||
angle_text=self.angle[ind],
|
||||
count_text=self.count[ind],
|
||||
stage_text=self.stage[ind],
|
||||
center_kpt=k[int(self.kpts_to_check[1])],
|
||||
angle_text=self.angle[ind], # angle text for display
|
||||
count_text=self.count[ind], # count text for workouts
|
||||
stage_text=self.stage[ind], # stage position text
|
||||
center_kpt=k[int(self.kpts[1])], # center keypoint for display
|
||||
)
|
||||
|
||||
# Draw keypoints
|
||||
self.annotator.kpts(k, shape=(640, 640), radius=1, kpt_line=True)
|
||||
|
||||
# Display the image if environment supports it and view_img is True
|
||||
if self.env_check and self.view_img:
|
||||
cv2.imshow("Ultralytics YOLOv8 AI GYM", self.im0)
|
||||
if cv2.waitKey(1) & 0xFF == ord("q"):
|
||||
return
|
||||
|
||||
return self.im0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
kpts_to_check = [0, 1, 2] # example keypoints
|
||||
aigym = AIGym(kpts_to_check)
|
||||
self.display_output(im0) # Display output image, if environment support display
|
||||
return im0 # return an image for writing or further usage
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue