Compare commits


4 Commits

Author       SHA1        Message                                     Date
mbsantiago   2cc0bd59d4  Add plot limits                             2025-08-11 01:37:22 +01:00
mbsantiago   2308ea83a3  Add example plotting notebook               2025-08-11 01:35:31 +01:00
mbsantiago   374c62d7ab  Add dataset summary and split functions     2025-08-11 01:35:09 +01:00
mbsantiago   ef279bee5d  Update to soundevent 2.7                    2025-08-11 01:34:54 +01:00
8 changed files with 620 additions and 158 deletions

View File

@@ -12,8 +12,20 @@ def _():
@app.cell
def _():
from batdetect2.data import load_dataset_config, load_dataset
return load_dataset, load_dataset_config
from batdetect2.data import (
load_dataset_config,
load_dataset,
extract_recordings_df,
extract_sound_events_df,
compute_class_summary,
)
return (
compute_class_summary,
extract_recordings_df,
extract_sound_events_df,
load_dataset,
load_dataset_config,
)
@app.cell
@@ -72,183 +84,50 @@ def _(build_targets, targets_config):
def _():
import pandas as pd
from soundevent.geometry import compute_bounds
return compute_bounds, pd
return
@app.cell
def _(dataset, pd):
def get_recording_df(dataset):
recordings = []
for clip_annotation in dataset:
recordings.append(
{
"recording_id": clip_annotation.clip.recording.uuid,
"duration": clip_annotation.clip.duration,
"clip_annotation_id": clip_annotation.uuid,
"samplerate": clip_annotation.clip.recording.samplerate,
"path": clip_annotation.clip.recording.path.name,
}
)
return pd.DataFrame(recordings)
recordings = get_recording_df(dataset)
def _(dataset, extract_recordings_df):
recordings = extract_recordings_df(dataset)
recordings
return (recordings,)
return
@app.cell
def _(compute_bounds, dataset, pd, targets):
def get_sound_event_df(dataset):
sound_events = []
for clip_annotation in dataset:
for sound_event in clip_annotation.sound_events:
if not targets.filter(sound_event):
continue
if sound_event.sound_event.geometry is None:
continue
class_name = targets.encode_class(sound_event)
if class_name is None:
continue
start_time, low_freq, end_time, high_freq = compute_bounds(
sound_event.sound_event.geometry
)
sound_events.append(
{
"clip_annotation_id": clip_annotation.uuid,
"sound_event_id": sound_event.uuid,
"class_name": class_name,
"start_time": start_time,
"end_time": end_time,
"low_freq": low_freq,
"high_freq": high_freq,
}
)
return pd.DataFrame(sound_events)
sound_events = get_sound_event_df(dataset)
def _(dataset, extract_sound_events_df, targets):
sound_events = extract_sound_events_df(dataset, targets)
sound_events
return get_sound_event_df, sound_events
return
@app.cell
def _(recordings, sound_events):
def produce_summary(sound_events):
num_calls = (
sound_events.groupby("class_name")
.size()
.sort_values(ascending=False)
.rename("num calls")
)
num_recs = (
sound_events.groupby("class_name")["clip_annotation_id"]
.nunique()
.sort_values(ascending=False)
.rename("num recordings")
)
durations = (
sound_events.groupby("class_name")
.apply(
lambda group: recordings[
recordings["clip_annotation_id"].isin(
group["clip_annotation_id"]
)
]["duration"].sum(),
include_groups=False,
)
.sort_values(ascending=False)
.rename("duration")
)
return (
num_calls.to_frame()
.join(num_recs)
.join(durations)
.sort_values("num calls", ascending=False)
.assign(call_rate=lambda df: df["num calls"] / df["duration"])
)
produce_summary(sound_events)
return (produce_summary,)
@app.cell
def _(sound_events):
majority_class = (
sound_events.groupby("clip_annotation_id")
.apply(
lambda group: group["class_name"]
.value_counts()
.sort_values(ascending=False)
.index[0],
include_groups=False,
)
.rename("class_name")
.to_frame()
.reset_index()
)
return (majority_class,)
@app.cell
def _(majority_class):
majority_class
def _(compute_class_summary, dataset, targets):
compute_class_summary(dataset, targets)
return
@app.cell
def _():
from sklearn.model_selection import train_test_split
return (train_test_split,)
from batdetect2.data.split import split_dataset_by_recordings
return (split_dataset_by_recordings,)
@app.cell
def _(majority_class, train_test_split):
train, val = train_test_split(
majority_class["clip_annotation_id"],
stratify=majority_class["class_name"],
)
return train, val
@app.cell
def _(dataset, train, val):
train_dataset = [
clip_annotation
for clip_annotation in dataset
if clip_annotation.uuid in set(train.values)
]
val_dataset = [
clip_annotation
for clip_annotation in dataset
if clip_annotation.uuid in set(val.values)
]
def _(dataset, split_dataset_by_recordings, targets):
train_dataset, val_dataset = split_dataset_by_recordings(dataset, targets, random_state=42)
return train_dataset, val_dataset
@app.cell
def _(get_sound_event_df, produce_summary, train_dataset):
train_sound_events = get_sound_event_df(train_dataset)
train_summary = produce_summary(train_sound_events)
train_summary
def _(compute_class_summary, targets, train_dataset):
compute_class_summary(train_dataset, targets)
return
@app.cell
def _(get_sound_event_df, produce_summary, val_dataset):
val_sound_events = get_sound_event_df(val_dataset)
val_summary = produce_summary(val_sound_events)
val_summary
def _(compute_class_summary, targets, val_dataset):
compute_class_summary(val_dataset, targets)
return
@@ -291,6 +170,18 @@ def _(Path, data, io, val_dataset):
def _(load_dataset, load_dataset_config):
config = load_dataset_config("../paper/conf/datasets/train/uk_tune.yaml")
rec = load_dataset(config, base_dir="../paper/")
return (rec,)
@app.cell
def _(rec):
dict(rec[0].sound_events[0].tags[0].term)
return
@app.cell
def _(compute_class_summary, rec, targets):
compute_class_summary(rec, targets)
return
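
Taken together, the notebook edits above swap the ad-hoc pandas helpers (get_recording_df, get_sound_event_df, produce_summary, and the manual train_test_split) for the new library functions added in this PR. A rough standalone sketch of the same workflow follows; the config path is borrowed from the plotting notebook below, and the targets object is assumed to be built as in the notebook's earlier cells rather than constructed here:

from batdetect2.data import (
    load_dataset,
    load_dataset_config,
    extract_recordings_df,
    compute_class_summary,
)
from batdetect2.data.split import split_dataset_by_recordings

# Placeholder path (taken from the plotting notebook below); adjust as needed.
config = load_dataset_config(path="example_data/config.yaml", field="datasets.train")
dataset = load_dataset(config)

# Recording-level metadata needs no targets object.
recordings = extract_recordings_df(dataset)
print(recordings[["recording_id", "duration", "samplerate"]].head())

# The class summary and the split both need a TargetProtocol instance,
# built elsewhere (e.g. via build_targets(targets_config) as in this notebook).
# summary = compute_class_summary(dataset, targets)
# train_dataset, val_dataset = split_dataset_by_recordings(dataset, targets, random_state=42)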

notebooks/plotting.py (new file, 273 lines added)
View File

@@ -0,0 +1,273 @@
import marimo
__generated_with = "0.14.16"
app = marimo.App(width="medium")
@app.cell
def _():
import marimo as mo
return
@app.cell
def _():
from batdetect2.data import load_dataset_config, load_dataset
from batdetect2.preprocess import load_preprocessing_config, build_preprocessor
from batdetect2 import api
from soundevent import data
from batdetect2.evaluate.types import MatchEvaluation
from batdetect2.types import Annotation
from batdetect2.compat import annotation_to_sound_event_prediction
from batdetect2.plotting import (
plot_clip,
plot_clip_annotation,
plot_clip_prediction,
plot_matches,
plot_false_positive_match,
plot_false_negative_match,
plot_true_positive_match,
plot_cross_trigger_match,
)
return (
MatchEvaluation,
annotation_to_sound_event_prediction,
api,
build_preprocessor,
data,
load_dataset,
load_dataset_config,
load_preprocessing_config,
plot_clip_annotation,
plot_clip_prediction,
plot_cross_trigger_match,
plot_false_negative_match,
plot_false_positive_match,
plot_matches,
plot_true_positive_match,
)
@app.cell
def _(build_preprocessor, load_dataset_config, load_preprocessing_config):
dataset_config = load_dataset_config(
path="example_data/config.yaml", field="datasets.train"
)
preprocessor_config = load_preprocessing_config(
path="example_data/config.yaml", field="preprocess"
)
preprocessor = build_preprocessor(preprocessor_config)
return dataset_config, preprocessor
@app.cell
def _(dataset_config, load_dataset):
dataset = load_dataset(dataset_config)
return (dataset,)
@app.cell
def _(dataset):
clip_annotation = dataset[1]
return (clip_annotation,)
@app.cell
def _(clip_annotation, plot_clip_annotation, preprocessor):
plot_clip_annotation(
clip_annotation, preprocessor=preprocessor, figsize=(15, 5)
)
return
@app.cell
def _(annotation_to_sound_event_prediction, api, clip_annotation, data):
audio = api.load_audio(clip_annotation.clip.recording.path)
detections, features, spec = api.process_audio(audio)
clip_prediction = data.ClipPrediction(
clip=clip_annotation.clip,
sound_events=[
annotation_to_sound_event_prediction(
prediction, clip_annotation.clip.recording
)
for prediction in detections
],
)
return (clip_prediction,)
@app.cell
def _(clip_prediction, plot_clip_prediction):
plot_clip_prediction(clip_prediction, figsize=(15, 5))
return
@app.cell
def _():
from batdetect2.evaluate import match_predictions_and_annotations
import random
return match_predictions_and_annotations, random
@app.cell
def _(data, random):
def add_noise(clip_annotation, time_buffer=0.003, freq_buffer=1000):
def _add_bbox_noise(bbox):
start_time, low_freq, end_time, high_freq = bbox.coordinates
return data.BoundingBox(
coordinates=[
start_time + random.uniform(-time_buffer, time_buffer),
low_freq + random.uniform(-freq_buffer, freq_buffer),
end_time + random.uniform(-time_buffer, time_buffer),
high_freq + random.uniform(-freq_buffer, freq_buffer),
]
)
def _add_noise(se):
return se.model_copy(
update=dict(
sound_event=se.sound_event.model_copy(
update=dict(
geometry=_add_bbox_noise(se.sound_event.geometry)
)
)
)
)
return clip_annotation.model_copy(
update=dict(
sound_events=[
_add_noise(se) for se in clip_annotation.sound_events
]
)
)
def drop_random(obj, p=0.5):
return obj.model_copy(
update=dict(
sound_events=[se for se in obj.sound_events if random.random() > p]
)
)
return add_noise, drop_random
@app.cell
def _(
add_noise,
clip_annotation,
clip_prediction,
drop_random,
match_predictions_and_annotations,
):
matches = match_predictions_and_annotations(
drop_random(add_noise(clip_annotation), p=0.2),
drop_random(clip_prediction),
)
return (matches,)
@app.cell
def _(clip_annotation, matches, plot_matches):
plot_matches(matches, clip_annotation.clip, figsize=(15, 5))
return
@app.cell
def _(matches):
true_positives = []
false_positives = []
false_negatives = []
for match in matches:
if match.source is None and match.target is not None:
false_negatives.append(match)
elif match.target is None and match.source is not None:
false_positives.append(match)
elif match.target is not None and match.source is not None:
true_positives.append(match)
else:
continue
return false_negatives, false_positives, true_positives
@app.cell
def _(MatchEvaluation, false_positives, plot_false_positive_match):
false_positive = false_positives[0]
false_positive_eval = MatchEvaluation(
match=false_positive,
gt_det=False,
gt_class=None,
pred_score=false_positive.source.score,
pred_class_scores={
"myomyo": 0.2
}
)
plot_false_positive_match(false_positive_eval)
return
@app.cell
def _(MatchEvaluation, false_negatives, plot_false_negative_match):
false_negative = false_negatives[0]
false_negative_eval = MatchEvaluation(
match=false_negative,
gt_det=True,
gt_class="myomyo",
pred_score=None,
pred_class_scores={}
)
plot_false_negative_match(false_negative_eval)
return
@app.cell
def _(MatchEvaluation, plot_true_positive_match, true_positives):
true_positive = true_positives[0]
true_positive_eval = MatchEvaluation(
match=true_positive,
gt_det=True,
gt_class="myomyo",
pred_score=0.87,
pred_class_scores={
"pyomyo": 0.84,
"pippip": 0.84,
}
)
plot_true_positive_match(true_positive_eval)
return (true_positive,)
@app.cell
def _(MatchEvaluation, plot_cross_trigger_match, true_positive):
cross_trigger_eval = MatchEvaluation(
match=true_positive,
gt_det=True,
gt_class="myomyo",
pred_score=0.87,
pred_class_scores={
"pippip": 0.84,
"myomyo": 0.84,
}
)
plot_cross_trigger_match(cross_trigger_eval)
return
@app.cell
def _():
return
if __name__ == "__main__":
app.run()

View File

@@ -17,7 +17,7 @@ dependencies = [
"torch>=1.13.1,<2.5.0",
"torchaudio>=1.13.1,<2.5.0",
"torchvision>=0.14.0",
"soundevent[audio,geometry,plot]>=2.6.5",
"soundevent[audio,geometry,plot]>=2.7.0",
"click>=8.1.7",
"netcdf4>=1.6.5",
"tqdm>=4.66.2",
@@ -84,6 +84,7 @@ dev = [
"pytest-cov>=6.1.1",
"ty>=0.0.1a12",
"rust-just>=1.40.0",
"pandas-stubs>=2.2.2.240807",
]
dvclive = ["dvclive>=3.48.2"]
mlflow = ["mlflow>=3.1.1"]

View File

@@ -11,6 +11,11 @@ from batdetect2.data.datasets import (
load_dataset_config,
load_dataset_from_config,
)
from batdetect2.data.summary import (
compute_class_summary,
extract_recordings_df,
extract_sound_events_df,
)
__all__ = [
"AOEFAnnotations",
@@ -18,6 +23,9 @@ __all__ = [
"BatDetect2FilesAnnotations",
"BatDetect2MergedAnnotations",
"DatasetConfig",
"compute_class_summary",
"extract_recordings_df",
"extract_sound_events_df",
"load_annotated_dataset",
"load_dataset",
"load_dataset_config",

View File

@@ -0,0 +1,75 @@
from typing import Optional, Tuple
from sklearn.model_selection import train_test_split
from batdetect2.data.datasets import Dataset
from batdetect2.data.summary import (
extract_recordings_df,
extract_sound_events_df,
)
from batdetect2.targets.types import TargetProtocol
def split_dataset_by_recordings(
dataset: Dataset,
targets: TargetProtocol,
train_size: float = 0.75,
random_state: Optional[int] = None,
) -> Tuple[Dataset, Dataset]:
recordings = extract_recordings_df(dataset)
sound_events = extract_sound_events_df(
dataset,
targets,
exclude_non_target=True,
exclude_generic=True,
)
majority_class = (
sound_events.groupby("recording_id")
.apply(
lambda group: group["class_name"] # type: ignore
.value_counts()
.sort_values(ascending=False)
.index[0],
include_groups=False, # type: ignore
)
.rename("class_name")
.to_frame()
.reset_index()
)
train, test = train_test_split(
majority_class["recording_id"],
stratify=majority_class["class_name"],
train_size=train_size,
random_state=random_state,
)
train_ids_set = set(train.values) # type: ignore
test_ids_set = set(test.values) # type: ignore
extra = set(recordings["recording_id"]) - train_ids_set - test_ids_set
if extra:
train_extra, test_extra = train_test_split(
list(extra),
train_size=train_size,
random_state=random_state,
)
train_ids_set.update(train_extra)
test_ids_set.update(test_extra)
train_dataset = [
clip_annotation
for clip_annotation in dataset
if str(clip_annotation.clip.recording.uuid) in train_ids_set
]
test_dataset = [
clip_annotation
for clip_annotation in dataset
if str(clip_annotation.clip.recording.uuid) in test_ids_set
]
return train_dataset, test_dataset
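
A minimal usage sketch for the new split function, assuming `dataset` and `targets` are already built (neither is constructed here); the leakage check at the end is illustrative and not part of this module:

from batdetect2.data.split import split_dataset_by_recordings

# `dataset` is a loaded Dataset (a list of ClipAnnotation) and `targets`
# a TargetProtocol instance; both are assumed to exist already.
train_dataset, val_dataset = split_dataset_by_recordings(
    dataset,
    targets,
    train_size=0.75,
    random_state=42,
)

# The split is stratified on each recording's majority class and keyed on
# recording UUIDs, so no recording should appear in both partitions.
train_recs = {str(c.clip.recording.uuid) for c in train_dataset}
val_recs = {str(c.clip.recording.uuid) for c in val_dataset}
assert not train_recs & val_recs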

View File

@@ -0,0 +1,192 @@
import pandas as pd
from soundevent.geometry import compute_bounds
from batdetect2.data.datasets import Dataset
from batdetect2.targets.types import TargetProtocol
__all__ = [
"extract_recordings_df",
"extract_sound_events_df",
"compute_class_summary",
]
def extract_recordings_df(dataset: Dataset) -> pd.DataFrame:
"""Extract recording metadata into a pandas DataFrame.
Parameters
----------
dataset : List[data.ClipAnnotation]
A list of clip annotations from which to extract recording information.
Returns
-------
pd.DataFrame
A DataFrame where each row corresponds to a recording, containing
metadata such as duration, path, sample rate, and other properties.
"""
recordings = []
for clip_annotation in dataset:
clip = clip_annotation.clip
recording = clip.recording
recordings.append(
{
"clip_annotation_id": str(clip_annotation.uuid),
"recording_id": str(recording.uuid),
"duration": clip.duration,
"filename": recording.path.name,
**recording.model_dump(
mode="json",
include={
"samplerate",
"hash",
"path",
"date",
"time",
"latitude",
"longitude",
},
),
}
)
return pd.DataFrame(recordings)
def extract_sound_events_df(
dataset: Dataset,
targets: TargetProtocol,
exclude_non_target: bool = True,
exclude_generic: bool = True,
) -> pd.DataFrame:
"""Extract sound event data into a pandas DataFrame.
This function iterates through all sound events in the provided dataset,
applies filtering and classification logic based on the `targets`
protocol, and compiles the results into a structured DataFrame.
Parameters
----------
dataset : List[data.ClipAnnotation]
The dataset containing clip annotations with sound events.
targets : TargetProtocol
An object that provides methods to filter (`filter`) and classify
(`encode_class`) sound events.
exclude_non_target : bool, default=True
If True, sound events that do not pass the `targets.filter()` check
are excluded from the output.
exclude_generic : bool, default=True
If True, sound events that are classified with a `None` class name
by `targets.encode_class()` are excluded.
Returns
-------
pd.DataFrame
A DataFrame where each row represents a single sound event, including
its bounding box, class name, and other relevant attributes.
"""
sound_events = []
for clip_annotation in dataset:
for sound_event in clip_annotation.sound_events:
is_target = targets.filter(sound_event)
if not is_target and exclude_non_target:
continue
if sound_event.sound_event.geometry is None:
continue
class_name = targets.encode_class(sound_event)
if class_name is None and exclude_generic:
continue
start_time, low_freq, end_time, high_freq = compute_bounds(
sound_event.sound_event.geometry
)
sound_events.append(
{
"clip_annotation_id": str(clip_annotation.uuid),
"sound_event_id": str(sound_event.uuid),
"recording_id": str(
sound_event.sound_event.recording.uuid
),
"start_time": start_time,
"end_time": end_time,
"low_freq": low_freq,
"high_freq": high_freq,
"is_target": is_target,
"class_name": class_name,
}
)
return pd.DataFrame(sound_events)
def compute_class_summary(
dataset: Dataset,
targets: TargetProtocol,
) -> pd.DataFrame:
"""Compute a summary of sound event statistics grouped by class.
This function generates a high-level summary DataFrame that provides
key metrics for each class identified in the dataset. It calculates
the total number of calls, the number of unique recordings containing
each class, the total duration of those recordings, and the call rate.
Parameters
----------
dataset : List[data.ClipAnnotation]
The dataset to be summarized.
targets : TargetProtocol
An object providing the classification logic for sound events.
Returns
-------
pd.DataFrame
A DataFrame indexed by class name, with columns for 'num calls',
'num recordings', 'duration', and 'call_rate'.
"""
sound_events = extract_sound_events_df(
dataset,
targets,
exclude_generic=True,
exclude_non_target=True,
)
recordings = extract_recordings_df(dataset)
num_calls = (
sound_events.groupby("class_name")
.size()
.sort_values(ascending=False)
.rename("num calls")
)
num_recs = (
sound_events.groupby("class_name")["clip_annotation_id"]
.nunique()
.sort_values(ascending=False)
.rename("num recordings")
)
durations = (
sound_events.groupby("class_name")
.apply(
lambda group: recordings[
recordings["clip_annotation_id"].isin(
group["clip_annotation_id"] # type: ignore
)
]["duration"].sum(),
include_groups=False, # type: ignore
)
.sort_values(ascending=False)
.rename("duration")
)
return (
num_calls.to_frame()
.join(num_recs)
.join(durations)
.sort_values("num calls", ascending=False)
.assign(call_rate=lambda df: df["num calls"] / df["duration"])
)
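
As the docstrings above note, the `targets` argument only has to provide the `filter` and `encode_class` behaviour used in this module. A toy stand-in for experimentation (a sketch, not the project's real TargetProtocol implementation; mapping a tag's value straight to the class name is an assumption made only for illustration):

from soundevent import data


class ToyTargets:
    """Duck-typed stand-in exposing only the two methods this module calls."""

    def filter(self, sound_event: data.SoundEventAnnotation) -> bool:
        # Treat every annotated sound event as a target.
        return True

    def encode_class(self, sound_event: data.SoundEventAnnotation):
        # Use the first tag's value as the class name, if any; real targets
        # map tags to configured class names instead.
        return sound_event.tags[0].value if sound_event.tags else None


# events_df = extract_sound_events_df(dataset, ToyTargets())
# summary_df = compute_class_summary(dataset, ToyTargets())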

View File

@@ -91,7 +91,7 @@ def plot_class_examples(
):
fig = plt.figure(figsize=(20, 20))
for index, match in enumerate(true_positives):
for index, match in enumerate(true_positives[:n_examples]):
ax = plt.subplot(4, n_examples, index + 1)
try:
plotting.plot_true_positive_match(
@@ -103,7 +103,7 @@ def plot_class_examples(
except ValueError:
continue
for index, match in enumerate(false_positives):
for index, match in enumerate(false_positives[:n_examples]):
ax = plt.subplot(4, n_examples, n_examples + index + 1)
try:
plotting.plot_false_positive_match(
@@ -115,7 +115,7 @@ def plot_class_examples(
except ValueError:
continue
for index, match in enumerate(false_negatives):
for index, match in enumerate(false_negatives[:n_examples]):
ax = plt.subplot(4, n_examples, 2 * n_examples + index + 1)
try:
plotting.plot_false_negative_match(
@@ -127,7 +127,7 @@ def plot_class_examples(
except ValueError:
continue
for index, match in enumerate(cross_triggers):
for index, match in enumerate(cross_triggers[:n_examples]):
ax = plt.subplot(4, n_examples, 4 * n_examples + index + 1)
try:
plotting.plot_cross_trigger_match(

View File

@@ -45,7 +45,6 @@ data_source = data.Term(
),
)
call_type = data.Term(
name="soundevent:call_type",
label="Call Type",
@@ -79,6 +78,24 @@ generic_class = data.Term(
)
"""Generic term representing a classification model's output class label."""
terms.register_term_set(
terms.TermSet(
terms=[
generic_class,
individual,
call_type,
data_source,
],
aliases={
"class": generic_class.name,
"individual": individual.name,
"event": call_type.name,
"source": data_source.name,
},
),
override_existing=True,
)
class TermRegistry(Mapping[str, data.Term]):
"""Manages a registry mapping unique keys to Term definitions.
@@ -278,6 +295,11 @@ def get_term_from_key(
KeyError
If the key is not found in the specified registry.
"""
term = terms.get_term(key)
if term:
return term
term_registry = term_registry or default_term_registry
return term_registry.get_term(key)
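
The net effect of the two hunks above is a new lookup order: soundevent's global term registry (populated by the TermSet registration earlier in this file) is consulted first, and the local TermRegistry is only a fallback. A rough illustration; the module path and the behaviour of the registered aliases are assumptions based on this diff, not verified against the package:

# Hypothetical usage of the new lookup order.
from batdetect2.targets.terms import get_term_from_key  # assumed module path

term = get_term_from_key("event")   # alias registered above for the call_type term
print(term.name)                    # expected: "soundevent:call_type"
print(term.label)                   # expected: "Call Type"

# Keys absent from soundevent's registry still resolve through the local
# TermRegistry fallback at the end of get_term_from_key.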