
    Dh                       d Z ddlmZ ddlZddlZddlZddlmZ ej                  rddl	m
Z
 ddl	mZ ddlmZ  ed	      Zd
Z G d dej                         Zej$                  	 dej&                  dd	 	 	 	 	 	 	 dd       Zej$                  ej&                  dd	 	 	 	 	 	 	 dd       Z	 dej&                  dd	 	 	 	 	 	 	 ddZy)zAsyncIOTask implementation.    )annotationsN   )class_decorator)	Awaitable)Callable)	ParamSpecSpec)AsyncIOTaskasynciotaskc                       e Zd ZdZdZ	 dej                  dd	 	 	 	 	 	 	 d fdZedd       Z	edd       Z
ddZ	 	 	 	 dd	Z	 	 	 	 	 	 d fd
ZddZ xZS )r
   zWrap to asyncio.Task.)__loop_getter__loop_getter_need_contextFloop_getterloop_getter_need_contextc               B    t         |   |       || _        || _        y)a#  Wrap function in future and return.

        :param func: Function to wrap
        :type func: typing.Optional[Callable[..., Awaitable]]
        :param loop_getter: Method to get event loop, if wrap in asyncio task
        :type loop_getter: typing.Union[
                               Callable[..., asyncio.AbstractEventLoop],
                               asyncio.AbstractEventLoop
                           ]
        :param loop_getter_need_context: Loop getter requires function context
        :type loop_getter_need_context: bool
        )funcN)super__init___AsyncIOTask__loop_getter&_AsyncIOTask__loop_getter_need_context)selfr   r   r   	__class__s       w/var/www/fastuser/data/www/generator.snapmosaic.io/flask_app/venv/lib/python3.12/site-packages/threaded/_asynciotask.pyr   zAsyncIOTask.__init__+   s%    & 	d#ep0H'    c                    | j                   S )zxLoop getter.

        :rtype: typing.Union[Callable[..., asyncio.AbstractEventLoop], asyncio.AbstractEventLoop]
        )r   r   s    r   r   zAsyncIOTask.loop_getterB   s     !!!r   c                    | j                   S )zBLoop getter need execution context.

        :rtype: bool
        )r   r   s    r   r   z$AsyncIOTask.loop_getter_need_contextJ   s     ...r   c                    t        | j                        r.| j                  r | j                  |i |S | j                         S | j                  S )zGet event loop in decorator class.

        :return: event loop if available or getter available
        :rtype: Optional[asyncio.AbstractEventLoop]
        )callabler   r   )r   argskwargss      r   get_loopzAsyncIOTask.get_loopR   sN     D$$%,,'t''888##%%r   c                H     t        j                        d fd       }|S )a!  Here should be constructed and returned real decorator.

        :param func: Wrapped function
        :type func: Callable[..., Awaitable]
        :return: wrapper, which will produce asyncio.Task on call with function called inside it
        :rtype: Callable[..., asyncio.Task]
        c                 V     j                   | i |}|j                   | i |      S )zgFunction wrapper.

            :return: asyncio.Task
            :rtype: asyncio.Task[Any]
            )r#   create_task)r!   r"   loopr   r   s      r   wrapperz2AsyncIOTask._get_function_wrapper.<locals>.wrapperj   s5     !4==$1&1D##D$$9&$9::r   )r!   z	Spec.argsr"   zSpec.kwargsreturnzasyncio.Task[typing.Any])	functoolswraps)r   r   r(   s   `` r   _get_function_wrapperz!AsyncIOTask._get_function_wrapper^   s'     
		; 
	; r   c                "    t        |   |i |S )zCallable instance.

        :return: asyncio.Task or getter
        :rtype: Union[asyncio.Task[Any], Callable[..., asyncio.Task[Any]]]
        )r   __call__)r   r!   r"   r   s      r   r.   zAsyncIOTask.__call__v   s     w000r   c                    d| j                   j                   d| j                  d| j                  d| j                  dt        |       ddS )zLFor debug purposes.

        :return: repr info
        :rtype: str
        <(z, loop_getter=z, loop_getter_need_context=z	, ) at 0xX>)r   __name___funcr   r   idr   s    r   __repr__zAsyncIOTask.__repr__   s`     ''(zzn ++. /((,(E(E'H Ihq\	$	
r   N)r   +Callable[..., Awaitable[typing.Any]] | Noner   DCallable[..., asyncio.AbstractEventLoop] | asyncio.AbstractEventLoopr   boolr)   None)r)   r:   )r)   r;   )r!   
typing.Anyr"   r=   r)   zasyncio.AbstractEventLoop)r   z%Callable[Spec, Awaitable[typing.Any]]r)   'Callable[..., asyncio.Task[typing.Any]])r!   z1Callable[..., Awaitable[typing.Any]] | typing.Anyr"   r=   r)   zBasyncio.Task[typing.Any] | Callable[..., asyncio.Task[typing.Any]])r)   str)r4   
__module____qualname____doc__	__slots__asyncioget_event_loopr   propertyr   r   r#   r,   r.   r7   __classcell__)r   s   @r   r
   r
   &   s    ?I =AI _f^t^t).I9I [	I
 #'I 
I. " " / /
 9	00
1@
1 
1 
L	
1
r   r
   Fr   c                    y)zOverload: no function.N r   r   r   s      r   r   r          r   c                    y)zOverload: provided function.NrI   rJ   s      r   r   r      rK   r   c               J    | t        | ||      S  t        d||      |       S )a  Wrap function in future and return.

    :param func: Function to wrap
    :type func: typing.Optional[Callable[..., Awaitable]]
    :param loop_getter: Method to get event loop, if wrap in asyncio task
    :type loop_getter: typing.Union[
                           Callable[..., asyncio.AbstractEventLoop],
                           asyncio.AbstractEventLoop
                       ]
    :param loop_getter_need_context: Loop getter requires function context
    :type loop_getter_need_context: bool
    :return: AsyncIOTask instance, if called as function or argumented decorator, else callable wrapper
    :rtype: typing.Union[AsyncIOTask, Callable[..., asyncio.Task]]
    NrJ   )r
   rJ   s      r   r   r      sC    ( |#%=
 	

;!9 	 r   r8   )r   r<   r   r:   r   r;   r)   r
   )r   z$Callable[..., Awaitable[typing.Any]]r   r:   r   r;   r)   r>   )r   r9   r   r:   r   r;   r)   z5AsyncIOTask | Callable[..., asyncio.Task[typing.Any]])rB   
__future__r   rD   r*   typing r   TYPE_CHECKINGcollections.abcr   r   typing_extensionsr   r	   __all__BaseDecoratorr
   overloadrE   r   rI   r   r   <module>rW      s?   " "    	)(+VD
(h
/// h
V ! [bZpZp%*	!
! W! #	!
 ! !  [bZpZp%*	'
.' W' #	'
 -' ' 9= [bZpZp%*	
5 W #	
 ;r   