import re 
import json 
import torch
import random
import librosa
import torchaudio 
import numpy as np
import pandas as pd
from datasets import load_dataset, load_metric 
from IPython.display import display, HTML
from transformers import Trainer, Wav2Vec2CTCTokenizer, Wav2Vec2ForCTC, Wav2Vec2Processor, TrainingArguments, Wav2Vec2FeatureExtractor

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union


@dataclass
class DataCollatorCTCWithPadding:
    """Collate a list of examples into one padded batch for CTC training.

    Audio inputs and label ids are padded in two separate ``processor.pad``
    calls (the label call runs inside ``processor.as_target_processor()``).
    Label positions that are padding are then replaced with ``-100``.
    """

    # Processor whose ``pad`` method is used for both the inputs and the labels.
    processor: Wav2Vec2Processor
    # Forwarded as ``padding`` to both ``pad`` calls.
    padding: Union[bool, str] = True
    # Forwarded as ``max_length`` when padding the audio inputs.
    max_length: Optional[int] = None
    # Forwarded as ``max_length`` when padding the labels.
    max_length_labels: Optional[int] = None
    # Forwarded as ``pad_to_multiple_of`` when padding the audio inputs.
    pad_to_multiple_of: Optional[int] = None
    # Forwarded as ``pad_to_multiple_of`` when padding the labels.
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad ``features`` and return the batch with a ``"labels"`` entry added.

        Each element of ``features`` must provide ``"input_values"`` and
        ``"labels"``.
        """
        # Inputs and labels go through different pad calls, so split them first.
        audio_entries = []
        label_entries = []
        for example in features:
            audio_entries.append({"input_values": example["input_values"]})
            label_entries.append({"input_ids": example["labels"]})

        batch = self.processor.pad(
            audio_entries,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        with self.processor.as_target_processor():
            padded_labels = self.processor.pad(
                label_entries,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # Wherever the attention mask is not 1 the position is padding: mark it -100.
        is_padding = padded_labels.attention_mask.ne(1)
        batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)

        return batch


def show_random_elements(dataset, num_examples=5):
    """Display ``num_examples`` distinct, randomly chosen rows of ``dataset`` as an HTML table.

    Args:
        dataset: Object supporting ``len()`` and indexing with a list of row
            indices; the indexed result is passed to ``pandas.DataFrame``.
        num_examples: Number of distinct rows to show.

    Raises:
        ValueError: If ``num_examples`` is negative or larger than the dataset.
    """
    total_rows = len(dataset)
    # Validate explicitly: an ``assert`` disappears under ``python -O``, and an
    # unchecked request for more distinct rows than exist cannot be satisfied.
    if not 0 <= num_examples <= total_rows:
        raise ValueError(
            f"num_examples must be between 0 and {total_rows}, got {num_examples}"
        )

    # random.sample draws without replacement, so the indices are unique.
    picks = random.sample(range(total_rows), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

def remove_special_characters(batch):
    """Normalise ``batch["transcript"]`` and return the same ``batch``.

    Steps, in order: drop every character matched by the module-level
    ``chars_to_ignore_regex``, lowercase, append one trailing space, turn line
    breaks into spaces, map the alef variants to a bare alef, and strip the
    Arabic diacritics and the tatweel listed in the pattern below.
    """
    text = re.sub(chars_to_ignore_regex, '', batch["transcript"]).lower() + " "

    # "\r\n" is handled first so a Windows line ending becomes a single space.
    for line_break in ('\r\n', '\n', '\r'):
        text = text.replace(line_break, ' ')

    # Latin letters are kept; only the alef spellings are unified.
    text = re.sub("[إأٱآا]", "ا", text)

    diacritics = re.compile(""" ّ    | # Tashdid
                             َ    | # Fatha
                             ً    | # Tanwin Fath
                             ُ    | # Damma
                             ٌ    | # Tanwin Damm
                             ِ    | # Kasra
                             ٍ    | # Tanwin Kasr
                             ْ    | # Sukun
                             ـ     # Tatwil/Kashida
                         """, re.VERBOSE)
    batch["transcript"] = diacritics.sub('', text)
    return batch

def extract_all_chars(batch):
    """Return the distinct characters and the joined text of ``batch["transcript"]``.

    The transcripts are joined with single spaces. Both results are wrapped in
    a one-element list: ``"vocab"`` holds the list of distinct characters (in
    set order) and ``"all_text"`` holds the joined string.
    """
    joined = " ".join(batch["transcript"])
    return {
        "vocab": [list(set(joined))],
        "all_text": [joined],
    }

def speech_file_to_array_fn(batch):
    """Load the audio file named by ``batch["path"]`` and attach it to ``batch``.

    Adds ``"speech"`` (NumPy array), ``"sampling_rate"`` (as reported by
    ``torchaudio.load``) and ``"target_text"`` (a copy of ``"transcript"``),
    then returns ``batch``.
    """
    audio_path = "/var/www/html/Darija-Ai-Train/" + batch["path"]
    waveform, native_rate = torchaudio.load(audio_path)

    # Only index 0 of the loaded tensor is kept — presumably the first channel.
    first_row = waveform[0]
    batch["speech"] = first_row.numpy()
    batch["sampling_rate"] = native_rate
    batch["target_text"] = batch["transcript"]
    return batch

def resample(batch):
    """Resample ``batch["speech"]`` to 16 kHz and update ``batch["sampling_rate"]``.

    The source rate is read from ``batch["sampling_rate"]``. Returns ``batch``.
    """
    target_rate = 16_000
    samples = np.asarray(batch["speech"])
    batch["speech"] = librosa.resample(
        samples,
        orig_sr=batch['sampling_rate'],
        target_sr=target_rate,
    )
    batch["sampling_rate"] = target_rate
    return batch

def prepare_dataset(batch):
    """Turn a batch of audio and text into model inputs and label ids.

    Uses the module-level ``processor``. Writes ``"input_values"`` (from
    ``batch["speech"]``) and ``"labels"`` (from ``batch["target_text"]``) into
    ``batch`` and returns it. Every example in the batch must share one
    sampling rate.
    """
    distinct_rates = set(batch["sampling_rate"])
    assert (len(distinct_rates) == 1), f"Make sure all inputs have the same sampling rate of {processor.feature_extractor.sampling_rate}."

    # All rates are equal at this point, so the first one represents the batch.
    common_rate = batch["sampling_rate"][0]
    audio_features = processor(batch["speech"], sampling_rate=common_rate)
    batch["input_values"] = audio_features.input_values

    # Inside this context the processor call is applied to the text.
    with processor.as_target_processor():
        text_encoding = processor(batch["target_text"])
        batch["labels"] = text_encoding.input_ids
    return batch

def compute_metrics(pred):
    """Compute the word error rate for one evaluation pass.

    ``pred.predictions`` holds the logits and ``pred.label_ids`` the reference
    ids. ``pred.label_ids`` is modified in place: every ``-100`` entry is
    replaced by the tokenizer's pad token id before decoding.

    Returns a dict with the single key ``"wer"``.
    """
    predicted_ids = np.argmax(pred.predictions, axis=-1)

    # -100 is not a token id, so swap it for the pad id before decoding.
    reference_ids = pred.label_ids
    reference_ids[reference_ids == -100] = processor.tokenizer.pad_token_id

    hypotheses = processor.batch_decode(predicted_ids)
    # References are decoded with group_tokens=False — presumably so repeated
    # characters in the ground truth are not merged; confirm in tokenizer docs.
    references = processor.batch_decode(reference_ids, group_tokens=False)

    # A character error rate could be reported the same way via a cer_metric,
    # but only WER is computed here.
    return {"wer": wer_metric.compute(predictions=hypotheses, references=references)}

# Character class of punctuation and symbols that remove_special_characters
# deletes from every transcript. Written as a raw string: sequences such as
# "\," or "\." are not valid Python string escapes and trigger an
# invalid-escape warning in a normal literal, while the regex engine reads
# each backslash-prefixed character as that literal character.
chars_to_ignore_regex = r'[\,\؟\.\!\-\;\:\'\"\☭\«\»\؛\—\ـ\_\،\“\%\‘\”\�]'


# Load the train and test CSV files. load_dataset puts a lone data file under
# the "train" split key, which is why both are read from ['train'] below.
train = load_dataset('csv', data_files='/var/www/html/Darija-Ai-Train/train.csv')
test = load_dataset('csv', data_files='/var/www/html/Darija-Ai-Train/test.csv')

darija_ai_train = train['train']
darija_ai_test = test['train']

print(darija_ai_train)
print(darija_ai_test)

# Preview raw transcripts (the "path" column is dropped from the preview only).
show_random_elements(darija_ai_train.remove_columns(["path"]), num_examples=20)

# Normalise the transcripts of both splits.
darija_ai_train = darija_ai_train.map(remove_special_characters)
darija_ai_test = darija_ai_test.map(remove_special_characters)

# Preview again to check the effect of the normalisation.
show_random_elements(darija_ai_train.remove_columns(["path"]), num_examples=20)

# batch_size=-1 hands the whole split to extract_all_chars in a single call,
# so each result has exactly one row holding the split's character set.
vocab_train = darija_ai_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=darija_ai_train.column_names)
vocab_test = darija_ai_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=darija_ai_test.column_names)

# Sort the merged character set: set iteration order for strings changes from
# one interpreter run to the next, which would assign different ids to the
# same characters every time the script is executed.
vocab_list = sorted(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]))
vocab_dict = {char: index for index, char in enumerate(vocab_list)}

print(vocab_dict)

# The tokenizer is configured with "|" as word delimiter, so the space
# character hands its id over to "|".
vocab_dict["|"] = vocab_dict.pop(" ")

# Append the special tokens after the real characters.
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)

print(len(vocab_dict))

with open("vocab.json", 'w', encoding="utf-8") as vocab_file:
    json.dump(vocab_dict, vocab_file)

# Build the CTC tokenizer from the vocab.json written earlier in this script;
# "|" is the word delimiter that took over the id of the space character.
tokenizer = Wav2Vec2CTCTokenizer("vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

# Feature extractor configured for 16 kHz input with normalisation and an
# attention mask; the processor bundles it with the tokenizer. The functions
# prepare_dataset and compute_metrics read this module-level `processor`.
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

# Load every audio file. The original columns are removed, leaving the
# "speech", "sampling_rate" and "target_text" columns that
# speech_file_to_array_fn adds.
darija_ai_train = darija_ai_train.map(speech_file_to_array_fn, remove_columns=darija_ai_train.column_names)
darija_ai_test = darija_ai_test.map(speech_file_to_array_fn, remove_columns=darija_ai_test.column_names)

# Sanity check: print one random training example before resampling.
rand_int = random.randint(0, len(darija_ai_train)-1)
print("Target text:", darija_ai_train[rand_int]["target_text"])
print("Input array shape:", np.asarray(darija_ai_train[rand_int]["speech"]).shape)
print("Sampling rate:", darija_ai_train[rand_int]["sampling_rate"])

# ipd.Audio(data=np.asarray(darija_ai_train[rand_int]["speech"]), autoplay=True, rate=darija_ai_train[rand_int]["sampling_rate"])

# Resample both splits to 16 kHz using 4 worker processes.
darija_ai_train = darija_ai_train.map(resample, num_proc=4)
darija_ai_test = darija_ai_test.map(resample, num_proc=4)

# Sanity check: print another random training example after resampling.
rand_int = random.randint(0, len(darija_ai_train)-1)
print("Target text:", darija_ai_train[rand_int]["target_text"])
print("Input array shape:", np.asarray(darija_ai_train[rand_int]["speech"]).shape)
print("Sampling rate:", darija_ai_train[rand_int]["sampling_rate"])

# ipd.Audio(data=np.asarray(darija_ai_train[rand_int]["speech"]), autoplay=True, rate=16000)

# Convert audio and text into "input_values" and "labels" in batches of 8;
# all previous columns are dropped.
darija_ai_train = darija_ai_train.map(prepare_dataset, remove_columns=darija_ai_train.column_names, batch_size=8, num_proc=4, batched=True)
darija_ai_test = darija_ai_test.map(prepare_dataset, remove_columns=darija_ai_test.column_names, batch_size=8, num_proc=4, batched=True)

# Collator that pads inputs and labels separately for each batch.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

# Word-error-rate metric; compute_metrics reads this module-level name.
wer_metric = load_metric("wer", cache_dir="/var/www/html/Darija-Ai-Train")

# Start from the published Darija checkpoint. pad_token_id and vocab_size are
# taken from the tokenizer built by this script.
# NOTE(review): if the checkpoint's output layer has a different vocabulary
# size, loading may need extra handling — confirm against the checkpoint.
model = Wav2Vec2ForCTC.from_pretrained(
    "boumehdi/wav2vec2-large-xlsr-moroccan-darija",
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.0,
    mask_time_prob=0.05,
    layerdrop=0.1,
    # gradient_checkpointing=True,
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
)

# Keep the feature encoder's weights fixed during fine-tuning.
model.freeze_feature_encoder()

# Checkpoints, evaluation and logging all happen every 400 steps; only the two
# most recent checkpoints are kept on disk.
training_args = TrainingArguments(
    output_dir="/var/www/html/Darija-Ai-Train",
    group_by_length=True,
    per_device_train_batch_size=16,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=30,
    fp16=False,
    save_steps=400,
    eval_steps=400,
    logging_steps=400,
    learning_rate=3e-4,
    warmup_steps=500,
    save_total_limit=2,
)

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=darija_ai_train,
    eval_dataset=darija_ai_test,
    tokenizer=processor.feature_extractor,
)

trainer.train()


results = trainer.evaluate(darija_ai_test)
print("Results:", results)

# Save the fine-tuned model
trainer.save_model("/var/www/html/Darija-Ai-Train/fine_tuned_model")
# The Trainer was handed only the feature extractor, so the tokenizer and its
# vocabulary are written explicitly; without them the saved directory cannot
# decode the model's predictions on its own.
processor.save_pretrained("/var/www/html/Darija-Ai-Train/fine_tuned_model")