import os
import re
import sys
from typing import BinaryIO, Optional, Tuple, Union

import torch
import torchaudio
from torchaudio.io import StreamWriter

from .backend import Backend
from .common import AudioMetaData

if torchaudio._extension._FFMPEG_EXT is not None:
    StreamReaderFileObj = torchaudio._extension._FFMPEG_EXT.StreamReaderFileObj
else:
    StreamReaderFileObj = object


def info_audio(
    src: str,
    format: Optional[str],
) -> AudioMetaData:
    i = torch.ops.torchaudio.compat_info(src, format)
    return AudioMetaData(i[0], i[1], i[2], i[3], i[4].upper())


def info_audio_fileobj(
    src,
    format: Optional[str],
    buffer_size: int = 4096,
) -> AudioMetaData:
    s = StreamReaderFileObj(src, format, None, buffer_size)
    i = s.find_best_audio_stream()
    sinfo = s.get_src_stream_info(i)
    if sinfo.num_frames == 0:
        waveform = _load_audio_fileobj(s)
        num_frames = waveform.size(1)
    else:
        num_frames = sinfo.num_frames
    return AudioMetaData(
        int(sinfo.sample_rate),
        num_frames,
        sinfo.num_channels,
        sinfo.bits_per_sample,
        sinfo.codec_name.upper(),
    )


def _get_load_filter(
    frame_offset: int = 0,
    num_frames: int = -1,
    convert: bool = True,
) -> Optional[str]:
    if frame_offset < 0:
        raise RuntimeError("Invalid argument: frame_offset must be non-negative. Found: {}".format(frame_offset))
    if num_frames == 0 or num_frames < -1:
        raise RuntimeError("Invalid argument: num_frames must be -1 or greater than 0. Found: {}".format(num_frames))

    # All default values -> no filter
    if frame_offset == 0 and num_frames == -1 and not convert:
        return None
    # Only convert
    aformat = "aformat=sample_fmts=fltp"
    if frame_offset == 0 and num_frames == -1 and convert:
        return aformat
    # At least one of frame_offset or num_frames has non-default value
    if num_frames > 0:
        atrim = "atrim=start_sample={}:end_sample={}".format(frame_offset, frame_offset + num_frames)
    else:
        atrim = "atrim=start_sample={}".format(frame_offset)
    if not convert:
        return atrim
    return "{},{}".format(atrim, aformat)


def _load_audio_fileobj(
    s: StreamReaderFileObj,
    filter: Optional[str] = None,
    channels_first: bool = True,
) -> torch.Tensor:
    i = s.find_best_audio_stream()
    s.add_audio_stream(i, -1, -1, filter, None, None)
    s.process_all_packets()
    chunk = s.pop_chunks()[0]
    if chunk is None:
        raise RuntimeError("Failed to decode audio.")
    waveform = chunk.frames
    return waveform.T if channels_first else waveform


def load_audio(
    src: str,
    frame_offset: int = 0,
    num_frames: int = -1,
    convert: bool = True,
    channels_first: bool = True,
    format: Optional[str] = None,
) -> Tuple[torch.Tensor, int]:
    filter = _get_load_filter(frame_offset, num_frames, convert)
    return torch.ops.torchaudio.compat_load(src, format, filter, channels_first)


def load_audio_fileobj(
    src: BinaryIO,
    frame_offset: int = 0,
    num_frames: int = -1,
    convert: bool = True,
    channels_first: bool = True,
    format: Optional[str] = None,
    buffer_size: int = 4096,
) -> Tuple[torch.Tensor, int]:
    demuxer = "ogg" if format == "vorbis" else format
    s = StreamReaderFileObj(src, demuxer, None, buffer_size)
    sample_rate = int(s.get_src_stream_info(s.find_best_audio_stream()).sample_rate)
    filter = _get_load_filter(frame_offset, num_frames, convert)
    waveform = _load_audio_fileobj(s, filter, channels_first)
    return waveform, sample_rate


def _get_sample_format(dtype: torch.dtype) -> str:
    dtype_to_format = {
        torch.uint8: "u8",
        torch.int16: "s16",
        torch.int32: "s32",
        torch.int64: "s64",
        torch.float32: "flt",
        torch.float64: "dbl",
    }
    format = dtype_to_format.get(dtype)
    if format is None:
        raise ValueError(f"No format found for dtype {dtype}; dtype must be one of {list(dtype_to_format.keys())}.")
    return format


def _native_endianness() -> str:
    if sys.byteorder == "little":
        return "le"
    else:
        return "be"


def _get_encoder_for_wav(encoding: str, bits_per_sample: int) -> str:
    if bits_per_sample not in {None, 8, 16, 24, 32, 64}:
        raise ValueError(f"Invalid bits_per_sample {bits_per_sample} for WAV encoding.")
    endianness = _native_endianness()
    if not encoding:
        if not bits_per_sample:
            # default to PCM S16
            return f"pcm_s16{endianness}"
        if bits_per_sample == 8:
            return "pcm_u8"
        return f"pcm_s{bits_per_sample}{endianness}"
    if encoding == "PCM_S":
        if not bits_per_sample:
            bits_per_sample = 16
        if bits_per_sample == 8:
            raise ValueError("For WAV signed PCM, 8-bit encoding is not supported.")
        return f"pcm_s{bits_per_sample}{endianness}"
    if encoding == "PCM_U":
        if bits_per_sample in (None, 8):
            return "pcm_u8"
        raise ValueError("For WAV unsigned PCM, only 8-bit encoding is supported.")
    if encoding == "PCM_F":
        if not bits_per_sample:
            bits_per_sample = 32
        if bits_per_sample in (32, 64):
            return f"pcm_f{bits_per_sample}{endianness}"
        raise ValueError("For WAV float PCM, only 32- and 64-bit encodings are supported.")
    if encoding == "ULAW":
        if bits_per_sample in (None, 8):
            return "pcm_mulaw"
        raise ValueError("For WAV PCM mu-law, only 8-bit encoding is supported.")
    if encoding == "ALAW":
        if bits_per_sample in (None, 8):
            return "pcm_alaw"
        raise ValueError("For WAV PCM A-law, only 8-bit encoding is supported.")
    raise ValueError(f"WAV encoding {encoding} is not supported.")


def _get_flac_sample_fmt(bps):
    if bps is None or bps == 16:
        return "s16"
    if bps == 24:
        return "s32"
    raise ValueError(f"FLAC only supports bits_per_sample values of 16 and 24 ({bps} specified).")


def _parse_save_args(
    ext: Optional[str],
    format: Optional[str],
    encoding: Optional[str],
    bps: Optional[int],
):
    # torchaudio's save function accepts the followings, which do not 1to1 map
    # to FFmpeg.
    #
    # - format: audio format
    # - bits_per_sample: encoder sample format
    # - encoding: such as PCM_U8.
    #
    # In FFmpeg, format is specified with the following three (and more)
    #
    # - muxer: could be audio format or container format.
    # the one we passed to the constructor of StreamWriter
    # - encoder: the audio encoder used to encode audio
    # - encoder sample format: the format used by encoder to encode audio.
    #
    # If encoder sample format is different from source sample format, StreamWriter
    # will insert a filter automatically.
    #
    def _type(spec):
        # either format is exactly the specified one
        # or extension matches to the spec AND there is no format override.
        return format == spec or (format is None and ext == spec)

    if _type("wav") or _type("amb"):
        # wav is special because it supports different encoding through encoders
        # each encoder only supports one encoder format
        #
        # amb format is a special case originated from libsox.
        # It is basically a WAV format, with slight modification.
        # https://github.com/chirlu/sox/commit/4a4ea33edbca5972a1ed8933cc3512c7302fa67a#diff-39171191a858add9df87f5f210a34a776ac2c026842ae6db6ce97f5e68836795
        # It is a format so that decoders will recognize it as ambisonic.
        # https://www.ambisonia.com/Members/mleese/file-format-for-b-format/
        # FFmpeg does not recognize amb because it is basically a WAV format.
        muxer = "wav"
        encoder = _get_encoder_for_wav(encoding, bps)
        sample_fmt = None
    elif _type("vorbis"):
        # FFpmeg does not recognize vorbis extension, while libsox used to do.
        # For the sake of bakward compatibility, (and the simplicity),
        # we support the case where users want to do save("foo.vorbis")
        muxer = "ogg"
        encoder = "vorbis"
        sample_fmt = None
    else:
        muxer = format
        encoder = None
        sample_fmt = None
        if _type("flac"):
            sample_fmt = _get_flac_sample_fmt(bps)
        if _type("ogg"):
            sample_fmt = _get_flac_sample_fmt(bps)
    return muxer, encoder, sample_fmt


def save_audio(
    uri: Union[BinaryIO, str, os.PathLike],
    src: torch.Tensor,
    sample_rate: int,
    channels_first: bool = True,
    format: Optional[str] = None,
    encoding: Optional[str] = None,
    bits_per_sample: Optional[int] = None,
    buffer_size: int = 4096,
) -> None:
    ext = None
    if hasattr(uri, "write"):
        if format is None:
            raise RuntimeError("'format' is required when saving to file object.")
    else:
        uri = os.path.normpath(uri)
        if tokens := str(uri).split(".")[1:]:
            ext = tokens[-1].lower()

    muxer, encoder, enc_fmt = _parse_save_args(ext, format, encoding, bits_per_sample)

    if channels_first:
        src = src.T

    s = StreamWriter(uri, format=muxer, buffer_size=buffer_size)
    s.add_audio_stream(
        sample_rate,
        num_channels=src.size(-1),
        format=_get_sample_format(src.dtype),
        encoder=encoder,
        encoder_format=enc_fmt,
    )
    with s.open():
        s.write_audio_chunk(0, src)


def _map_encoding(encoding: str) -> str:
    for dst in ["PCM_S", "PCM_U", "PCM_F"]:
        if dst in encoding:
            return dst
    if encoding == "PCM_MULAW":
        return "ULAW"
    elif encoding == "PCM_ALAW":
        return "ALAW"
    return encoding


def _get_bits_per_sample(encoding: str, bits_per_sample: int) -> str:
    if m := re.search(r"PCM_\w(\d+)\w*", encoding):
        return int(m.group(1))
    elif encoding in ["PCM_ALAW", "PCM_MULAW"]:
        return 8
    return bits_per_sample


class FFmpegBackend(Backend):
    @staticmethod
    def info(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str], buffer_size: int = 4096) -> AudioMetaData:
        if hasattr(uri, "read"):
            metadata = info_audio_fileobj(uri, format, buffer_size=buffer_size)
        else:
            metadata = info_audio(os.path.normpath(uri), format)
        metadata.bits_per_sample = _get_bits_per_sample(metadata.encoding, metadata.bits_per_sample)
        metadata.encoding = _map_encoding(metadata.encoding)
        return metadata

    @staticmethod
    def load(
        uri: Union[BinaryIO, str, os.PathLike],
        frame_offset: int = 0,
        num_frames: int = -1,
        normalize: bool = True,
        channels_first: bool = True,
        format: Optional[str] = None,
        buffer_size: int = 4096,
    ) -> Tuple[torch.Tensor, int]:
        if hasattr(uri, "read"):
            return load_audio_fileobj(
                uri,
                frame_offset,
                num_frames,
                normalize,
                channels_first,
                format,
                buffer_size,
            )
        else:
            return load_audio(os.path.normpath(uri), frame_offset, num_frames, normalize, channels_first, format)

    @staticmethod
    def save(
        uri: Union[BinaryIO, str, os.PathLike],
        src: torch.Tensor,
        sample_rate: int,
        channels_first: bool = True,
        format: Optional[str] = None,
        encoding: Optional[str] = None,
        bits_per_sample: Optional[int] = None,
        buffer_size: int = 4096,
    ) -> None:
        save_audio(
            uri,
            src,
            sample_rate,
            channels_first,
            format,
            encoding,
            bits_per_sample,
            buffer_size,
        )

    @staticmethod
    def can_decode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        return True

    @staticmethod
    def can_encode(uri: Union[BinaryIO, str, os.PathLike], format: Optional[str]) -> bool:
        return True
