U
    d                     @   sX   d dl Z d dlmZmZ d dlm  mZ dd Zdd Zdd Z	G d	d
 d
e j
ZdS )    N)	workspacecorec               	   C   s   d} t tjdg | gddd tdD ]4}t jjjdt	|  t jjjdt	|  q(t jjjd	 t jjjd
 | S )NqueueZCreateBlobsQueue   i  )Z	num_blobscapacityd   blob_status_blob_dequeue_blobstatus_blob)
r   RunOperatorOncer   CreateOperatorrangeCZ	WorkspacecurrentZcreate_blobstr)r   i r   G/tmp/pip-unpacked-wheel-ua33x9lu/caffe2/python/parallel_workers_test.pycreate_queue   s         r   c                    s    fdd}|S )Nc              
      sD   dt |  }t| |  ttd|g|dt |  g d S )Nr   ZSafeEnqueueBlobsr	   )r   r   FeedBlobr   r   r   )	worker_idZblobget_blob_datar   r   r   dummy_worker!   s      z#create_worker.<locals>.dummy_workerr   )r   r   r   r   r   r   create_worker    s    r   c                 C   s(   d}t td| g|dg t |S )Nr
   ZSafeDequeueBlobsr   )r   r   r   r   	FetchBlob)r   r
   r   r   r   dequeue_value/   s      r   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )ParallelWorkersTestc                 C   sn   t   t }t|dd }t|}|  tdD ]$}t|}| 	|dkdt
|  q6| 	|  d S )Nc                 S   s   t | S Nr   r   r   r   r   <lambda>?       z9ParallelWorkersTest.testParallelWorkers.<locals>.<lambda>
   )   0   1Got unexpected value )r   ResetWorkspacer   r   parallel_workersinit_workersstartr   r   
assertTruer   stop)selfr   r   worker_coordinator_valuer   r   r   testParallelWorkers;   s    
 
z'ParallelWorkersTest.testParallelWorkersc                 C   s~   t   t }t|dd }t dd dd }tj||d}|  tdD ]"}t	|}| 
|d	d
t|  qN|  d S )Nc                 S   s
   t dS )Ndata)r   r   r!   r   r   r   r"   P   r#   z@ParallelWorkersTest.testParallelWorkersInitFun.<locals>.<lambda>r3   znot initializedc                 S   s   t dd d S )Nr3   Zinitializedr   r   )r/   Zglobal_coordinatorr   r   r   init_funT   s    z@ParallelWorkersTest.testParallelWorkersInitFun.<locals>.init_fun)r5   r$   s   initializedr'   )r   r(   r   r   r   r)   r*   r+   r   r   assertEqualr   r-   )r.   r   r   r5   r/   r0   r1   r   r   r   testParallelWorkersInitFunK   s*        
z.ParallelWorkersTest.testParallelWorkersInitFunc                 C   sx   t   t }t|dd }t dd dd }tj||d}|  | |	  t 
d}| |dd	t|  d S )
Nc                 S   s   t | S r   r    r!   r   r   r   r"   i   r#   zDParallelWorkersTest.testParallelWorkersShutdownFun.<locals>.<lambda>r3   znot shutdownc                   S   s   t dd d S )Nr3   shutdownr4   r   r   r   r   shutdown_funl   s    zHParallelWorkersTest.testParallelWorkersShutdownFun.<locals>.shutdown_fun)r9   s   shutdownr'   )r   r(   r   r   r   r)   r*   r+   r,   r-   r   r6   r   )r.   r   r   r9   r/   r3   r   r   r   testParallelWorkersShutdownFune   s     
z2ParallelWorkersTest.testParallelWorkersShutdownFunN)__name__
__module____qualname__r2   r7   r:   r   r   r   r   r   :   s   r   )ZunittestZcaffe2.pythonr   r   Zcaffe2.python.parallel_workerspythonr)   r   r   r   ZTestCaser   r   r   r   r   <module>   s   