mirror of
https://github.com/macaodha/batdetect2.git
synced 2025-06-30 15:12:06 +02:00
Compare commits
5 Commits
881d6c437b
...
4153030b3f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
4153030b3f | ||
![]() |
f62bc99ab2 | ||
![]() |
47dbdc79c2 | ||
![]() |
e10e270de4 | ||
![]() |
6af7fef316 |
@ -123,6 +123,8 @@ from batdetect2.utils.detector_utils import list_audio_files, load_model
|
||||
import audioread
|
||||
import os
|
||||
import soundfile as sf
|
||||
import requests
|
||||
import io
|
||||
|
||||
# Remove warnings from torch
|
||||
warnings.filterwarnings("ignore", category=UserWarning, module="torch")
|
||||
@ -279,6 +281,49 @@ def process_file(
|
||||
file_id
|
||||
)
|
||||
|
||||
def process_url(
|
||||
url: str,
|
||||
model: DetectionModel = MODEL,
|
||||
config: Optional[ProcessingConfiguration] = None,
|
||||
device: torch.device = DEVICE,
|
||||
file_id: str | None = None
|
||||
) -> du.RunResults:
|
||||
"""Process audio file with model.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
url : str
|
||||
HTTP URL to load the audio data from
|
||||
model : DetectionModel, optional
|
||||
Detection model. Uses default model if not specified.
|
||||
config : Optional[ProcessingConfiguration], optional
|
||||
Processing configuration, by default None (uses default parameters).
|
||||
device : torch.device, optional
|
||||
Device to use, by default tries to use GPU if available.
|
||||
file_id: Optional[str],
|
||||
Give the data an id. Defaults to the URL
|
||||
"""
|
||||
if config is None:
|
||||
config = CONFIG
|
||||
|
||||
if file_id is None:
|
||||
file_id = url
|
||||
|
||||
response = requests.get(url)
|
||||
|
||||
# Raise exception on HTTP error
|
||||
response.raise_for_status()
|
||||
|
||||
# Retrieve body as raw bytes
|
||||
raw_audio_data = response.content
|
||||
|
||||
return du.process_file(
|
||||
io.BytesIO(raw_audio_data),
|
||||
model,
|
||||
config,
|
||||
device,
|
||||
file_id
|
||||
)
|
||||
|
||||
def process_spectrogram(
|
||||
spec: torch.Tensor,
|
||||
|
@ -9,6 +9,7 @@ import torch
|
||||
import audioread
|
||||
import os
|
||||
import soundfile as sf
|
||||
import io
|
||||
|
||||
from batdetect2.detector import parameters
|
||||
|
||||
@ -148,6 +149,9 @@ def get_samplerate(
|
||||
path: Union[
|
||||
str, int, os.PathLike[Any], sf.SoundFile, audioread.AudioFile, BinaryIO
|
||||
]):
|
||||
if isinstance(path, (BinaryIO, io.BytesIO)):
|
||||
path.seek(0)
|
||||
|
||||
with sf.SoundFile(path) as f:
|
||||
return f.samplerate
|
||||
|
||||
|
@ -33,8 +33,10 @@ from batdetect2.types import (
|
||||
|
||||
import audioread
|
||||
import os
|
||||
import io
|
||||
import soundfile as sf
|
||||
|
||||
import hashlib
|
||||
import uuid
|
||||
|
||||
__all__ = [
|
||||
"load_model",
|
||||
@ -832,7 +834,7 @@ def process_file(
|
||||
|
||||
_file_id = file_id
|
||||
if _file_id is None:
|
||||
_file_id = os.path.basename(path) if isinstance(path, str) else "unknown"
|
||||
_file_id = _generate_id(path)
|
||||
|
||||
# convert results to a dictionary in the right format
|
||||
results = convert_results(
|
||||
@ -856,6 +858,24 @@ def process_file(
|
||||
|
||||
return results
|
||||
|
||||
def _generate_id(path: Union[
|
||||
str, int, os.PathLike[Any], sf.SoundFile, audioread.AudioFile, BinaryIO
|
||||
]) -> str:
|
||||
""" Generate an id based on the path.
|
||||
|
||||
If the path is a str or PathLike it will parsed as the basename.
|
||||
This should ensure backwards compatibility with previous versions.
|
||||
"""
|
||||
if isinstance(path, str) or isinstance(path, os.PathLike):
|
||||
return os.path.basename(path)
|
||||
elif isinstance(path, (BinaryIO, io.BytesIO)):
|
||||
path.seek(0)
|
||||
md5 = hashlib.md5(path.read()).hexdigest()
|
||||
path.seek(0)
|
||||
return md5
|
||||
else:
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def summarize_results(results, predictions, config):
|
||||
"""Print summary of results."""
|
||||
|
@ -10,11 +10,13 @@ import torch
|
||||
from torch import nn
|
||||
|
||||
from batdetect2 import api
|
||||
import io
|
||||
|
||||
PKG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
TEST_DATA_DIR = os.path.join(PKG_DIR, "example_data", "audio")
|
||||
TEST_DATA = glob(os.path.join(TEST_DATA_DIR, "*.wav"))
|
||||
|
||||
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
|
||||
|
||||
def test_load_model_with_default_params():
|
||||
"""Test loading model with default parameters."""
|
||||
@ -280,3 +282,28 @@ def test_process_file_with_empty_predictions_does_not_fail(
|
||||
|
||||
assert results is not None
|
||||
assert len(results["pred_dict"]["annotation"]) == 0
|
||||
|
||||
def test_process_file_file_id_defaults_to_basename():
|
||||
"""Test that no detections are made above the nyquist frequency."""
|
||||
# Recording donated by @@kdarras
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
output = api.process_file(path)
|
||||
predictions = output["pred_dict"]
|
||||
id = predictions["id"]
|
||||
assert id == basename
|
||||
|
||||
def test_bytesio_file_id_defaults_to_md5():
|
||||
"""Test that no detections are made above the nyquist frequency."""
|
||||
# Recording donated by @@kdarras
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = io.BytesIO(f.read())
|
||||
|
||||
output = api.process_file(data)
|
||||
predictions = output["pred_dict"]
|
||||
id = predictions["id"]
|
||||
assert id == "7ade9ebf1a9fe5477ff3a2dc57001929"
|
||||
|
@ -7,7 +7,9 @@ from hypothesis import strategies as st
|
||||
from batdetect2.detector import parameters
|
||||
from batdetect2.utils import audio_utils, detector_utils
|
||||
import io
|
||||
import requests
|
||||
import os
|
||||
|
||||
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
|
||||
|
||||
@given(duration=st.floats(min_value=0.1, max_value=2))
|
||||
def test_can_compute_correct_spectrogram_width(duration: float):
|
||||
@ -144,3 +146,48 @@ def test_get_samplerate_using_bytesio():
|
||||
|
||||
expected_sample_rate = 500000
|
||||
assert expected_sample_rate == sample_rate
|
||||
|
||||
|
||||
|
||||
def test_load_audio_using_bytes():
|
||||
filename = "example_data/audio/20170701_213954-MYOMYS-LR_0_0.5.wav"
|
||||
|
||||
with open(filename, "rb") as f:
|
||||
audio_bytes = io.BytesIO(f.read())
|
||||
|
||||
sample_rate, audio_data = audio_utils.load_audio(audio_bytes, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
|
||||
|
||||
expected_sample_rate, expected_audio_data = audio_utils.load_audio(filename, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
|
||||
|
||||
assert expected_sample_rate == sample_rate
|
||||
|
||||
assert np.array_equal(audio_data, expected_audio_data)
|
||||
|
||||
|
||||
|
||||
def test_get_samplerate_using_bytesio_2():
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
audio_bytes = io.BytesIO(f.read())
|
||||
|
||||
sample_rate = audio_utils.get_samplerate(audio_bytes)
|
||||
|
||||
expected_sample_rate = 192_000
|
||||
assert expected_sample_rate == sample_rate
|
||||
|
||||
def test_load_audio_using_bytes_2():
|
||||
basename = "20230322_172000_selec2.wav"
|
||||
path = os.path.join(DATA_DIR, basename)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = io.BytesIO(f.read())
|
||||
|
||||
sample_rate, audio_data = audio_utils.load_audio(data, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
|
||||
|
||||
expected_sample_rate, expected_audio_data = audio_utils.load_audio(path, time_exp_fact=1, target_samp_rate=parameters.TARGET_SAMPLERATE_HZ)
|
||||
|
||||
assert expected_sample_rate == sample_rate
|
||||
|
||||
assert np.array_equal(audio_data, expected_audio_data)
|
Loading…
Reference in New Issue
Block a user