U
    -e\6                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ ejje Z!erd dl"Z"eG d	d
 d
ej#Z$dee% dddZ&dee% dddZ'G dd deZ(G dd dej)Z*dS )    N)	dataclass)TYPE_CHECKINGIterableListOptionalTupleUnion)ArrowWriterParquetWriter)MAX_SHARD_SIZE)is_remote_filesystemrename)_BaseExamplesIterable)convert_file_size_to_intc                   @   s$   e Zd ZU dZdZeej ed< dS )SparkConfigzBuilderConfig for Spark.Nfeatures)	__name__
__module____qualname____doc__r   r   datasetsZFeatures__annotations__ r   r   f/var/www/html/Darija-Ai-Train/env/lib/python3.8/site-packages/datasets/packaged_modules/spark/spark.pyr      s   
r   pyspark.sql.DataFrame)dfnew_partition_orderc                 C   sP   |  dd|d  }|dd  D ]$}|  dd| }||}q&|S )N*z
part_id = r      )selectwhereunion)r   r   Zdf_combinedZpartition_idpartition_dfr   r   r   _reorder_dataframe_by_partition"   s
    r#   )r   partition_orderc                    s   dd l  fdd}|S )Nr   c                  3   s     djj d} t| }d}|jdd}d}|D ]J}| }|d }|d ||krj|}d}| d| |fV  |d7 }q<d S )	Nr   part_idr   T)ZprefetchPartitions_r   )	r   sql	functionsZspark_partition_idaliasr#   ZtoLocalIteratorasDictpop)Zdf_with_partition_idr"   Zrow_idrowsZcurr_partitionrowZrow_as_dictr%   r   r$   pysparkr   r   generate_fn0   s    

z0_generate_iterable_examples.<locals>.generate_fn)r0   )r   r$   r1   r   r/   r   _generate_iterable_examples*   s    r2   c                   @   s\   e Zd ZdddddZdd Zejjd dd	d
Ze	e	d dddZ
ee	dddZdS )SparkExamplesIterableNr   )r   c                 C   s0   || _ |pt| j j | _t| j | j| _d S N)r   rangerddgetNumPartitionsr$   r2   generate_examples_fn)selfr   r$   r   r   r   __init__E   s    zSparkExamplesIterable.__init__c                 c   s   |   E d H  d S r4   )r8   r9   r   r   r   __iter__N   s    zSparkExamplesIterable.__iter__)	generatorreturnc                 C   s,   t t| jj }|| t| j|dS N)r$   )listr5   r   r6   r7   shuffler3   )r9   r=   r$   r   r   r   shuffle_data_sourcesQ   s    
z*SparkExamplesIterable.shuffle_data_sources)	worker_idnum_workersr>   c                 C   s   |  ||}t| j|dS r?   )Zsplit_shard_indices_by_workerr3   r   )r9   rC   rD   r$   r   r   r   shard_data_sourcesV   s    z(SparkExamplesIterable.shard_data_sources)r>   c                 C   s
   t | jS r4   )lenr$   r;   r   r   r   n_shardsZ   s    zSparkExamplesIterable.n_shards)N)r   r   r   r:   r<   nprandom	GeneratorrB   intrE   propertyrG   r   r   r   r   r3   D   s    	r3   c                
       s   e Zd ZeZddeed fddZdd Zdd	 Ze	j
jjd
ddZdd Zeeeeeeeeeef f  dddZddeeeeef  ee dddZdedddZ  ZS )SparkNr   )r   	cache_dirworking_dirc                    sJ   dd l }|jjj | _|| _|| _t j	f |t
| j d| d S )Nr   )rN   Zconfig_name)r0   r(   ZSparkSessionZbuilderZgetOrCreate_sparkr   _working_dirsuperr:   strZsemanticHash)r9   r   rN   rO   Zconfig_kwargsr0   	__class__r   r   r:   b   s    zSpark.__init__c                    sp   | j   fdd}| jjdddr,d S | j rd| jjtdd|	 }t
j|d rdd S tdd S )	Nc                    s6   t j dd t j dt j }t|d |gS )NT)exist_okZfs_testa)osmakedirspathjoinuuiduuid4hexopen)contextZ
probe_filerN   r   r   create_cache_and_write_probe{   s    
z?Spark._validate_cache_dir.<locals>.create_cache_and_write_probezspark.master localr   r   ztWhen using Dataset.from_spark on a multi-node cluster, the driver and all workers should be able to access cache_dir)Z
_cache_dirrP   confget
startswithsparkContextparallelizer5   ZmapPartitionscollectrX   rZ   isfile
ValueError)r9   rb   Zprober   ra   r   _validate_cache_diru   s    
zSpark._validate_cache_dirc                 C   s   t j| jjdS )N)r   )r   ZDatasetInfoconfigr   r;   r   r   r   _info   s    zSpark._info)
dl_managerc                 C   s   t jt jjdgS )N)name)r   ZSplitGeneratorZSplitZTRAIN)r9   rp   r   r   r   _split_generators   s    zSpark._split_generatorsc           	      C   s   dd l }dd }| j }|dkr&|nd}| j|d|d|jj	d
d d j| }|| }||krt|t|| }| j|| _d S )	Nr   c                 s   s$   | D ]}t jd|jgiV  qd S )Nbatch_bytes)paRecordBatchZfrom_pydictnbytes)itbatchr   r   r   get_arrow_batch_size   s    z=Spark._repartition_df_if_needed.<locals>.get_arrow_batch_sized   r   zbatch_bytes: longrs   sample_bytes)r0   r   countlimitZrepartition
mapInArrowaggr(   r)   sumr*   rj   r{   minrK   )	r9   max_shard_sizer0   ry   Zdf_num_rowsZsample_num_rowsZapprox_bytes_per_rowZapprox_total_sizeZnew_num_partitionsr   r   r   _repartition_df_if_needed   s*    
 	zSpark._repartition_df_if_needed)fpathfile_formatr   r>   c              	   #   s  dd l |dkrtnt| jr6tj| jtjn|dk | jj	| j
| jj f	dd}| j|ddjjddjjd	d
jjd	djjdd }|D ] }|j|j|j|j|jffV  qd S )Nr   Zparquetc                 3   s     }t| d }|d kr@tjj|gdgdggdddgdS d}d|dd|d d	}tj|g}|	| | D ]}d k	r|j
kr| \}}|  tjj|g|g|ggdddgdV  |d
7 }|jd|dd|d d	}tj|g}|	| q|j
dkrn| \}}|  tjj|g|g|ggdddgdV  krttjD ]0}	tjtjtj|	}
t|	|
 qd S )Nr   task_idnum_examples	num_bytes)namesSSSSS05dTTTTT)r   rZ   writer_batch_sizestorage_optionsembed_local_filesr   )ZTaskContextZtaskAttemptIdnextrt   ru   Zfrom_arraysreplaceTableZfrom_batchesZwrite_tableZ
_num_bytesfinalizecloseZ	_featuresrX   listdirrZ   dirnamer[   basenameshutilmove)rw   r   Zfirst_batchshard_idwritertablerx   r   r   filedest	r   r   r   r   r0   r   Zworking_fpathr   Zwriter_classr   r   write_arrow   s\    


z0Spark._prepare_split_single.<locals>.write_arrowz2task_id: long, num_examples: long, num_bytes: longr   r   total_num_examplesr   total_num_bytes
num_shardsshard_lengths)r0   r
   r	   rQ   rX   rZ   r[   r   rn   r   Z_writer_batch_size_fsr   r   r~   ZgroupByr   r(   r)   r   r*   r|   Zcollect_listrj   r   r   r   r   r   )r9   r   r   r   r   statsr.   r   r   r   _prepare_split_single   s&    "5zSpark._prepare_split_singlearrowzdatasets.SplitGenerator)split_generatorr   r   num_procc                    s  |    t|pt}| | t| j }|r6tjjnt	j}d}| j
 d|j
 | d| }	|| j|	d}
d}dg }g }| ||D ]L\}}|\}}}}|dkr|
|7 }
||7 }|7 |||f || q|
|j_||j_td d dkr||j_| jtttdfd	d
 g }d}tt|D ]:}|| \}}t|D ]}||||g |d7 }qXq@| jj|t| fdd  n<d}|d d }| d|dd|d|d d S )Nz-TTTTT-SSSSS-of-NNNNN-.r   z	Renaming z shards.r   r   r   global_shard_idc                    s@   t  d|dd| d d|ddd d S )Nr   r   r   zTTTTT-SSSSSZNNNNN)r   r   r   )r   fstotal_shardsr   r   _rename_shard=  s
    z+Spark._prepare_split.<locals>._rename_shardc                    s    |  S r4   r   )args)r   r   r   <lambda>O      z&Spark._prepare_split.<locals>.<lambda>r   r   r   rc   )rm   r   r   r   r   r   rX   rZ   r[   	posixpathrq   Z_output_dirr   appendextendZ
split_infor   r   loggerdebugr   rK   r5   rF   rP   rh   ri   maprj   Z_renamer   )r9   r   r   r   r   kwargsis_localZ	path_joinZSUFFIXfnamer   r   Ztask_id_and_num_shardsZall_shard_lengthsr   contentr   r   r   r   r   r   ir   r   )r   r   r   r   r   _prepare_split	  sd    

(
zSpark._prepare_split)r   r>   c                 C   s
   t | jS r4   )r3   r   )r9   r   r   r   r    _get_examples_iterable_for_splitY  s    z&Spark._get_examples_iterable_for_split)NN)r   NN)r   r   r   r   ZBUILDER_CONFIG_CLASSrS   r:   rm   ro   r   downloadZdownload_managerZDownloadManagerrr   r   rK   r   r   boolr   tupler   r   r   r3   r   __classcell__r   r   rT   r   rM   _   s:     !W   RrM   )+rX   r   r\   dataclassesr   typingr   r   r   r   r   r   numpyrH   Zpyarrowrt   r   Zdatasets.arrow_writerr	   r
   Zdatasets.configr   Zdatasets.filesystemsr   r   Zdatasets.iterable_datasetr   Zdatasets.utils.py_utilsr   utilsloggingZ
get_loggerr   r   r0   ZBuilderConfigr   rK   r#   r2   r3   ZDatasetBuilderrM   r   r   r   r   <module>   s.    	