U
    9%e<                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
Z
ddlmZmZ ddlmZmZ e dZG dd	 d	eZG d
d deZdS )    N)suppress)quote   )AbstractBufferedFileAbstractFileSystem)infer_storage_optionstokenizewebhdfsc                       s  e Zd ZdZee ZdZd= fdd	Z	e
dd	 Zd
d Zd>ddZd?ddZedd Zedd Zedd Zdd Zd@ddZdd Zdd  Zd!d" ZdAd#d$Zd%d& Zd'd( Zd)d* ZdBd+d,Zd-d. Zd/d0 ZdCd1d2Z d3d4 Z!dDd5d6Z"d7d8 Z#d9d: Z$d;d< Z%  Z&S )EWebHDFSa~  
    Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.

    Three auth mechanisms are supported:

    insecure: no auth is done, and the user is assumed to be whoever they
        say they are (parameter ``user``), or a predefined value such as
        "dr.who" if not given
    spnego: when kerberos authentication is enabled, auth is negotiated by
        requests_kerberos https://github.com/requests/requests-kerberos .
        This establishes a session based on existing kinit login and/or
        specified principal/password; parameters are passed with ``kerb_kwargs``
    token: uses an existing Hadoop delegation token from another secured
        service. Indeed, this client can also generate such tokens when
        not insecure. Note that tokens expire, but can be renewed (by a
        previously specified user) and may allow for proxying.

    )r	   ZwebHDFS  FNc
                    s   | j r
dS t jf |
 dj|	r$dnd||d| _|| _|p>i | _i | _|pNi | _|dk	r||dk	sj|dk	rrt	d|| jd< |dk	r|| jd< |dk	r|| jd	< |r|dk	rt	d
| 
  dt|| | _dS )a  
        Parameters
        ----------
        host: str
            Name-node address
        port: int
            Port for webHDFS
        kerberos: bool
            Whether to authenticate with kerberos for this connection
        token: str or None
            If given, use this token on every call to authenticate. A user
            and user-proxy may be encoded in the token and should not be also
            given
        user: str or None
            If given, assert the user name to connect with
        proxy_to: str or None
            If given, the user has the authority to proxy, and this value is
            the user in who's name actions are taken
        kerb_kwargs: dict
            Any extra arguments for HTTPKerberosAuth, see
            `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
        data_proxy: dict, callable or None
            If given, map data-node addresses. This can be necessary if the
            HDFS cluster is behind a proxy, running on Docker or otherwise has
            a mismatch between the host-names given by the name-node and the
            address by which to refer to them from the client. If a dict,
            maps host names ``host->data_proxy[host]``; if a callable, full
            URLs are passed, and function must conform to
            ``url->data_proxy(url)``.
        use_https: bool
            Whether to connect to the Name-node using HTTPS instead of HTTP
        kwargs
        Nz%{protocol}://{host}:{port}/webhdfs/v1httpshttp)protocolhostportz_If passing a delegation token, must not set user or proxy_to, as these are encoded in the tokenZ
delegationz	user.nameZdoaszJIf using Kerberos auth, do not specify the user, this is handled by kinit.Zwebhdfs_)_cachedsuper__init__formaturlkerbkerb_kwargsparsproxy
ValueError_connectr   _fsid)selfr   r   ZkerberostokenuserZproxy_tor   Z
data_proxyZ	use_httpskwargs	__class__ ]/var/www/html/Darija-Ai-API/env/lib/python3.8/site-packages/fsspec/implementations/webhdfs.pyr   +   s8    .
  




zWebHDFS.__init__c                 C   s   | j S N)r   r   r#   r#   r$   fsidx   s    zWebHDFS.fsidc                 C   s0   t  | _| jr,ddlm} |f | j| j_d S )Nr   )HTTPKerberosAuth)requestsSessionsessionr   Zrequests_kerberosr(   r   auth)r   r(   r#   r#   r$   r   |   s    
zWebHDFS._connectgetTc              	   K   s   | j t|pd }| }|| j | |d< td|| | jj	| ||||d}	|	j
dkrz$|	 }
|
d d }|
d d }W n ttfk
r   Y n@X |d	krt|n,|d
krt|n|dkrt|nt||	  |	S )N opzsending %s with %s)methodr   paramsdataallow_redirects)i  i  i  i  i  ZRemoteExceptionmessage	exception)ZIllegalArgumentExceptionZUnsupportedOperationException)ZSecurityExceptionZAccessControlException)ZFileNotFoundException)r   r   copyupdater   upperloggerdebugr+   requeststatus_codejsonr   KeyErrorPermissionErrorFileNotFoundErrorRuntimeErrorraise_for_status)r   r/   r0   pathr2   redirectr    r   argsouterrmsgexpr#   r#   r$   _call   s8    



zWebHDFS._callrbc              
   K   s$   |p| j }t| |||| j|||dS )a^  

        Parameters
        ----------
        path: str
            File location
        mode: str
            'rb', 'wb', etc.
        block_size: int
            Client buffer size for read-ahead or write buffer
        autocommit: bool
            If False, writes to temporary file that only gets put in final
            location upon commit
        replication: int
            Number of copies of file on the cluster, write mode only
        permissions: str or int
            posix permissions, write mode only
        kwargs

        Returns
        -------
        WebHDFile instance
        )mode
block_sizetempdir
autocommitreplicationpermissions)	blocksize	WebHDFilerN   )r   rC   rL   rM   rO   rP   rQ   r    r#   r#   r$   _open   s    !
zWebHDFS._openc                 C   s    | d   | d< | d | d< | S )Ntypelengthsize)lower)infor#   r#   r$   _process_info   s    zWebHDFS._process_infoc                 C   s   t |d S )NrC   )r   )clsrC   r#   r#   r$   _strip_protocol   s    zWebHDFS._strip_protocolc                 C   s:   t | }|dd  |dd  d|kr6|d|d< |S )NrC   r   usernamer   )r   pop)ZurlpathrF   r#   r#   r$   _get_kwargs_from_urls   s    zWebHDFS._get_kwargs_from_urlsc                 C   s,   | j d|d}| d }||d< | |S )NZGETFILESTATUSrC   
FileStatusname)rJ   r=   rZ   )r   rC   rF   rY   r#   r#   r$   rY      s    zWebHDFS.infoc                 C   sv   | j d|d}| d d }|D ](}| | |dd |d  |d< q"|r`t|dd	 d
S tdd |D S d S )NZ
LISTSTATUSr`   ZFileStatusesra   /Z
pathSuffixrb   c                 S   s   | d S )Nrb   r#   )ir#   r#   r$   <lambda>       zWebHDFS.ls.<locals>.<lambda>)keyc                 s   s   | ]}|d  V  qdS )rb   Nr#   ).0rY   r#   r#   r$   	<genexpr>   s     zWebHDFS.ls.<locals>.<genexpr>)rJ   r=   rZ   rstripsorted)r   rC   detailrF   infosrY   r#   r#   r$   ls   s    
z
WebHDFS.lsc                 C   s   | j d|d}| d S )z8Total numbers of files, directories and bytes under pathZGETCONTENTSUMMARYr`   ZContentSummaryrJ   r=   )r   rC   rF   r#   r#   r$   content_summary   s    zWebHDFS.content_summaryc                 C   sb   | j d|dd}d|jkrJ| |jd }| j|}|  | d S |  | d S dS )z/Checksum info of file, giving method and resultZGETFILECHECKSUMF)rC   rD   LocationZFileChecksumN)rJ   headers_apply_proxyr+   r-   rB   r=   )r   rC   rF   locationout2r#   r#   r$   ukey   s    
zWebHDFS.ukeyc                 C   s   |  d}| d S )zGet user's home directoryZGETHOMEDIRECTORYPathro   )r   rF   r#   r#   r$   home_directory  s    
zWebHDFS.home_directoryc                 C   sB   |r| j d|d}n
|  d}| d }|dkr:td|d S )zRetrieve token which can give the same authority to other uses

        Parameters
        ----------
        renewer: str or None
            User who may use this token; if None, will be current user
        ZGETDELEGATIONTOKEN)renewerTokenNz1No token available for this user/security contextZ	urlString)rJ   r=   r   )r   ry   rF   tr#   r#   r$   get_delegation_token
  s    
zWebHDFS.get_delegation_tokenc                 C   s   | j dd|d}| d S )z/Make token live longer. Returns new expiry timeZRENEWDELEGATIONTOKENputr0   r   longro   )r   r   rF   r#   r#   r$   renew_delegation_token  s    zWebHDFS.renew_delegation_tokenc                 C   s   | j dd|d dS )z Stop the token from being usefulZCANCELDELEGATIONTOKENr}   r~   NrJ   )r   r   r#   r#   r$   cancel_delegation_token   s    zWebHDFS.cancel_delegation_tokenc                 C   s   | j dd||d dS )a  Set the permission at path

        Parameters
        ----------
        path: str
            location to set (file or directory)
        mod: str or int
            posix epresentation or permission, give as oct string, e.g, '777'
            or 0o777
        ZSETPERMISSIONr}   )r0   rC   Z
permissionNr   )r   rC   modr#   r#   r$   chmod$  s    zWebHDFS.chmodc                 C   s>   i }|dk	r||d< |dk	r$||d< | j dd|d| dS )zChange owning user and/or groupNownergroupSETOWNERr}   r0   rC   )r   r   )r   rC   r   r   r    r#   r#   r$   chown1  s    zWebHDFS.chownc                 C   s   | j d|d|d dS )a9  
        Set file replication factor

        Parameters
        ----------
        path: str
            File location (not for directories)
        replication: int
            Number of copies of file on the cluster. Should be smaller than
            number of data nodes; normally 3 on most systems.
        ZSETREPLICATIONr}   )rC   r0   rP   Nr   )r   rC   rP   r#   r#   r$   set_replication:  s    zWebHDFS.set_replicationc                 K   s   | j dd|d d S )NZMKDIRSr}   r   r   r   rC   r    r#   r#   r$   mkdirH  s    zWebHDFS.mkdirc                 C   s(   |dkr|  |rt|| | d S )NF)existsFileExistsErrorr   )r   rC   exist_okr#   r#   r$   makedirsK  s    zWebHDFS.makedirsc                 K   s   | j dd||d d S )NZRENAMEr}   )r0   rC   destinationr   )r   Zpath1Zpath2r    r#   r#   r$   mvP  s    z
WebHDFS.mvc                 K   s   | j dd||rdndd d S )NDELETEdeletetruefalse)r0   rC   	recursiver   )r   rC   r   r    r#   r#   r$   rmS  s    
z
WebHDFS.rmc                 K   s   |  | d S r%   )r   r   r#   r#   r$   rm_file[  s    zWebHDFS.rm_filec                 K   s   |  |}d| |dtd g}z4|  |d}t|| W 5 Q R X | || W n4 tk
r   t	t
 | | W 5 Q R X  Y nX W 5 Q R X d S )Nrc   z.tmp.   wb)openjoin_parentsecretsZ	token_hexshutilcopyfileobjr   BaseExceptionr   r@   r   )r   ZlpathZrpathr    ZlstreamZ	tmp_fnameZrstreamr#   r#   r$   cp_file^  s     
zWebHDFS.cp_filec                 C   sH   | j rt| j r|  |}n(| j rD| j  D ]\}}|||d}q,|S )N   )r   callableitemsreplace)r   rt   kvr#   r#   r$   rs   l  s    zWebHDFS._apply_proxy)r   FNNNNNF)r-   NNT)rK   NTNN)F)N)NN)F)F)'__name__
__module____qualname____doc__strtempfile
gettempdirrN   r   r   propertyr'   r   rJ   rT   staticmethodrZ   classmethodr\   r_   rY   rn   rp   rv   rx   r|   r   r   r   r   r   r   r   r   r   r   r   rs   __classcell__r#   r#   r!   r$   r
      s\           M

#     
-





	

r
   c                       sJ   e Zd ZdZ fddZdddZdd Zd	d
 Zdd Zdd Z	  Z
S )rS   z"A file living in HDFS over webHDFSc                    s   t  j||f| | }|dd d kr6|dd  |dd d krR|dd  |dd| _|d}|dddkr| j| _tj	|t
t | _d S )NrQ   rP   i  rN   rO   F)r   r   r6   r-   r^   rQ   rC   targetosr   r   uuiduuid4)r   fsrC   r    rN   r!   r#   r$   r   y  s    
zWebHDFile.__init__Fc                 C   s,   | j jj| j| j ddid}|  dS )zWrite one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        content-typeapplication/octet-stream)r2   rr   T)r   r+   postrt   buffergetvaluerB   )r   finalrF   r#   r#   r$   _upload_chunk  s    	zWebHDFile._upload_chunkc                 C   s   | j  }d| jkrd\}}nd\}}d|d< | jj||| jfddi|}| j|jd }d	| jkr| jjj	|d
did}|
  | jjdd| jfddi|}| j|jd | _dS )zCreate remote file/uploada)APPENDPOST)ZCREATEPUTr   	overwriterD   Frq   wr   r   )rr   r   r   N)r    r6   rL   r   rJ   rC   rs   rr   r+   r}   rB   rt   )r   r    r/   r0   rF   rt   ru   r#   r#   r$   _initiate_upload  s    



 zWebHDFile._initiate_uploadc                 C   s   t |d}t| j|}||ks(|| jkr,dS | jjd| j||| dd}|  d|jkr|jd }| jj	| j
|}|jS |jS d S )Nr   rf   ZOPENF)rC   offsetrV   rD   rq   )maxminrW   r   rJ   rC   rB   rr   r+   r-   rs   content)r   startendrF   rt   ru   r#   r#   r$   _fetch_range  s"    
    

zWebHDFile._fetch_rangec                 C   s   | j | j| j d S r%   )r   r   rC   r   r&   r#   r#   r$   commit  s    zWebHDFile.commitc                 C   s   | j | j d S r%   )r   r   rC   r&   r#   r#   r$   discard  s    zWebHDFile.discard)F)r   r   r   r   r   r   r   r   r   r   r   r#   r#   r!   r$   rS   v  s   
rS   )loggingr   r   r   r   r   
contextlibr   urllib.parser   r)   specr   r   utilsr   r   	getLoggerr9   r
   rS   r#   r#   r#   r$   <module>   s   
  d