mirror of
https://github.com/macaodha/batdetect2.git
synced 2025-06-29 14:41:58 +02:00
commit
c10903a646
@ -97,8 +97,9 @@ consult the API documentation in the code.
|
||||
|
||||
"""
|
||||
import warnings
|
||||
from typing import List, Optional, Tuple
|
||||
from typing import List, Optional, Tuple, BinaryIO, Any, Union
|
||||
|
||||
from .types import AudioPath
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
@ -120,6 +121,12 @@ from batdetect2.types import (
|
||||
)
|
||||
from batdetect2.utils.detector_utils import list_audio_files, load_model
|
||||
|
||||
import audioread
|
||||
import os
|
||||
import soundfile as sf
|
||||
import requests
|
||||
import io
|
||||
|
||||
# Remove warnings from torch
|
||||
warnings.filterwarnings("ignore", category=UserWarning, module="torch")
|
||||
|
||||
@ -238,34 +245,82 @@ def generate_spectrogram(
|
||||
|
||||
|
||||
def process_file(
|
||||
audio_file: str,
|
||||
path: AudioPath,
|
||||
model: DetectionModel = MODEL,
|
||||
config: Optional[ProcessingConfiguration] = None,
|
||||
device: torch.device = DEVICE,
|
||||
file_id: Optional[str] = None
|
||||
) -> du.RunResults:
|
||||
"""Process audio file with model.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
audio_file : str
|
||||
Path to audio file.
|
||||
path : AudioPath
|
||||
Path to audio data.
|
||||
model : DetectionModel, optional
|
||||
Detection model. Uses default model if not specified.
|
||||
config : Optional[ProcessingConfiguration], optional
|
||||
Processing configuration, by default None (uses default parameters).
|
||||
device : torch.device, optional
|
||||
Device to use, by default tries to use GPU if available.
|
||||
file_id: Optional[str],
|
||||
Give the data an id. If path is a string path to a file this can be ignored and
|
||||
the file_id will be the basename of the file.
|
||||
"""
|
||||
if config is None:
|
||||
config = CONFIG
|
||||
|
||||
return du.process_file(
|
||||
audio_file,
|
||||
path,
|
||||
model,
|
||||
config,
|
||||
device,
|
||||
file_id
|
||||
)
|
||||
|
||||
def process_url(
|
||||
url: str,
|
||||
model: DetectionModel = MODEL,
|
||||
config: Optional[ProcessingConfiguration] = None,
|
||||
device: torch.device = DEVICE,
|
||||
file_id: Optional[str] = None
|
||||
) -> du.RunResults:
|
||||
"""Process audio file with model.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
url : str
|
||||
HTTP URL to load the audio data from
|
||||
model : DetectionModel, optional
|
||||
Detection model. Uses default model if not specified.
|
||||
config : Optional[ProcessingConfiguration], optional
|
||||
Processing configuration, by default None (uses default parameters).
|
||||
device : torch.device, optional
|
||||
Device to use, by default tries to use GPU if available.
|
||||
file_id: Optional[str],
|
||||
Give the data an id. Defaults to the URL
|
||||
"""
|
||||
if config is None:
|
||||
config = CONFIG
|
||||
|
||||
if file_id is None:
|
||||
file_id = url
|
||||
|
||||
response = requests.get(url)
|
||||
|
||||
# Raise exception on HTTP error
|
||||
response.raise_for_status()
|
||||
|
||||
# Retrieve body as raw bytes
|
||||
raw_audio_data = response.content
|
||||
|
||||
return du.process_file(
|
||||
io.BytesIO(raw_audio_data),
|
||||
model,
|
||||
config,
|
||||
device,
|
||||
file_id
|
||||
)
|
||||
|
||||
def process_spectrogram(
|
||||
spec: torch.Tensor,
|
||||
|
@ -1,6 +1,10 @@
|
||||
"""Types used in the code base."""
|
||||
|
||||
from typing import List, NamedTuple, Optional, Union
|
||||
from typing import List, NamedTuple, Optional, Union, Any, BinaryIO
|
||||
|
||||
import audioread
|
||||
import os
|
||||
import soundfile as sf
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
@ -40,6 +44,9 @@ __all__ = [
|
||||
"SpectrogramParameters",
|
||||
]
|
||||
|
||||
AudioPath = Union[
|
||||
str, int, os.PathLike[Any], sf.SoundFile, audioread.AudioFile, BinaryIO
|
||||
]
|
||||
|
||||
class SpectrogramParameters(TypedDict):
|
||||
"""Parameters for generating spectrograms."""
|
||||
|
@ -1,17 +1,24 @@
|
||||
import warnings
|
||||
from typing import Optional, Tuple
|
||||
from typing import Optional, Tuple, Union, Any, BinaryIO
|
||||
|
||||
from ..types import AudioPath
|
||||
|
||||
import librosa
|
||||
import librosa.core.spectrum
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
import audioread
|
||||
import os
|
||||
import soundfile as sf
|
||||
|
||||
from batdetect2.detector import parameters
|
||||
|
||||
from . import wavfile
|
||||
|
||||
__all__ = [
|
||||
"load_audio",
|
||||
"load_audio_and_samplerate",
|
||||
"generate_spectrogram",
|
||||
"pad_audio",
|
||||
]
|
||||
@ -140,9 +147,8 @@ def generate_spectrogram(
|
||||
|
||||
return spec, spec_for_viz
|
||||
|
||||
|
||||
def load_audio(
|
||||
audio_file: str,
|
||||
path: AudioPath,
|
||||
time_exp_fact: float,
|
||||
target_samp_rate: int,
|
||||
scale: bool = False,
|
||||
@ -154,7 +160,7 @@ def load_audio(
|
||||
Only mono files are supported.
|
||||
|
||||
Args:
|
||||
audio_file (str): Path to the audio file.
|
||||
path (string, int, pathlib.Path, soundfile.SoundFile, audioread object, or file-like object): path to the input file.
|
||||
target_samp_rate (int): Target sampling rate.
|
||||
scale (bool): Whether to scale the audio to [-1, 1].
|
||||
max_duration (float): Maximum duration of the audio in seconds.
|
||||
@ -166,12 +172,42 @@ def load_audio(
|
||||
Raises:
|
||||
ValueError: If the audio file is stereo.
|
||||
|
||||
"""
|
||||
sample_rate, audio_data, _ = load_audio_and_samplerate(path, time_exp_fact, target_samp_rate, scale, max_duration)
|
||||
return sample_rate, audio_data
|
||||
|
||||
def load_audio_and_samplerate(
|
||||
path: AudioPath,
|
||||
time_exp_fact: float,
|
||||
target_samp_rate: int,
|
||||
scale: bool = False,
|
||||
max_duration: Optional[float] = None,
|
||||
) -> Tuple[int, np.ndarray, Union[float, int]]:
|
||||
"""Load an audio file and resample it to the target sampling rate.
|
||||
|
||||
The audio is also scaled to [-1, 1] and clipped to the maximum duration.
|
||||
Only mono files are supported.
|
||||
|
||||
Args:
|
||||
path (string, int, pathlib.Path, soundfile.SoundFile, audioread object, or file-like object): path to the input file.
|
||||
target_samp_rate (int): Target sampling rate.
|
||||
scale (bool): Whether to scale the audio to [-1, 1].
|
||||
max_duration (float): Maximum duration of the audio in seconds.
|
||||
|
||||
Returns:
|
||||
sampling_rate: The sampling rate of the audio.
|
||||
audio_raw: The audio signal in a numpy array.
|
||||
file_sampling_rate: The original sampling rate of the audio
|
||||
|
||||
Raises:
|
||||
ValueError: If the audio file is stereo.
|
||||
|
||||
"""
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore", category=wavfile.WavFileWarning)
|
||||
# sampling_rate, audio_raw = wavfile.read(audio_file)
|
||||
audio_raw, sampling_rate = librosa.load(
|
||||
audio_file,
|
||||
audio_raw, file_sampling_rate = librosa.load(
|
||||
path,
|
||||
sr=None,
|
||||
dtype=np.float32,
|
||||
)
|
||||
@ -179,7 +215,7 @@ def load_audio(
|
||||
if len(audio_raw.shape) > 1:
|
||||
raise ValueError("Currently does not handle stereo files")
|
||||
|
||||
sampling_rate = sampling_rate * time_exp_fact
|
||||
sampling_rate = file_sampling_rate * time_exp_fact
|
||||
|
||||
# resample - need to do this after correcting for time expansion
|
||||
sampling_rate_old = sampling_rate
|
||||
@ -207,7 +243,7 @@ def load_audio(
|
||||
audio_raw = audio_raw - audio_raw.mean()
|
||||
audio_raw = audio_raw / (np.abs(audio_raw).max() + 10e-6)
|
||||
|
||||
return sampling_rate, audio_raw
|
||||
return sampling_rate, audio_raw, file_sampling_rate
|
||||
|
||||
|
||||
def compute_spectrogram_width(
|
||||
|
@ -1,8 +1,9 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Iterator, List, Optional, Tuple, Union
|
||||
from typing import Any, Iterator, List, Optional, Tuple, Union, BinaryIO
|
||||
|
||||
from ..types import AudioPath
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import torch
|
||||
@ -31,6 +32,13 @@ from batdetect2.types import (
|
||||
SpectrogramParameters,
|
||||
)
|
||||
|
||||
import audioread
|
||||
import os
|
||||
import io
|
||||
import soundfile as sf
|
||||
import hashlib
|
||||
import uuid
|
||||
|
||||
__all__ = [
|
||||
"load_model",
|
||||
"list_audio_files",
|
||||
@ -729,10 +737,11 @@ def process_audio_array(
|
||||
|
||||
|
||||
def process_file(
|
||||
audio_file: str,
|
||||
path: AudioPath,
|
||||
model: DetectionModel,
|
||||
config: ProcessingConfiguration,
|
||||
device: torch.device,
|
||||
file_id: Optional[str] = None
|
||||
) -> Union[RunResults, Any]:
|
||||
"""Process a single audio file with detection model.
|
||||
|
||||
@ -741,7 +750,7 @@ def process_file(
|
||||
|
||||
Parameters
|
||||
----------
|
||||
audio_file : str
|
||||
path : AudioPath
|
||||
Path to audio file.
|
||||
|
||||
model : torch.nn.Module
|
||||
@ -750,6 +759,9 @@ def process_file(
|
||||
config : ProcessingConfiguration
|
||||
Configuration for processing.
|
||||
|
||||
file_id: Optional[str],
|
||||
Give the data an id. Defaults to the filename if path is a string. Otherwise an md5 will be calculated from the binary data.
|
||||
|
||||
Returns
|
||||
-------
|
||||
results : Results or Any
|
||||
@ -762,19 +774,17 @@ def process_file(
|
||||
cnn_feats = []
|
||||
spec_slices = []
|
||||
|
||||
# Get original sampling rate
|
||||
file_samp_rate = librosa.get_samplerate(audio_file)
|
||||
orig_samp_rate = file_samp_rate * (config.get("time_expansion") or 1)
|
||||
|
||||
# load audio file
|
||||
sampling_rate, audio_full = au.load_audio(
|
||||
audio_file,
|
||||
sampling_rate, audio_full, file_samp_rate = au.load_audio_and_samplerate(
|
||||
path,
|
||||
time_exp_fact=config.get("time_expansion", 1) or 1,
|
||||
target_samp_rate=config["target_samp_rate"],
|
||||
scale=config["scale_raw_audio"],
|
||||
max_duration=config.get("max_duration"),
|
||||
)
|
||||
|
||||
orig_samp_rate = file_samp_rate * (config.get("time_expansion") or 1)
|
||||
|
||||
# loop through larger file and split into chunks
|
||||
# TODO: fix so that it overlaps correctly and takes care of
|
||||
# duplicate detections at borders
|
||||
@ -823,9 +833,13 @@ def process_file(
|
||||
spec_slices,
|
||||
)
|
||||
|
||||
_file_id = file_id
|
||||
if _file_id is None:
|
||||
_file_id = _generate_id(path)
|
||||
|
||||
# convert results to a dictionary in the right format
|
||||
results = convert_results(
|
||||
file_id=os.path.basename(audio_file),
|
||||
file_id=_file_id,
|
||||
time_exp=config.get("time_expansion", 1) or 1,
|
||||
duration=audio_full.shape[0] / float(sampling_rate),
|
||||
params=config,
|
||||
@ -845,6 +859,22 @@ def process_file(
|
||||
|
||||
return results
|
||||
|
||||
def _generate_id(path: AudioPath) -> str:
|
||||
""" Generate an id based on the path.
|
||||
|
||||
If the path is a str or PathLike it will parsed as the basename.
|
||||
This should ensure backwards compatibility with previous versions.
|
||||
"""
|
||||
if isinstance(path, str) or isinstance(path, os.PathLike):
|
||||
return os.path.basename(path)
|
||||
elif isinstance(path, (BinaryIO, io.BytesIO)):
|
||||
path.seek(0)
|
||||
md5 = hashlib.md5(path.read()).hexdigest()
|
||||
path.seek(0)
|
||||
return md5
|
||||
else:
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def summarize_results(results, predictions, config):
|
||||
"""Print summary of results."""
|
||||
|
@ -10,11 +10,13 @@ import torch
|
||||
from torch import nn
|
||||
|
||||
from batdetect2 import api
|
||||
import io
|
||||
|
||||
PKG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
TEST_DATA_DIR = os.path.join(PKG_DIR, "example_data", "audio")
|
||||
TEST_DATA = glob(os.path.join(TEST_DATA_DIR, "*.wav"))
|
||||
|
||||
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
|
||||
|
||||
def test_load_model_with_default_params():
|
||||
"""Test loading model with default parameters."""
|
||||
@ -280,3 +282,28 @@ def test_process_file_with_empty_predictions_does_not_fail(
|
||||
|
||||
assert results is not None
|
||||
assert len(results["pred_dict"]["annotation"]) == 0
|
||||
|
||||
def test_process_file_file_id_defaults_to_basename():
|
||||
"""Test that process_file assigns basename as an id if no file_id is provided."""
|
||||
# Recording donated by @@kdarras
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
output = api.process_file(path)
|
||||
predictions = output["pred_dict"]
|
||||
id = predictions["id"]
|
||||
assert id == basename
|
||||
|
||||
def test_bytesio_file_id_defaults_to_md5():
|
||||
"""Test that process_file assigns an md5 sum as an id if no file_id is provided when using binary data."""
|
||||
# Recording donated by @@kdarras
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = io.BytesIO(f.read())
|
||||
|
||||
output = api.process_file(data)
|
||||
predictions = output["pred_dict"]
|
||||
id = predictions["id"]
|
||||
assert id == "7ade9ebf1a9fe5477ff3a2dc57001929"
|
||||
|
@ -6,7 +6,10 @@ from hypothesis import strategies as st
|
||||
|
||||
from batdetect2.detector import parameters
|
||||
from batdetect2.utils import audio_utils, detector_utils
|
||||
import io
|
||||
import os
|
||||
|
||||
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
|
||||
|
||||
@given(duration=st.floats(min_value=0.1, max_value=2))
|
||||
def test_can_compute_correct_spectrogram_width(duration: float):
|
||||
@ -134,3 +137,20 @@ def test_pad_audio_with_fixed_width(duration: float, width: int):
|
||||
resize_factor=params["resize_factor"],
|
||||
)
|
||||
assert expected_width == width
|
||||
|
||||
|
||||
def test_load_audio_using_bytesio():
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = io.BytesIO(f.read())
|
||||
|
||||
sample_rate, audio_data, file_sample_rate = audio_utils.load_audio_and_samplerate(data, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
|
||||
|
||||
expected_sample_rate, expected_audio_data, exp_file_sample_rate = audio_utils.load_audio_and_samplerate(path, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
|
||||
|
||||
assert expected_sample_rate == sample_rate
|
||||
assert exp_file_sample_rate == file_sample_rate
|
||||
|
||||
assert np.array_equal(audio_data, expected_audio_data)
|
Loading…
Reference in New Issue
Block a user