U
    ‰dk  ã                   @   s”   d dl mZmZ d dlmZ d dlZe e¡ZG dd„ dej	ƒZ
G dd„ dejƒZG dd	„ d	ejƒZG d
d„ deƒZddd„Zddd„Zdd„ ZdS )é    )ÚcoreÚdataio)Ú	TaskGroupNc                   @   s.   e Zd Zddd„Zdd„ Zdd„ Zdd	„ Zd
S )Ú_QueueReaderé   c                 C   s4   |j d k	stdƒ‚tj | |  ¡ ¡ || _|| _d S )Nz.Queue needs a schema in order to be read from.)ÚschemaÚAssertionErrorr   ÚReaderÚ__init__Ú_wrapperÚ_num_dequeue_records)ÚselfÚwrapperÚnum_dequeue_records© r   ú</tmp/pip-unpacked-wheel-ua33x9lu/caffe2/python/queue_util.pyr
      s    ÿz_QueueReader.__init__c                 C   s   |  | j ¡ gd¡ d S ©Nr   ©ÚCloseBlobsQueuer   Úqueue©r   Zinit_netZexit_netr   r   r   Úsetup_ex   s    z_QueueReader.setup_exc                 C   sT   | j  |¡ t d¡}t|| j  ¡ t|  ¡  ¡ ƒ|  ¡  ¡ | j	d\}}|g||fS )NÚdequeue)Úfield_namesÚnum_records)
r   Z_new_readerr   ÚNetr   r   Úlenr   r   r   )r   Úlocal_init_netÚlocal_finish_netZdequeue_netÚfieldsÚstatus_blobr   r   r   Úread_ex   s    

û
z_QueueReader.read_exc                 C   s   |   |d ¡\}}}||fS ©N)r!   )r   ÚnetÚ_r   r   r   r   Úread'   s    z_QueueReader.readN)r   )Ú__name__Ú
__module__Ú__qualname__r
   r   r!   r%   r   r   r   r   r      s   
r   c                   @   s$   e Zd Zdd„ Zdd„ Zdd„ ZdS )Ú_QueueWriterc                 C   s
   || _ d S r"   )r   )r   r   r   r   r   r
   -   s    z_QueueWriter.__init__c                 C   s   |  | j ¡ gd¡ d S r   r   r   r   r   r   r   0   s    z_QueueWriter.setup_exc                 C   s6   | j  |  ¡ |¡ t d¡}t|| j  ¡ ||ƒ |gS )NÚenqueue)r   Z_new_writerr   r   r   r*   r   )r   r   r   r   ÚstatusZenqueue_netr   r   r   Úwrite_ex3   s    
z_QueueWriter.write_exN)r&   r'   r(   r
   r   r,   r   r   r   r   r)   ,   s   r)   c                   @   s.   e Zd Zddd„Zdd„ Zdd„ Zd	d
„ ZdS )ÚQueueWrapperNr   c                 C   s"   t j | |tj¡ || _|| _d S r"   )r   ÚPiper
   r   ZLOCAL_SETUPÚ_queuer   )r   Úhandlerr   r   r   r   r   r
   ;   s    zQueueWrapper.__init__c                 C   s   t | | jdS )N©r   )r   r   ©r   r   r   r   Úreader@   s     ÿzQueueWrapper.readerc                 C   s   t | ƒS r"   )r)   r2   r   r   r   ÚwriterD   s    zQueueWrapper.writerc                 C   s   | j S r"   )r/   r2   r   r   r   r   G   s    zQueueWrapper.queue)Nr   )r&   r'   r(   r
   r3   r4   r   r   r   r   r   r-   :   s   
r-   c                   @   s   e Zd Zddd„Zdd„ ZdS )	ÚQueueNr   r   c                 C   s<   t  |¡}| | d¡¡}tj| |||d || _d| _d S )Nr0   r1   F)r   r   ZAddExternalInputÚNextNamer-   r
   ÚcapacityÚ_setup_done)r   r7   r   Únamer   r#   Z
queue_blobr   r   r   r
   L   s    
   ÿzQueue.__init__c                 C   sB   | j stdƒ‚d| _|jg | jg| jt| j  ¡ ƒ| j  ¡ d d S )Nz"This queue does not have a schema.T)r7   Ú	num_blobsr   )Z_schemar   r8   ZCreateBlobsQueuer/   r7   r   r   )r   Zglobal_init_netr   r   r   ÚsetupV   s    ûzQueue.setup)Nr   r   )r&   r'   r(   r
   r;   r   r   r   r   r5   K   s     ÿ

r5   c                 C   st   |d kr|   d¡}g }|D ]8}||kr2| |¡ qt d |¡¡ | |  |¡¡ q|  |g| ||g ¡}|d S )Nr+   zNeed to copy blob {} to enqueueéÿÿÿÿ)r6   ÚappendÚloggerÚwarningÚformatZCopyZSafeEnqueueBlobs)r#   r   Z
data_blobsr+   Zqueue_blobsZblobÚresultsr   r   r   r*   a   s    
r*   r   c           	         s„   |d k	r,t |ƒ|kst‚‡ fdd„|D ƒ}n‡ fdd„t|ƒD ƒ}|d krTˆ  d¡}ˆ j|||g |d}t|ƒ}| d¡}||fS )Nc                    s   g | ]}ˆ   |¡‘qS r   ©r6   )Ú.0r9   ©r#   r   r   Ú
<listcomp>u   s     zdequeue.<locals>.<listcomp>c                    s   g | ]}ˆ   d |¡‘qS )ÚdatarB   )rC   ÚirD   r   r   rE   w   s     r+   )r   r<   )r   r   Úranger6   ZSafeDequeueBlobsÚlistÚpop)	r#   r   r:   r+   r   r   Z
data_namesrA   r    r   rD   r   r   q   s    
  ÿ
r   c                 G   sN   t  d¡}|D ]}| |gd¡ qt  dt|ƒ |¡}t  dt|ƒ | |g¡S )NZclose_queue_netr   z%s_stepz%s_wraper_step)r   r   r   Zexecution_stepÚstr)ÚstepZqueuesZ	close_netr   Z
close_stepr   r   r   Úclose_queue   s    

þrM   )N)NNr   )Zcaffe2.pythonr   r   Zcaffe2.python.taskr   ÚloggingÚ	getLoggerr&   r>   r	   r   ÚWriterr)   r.   r-   r5   r*   r   rM   r   r   r   r   Ú<module>   s   

  ÿ
