from pyspark.sql import SparkSession
import pandas as pd
import sys, glob, json, os
from datetime import datetime
from functions import compare_image  # local helper: frame similarity scoring
from importlib import reload
import logging
reload(logging)  # clear any handlers configured by earlier imports
import coloredlogs
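# Ad-detection pass over a single HLS segment:
#   1. extract scene-change frames from the segment with ffmpeg,
#   2. compare each frame against the reference ad frames under ads/,
#   3. on a new match, record it in detected_add.json and ads_historic.csv.
# Usage: python3 iframe_scanner.py <segment>.ts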


table_name = "2m"  # root folder holding this channel's HLS segments and frames

# Log to log_file.log as well as to the colored console handler that
# coloredlogs.install() attaches to the root logger.
logging.getLogger('sqlalchemy').setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
fh = logging.FileHandler('log_file.log')
fh.setLevel(logging.INFO)
formatter = coloredlogs.ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
coloredlogs.install(level='INFO')

# Load the most recently detected ad so repeats are not logged as new.
with open("detected_add.json", "r") as jsonFile:
    detected_add = json.load(jsonFile)
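# detected_add.json stores only the most recent detection, e.g. {"name": "orange"},
# so an ad is logged once per airing rather than once per matching frame.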

segment = str(sys.argv[1])
logger.info('segment %s will be processed', segment)
if os.path.isfile(table_name + "/2m_hls/" + segment) and segment.endswith(".ts"):

    # Segment duration: hard-coded to 3 seconds here; the commented call shows
    # how it could be measured from the file instead.
    # duration = with_ffprobe("2m/2m_hls/" + segment)
    duration = 3

    # Basename of the segment (without the ".ts" extension), used to prefix
    # the extracted frame files.
    now = str(segment).split('.')[0]

    # The source video runs at 25 frames per second. Extract every frame whose
    # scene-change score exceeds 0.10 into 2m/iframe_live/; -frame_pts true
    # substitutes the frame's presentation timestamp for %d in the file name.
    # An I-frame-only alternative: -vf "select='eq(pict_type,PICT_TYPE_I)'".
    cmd = ("ffmpeg -y -i 2m/2m_hls/" + segment
           + " -vf 'select=gt(scene\\,0.10)' -vsync vfr -frame_pts true"
           + " 2m/iframe_live/" + now + "_%d_" + str(duration) + ".png")
    os.system(cmd)
else:
    logger.warning('%s is missing or not a .ts segment; skipping frame extraction', segment)


# Collect the frame files that were extracted from this segment.
images = [iframe for iframe in glob.glob("2m/iframe_live/*.png") if segment.split('.')[0] in iframe]
iframes = [image.split('/')[-1] for image in images]
logger.info('%s iframes were extracted', len(iframes))

ads = ['ariel', 'glovo', 'hawai', 'inwi', 'maroc_telecom', 'orange', 'water']

all_ads = [file for folder in ads for file in glob.glob("ads/" + folder + "/*.png")]
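# Matching stage: compare every extracted frame against every reference frame.
# compare_image() comes from the local functions module; from its use here it
# is assumed to return a similarity score, with 30 as the match threshold.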

for filename in iframes:
    for img in all_ads:
        result = compare_image(img, "2m/iframe_live/" + filename)

        if result < 30:
            logger.debug('%s and %s fall below the similarity threshold', filename, img)
        else:
            logger.info('%s and %s are similar!', filename, img)
            parent = img.split('/')[-2]  # the brand folder name, e.g. "orange"
            if detected_add['name'] == parent:
                logger.info('%s was detected again', parent)
            else:
                logger.info('a new ad was detected: %s', parent)

                # Persist the detection so subsequent frames and runs treat
                # this ad as already seen.
                detected_add['name'] = parent
                with open("detected_add.json", "w") as jsonFile:
                    json.dump(detected_add, jsonFile)

                # Append the detection to the CSV history via Spark.
                current_time = datetime.now()
                spark = SparkSession.builder.getOrCreate()
                df = spark.read.csv('ads_historic.csv', header=True)
                # pandas' to_csv below writes its index as an unnamed column,
                # which Spark reads back as "_c0"; drop it.
                df = df.drop("_c0")
                data_new = pd.DataFrame({
                    "add_name": [parent],
                    "day": [str(current_time).split(' ')[0]],
                    "time": [str(current_time).split(' ')[1].split('.')[0]],
                    "date": [str(current_time)],
                })
                # unionByName matches columns by name rather than by position.
                df = df.unionByName(spark.createDataFrame(data_new))
                df.toPandas().to_csv('ads_historic.csv', header=True)
                spark.stop()

    logger.info('preprocessing is done for %s', filename)