U
    Y+dv                     @   sJ  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZmZ ddlmZmZ ddlmZ d	d
lmZ dZdeeeZejdkrddlZddlZddlZejZdZ ej!Z"e# Z$dd Z%dd Z&nBejdkrddl'Z'ddl'mZm"Z"m Z  dd Z%dd Z&ne(dG dd dej)Z)G dd dej*Z*dS )a=	  File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
    N)Empty)	monotonic)ChannelError)bytes_to_strstr_to_bytes)dumpsloads)cached_property   )virtual)r
   r   r   .ntc                 C   s$   t |  }t ||ddt dS )Create file lock.r         N)	win32file_get_osfhandlefilenoZ
LockFileEx__overlapped)fileflagshfile r   >/tmp/pip-unpacked-wheel-rdvd7wfc/kombu/transport/filesystem.pylocky   s    r   c                 C   s"   t |  }t |ddt dS )Remove file lock.r   r   N)r   r   r   ZUnlockFileExr   )r   r   r   r   r   unlock~   s    r   posix)LOCK_EXLOCK_NBLOCK_SHc                 C   s   t |  | dS )r   N)fcntlflockr   )r   r   r   r   r   r      s    c                 C   s   t |  t j dS )r   N)r    r!   r   ZLOCK_UN)r   r   r   r   r      s    z9Filesystem plugin only defined for NT and POSIX platformsc                   @   sl   e Zd ZdZdd Zdd Zdd Zdd	 Zed
d Z	e
dd Ze
dd Ze
dd Ze
dd ZdS )ChannelzFilesystem Channel.c                 K   s   d ttt d t |}tj| j	|}zTz*t|d}t|t |tt| W n$ tk
r   td|dY nX W 5 t
| |  X dS )zPut `message` onto `queue`.z{}_{}.{}.msgi  wbzCannot add file z to directoryN)formatintroundr   uuidZuuid4ospathjoindata_folder_outr   closeopenr   r   writer   r   OSErrorr   )selfqueuepayloadkwargsfilenamefr   r   r   _put   s     


zChannel._putc                 C   s  d| d }t | j}t|}t|dkr|d}||dk rFq | jrT| j}nt	
 }ztt j| j|| W n tk
r   Y nX t j||}z.t|d}| }|  | jst | W n$ tk
r   td|dY nX tt|S t dS )zGet next message from `queue`.r   .msgr   rbzCannot read file z from queue.N)r(   listdirdata_folder_insortedlenpopfindstore_processedprocessed_foldertempfile
gettempdirshutilmover)   r*   r/   r-   readr,   remover   r   r   r   )r0   r1   
queue_findfolderr4   r@   r5   r2   r   r   r   _get   s:    



zChannel._getc                 C   s   d}d| d }t | j}t|dkr| }z8||dk rDW qt j| j|}t | |d7 }W q t	k
r|   Y qX q|S )z!Remove all messages from `queue`.r   r   r7   r
   )
r(   r9   r:   r<   r=   r>   r)   r*   rF   r/   r0   r1   countrG   rH   r4   r   r   r   _purge   s    
zChannel._purgec                 C   sN   d}d| d}t | j}t|dkrJ| }||dk r@q|d7 }q|S )z<Return the number of messages in `queue` as an :class:`int`.r   r   r7   r
   )r(   r9   r:   r<   r=   r>   rJ   r   r   r   _size   s    
zChannel._sizec                 C   s
   | j jjS N)
connectionclienttransport_optionsr0   r   r   r   rQ      s    zChannel.transport_optionsc                 C   s   | j ddS )Nr:   Zdata_inrQ   getrR   r   r   r   r:      s    zChannel.data_folder_inc                 C   s   | j ddS )Nr+   Zdata_outrS   rR   r   r   r   r+     s    zChannel.data_folder_outc                 C   s   | j ddS )Nr?   FrS   rR   r   r   r   r?     s    zChannel.store_processedc                 C   s   | j ddS )Nr@   	processedrS   rR   r   r   r   r@   	  s    zChannel.processed_folderN)__name__
__module____qualname____doc__r6   rI   rL   rM   propertyrQ   r	   r:   r+   r?   r@   r   r   r   r   r"      s   '



r"   c                       s@   e Zd ZdZeZe ZdZdZ	dZ
 fddZdd Z  ZS )	TransportzFilesystem Transport.r   
filesystemc                    s   t  j|f| | j| _d S rN   )super__init__global_statestate)r0   rP   r3   	__class__r   r   r^     s    zTransport.__init__c                 C   s   dS )NzN/Ar   rR   r   r   r   driver_version  s    zTransport.driver_version)rV   rW   rX   rY   r"   r   ZBrokerStater_   default_portZdriver_typeZdriver_namer^   rc   __classcell__r   r   ra   r   r[     s   r[   )+rY   r(   rC   rA   r'   r1   r   timer   Zkombu.exceptionsr   Zkombu.utils.encodingr   r   Zkombu.utils.jsonr   r   Zkombu.utils.objectsr	    r   VERSIONr*   mapstr__version__nameZ
pywintypesZwin32conr   ZLOCKFILE_EXCLUSIVE_LOCKr   r   ZLOCKFILE_FAIL_IMMEDIATELYr   Z
OVERLAPPEDr   r   r   r    RuntimeErrorr"   r[   r   r   r   r   <module>   sB   Z


z