Docs cleanup and Google-style tracker docstrings (#6751)

Signed-off-by: Glenn Jocher <glenn.jocher@ultralytics.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Glenn Jocher 2023-12-03 04:12:33 +01:00 committed by GitHub
parent 60041014a8
commit 80802be1e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 740 additions and 529 deletions

View file

@ -1,4 +1,5 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license
# This module defines the base classes and structures for object tracking in YOLO.
from collections import OrderedDict
@ -6,7 +7,15 @@ import numpy as np
class TrackState:
"""Enumeration of possible object tracking states."""
"""
Enumeration class representing the possible states of an object being tracked.
Attributes:
New (int): State when the object is newly detected.
Tracked (int): State when the object is successfully tracked in subsequent frames.
Lost (int): State when the object is no longer tracked.
Removed (int): State when the object is removed from tracking.
"""
New = 0
Tracked = 1
@ -15,24 +24,48 @@ class TrackState:
class BaseTrack:
"""Base class for object tracking, handling basic track attributes and operations."""
"""
Base class for object tracking, providing foundational attributes and methods.
Attributes:
_count (int): Class-level counter for unique track IDs.
track_id (int): Unique identifier for the track.
is_activated (bool): Flag indicating whether the track is currently active.
state (TrackState): Current state of the track.
history (OrderedDict): Ordered history of the track's states.
features (list): List of features extracted from the object for tracking.
curr_feature (any): The current feature of the object being tracked.
score (float): The confidence score of the tracking.
start_frame (int): The frame number where tracking started.
frame_id (int): The most recent frame ID processed by the track.
time_since_update (int): Frames passed since the last update.
location (tuple): The location of the object in the context of multi-camera tracking.
Methods:
end_frame: Returns the ID of the last frame where the object was tracked.
next_id: Increments and returns the next global track ID.
activate: Abstract method to activate the track.
predict: Abstract method to predict the next state of the track.
update: Abstract method to update the track with new data.
mark_lost: Marks the track as lost.
mark_removed: Marks the track as removed.
reset_id: Resets the global track ID counter.
"""
_count = 0
track_id = 0
is_activated = False
state = TrackState.New
history = OrderedDict()
features = []
curr_feature = None
score = 0
start_frame = 0
frame_id = 0
time_since_update = 0
# Multi-camera
location = (np.inf, np.inf)
def __init__(self):
self.track_id = 0
self.is_activated = False
self.state = TrackState.New
self.history = OrderedDict()
self.features = []
self.curr_feature = None
self.score = 0
self.start_frame = 0
self.frame_id = 0
self.time_since_update = 0
self.location = (np.inf, np.inf)
@property
def end_frame(self):
@ -46,15 +79,15 @@ class BaseTrack:
return BaseTrack._count
def activate(self, *args):
"""Activate the track with the provided arguments."""
"""Abstract method to activate the track with provided arguments."""
raise NotImplementedError
def predict(self):
"""Predict the next state of the track."""
"""Abstract method to predict the next state of the track."""
raise NotImplementedError
def update(self, *args, **kwargs):
"""Update the track with new observations."""
"""Abstract method to update the track with new observations."""
raise NotImplementedError
def mark_lost(self):

View file

@ -45,6 +45,7 @@ class STrack(BaseTrack):
def __init__(self, tlwh, score, cls):
"""Initialize new STrack instance."""
super().__init__()
self._tlwh = np.asarray(self.tlbr_to_tlwh(tlwh[:-1]), dtype=np.float32)
self.kalman_filter = None
self.mean, self.covariance = None, None
@ -370,7 +371,8 @@ class BYTETracker:
"""Returns the predicted tracks using the YOLOv8 network."""
STrack.multi_predict(tracks)
def reset_id(self):
@staticmethod
def reset_id():
"""Resets the ID counter of STrack."""
STrack.reset_id()

View file

@ -1,5 +1,3 @@
# Ultralytics YOLO 🚀, AGPL-3.0 license
from functools import partial
from pathlib import Path
@ -11,10 +9,11 @@ from ultralytics.utils.checks import check_yaml
from .bot_sort import BOTSORT
from .byte_tracker import BYTETracker
# A mapping of tracker types to corresponding tracker classes
TRACKER_MAP = {'bytetrack': BYTETracker, 'botsort': BOTSORT}
def on_predict_start(predictor, persist=False):
def on_predict_start(predictor: object, persist: bool = False) -> None:
"""
Initialize trackers for object tracking during prediction.
@ -27,10 +26,13 @@ def on_predict_start(predictor, persist=False):
"""
if hasattr(predictor, 'trackers') and persist:
return
tracker = check_yaml(predictor.args.tracker)
cfg = IterableSimpleNamespace(**yaml_load(tracker))
assert cfg.tracker_type in ['bytetrack', 'botsort'], \
f"Only support 'bytetrack' and 'botsort' for now, but got '{cfg.tracker_type}'"
if cfg.tracker_type not in ['bytetrack', 'botsort']:
raise AssertionError(f"Only 'bytetrack' and 'botsort' are supported for now, but got '{cfg.tracker_type}'")
trackers = []
for _ in range(predictor.dataset.bs):
tracker = TRACKER_MAP[cfg.tracker_type](args=cfg, frame_rate=30)
@ -38,8 +40,14 @@ def on_predict_start(predictor, persist=False):
predictor.trackers = trackers
def on_predict_postprocess_end(predictor, persist=False):
"""Postprocess detected boxes and update with object tracking."""
def on_predict_postprocess_end(predictor: object, persist: bool = False) -> None:
"""
Postprocess detected boxes and update with object tracking.
Args:
predictor (object): The predictor object containing the predictions.
persist (bool, optional): Whether to persist the trackers if they already exist. Defaults to False.
"""
bs = predictor.dataset.bs
path, im0s = predictor.batch[:2]
@ -58,7 +66,7 @@ def on_predict_postprocess_end(predictor, persist=False):
predictor.results[i].update(boxes=torch.as_tensor(tracks[:, :-1]))
def register_tracker(model, persist):
def register_tracker(model: object, persist: bool) -> None:
"""
Register tracking callbacks to the model for object tracking during prediction.

View file

@ -13,7 +13,7 @@ class GMC:
Generalized Motion Compensation (GMC) class for tracking and object detection in video frames.
This class provides methods for tracking and detecting objects based on several tracking algorithms including ORB,
SIFT, ECC, and Sparse Optical Flow. It also supports downscaling of frames for computational efficiency.
SIFT, ECC, and Sparse Optical Flow. It also supports downscaling of frames for computational efficiency.
Attributes:
method (str): The method used for tracking. Options include 'orb', 'sift', 'ecc', 'sparseOptFlow', 'none'.
@ -33,8 +33,14 @@ class GMC:
applySparseOptFlow(self, raw_frame, detections=None): Applies the Sparse Optical Flow method to a raw frame.
"""
def __init__(self, method='sparseOptFlow', downscale=2):
"""Initialize a video tracker with specified parameters."""
def __init__(self, method: str = 'sparseOptFlow', downscale: int = 2) -> None:
"""
Initialize a video tracker with specified parameters.
Args:
method (str): The method used for tracking. Options include 'orb', 'sift', 'ecc', 'sparseOptFlow', 'none'.
downscale (int): Downscale factor for processing frames.
"""
super().__init__()
self.method = method
@ -72,11 +78,25 @@ class GMC:
self.prevFrame = None
self.prevKeyPoints = None
self.prevDescriptors = None
self.initializedFirstFrame = False
def apply(self, raw_frame, detections=None):
"""Apply object detection on a raw frame using specified method."""
def apply(self, raw_frame: np.array, detections: list = None) -> np.array:
"""
Apply object detection on a raw frame using specified method.
Args:
raw_frame (np.array): The raw frame to be processed.
detections (list): List of detections to be used in the processing.
Returns:
np.array: Processed frame.
Examples:
>>> gmc = GMC()
>>> gmc.apply(np.array([[1, 2, 3], [4, 5, 6]]))
array([[1, 2, 3],
[4, 5, 6]])
"""
if self.method in ['orb', 'sift']:
return self.applyFeatures(raw_frame, detections)
elif self.method == 'ecc':
@ -86,13 +106,28 @@ class GMC:
else:
return np.eye(2, 3)
def applyEcc(self, raw_frame, detections=None):
"""Initialize."""
def applyEcc(self, raw_frame: np.array, detections: list = None) -> np.array:
"""
Apply ECC algorithm to a raw frame.
Args:
raw_frame (np.array): The raw frame to be processed.
detections (list): List of detections to be used in the processing.
Returns:
np.array: Processed frame.
Examples:
>>> gmc = GMC()
>>> gmc.applyEcc(np.array([[1, 2, 3], [4, 5, 6]]))
array([[1, 2, 3],
[4, 5, 6]])
"""
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3, dtype=np.float32)
# Downscale image (TODO: consider using pyramids)
# Downscale image
if self.downscale > 1.0:
frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
@ -118,22 +153,35 @@ class GMC:
return H
def applyFeatures(self, raw_frame, detections=None):
"""Initialize."""
def applyFeatures(self, raw_frame: np.array, detections: list = None) -> np.array:
"""
Apply feature-based methods like ORB or SIFT to a raw frame.
Args:
raw_frame (np.array): The raw frame to be processed.
detections (list): List of detections to be used in the processing.
Returns:
np.array: Processed frame.
Examples:
>>> gmc = GMC()
>>> gmc.applyFeatures(np.array([[1, 2, 3], [4, 5, 6]]))
array([[1, 2, 3],
[4, 5, 6]])
"""
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3)
# Downscale image (TODO: consider using pyramids)
# Downscale image
if self.downscale > 1.0:
# frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
width = width // self.downscale
height = height // self.downscale
# Find the keypoints
mask = np.zeros_like(frame)
# mask[int(0.05 * height): int(0.95 * height), int(0.05 * width): int(0.95 * width)] = 255
mask[int(0.02 * height):int(0.98 * height), int(0.02 * width):int(0.98 * width)] = 255
if detections is not None:
for det in detections:
@ -157,10 +205,10 @@ class GMC:
return H
# Match descriptors.
# Match descriptors
knnMatches = self.matcher.knnMatch(self.prevDescriptors, descriptors, 2)
# Filtered matches based on smallest spatial distance
# Filter matches based on smallest spatial distance
matches = []
spatialDistances = []
@ -244,15 +292,29 @@ class GMC:
return H
def applySparseOptFlow(self, raw_frame, detections=None):
"""Initialize."""
def applySparseOptFlow(self, raw_frame: np.array, detections: list = None) -> np.array:
"""
Apply Sparse Optical Flow method to a raw frame.
Args:
raw_frame (np.array): The raw frame to be processed.
detections (list): List of detections to be used in the processing.
Returns:
np.array: Processed frame.
Examples:
>>> gmc = GMC()
>>> gmc.applySparseOptFlow(np.array([[1, 2, 3], [4, 5, 6]]))
array([[1, 2, 3],
[4, 5, 6]])
"""
height, width, _ = raw_frame.shape
frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
H = np.eye(2, 3)
# Downscale image
if self.downscale > 1.0:
# frame = cv2.GaussianBlur(frame, (3, 3), 1.5)
frame = cv2.resize(frame, (width // self.downscale, height // self.downscale))
# Find the keypoints
@ -260,13 +322,9 @@ class GMC:
# Handle first frame
if not self.initializedFirstFrame:
# Initialize data
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
# Initialization done
self.initializedFirstFrame = True
return H
# Find correspondences
@ -285,23 +343,21 @@ class GMC:
currPoints = np.array(currPoints)
# Find rigid matrix
if (np.size(prevPoints, 0) > 4) and (np.size(prevPoints, 0) == np.size(prevPoints, 0)):
if np.size(prevPoints, 0) > 4 and np.size(prevPoints, 0) == np.size(prevPoints, 0):
H, inliers = cv2.estimateAffinePartial2D(prevPoints, currPoints, cv2.RANSAC)
# Handle downscale
if self.downscale > 1.0:
H[0, 2] *= self.downscale
H[1, 2] *= self.downscale
else:
LOGGER.warning('WARNING: not enough matching points')
# Store to next iteration
self.prevFrame = frame.copy()
self.prevKeyPoints = copy.copy(keypoints)
return H
def reset_params(self):
def reset_params(self) -> None:
"""Reset parameters."""
self.prevFrame = None
self.prevKeyPoints = None

View file

@ -19,33 +19,28 @@ class KalmanFilterXYAH:
"""Initialize Kalman filter model matrices with motion and observation uncertainty weights."""
ndim, dt = 4, 1.
# Create Kalman filter model matrices.
# Create Kalman filter model matrices
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current state estimate. These weights control
# the amount of uncertainty in the model. This is a bit hacky.
# the amount of uncertainty in the model.
self._std_weight_position = 1. / 20
self._std_weight_velocity = 1. / 160
def initiate(self, measurement):
def initiate(self, measurement: np.ndarray) -> tuple:
"""
Create track from unassociated measurement.
Parameters
----------
measurement : ndarray
Bounding box coordinates (x, y, a, h) with center position (x, y),
aspect ratio a, and height h.
Args:
measurement (ndarray): Bounding box coordinates (x, y, a, h) with center position (x, y), aspect ratio a,
and height h.
Returns
-------
(ndarray, ndarray)
Returns the mean vector (8 dimensional) and covariance matrix (8x8
dimensional) of the new track. Unobserved velocities are initialized
to 0 mean.
Returns:
tuple[ndarray, ndarray]: Returns the mean vector (8 dimensional) and covariance matrix (8x8 dimensional) of
the new track. Unobserved velocities are initialized to 0 mean.
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
@ -58,22 +53,17 @@ class KalmanFilterXYAH:
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean, covariance):
def predict(self, mean: np.ndarray, covariance: np.ndarray) -> tuple:
"""
Run Kalman filter prediction step.
Parameters
----------
mean : ndarray
The 8 dimensional mean vector of the object state at the previous time step.
covariance : ndarray
The 8x8 dimensional covariance matrix of the object state at the previous time step.
Args:
mean (ndarray): The 8 dimensional mean vector of the object state at the previous time step.
covariance (ndarray): The 8x8 dimensional covariance matrix of the object state at the previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted state. Unobserved velocities are
initialized to 0 mean.
Returns:
tuple[ndarray, ndarray]: Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[3], self._std_weight_position * mean[3], 1e-2,
@ -83,27 +73,21 @@ class KalmanFilterXYAH:
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
# mean = np.dot(self._motion_mat, mean)
mean = np.dot(mean, self._motion_mat.T)
covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
def project(self, mean, covariance):
def project(self, mean: np.ndarray, covariance: np.ndarray) -> tuple:
"""
Project state distribution to measurement space.
Parameters
----------
mean : ndarray
The state's mean vector (8 dimensional array).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
Args:
mean (ndarray): The state's mean vector (8 dimensional array).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
Returns
-------
(ndarray, ndarray)
Returns the projected mean and covariance matrix of the given state estimate.
Returns:
tuple[ndarray, ndarray]: Returns the projected mean and covariance matrix of the given state estimate.
"""
std = [
self._std_weight_position * mean[3], self._std_weight_position * mean[3], 1e-1,
@ -114,22 +98,17 @@ class KalmanFilterXYAH:
covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def multi_predict(self, mean, covariance):
def multi_predict(self, mean: np.ndarray, covariance: np.ndarray) -> tuple:
"""
Run Kalman filter prediction step (Vectorized version).
Parameters
----------
mean : ndarray
The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance : ndarray
The Nx8x8 dimensional covariance matrix of the object states at the previous time step.
Args:
mean (ndarray): The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance (ndarray): The Nx8x8 covariance matrix of the object states at the previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted state. Unobserved velocities are
initialized to 0 mean.
Returns:
tuple[ndarray, ndarray]: Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[:, 3], self._std_weight_position * mean[:, 3],
@ -148,24 +127,18 @@ class KalmanFilterXYAH:
return mean, covariance
def update(self, mean, covariance, measurement):
def update(self, mean: np.ndarray, covariance: np.ndarray, measurement: np.ndarray) -> tuple:
"""
Run Kalman filter correction step.
Parameters
----------
mean : ndarray
The predicted state's mean vector (8 dimensional).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
measurement : ndarray
The 4 dimensional measurement vector (x, y, a, h), where (x, y) is the center position, a the aspect
ratio, and h the height of the bounding box.
Args:
mean (ndarray): The predicted state's mean vector (8 dimensional).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
measurement (ndarray): The 4 dimensional measurement vector (x, y, a, h), where (x, y) is the center
position, a the aspect ratio, and h the height of the bounding box.
Returns
-------
(ndarray, ndarray)
Returns the measurement-corrected state distribution.
Returns:
tuple[ndarray, ndarray]: Returns the measurement-corrected state distribution.
"""
projected_mean, projected_cov = self.project(mean, covariance)
@ -179,29 +152,30 @@ class KalmanFilterXYAH:
new_covariance = covariance - np.linalg.multi_dot((kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
def gating_distance(self, mean, covariance, measurements, only_position=False, metric='maha'):
def gating_distance(self,
mean: np.ndarray,
covariance: np.ndarray,
measurements: np.ndarray,
only_position: bool = False,
metric: str = 'maha') -> np.ndarray:
"""
Compute gating distance between state distribution and measurements. A suitable distance threshold can be
obtained from `chi2inv95`. If `only_position` is False, the chi-square distribution has 4 degrees of freedom,
otherwise 2.
Parameters
----------
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in format (x, y, a, h) where (x, y) is the bounding box
center position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding box center position only.
Args:
mean (ndarray): Mean vector over the state distribution (8 dimensional).
covariance (ndarray): Covariance of the state distribution (8x8 dimensional).
measurements (ndarray): An Nx4 matrix of N measurements, each in format (x, y, a, h) where (x, y)
is the bounding box center position, a the aspect ratio, and h the height.
only_position (bool, optional): If True, distance computation is done with respect to the bounding box
center position only. Defaults to False.
metric (str, optional): The metric to use for calculating the distance. Options are 'gaussian' for the
squared Euclidean distance and 'maha' for the squared Mahalanobis distance. Defaults to 'maha'.
Returns
-------
ndarray
Returns an array of length N, where the i-th element contains the squared Mahalanobis distance between
(mean, covariance) and `measurements[i]`.
Returns:
ndarray: Returns an array of length N, where the i-th element contains the squared distance between
(mean, covariance) and `measurements[i]`.
"""
mean, covariance = self.project(mean, covariance)
if only_position:
@ -216,7 +190,7 @@ class KalmanFilterXYAH:
z = scipy.linalg.solve_triangular(cholesky_factor, d.T, lower=True, check_finite=False, overwrite_b=True)
return np.sum(z * z, axis=0) # square maha
else:
raise ValueError('invalid distance metric')
raise ValueError('Invalid distance metric')
class KalmanFilterXYWH(KalmanFilterXYAH):
@ -230,20 +204,16 @@ class KalmanFilterXYWH(KalmanFilterXYAH):
observation of the state space (linear observation model).
"""
def initiate(self, measurement):
def initiate(self, measurement: np.ndarray) -> tuple:
"""
Create track from unassociated measurement.
Parameters
----------
measurement : ndarray
Bounding box coordinates (x, y, w, h) with center position (x, y), width w, and height h.
Args:
measurement (ndarray): Bounding box coordinates (x, y, w, h) with center position (x, y), width, and height.
Returns
-------
(ndarray, ndarray)
Returns the mean vector (8 dimensional) and covariance matrix (8x8 dimensional) of the new track.
Unobserved velocities are initialized to 0 mean.
Returns:
tuple[ndarray, ndarray]: Returns the mean vector (8 dimensional) and covariance matrix (8x8 dimensional) of
the new track. Unobserved velocities are initialized to 0 mean.
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
@ -257,22 +227,17 @@ class KalmanFilterXYWH(KalmanFilterXYAH):
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean, covariance):
def predict(self, mean, covariance) -> tuple:
"""
Run Kalman filter prediction step.
Parameters
----------
mean : ndarray
The 8 dimensional mean vector of the object state at the previous time step.
covariance : ndarray
The 8x8 dimensional covariance matrix of the object state at the previous time step.
Args:
mean (ndarray): The 8 dimensional mean vector of the object state at the previous time step.
covariance (ndarray): The 8x8 dimensional covariance matrix of the object state at the previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted state. Unobserved velocities are
initialized to 0 mean.
Returns:
tuple[ndarray, ndarray]: Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[2], self._std_weight_position * mean[3],
@ -287,21 +252,16 @@ class KalmanFilterXYWH(KalmanFilterXYAH):
return mean, covariance
def project(self, mean, covariance):
def project(self, mean, covariance) -> tuple:
"""
Project state distribution to measurement space.
Parameters
----------
mean : ndarray
The state's mean vector (8 dimensional array).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
Args:
mean (ndarray): The state's mean vector (8 dimensional array).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
Returns
-------
(ndarray, ndarray)
Returns the projected mean and covariance matrix of the given state estimate.
Returns:
tuple[ndarray, ndarray]: Returns the projected mean and covariance matrix of the given state estimate.
"""
std = [
self._std_weight_position * mean[2], self._std_weight_position * mean[3],
@ -312,22 +272,17 @@ class KalmanFilterXYWH(KalmanFilterXYAH):
covariance = np.linalg.multi_dot((self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def multi_predict(self, mean, covariance):
def multi_predict(self, mean, covariance) -> tuple:
"""
Run Kalman filter prediction step (Vectorized version).
Parameters
----------
mean : ndarray
The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance : ndarray
The Nx8x8 dimensional covariance matrix of the object states at the previous time step.
Args:
mean (ndarray): The Nx8 dimensional mean matrix of the object states at the previous time step.
covariance (ndarray): The Nx8x8 covariance matrix of the object states at the previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted state. Unobserved velocities are
initialized to 0 mean.
Returns:
tuple[ndarray, ndarray]: Returns the mean vector and covariance matrix of the predicted state. Unobserved
velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[:, 2], self._std_weight_position * mean[:, 3],
@ -346,23 +301,17 @@ class KalmanFilterXYWH(KalmanFilterXYAH):
return mean, covariance
def update(self, mean, covariance, measurement):
def update(self, mean, covariance, measurement) -> tuple:
"""
Run Kalman filter correction step.
Parameters
----------
mean : ndarray
The predicted state's mean vector (8 dimensional).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
measurement : ndarray
The 4 dimensional measurement vector (x, y, w, h), where (x, y) is the center position, w the width,
and h the height of the bounding box.
Args:
mean (ndarray): The predicted state's mean vector (8 dimensional).
covariance (ndarray): The state's covariance matrix (8x8 dimensional).
measurement (ndarray): The 4 dimensional measurement vector (x, y, w, h), where (x, y) is the center
position, w the width, and h the height of the bounding box.
Returns
-------
(ndarray, ndarray)
Returns the measurement-corrected state distribution.
Returns:
tuple[ndarray, ndarray]: Returns the measurement-corrected state distribution.
"""
return super().update(mean, covariance, measurement)

View file

@ -17,7 +17,7 @@ except (ImportError, AssertionError, AttributeError):
import lap
def linear_assignment(cost_matrix, thresh, use_lap=True):
def linear_assignment(cost_matrix: np.ndarray, thresh: float, use_lap: bool = True) -> tuple:
"""
Perform linear assignment using scipy or lap.lapjv.
@ -27,19 +27,24 @@ def linear_assignment(cost_matrix, thresh, use_lap=True):
use_lap (bool, optional): Whether to use lap.lapjv. Defaults to True.
Returns:
(tuple): Tuple containing matched indices, unmatched indices from 'a', and unmatched indices from 'b'.
Tuple with:
- matched indices
- unmatched indices from 'a'
- unmatched indices from 'b'
"""
if cost_matrix.size == 0:
return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
if use_lap:
# Use lap.lapjv
# https://github.com/gatagat/lap
_, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]
unmatched_a = np.where(x < 0)[0]
unmatched_b = np.where(y < 0)[0]
else:
# Use scipy.optimize.linear_sum_assignment
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.linear_sum_assignment.html
x, y = scipy.optimize.linear_sum_assignment(cost_matrix) # row x, col y
matches = np.asarray([[x[i], y[i]] for i in range(len(x)) if cost_matrix[x[i], y[i]] <= thresh])
@ -53,7 +58,7 @@ def linear_assignment(cost_matrix, thresh, use_lap=True):
return matches, unmatched_a, unmatched_b
def iou_distance(atracks, btracks):
def iou_distance(atracks: list, btracks: list) -> np.ndarray:
"""
Compute cost based on Intersection over Union (IoU) between tracks.
@ -62,7 +67,7 @@ def iou_distance(atracks, btracks):
btracks (list[STrack] | list[np.ndarray]): List of tracks 'b' or bounding boxes.
Returns:
(np.ndarray): Cost matrix computed based on IoU.
np.ndarray: Cost matrix computed based on IoU.
"""
if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) \
@ -81,7 +86,7 @@ def iou_distance(atracks, btracks):
return 1 - ious # cost matrix
def embedding_distance(tracks, detections, metric='cosine'):
def embedding_distance(tracks: list, detections: list, metric: str = 'cosine') -> np.ndarray:
"""
Compute distance between tracks and detections based on embeddings.
@ -91,7 +96,7 @@ def embedding_distance(tracks, detections, metric='cosine'):
metric (str, optional): Metric for distance computation. Defaults to 'cosine'.
Returns:
(np.ndarray): Cost matrix computed based on embeddings.
np.ndarray: Cost matrix computed based on embeddings.
"""
cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float32)
@ -105,7 +110,7 @@ def embedding_distance(tracks, detections, metric='cosine'):
return cost_matrix
def fuse_score(cost_matrix, detections):
def fuse_score(cost_matrix: np.ndarray, detections: list) -> np.ndarray:
"""
Fuses cost matrix with detection scores to produce a single similarity matrix.
@ -114,7 +119,7 @@ def fuse_score(cost_matrix, detections):
detections (list[BaseTrack]): List of detections with scores.
Returns:
(np.ndarray): Fused similarity matrix.
np.ndarray: Fused similarity matrix.
"""
if cost_matrix.size == 0: