a
    ƹDfAD                     @   s|   d Z ddgZddlmZ ddlmZ ddlZi  ZZdddZ	dd	dZ
dd
lmZ ddlmZmZ G dd deZeZdS )aI
  
This module contains map and pipe interfaces to the parallelpython (pp) module.

Pipe methods provided:
    pipe        - blocking communication pipe             [returns: value]
    apipe       - asynchronous communication pipe         [returns: object]

Map methods provided:
    map         - blocking and ordered worker pool        [returns: list]
    imap        - non-blocking and ordered worker pool    [returns: iterator]
    uimap       - non-blocking and unordered worker pool  [returns: iterator]
    amap        - asynchronous worker pool                [returns: object]


Usage
=====

A typical call to a pathos pp map will roughly follow this example:

    >>> # instantiate and configure the worker pool
    >>> from pathos.pp import ParallelPool
    >>> pool = ParallelPool(nodes=4)
    >>>
    >>> # do a blocking map on the chosen function
    >>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
    >>>
    >>> # do a non-blocking map, then extract the results from the iterator
    >>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
    >>> print("...")
    >>> print(list(results))
    >>>
    >>> # do an asynchronous map, then get the results
    >>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
    >>> while not results.ready():
    ...     time.sleep(5); print(".", end=' ')
    ...
    >>> print(results.get())
    >>>
    >>> # do one item at a time, using a pipe
    >>> print(pool.pipe(pow, 1, 5))
    >>> print(pool.pipe(pow, 2, 6))
    >>>
    >>> # do one item at a time, using an asynchronous pipe
    >>> result1 = pool.apipe(pow, 1, 5)
    >>> result2 = pool.apipe(pow, 2, 6)
    >>> print(result1.get())
    >>> print(result2.get())


Notes
=====

This worker pool leverages the parallelpython (pp) module, and thus
has many of the limitations associated with that module. The function f and
the sequences in args must be serializable. The maps in this worker pool
have full functionality when run from a script, but may be somewhat limited
when used in the python interpreter. Both imported and interactively-defined
functions in the interpreter session may fail due to the pool failing to
find the source code for the target function. For a work-around, try:

    >>> # instantiate and configure the worker pool
    >>> from pathos.pp import ParallelPool
    >>> pool = ParallelPool(nodes=4)
    >>>
    >>> # wrap the function, so it can be used interactively by the pool
    >>> def wrapsin(*args, **kwds):
    >>>      from math import sin
    >>>      return sin(*args, **kwds)
    >>>
    >>> # do a blocking map using the wrapped function
    >>> results = pool.map(wrapsin, [1,2,3,4,5])

ParallelPoolstats    )parallelpython)	cpu_countNc                 C   sz   d}| du rt t } nd}zt| } W n tyB   | f} Y n0 | sd|rPdnd}td|  dS | D ]}|  qhdS )zprint stats from the pp.ServerTNFz; no activez for the requestedz#Stats are not available%s servers.
)list__STATEvaluestuple	TypeErrorprintZprint_stats)serversZ
FROM_STATEmsgserver r   \/nfs/NAS7/SABIOD/METHODE/ermites/ermites_venv/lib/python3.9/site-packages/pathos/parallel.py__print_statsb   s    
r   c                 C   sx   | du rdnt | jt }ddl}ddl}|j}z|  |_}t| W n   d}Y n0 ||_|rp|	 nd}|S )z<return a string containing stats response from the pp.ServerNr    )
r   get_idr	   iosysstdoutStringIOr   getvalue)poolr   r   r   r   resultr   r   r   r   u   s    
)AbstractWorkerPool)ApplyResult	MapResultc                   @   s0  e Zd ZdZdd Zejjr,ejjej e_d-ddZdd ZeZd	d
 Z	ej	je	_dd Z
ej
je
_dd Zejje_dd Zejje_dd Zejje_dd Zejje_dd Zdd Zdd Zdd Zdd Zd.d d!Zd/d#d$Zd%d& Zd'd( Zd)d* Zd+d, ZeeeZeeeZeeeZeZdS )0r   z:
Mapper that leverages parallelpython (i.e. pp) maps.
    c           
      O   s   d|v }t |}d|v r.|s |r.d}t|n4|rR|rBd}t||d|d< n|rb|d |d< d| _d| _|dd}|dd}|du rd}|d	d| _| jdu rt|tu rt	|nd
}d
|d
t|g| _d| _| j||d}	dS )a  
NOTE: if number of nodes is not given, will autodetect processors.

NOTE: if a tuple of servers is not provided, defaults to localhost only.

NOTE: additional keyword input is optional, with:
    id          - identifier for the pool
    servers     - tuple of pp.Servers
        nodesncpusz0got multiple values for keyword argument 'ncpus'z0got multiple values for keyword argument 'nodes'r   Nr   r   id*@+F)r   r   )lenr
   pop_ParallelPool__nodes_ParallelPool__serversr   r   typeintstrjoinsorted_exiting_serve)
selfargskwdsZhasnodesZarglenr   r    r   _nodes_poolr   r   r   __init__   s.    

zParallelPool.__init__Nc                 C   s   |du r| j }|dv rd}|du r2tt| j}n|dv r>d}t| jd}|s\tj|d}|j	rhdgng }t|j
| }tdd |D }||krtj|d}|dkrt n|}|| kr|| |t| j< |dv rdn|| _|| _|S )	z4Create a new server if one isn't already initializedNr"   
autodetect)r"   r7   )	ppserversc                 s   s"   | ]}d  dd |D V  qdS ):c                 s   s   | ]}t |V  qd S Nr+   .0ir   r   r   	<genexpr>       z0ParallelPool._serve.<locals>.<genexpr>.<genexpr>Nr,   r=   tupr   r   r   r?      r@   z&ParallelPool._serve.<locals>.<genexpr>)r7   )r   r	   r-   r(   _ParallelPool__STATEr   r   ppServerauto_ppserversr8   r   	get_ncpusZ	set_ncpusr'   )r0   r   r   r4   _auto_serversr3   r   r   r   r/      s(    

zParallelPool._servec                 C   s<   t | jd}| |sdS |  t | jd d| _dS )z!Remove server with matching stateNF)rD   r   r   _equalsdestroyr&   r.   r0   r4   r   r   r   _clear   s    
zParallelPool._clearc                 O   s0   t j| |g|R i | t| j|g|R  S r:   )r   _AbstractWorkerPool__mapr   imap)r0   fr1   r2   r   r   r   map   s    zParallelPool.mapc                    sH   t j g|R i |  fdd}dd ttj|g|R  D S )Nc                     s@     }z|j | t dW S  tjy:   d Y n0 dS zsend a job to the serverglobalsNr/   submitrU   rE   DestroyedServerError	_is_aliveZargzr4   rQ   r0   r   r   rW      s
    z!ParallelPool.imap.<locals>.submitc                 s   s   | ]}| V  qd S r:   r   )r=   Zsubprocr   r   r   r?      r@   z$ParallelPool.imap.<locals>.<genexpr>)r   _AbstractWorkerPool__imapr   builtinsrR   )r0   rQ   r1   r2   rW   r   r[   r   rP      s    	zParallelPool.imapc                    sF   t j g|R i |  fdd}dd }|tj|g|R  S )Nc                     s@     }z|j | t dW S  tjy:   d Y n0 dS rS   rV   rZ   r[   r   r   rW      s
    z"ParallelPool.uimap.<locals>.submitc                 s   s@   t | } t| r<t| D ] \}}|jr| | V   qqqdS )zbuild a unordered map iteratorN)r   r%   	enumeratefinishedr&   )itr>   Zjobr   r   r   imap_unordered   s    z*ParallelPool.uimap.<locals>.imap_unordered)r   r\   r]   rR   )r0   rQ   r1   r2   rW   ra   r   r[   r   uimap   s    zParallelPool.uimapc                    s  t j g|R i |  fddd|v r4dnd}|dd}tdd |D }t| }fd	d
|D }dd
 |D }j}jdv r }	|	 }|sd}d}
d}||
k rt||| \}}|rq||krq|d }q|r|d7 }t	||f}|j
|  |S )Nc                     s@     }z|j | t dW S  tjy:   d Y n0 dS rS   rV   rZ   r[   r   r   rW     s
    z!ParallelPool.amap.<locals>.submitsizeTF   c                 s   s   | ]}t |V  qd S r:   )r%   r=   taskr   r   r   r?     r@   z$ParallelPool.amap.<locals>.<genexpr>c                    s   g | ]} | qS r   r   re   )rW   r   r   
<listcomp>  r@   z%ParallelPool.amap.<locals>.<listcomp>c                 S   s   g | ]}t |qS r   )r   re   r   r   r   rg     r@   )r"   r7   N   l            )r   rO   r&   minzipr   r/   rH   divmodr   queue)r0   rQ   r1   r2   overrideZ	elem_sizelengthZtasksr   r4   maxsize	chunksizeextramr   )rQ   r0   rW   r   amap  s0    



zParallelPool.amapc                 O   sD   |   }z|j||t d}W n tjy<   | d  Y n0 | S NrT   rV   r0   rQ   r1   r2   r4   rf   r   r   r   pipe5  s    zParallelPool.pipec                 O   sF   |   }z|j||t d}W n tjy<   | d  Y n0 t|S rt   )r/   rW   rU   rE   rX   rY   r   ru   r   r   r   apipe?  s    zParallelPool.apipec                 C   s   | j j| j| jf}d| S )Nz<pool %s(ncpus=%s, servers=%s)>)	__class____name__r    r   )r0   Zmapargsr   r   r   __repr__J  s    zParallelPool.__repr__c                 C   s   | j }|dkrd}|S )z'get the number of nodes used in the mapNr"   )r'   r0   r   r   r   r   Z__get_nodesM  s    zParallelPool.__get_nodesc                 C   s   |du rd}| j |d dS )z'set the number of nodes used in the mapNr7   )r   r/   r{   r   r   r   Z__set_nodesR  s    zParallelPool.__set_nodesc                 C   s$   | j }|dkrd}n|dkr d}|S )zget the servers used in the mapr   Nr6   r"   )r(   r0   r   r   r   r   Z__get_serversW  s    zParallelPool.__get_serversc                 C   s   |du rd}| j |d dS )zset the servers used in the mapNr   )r   r|   r}   r   r   r   Z__set_servers]  s    zParallelPool.__set_serversFc                 C   sL   t | jd}| |rH|s*| j|dd | jr:|   nd|_|  }dS )zrestart a closed poolNT)negateF)rD   r   r   rK   rY   r.   rN   r/   )r0   forcer4   r   r   r   restartg  s    

zParallelPool.restartTc                 C   sn   d\}}}dd }|d u r*|r"|n||_ n|j|_ |rJ|rJ|j |ksjJ n |rb|j ||fv sjJ ntdd S )N)r   rh   rd   c                   S   s   d S r:   r   r   r   r   r   <lambda>v  r@   z(ParallelPool._is_alive.<locals>.<lambda>zPool not running)_stater.   
ValueError)r0   r   r~   runZRUNZCLOSEZ	TERMINATEr   r   r   r   rY   t  s    
zParallelPool._is_alivec                 C   sh   |sdS | j du rt n| j }|| kr.dS |jr:dgng }t|j| }dd |D }t| j|kS )z!check if the server is compatibleFNr6   c                 S   s    g | ]}d  dd |D qS )r9   c                 s   s   | ]}t |V  qd S r:   r;   r<   r   r   r   r?     r@   z2ParallelPool._equals.<locals>.<listcomp>.<genexpr>rA   rB   r   r   r   rg     r@   z(ParallelPool._equals.<locals>.<listcomp>)r'   r   rH   rG   r-   r8   r(   )r0   r   r3   rI   rJ   r   r   r   rK     s    zParallelPool._equalsc                 C   s"   t | jd}| |rd|_dS )zclose the pool to any new jobsNT)rD   r   r   rK   r.   rM   r   r   r   close  s    
zParallelPool.closec                 C   s   |    |   dS )za more abrupt closeN)r   r,   )r0   r   r   r   	terminate  s    zParallelPool.terminatec                 C   s:   t | jd}| |r6| j|ddd |  d| _dS )z#cleanup the closed worker processesNTF)r~   r   )rD   r   r   rK   rY   rL   r.   rM   r   r   r   r,     s    
zParallelPool.join)NN)F)NFT) ry   
__module____qualname____doc__r5   r   r/   rN   clearrR   rP   rb   rs   rv   rw   rz   Z_ParallelPool__get_nodesZ_ParallelPool__set_nodesZ_ParallelPool__get_serversZ_ParallelPool__set_serversr   rY   rK   r   r   r,   propertyr    r   r   rD   Z	__state__r   r   r   r   r      sD   *




&
	
	



	


)N)N)r   __all__Zpathos.helpersr   rE   r   r]   r   rD   r   r   Zpathos.abstract_launcherr   Zpathos.helpers.pp_helperr   r   r   ZParallelPythonPoolr   r   r   r   <module>   s   I

   