U
    d]4                     @   s  d dl mZmZ d dlmZmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d d
lm Z  d dl!Z"d dl#Z#d dl$Z$d dl%Z%dd Z&ddddgZ'dd Z(G dd deZ)G dd deZ*dS )    )StructConstRecord)core	workspacemodel_helper)LocalSession)Dataset)pipe)CheckpointManagerMultiNodeCheckpointManagerJob	JobRunnerepoch_limiterUploadTaskGroupBuilderdb_name)ops)NodeTask	TaskGroupWorkspaceTypeCluster)TestCase)ReaderWithLimitNc                    s   t d|   t jb t P tdttt	df}t
t|}t|d|  d}|t}tdg W 5 Q R X W 5 Q R X  fdd}t|d	d
}t||d t |  W 5 Q R X  gS )N
trainer_%dval
   z
dataset:%dnamed   c                    s   t  |  g g d S N)r   Addr   )Zrectotal A/tmp/pip-unpacked-wheel-ua33x9lu/caffe2/python/checkpoint_test.py	inc_total"   s    z!build_pipeline.<locals>.inc_total   )Znum_iter)	processor)r   r   current
init_groupr   r   nparraylistranger   r   r   readerZConstr   r	   Zadd_stop_conditionZdata_finished)node_idZdata_arrdataZdsZfull_readerr%   Zepoch_readerr#   r!   r$   build_pipeline   s    

 r1   g   s         c                    s    fdd}|S )Nc                    s   t   d S r   )shutilcopyfile)inputsoutputsdestsrcr#   r$   copy_op/   s    zlocal_copy_op.<locals>.copy_opr#   )r<   r;   r=   r#   r:   r$   local_copy_op.   s    r>   c                   @   s   e Zd Zdd Zdd ZdS )UploadToLocalFilec                 C   s
   || _ d S r   )dest_dir)selfr@   r#   r#   r$   __init__5   s    zUploadToLocalFile.__init__c                 C   s   t tj~}|jD ]p\}}tt|V t D t||j|j	}t
j| jt|}tt||gi fg g  W 5 Q R X W 5 Q R X qW 5 Q R X |S r   )r   r   GLOBALZ_node_managersr   strr   r   Z
_node_nameZ
_db_prefixospathjoinr@   r   Pythonr>   )rA   epochZcheckpoint_managerZupload_task_groupnodemanagerZsrc_path	dest_pathr#   r#   r$   build8   s      $zUploadToLocalFile.buildN)__name__
__module____qualname__rB   rM   r#   r#   r#   r$   r?   4   s   r?   c                   @   sD   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dS )TestCheckpointc           
   
      s  t  
 t }tdd}W 5 Q R X ttd|d  fdd}| \}}|t t||	|}| 
|tt | 
||td  td|d D ]6}| \}}t|||d		| | 
||td  qtd|d D ],}	|||	 | 
||t|	d   qW 5 Q R X d S )
Nr   r/   emptystepr9   c                    s   |      d  S Nr   runr9   fetchsessionZoutput_fetcherr#   r$   fetch_totalJ   s    
z,TestCheckpoint.run_with.<locals>.fetch_total   Zresume_from_epoch)r   r   r1   r   r   Netcompiler   r   trainassertEqualslenEXPECTED_TOTALSr-   rX   load)
rA   builderjobr9   r]   r[   
checkpoint
num_epochsinitial_epochrI   r#   r\   r$   run_withD   s2    



 

zTestCheckpoint.run_withc              	      sd   z"t   fdd}| | W 5 t   X z"t   fdd}| | W 5 t   X d S )Nc                     s&   t j } t| }t dd}||fS )NZ	temp_nodeminidb)r   C	Workspacer   r
   wsr[   rj   tmpdirr#   r$   rh   f   s    
z6TestCheckpoint.test_single_checkpoint.<locals>.builderc                     s$   t j } t| }t d}||fS )Nrn   )r   ro   rp   r   r   rq   rs   r#   r$   rh   t   s    

)r6   rmtreetempfilemkdtemprm   )rA   rh   r#   rs   r$   test_single_checkpointa   s    z%TestCheckpoint.test_single_checkpointc                 C   s  zld}t }t|d}t  t }t|D ]}t| q2W 5 Q R X |	t
 ||  t|D ]4}d}d| }|d | d }| |||| qjW 5 Q R X t | t }t  t|D ]}tj }	t
|	}
t|d}t > t }t| W 5 Q R X |	t
 t||}||
}W 5 Q R X | |tt | t|	jd qtj }	t
|	}
| t|	jd d	d
g}t|d}t  t }t|D ]}t| qW 5 Q R X |	t
 t||}|j|d|
d tddD ]Z}| |j|||
d |D ]8}| |	| | |	|tt|d  g qq| |j|d|
d W 5 Q R X W 5 t | X d S )Nr&   rn      r   /z.5   r   z'trainer_1/task_2/GivenTensorInt64Fill:0z'trainer_2/task_2/GivenTensorInt64Fill:0r_   )Z
blob_namesrI   r[   )r6   ru   rv   rw   r   r   r   r-   r1   rb   r   initZnodes_to_checkpointrd   Zget_ckpt_db_namer   ResetWorkspacero   rp   r   rc   re   rf   ZblobsZload_blobs_from_checkpoints
assertTrueZhas_blob
fetch_blobr*   r+   assertFalse)rA   rt   	num_nodesrj   ri   r/   rI   	node_nameZexpected_db_namerr   r[   
job_runnerrk   Zmodel_blob_namesZ	blob_namer#   r#   r$   (test_ckpt_name_and_load_model_from_ckpts~   s    











     z7TestCheckpoint.test_ckpt_name_and_load_model_from_ckptsc                 C   s8  z$t }tj|d}t| d}t|D ],}d| }tj||}| 	tj
| q0tdD ]}tj }t|}t|d}	t Z t }
t| W 5 Q R X |
t t|}t|
|	|d}||}| |tt W 5 Q R X qft|D ],}d| }tj||}| tj
| qW 5 t | X d S )Nuploadr&   r   rn   )Zupload_task_group_builder)r6   ru   rv   rw   rE   rF   rG   mkdirr-   r   existsr   ro   rp   r   r   r   r   r1   rb   r?   r   rc   rd   re   rf   r~   )rA   rt   Z
upload_dirr   r/   r   Zupload_pathrr   r[   rj   ri   Zlocal_upload_builderr   rk   r#   r#   r$   test_upload_checkpoint   s<    



 
z%TestCheckpoint.test_upload_checkpointc           
      C   s   d}d}t   t|D ]z}t j }t|}t|d}t > t }t	| W 5 Q R X |
t t||}||}	W 5 Q R X | |	tt qd S )Nr&   z/tmp/path_does_not_exist/rn   )r   r}   r-   ro   rp   r   r   r   r   r1   rb   r   rc   rd   re   rf   )
rA   r   rt   r/   rr   r[   rj   ri   r   rk   r#   r#   r$   test_ckpt_save_failure   s    



z%TestCheckpoint.test_ckpt_save_failurec           	      C   sz  t jdd}td}dD ]}|jjg |gdgddd q|jd	d
gdg |dgdg t	 }t
d |j t|jd W 5 Q R X |j: t ( td t|j W 5 Q R X W 5 Q R X W 5 Q R X |j t|d W 5 Q R X t|d W 5 Q R X W 5 Q R X tj }t|}t|}|| tddtj}| t||d | t||d dS )z}
        A simple test that ensures we have download task group
        executed between epoch_group and exit_group.
        Z
test_modelr   download_net)input1input2outputdownload_result   g      ?r   )shapevalueZrun_oncer   r   r   r   z	trainer:0)rU   r_   g       @N)r   ZModelHelperr   ra   Zparam_init_netZConstantFillnetr    ZCopyr   r   r)   r   Zepoch_groupr   ZloopZdownload_groupr   r   ro   rp   r   r   rc   r*   fullZastypeZfloat32r~   Zarray_equalr   )	rA   modelr   r   ri   rr   r[   r   Zexpected_resultr#   r#   r$   test_download_group_simple	  sB    

*



z)TestCheckpoint.test_download_group_simplec           
   	      s   zt }tj }t|}t|d}t	 }t
dd}W 5 Q R X ttd|d |t  fdd}t|||}td|d D ],}	t|||	d	| | ||td
  qW 5 t | X dS )zf
        A simple test that ensures we can reuse a MultiNodeCheckpointManager
        object.
        rn   r   rR   rS   rT   c                    s   |      d  S rV   rW   rZ   r\   r#   r$   r]   D  s    
zATestCheckpoint.test_reuse_checkpoint_manager.<locals>.fetch_totalr_   r`   r^   N)r6   ru   rv   rw   r   ro   rp   r   r   r   r1   r   r   ra   rb   r   rc   r-   rd   rf   )
rA   rt   rr   r[   rj   ri   r9   r]   rk   rl   r#   r\   r$   test_reuse_checkpoint_manager4  s*    


z,TestCheckpoint.test_reuse_checkpoint_managerN)
rN   rO   rP   rm   rx   r   r   r   r   r   r#   r#   r#   r$   rQ   C   s   M&+rQ   )+Zcaffe2.python.schemar   r   Zcaffe2.pythonr   r   r   Zcaffe2.python.sessionr   Zcaffe2.python.datasetr   Zcaffe2.python.pipeliner	   Zcaffe2.python.checkpointr
   r   r   r   r   r   r   Zcaffe2.python.net_builderr   Zcaffe2.python.taskr   r   r   r   r   Zcaffe2.python.test_utilr   Zcaffe2.python.dataior   Znumpyr*   rE   r6   rv   r1   rf   r>   r?   rQ   r#   r#   r#   r$   <module>   s$   $