a
    tDf                     @  s   U d Z ddlmZ ddlZddlZddlZddlZddlm	Z	 ddl
mZmZ ddlmZmZ ddlmZmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ dd Ze Zdaded< e	eZ ded< e Z!dd Z"ddddddZ#dS )z2
A threaded shared-memory scheduler

See local.py
    )annotationsN)defaultdict)MappingSequence)ExecutorThreadPoolExecutor)Lockcurrent_thread)config)MultiprocessingPoolExecutor	get_async)	CPU_COUNT)Keyc                   C  s   t  jS )N)r	   ident r   r   Z/nfs/NAS7/SABIOD/METHODE/ermites/ermites_venv/lib/python3.9/site-packages/dask/threaded.py_thread_get_id   s    r   zExecutor | Nonedefault_poolz2defaultdict[threading.Thread, dict[int, Executor]]poolsc                 C  s   | t  d fS )N   )sysexc_info)edumpsr   r   r   pack_exception!   s    r   r   zSequence[Key] | Key)dskkeysc                 K  sx  |pt dd}|pt dd}t }t |du r|du rf|tu rftdu r`ttat	tj
 t}q|tv r|t| v rt| | }qt|}t	|j
 |t| |< nt|tjjrt|}W d   n1 s0    Y  t|j|j| |f|ttd|}t^ tt }|turTttD ].}	|	|vr$t|	 D ]}
|

  q@q$W d   n1 sj0    Y  |S )a  Threaded cached implementation of dask.get

    Parameters
    ----------

    dsk: dict
        A dask dictionary specifying a workflow
    keys: key or list of keys
        Keys corresponding to desired data
    num_workers: integer of thread count
        The number of threads to use in the ThreadPool that will actually execute tasks
    cache: dict-like (optional)
        Temporary storage of results

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
    >>> get(dsk, 'w')
    4
    >>> get(dsk, ['w', 'y'])
    (4, 2)
    poolNnum_workers)cacheZget_idr   )r
   getr	   
pools_lockmain_threadr   r   r   atexitregistershutdownr   
isinstancemultiprocessingr   ZPoolr   r   ZsubmitZ_max_workersr   r   set	threading	enumeratelistpopvalues)r   r   r   r   r   kwargsthreadresultsZactive_threadstpr   r   r   r    %   sJ    !&

0r    )NNN)$__doc__
__future__r   r#   Zmultiprocessing.poolr'   r   r)   collectionsr   collections.abcr   r   concurrent.futuresr   r   r   r	   Zdaskr
   Z
dask.localr   r   Zdask.systemr   Zdask.typingr   r   r"   r   __annotations__dictr   r!   r   r    r   r   r   r   <module>   s.      