Compare commits

...

4 Commits

Author SHA1 Message Date
Kavi
f62bc99ab2 Added api method to process a URL 2025-02-26 14:13:21 +01:00
Kavi
47dbdc79c2 Added tests for api and load_audio 2025-02-26 14:12:42 +01:00
Kavi
e10e270de4 Fix error in get_samplerate when reading io.BytesIO. 2025-02-26 14:12:09 +01:00
Kavi
6af7fef316 Fix 'unknown' id by providing a _generate_id() function. 2025-02-26 14:11:11 +01:00
5 changed files with 147 additions and 4 deletions

View File

@ -123,6 +123,8 @@ from batdetect2.utils.detector_utils import list_audio_files, load_model
import audioread import audioread
import os import os
import soundfile as sf import soundfile as sf
import requests
import io
# Remove warnings from torch # Remove warnings from torch
warnings.filterwarnings("ignore", category=UserWarning, module="torch") warnings.filterwarnings("ignore", category=UserWarning, module="torch")
@ -279,6 +281,49 @@ def process_file(
file_id file_id
) )
def process_url(
url: str,
model: DetectionModel = MODEL,
config: Optional[ProcessingConfiguration] = None,
device: torch.device = DEVICE,
file_id: str | None = None
) -> du.RunResults:
"""Process audio file with model.
Parameters
----------
url : str
HTTP URL to load the audio data from
model : DetectionModel, optional
Detection model. Uses default model if not specified.
config : Optional[ProcessingConfiguration], optional
Processing configuration, by default None (uses default parameters).
device : torch.device, optional
Device to use, by default tries to use GPU if available.
file_id: Optional[str],
Give the data an id. Defaults to the URL
"""
if config is None:
config = CONFIG
if file_id is None:
file_id = url
response = requests.get(url)
# Raise exception on HTTP error
response.raise_for_status()
# Retrieve body as raw bytes
raw_audio_data = response.content
return du.process_file(
io.BytesIO(raw_audio_data),
model,
config,
device,
file_id
)
def process_spectrogram( def process_spectrogram(
spec: torch.Tensor, spec: torch.Tensor,

View File

@ -9,6 +9,7 @@ import torch
import audioread import audioread
import os import os
import soundfile as sf import soundfile as sf
import io
from batdetect2.detector import parameters from batdetect2.detector import parameters
@ -148,6 +149,9 @@ def get_samplerate(
path: Union[ path: Union[
str, int, os.PathLike[Any], sf.SoundFile, audioread.AudioFile, BinaryIO str, int, os.PathLike[Any], sf.SoundFile, audioread.AudioFile, BinaryIO
]): ]):
if isinstance(path, (BinaryIO, io.BytesIO)):
path.seek(0)
with sf.SoundFile(path) as f: with sf.SoundFile(path) as f:
return f.samplerate return f.samplerate

View File

@ -33,8 +33,10 @@ from batdetect2.types import (
import audioread import audioread
import os import os
import io
import soundfile as sf import soundfile as sf
import hashlib
import uuid
__all__ = [ __all__ = [
"load_model", "load_model",
@ -832,7 +834,7 @@ def process_file(
_file_id = file_id _file_id = file_id
if _file_id is None: if _file_id is None:
_file_id = os.path.basename(path) if isinstance(path, str) else "unknown" _file_id = _generate_id(path)
# convert results to a dictionary in the right format # convert results to a dictionary in the right format
results = convert_results( results = convert_results(
@ -856,6 +858,24 @@ def process_file(
return results return results
def _generate_id(path: Union[
str, int, os.PathLike[Any], sf.SoundFile, audioread.AudioFile, BinaryIO
]) -> str:
""" Generate an id based on the path.
If the path is a str or PathLike it will parsed as the basename.
This should ensure backwards compatibility with previous versions.
"""
if isinstance(path, str) or isinstance(path, os.PathLike):
return os.path.basename(path)
elif isinstance(path, (BinaryIO, io.BytesIO)):
path.seek(0)
md5 = hashlib.md5(path.read()).hexdigest()
path.seek(0)
return md5
else:
return str(uuid.uuid4())
def summarize_results(results, predictions, config): def summarize_results(results, predictions, config):
"""Print summary of results.""" """Print summary of results."""

View File

@ -10,11 +10,13 @@ import torch
from torch import nn from torch import nn
from batdetect2 import api from batdetect2 import api
import io
PKG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) PKG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEST_DATA_DIR = os.path.join(PKG_DIR, "example_data", "audio") TEST_DATA_DIR = os.path.join(PKG_DIR, "example_data", "audio")
TEST_DATA = glob(os.path.join(TEST_DATA_DIR, "*.wav")) TEST_DATA = glob(os.path.join(TEST_DATA_DIR, "*.wav"))
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
def test_load_model_with_default_params(): def test_load_model_with_default_params():
"""Test loading model with default parameters.""" """Test loading model with default parameters."""
@ -280,3 +282,28 @@ def test_process_file_with_empty_predictions_does_not_fail(
assert results is not None assert results is not None
assert len(results["pred_dict"]["annotation"]) == 0 assert len(results["pred_dict"]["annotation"]) == 0
def test_process_file_file_id_defaults_to_basename():
"""Test that no detections are made above the nyquist frequency."""
# Recording donated by @@kdarras
basename = "20230322_172000_selec2.wav"
path = os.path.join(DATA_DIR, basename)
output = api.process_file(path)
predictions = output["pred_dict"]
id = predictions["id"]
assert id == basename
def test_bytesio_file_id_defaults_to_md5():
"""Test that no detections are made above the nyquist frequency."""
# Recording donated by @@kdarras
basename = "20230322_172000_selec2.wav"
path = os.path.join(DATA_DIR, basename)
with open(path, "rb") as f:
data = io.BytesIO(f.read())
output = api.process_file(data)
predictions = output["pred_dict"]
id = predictions["id"]
assert id == "7ade9ebf1a9fe5477ff3a2dc57001929"

View File

@ -7,7 +7,9 @@ from hypothesis import strategies as st
from batdetect2.detector import parameters from batdetect2.detector import parameters
from batdetect2.utils import audio_utils, detector_utils from batdetect2.utils import audio_utils, detector_utils
import io import io
import requests import os
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
@given(duration=st.floats(min_value=0.1, max_value=2)) @given(duration=st.floats(min_value=0.1, max_value=2))
def test_can_compute_correct_spectrogram_width(duration: float): def test_can_compute_correct_spectrogram_width(duration: float):
@ -144,3 +146,48 @@ def test_get_samplerate_using_bytesio():
expected_sample_rate = 500000 expected_sample_rate = 500000
assert expected_sample_rate == sample_rate assert expected_sample_rate == sample_rate
def test_load_audio_using_bytes():
filename = "example_data/audio/20170701_213954-MYOMYS-LR_0_0.5.wav"
with open(filename, "rb") as f:
audio_bytes = io.BytesIO(f.read())
sample_rate, audio_data = audio_utils.load_audio(audio_bytes, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
expected_sample_rate, expected_audio_data = audio_utils.load_audio(filename, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
assert expected_sample_rate == sample_rate
assert np.array_equal(audio_data, expected_audio_data)
def test_get_samplerate_using_bytesio_2():
basename = "20230322_172000_selec2.wav"
path = os.path.join(DATA_DIR, basename)
with open(path, "rb") as f:
audio_bytes = io.BytesIO(f.read())
sample_rate = audio_utils.get_samplerate(audio_bytes)
expected_sample_rate = 192_000
assert expected_sample_rate == sample_rate
def test_load_audio_using_bytes_2():
basename = "20230322_172000_selec2.wav"
path = os.path.join(DATA_DIR, basename)
with open(path, "rb") as f:
data = io.BytesIO(f.read())
sample_rate, audio_data = audio_utils.load_audio(data, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
expected_sample_rate, expected_audio_data = audio_utils.load_audio(path, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
assert expected_sample_rate == sample_rate
assert np.array_equal(audio_data, expected_audio_data)