U
    d[                     @   s   d Z ddlmZ ddlmZmZmZ ddlZddl	Z	G dd de
ZG dd de
ZG d	d
 d
e
ZG dd deZG dd de
ZG dd deZG dd deZG dd deZdd ZG dd deZG dd deZG dd deZG dd deZdS )a  
Defines the base interface for reading and writing operations.

Readers/Writers are objects that produce operations that read/write sequences
of data. Each operation reads or writes a list of BlobReferences.

Readers and Writers must be implemented such that read and write operations
are atomic and thread safe.

Examples of possible Readers and Writers:
    QueueReader, QueueWriter,
    DatasetReader, DatasetWriter,

See `dataset.py` for an example of implementation.
    )core)FieldStructfrom_blob_listNc                   @   sd   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd ZdddZdS )Readera  
    Reader is an abstract class to be implemented in order to provide
    operations capable of iterating through a dataset or stream of data.

    A Reader must implement at least one operation, `read`, which
    adds operations to a net that read the next batch of data. Readers can
    optionally support the `reset` operation, which is useful when multiple
    passes over the data are required.
    Nc                 C   s    |d k	rt |tst|| _d S N)
isinstancer   AssertionError_schemaselfschema r   8/tmp/pip-unpacked-wheel-ua33x9lu/caffe2/python/dataio.py__init__'   s    zReader.__init__c                 C   s   | j d k	std| j S )Nz$Schema not provided for this reader.)r
   r	   r   r   r   r   r   ,   s    zReader.schemac                 C   s
   || _ d S r   r
   r   r   r   r   _set_schema0   s    zReader._set_schemac                 C   s   dS )zSetup nets to run at task initialization and cleanup time.

        Args:
            global_init_net: A net invoked at task init time.
            global_finish_net: A net invoked at task cleanup time.
        Nr   r   init_net
finish_netr   r   r   setup_ex3   s    zReader.setup_exc                 C   s   t d}|gf| | S )Nreader_body)r   Netread)r   local_init_netlocal_finish_netread_netr   r   r   read_ex<   s    
zReader.read_exc                 C   s.   |  ||\}}}| jr$t| j|}|||fS r   )r   r
   r   )r   r   r   netsshould_stopfieldsr   r   r   read_record_ex@   s     
zReader.read_record_exc                 C   s   t ddS )a!  Append operations to read_net that will read a batch from the
        underlying data soruce.

        Operations added to `read_net` must be thread safe and atomic, that is,
        it should be possible to clone `read_net` and run multiple instances of
        it in parallel.

        Args:
            read_net: the net that will be appended with read operations

        Returns:
            A tuple (should_stop, fields), with:
                should_stop: BlobReference pointing to a boolean scalar
                    blob that indicates whether the read operation
                    was succesfull or whether the end of data has
                    been reached.
                fields: A tuple of BlobReference containing the latest batch
                    of data that was read.
        zReaders must implement `read`.NNotImplementedError)r   r   r   r   r   r   G   s    zReader.readc                 C   s   t ddS )zAppend operations to `net` that will reset the reader.

        This can be used to read the data multiple times.
        Not all readers support this operation.
        zThis reader cannot be resetted.Nr#   )r   netr   r   r   reset]   s    zReader.resetc                 C   s(   |  |\}}| jr t| j|}||fS r   )r   r
   r   )r   r   r    r!   r   r   r   read_recorde   s    zReader.read_recordc                 C   sP   t |p
d}| |\}}|dk	r2|||g}t jd|||d}||fS )aQ  Create an execution step with a net containing read operators.

        The execution step will contain a `stop_blob` that knows how to stop
        the execution loop when end of data was reached.

        E.g.:

            read_step, fields = reader.execution_step()
            consume_net = core.Net('consume')
            consume_net.Print(fields[0], [])
            p = core.Plan('reader')
            p.AddStep(read_step.AddNet(consume_net))
            core.RunPlan(p)

        Args:
            reader_net_name: (optional) the name of the reader_net to be
                             created. The execution step will
                             be named accordingly.

        Returns:
            A tuple (read_step, fields), with:
                read_step: A newly created execution step containing a net with
                           read operations. The step will have `stop_blob` set,
                           in order to stop the loop on end of data.
                fields: A tuple of BlobReference containing the latest batch
                        of data that was read.
        readerNz{}_step)Zshould_stop_blob)r   r   r'   Orexecution_stepformat)r   Zreader_net_nameZexternal_should_stopZ
reader_netr    r!   Z	read_stepr   r   r   r*   k   s    zReader.execution_step)N)NN)__name__
__module____qualname____doc__r   r   r   r   r   r"   r   r&   r'   r*   r   r   r   r   r      s   	
	r   c                   @   sN   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dddZ
dd ZdS )Writeraq  
    Writer is an abstract class to be implemented in order to provide
    operations capable of feeding a data stream or a dataset.

    A Writer must implement 2 operations:
    `write`, which adds operations to a net that write the write batch of
    data, and `commit`, which adds operations to a net in order to indicate
    that no more data will be written.
    Nc                 C   s   | j S r   r   r   r   r   r   r      s    zWriter.schemac                 C   s   t ddS )aZ  Add operations to `writer_net` that write the next batch of data.

        Operations added to the net must be thread-safe and unique, that is:
        multiple writers must be able to write to the dataset in parallel.

        Args:
            fields: a tuple of BlobReference containing the batch of data to
                    write.
        zWriters must implement write.Nr#   r   Z
writer_netr!   r   r   r   write   s    
zWriter.writec                 C   s(   t |tr|| _| }| || d S r   )r   r   r
   field_blobsr2   r1   r   r   r   write_record   s    
zWriter.write_recordc                 C   s   |  | dS )zExperimental, don't use yetN)commitr   r   r   r   r      s    zWriter.setup_exc                 C   s   t d}| || |gS )z6Experimental extension to the interface. Don't use yet	write_net)r   r   r2   )r   r!   r   r   	stop_blobr6   r   r   r   write_ex   s    
zWriter.write_exc                 C   sB   t |tr|| _| }|dkr*|d}| ||||}||fS )z7Experimental extension to the interface. Don't use yet.NZdequeue_status)r   r   r
   r3   NextNamer8   )r   r!   r   r   r7   Z
write_netsr   r   r   write_record_ex   s    

   zWriter.write_record_exc                 C   s   dS )zAdd operations to `finish_net` that signal end of data.

        This must be implemented by all Writers, but may be no-op for some
        of them.
        Nr   )r   r   r   r   r   r5      s    zWriter.commit)N)r,   r-   r.   r/   r
   r   r2   r4   r   r8   r:   r5   r   r   r   r   r0      s   	 
r0   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	ReaderBuilderz1 Allow usage of a reader in distributed fashion. c                 C   s
   t  d S r   r#   r   r   r   r   r      s    zReaderBuilder.schemac                 K   s
   t  dS )z
        Optionally, perform one-time setup before calling new_reader().
        Subclass should make sure this function is only called once.
        Nr#   r   kwargsr   r   r   setup   s    zReaderBuilder.setupc                 K   s
   t  d S r   r#   r<   r   r   r   
new_reader   s    zReaderBuilder.new_readerN)r,   r-   r.   r/   r   r>   r?   r   r   r   r   r;      s   r;   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )PipedReaderBuildera  ReaderBuilder that modifies underlying builder by calling `piper`
    function on each new reader produced, and return the result of
    the function. This way, it is possible to append data processing
    pipelines that will be replicated for each reader that gets created.

    E.g.:

    PipedReaderBuilder(
        ReaderBuilder(...),
        lambda reader: pipe(reader, processor=my_proc))
    c                 C   s   || _ || _d S r   )_builder_piper)r   ZbuilderZpiperr   r   r   r      s    zPipedReaderBuilder.__init__c                 C   s
   | j  S r   )rA   r   r   r   r   r   r      s    zPipedReaderBuilder.schemac                 K   s   | j jf |S r   )rA   r>   r<   r   r   r   r>      s    zPipedReaderBuilder.setupc                 K   s4   | j f d| jjf |i|}t|tr,|S | S )Nr(   )rB   rA   r?   r   r   r(   )r   r=   outputr   r   r   r?      s    zPipedReaderBuilder.new_readerN)r,   r-   r.   r/   r   r   r>   r?   r   r   r   r   r@      s
   r@   c                   @   sV   e Zd ZdddZdd Zdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd ZdS )PipeNc                 C   s   d| _ d| _|| _|| _d S )Nr   )_num_writers_num_readersr
   _obj_key)r   r   Zobj_keyr   r   r   r     s    zPipe.__init__c                 C   s   | j S r   r   r   r   r   r   r   
  s    zPipe.schemac                 C   s   d S r   r   )r   global_init_netr   r   r   r>     s    z
Pipe.setupc                 C   s
   t  d S r   r#   r   r   r   r   r(     s    zPipe.readerc                 C   s
   t  d S r   r#   r   r   r   r   writer  s    zPipe.writerc                 C   s   | j S r   )rF   r   r   r   r   num_readers  s    zPipe.num_readersc                 C   s   | j S r   )rE   r   r   r   r   num_writers  s    zPipe.num_writersc                 C   sB   |d k	r| j d kr|| _ |  jd7  _| jd k	r>|| j|  d S N   )r
   rE   rG   add_attribute)r   Zwriter_schemaZwriter_init_netr   r   r   _new_writer  s
    
zPipe._new_writerc                 C   s*   |  j d7  _ | jd k	r&|| j|  d S rL   )rF   rG   rN   )r   Zreader_init_netr   r   r   _new_reader#  s    
zPipe._new_reader)NN)r,   r-   r.   r   r   r>   r(   rI   rJ   rK   rO   rP   r   r   r   r   rD     s   
rD   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	CounterReaderz+ Reader that produces increasing integers. c                 C   s(   t j| tdtjfd d | _d | _d S )Niterr   )r   r   r   npZint64counterr    r   r   r   r   r   +  s    zCounterReader.__init__c                 C   s6   | j d kr2|jg dd| _ |jg g tjjdd| _d S )Nr   Z
init_countF)shapedtypevalue)rU   CreateCounterConstantFillr   DataTypeBOOLr    r   rH   global_finish_netr   r   r   r   0  s    
   zCounterReader.setup_exc                 C   s*   t d}|| jgd}|g| j|gfS )NZlimited_reader_counterrM   )r   r   ZCountUprU   r    )r   r   r   Z	count_netrY   r   r   r   r   6  s    
zCounterReader.read_exNr,   r-   r.   r/   r   r   r   r   r   r   r   rQ   )  s   rQ   c                   @   s@   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dS )ReaderWithLimitBaseap  Abstract Reader constrained by certain conditions.

    Base class for Reader classes which check for certain conditions to stop
    further processing (e.g. max number of iterations or time limit).
    Also produces a boolean blob (data_finished) that can be used to see if
    the reader exausted all input data (true) or stopped for another reason
    (false).
    c                 C   sB   t j| |jd || _td| _| j| jd| _	d | _
d S )NrS   Zreader_with_limitdata_finished)r   r   r
   r(   r   r   r%   AddExternalInputr9   _data_finishedr    )r   r(   r   r   r   r   F  s    
zReaderWithLimitBase.__init__c                 C   s:   |j g | jgg dtjjd | j|| | || d S )NFrW   rY   rX   )r[   rd   r   r\   r]   r(   r   setup_limiterr^   r   r   r   r   N  s       zReaderWithLimitBase.setup_exc           	      C   sx   t d}| |}| j||\}}}| | jj t d}||| || j	|g| j	g |g| |g ||fS )a  Reads from an underlying Reader class, but may stop due to additional
        constraints.

        Build and return network(s) to read data from a Reader with
        additional constraints, depending on which derived class is used.
        Derived classes implement setup_limited and check_limiter_condition
        which determine the nature of the constraint imposed on the reader,
        e.g. iteration limits or time limit.

        Args:
            local_init_net: A net invoked at task instance init time (Once per
                parallel thread).
            local_finish_net: A net invoked at task instance cleanup time (Once
                per parallel thread).
        Zlimited_reader_conditionZlimited_reader_post)
r   r   check_limiter_conditionr(   r   r   r
   ZCopyr)   rd   )	r   r   r   stop_condition_netr    r   Zlocal_data_finishedr!   Zcheck_done_netr   r   r   r   U  s    

 

zReaderWithLimitBase.read_exc                 C   s   t ddS )a  Configure task level init/cleanup nets required to implement limit
        condition. Must be implemented by subclass.

        Args:
            global_init_net: A net invoked at task init time.
            global_finish_net: A net invoked at task cleanup time.
        z'Subclass must implement `setup_limiter`Nr#   r^   r   r   r   rf   |  s    z!ReaderWithLimitBase.setup_limiterc                 C   s   t ddS )a  Configure a net that is invoked between reading batches to see if
        limit condition is met. Must be implemented by subclass.

        Args:
            stop_condition_net: A net invoked to evaluate an early termination
                condition.
        z0Subclass must implement `check_limiter_conditionNr#   r   rh   r   r   r   rg     s    z+ReaderWithLimitBase.check_limiter_conditionc                 C   s   | j S )a  
        Return a blob that can be checked after the end of the reading task,
        which will contain a scalar float indicating whether the underlying
        reader has been exhausted (True) or whether we stopped because reached
        the limit of iterations (False).
        )rd   r   r   r   r   rb     s    z!ReaderWithLimitBase.data_finishedN)
r,   r-   r.   r/   r   r   r   rf   rg   rb   r   r   r   r   ra   <  s   	'

ra   c                       s2   e Zd ZdZd	 fdd	Zdd Zdd Z  ZS )
ReaderWithLimitzReader that stops after `num_iter` batches.

    If `num_iter` <= 0 or is None, reverts to an unconstrained reader that
    exports a boolean blob indicating that the reader has exhausted
    the data steam.
    rM   c                    s@   t t| | d| _|| _| jdk	r<| j| jd| _dS )ay  Class initializer.

        Args:
            reader: The underlying reader object doing the actual read.
            num_iter: Number of batches to read. If `None`,
                the class reverts to a normal reader except that it also
                produces a data_finished blob as a side effect to indicate
                whether the input stream is exhausted.
        NrU   )superrj   r   rU   num_iterr%   rc   r9   )r   r(   rl   	__class__r   r   r     s    


zReaderWithLimit.__init__c                 C   s$   | j r |jg | j gt| jd d S )NrV   )rU   rZ   intrl   r^   r   r   r   rf     s      zReaderWithLimit.setup_limiterc                 C   s2   | j r|| j gdS |jg dg dtjjdS d S NrM   Fre   )rU   Z	CountDownr[   r   r\   r]   ri   r   r   r   rg     s       z'ReaderWithLimit.check_limiter_condition)rM   r,   r-   r.   r/   r   rf   rg   __classcell__r   r   rm   r   rj     s   rj   c                 C   s   t t | S r   )rj   rQ   )rl   r   r   r   
CountUntil  s    rs   c                       s2   e Zd ZdZd	 fdd	Zdd Zdd Z  ZS )
ReaderWithTimeLimitzReader that stops after `duration` seconds.

    If `duration` <= 0 or is None, reverts to an unconstrained reader that
    exports a boolean blob indicating that the reader has exhausted
    the data steam.
    r   c                    s&   t t| | d| _|| _d| _dS )a  Class initializer.

        Args:
            reader: The underlying reader object doing the actual read.
            duration: Number of seconds to read. If un-specified, None, or <= 0,
                the class reverts to a normal reader except that it also
                produces a data_finished blob as a side effect to indicate
                whether the input stream is exhausted.
        N)rk   rt   r   timerdurationduration_ns_blob)r   r(   rv   rm   r   r   r     s    
zReaderWithTimeLimit.__init__c                 C   sd   | j d k	r`| j dkr`t| j d }|jg dd| _|| j}|j|g|d| _|| jgg  d S )Nr   i ʚ;Zepoch_timer)Zcounter_name)rY   )rv   ro   Z
TimerBeginru   TimerGetr[   rw   ZTimerEnd)r   rH   r_   Zduration_ns
start_timer   r   r   rf     s      z!ReaderWithTimeLimit.setup_limiterc                 C   sF   | j r*|| j}||| jgt| jS |jg dg dtj	j
dS d S rp   )rv   rx   ru   ZGErw   strr    r[   r   r\   r]   )r   rh   Ztime_elapsedr   r   r   rg     s         z+ReaderWithTimeLimit.check_limiter_condition)r   rq   r   r   rm   r   rt     s   rt   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	ReaderWithDelayz?Test reader class that inserts a delay between reading batches.c                 C   s    t j| |jd || _|| _d S )NrS   )r   r   r
   r(   delay)r   r(   r|   r   r   r   r     s    zReaderWithDelay.__init__c                 C   s   | j || d S r   )r(   r   r^   r   r   r   r     s    zReaderWithDelay.setup_exc                    s:   t d} fdd}||g g  |gf j| S )Nr   c                     s   t  j d S r   )timesleepr|   )argsZargdr   r   r   sleep_op   s    z)ReaderWithDelay.read_ex.<locals>.sleep_op)r   r   Pythonr(   r   )r   r   r   r   r   r   r   r   r     s    
zReaderWithDelay.read_exNr`   r   r   r   r   r{     s   r{   c                       s8   e Zd ZdZ fddZdd Zdd Zdd	 Z  ZS )
CompositeReaderzu
    Base class for a reader that wrap multiple readers, e.g., reading from
    multiple sources simultaneously.
    c                    sJ   t |t |ksttt| jtdd t||D  d || _|| _dS )z
        Args:
            names: list[str] names of readers; used as schema keys
            readers: list[Reader] Reader instances, must have schema
        c                 S   s   g | ]\}}||  fqS r   rS   ).0namer(   r   r   r   
<listcomp>  s    z,CompositeReader.__init__.<locals>.<listcomp>rS   N)	lenr	   rk   r   r   r   zip_names_readers)r   namesreadersrm   r   r   r     s    zCompositeReader.__init__c                 C   s   | j D ]}||| qd S r   )r   r   )r   r   r   r(   r   r   r   r     s    
zCompositeReader.setup_exc                 C   s   g }g }g }t | j| jD ]<\}}|||\}}	}
||	 || ||
  qg }|d }t | j||D ]H\}}}|| ||krqrtd	|}|
||g| || qr|||fS )z7
        Stops when one of the reader finished
        z{}_stop)r   r   r   r"   appendextendr3   r   r   r+   r)   )r   r   r   r!   Z
stop_blobsZall_sub_read_netsr   r(   Zsub_read_netsr    recordZ	read_netsZlocal_should_stopr7   Zstop_netr   r   r   r     s*     



zCompositeReader.read_exc                 C   s   | j D ]}|| qd S r   )r   r&   )r   r%   r(   r   r   r   r&   ;  s    
zCompositeReader.reset)	r,   r-   r.   r/   r   r   r   r&   rr   r   r   rm   r   r     s
   r   c                       s8   e Zd ZdZ fddZdd Zdd Zdd	 Z  ZS )
CompositeReaderBuilderz.
    A reader builder for CompositeReader
    c                    s8   t t|   || _|| _tdd t||D  | _dS )z
        Args:
            names: list[str] names of readers; used as schema keys
            reader_builders: list[ReaderBuilder] ReaderBuilder instances;
                must have schema
        c                 S   s   g | ]\}}||  fqS r   rS   )r   r   reader_builderr   r   r   r   N  s   z3CompositeReaderBuilder.__init__.<locals>.<listcomp>N)rk   r   r   r   _reader_buildersr   r   r
   )r   r   Zreader_buildersrm   r   r   r   D  s    zCompositeReaderBuilder.__init__c                 C   s   | j S r   r   r   r   r   r   r   S  s    zCompositeReaderBuilder.schemac           	      K   s   i }d|kr| d}nd }t| jD ]\}}|t| jd krP|d k	rP||d< |jf |}t| t| @ }t| t| @ }|t kstd	||t kstd	||
| q&|S )NlimiterrM   zOverlapping keys: {}zOverlapping values: {})pop	enumerater   r   r>   setkeysvaluesr	   r+   update)	r   r=   Zdata_finished_blobsr   ir   Zsub_reader_data_finished_blobsZoverlapping_keysZoverlapping_valuesr   r   r   r>   V  s    zCompositeReaderBuilder.setupc                 K   sp   g }| j D ]B}|jf |}t|tr&nt|dr:| }ntd|| q
t| j	|}|
 | jkslt|S )Nr(   z,reader must be an instance of Reader or Pipe)r   r?   r   r   hasattrr(   
ValueErrorr   r   r   r   r
   r	   )r   r=   r   r   r(   Zmulti_readerr   r   r   r?   m  s    



z!CompositeReaderBuilder.new_reader)	r,   r-   r.   r/   r   r   r>   r?   rr   r   r   rm   r   r   @  s
   r   )r/   Zcaffe2.pythonr   Zcaffe2.python.schemar   r   r   ZnumpyrT   r}   objectr   r0   r;   r@   rD   rQ   ra   rj   rs   rt   r{   r   r   r   r   r   r   <module>   s"   u@!&^&.9