Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress Trackers Manually: Option E #980

Merged
merged 5 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions stonesoup/tracker/base.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import datetime
from abc import abstractmethod
from typing import Iterator, Set, Tuple

from ..base import Base
from ..types.detection import Detection
from ..types.track import Track


class Tracker(Base):
"""Tracker base class"""

@property
@abstractmethod
def tracks(self):
def tracks(self) -> Set[Track]:
raise NotImplementedError

def __iter__(self):
def __iter__(self) -> Iterator[Tuple[datetime.datetime, Set[Track]]]:
return self

@abstractmethod
def __next__(self):
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
"""
Returns
-------
Expand All @@ -25,3 +29,48 @@ def __next__(self):
Tracks existing in the time step
"""
raise NotImplementedError


class _TrackerMixInBase(Base):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.detector_iter = None

def __iter__(self) -> Iterator[Tuple[datetime.datetime, Set[Track]]]:
if self.detector is None:
raise AttributeError("Detector has not been set. A detector attribute is required to "
"iterate over a tracker.")
if self.detector_iter is None:
self.detector_iter = iter(self.detector)

return super().__iter__()


class _TrackerMixInNext(_TrackerMixInBase):
""" The tracking logic is contained within the __next__ method."""

@abstractmethod
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
"""Pull detections from the detector (`detector_iter`). Act on them to create tracks."""

def update_tracker(self, time: datetime.datetime, detections: Set[Detection]) \
-> Tuple[datetime.datetime, Set[Track]]:

placeholder_detector_iter = self.detector_iter
self.detector_iter = iter([(time, detections)])
tracker_output = next(self)
self.detector_iter = placeholder_detector_iter
return tracker_output


class _TrackerMixInUpdate(_TrackerMixInBase):
""" The tracking logic is contained within the update_tracker function."""

def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
time, detections = next(self.detector_iter)
return self.update_tracker(time, detections)

@abstractmethod
def update_tracker(self, time: datetime.datetime, detections: Set[Detection]) \
-> Tuple[datetime.datetime, Set[Track]]:
"""Use `time` and `detections` to create tracks."""
21 changes: 10 additions & 11 deletions stonesoup/tracker/pointprocess.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from .base import Tracker
import datetime
from typing import Tuple, Set

from .base import Tracker, _TrackerMixInNext
from ..base import Property
from ..hypothesiser.gaussianmixture import GaussianMixtureHypothesiser
from ..mixturereducer.gaussianmixture import GaussianMixtureReducer
from ..reader import DetectionReader
from ..types.state import TaggedWeightedGaussianState
from ..types.mixture import GaussianMixture
from ..types.numeric import Probability
from ..types.state import TaggedWeightedGaussianState
from ..types.track import Track
from ..updater import Updater
from ..hypothesiser.gaussianmixture import GaussianMixtureHypothesiser
from ..mixturereducer.gaussianmixture import GaussianMixtureReducer


class PointProcessMultiTargetTracker(Tracker):
class PointProcessMultiTargetTracker(_TrackerMixInNext, Tracker):
"""
Base class for Gaussian Mixture (GM) style implementations of
point process derived filters
Expand Down Expand Up @@ -40,16 +43,12 @@ def __init__(self, *args, **kwargs):
self.gaussian_mixture = GaussianMixture()

@property
def tracks(self):
def tracks(self) -> Set[Track]:
tracks = set()
for track in self.target_tracks.values():
tracks.add(track)
return tracks

def __iter__(self):
self.detector_iter = iter(self.detector)
return super().__iter__()

def update_tracks(self):
"""
Updates the tracks (:class:`Track`) associated with the filter.
Expand Down Expand Up @@ -77,7 +76,7 @@ def update_tracks(self):
self.extraction_threshold:
self.target_tracks[tag] = Track([component], id=tag)

def __next__(self):
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
time, detections = next(self.detector_iter)
# Add birth component
self.birth_component.timestamp = time
Expand Down
52 changes: 20 additions & 32 deletions stonesoup/tracker/simple.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import datetime
from typing import Set, Tuple

import numpy as np

from .base import Tracker
from .base import Tracker, _TrackerMixInNext
from ..base import Property
from ..dataassociator import DataAssociator
from ..deleter import Deleter
from ..reader import DetectionReader
from ..functions import gm_reduce_single
from ..initiator import Initiator
from ..updater import Updater
from ..reader import DetectionReader
from ..types.array import StateVectors
from ..types.prediction import GaussianStatePrediction
from ..types.track import Track
from ..types.update import GaussianStateUpdate
from ..functions import gm_reduce_single
from ..updater import Updater


class SingleTargetTracker(Tracker):
class SingleTargetTracker(_TrackerMixInNext, Tracker):
"""A simple single target tracker.

Track a single object using Stone Soup components. The tracker works by
Expand Down Expand Up @@ -46,14 +50,10 @@ def __init__(self, *args, **kwargs):
self._track = None

@property
def tracks(self):
def tracks(self) -> Set[Track]:
return {self._track} if self._track else set()

def __iter__(self):
self.detector_iter = iter(self.detector)
return super().__iter__()

def __next__(self):
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
time, detections = next(self.detector_iter)
if self._track is not None:
associations = self.data_associator.associate(
Expand All @@ -75,7 +75,7 @@ def __next__(self):
return time, self.tracks


class SingleTargetMixtureTracker(Tracker):
class SingleTargetMixtureTracker(_TrackerMixInNext, Tracker):
""" A simple single target tracking that receives associations from a
(Gaussian) Mixture associator.

Expand Down Expand Up @@ -104,14 +104,10 @@ def __init__(self, *args, **kwargs):
self._track = None

@property
def tracks(self):
def tracks(self) -> Set[Track]:
return {self._track} if self._track else set()

def __iter__(self):
self.detector_iter = iter(self.detector)
return super().__iter__()

def __next__(self):
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
time, detections = next(self.detector_iter)

if self._track is not None:
Expand Down Expand Up @@ -177,7 +173,7 @@ def __next__(self):
return time, self.tracks


class MultiTargetTracker(Tracker):
class MultiTargetTracker(_TrackerMixInNext, Tracker):
"""A simple multi target tracker.

Track multiple objects using Stone Soup components. The tracker works by
Expand All @@ -203,14 +199,10 @@ def __init__(self, *args, **kwargs):
self._tracks = set()

@property
def tracks(self):
def tracks(self) -> Set[Track]:
return self._tracks

def __iter__(self):
self.detector_iter = iter(self.detector)
return super().__iter__()

def __next__(self):
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
time, detections = next(self.detector_iter)

associations = self.data_associator.associate(
Expand All @@ -231,7 +223,7 @@ def __next__(self):
return time, self.tracks


class MultiTargetMixtureTracker(Tracker):
class MultiTargetMixtureTracker(_TrackerMixInNext, Tracker):
"""A simple multi target tracker that receives associations from a
(Gaussian) Mixture associator.

Expand Down Expand Up @@ -259,14 +251,10 @@ def __init__(self, *args, **kwargs):
self._tracks = set()

@property
def tracks(self):
def tracks(self) -> Set[Track]:
return self._tracks

def __iter__(self):
self.detector_iter = iter(self.detector)
return super().__iter__()

def __next__(self):
def __next__(self) -> Tuple[datetime.datetime, Set[Track]]:
time, detections = next(self.detector_iter)

associations = self.data_associator.associate(
Expand Down
Loading