U
    9%e                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlmZmZmZmZ ddlmZmZmZmZmZmZ ddlZddlmZ ddlmZ ed Z z&ddl!m"Z" e"  dd	l#m$Z$ d
Z%W n e&k
r   dZ%Y nX zddl#m'Z' d
Z(W n e&k
r*   dZ(Y nX zddl)Z*d
Z+W n e&k
rV   dZ+Y nX e
,e(dZ-e
,e%dZ.e(o~e'j/dkZ0e
,e0dZ1ej23dZ4ej23dZ5ej6dkZ7e7Z8e4 Z9dd Z:e+rdd Z;ndd Z;dd Z<dd Z=G dd de>Z?G dd  d e?Z@G d!d" d"e>ZAG d#d$ d$e?ZBG d%d& d&e?ZCG d'd( d(e?ZDd)d* ZEd+d, ZFe	jGZHG d-d. d.e>ZId/d0 ZJeFeHejKZLeFeJd1 ZMe4seFeJd2 ZNeFeJd3 ZOeFeJd4 ZPG d5d6 d6eZQejRd7kZSe
,eSd8G d9d: d:eQZTeTU  G d;d< d<e>ZVG d=d> d>eVeQZWeWU  G d?d@ d@eZXeG dAdB dBeXZYeYU  eG dCdD dDeXZZeG dEdF dFeXZ[ee1G dGdH dHeXZ\ee.G dIdJ dJeXZ]eG dKdL dLeZ^ee-G dMdN dNeZ_e`dOkre
a  dS )Pz
Tests the parallel backend
    N)jit	vectorizeguvectorizeset_num_threads)temp_directoryoverride_configTestCasetagskip_parfors_unsupported
linux_only)_TIMEOUT)configg      N@)_check_tbb_version_compatible)tbbpoolTF)omppoolzOpenMP threadpool requiredzTBB threadpool requiredGNUzGNU OpenMP only testswindarwinl        c                 C   s   t | | S N)nponesnv r   `/var/www/html/Darija-Ai-API/env/lib/python3.8/site-packages/numba/tests/test_parallel_backend.pyfooG   s    r   c                 C   s2   t t | | ft | | f}|t |  | S r   )r   dotr   arange)r   r   xr   r   r   linalgL   s     r    c                 C   s   t | | S r   )r   r   r   r   r   r   r    P   s    c                 C   s   | | S r   r   )abr   r   r   	ufunc_fooU   s    r#   c                 C   s   | | |d< d S )Nr   r   )r!   r"   outr   r   r   
gufunc_fooY   s    r%   c                   @   s   e Zd Zdd ZdS )runnablec                 K   s
   || _ d S r   )_options)selfoptionsr   r   r   __init__^   s    zrunnable.__init__N)__name__
__module____qualname__r*   r   r   r   r   r&   ]   s   r&   c                   @   s   e Zd Zdd ZdS )
jit_runnerc                 C   s>   t f | jt}d}d}t||}|||}tj|| d S N   
   )r   r'   r   r   testingassert_allcloser(   cfuncr!   r"   expectedgotr   r   r   __call__d   s    

zjit_runner.__call__Nr+   r,   r-   r8   r   r   r   r   r.   b   s   r.   c                   @   s   e Zd Zdd Zdd ZdS )mask_runnerc                 K   s   || _ || _d S r   )runnermask)r(   r;   r<   r)   r   r   r   r*   n   s    zmask_runner.__init__c                 C   s   | j rt| j  |   d S r   )r<   r   r;   )r(   r   r   r   r8   r   s    
zmask_runner.__call__Nr+   r,   r-   r*   r8   r   r   r   r   r:   m   s   r:   c                   @   s   e Zd Zdd ZdS )linalg_runnerc                 C   s>   t f | jt}d}d}t||}|||}tj|| d S r/   )r   r'   r    r   r2   r3   r4   r   r   r   r8   |   s    

zlinalg_runner.__call__Nr9   r   r   r   r   r>   z   s   r>   c                   @   s   e Zd Zdd ZdS )vectorize_runnerc                 C   sR   t dgf| jt}tjdtj }}t||}|||}tj|| d S )Nz(f4, f4)r1   )	r   r'   r#   r   randomastypefloat32r2   r3   r4   r   r   r   r8      s
    

zvectorize_runner.__call__Nr9   r   r   r   r   r?      s   r?   c                   @   s   e Zd Zdd ZdS )guvectorize_runnerc                 C   sX   dg}t |df| jt}tjdtj }}t||}|||}tj	|| d S )Nz(f4, f4, f4[:])z	(),()->()r1   )
r   r'   r%   r   r@   rA   rB   r#   r2   r3   )r(   sigr5   r!   r"   r6   r7   r   r   r   r8      s    

zguvectorize_runner.__call__Nr9   r   r   r   r   rC      s   rC   c              
   K   sr   | d}z6t  ttt| d D ]}t| }|  q(W n, tk
rl } z|	| W 5 d }~X Y nX d S )Nqueueg      ?)
getfaulthandlerenablerangeintlenr@   choice	Exceptionput)fnlistkwargsq_fner   r   r   chooser   s    

rU   c                    s    fdd}|S )Nc                    s    }d|i fddt dD }|D ]}|  q,|D ]}|  q>| sg }| sr||d qXd}t|ddd |D  d S )	NrE   c                    s   g | ]}t  fd qS ))targetargsrP   )rU   ).0i)rO   kwsparallel_classr   r   
<listcomp>   s   z8compile_factory.<locals>.run_compile.<locals>.<listcomp>r0   Fz)Error(s) occurred in delegated runner:
%s
c                 S   s   g | ]}t |qS r   )repr)rX   r   r   r   r   r\      s     )rI   startjoinemptyappendrF   RuntimeError)rO   rQ   Zthstherrors_msgr[   
queue_impl)rO   rZ   r   run_compile   s    

z$compile_factory.<locals>.run_compiler   )r[   rh   ri   r   rg   r   compile_factory   s    rj   c                   @   s   e Zd Zdd Zdd ZdS )_proc_class_implc                 C   s
   || _ d S r   )_method)r(   methodr   r   r   r*      s    z_proc_class_impl.__init__c                 O   s   t | j}|j||S r   )multiprocessingget_contextrl   Process)r(   rW   rP   ctxr   r   r   r8      s    z_proc_class_impl.__call__Nr=   r   r   r   r   rk      s   rk   c                 C   s,   | dkrd } t | }t| }|j}||fS )Ndefault)rn   ro   rk   Queue)rm   rq   procrE   r   r   r   _get_mp_classes   s    
ru   spawnfork
forkserverrr   c                   @   sj  e Zd ZdZeddedddedddeddedddeddedddeddddeddedddeddddgZe	seddd	edddd
eddd	edddd
gZ
ee
 ejdk rg ZnddgZg ZeD ]ZeD ]Zeeee qqddgZed er$ed ed ede	 d	gedddgedddgeedZddhZdddZdS )TestParallelBackendBasez6
    Base class for testing the parallel backends
    T)nopython)rz   cache)rz   Znogilparallel)rz   rV   )rz   rV   r{   )rz   r|   )rz   r|   r{         	threadingr@   multiprocessing_spawnmultiprocessing_forkmultiprocessing_forkserver)Zconcurrent_jitZconcurrent_vectorizeZconcurrent_guvectorizeZconcurrent_mix_useZconcurrent_mix_use_masksomptbbc              	   C   s   t | jj| _td| j |dkr.t| n|dkr@t| n|dkrRt| nv|dkrdt| nd|dkrvt	| nR|dkrttg}t
r|t |t t| |D ]}|| qntd| W 5 Q R X d S )	NZ	CACHE_DIRr   r   r   r   Zmultiprocessing_defaultr@   zUnknown parallelism supplied %s)r   	__class__r+   Z
_cache_dirr   thread_implfork_proc_implforkserver_proc_implspawn_proc_impldefault_proc_impl_HAVE_OS_FORKrb   r@   shuffle
ValueError)r(   rO   parallelismZpsimplr   r   r   ri     s.    







z#TestParallelBackendBase.run_compileN)r   )r+   r,   r-   __doc__r.   r>   r?   rC   Z	all_impls_parfors_unsupportedZparfor_implsextendr   NUMBA_NUM_THREADSmasksZ
mask_implsr   r<   rb   r:   r   r   runnersZsafe_backendsri   r   r   r   r   ry      sV   













ry   )r   r   	workqueuezThreading layer not explicitc                   @   s   e Zd ZdZedd ZdS )TestParallelBackendav   These are like the numba.tests.test_threadsafety tests but designed
    instead to torture the parallel backend.
    If a suitable backend is supplied via NUMBA_THREADING_LAYER these tests
    can be run directly. This test class cannot be run using the multiprocessing
    option to the test runner (i.e. `./runtests -m`) as daemon processes cannot
    have children.
    c                 C   sX   | j D ]L}| j D ]<\}}d| d | }dd }|||}||_t| || qqd S )Ntest_rR   c                    s    fdd}|S )Nc                    s0   t  }|jrd}| | n| j d d S )Nz)daemonized processes cannot have children)r   )rn   current_processdaemonskipTestri   )r(   Zselfprocrf   r   pr   r   test_methodI  s
    zBTestParallelBackend.generate.<locals>.methgen.<locals>.test_methodr   )r   r   r   r   r   r   methgenH  s    z-TestParallelBackend.generate.<locals>.methgen)r   r   itemsr+   setattr)clsr   namer   methnamer   rS   r   r   r   generateB  s    


zTestParallelBackend.generateN)r+   r,   r-   r   classmethodr   r   r   r   r   r   5  s   r   c                   @   s0   e Zd ZeeedddZdd Zdd Z	dS )	TestInSubprocessF r   r   r   c                 C   s|   t j|t jt j|d}tt|j}zH|  |	 \}}|j
dkrZtd|j
| f | | fW S |  X dS )Nstdoutstderrenvr   /process failed with code %s: stderr follows
%s
)NN
subprocessPopenPIPEr   Timer_TEST_TIMEOUTkillcancelr_   communicate
returncodeAssertionErrordecoder(   cmdliner   popentimeoutr$   errr   r   r   run_cmd_  s$    

zTestInSubprocess.run_cmdc                 C   s0   t j }t||d< tjdd|g}| ||S )NNUMBA_THREADING_LAYERz-mznumba.runtests)osenvironcopystrsys
executabler   )r(   testZthreading_layerZenv_copyr   r   r   r   run_test_in_separate_processr  s    
z-TestInSubprocess.run_test_in_separate_processN)
r+   r,   r-   skip_no_tbbskip_no_ompunittestskipIfbackendsr   r   r   r   r   r   r   Z  s   
r   c                   @   s,   e Zd ZdZdZedd Zedd ZdS )TestSpecificBackenda  
    This is quite contrived, for each test in the TestParallelBackend tests it
    generates a test that will run the TestParallelBackend test in a new python
    process with an environment modified to ensure a specific threadsafe backend
    is used. This is with view of testing the backends independently and in an
    isolated manner such that if they hang/crash/have issues, it doesn't kill
    the test suite.
    Fc           
         sb   | j }tj}d| d | }d|||f  fdd}d|| f }	t| |	td|| d S )Nr   rR   z%s.%s.%sc                    sR   |   \}}| jr&td||f  | d| | d|k | d|k d S )Nzstdout:
 "%s"
 stderr:
 "%s"OKFAILERROR)r   _DEBUGprintassertIn
assertTrue)r(   orT   backendZinjected_methodr   r   test_template  s    z2TestSpecificBackend._inject.<locals>.test_templateztest_%s_%s_%sZlong_running)r,   r   r+   r   r	   )
r   r   r   r   backend_guardZthemodZtheclsr   r   injected_testr   r   r   _inject  s    zTestSpecificBackend._injectc              	   C   st   | j  D ]d\}}| jD ]T}| j D ]D}|dkrH|dkrHtjdrHq&|dkrZ|dkrZq&| |||| q&qq
d S )N)r   r@   r   linux)r   r@   r   )	r   r   r   r   keysr   platform
startswithr   )r   r   r   r   r   r   r   r   r     s    

zTestSpecificBackend.generateN)r+   r,   r-   r   r   r   r   r   r   r   r   r   r   y  s   
r   c                   @   s6   e Zd ZdZdeje Zddei Z	dddZ
dS )	ThreadLayerTestHelperzP
    Helper class for running an isolated piece of code based on a template
    z%ra  if 1:
    import sys
    sys.path.insert(0, "%(here)r")
    import multiprocessing
    import numpy as np
    from numba import njit
    import numba
    try:
        import threading_backend_usecases
    except ImportError as e:
        print("DEBUG:", sys.path)
        raise e
    import os

    sigterm_handler = threading_backend_usecases.sigterm_handler
    busy_func = threading_backend_usecases.busy_func

    def the_test():
        %%s

    if __name__ == "__main__":
        the_test()
    hereNc                 C   s   |d krt j }td|d< tj|tjtj|d}tt	|j
}z8|  | \}}|jdkrxtd|j| f W 5 |  X | | fS )Nr   r   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r_   r   r   r   r   r   r   r   r   r     s(    


zThreadLayerTestHelper.run_cmd)N)r+   r,   r-   r   r   pathdirname__file__Z_heretemplater   r   r   r   r   r     s   r   c                   @   s@   e Zd ZdZdZeeedddZ	e
dd Ze
dd Zd	S )
TestThreadingLayerSelectionz@
    Checks that numba.threading_layer() reports correctly.
    Fr   r   c                    s0    fdd}d  }t | |td|| d S )Nc                    sZ   d}| j |   }tjd|g}tj }t |d< | j||d\}}| jrVt	|| d S )Nzif 1:
                X = np.arange(1000000.)
                Y = np.arange(1000000.)
                Z = busy_func(X, Y)
                assert numba.threading_layer() == '%s'
            -cr   r   )
r   r   r   r   r   r   r   r   r   r   )r(   bodyrunmer   r   r$   r   r   r   r   r     s    
z:TestThreadingLayerSelection._inject.<locals>.test_templatez test_threading_layer_selector_%sZ	important)r   r	   )r   r   r   r   r   r   r   r   r     s
    z#TestThreadingLayerSelection._injectc                 C   s$   | j  D ]\}}| || q
d S r   )r   r   r   )r   r   r   r   r   r   r     s    z$TestThreadingLayerSelection.generateN)r+   r,   r-   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     s   

r   c                   @   sZ   e Zd ZedddZeedd Zeedd Zedd	 Z	ed
d Z
dd ZdS )TestThreadingLayerPriority)env_varc                 C   sJ   t j }d|d< ||d< d| d}tjdt|g}| j||d dS )	zJTest setting priority via env var NUMBA_THREADING_LAYER_PRIORITY.
        rr   r   ZNUMBA_THREADING_LAYER_PRIORITYa  
                import numba

                # trigger threading layer decision
                # hence catching invalid THREADING_LAYER_PRIORITY
                @numba.jit(
                    'float64[::1](float64[::1], float64[::1])',
                    nopython=True,
                    parallel=True,
                )
                def plus(x, y):
                    return x + y

                captured_envvar = list("a	  ".split())
                assert numba.config.THREADING_LAYER_PRIORITY ==                     captured_envvar, "priority mismatch"
                assert numba.threading_layer() == captured_envvar[0],                    "selected backend mismatch"
                r   r   N)r   r   r   r   r   textwrapdedentr   )r(   r   r   codecmdr   r   r   each_env_var  s    
z'TestThreadingLayerPriority.each_env_varc                 C   s2   dddg}t |D ]}d|}| | qd S )Nr   r   r    )	itertoolspermutationsr`   r   )r(   rr   r   r   r   r   r   test_valid_env_var2  s    

z-TestThreadingLayerPriority.test_valid_env_varc              	   C   sF   d}|  t}| | W 5 Q R X dD ]}| | t|j q(d S )Nztbb omp workqueue notvalidhere)z!THREADING_LAYER_PRIORITY invalid:zIt must be a permutation of)assertRaisesr   r   r   r   	exception)r(   r   Zraisesmsgr   r   r   test_invalid_env_var:  s
    z/TestThreadingLayerPriority.test_invalid_env_varc                 C   s   dD ]}|  | qd S )N)zomp tbb workqueuezomp workqueue tbbr   r(   r   r   r   r   test_ompF  s    z#TestThreadingLayerPriority.test_ompc                 C   s   dD ]}|  | qd S )N)ztbb omp workqueueztbb workqueue ompr   r   r   r   r   test_tbbK  s    z#TestThreadingLayerPriority.test_tbbc                 C   s   dD ]}|  | qd S )N)zworkqueue tbb ompzworkqueue omp tbbr   r   r   r   r   test_workqueueP  s    z)TestThreadingLayerPriority.test_workqueueN)r+   r,   r-   r   r   r   r   r   r   r   r   r  r   r   r   r   r     s   !


r   c                   @   sH   e Zd ZdZdZedd Zedd Zdd Z	e
ed	d
d ZdS )TestMiscBackendIssueszL
    Checks fixes for the issues with threading backends implementation
    Fc                 C   s<   d}t jd|g}tj }d|d< d|d< | j||d dS )	z8
        Tests that OMP does not overflow stack
        a  if 1:
            from numba import vectorize, threading_layer
            import numpy as np

            @vectorize(['f4(f4,f4,f4,f4,f4,f4,f4,f4)'], target='parallel')
            def foo(a, b, c, d, e, f, g, h):
                return a+b+c+d+e+f+g+h

            x = np.ones(2**20, np.float32)
            foo(*([x]*8))
            assert threading_layer() == "omp", "omp not found"
        r   r   r   Z100KZOMP_STACKSIZEr   Nr   r   r   r   r   r   r(   r   r   r   r   r   r   test_omp_stack_overflow\  s    
z-TestMiscBackendIssues.test_omp_stack_overflowc                 C   s<   d}t jd|g}tj }d|d< d|d< | j||d dS )	zq
        Tests that TBB works well with single thread
        https://github.com/numba/numba/issues/3440
        aM  if 1:
            from numba import njit, prange, threading_layer

            @njit(parallel=True)
            def foo(n):
                acc = 0
                for i in prange(n):
                    acc += i
                return acc

            foo(100)
            assert threading_layer() == "tbb", "tbb not found"
        r   r   r   1r   r   Nr  r  r   r   r   test_single_thread_tbbs  s    
z,TestMiscBackendIssues.test_single_thread_tbbc           	   
   C   s   d}t jd|g}tj }d|d< d|d< z| j||d\}}W nV tk
r } z8| jrbt|| t	|}| 
d| d	}| 
|| W 5 d
}~X Y nX d
S )zW
        Tests workqueue raises sigabrt if a nested parallel call is performed
        a  if 1:
            from numba import njit, prange
            import numpy as np

            @njit(parallel=True)
            def nested(x):
                for i in prange(len(x)):
                    x[i] += 1


            @njit(parallel=True)
            def main():
                Z = np.zeros((5, 10))
                for i in prange(Z.shape[0]):
                    nested(Z[i])
                return Z

            main()
        r   r   r   4r   r   zfailed with codezTNumba workqueue threading layer is terminating: Concurrent access has been detected.N)r   r   r   r   r   r   r   r   r   r   r   )	r(   r   r   r   r$   r   rT   Ze_msgr6   r   r   r   +test_workqueue_aborts_on_nested_parallelism  s    

zATestMiscBackendIssues.test_workqueue_aborts_on_nested_parallelismzTest needs fork(2)c                 C   s<   d}t jd|g}tj }d|d< d|d< | j||d d S )Na  if 1:
            from numba import njit, prange, threading_layer
            import numpy as np
            import multiprocessing

            if __name__ == "__main__":
                # Need for force fork context (OSX default is "spawn")
                multiprocessing.set_start_method('fork')

                @njit(parallel=True)
                def func(x):
                    return 10. * x

                arr = np.arange(2.)

                # run in single process to start Numba's thread pool
                np.testing.assert_allclose(func(arr), func.py_func(arr))

                # now run in a multiprocessing pool to get a fork from a
                # non-main thread
                with multiprocessing.Pool(10) as p:
                    result = p.map(func, [arr])
                np.testing.assert_allclose(result,
                                           func.py_func(np.expand_dims(arr, 0)))

                assert threading_layer() == "workqueue"
        r   r   r   r  r   r   r  r  r   r   r   0test_workqueue_handles_fork_from_non_main_thread  s    
zFTestMiscBackendIssues.test_workqueue_handles_fork_from_non_main_threadN)r+   r,   r-   r   r   r   r  r   r  r	  r   
skipUnlessr   r
  r   r   r   r   r  U  s   

)
r  c                   @   sT   e Zd ZdZdZdd Zdd Zdd Zed	d
 Z	dd Z
dd Zedd ZdS )TestForkSafetyIssueszV
    Checks Numba's behaviour in various situations involving GNU OpenMP and fork
    Fc                 C   s"   d}t jd|g}| |\}}d S )Nzsif 1:
            from numba.np.ufunc import omppool
            assert omppool.openmp_vendor == 'GNU'
            r   )r   r   r   )r(   r   r   r$   r   r   r   r   !test_check_threading_layer_is_gnu  s    z6TestForkSafetyIssues.test_check_threading_layer_is_gnuc              
   C   sd   d}| j | }tjd|g}z| |\}}W n2 tk
r^ } z| dt| W 5 d}~X Y nX dS )z~
        Whilst normally valid, this actually isn't for Numba invariant of OpenMP
        Checks SIGABRT is received.
        zif 1:
            X = np.arange(1000000.)
            Y = np.arange(1000000.)
            Z = busy_func(X, Y)
            pid = os.fork()
            if pid  == 0:
                Z = busy_func(X, Y)
            else:
                os.wait()
        r   zfailed with code -6N)r   r   r   r   r   r   r   )r(   r   r   r   r$   r   rT   r   r   r   !test_par_parent_os_fork_par_child  s    

z6TestForkSafetyIssues.test_par_parent_os_fork_par_childc                 C   s<   d}| j | }tjd|g}| |\}}| jr8t|| dS )au  
        Implicit use of multiprocessing fork context.
        Does this:
        1. Start with OpenMP
        2. Fork to processes using OpenMP (this is invalid)
        3. Joins fork
        4. Check the exception pushed onto the queue that is a result of
           catching SIGTERM coming from the C++ aborting on illegal fork
           pattern for GNU OpenMP
        a  if 1:
            mp = multiprocessing.get_context('fork')
            X = np.arange(1000000.)
            Y = np.arange(1000000.)
            q = mp.Queue()

            # Start OpenMP runtime on parent via parallel function
            Z = busy_func(X, Y, q)

            # fork() underneath with no exec, will abort
            proc = mp.Process(target = busy_func, args=(X, Y, q))
            proc.start()

            err = q.get()
            assert "Caught SIGTERM" in str(err)
        r   Nr   r   r   r   r   r   r(   r   r   r   r$   r   r   r   r   *test_par_parent_implicit_mp_fork_par_child  s    
z?TestForkSafetyIssues.test_par_parent_implicit_mp_fork_par_childc                 C   s<   d}| j | }tjd|g}| |\}}| jr8t|| dS )au  
        Explicit use of multiprocessing fork context.
        Does this:
        1. Start with OpenMP
        2. Fork to processes using OpenMP (this is invalid)
        3. Joins fork
        4. Check the exception pushed onto the queue that is a result of
           catching SIGTERM coming from the C++ aborting on illegal fork
           pattern for GNU OpenMP
        a  if 1:
            X = np.arange(1000000.)
            Y = np.arange(1000000.)
            ctx = multiprocessing.get_context('fork')
            q = ctx.Queue()

            # Start OpenMP runtime on parent via parallel function
            Z = busy_func(X, Y, q)

            # fork() underneath with no exec, will abort
            proc = ctx.Process(target = busy_func, args=(X, Y, q))
            proc.start()
            proc.join()

            err = q.get()
            assert "Caught SIGTERM" in str(err)
        r   Nr  r  r   r   r   *test_par_parent_explicit_mp_fork_par_child'  s    
z?TestForkSafetyIssues.test_par_parent_explicit_mp_fork_par_childc                 C   s<   d}| j | }tjd|g}| |\}}| jr8t|| dS )z
        Explicit use of multiprocessing spawn, this is safe.
        Does this:
        1. Start with OpenMP
        2. Spawn to processes using OpenMP
        3. Join spawns
        4. Run some more OpenMP
        a  if 1:
            X = np.arange(1000000.)
            Y = np.arange(1000000.)
            ctx = multiprocessing.get_context('spawn')
            q = ctx.Queue()

            # Start OpenMP runtime and run on parent via parallel function
            Z = busy_func(X, Y, q)
            procs = []
            for x in range(20): # start a lot to try and get overlap
                ## fork() + exec() to run some OpenMP on children
                proc = ctx.Process(target = busy_func, args=(X, Y, q))
                procs.append(proc)
                sys.stdout.flush()
                sys.stderr.flush()
                proc.start()

            [p.join() for p in procs]

            try:
                q.get(False)
            except multiprocessing.queues.Empty:
                pass
            else:
                raise RuntimeError("Queue was not empty")

            # Run some more OpenMP on parent
            Z = busy_func(X, Y, q)
        r   Nr  r  r   r   r   -test_par_parent_mp_spawn_par_child_par_parentJ  s    	
zBTestForkSafetyIssues.test_par_parent_mp_spawn_par_child_par_parentc                 C   s<   d}| j | }tjd|g}| |\}}| jr8t|| dS )a  
        Implicit use of multiprocessing (will be fork, but cannot declare that
        in Py2.7 as there's no process launch context).
        Does this:
        1. Start with no OpenMP
        2. Fork to processes using OpenMP
        3. Join forks
        4. Run some OpenMP
        a,  if 1:
            X = np.arange(1000000.)
            Y = np.arange(1000000.)
            q = multiprocessing.Queue()

            # this is ok
            procs = []
            for x in range(10):
                # fork() underneath with but no OpenMP in parent, this is ok
                proc = multiprocessing.Process(target = busy_func,
                                               args=(X, Y, q))
                procs.append(proc)
                proc.start()

            [p.join() for p in procs]

            # and this is still ok as the OpenMP happened in forks
            Z = busy_func(X, Y, q)
            try:
                q.get(False)
            except multiprocessing.queues.Empty:
                pass
            else:
                raise RuntimeError("Queue was not empty")
        r   Nr  r  r   r   r   =test_serial_parent_implicit_mp_fork_par_child_then_par_parentv  s    

zRTestForkSafetyIssues.test_serial_parent_implicit_mp_fork_par_child_then_par_parentc                 C   s<   d}| j | }tjd|g}| |\}}| jr8t|| dS )z
        Explicit use of multiprocessing 'fork'.
        Does this:
        1. Start with no OpenMP
        2. Fork to processes using OpenMP
        3. Join forks
        4. Run some OpenMP
        a  if 1:
            X = np.arange(1000000.)
            Y = np.arange(1000000.)
            ctx = multiprocessing.get_context('fork')
            q = ctx.Queue()

            # this is ok
            procs = []
            for x in range(10):
                # fork() underneath with but no OpenMP in parent, this is ok
                proc = ctx.Process(target = busy_func, args=(X, Y, q))
                procs.append(proc)
                proc.start()

            [p.join() for p in procs]

            # and this is still ok as the OpenMP happened in forks
            Z = busy_func(X, Y, q)
            try:
                q.get(False)
            except multiprocessing.queues.Empty:
                pass
            else:
                raise RuntimeError("Queue was not empty")
        r   Nr  r  r   r   r   =test_serial_parent_explicit_mp_fork_par_child_then_par_parent  s    

zRTestForkSafetyIssues.test_serial_parent_explicit_mp_fork_par_child_then_par_parentN)r+   r,   r-   r   r   r  r  r  r   r  r  r  r  r   r   r   r   r    s   !
",)r  c                   @   s(   e Zd ZdZedd Zedd ZdS )TestTBBSpecificIssuesFc                 C   sL   d}t jd|g}| |\}}d}| || | jrHtd| td| d S )Na%  if 1:
            import threading
            import numba
            numba.config.THREADING_LAYER='tbb'
            from numba import njit, prange, objmode
            from numba.core.serialize import PickleCallableByPath
            import os

            e_running = threading.Event()
            e_proceed = threading.Event()

            def indirect_core():
                e_running.set()
                # wait for forker() to have forked
                while not e_proceed.isSet():
                    pass

            indirect = PickleCallableByPath(indirect_core)

            @njit
            def obj_mode_func():
                with objmode():
                    indirect()

            @njit(parallel=True, nogil=True)
            def work():
                acc = 0
                for x in prange(10):
                    acc += x
                obj_mode_func()
                return acc

            def runner():
                work()

            def forker():
                # wait for the jit function to say it's running
                while not e_running.isSet():
                    pass
                # then fork
                os.fork()
                # now fork is done signal the runner to proceed to exit
                e_proceed.set()

            numba_runner = threading.Thread(target=runner,)
            fork_runner =  threading.Thread(target=forker,)

            threads = (numba_runner, fork_runner)
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        r   z9Attempted to fork from a non-main thread, the TBB libraryOUT:ERR:)r   r   r   r   r   r   )r(   r   r   r$   r   Zmsg_headr   r   r   test_fork_from_non_main_thread  s    6
z4TestTBBSpecificIssues.test_fork_from_non_main_threadc                 C   s   |    d}dd|}tjd|g}tj }d|d< | j||d\}}||kr\| d n| 	d	| | j
rtd
| td| d S )NzSKIP: COMPILATION FAILEDax  if 1:
            import ctypes
            import sys
            import multiprocessing as mp
            from tempfile import TemporaryDirectory, NamedTemporaryFile
            from numba.pycc.platform import Toolchain, external_compiler_works
            from numba import njit, prange, threading_layer
            import faulthandler
            faulthandler.enable()
            if not external_compiler_works():
                raise AssertionError('External compilers are not found.')
            with TemporaryDirectory() as tmpdir:
                with NamedTemporaryFile(dir=tmpdir) as tmpfile:
                    try:
                        src = """
                        #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
                        #include <tbb/tbb.h>
                        static tbb::task_scheduler_handle tsh;
                        extern "C"
                        {
                        void launch(void)
                        {
                            tsh = tbb::task_scheduler_handle::get();
                        }
                        }
                        """
                        cxxfile = f"{tmpfile.name}.cxx"
                        with open(cxxfile, 'wt') as f:
                            f.write(src)
                        tc = Toolchain()
                        object_files = tc.compile_objects([cxxfile,],
                                                           output_dir=tmpdir)
                        dso_name = f"{tmpfile.name}.so"
                        tc.link_shared(dso_name, object_files,
                                       libraries=['tbb',],
                                       export_symbols=['launch'])
                        # Load into the process, it doesn't matter whether the
                        # DSO exists on disk once it's loaded in.
                        DLL = ctypes.CDLL(dso_name)
                    except Exception as e:
                        # Something is broken in compilation, could be one of
                        # many things including, but not limited to: missing tbb
                        # headers, incorrect permissions, compilers that don't
                        # work for the above
                        print(e)
                        print('BROKEN_COMPILERS')
                        sys.exit(0)

                    # Do the test, launch this library and also execute a
                    # function with the TBB threading layer.

                    DLL.launch()

                    @njit(parallel=True)
                    def foo(n):
                        acc = 0
                        for i in prange(n):
                            acc += i
                        return acc

                    foo(1)

            # Check the threading layer used was TBB
            assert threading_layer() == 'tbb'

            # Use mp context for a controlled version of fork, this triggers the
            # reported bug.

            ctx = mp.get_context('fork')
            def nowork():
                pass
            p = ctx.Process(target=nowork)
            p.start()
            p.join(10)
            print("SUCCESS")
            BROKEN_COMPILERSr   r   r   r   z3Compilation of DSO failed. Check output for detailsSUCCESSr  r  )Zskip_if_no_external_compilerreplacer   r   r   r   r   r   r   r   r   r   )r(   r  r   r   r   r$   r   r   r   r   &test_lifetime_of_task_scheduler_handle*  s     K M

z<TestTBBSpecificIssues.test_lifetime_of_task_scheduler_handleN)r+   r,   r-   r   r   r  r  r   r   r   r   r    s
   
Zr  c                   @   s,   e Zd ZdZdd Zedd Zdd ZdS )	TestInitSafetyIssuesFc                 C   sv   t j|t jt jd}tt|j}z8|  |	 \}}|j
dkrXtd|j
| f W 5 |  X | | fS )N)r   r   r   r   r   )r(   r   r   r   r$   r   r   r   r   r     s     

zTestInitSafetyIssues.run_cmdc                 C   sX   t jt jtd}tj|g}| |\}}| d| | j	rTt
d| t
d| d S )Nzorphaned_semaphore_usecase.pyzleaked semaphorer  r  )r   r   r`   r   r   r   r   r   ZassertNotInr   r   )r(   Z	test_filer   r$   r   r   r   r   test_orphaned_semaphore  s    

z,TestInitSafetyIssues.test_orphaned_semaphorec              	   C   s   dD ]v}zt | W n tk
r.   Y qY nX d}tjd||g}| |\}}| jrntd| td| | 	|| qd S )N)rw   rv   rx   zimport numba; import multiprocessing;multiprocessing.set_start_method('{}');print(multiprocessing.get_context().get_start_method())r   r  r  )
rn   ro   r   r   r   formatr   r   r   r   )r(   methr   r   r$   r   r   r   r   test_lazy_lock_init  s    


z(TestInitSafetyIssues.test_lazy_lock_initN)r+   r,   r-   r   r   r   r  r"  r   r   r   r   r    s
   
r  c                   @   s   e Zd Zdd ZdS )TestOpenMPVendorsc                 C   sN   t  }d|d< d|d< d|d< | D ]"}tj|r&| || tj q&dS )z>
        Checks the OpenMP vendor strings are correct
        ZMSwin32ZIntelr   r   r   N)dictr   r   r   r   assertEqualr   openmp_vendor)r(   r6   kr   r   r   test_vendors  s    zTestOpenMPVendors.test_vendorsN)r+   r,   r-   r)  r   r   r   r   r#    s   r#  __main__)br   rG   r   rn   r   r@   r   r   r   r   r   numpyr   Znumbar   r   r   r   Znumba.tests.supportr   r   r   r	   r
   r   rE   Zt_queueZnumba.testing.mainr   Z_RUNNER_TIMEOUTZ
numba.corer   r   Znumba.np.ufunc.parallelr   Znumba.np.ufuncr   Z_HAVE_TBB_POOLImportErrorr   Z_HAVE_OMP_POOLZscipy.linalg.cython_lapackZscipyZ_HAVE_LAPACKr  r   r   r'  Z_gnuompZskip_unless_gnu_ompr   r   _windowsZ_osxmaxsizeZ_32bitr   r   r   r    r#   r%   objectr&   r.   r:   r>   r?   rC   rU   rj   ThreadZ_thread_classrk   ru   rs   r   r   r   r   r   ry   ZTHREADING_LAYERZ_specific_backendsr   r   r   r   r   r   r   r  r  r  r  r#  r+   mainr   r   r   r   <module>   s    






	X

!36%F  h C<
