# dask/layers.py -- readable reconstruction of a Python 3.9 bytecode cache of
# this module.  Docstrings and signatures are taken from the dump; method
# bodies appear only where they are clearly legible and are otherwise left as
# ``...`` stubs.
from __future__ import annotations

import functools
import math
import operator
from collections import defaultdict
from collections.abc import Callable
from itertools import product
from typing import Any

import tlz as toolz
from tlz.curried import map

from dask.base import tokenize
from dask.blockwise import Blockwise, BlockwiseDep, BlockwiseDepDict, blockwise_token
from dask.core import flatten
from dask.highlevelgraph import Layer
from dask.utils import apply, cached_cumsum, concrete, insert


class CallableLazyImport:
    """Function Wrapper for Lazy Importing.

    This Class should only be used when materializing a graph
    on a distributed scheduler.
    """

    def __init__(self, function_path):
        self.function_path = function_path

    def __call__(self, *args, **kwargs):
        from distributed.utils import import_term

        return import_term(self.function_path)(*args, **kwargs)


class ArrayBlockwiseDep(BlockwiseDep):
    """Blockwise dep for array-likes, which only needs chunking
    information to compute its data.
    """

    chunks: tuple[tuple[int, ...], ...]
    numblocks: tuple[int, ...]
    produces_tasks: bool = False

    def __init__(self, chunks: tuple[tuple[int, ...], ...]):
        self.chunks = chunks
        self.numblocks = tuple(len(chunk) for chunk in chunks)
        self.produces_tasks = False

    def __getitem__(self, idx: tuple[int, ...]):
        raise NotImplementedError("Subclasses must implement __getitem__")


class ArrayChunkShapeDep(ArrayBlockwiseDep):
    """Produce chunk shapes given a chunk index"""

    def __getitem__(self, idx: tuple[int, ...]):
        return tuple(chunk[i] for i, chunk in zip(idx, self.chunks))
class ArraySliceDep(ArrayBlockwiseDep):
    """Produce slice(s) into the full-sized array given a chunk index"""

    starts: tuple[tuple[int, ...], ...]

    def __init__(self, chunks: tuple[tuple[int, ...], ...]):
        super().__init__(chunks)
        self.starts = tuple(cached_cumsum(c, initial_zero=True) for c in chunks)

    def __getitem__(self, idx: tuple):
        loc = tuple((start[i], start[i + 1]) for i, start in zip(idx, self.starts))
        return tuple(slice(*s, None) for s in loc)
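# Illustrative sketch (not part of the original module): for a chunked array
# with ``chunks = ((2, 2, 1), (3, 3))``, block (1, 0) is 2 x 3 elements and
# covers rows 2:4, columns 0:3 of the full-sized array:
#
#     >>> ArrayChunkShapeDep(((2, 2, 1), (3, 3)))[(1, 0)]
#     (2, 3)
#     >>> ArraySliceDep(((2, 2, 1), (3, 3)))[(1, 0)]
#     (slice(2, 4, None), slice(0, 3, None))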
class ArrayOverlapLayer(Layer):
    """Simple HighLevelGraph array overlap layer.

    Lazily computed High-level graph layer for array overlap operations.

    Parameters
    ----------
    name : str
        Name of new output overlap array.
    array : Dask array
    axes: Mapping
        Axes dictionary indicating overlap in each dimension,
        e.g. ``{'0': 1, '1': 1}``
    """

    def __init__(self, name, axes, chunks, numblocks, token):
        super().__init__()
        self.name = name
        self.axes = axes
        self.chunks = chunks
        self.numblocks = numblocks
        self.token = token
        self._cached_keys = None

    def __repr__(self):
        return f"ArrayOverlapLayer<name='{self.name}'"

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    def get_output_keys(self):
        return self.keys()  # note: this materializes the graph

    def _dask_keys(self):
        if self._cached_keys is not None:
            return self._cached_keys

        name, chunks, numblocks = self.name, self.chunks, self.numblocks

        def keys(*args):
            if not chunks:
                return [(name,)]
            ind = len(args)
            if ind + 1 == len(numblocks):
                result = [(name,) + args + (i,) for i in range(numblocks[ind])]
            else:
                result = [keys(*(args + (i,))) for i in range(numblocks[ind])]
            return result

        self._cached_keys = result = keys()
        return result

    def _construct_graph(self, deserializing=False):
        """Construct graph for a simple overlap operation."""
        # Built from the helper functions defined below; see the sketch that
        # follows this class for the task layout.
        ...
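# Illustrative sketch (not part of the original module): per output chunk,
# ``ArrayOverlapLayer._construct_graph`` expands the chunk key into its
# "fractional" neighbour keys via ``_expand_keys_around_center`` (defined
# below), rewrites every fractional key into a trimmed ``operator.getitem``
# task with ``fractional_slice``, stores those pieces under "getitem-<token>"
# keys, and finally concatenates the nested block of pieces under an
# "overlap-<token>" key using ``dask.array.core.concatenate3`` (wrapped in
# ``CallableLazyImport`` when materialized on a distributed scheduler).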
def _expand_keys_around_center(k, dims, name=None, axes=None):
    """Get all neighboring keys around center

    Parameters
    ----------
    k: Key
        The key around which to generate new keys
    dims: Sequence[int]
        The number of chunks in each dimension
    name: Optional[str]
        The name to include in the output keys, or None to include no name
    axes: Dict[int, int]
        The axes active in the expansion.  We don't expand on non-active axes

    Examples
    --------
    >>> _expand_keys_around_center(('x', 2, 3), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y', 1.1, 2.1), ('y', 1.1, 3), ('y', 1.1, 3.9)],
     [('y',   2, 2.1), ('y',   2, 3), ('y',   2, 3.9)],
     [('y', 2.9, 2.1), ('y', 2.9, 3), ('y', 2.9, 3.9)]]

    >>> _expand_keys_around_center(('x', 0, 4), dims=[5, 5], name='y', axes={0: 1, 1: 1})  # noqa: E501 # doctest: +NORMALIZE_WHITESPACE
    [[('y',   0, 3.1), ('y',   0,   4)],
     [('y', 0.9, 3.1), ('y', 0.9,   4)]]
    """

    def inds(i, ind):
        rv = []
        if ind - 0.9 > 0:
            rv.append(ind - 0.9)
        rv.append(ind)
        if ind + 0.9 < dims[i] - 1:
            rv.append(ind + 0.9)
        return rv

    shape = []
    for i, ind in enumerate(k[1:]):
        num = 1
        if ind > 0:
            num += 1
        if ind < dims[i] - 1:
            num += 1
        shape.append(num)

    args = [
        inds(i, ind) if axes.get(i, 0) else [ind] for i, ind in enumerate(k[1:])
    ]
    if name is not None:
        args = [[name]] + args
    seq = list(product(*args))
    shape2 = [d if axes.get(i, 0) else 1 for i, d in enumerate(shape)]
    result = reshapelist(shape2, seq)
    return result


def reshapelist(shape, seq):
    """Reshape iterator to nested shape

    >>> reshapelist((2, 3), range(6))
    [[0, 1, 2], [3, 4, 5]]
    """
    if len(shape) == 1:
        return list(seq)
    else:
        n = int(len(seq) / shape[0])
        return [reshapelist(shape[1:], part) for part in toolz.partition(n, seq)]
def fractional_slice(task, axes):
    """

    >>> fractional_slice(('x', 5.1), {0: 2})
    (<built-in function getitem>, ('x', 5), (slice(-2, None, None),))

    >>> fractional_slice(('x', 3, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(None, None, None), slice(-3, None, None)))

    >>> fractional_slice(('x', 2.9, 5.1), {0: 2, 1: 3})
    (<built-in function getitem>, ('x', 3, 5), (slice(0, 2, None), slice(-3, None, None)))
    """
    rounded = (task[0],) + tuple(int(round(i)) for i in task[1:])

    index = []
    for i, (t, r) in enumerate(zip(task[1:], rounded[1:])):
        depth = axes.get(i, 0)
        if isinstance(depth, tuple):
            left_depth = depth[0]
            right_depth = depth[1]
        else:
            left_depth = depth
            right_depth = depth

        if t == r:
            index.append(slice(None, None, None))
        elif t < r and right_depth:
            index.append(slice(0, right_depth))
        elif t > r and left_depth:
            index.append(slice(-left_depth, None))
        else:
            index.append(slice(0, 0))
    index = tuple(index)

    if all(ind == slice(None, None, None) for ind in index):
        return task
    else:
        return (operator.getitem, rounded, index)
class SimpleShuffleLayer(Layer):
    """Simple HighLevelGraph Shuffle layer

    High-level graph layer for a simple shuffle operation in which
    each output partition depends on all input partitions.

    Parameters
    ----------
    name : str
        Name of new shuffled output collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    npartitions : int
        Number of output partitions.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    """

    def __init__(
        self,
        name,
        column,
        npartitions,
        npartitions_input,
        ignore_index,
        name_input,
        meta_input,
        parts_out=None,
        annotations=None,
    ):
        self.name = name
        self.column = column
        self.npartitions = npartitions
        self.npartitions_input = npartitions_input
        self.ignore_index = ignore_index
        self.name_input = name_input
        self.meta_input = meta_input
        self.parts_out = parts_out or range(npartitions)
        self.split_name = "split-" + self.name
        annotations = annotations or {}
        self._split_keys = None
        if "priority" not in annotations:
            annotations["priority"] = self._key_priority
        super().__init__(annotations=annotations)

    def _key_priority(self, key):
        # Scheduling-priority callback registered in ``__init__``:
        # distinguishes intermediate "split-<name>" keys from final output
        # keys (returns 0 or 1).
        assert isinstance(key, tuple)
        ...

    def get_output_keys(self):
        return {(self.name, part) for part in self.parts_out}

    def __repr__(self):
        return "SimpleShuffleLayer<name='{}', npartitions={}>".format(
            self.name, self.npartitions
        )

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def _keys_to_parts(self, keys):
        """Simple utility to convert keys to partition indices."""
        parts = set()
        for key in keys:
            try:
                _name, _part = key
            except ValueError:
                continue
            if _name != self.name:
                continue
            parts.add(_part)
        return parts

    def _cull_dependencies(self, keys, parts_out=None):
        """Determine the necessary dependencies to produce `keys`.

        For a simple shuffle, output partitions always depend on
        all input partitions. This method does not require graph
        materialization.
        """
        deps = defaultdict(set)
        parts_out = parts_out or self._keys_to_parts(keys)
        for part in parts_out:
            deps[(self.name, part)] |= {
                (self.name_input, i) for i in range(self.npartitions_input)
            }
        return deps

    def _cull(self, parts_out):
        return SimpleShuffleLayer(
            self.name,
            self.column,
            self.npartitions,
            self.npartitions_input,
            self.ignore_index,
            self.name_input,
            self.meta_input,
            parts_out=parts_out,
        )

    def cull(self, keys, all_keys):
        """Cull a SimpleShuffleLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        """
        parts_out = self._keys_to_parts(keys)
        culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
        if parts_out != set(self.parts_out):
            culled_layer = self._cull(parts_out)
            return culled_layer, culled_deps
        else:
            return self, culled_deps

    def _construct_graph(self, deserializing=False):
        """Construct graph for a simple shuffle operation."""
        # Emits one ``shuffle_group`` task per input partition under a
        # "group-<name>" key, one ``operator.getitem`` task per
        # (output, input) pair under "split-<name>" keys, and one ``_concat``
        # task per output partition; ``shuffle_group`` and ``_concat`` come
        # from ``dask.dataframe`` (lazily, via ``CallableLazyImport``, when
        # ``deserializing``).  See the sketch after ``ShuffleLayer`` below.
        ...
 ZdddZ  Z	S )ShuffleLayera"  Shuffle-stage HighLevelGraph layer

    High-level graph layer corresponding to a single stage of
    a multi-stage inter-partition shuffle operation.

    Stage: (shuffle-group) -> (shuffle-split) -> (shuffle-join)

    Parameters
    ----------
    name : str
        Name of new (partially) shuffled collection.
    column : str or list of str
        Column(s) to be used to map rows to output partitions (by hashing).
    inputs : list of tuples
        Each tuple dictates the data movement for a specific partition.
    stage : int
        Index of the current shuffle stage.
    npartitions : int
        Number of output partitions for the full (multi-stage) shuffle.
    npartitions_input : int
        Number of partitions in the original (un-shuffled) DataFrame.
    k : int
        A partition is split into this many groups during each stage.
    ignore_index: bool, default False
        Ignore index during shuffle.  If ``True``, performance may improve,
        but index values will not be preserved.
    name_input : str
        Name of input collection.
    meta_input : pd.DataFrame-like object
        Empty metadata of input collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations
    Nc                   s@   || _ || _|| _t j||||||	|
|p4tt||d	 d S )N)r   r   )inputsstagensplitsr>   r   rj   r+   )r   rK   r   r   r   r   r   r   r   r   r   r   r   r?   r   r   r   A  s    zShuffleLayer.__init__c                 C  s   d | j| j| j| jS )Nz=ShuffleLayer<name='{}', stage={}, nsplits={}, npartitions={}>)r   rK   r   r   r   rQ   r   r   r   rR   _  s    zShuffleLayer.__repr__c           
      C  s   t t}|p| |}dd t| jD }|D ]}| j| }t| jD ]j}t|| j|}|| }	| jdkr|	| j	kr|| j
|f d| j
 |df qF|| j
|f | j|	f qFq.|S )zqDetermine the necessary dependencies to produce `keys`.

        Does not require graph materialization.
        c                 S  s   i | ]\}}||qS r   r   r,   r9   inpr   r   r   
<dictcomp>k  r/   z3ShuffleLayer._cull_dependencies.<locals>.<dictcomp>r   r   empty)r   r   r   r   r   rj   r   r   r   r   rK   r   r   )
r   rf   r   r   inp_part_mapr   outr{   _inpr   r   r   r   r   d  s    
"zShuffleLayer._cull_dependenciesc                 C  s2   t | j| j| j| j| j| j| j| j| j	| j
|dS r   )r   rK   r   r   r   r   r   r   r   r   r   r   r   r   r   r   w  s    zShuffleLayer._cullFc              
   C  sl  d| j  }|r td}td}nddlm} ddlm} i }dd t| jD }| jD ]}| j| }g }	t	| j
D ].}
t|| j|
}|| j }|	| j||f qt||	| jf|| j |f< |	D ]\}}}tj||f|f|| j||f< ||f|vr|| }| jdkr2|| jk r| j|f}n||d	f}| j||< n
| j|f}||| j| j| j
| j| j| jf|||f< qqV|S )
z2Construct graph for a "rearrange-by-column" stage.r   r   r   r   r   r   c                 S  s   i | ]\}}||qS r   r   r   r   r   r   r     r/   z1ShuffleLayer._construct_graph.<locals>.<dictcomp>r   )rK   r   r   r   r   r   r   r   r   rj   r   r   r   r}   r   r   r   r   r   r   r   r   r   )r   rz   r   r   r   rY   r   r   r   r   r9   r   Z_idxr   r   Z	input_keyr   r   r   rW     sX    




zShuffleLayer._construct_graph)NN)N)F)
r   r    r!   r"   r   rR   r   r   rW   rI   r   r   r?   r   r     s   0  
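# Illustrative sketch (not part of the original module): the task layout the
# shuffle layers above materialize for one output partition ``j`` of a
# single-stage shuffle with two input partitions; ``shuffle_group`` and
# ``_concat`` are the ``dask.dataframe`` helpers imported in
# ``_construct_graph``, and the key prefixes follow its "group-"/"split-"
# naming:
#
#     dsk[("group-<name>", 0)] = (shuffle_group, ("<name_input>", 0), column,
#                                 0, npartitions, npartitions, ignore_index,
#                                 npartitions)        # and likewise for input 1
#     dsk[("split-<name>", j, 0)] = (operator.getitem, ("group-<name>", 0), j)
#     dsk[("split-<name>", j, 1)] = (operator.getitem, ("group-<name>", 1), j)
#     dsk[("<name>", j)] = (_concat,
#                           [("split-<name>", j, 0), ("split-<name>", j, 1)],
#                           ignore_index)
#
# ``ShuffleLayer`` repeats this pattern once per stage, splitting each input
# ``nsplits`` ways per stage instead of ``npartitions`` ways at once.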
class BroadcastJoinLayer(Layer):
    """Broadcast-based Join Layer

    High-level graph layer for a join operation requiring the
    smaller collection to be broadcasted to every partition of
    the larger collection.

    Parameters
    ----------
    name : str
        Name of new (joined) output collection.
    lhs_name: string
        "Left" DataFrame collection to join.
    lhs_npartitions: int
        Number of partitions in "left" DataFrame collection.
    rhs_name: string
        "Right" DataFrame collection to join.
    rhs_npartitions: int
        Number of partitions in "right" DataFrame collection.
    parts_out : list of int (optional)
        List of required output-partition indices.
    annotations : dict (optional)
        Layer annotations.
    **merge_kwargs : **dict
        Keyword arguments to be passed to chunkwise merge func.
    """

    def __init__(
        self,
        name,
        npartitions,
        lhs_name,
        lhs_npartitions,
        rhs_name,
        rhs_npartitions,
        parts_out=None,
        annotations=None,
        left_on=None,
        right_on=None,
        **merge_kwargs,
    ):
        super().__init__(annotations=annotations)
        self.name = name
        self.npartitions = npartitions
        self.lhs_name = lhs_name
        self.lhs_npartitions = lhs_npartitions
        self.rhs_name = rhs_name
        self.rhs_npartitions = rhs_npartitions
        self.parts_out = parts_out or list(range(self.npartitions))
        self.left_on = tuple(left_on) if isinstance(left_on, list) else left_on
        self.right_on = tuple(right_on) if isinstance(right_on, list) else right_on
        self.merge_kwargs = merge_kwargs
        self.how = self.merge_kwargs.get("how")
        self.merge_kwargs["left_on"] = self.left_on
        self.merge_kwargs["right_on"] = self.right_on

    def get_output_keys(self):
        return {(self.name, part) for part in self.parts_out}

    def __repr__(self):
        return "BroadcastJoinLayer<name='{}', how={}, lhs={}, rhs={}>".format(
            self.name, self.how, self.lhs_name, self.rhs_name
        )

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def _keys_to_parts(self, keys):
        """Simple utility to convert keys to partition indices."""
        parts = set()
        for key in keys:
            try:
                _name, _part = key
            except ValueError:
                continue
            if _name != self.name:
                continue
            parts.add(_part)
        return parts

    @property
    def _broadcast_plan(self):
        # Which side is broadcasted (the one with fewer partitions), how many
        # partitions it has, the name of the other side, and the join
        # column(s) of that other side.
        if self.lhs_npartitions < self.rhs_npartitions:
            # Broadcasting the left
            return self.lhs_name, self.lhs_npartitions, self.rhs_name, self.right_on
        else:
            # Broadcasting the right
            return self.rhs_name, self.rhs_npartitions, self.lhs_name, self.left_on

    def _cull_dependencies(self, keys, parts_out=None):
        """Determine the necessary dependencies to produce `keys`.

        For a broadcast join, output partitions always depend on
        all partitions of the broadcasted collection, but only one
        partition of the "other" collection.
        """
        bcast_name, bcast_size, other_name = self._broadcast_plan[:3]
        deps = defaultdict(set)
        parts_out = parts_out or self._keys_to_parts(keys)
        for part in parts_out:
            deps[(self.name, part)] |= {(bcast_name, i) for i in range(bcast_size)}
            deps[(self.name, part)] |= {(other_name, part)}
        return deps

    def _cull(self, parts_out):
        return BroadcastJoinLayer(
            self.name,
            self.npartitions,
            self.lhs_name,
            self.lhs_npartitions,
            self.rhs_name,
            self.rhs_npartitions,
            annotations=self.annotations,
            parts_out=parts_out,
            **self.merge_kwargs,
        )

    def cull(self, keys, all_keys):
        """Cull a BroadcastJoinLayer HighLevelGraph layer.

        The underlying graph will only include the necessary
        tasks to produce the keys (indices) included in `parts_out`.
        Therefore, "culling" the layer only requires us to reset this
        parameter.
        """
        parts_out = self._keys_to_parts(keys)
        culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
        if parts_out != set(self.parts_out):
            culled_layer = self._cull(parts_out)
            return culled_layer, culled_deps
        else:
            return self, culled_deps

    def _construct_graph(self, deserializing=False):
        """Construct graph for a broadcast join operation."""
        # For every output partition, "inter-<name>" merge tasks join that
        # partition of the non-broadcast side against each partition of the
        # broadcast side, and a final concat task combines them under
        # ``(self.name, part)``.  Uses ``_split_partition``,
        # ``_merge_chunk_wrapper`` and ``_concat_wrapper`` from
        # ``dask.dataframe.multi`` (imported via ``CallableLazyImport`` when
        # ``deserializing``); non-"inner" joins get special left/right
        # handling.
        ...
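# Illustrative sketch (not part of the original module): with a 2-partition
# left collection "lhs" joined against a 100-partition right collection "rhs",
# ``_broadcast_plan`` picks "lhs" as the broadcast side, so every output
# partition ``i`` of the joined collection depends on both "lhs" partitions
# and exactly one "rhs" partition:
#
#     deps[("<name>", i)] == {("lhs", 0), ("lhs", 1), ("rhs", i)}
#
# which is what ``_cull_dependencies`` above computes without materializing
# the graph.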
class DataFrameIOLayer(Blockwise):
    """DataFrame-based Blockwise Layer with IO

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    columns : str, list or None
        Field name(s) to read in as columns in the output.
    inputs : list or BlockwiseDep
        List of arguments to be passed to ``io_func`` so
        that the materialized task to produce partition ``i``
        will be: ``(<io_func>, inputs[i])``.  Note that each
        element of ``inputs`` is typically a tuple of arguments.
    io_func : callable
        A callable function that takes in a single tuple
        of arguments, and outputs a DataFrame partition.
        Column projection will be supported for functions
        that satisfy the ``DataFrameIOFunction`` protocol.
    label : str (optional)
        String to use as a prefix in the place-holder collection
        name. If nothing is specified (default), "subset-" will
        be used.
    produces_tasks : bool (optional)
        Whether one or more elements of `inputs` is expected to
        contain a nested task. This argument is only used for
        serialization purposes, and will be deprecated in the
        future. Default is False.
    creation_info: dict (optional)
        Dictionary containing the callable function ('func'),
        positional arguments ('args'), and key-word arguments
        ('kwargs') used to produce the dask collection with
        this underlying ``DataFrameIOLayer``.
    annotations: dict (optional)
        Layer annotations to pass through to Blockwise.
    """

    def __init__(
        self,
        name,
        columns,
        inputs,
        io_func,
        label=None,
        produces_tasks=False,
        creation_info=None,
        annotations=None,
    ):
        self.name = name
        self._columns = columns
        self.inputs = inputs
        self.io_func = io_func
        self.label = label
        self.produces_tasks = produces_tasks
        self.annotations = annotations
        self.creation_info = creation_info

        if not isinstance(inputs, BlockwiseDep):
            # Define mapping between key index and "part"
            io_arg_map = BlockwiseDepDict(
                {(i,): inp for i, inp in enumerate(self.inputs)},
                produces_tasks=self.produces_tasks,
            )
        else:
            io_arg_map = inputs

        # Use the Blockwise initializer
        dsk = {self.name: (io_func, blockwise_token(0))}
        super().__init__(
            output=self.name,
            output_indices="i",
            dsk=dsk,
            indices=[(io_arg_map, "i")],
            numblocks={},
            annotations=annotations,
        )

    @property
    def columns(self):
        """Current column projection for this layer"""
        return self._columns

    def project_columns(self, columns):
        """Produce a column projection for this IO layer.
        Given a list of required output columns, this method
        returns the projected layer.
        """
        from dask.dataframe.io.utils import DataFrameIOFunction

        columns = list(columns)

        if self.columns is None or set(self.columns).issuperset(columns):
            # Apply the column projection in the IO function itself when it
            # satisfies the ``DataFrameIOFunction`` protocol
            if isinstance(self.io_func, DataFrameIOFunction):
                io_func = self.io_func.project_columns(columns)
            else:
                io_func = self.io_func

            layer = DataFrameIOLayer(
                (self.label or "subset-") + tokenize(self.name, columns),
                columns,
                self.inputs,
                io_func,
                label=self.label,
                produces_tasks=self.produces_tasks,
                annotations=self.annotations,
            )
            return layer
        else:
            # Default behavior
            return self

    def __repr__(self):
        return "DataFrameIOLayer<name='{}', n_parts={}, columns={}>".format(
            self.name, len(self.inputs), self.columns
        )
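# Illustrative sketch (not part of the original module): an ``io_func`` that
# opts in to the column projection performed by ``project_columns`` above by
# satisfying the ``DataFrameIOFunction`` protocol (a ``columns`` property plus
# a ``project_columns`` method).  ``ReadParts`` and its parquet reading are
# hypothetical:
#
#     class ReadParts:
#         def __init__(self, columns=None):
#             self._columns = columns
#
#         @property
#         def columns(self):
#             return self._columns
#
#         def project_columns(self, columns):
#             return ReadParts(columns=columns)   # read only these columns
#
#         def __call__(self, path):
#             import pandas as pd
#             df = pd.read_parquet(path)
#             return df[list(self._columns)] if self._columns is not None else df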
class DataFrameTreeReduction(Layer):
    """DataFrame Tree-Reduction Layer

    Parameters
    ----------
    name : str
        Name to use for the constructed layer.
    name_input : str
        Name of the input layer that is being reduced.
    npartitions_input : int
        Number of partitions in the input layer.
    concat_func : callable
        Function used by each tree node to reduce a list of inputs
        into a single output value. This function must accept only
        a list as its first positional argument.
    tree_node_func : callable
        Function used on the output of ``concat_func`` in each tree
        node. This function must accept the output of ``concat_func``
        as its first positional argument.
    finalize_func : callable, optional
        Function used in place of ``tree_node_func`` on the final tree
        node(s) to produce the final output for each split. By default,
        ``tree_node_func`` will be used.
    split_every : int, optional
        This argument specifies the maximum number of input nodes
        to be handled by any one task in the tree. Defaults to 32.
    split_out : int, optional
        This argument specifies the number of output nodes in the
        reduction tree. If ``split_out`` is set to an integer >=1, the
        input tasks must contain data that can be indexed by a ``getitem``
        operation with a key in the range ``[0, split_out)``.
    output_partitions : list, optional
        List of required output partitions. This parameter is used
        internally by Dask for high-level culling.
    tree_node_name : str, optional
        Name to use for intermediate tree-node tasks.
    """

    name: str
    name_input: str
    npartitions_input: int
    concat_func: Callable
    tree_node_func: Callable
    finalize_func: Callable | None
    split_every: int
    split_out: int
    output_partitions: list[int]
    tree_node_name: str
    widths: list[int]
    height: int

    def __init__(
        self,
        name: str,
        name_input: str,
        npartitions_input: int,
        concat_func: Callable,
        tree_node_func: Callable,
        finalize_func: Callable | None = None,
        split_every: int = 32,
        split_out: int | None = None,
        output_partitions: list[int] | None = None,
        tree_node_name: str | None = None,
        annotations: dict[str, Any] | None = None,
    ):
        super().__init__(annotations=annotations)
        self.name = name
        self.name_input = name_input
        self.npartitions_input = npartitions_input
        self.concat_func = concat_func
        self.tree_node_func = tree_node_func
        self.finalize_func = finalize_func
        self.split_every = split_every
        self.split_out = split_out
        self.output_partitions = (
            list(range(self.split_out or 1))
            if output_partitions is None
            else output_partitions
        )
        self.tree_node_name = tree_node_name or "tree_node-" + self.name

        # Calculate tree widths and height
        # (used to produce output keys without materializing the graph)
        parts = self.npartitions_input
        self.widths = [parts]
        while parts > 1:
            parts = math.ceil(parts / self.split_every)
            self.widths.append(parts)
        self.height = len(self.widths)

    def _make_key(self, *name_parts, split=0):
        # Make sure the key is a tuple and append the split element if needed
        return name_parts + (split,) if self.split_out else name_parts

    def _define_task(self, input_keys, final_task=False):
        # Define a nested concatenation-plus-function task
        if final_task and self.finalize_func:
            outer_func = self.finalize_func
        else:
            outer_func = self.tree_node_func
        return (toolz.pipe, input_keys, self.concat_func, outer_func)

    def _construct_graph(self):
        """Construct graph for a tree reduction."""
        # Level by level (using the precomputed ``widths``), groups of at most
        # ``split_every`` keys are combined by ``_define_task`` under
        # "tree_node-<name>" keys until a single node per output split
        # remains; that final node is built with ``final_task=True`` (so
        # ``finalize_func`` applies) and stored under ``(self.name, split)``.
        # When ``split_out`` is set, extra "<name_input>-split" getitem tasks
        # first select each split from every input partition.
        ...

    def __repr__(self):
        return "DataFrameTreeReduction<name='{}', input_name={}, split_out={}>".format(
            self.name, self.name_input, self.split_out
        )

    def _output_keys(self):
        return {(self.name, part) for part in self.output_partitions}

    def get_output_keys(self):
        if hasattr(self, "_cached_output_keys"):
            return self._cached_output_keys
        else:
            output_keys = self._output_keys()
            self._cached_output_keys = output_keys
        return self._cached_output_keys

    def is_materialized(self):
        return hasattr(self, "_cached_dict")

    @property
    def _dict(self):
        """Materialize full dict representation"""
        if hasattr(self, "_cached_dict"):
            return self._cached_dict
        else:
            dsk = self._construct_graph()
            self._cached_dict = dsk
            return self._cached_dict

    def __getitem__(self, key):
        return self._dict[key]

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        # Start with the "base" tree-reduction size
        tree_size = (sum(self.widths[1:]) or 1) * (self.split_out or 1)
        if self.split_out:
            # Add on the "split-*" tasks used for the getitem operations
            return tree_size + self.npartitions_input * len(self.output_partitions)
        return tree_size

    def _keys_to_output_partitions(self, keys):
        """Simple utility to convert keys to output partition indices."""
        splits = set()
        for key in keys:
            try:
                _name, _split = key
            except ValueError:
                continue
            if _name != self.name:
                continue
            splits.add(_split)
        return splits

    def _cull(self, output_partitions):
        return DataFrameTreeReduction(
            self.name,
            self.name_input,
            self.npartitions_input,
            self.concat_func,
            self.tree_node_func,
            finalize_func=self.finalize_func,
            split_every=self.split_every,
            split_out=self.split_out,
            output_partitions=output_partitions,
            tree_node_name=self.tree_node_name,
            annotations=self.annotations,
        )

    def cull(self, keys, all_keys):
        """Cull a DataFrameTreeReduction HighLevelGraph layer"""
        deps = {
            (self.name, 0): {
                (self.name_input, i) for i in range(self.npartitions_input)
            }
        }
        output_partitions = self._keys_to_output_partitions(keys)
        if output_partitions != set(self.output_partitions):
            culled_layer = self._cull(output_partitions)
            return culled_layer, deps
        else:
            return self, deps
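# Illustrative sketch (not part of the original module): the tree geometry
# computed in ``DataFrameTreeReduction.__init__`` for 100 input partitions and
# the default ``split_every=32``:
#
#     widths = [100, 4, 1]   # nodes per tree level, leaves first
#     height = 3             # number of levels, including the leaves
#
# i.e. the 100 leaf partitions are combined in groups of at most 32 into 4
# intermediate "tree_node-<name>" tasks, which are then combined into the one
# root task per output split (the node ``finalize_func`` runs on, if given).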