U
    -e                     @   s`   d dl Z d dlmZ d dlmZ ddlmZ dZeeZ	G dd deZ
dd
dZdddZdS )    N)FileSystems)Pipeline   )
get_loggeri    c                   @   s   e Zd ZdZdd ZdS )BeamPipelinez<Wrapper over `apache_beam.pipeline.Pipeline` for conveniencec                 C   s   | j  d}|dkS )Nrunner)NZDirectRunnerZPortableRunner)_optionsZget_all_optionsget)selfr    r   Z/var/www/html/Darija-Ai-Train/env/lib/python3.8/site-packages/datasets/utils/beam_utils.pyis_local   s    zBeamPipeline.is_localN)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   Fc              
   C   s   t }||r<|r&td| d ntd| d dS ||@}t| d*}|t}|rx|| |t}q^W 5 Q R X W 5 Q R X dS )zJUse the Beam Filesystems to upload to a remote directory on gcs/s3/hdfs...zRemote path already exist: &. Overwriting it as force_upload=True.$. Skipping it as force_upload=False.Nrb)	r   existsloggerinfocreateopenread
CHUNK_SIZEwrite)local_file_pathremote_file_pathZforce_uploadfsremote_file
local_filechunkr   r   r   upload_local_to_remote   s    


r#   c              
   C   s   t }tj|r>|r(td|  d ntd|  d dS || @}t|d*}|t}|rz|	| |t}q`W 5 Q R X W 5 Q R X dS )zNUse the Beam Filesystems to download from a remote directory on gcs/s3/hdfs...zLocal path already exist: r   r   Nwb)
r   ospathr   r   r   r   r   r   r   )r   r   Zforce_downloadr   r    r!   r"   r   r   r   download_remote_to_local&   s    

r'   )F)F)r%   Zapache_beam.io.filesystemsr   Zapache_beam.pipeliner   loggingr   r   r   r   r   r#   r'   r   r   r   r   <module>   s   
