U
    dC                     @   s   d dl mZmZ d dlmZmZ d dlmZmZ d dl	m
Z
mZ d dlmZmZmZ G dd deZdZd	d
 Zd!ddZdd Zd"ddZd#ddZdd Zdd Zdd Zd$ddZG dd deZG dd  d eZdS )%    )core
queue_util)ReaderWriter)
NetBuilderops)	as_recordField)NodeTask	TaskGroupc                   @   s   e Zd ZdZdddZdS )Outputz
    Represents the result of a processor function. A processor can either
    return an Output, or it can return a record, in which case an Output will be
    created for it afterwards.
    Nc                 C   s|   t   }|d ks(t|dks(td|d kr4|}t|tjrF|g}|d krRg nt|| _	|d krhd nt
|| _|| _d S )Nr   z7Cannot both use `ops` syntax and return a list of nets.)r   currentgetlenAssertionError
isinstancer   Netlistnetsr   recordshould_stop)selfr   r   r   Zbuilder_children r   :/tmp/pip-unpacked-wheel-ua33x9lu/caffe2/python/pipeline.py__init__   s    zOutput.__init__)NNN)__name__
__module____qualname____doc__r   r   r   r   r   r      s   r   d   c                 C   s   | d kr*t j|d k	r|ntd}| }nTt| trN|d ksDtdd }| }n0t| drv|d kshtd| }|  }ntd|	|| ||fS )N)capacityzcapacity would not be used.writerz)output must be a reader, queue or stream.)
r   QueueDEFAULT_QUEUE_CAPACITYr"   r   r   r   hasattr
ValueErrorsetup_ex)outputr!   global_init_netglobal_exit_net	out_queuer"   r   r   r   _init_output%   s"    



r,   Nc                    sR    d krdd S t  tjr$t S d k	rJt drJ fdd}| _ S d S )Nc                 S   s   | S Nr   )recr   r   r   <lambda><       z make_processor.<locals>.<lambda>schema_funcc                      s
     S r-   )r1   r   	processorreaderr   r   processor_schemaA   s    z(make_processor.<locals>.processor_schema)r   r   r   NetProcessorr%   schema)r3   r4   r5   r   r2   r   make_processor:   s    r8   c                 C   s|   t | tr| S t | tr"t| dS t | trpt| dkoTt | d toTt | d tj}|rftd|  S t|  S nt| S dS )z
    Allow for processors to return results in several formats.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    )r      r      N)N)r   r   r	   tupler   r   BlobReference)r(   Zis_record_and_blobr   r   r   normalize_processor_outputH   s    




r=   r:   c           
   	   C   s   t | |||||||\}}	|S )a]
  
    Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
    Queue or DataStream in `output`, creates a Task that, when run, will
    pipe the input into the output, using multiple parallel threads.
    Additionally, if a processor is given, it will be called between reading
    and writing steps, allowing it to transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     written to as long as neither reader nor writer signal
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
                     ** DEPRECATED **. Use `num_runtime_threads` instead.
                     This option will be removed once all readers/processors
                     support `num_runtime_threads`.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.
        num_runtime_threads: Similar to `num_threads`, but instead of expanding
                     the tasks with a `for` loop in python, does that at
                     runtime. This is preferable to `num_threads`, but some
                     processors/readers still require to be called multiple
                     times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the parameters
        passed.
    )
_pipe_step)
inputr(   num_threadsr3   namer!   groupnum_runtime_threadsresult_r   r   r   pipec   s    /      rF   c	              
   C   s\   |dkst t| ||||||||	\}	}
d}|dk	rT|
 }t|ttfkrT|d }|	|fS )aq  
    Similar to `pipe`, with the additional ability for the pipe Task to
    return output values to the `Session` once done.

    Returns:
        Tuple (out_queue, *task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns.
    r   N)r   r>   outputstyper   r;   )r?   r(   r@   r3   rA   r!   rB   rC   final_outputsrD   taskr   r   r   pipe_and_output   s$           rK   c                 C   sT   t | dr| jS t | drL| jdkr*| jS t | drFd| jj| jf S | jS | jjS )NrA   	func_namez<lambda>im_classz%s.%s)r%   rA   rL   r   rM   r   	__class__)r3   r   r   r   processor_name   s    



rO   c              
   C   s  t t }d|d| tr"ttnd|r0t|nd}t| |||d|}	td}
td}|	||
 td}td	}|
||\}}}|jg |gg d
tjjd |d k	rt||||
\}}|||||\}}nd }g }t  t| W 5 Q R X t  t| W 5 Q R X td}|jg |d}td}||g  ttjd|gt| t| |g |d t| t  t| W 5 Q R X t  t|
 W 5 Q R X W 5 Q R X ||	fS )N{0}/{1}/{2}/{3}/{4}rF   NoInputNoOutput)rA   rB   rG   Znum_instancesz	pipe:exitz	pipe:initzpipe:instance:initzpipe:instance:exitFshapevalueZdtypetimer_startZcounter_name	timer_endbodyZshould_stop_blob)strr
   r   formatr?   rO   r   r   r   r'   read_record_exConstantFillDataTypeBOOLr,   write_record_exr   Z	task_initnetZtask_instance_init
TimerBeginTimerEndexecution_stepr   Ztask_instance_exitZ	task_exit)rA   rB   rI   r4   r@   r(   r!   	node_nameprofiler_namerJ   r*   r)   init_netexit_net	read_netsstatusr.   r+   r"   
write_netsrE   timer_start_nettimertimer_end_netr   r   r   _runtime_threads_task   sz    



       







rp   c                 C   s  t t }d|d| tr"ttnd|r0t|nd}t| ||d}	td}
td}|	||
 d }d }g }t
|D ]B}td| d	}td}td}|||\}}}|jg |gg d
tjjd |d k	r(|d krt|	jd t||||
\}}W 5 Q R X |||||\}}ng }td}|jg |d}td}||g  t| ttjd|gt| t| |g |d t| t| W 5 Q R X |t| q|t| ttjd|dd t|
 W 5 Q R X ||	fS )NrP   rF   rQ   rR   )rA   rB   rG   exitinitzt:%d)rA   FrS   )Z	_fullnamerV   rW   rX   rY   rZ   T)Zconcurrent_substeps)r[   r
   r   r\   r?   rO   r   r   r   r'   ranger   r]   r^   r_   r`   rA   r,   ra   rc   rd   r   rb   re   r   appendZto_execution_step)rA   rB   rI   r4   r@   r(   r!   rf   rg   rJ   r*   r)   r+   r"   ZstepsZ	thread_idnbrh   ri   rj   rk   r.   rl   rE   rm   rn   ro   r   r   r   _static_threads_task   s    



 
 

     





rv   c	           
      C   s   |dks|dkst dt| tr(| }	n&t| dr<|  }	ntdt| |dk	r`t|	|}	|dksp|dkr|dks|t |	dfS |dkr|dk	rt	|}|dkr|dk	rdt	| }|dkrdt	|  }|dkrt
||||	|||S t||||	|||S dS )	z
    r:   z;Only one of num_threads or num_runtime_threads must be set.r4   z/Input must be a reader, queue or stream. Got {}Nr   zpipe_into:%szpipe_from:%s)r   r   r   r%   r4   r&   r\   rH   ProcessingReaderrO   rv   rp   )
r?   r(   r@   r3   rA   r!   rB   rC   rI   r4   r   r   r   r>   :  sP    



           r>   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )rw   zo
    Reader that reads from an upstream reader, calls the processor, and returns
    the processed record.
    c                 C   s    t |  || _t||| _d S r-   )r   r   r4   r8   r3   )r   r4   r3   r   r   r   r   f  s    
zProcessingReader.__init__c                 C   s
   | j  S r-   )r3   r7   r   r   r   r   r7   k  s    zProcessingReader.schemac                 C   s   | j || d S r-   )r4   r'   )r   rh   Z
finish_netr   r   r   r'   n  s    zProcessingReader.setup_exc           
   	   C   s   | j ||\}}}t }t| |}W 5 Q R X ||j7 }|jsJ|jrt	d}|jrn|
||jg|g |jr|
||jg|g || t| jdr|tj| j | |j |jr|j nd }	|||	fS )Nstop_netsetup)r4   r]   r   r=   r3   r   r   Z
_stop_blobr   r   Orrt   r%   Zadd_attributer   ZLOCAL_SETUPZ_set_schemar   Zfield_blobs)
r   rh   ri   rj   rk   r.   ru   rD   ry   fieldsr   r   r   read_exq  s     


zProcessingReader.read_exN)r   r   r   r   r   r7   r'   r}   r   r   r   r   rw   a  s
   rw   c                   @   s:   e Zd ZdZdddZdd Zdd Zd	d
 Zdd ZdS )r6   z
    Processor that clones a core.Net each time it's called, executing
    the cloned net as the processor. It requires the Net to have input
    and (optionally) output records set, with net.set_input_record() and
    net.set_output_record().
    Nc                 C   sb   t |tjst|d ks(t |tjs(t|p2t|| _|p<g | _|| _|| _	g | _
d| _g | _d S )NF)r   r   r   r   r<   r[   rA   thread_init_netsrb   _stop_signal
_blob_maps_frozen_cloned_init_nets)r   rb   stop_signalr~   rA   r   r   r   r     s    
 
zNetProcessor.__init__c                 C   s
   | j  S r-   )rb   output_recordrx   r   r   r   r7     s    zNetProcessor.schemac                 C   s   d| _ | j}g | _|S NT)r   r   )r   rh   Zcloned_init_netsr   r   r   rz     s    zNetProcessor.setupc           	      C   s   | j r
tt jd }i }| jD ],}t|t|| ||\}}| j	
| q"t| jt| j| |||\}}| jd krd }n.t| j|krtj|t| j |d}n| j}| j
| t|g| |S )N/)rb   )r   r   r   r   rA   r~   r   Zclone_and_bind_netr[   r   rt   rb   r   r<   r   r   r   )	r   r.   prefixZ
blob_remaprb   Znew_netrE   Z
remappingsr   r   r   r   __call__  s8    

 
      
zNetProcessor.__call__c                 C   s   d| _ | jS r   )r   r   rx   r   r   r   	blob_maps  s    zNetProcessor.blob_maps)NNN)	r   r   r   r   r   r7   rz   r   r   r   r   r   r   r6     s   
r6   )N)Nr:   NNNNr:   )Nr:   NNNNr:   N)Nr:   NNNNNN)Zcaffe2.pythonr   r   Zcaffe2.python.dataior   r   Zcaffe2.python.net_builderr   r   Zcaffe2.python.schemar   r	   Zcaffe2.python.taskr
   r   r   objectr   r$   r,   r8   r=   rF   rK   rO   rp   rv   r>   rw   r6   r   r   r   r   <module>   sP   
            
6              
<B              
',