
    'Xh"                     
   d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZmZmZmZ d
ZdZ G d dee      Z G d d      Z	 ddee   dededededeeeef      ddfdZ G d d      Zy)    N)defaultdict)Enum)Queueget_context)BaseContext)BaseProcess)Synchronized)Empty)AnyIterableOptionalTypeiX     c                       e Zd ZdZdZdZy)QueueSignalsstopconfirmerrorN)__name__
__module____qualname__r   r   r        J/RAG/venv/lib/python3.12/site-packages/qdrant_client/parallel_processor.pyr   r      s    DGEr   r   c                   F    e Zd Zedededd fd       Zdee   dee   fdZy)Workerargskwargsreturnc                     t               NNotImplementedError)clsr   r   s      r   startzWorker.start   s    !##r   itemsc                     t               r!   r"   )selfr&   s     r   processzWorker.process   s    !##r   N)r   r   r   classmethodr   r%   r   r)   r   r   r   r   r      sD    $# $ $ $ $$Xc] $x} $r   r   worker_classinput_queueoutput_queuenum_active_workers	worker_idr   r   c                    |i }t        j                  d| dt        j                                 	  | j                  d	i |}dt
        t           ffd}|j                   |             D ]  }|j                  |        	 j                          |j                          j                          |j                          |j                         5  |xj                   dz  c_        ddd       t        j                  d| d       y# t        $ r>}	t        j                  |	       |j                  t        j                         Y d}	~	d}	~	ww xY w# 1 sw Y   mxY w# j                          |j                          j                          |j                          |j                         5  |xj                   dz  c_        ddd       n# 1 sw Y   nxY wt        j                  d| d       w xY w)
z
    A worker that pulls data pints off the input queue, and places the execution result on the output queue.
    When there are no data pints left on the input queue, it decrements
    num_active_workers to signal completion.
    NzReader worker: z PID: r   c               3   `   K   	 j                         } | t        j                  k(  ry |  )wr!   )getr   r   )itemr,   s    r   input_queue_iterablez%_worker.<locals>.input_queue_iterable7   s1     "(<,,,
	 s   +.   zReader worker z	 finishedr   )logginginfoosgetpidr%   r   r   r)   put	Exception	exceptionr   r   closejoin_threadget_lockvalue)
r+   r,   r-   r.   r/   r   workerr4   processed_itemes
    `        r   _workerrD   !   s    ~LL?9+VBIIK=AB!<###-f-	hsm 	 %nn-A-CDN^, E 	!  "((*$$)$ + 	~i[	:;)  -!++,,-" +* 	!  "((*$$)$ +** 	~i[	:;sU   AD E$ E	E4EE$ EE$ E!$AG:5G	G:G!G:c            	           e Zd Zdefdedee   dee   defdZ	de
ddfd	Zd
ee
   de
de
dee
   fdZd
ee
   de
de
dee
   fdZd
ee
   de
de
dee
   fdZddZddee   ddfdZddZddZy)ParallelWorkerPoolNnum_workersrA   start_methodmax_internal_batch_sizec                     || _         || _        d | _        d | _        t	        |      | _        g | _        | j                  |z  | _        d| _        d | _	        y )NF)
r+   rG   r,   r-   r   ctx	processes
queue_sizeemergency_shutdownr.   )r(   rG   rA   rH   rI   s        r   __init__zParallelWorkerPool.__init__X   sZ     #&,0-1 +L 9,.**-DD"'7;r   r   r   c                    | j                   j                  | j                        | _        | j                   j                  | j                        | _        | j                   j                  d| j                        }t        |t              sJ || _	        t        d| j                        D ]  }t        | j                   d      sJ | j                   j                  t        | j                  | j                  | j                  | j                  ||j                         f      }|j!                          | j"                  j%                  |        y )Nir   Process)targetr   )rK   r   rM   r,   r-   ValuerG   
isinstance	BaseValuer.   rangehasattrrR   rD   r+   copyr%   rL   append)r(   r   	ctx_valuer/   r)   s        r   r%   zParallelWorkerPool.starti   s    88>>$//: HHNN4??;HHNN3(8(89	)Y///"+q$"2"23I488Y///hh&&%%$$%%++KKM ' 
G MMONN!!'* 4r   streamr   c              /   >  K   	  | j                   d	i | | j                  J d       | j                  J d       d}d}|D ]  }| j                          ||z
  | j                  k  r	 | j                  j                         }n!	 | j                  j                  t              }|7|t        j                  k(  r| j                          t        d      | |dz  }| j                  j                  |       |dz  } t        | j                        D ]+  }	| j                  j                  t        j                          - ||k  r]| j                  j                  t              }|t        j                  k(  r| j                          t        d      | |dz  }||k  r]| j                  J d       | j                  J d       | j#                          | j                  j%                          | j                  j%                          | j&                  r5| j                  j)                          | j                  j)                          y | j                  j+                          | j                  j+                          y # t        $ r d }Y w xY w# t        $ r}| j                          |d }~ww xY w# | j                  J d       | j                  J d       | j#                          | j                  j%                          | j                  j%                          | j&                  r5| j                  j)                          | j                  j)                          w | j                  j+                          | j                  j+                          w xY ww)
NzInput queue was not initializedz Output queue was not initializedr   timeoutzThread unexpectedly terminatedr5   zInput queue is NonezOutput queue is Noner   )r%   r,   r-   check_worker_healthrM   
get_nowaitr
   r2   processing_timeoutjoin_or_terminater   r   RuntimeErrorr:   rW   rG   r   joinr=   rN   cancel_join_threadr>   )
r(   r\   r   r   pushedreadr3   out_itemrC   _s
             r   unordered_mapz ParallelWorkerPool.unordered_map   s7    4	0DJJ  ##/R1RR/$$0T2TT0FD((*D=4??2(#'#4#4#?#?#A #'#4#4#8#8AS#8#T
 '<#5#55..0*+KLL"NAID  $$T*!+ . 4++,  $$\%6%67 - -,,009K0L|111**,&'GHH	 - ##/F1FF/$$0H2HH0IIK""$##%&&  335!!446  ,,.!!--/O ! (#'(
 !  ..0 0 ##/F1FF/$$0H2HH0IIK""$##%&&  335!!446  ,,.!!--/sh   NA#J9 (JJ9  J$C?J9 $C NJJ9 JJ9 	J6J11J66J9 9C!NNc                 @     | j                   t        |      g|i |S r!   )rk   	enumerate)r(   r\   r   r   s       r   semi_ordered_mapz#ParallelWorkerPool.semi_ordered_map   s$    !t!!)F"3EdEfEEr   c              /      K   t        t              }d} | j                  |g|i |D ],  \  }}|||<   ||v s|j                  |       |dz  }||v r. y w)Nr   r5   )r   intrn   pop)r(   r\   r   r   buffernext_expectedidxr3   s           r   ordered_mapzParallelWorkerPool.ordered_map   so     S!...vGGGICF3K6)jj//"  6) Hs   7AAAc                     | j                   D ]^  }|j                         r|j                  dk7  s$d| _        | j	                          t        d|j                   d|j                          y)zJ
        Checks if any worker process has terminated unexpectedly
        r   TzWorker PID: z# terminated unexpectedly with code N)rL   is_aliveexitcoderN   rc   rd   pidr(   r)   s     r   r`   z&ParallelWorkerPool.check_worker_health   si     ~~G##%'*:*:a*?*.'&&(""7;;-/RSZScScRde 	 &r   r_   c                     d| _         | j                  D ]5  }|j                  |       |j                         s&|j	                          7 | j                  j                          y)zM
        Emergency shutdown
        @param timeout:
        @return:
        Tr^   N)rN   rL   re   rw   	terminateclear)r(   r_   r)   s      r   rc   z$ParallelWorkerPool.join_or_terminate   sR     #'~~GLLL)!!!# & 	r   c                 z    | j                   D ]  }|j                           | j                   j                          y r!   )rL   re   r}   rz   s     r   re   zParallelWorkerPool.join   s)    ~~GLLN &r   c                 h    | j                   D ]#  }|j                         s|j                          % y)a  
        Terminate processes if the user hasn't joined. This is necessary as
        leaving stray processes running can corrupt shared state. In brief,
        we've observed shared memory counters being reused (when the memory was
        free from the perspective of the parent process) while the stray
        workers still held a reference to them.
        For a discussion of using destructors in Python in this manner, see
        https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/.
        N)rL   rw   r|   rz   s     r   __del__zParallelWorkerPool.__del__   s*     ~~G!!!# &r   )r   N)r5   )r   r   r   MAX_INTERNAL_BATCH_SIZErp   r   r   r   strrO   r   r%   r   rk   rn   ru   r`   rc   re   r   r   r   r   rF   rF   W   s    
 '+'><< V< sm	<
 "%<"+c +d +050HSM 50# 50 50QYZ]Q^ 50nFx} FS FC FT\]`Ta F#(3- # #s #xX[} #
# t 
$r   rF   r!   ) r6   r8   collectionsr   enumr   multiprocessingr   r   multiprocessing.contextr   multiprocessing.processr   multiprocessing.sharedctypesr	   rV   queuer
   typingr   r   r   r   rb   r   r   r   r   rp   dictrD   rF   r   r   r   <module>r      s     	 #  . / / B  0 0   3 $ $ (,3<v,3<3< 3< "	3<
 3< T#s(^$3< 
3<lX$ X$r   