U
    -ea                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	Z
ddlZddlmZ zddlmZ W n ek
r|   dZY nX dd	lmZ d
d Zdd Zdd Zdd ZdddZdd ZG dd dZG dd dZdd ZdS )zTF-specific utils import.    N)partial)ceil)uuid4)get_context)SharedMemory   )configc                    s   t | tr| S tjrdd l}ntd| d }i }| D ]x\ }t |tjrlt	 fdd| D | < q:t ||j
r|	 fdd| D | < q:t fdd| D | < q:|S )Nr   FCalled a Tensorflow-specific function but Tensorflow is not installed.c                    s   g | ]}|  qS  r
   .0fkr
   X/var/www/html/Darija-Ai-Train/env/lib/python3.8/site-packages/datasets/utils/tf_utils.py
<listcomp>0   s     z)minimal_tf_collate_fn.<locals>.<listcomp>c                    s   g | ]}|  qS r
   r
   r   r   r
   r   r   2   s     c                    s   g | ]}|  qS r
   r
   r   r   r
   r   r   4   s     )
isinstancedictr   TF_AVAILABLE
tensorflowImportErroritemsnpndarraystackZTensorarray)featurestffirstbatchvr
   r   r   minimal_tf_collate_fn$   s    

r!   c                 C   s&   t | }d|kr"|d |d< |d= |S )Nlabellabels)r!   )r   r   r
   r
   r   #minimal_tf_collate_fn_with_renaming8   s
    r$   c                 C   s:   t j| rt| jS t j| p8t j| p8t j| S N)patypesZis_listis_numeric_pa_typeZ
value_type
is_integerZis_floatingZ
is_decimal)Zpa_typer
   r
   r   r(   @   s    
r(   c                 C   s   ddl m}m}m} ddlm} t| |r4t| jS t| t	rJt| d S t| |r`t
|  jS t| |rtt
|  S t| |rdS dS d S )Nr   )
ClassLabelSequenceValue)_ArrayXDr   TF) r*   r+   r,   Zfeatures.featuresr-   r   is_numeric_featurefeaturelistr(   Zstorage_dtype)r0   r*   r+   r,   r-   r
   r
   r   r/   F   s    






r/   Fc                    s`  t | tjs|  } d}t | tjr6||    d}nVtt| dkrd|| d | d d   n(t | tjrz||   ntd	t
| d k	rfdd  D  |rtt  d } fd	d
t|D  | f| |r"i }	| D ](\}
}t |
 }||}||	|
< qn:g }	| D ],\}
}t |
 }||}|	| q.|	S )NTF   r   zUnexpected type for indices: {}c                    s&   i | ]\}}| ks|d kr||qS ))r"   Z	label_idsr#   r
   r   keyvalue)cols_to_retainr
   r   
<dictcomp>k   s
     z np_get_batch.<locals>.<dictcomp>c                    s"   g | ]  fd d  D qS )c                    s   i | ]\}}||  qS r
   r
   r4   ir
   r   r8   t   s      z+np_get_batch.<locals>.<listcomp>.<dictcomp>r   )r   )r   r9   r   r   t   s     z np_get_batch.<locals>.<listcomp>)r   r   r   numpyintegeritemalldiffRuntimeErrorformattyper   lenr1   valuesranger   Zastypeappend)indicesdatasetr7   
collate_fncollate_fn_argscolumns_to_np_typesreturn_dictZ
is_batchedZactual_sizeZ	out_batchcol
cast_dtyper   r
   )r   r7   r   np_get_batchX   s>    



rP   c	              	      sv  t jrddlntdtdr*jn4tjjdrDjjjnt	dkrZt
d dtt||| dd	fd
d  D jdjgd fdd}	jjt	}
|rdk	rjdjdjdd}fdd}|
||}
n|r"|
|
 }
|dk	r:|
j||d}
|
|	}
|dk	r^fdd}nfdd}|
|S )a  Create a tf.data.Dataset from the underlying Dataset. This is a single-process method - the multiprocess
    equivalent is multiprocess_dataset_to_tf.

    Args:
        dataset (`Dataset`): Dataset to wrap with tf.data.Dataset.
        cols_to_retain (`List[str]`): Dataset column(s) to load in the
            tf.data.Dataset. It is acceptable to include column names that are created by the `collate_fn` and
            that do not exist in the original dataset.
        collate_fn(`Callable`): A function or callable object (such as a `DataCollator`) that will collate
            lists of samples into a batch.
        collate_fn_args (`Dict`): A  `dict` of keyword arguments to be passed to the
            `collate_fn`. Can be empty.
        columns_to_np_types (`Dict[str, np.dtype]`): A `dict` mapping column names to numpy dtypes.
        output_signature (`Dict[str, tf.TensorSpec]`): A `dict` mapping column names to
            `tf.TensorSpec` objects.
        shuffle(`bool`): Shuffle the dataset order when loading. Recommended True for training, False for
            validation/evaluation.
        batch_size (`int`, default `None`): Size of batches to load from the dataset. Defaults to `None`, which implies that
            the dataset won't be batched, but the returned dataset can be batched later with `tf_dataset.batch(batch_size)`.
        drop_remainder(`bool`, default `None`): Drop the last incomplete batch when loading. If not provided,
            defaults to the same setting as shuffle.

    Returns:
        `tf.data.Dataset`
    r   Nr	   random_index_shuffleindex_shufflei zto_tf_dataset() can be memory-inefficient on versions of TensorFlow older than 2.9. If you are iterating over a dataset with a very large number of samples, consider upgrading to TF >= 2.9.F)rI   r7   rJ   rK   rL   rM   c                    s   g | ]} j |qS r
   )ZdtypesZas_dtype)r   dtype)r   r
   r   r      s     z!dataset_to_tf.<locals>.<listcomp>)Zinput_signaturec                    s,   j | gd  fddt D S )N)ZinpZToutc                    s   i | ]\}}| | qS r
   r
   )r   r:   r5   outputr
   r   r8      s      z9dataset_to_tf.<locals>.fetch_function.<locals>.<dictcomp>)Zpy_function	enumeratekeys)rH   )rL   	getter_fnr   toutrT   r   fetch_function   s    z%dataset_to_tf.<locals>.fetch_function   r3   )rS   )r6   c                    s@    | dkr"jjddjd} || t d d}| |fS )Nr3   r[   l            )shapemaxvalrS   r2   )indexseedZ	max_index)Z
reduce_allrandomuniformint64rD   )stater_   Zshuffled_index)rI   rQ   r   r
   r   scan_random_index   s    z(dataset_to_tf.<locals>.scan_random_index)drop_remainderc                    s    fdd|   D S )Nc                    s$   i | ]\}}| | | jqS r
   Zensure_shaper]   r   r5   valoutput_signaturer   r
   r   r8      s      8dataset_to_tf.<locals>.ensure_shapes.<locals>.<dictcomp>r;   Z
input_dictrj   r
   r   ensure_shapes   s    z$dataset_to_tf.<locals>.ensure_shapesc                    s    fdd|   D S )Nc              	      s,   i | ]$\}}| | | jd d qS )r2   Nrg   rh   rj   r
   r   r8      s      rl   r;   rm   rj   r
   r   rn      s    )r   r   r   r   hasattrrQ   ra   experimentalrR   rD   warningswarnr   rP   rE   functionZ
TensorSpecrc   dataDatasetrF   fillcastscanshuffleZcardinalityr   map)rI   r7   rJ   rK   rL   rk   ry   
batch_sizerf   rZ   
tf_datasetZ	base_seedre   rn   r
   )rL   rI   rX   rk   rQ   r   rY   r   dataset_to_tf   sL    $




r}   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )SharedMemoryContextc                 C   s   g | _ g | _d S r%   )created_shmsopened_shmsselfr
   r
   r   __init__   s    zSharedMemoryContext.__init__c                 C   s4   t t|||d}|r$| j| n| j| |S )N)sizenamecreate)r   intr   rG   r   )r   r   r   r   shmr
   r
   r   get_shm   s
    zSharedMemoryContext.get_shmc                 C   s4   | j |t|t|j |d}tj|||jdS )N)r   r   r   )rS   buffer)r   r   prodrS   itemsizer   buf)r   r   r]   rS   r   r   r
   r
   r   	get_array
  s    "zSharedMemoryContext.get_arrayc                 C   s   | S r%   r
   r   r
   r
   r   	__enter__  s    zSharedMemoryContext.__enter__c                 C   s4   | j D ]}|  |  q| jD ]}|  q"d S r%   )r   closeunlinkr   )r   exc_type	exc_value	tracebackr   r
   r
   r   __exit__  s
    


zSharedMemoryContext.__exit__N)__name__
__module____qualname__r   r   r   r   r   r
   r
   r
   r   r~      s
   
r~   c                   @   s<   e Zd Zdd Zdd Zdd Zedd Zed	d
 ZdS )NumpyMultiprocessingGeneratorc                    s~   | _ | _| _| _dd | D  _ fdd| D  _| _| _| _	|	 _
|
 _ fdd| D  _d S )Nc                 S   s$   g | ]\}}|t jt jfkr|qS r
   )r   Zunicode_Zstr_r   rN   rS   r
   r
   r   r   +  s      z:NumpyMultiprocessingGenerator.__init__.<locals>.<listcomp>c                    s*   i | ]"\}}|| j kr|ntd qS )U1)string_columnsr   rS   r   r   r
   r   r8   -  s    z:NumpyMultiprocessingGenerator.__init__.<locals>.<dictcomp>c                    s8   i | ]0\}}|| j kr$t|jjnt|jjd  qS r2   )r   r   r]   rank)r   rN   specr   r
   r   r8   7  s    )rI   r7   rJ   rK   r   r   rL   rk   ry   r{   rf   num_workerscolumns_to_ranks)r   rI   r7   rJ   rK   rL   rk   ry   r{   rf   r   r
   r   r   r     s     

z&NumpyMultiprocessingGenerator.__init__c                 #   s  t jtttjj }jjj|j	\}}}t
dg g }g }fddt|D }fddt|D }jjjjjjjd}	t ԉt|D ]tt }
d d|
 d d  fd	d
j D }|| | }|kr*|d k	r*|}nd }||| | d|	}jj|dd}|  || qd}|sft|D ]މ| jddstd|   | }tdd | D rd} qtt j  fdd
| D }dd
 | D }jD ].}|| d|| j d  !d||< qW 5 Q R X |V  | "  qqt|D ]}|#  qjW 5 Q R X d S )Nspawnc                    s   g | ]}   qS r
   Eventr   _ctxr
   r   r   G  s     z:NumpyMultiprocessingGenerator.__iter__.<locals>.<listcomp>c                    s   g | ]}   qS r
   r   r   r   r
   r   r   H  s     )rI   r7   rJ   rK   rL   r   r   Zdw_r   
   c              	      s4   i | ],\}}| j  d | d|ftjddqS )r   _shapeTr]   rS   r   r   r   rc   r   rN   r   shm_ctxworker_namer
   r   r8   Y  s    z:NumpyMultiprocessingGenerator.__iter__.<locals>.<dictcomp>)r   rH   extra_batcharray_ready_eventarray_loaded_eventT)targetkwargsdaemonF<   )timeoutzData loading worker timed out!c                 s   s   | ]}t |d k V  qdS )r   N)r   any)r   r]   r
   r
   r   	<genexpr>w  s     z9NumpyMultiprocessingGenerator.__iter__.<locals>.<genexpr>c              	      s8   i | ]0\}}| j   d | |j| ddqS )r   Fr   )r   rL   )r   rN   r]   )batch_shm_ctxr:   namesr   r
   r   r8     s    c                 S   s   i | ]\}}|t |qS r
   )r   copy)r   rN   Zarrr
   r
   r   r8     s      Ur3   )$minr   r   r   rD   rI   r{   distribute_batchesrf   ry   r   rF   r7   rJ   rK   rL   r   r   r~   strr   rG   r   Processworker_loopstartwaitTimeoutErrorclearr   rE   viewr]   squeezesetjoin)r   r   Zper_worker_batchesZfinal_batchZfinal_batch_workershape_arraysworkersZarray_ready_eventsZarray_loaded_eventsZ	base_argsZworker_random_idZworker_shape_arraysworker_indicesZfinal_batch_argZworker_kwargsZworkerZend_signal_receivedZarray_shapesZarraysZ
string_colr
   )r   r   r:   r   r   r   r   r   __iter__<  s         
	




"z&NumpyMultiprocessingGenerator.__iter__c                 C   s   | S r%   r
   r   r
   r
   r   __call__  s    z&NumpyMultiprocessingGenerator.__call__c              
      s   dt jd< tjrdd l}ntd|jg d  	
f
dd}t f
fdd	| D |D ]}|| qr|d k	r||  D ]\}}d
|d d < q	  W 5 Q R X d S )N3ZTF_CPP_MIN_LOG_LEVELr   r	   ZGPUc              	      s   t | dd}i }t } D ]r\}}|| }|krX|d|jd }|j| d d < |j	 d| |j|dd||< ||| d d < q*      	  W 5 Q R X d S )NT)rH   rI   r7   rJ   rK   rL   rM   r   )r3   r   r   )
rP   r~   r   r   reshaper]   r   r   r   r   )rH   r   Z
out_arraysr   rN   rO   r   )
r   r   rJ   rK   r7   rL   rI   r   r   r   r
   r   send_batch_to_parent  s4       
zGNumpyMultiprocessingGenerator.worker_loop.<locals>.send_batch_to_parentc              	      s4   i | ],\}}| j  d | d|ftjddqS )r   r   Fr   r   r   r   r
   r   r8     s    z=NumpyMultiprocessingGenerator.worker_loop.<locals>.<dictcomp>r3   )
osenvironr   r   r   r   Zset_visible_devicesr~   r   r   )rI   r7   rJ   rK   rL   r   r   rH   r   r   r   r   r   r   r   rN   r   r
   )r   r   rJ   rK   r7   rL   rI   r   r   r   r   r   r     s"    

!
z)NumpyMultiprocessingGenerator.worker_loopc                 C   s  t t| }|rt j| t|}|||  }t ||g\}}|sTt|dkrXd }|d|}t|}	|	|	|  }
t ||
g\}}|d||}t j||jd dd}dd |D }tt|D ]*}t j	|| || ddgdd||< q|d k	rt|}nd }|||fS )Nr   r3   r2   )Zaxisc                 S   s   g | ]}t |d qS r   )r   r   )r   r   r
   r
   r   r     s     zDNumpyMultiprocessingGenerator.distribute_batches.<locals>.<listcomp>)
r   ZarangerD   ra   ry   splitr   r]   rF   Zconcatenate)rI   r{   rf   r   ry   rH   Znum_samplesZincomplete_batch_cutoffZlast_incomplete_batchZnum_batchesZfinal_batches_cutoffZfinal_batchesZper_worker_indicesr:   Zincomplete_batch_worker_idxr
   r
   r   r     s*    (

z0NumpyMultiprocessingGenerator.distribute_batchesN)	r   r   r   r   r   r   staticmethodr   r   r
   r
   r
   r   r     s   "a
Gr   c
                 C   s   t jrddl}
ntdt| |||||||||	d
}|
jjj||d}|r\tt	| | }ntt
t	| | }||
jj|S )ao  Create a tf.data.Dataset from the underlying Dataset. This is a multi-process method - the single-process
    equivalent is dataset_to_tf.

    Args:
        dataset (`Dataset`): Dataset to wrap with tf.data.Dataset.
        cols_to_retain (`List[str]`): Dataset column(s) to load in the
            tf.data.Dataset. It is acceptable to include column names that are created by the `collate_fn` and
            that do not exist in the original dataset.
        collate_fn(`Callable`): A function or callable object (such as a `DataCollator`) that will collate
            lists of samples into a batch.
        collate_fn_args (`Dict`): A  `dict` of keyword arguments to be passed to the
            `collate_fn`. Can be empty.
        columns_to_np_types (`Dict[str, np.dtype]`): A `dict` mapping column names to numpy dtypes.
        output_signature (`Dict[str, tf.TensorSpec]`): A `dict` mapping column names to
            `tf.TensorSpec` objects.
        shuffle(`bool`): Shuffle the dataset order when loading. Recommended True for training, False for
            validation/evaluation.
        batch_size (`int`, default `None`): Size of batches to load from the dataset. Defaults to `None`, which implies that
            the dataset won't be batched, but the returned dataset can be batched later with `tf_dataset.batch(batch_size)`.
        drop_remainder(`bool`, default `None`): Drop the last incomplete batch when loading. If not provided,
            defaults to the same setting as shuffle.
        num_workers (`int`): Number of workers to use for loading the dataset. Should be >= 1.

    Returns:
        `tf.data.Dataset`
    r   Nr	   )
rI   r7   rJ   rK   rL   rk   ry   r{   rf   r   )rk   )r   r   r   r   r   rt   ru   Zfrom_generatorr   rD   r   applyrp   Zassert_cardinality)rI   r7   rJ   rK   rL   rk   ry   r{   rf   r   r   Zdata_generatorr|   Zdataset_lengthr
   r
   r   multiprocess_dataset_to_tf	  s(    &
r   )F)__doc__r   rq   	functoolsr   mathr   uuidr   r<   r   Zpyarrowr&   Zmultiprocessr   Zmultiprocess.shared_memoryr   r   r.   r   r!   r$   r(   r/   rP   r}   r~   r   r   r
   r
   r
   r   <module>   s0   
 
0q  q