a
    tDfG                     @  s\  d dl mZ d dlZd dlmZmZmZ d dlmZm	Z	m
Z
 d dlmZmZmZmZmZ d dlmZmZmZmZmZ d dlmZ G dd	 d	eZed6d
ddddddZed7ddd
ddddddZd8ddd
ddddddZddddddZdddddd Zed!d"Zd9d#d$d%d&d'd(d)Zd*d+d,d-Zd.d.d/d0d1Z d#d2d3d4d5Z!dS ):    )annotationsN)defaultdictdeque
namedtuple)IterableMappingMutableMapping)AnyCallableLiteral
NamedTupleoverload)get_dependenciesget_depsgetcycleistaskreverse_dict)Keyc                   @  s   e Zd ZU ded< ded< dS )Orderintpriorityzfloat | intcritical_pathN)__name__
__module____qualname____annotations__ r   r   W/nfs/NAS7/SABIOD/METHODE/ermites/ermites_venv/lib/python3.9/site-packages/dask/order.pyr   :   s   
r   zMapping[Key, Any]zMapping[Key, set[Key]] | NonezLiteral[True]zdict[Key, Order])dskdependenciesreturn_statsreturnc                C  s   d S Nr   r   r   r    r   r   r   order?   s    r$   F)r    zLiteral[False]zdict[Key, int]c                C  s   d S r"   r   r#   r   r   r   r$   I   s    boolz!dict[Key, Order] | dict[Key, int]c          $        s  	si S t 		t	}du r8d}	fdd	D nd}t tdd  D dd  D i d}d	}tt|sd}tD ]}|v rqt	| st| d
krd}t	d
 | }rt|d|< n||< |d
7 }	| 	|= |= | D ](}	|	 	| |	 s
|	 qqtD ]}
|
v r\qJt	|
 sJt|
 d
krJ|sd}t	|
 |
 D ]6}	|	 
|
 |	 	|
 |	 s
|	 q	|
= |
= |
= qJqt\tt	kr8t	d}tdddd |D  dusFJ t\t\}d	
t t g i jd	t }d	 i dddfddddd 
fddddd fdd}|ddfdd}d dfd!d"}| }ddd#d$}|spd%dfd&d'}| }ntdd(}|j}~g }|j|j
dddfd)d*}|j|jd+dd,fd-d.}|j|jddfd/d0}t|k rd
7 |rJ |rJ | }| }|| |r\t|d1}|| | }|| q.d}|r| }|v rzq`| r^|v r|D ]}|t| qq`|| | }g }g } | j}!|j}"t|d1D ]"}#|#v r|!|# n|"|# qt|d
krd}|D ]}#||# q| D ]"}#|#D ]}|t| q>q2~q`n&|r|tt|k r||  | q`|  qt|ksJ S )2a   Order nodes in dask graph

    This produces an ordering over our tasks that we use to break ties when
    executing.  We do this ahead of time to reduce a bit of stress on the
    scheduler and also to assist in static analysis.

    This currently traverses the graph as a single-threaded scheduler would
    traverse it.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'a': 1, 'b': 2, 'c': (inc, 'a'), 'd': (add, 'b', 'c')}
    >>> order(dsk)
    {'a': 0, 'c': 1, 'b': 2, 'd': 3}
    NTc                   s   i | ]}|t  |qS r   )r   ).0k)r   r   r   
<dictcomp>r       zorder.<locals>.<dictcomp>Fc                 S  s   h | ]\}}|s|qS r   r   r&   r'   vr   r   r   	<setcomp>|   r)   zorder.<locals>.<setcomp>c                 S  s   h | ]\}}|s|qS r   r   r*   r   r   r   r,   }   r)   r      z2Cycle detected between the following keys:
  -> %sz
  -> c                 s  s   | ]}t |V  qd S r"   )str)r&   xr   r   r   	<genexpr>   r)   zorder.<locals>.<genexpr>r   ztuple[int, int, int, int, str])r0   r!   c              	     sf   z
 |  W S  t y`   d us$J |  t|  t|  |   t| f  | < }| Y S 0 d S r"   )KeyErrorlenr/   )r0   rv)_sort_keys_cacher   max_dependentsroots_connectedtotal_dependenciesr   r   sort_key   s    



zorder.<locals>.sort_keyNone)itemr!   c                   s   | g}|r|  } |  |  |  | 	v r:q|  rT|     q:
rlt  	| < n	| < d7 | dD ]L}|  d8  < | | st|  dkr|| q| qqd S )Nr-   r   )popdiscardr   getaddr3   append)r;   Z
next_itemsdep)_crit_path_counter_offsetadd_to_resultcrit_path_counter
dependentsi
leaf_nodes
num_neededreachable_hullrequires_data_taskresultr    runnablerunnable_hullr   r   rC      s*    



zorder.<locals>.add_to_resultzCallable[..., None])funcr!   c                   s   dddd fdd}|S )Nr	   r:   )argskwargsr!   c                    s&   d z| i | W d nd 0 d S )Ng      ?r   r   )rO   rP   )rB   rN   r   r   wrapper  s    z,order.<locals>._with_offset.<locals>.wrapperr   )rN   rQ   )rB   )rN   r   _with_offset  s    zorder.<locals>._with_offsetr!   c                    s>  r:  }   | r |  }|	v s|v r4q|v rF | q|g}t|g}|r| }|d }	| | }| }|s| dkr|D ]} | qq4t|dkst|dkr2t|dkrt|
dD ]0}t| dkr|  }	|	| ||	 qqV		| |
t|
d qbn|v r| | t| | kr0t }
|D ]}|d vrn|
| qnt|
| k rt|
|< n@|
r0|
 }|D ](}| r|
|  q | qqn@t| dkr&| dkr&|D ]} | qn
|g|< qVqbqVqq dS )aE  Compute all currently runnable paths and either cache or execute them

        This is designed to ensure we are running tasks that are free to execute
        (e.g. the result of a splitter task) not too eagerly. If we executed
        such free tasks too early we'd be walking the graph in a too wide /
        breadth first fashion that is not optimal. If instead we were to only
        execute them once they are needed for a final result, this can cause
        very high memory pressure since valuable reducers are executed too
        late.

        The strategy here is to take all runnable tasks and walk forwards until
        we hit a reducer node (i.e. a node with more than one dependency). We
        will remember/cache the path to this reducer node.
        If this path leads to a leaf or if we find enough runnable paths for a
        reducer to be runnable, we will execute the path.

        If instead of a reducer a splitter is encountered that is runnable, we
        will follow its splitter paths individually and apply the same logic to
        each branch.
        r.   r-   keyN)copyclearr<   r   popleftr?   r3   sortedr@   updateextendlist)
candidatesrU   pathZbranchescurrentZdeps_downstreamZdeps_upstreamr'   dbranchZpruned_branches)rC   r   rE   known_runnable_pathsknown_runnable_paths_poprG   rH   rK   rL   rM   r9   r   r   process_runnables  s|    










z order.<locals>.process_runnablesr%   c                    sZ   d} t tt  t dk rVD ],}| s>t| } q(| t| kr( dS q(dS )Nr   g?FT)absr3   )sizer)rG   leafs_connected
root_nodesr   r   use_longest_path  s     zorder.<locals>.use_longest_pathc                   S  s
   t  d S r"   )NotImplementedErrorr   r   r   r   
get_target  s    zorder.<locals>.get_targetzCallable[[], Key]c            	        s   t t} D ]"}| D ]}| |  d7  < qqt t}|  D ]\}}|| | q@i  | D ]\}}t|	dd |< qd~~ dd fdddd	fd	d
}|S )Nr-   TrU   reversez
Key | NonerS   c                    sD    r@t  }  |  d }|v r< |     |  s  | = q |S d S )Nr.   )maxr<   )rU   Zpicked_root)occurences_grouped_sortedrK   r   r   	pick_seed  s    z3order.<locals>._build_get_target.<locals>.pick_seedr   c                    sj    } }r$fdd}| @ } nr>fdd}| @ } | s^  }rV| } np\} t | |dS )Nc                   s    |  | fS r"   r   r'   rH   r9   r   r   <lambda>  r)   zForder.<locals>._build_get_target.<locals>.get_target.<locals>.<lambda>c                   s    |  | fS r"   r   rs   rt   r   r   ru     r)   rT   )min)r^   Zskeyseed)rG   ri   rH   rr   rI   rM   r9   r   r   rm     s    


z4order.<locals>._build_get_target.<locals>.get_target)r   r   setitemsr?   rZ   )	occurrencestrh   Zoccurences_groupedrootZoccr'   r+   rm   )rG   ri   rH   rI   rK   r7   rM   r9   )rq   rr   r   _build_get_target  s    z order.<locals>._build_get_targetrn   c                   s    |  |  d S r"   r   r;   )cpath_append
scpath_addr   r   path_append  s    zorder.<locals>.path_appendzIterable[Key])ry   r!   c                   s    |  |  d S r"   r   )ry   )cpath_extendscpath_updater   r   path_extend  s    zorder.<locals>.path_extendc                    s     } |  | S r"   r   r~   )	cpath_popscpath_discardr   r   path_pop#  s    zorder.<locals>.path_poprT   )dictr3   r   ry   r   rx   r]   r   r   remover?   rW   deepcopyndependenciesr   RuntimeErrorjoin_connecting_to_rootsr<   rZ   r@   r[   r\   r=   rp   reversed
difference)$r   r   r    expected_lenZdependencies_are_copyZ	all_tasksZn_removed_leavesZleafpriorA   r|   cycle_Z
scrit_pathrR   re   rk   Zlongest_pathrm   r}   Zleaf_nodes_sortedr   r   r   r   targetZ	next_depsr;   Zwalked_backr_   depsunknownZknownZk_appendZ	uk_appendra   r   )rB   r5   rC   r   r   r   rD   r   rE   r   rF   rc   rd   rG   ri   r6   rH   rI   rJ   rK   r    rj   r7   rL   rM   r   r   r   r9   r8   r   r$   S   s$   



 



,$(f .G







zMapping[Key, set[Key]]z*tuple[dict[Key, set[Key]], dict[Key, int]])r   rE   r!   c                 C  s  i }g }dd |   D }i }t }|   D ]`\}}|s,|| |h||< || }	t|	||< |	D ]&}
||
  d8  < ||
 sd||
 qdq,|rx| }|| D ]&}||  d8  < || s|| qd}d}d}| | D ]n}
||
 }|s|}||
 ||< q|r||u s||sd}|s.| }t||
 || ||< |	| q|dush|dushJ |pp|||< qt }|D ]}|||< q||fS )a  Determine for every node which root nodes are connected to it (i.e.
    ancestors). If arguments of dependencies and dependents are switched, this
    can also be used to determine which leaf nodes are connected to which node
    (i.e. descendants).

    Also computes a weight that is defined as (cheaper to compute here)

            `max(len(dependents[k]) for k in connected_roots[key])`

    c                 S  s   i | ]\}}|r|t |qS r   r3   r*   r   r   r   r(   s  r)   z(_connecting_to_roots.<locals>.<dictcomp>r-   NTF)
ry   rx   r?   r3   r@   r<   issubsetrW   rp   r[   )r   rE   rK   r`   rH   r6   rootsr'   r+   r   childrU   parentZnew_setZidentical_setsZresult_firstZr_childZ	empty_setrh   r   r   r   r   d  s\    

r   z%tuple[dict[Key, int], dict[Key, int]]c                   s   i }i  |   D ] \}}t|||< |sd |< q| }g }|j}|j} D ]2}	||	 D ]$}
||
  d8  < ||
 sZ||
 qZqN|r| }	dt fdd| |	 D   |	< ||	 D ]$}
||
  d8  < ||
 s||
 qq| fS )aG  Number of total data elements on which this key depends

    For each key we return the number of tasks that must be run for us to run
    this task.

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> dsk = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
    >>> dependencies, dependents = get_deps(dsk)
    >>> num_dependencies, total_dependencies = ndependencies(dependencies, dependents)
    >>> sorted(total_dependencies.items())
    [('a', 1), ('b', 2), ('c', 3)]

    Returns
    -------
    num_dependencies: Dict[key, int]
    total_dependencies: Dict[key, int]
    r-   c                 3  s   | ]} | V  qd S r"   r   )r&   r   rK   r   r   r1     r)   z ndependencies.<locals>.<genexpr>)ry   r3   rW   r<   r@   sum)r   rE   rH   r'   r+   Znum_dependenciesr`   Zcurrent_popZcurrent_appendrU   r   r   r   r   r     s.    
"r   	OrderInfo)r$   ageZnum_data_when_runZnum_data_when_releasedZnum_dependencies_freedzMutableMapping[Key, Any]zMapping[Key, int] | Nonez$MutableMapping[Key, set[Key]] | Nonez&tuple[dict[Key, OrderInfo], list[int]])r   or   r!   c                   sB  |du rt | \}}nt|}|dus*J |du r@t| |dd}g }d}i  i i i dd | D }tt| |jdD ]\}}|| ||< d}	|| D ]@}
||
  d8  < ||
 dkr|||
   |
< ||
< |	d7 }	q|	|< || r||	d 8 }q|d |< ||< ||	8 }q| fd	d| D }||fS )
a  Simulate runtime metrics as though running tasks one at a time in order.

    These diagnostics can help reveal behaviors of and issues with ``order``.

    Returns a dict of `namedtuple("OrderInfo")` and a list of the number of outputs held over time.

    OrderInfo fields:
    - order : the order in which the node is run.
    - age : how long the output of a node is held.
    - num_data_when_run : the number of outputs held in memory when a node is run.
    - num_data_when_released : the number of outputs held in memory when the output is released.
    - num_dependencies_freed : the number of dependencies freed by running the node.
    NF)r   r    r   c                 S  s   i | ]\}}|t |qS r   r   r&   rU   valr   r   r   r(   
  r)   zdiagnostics.<locals>.<dictcomp>rT   r-   c              
     s2   i | ]*\}}|t | | | | | qS r   )r   r   r   ZfreedZreleasepressureZrunpressurer   r   r(     s   )r   r   r$   ry   	enumeraterZ   __getitem__r@   )r   r   r   rE   ZpressureZnum_in_memoryrH   rF   rU   ZreleasedrA   r4   r   r   r   diagnostics  sB    



r   r:   rS   c                   C  s   d S r"   r   r   r   r   r   _f&  s    r   r	   )taskr!   c                 C  s   t | rt| d sJ g }| dd  D ]j}t|ttfrF|| q(t|trtt |rh|t| q|| q(t|tr(|dd |D  q(t	g|R S t| trt	| fS t	g| R S d S )Nr   r-   c                 S  s   g | ]}t |qS r   )_convert_task)r&   er   r   r   
<listcomp>7  r)   z!_convert_task.<locals>.<listcomp>)
r   callable
isinstancer/   r   r@   tupler   r]   r   )r   Znew_specelr   r   r   r   *  s     


r   r   )r   r!   c                 C  sB   i }|   D ]\}}|}t|||< qt|t| kr>td|S )zTake a dask graph and replace callables with a dummy function and remove
    payload data like numpy arrays, dataframes, etc.
    z)Sanitization failed to preserve topology.)ry   r   r   r   )r   newrU   valuesZnew_keyr   r   r   sanitize_dsk?  s    r   )N)N)N)NN)"
__future__r   rW   collectionsr   r   r   collections.abcr   r   r   typingr	   r
   r   r   r   Z	dask.corer   r   r   r   r   Zdask.typingr   r   r$   r   r   r   r   r   r   r   r   r   r   r   <module>   sF   0 	      I1  <