mirror of
https://github.com/macaodha/batdetect2.git
synced 2026-01-10 17:19:34 +01:00
Compare commits
3 Commits
110432bd40
...
69921f258a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69921f258a | ||
|
|
6039b2c3eb | ||
|
|
efc996a0db |
@ -22,6 +22,11 @@ def data(): ...
|
|||||||
type=str,
|
type=str,
|
||||||
help="If the dataset info is in a nested field please specify here.",
|
help="If the dataset info is in a nested field please specify here.",
|
||||||
)
|
)
|
||||||
|
@click.option(
|
||||||
|
"--targets",
|
||||||
|
"targets_path",
|
||||||
|
type=click.Path(exists=True),
|
||||||
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"--base-dir",
|
"--base-dir",
|
||||||
type=click.Path(exists=True),
|
type=click.Path(exists=True),
|
||||||
@ -30,9 +35,11 @@ def data(): ...
|
|||||||
def summary(
|
def summary(
|
||||||
dataset_config: Path,
|
dataset_config: Path,
|
||||||
field: Optional[str] = None,
|
field: Optional[str] = None,
|
||||||
|
targets_path: Optional[Path] = None,
|
||||||
base_dir: Optional[Path] = None,
|
base_dir: Optional[Path] = None,
|
||||||
):
|
):
|
||||||
from batdetect2.data import load_dataset_from_config
|
from batdetect2.data import compute_class_summary, load_dataset_from_config
|
||||||
|
from batdetect2.targets import load_targets
|
||||||
|
|
||||||
base_dir = base_dir or Path.cwd()
|
base_dir = base_dir or Path.cwd()
|
||||||
|
|
||||||
@ -44,6 +51,15 @@ def summary(
|
|||||||
|
|
||||||
print(f"Number of annotated clips: {len(dataset)}")
|
print(f"Number of annotated clips: {len(dataset)}")
|
||||||
|
|
||||||
|
if targets_path is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
targets = load_targets(targets_path)
|
||||||
|
|
||||||
|
summary = compute_class_summary(dataset, targets)
|
||||||
|
|
||||||
|
print(summary.to_markdown())
|
||||||
|
|
||||||
|
|
||||||
@data.command()
|
@data.command()
|
||||||
@click.argument(
|
@click.argument(
|
||||||
@ -78,15 +94,9 @@ def convert(
|
|||||||
|
|
||||||
base_dir = base_dir or Path.cwd()
|
base_dir = base_dir or Path.cwd()
|
||||||
|
|
||||||
config = load_dataset_config(
|
config = load_dataset_config(dataset_config, field=field)
|
||||||
dataset_config,
|
|
||||||
field=field,
|
|
||||||
)
|
|
||||||
|
|
||||||
dataset = load_dataset(
|
dataset = load_dataset(config, base_dir=base_dir)
|
||||||
config,
|
|
||||||
base_dir=base_dir,
|
|
||||||
)
|
|
||||||
|
|
||||||
annotation_set = data.AnnotationSet(
|
annotation_set = data.AnnotationSet(
|
||||||
clip_annotations=list(dataset),
|
clip_annotations=list(dataset),
|
||||||
|
|||||||
@ -88,14 +88,35 @@ def annotation_to_sound_event(
|
|||||||
return data.SoundEventAnnotation(
|
return data.SoundEventAnnotation(
|
||||||
uuid=uuid.uuid5(NAMESPACE, f"{sound_event.uuid}_annotation"),
|
uuid=uuid.uuid5(NAMESPACE, f"{sound_event.uuid}_annotation"),
|
||||||
sound_event=sound_event,
|
sound_event=sound_event,
|
||||||
tags=[
|
tags=get_sound_event_tags(
|
||||||
data.Tag(key=label_key, value=annotation.label),
|
annotation, label_key, event_key, individual_key
|
||||||
data.Tag(key=event_key, value=annotation.event),
|
),
|
||||||
data.Tag(key=individual_key, value=str(annotation.individual)),
|
|
||||||
],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_sound_event_tags(
    annotation: Annotation,
    label_key: str = "class",
    event_key: str = "event",
    individual_key: str = "individual",
) -> List[data.Tag]:
    """Build the list of tags describing a single sound event annotation.

    A tag is only emitted for a field whose value is truthy, so missing or
    empty fields do not produce tags.

    Parameters
    ----------
    annotation
        Annotation providing the ``label``, ``event`` and ``individual``
        attributes.
    label_key
        Tag key used for ``annotation.label``.
    event_key
        Tag key used for ``annotation.event``.
    individual_key
        Tag key used for ``annotation.individual``.

    Returns
    -------
    List[data.Tag]
        Tags in label, event, individual order, skipping falsy fields.
    """
    # Each entry: (tag key, raw value, whether the value must be stringified).
    # NOTE(review): a falsy ``individual`` (e.g. ``0``) yields no tag; confirm
    # that ``0`` is never a meaningful individual identifier.
    fields = (
        (label_key, annotation.label, False),
        (event_key, annotation.event, False),
        (individual_key, annotation.individual, True),
    )

    return [
        data.Tag(key=key, value=str(value) if stringify else value)
        for key, value, stringify in fields
        if value
    ]
|
||||||
|
|
||||||
|
|
||||||
def file_annotation_to_clip(
|
def file_annotation_to_clip(
|
||||||
file_annotation: FileAnnotation,
|
file_annotation: FileAnnotation,
|
||||||
audio_dir: Optional[PathLike] = None,
|
audio_dir: Optional[PathLike] = None,
|
||||||
@ -109,10 +130,14 @@ def file_annotation_to_clip(
|
|||||||
if not full_path.exists():
|
if not full_path.exists():
|
||||||
raise FileNotFoundError(f"File {full_path} not found.")
|
raise FileNotFoundError(f"File {full_path} not found.")
|
||||||
|
|
||||||
|
tags = []
|
||||||
|
if file_annotation.label:
|
||||||
|
tags.append(data.Tag(key=label_key, value=file_annotation.label))
|
||||||
|
|
||||||
recording = data.Recording.from_file(
|
recording = data.Recording.from_file(
|
||||||
full_path,
|
full_path,
|
||||||
time_expansion=file_annotation.time_exp,
|
time_expansion=file_annotation.time_exp,
|
||||||
tags=[data.Tag(key=label_key, value=file_annotation.label)],
|
tags=tags,
|
||||||
)
|
)
|
||||||
|
|
||||||
return data.Clip(
|
return data.Clip(
|
||||||
@ -135,11 +160,15 @@ def file_annotation_to_clip_annotation(
|
|||||||
if file_annotation.notes:
|
if file_annotation.notes:
|
||||||
notes.append(data.Note(message=file_annotation.notes))
|
notes.append(data.Note(message=file_annotation.notes))
|
||||||
|
|
||||||
|
tags = []
|
||||||
|
if file_annotation.label:
|
||||||
|
tags.append(data.Tag(key=label_key, value=file_annotation.label))
|
||||||
|
|
||||||
return data.ClipAnnotation(
|
return data.ClipAnnotation(
|
||||||
uuid=uuid.uuid5(NAMESPACE, f"{file_annotation.id}_clip_annotation"),
|
uuid=uuid.uuid5(NAMESPACE, f"{file_annotation.id}_clip_annotation"),
|
||||||
clip=clip,
|
clip=clip,
|
||||||
notes=notes,
|
notes=notes,
|
||||||
tags=[data.Tag(key=label_key, value=file_annotation.label)],
|
tags=tags,
|
||||||
sound_events=[
|
sound_events=[
|
||||||
annotation_to_sound_event(
|
annotation_to_sound_event(
|
||||||
annotation,
|
annotation,
|
||||||
|
|||||||
@ -3,14 +3,15 @@ from typing import Annotated, List, Literal, Optional, Sequence, Tuple, Union
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from pydantic import Field
|
from pydantic import Field
|
||||||
|
from scipy.optimize import linear_sum_assignment
|
||||||
from soundevent import data
|
from soundevent import data
|
||||||
from soundevent.evaluation import compute_affinity
|
from soundevent.evaluation import compute_affinity
|
||||||
from soundevent.evaluation import match_geometries as optimal_match
|
from soundevent.geometry import buffer_geometry, compute_bounds, scale_geometry
|
||||||
from soundevent.geometry import compute_bounds
|
|
||||||
|
|
||||||
from batdetect2.core import BaseConfig, Registry
|
from batdetect2.core import BaseConfig, Registry
|
||||||
from batdetect2.evaluate.affinity import (
|
from batdetect2.evaluate.affinity import (
|
||||||
AffinityConfig,
|
AffinityConfig,
|
||||||
|
BBoxIOUConfig,
|
||||||
GeometricIOUConfig,
|
GeometricIOUConfig,
|
||||||
build_affinity_function,
|
build_affinity_function,
|
||||||
)
|
)
|
||||||
@ -357,23 +358,32 @@ def greedy_match(
|
|||||||
yield None, gt_idx, 0
|
yield None, gt_idx, 0
|
||||||
|
|
||||||
|
|
||||||
class OptimalMatchConfig(BaseConfig):
|
class GreedyAffinityMatchConfig(BaseConfig):
|
||||||
name: Literal["optimal_match"] = "optimal_match"
|
name: Literal["greedy_affinity_match"] = "greedy_affinity_match"
|
||||||
|
affinity_function: AffinityConfig = Field(default_factory=BBoxIOUConfig)
|
||||||
affinity_threshold: float = 0.5
|
affinity_threshold: float = 0.5
|
||||||
time_buffer: float = 0.005
|
time_buffer: float = 0
|
||||||
frequency_buffer: float = 1_000
|
frequency_buffer: float = 0
|
||||||
|
time_scale: float = 1.0
|
||||||
|
frequency_scale: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
class OptimalMatcher(MatcherProtocol):
|
class GreedyAffinityMatcher(MatcherProtocol):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
affinity_threshold: float,
|
affinity_threshold: float,
|
||||||
time_buffer: float,
|
affinity_function: AffinityFunction,
|
||||||
frequency_buffer: float,
|
time_buffer: float = 0,
|
||||||
|
frequency_buffer: float = 0,
|
||||||
|
time_scale: float = 1.0,
|
||||||
|
frequency_scale: float = 1.0,
|
||||||
):
|
):
|
||||||
self.affinity_threshold = affinity_threshold
|
self.affinity_threshold = affinity_threshold
|
||||||
|
self.affinity_function = affinity_function
|
||||||
self.time_buffer = time_buffer
|
self.time_buffer = time_buffer
|
||||||
self.frequency_buffer = frequency_buffer
|
self.frequency_buffer = frequency_buffer
|
||||||
|
self.time_scale = time_scale
|
||||||
|
self.frequency_scale = frequency_scale
|
||||||
|
|
||||||
def __call__(
|
def __call__(
|
||||||
self,
|
self,
|
||||||
@ -381,21 +391,125 @@ class OptimalMatcher(MatcherProtocol):
|
|||||||
predictions: Sequence[data.Geometry],
|
predictions: Sequence[data.Geometry],
|
||||||
scores: Sequence[float],
|
scores: Sequence[float],
|
||||||
):
|
):
|
||||||
return optimal_match(
|
if self.time_buffer != 0 or self.frequency_buffer != 0:
|
||||||
source=predictions,
|
ground_truth = [
|
||||||
target=ground_truth,
|
buffer_geometry(
|
||||||
|
geometry,
|
||||||
time_buffer=self.time_buffer,
|
time_buffer=self.time_buffer,
|
||||||
freq_buffer=self.frequency_buffer,
|
freq_buffer=self.frequency_buffer,
|
||||||
|
)
|
||||||
|
for geometry in ground_truth
|
||||||
|
]
|
||||||
|
|
||||||
|
predictions = [
|
||||||
|
buffer_geometry(
|
||||||
|
geometry,
|
||||||
|
time_buffer=self.time_buffer,
|
||||||
|
freq_buffer=self.frequency_buffer,
|
||||||
|
)
|
||||||
|
for geometry in predictions
|
||||||
|
]
|
||||||
|
|
||||||
|
affinity_matrix = compute_affinity_matrix(
|
||||||
|
ground_truth,
|
||||||
|
predictions,
|
||||||
|
self.affinity_function,
|
||||||
|
time_scale=self.time_scale,
|
||||||
|
frequency_scale=self.frequency_scale,
|
||||||
|
)
|
||||||
|
|
||||||
|
return select_greedy_matches(
|
||||||
|
affinity_matrix,
|
||||||
|
affinity_threshold=self.affinity_threshold,
|
||||||
|
)
|
||||||
|
|
||||||
|
@matching_strategies.register(GreedyAffinityMatchConfig)
@staticmethod
def from_config(config: GreedyAffinityMatchConfig):
    """Build a ``GreedyAffinityMatcher`` from its configuration object.

    Every field of the config is forwarded to the matcher constructor,
    including the time and frequency buffers.
    """
    affinity_function = build_affinity_function(config.affinity_function)
    return GreedyAffinityMatcher(
        affinity_threshold=config.affinity_threshold,
        affinity_function=affinity_function,
        # The buffers were previously not forwarded, so configured values
        # were silently replaced by the constructor defaults (0).
        time_buffer=config.time_buffer,
        frequency_buffer=config.frequency_buffer,
        time_scale=config.time_scale,
        frequency_scale=config.frequency_scale,
    )
|
||||||
|
|
||||||
|
|
||||||
|
class OptimalMatchConfig(BaseConfig):
    """Settings for the ``optimal_affinity_match`` matching strategy."""

    # Value of the ``name`` discriminator that selects this strategy.
    name: Literal["optimal_affinity_match"] = "optimal_affinity_match"

    # Configuration of the function used to score geometry pairs.
    affinity_function: AffinityConfig = Field(default_factory=BBoxIOUConfig)

    # Assigned pairs whose affinity is not above this value are rejected.
    affinity_threshold: float = 0.5

    # Buffers applied to both ground truth and predicted geometries before
    # scoring; buffering is skipped when both are 0.
    time_buffer: float = 0
    frequency_buffer: float = 0

    # Scale factors applied to geometries before scoring; scaling is
    # skipped when both are 1.
    time_scale: float = 1.0
    frequency_scale: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
|
class OptimalMatcher(MatcherProtocol):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
affinity_threshold: float,
|
||||||
|
affinity_function: AffinityFunction,
|
||||||
|
time_buffer: float = 0,
|
||||||
|
frequency_buffer: float = 0,
|
||||||
|
time_scale: float = 1.0,
|
||||||
|
frequency_scale: float = 1.0,
|
||||||
|
):
|
||||||
|
self.affinity_threshold = affinity_threshold
|
||||||
|
self.affinity_function = affinity_function
|
||||||
|
self.time_buffer = time_buffer
|
||||||
|
self.frequency_buffer = frequency_buffer
|
||||||
|
self.time_scale = time_scale
|
||||||
|
self.frequency_scale = frequency_scale
|
||||||
|
|
||||||
|
def __call__(
|
||||||
|
self,
|
||||||
|
ground_truth: Sequence[data.Geometry],
|
||||||
|
predictions: Sequence[data.Geometry],
|
||||||
|
scores: Sequence[float],
|
||||||
|
):
|
||||||
|
if self.time_buffer != 0 or self.frequency_buffer != 0:
|
||||||
|
ground_truth = [
|
||||||
|
buffer_geometry(
|
||||||
|
geometry,
|
||||||
|
time_buffer=self.time_buffer,
|
||||||
|
freq_buffer=self.frequency_buffer,
|
||||||
|
)
|
||||||
|
for geometry in ground_truth
|
||||||
|
]
|
||||||
|
|
||||||
|
predictions = [
|
||||||
|
buffer_geometry(
|
||||||
|
geometry,
|
||||||
|
time_buffer=self.time_buffer,
|
||||||
|
freq_buffer=self.frequency_buffer,
|
||||||
|
)
|
||||||
|
for geometry in predictions
|
||||||
|
]
|
||||||
|
|
||||||
|
affinity_matrix = compute_affinity_matrix(
|
||||||
|
ground_truth,
|
||||||
|
predictions,
|
||||||
|
self.affinity_function,
|
||||||
|
time_scale=self.time_scale,
|
||||||
|
frequency_scale=self.frequency_scale,
|
||||||
|
)
|
||||||
|
return select_optimal_matches(
|
||||||
|
affinity_matrix,
|
||||||
affinity_threshold=self.affinity_threshold,
|
affinity_threshold=self.affinity_threshold,
|
||||||
)
|
)
|
||||||
|
|
||||||
@matching_strategies.register(OptimalMatchConfig)
|
@matching_strategies.register(OptimalMatchConfig)
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_config(config: OptimalMatchConfig):
|
def from_config(config: OptimalMatchConfig):
|
||||||
|
affinity_function = build_affinity_function(config.affinity_function)
|
||||||
return OptimalMatcher(
|
return OptimalMatcher(
|
||||||
affinity_threshold=config.affinity_threshold,
|
affinity_threshold=config.affinity_threshold,
|
||||||
|
affinity_function=affinity_function,
|
||||||
time_buffer=config.time_buffer,
|
time_buffer=config.time_buffer,
|
||||||
frequency_buffer=config.frequency_buffer,
|
frequency_buffer=config.frequency_buffer,
|
||||||
|
time_scale=config.time_scale,
|
||||||
|
frequency_scale=config.frequency_scale,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -404,11 +518,100 @@ MatchConfig = Annotated[
|
|||||||
GreedyMatchConfig,
|
GreedyMatchConfig,
|
||||||
StartTimeMatchConfig,
|
StartTimeMatchConfig,
|
||||||
OptimalMatchConfig,
|
OptimalMatchConfig,
|
||||||
|
GreedyAffinityMatchConfig,
|
||||||
],
|
],
|
||||||
Field(discriminator="name"),
|
Field(discriminator="name"),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def compute_affinity_matrix(
    ground_truth: Sequence[data.Geometry],
    predictions: Sequence[data.Geometry],
    affinity_function: AffinityFunction,
    time_scale: float = 1,
    frequency_scale: float = 1,
) -> np.ndarray:
    """Score every ground truth geometry against every predicted geometry.

    Parameters
    ----------
    ground_truth
        Geometries indexing the rows of the result.
    predictions
        Geometries indexing the columns of the result.
    affinity_function
        Callable invoked as ``affinity_function(gt_geometry, pred_geometry)``.
    time_scale, frequency_scale
        Factors passed to ``scale_geometry`` for every geometry. Scaling is
        skipped entirely when both are 1.

    Returns
    -------
    np.ndarray
        Array of shape ``(len(ground_truth), len(predictions))`` where entry
        ``[i, j]`` is the affinity of ground truth ``i`` and prediction ``j``.
    """
    identity_scaling = time_scale == 1 and frequency_scale == 1

    if not identity_scaling:
        ground_truth = [
            scale_geometry(item, time_scale, frequency_scale)
            for item in ground_truth
        ]
        predictions = [
            scale_geometry(item, time_scale, frequency_scale)
            for item in predictions
        ]

    # Allocate up front so empty inputs still give a correctly shaped array.
    scores = np.zeros((len(ground_truth), len(predictions)))

    for row, target in enumerate(ground_truth):
        for column, candidate in enumerate(predictions):
            scores[row, column] = affinity_function(target, candidate)

    return scores
|
||||||
|
|
||||||
|
|
||||||
|
def select_optimal_matches(
    affinity_matrix: np.ndarray,
    affinity_threshold: float = 0.5,
) -> Iterable[Tuple[Optional[int], Optional[int], float]]:
    """Match predictions to ground truths by maximising total affinity.

    The assignment is computed with ``scipy.optimize.linear_sum_assignment``
    (``maximize=True``). Assigned pairs whose affinity is not strictly above
    ``affinity_threshold`` are rejected, and both members are then reported
    as unmatched.

    Parameters
    ----------
    affinity_matrix
        2D array with one row per ground truth and one column per
        prediction.
    affinity_threshold
        Minimum (exclusive) affinity for an assigned pair to be kept.

    Yields
    ------
    Tuple[Optional[int], Optional[int], float]
        ``(pred_idx, gt_idx, affinity)`` triples, the same ordering used by
        ``greedy_match`` and ``select_greedy_matches``. Unmatched ground
        truths are yielded as ``(None, gt_idx, 0)`` and unmatched
        predictions as ``(pred_idx, None, 0)``.
    """
    num_gt, num_pred = affinity_matrix.shape
    unmatched_gt = set(range(num_gt))
    unmatched_pred = set(range(num_pred))

    assigned_rows, assigned_columns = linear_sum_assignment(
        affinity_matrix,
        maximize=True,
    )

    # ``tolist`` converts numpy integers into plain Python ints so callers
    # receive the types promised by the return annotation.
    for gt_idx, pred_idx in zip(
        assigned_rows.tolist(),
        assigned_columns.tolist(),
    ):
        affinity = float(affinity_matrix[gt_idx, pred_idx])

        if affinity <= affinity_threshold:
            continue

        unmatched_gt.discard(gt_idx)
        unmatched_pred.discard(pred_idx)
        yield pred_idx, gt_idx, affinity

    for gt_idx in sorted(unmatched_gt):
        yield None, gt_idx, 0

    for pred_idx in sorted(unmatched_pred):
        yield pred_idx, None, 0
|
||||||
|
|
||||||
|
|
||||||
|
def select_greedy_matches(
    affinity_matrix: np.ndarray,
    affinity_threshold: float = 0.5,
) -> Iterable[Tuple[Optional[int], Optional[int], float]]:
    """Greedily match each ground truth to its highest-affinity prediction.

    Ground truths are visited in row order. Each one is matched to the
    prediction with the largest affinity in its row, provided that affinity
    is strictly above ``affinity_threshold`` and the prediction has not
    already been claimed by an earlier ground truth.

    Parameters
    ----------
    affinity_matrix
        2D array with one row per ground truth and one column per
        prediction.
    affinity_threshold
        Minimum (exclusive) affinity for a pair to be matched.

    Yields
    ------
    Tuple[Optional[int], Optional[int], float]
        ``(pred_idx, gt_idx, affinity)`` triples. Unmatched ground truths
        are yielded as ``(None, gt_idx, 0)`` and unmatched predictions as
        ``(pred_idx, None, 0)``.
    """
    num_gt, num_pred = affinity_matrix.shape

    if num_pred == 0:
        # ``np.argmax`` raises ValueError on an empty row, so without any
        # predictions every ground truth is reported as unmatched.
        for gt_idx in range(num_gt):
            yield None, gt_idx, 0
        return

    unmatched_pred = set(range(num_pred))

    for gt_idx in range(num_gt):
        row = affinity_matrix[gt_idx]

        top_pred = int(np.argmax(row))
        top_affinity = float(row[top_pred])

        # NOTE(review): when the best prediction is already taken the ground
        # truth is left unmatched rather than falling back to the next best
        # free prediction; confirm this is the intended greedy rule.
        if (
            top_affinity <= affinity_threshold
            or top_pred not in unmatched_pred
        ):
            yield None, gt_idx, 0
            continue

        unmatched_pred.remove(top_pred)
        yield top_pred, gt_idx, top_affinity

    for pred_idx in sorted(unmatched_pred):
        yield pred_idx, None, 0
|
||||||
|
|
||||||
|
|
||||||
def build_matcher(config: Optional[MatchConfig] = None) -> MatcherProtocol:
|
def build_matcher(config: Optional[MatchConfig] = None) -> MatcherProtocol:
|
||||||
config = config or StartTimeMatchConfig()
|
config = config or StartTimeMatchConfig()
|
||||||
return matching_strategies.build(config)
|
return matching_strategies.build(config)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user