U
    Z+d                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 z$ddl
Z
ddlZ
ddlZ
ddlZ
W n ek
rx   dZ
Y nX dZeeZd	Zd
ZdZdZdZdZdd ZG dd de	ZdS )z@Apache Cassandra result store backend using the DataStax driver.    N)states)ImproperlyConfigured)
get_logger   )BaseBackend)CassandraBackendz
You need to install the cassandra-driver library to
use the Cassandra backend.  See https://github.com/datastax/python-driver
z
CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.
See https://datastax.github.io/python-driver/api/cassandra/auth.html.
z
INSERT INTO {table} (
    task_id, status, result, date_done, traceback, children) VALUES (
        %s, %s, %s, %s, %s, %s) {expires};
z]
SELECT status, result, date_done, traceback, children
FROM {table}
WHERE task_id=%s
LIMIT 1
z
CREATE TABLE {table} (
    task_id text,
    status text,
    result blob,
    date_done timestamp,
    traceback blob,
    children blob,
    PRIMARY KEY ((task_id), date_done)
) WITH CLUSTERING ORDER BY (date_done DESC);
z
    USING TTL {0}
c                 C   s
   t | dS )Nutf8)bytes)x r   =/tmp/pip-unpacked-wheel-ucduq0nd/celery/backends/cassandra.pybuf_t?   s    r   c                       s^   e Zd ZdZdZdZd fdd	Zddd	Zdd
dZdddZ	dd Z
d fdd	Z  ZS )r   zCassandra backend utilizing DataStax driver.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`cassandra-driver` is not available,
            or if the :setting:`cassandra_servers` setting is not set.
    NTR#  c                    sv  t  jf | tstt| jj}|p0|dd | _|pB|dd | _	|pT|dd | _
|pf|dd | _|di | _| jr| j
r| jstd|p|dd }|d k	rt|nd| _|d	pd
}	|dpd
}
ttj|	tjj| _ttj|
tjj| _d | _|dd }|dd }|rP|rPttj|d }|sDtt|f || _d | _d | _d | _d | _t | _d S )NZcassandra_serversZcassandra_portZcassandra_keyspaceZcassandra_tablecassandra_optionsz!Cassandra backend not configured.Zcassandra_entry_ttl Zcassandra_read_consistencyLOCAL_QUORUMZcassandra_write_consistencyZcassandra_auth_providerZcassandra_auth_kwargs)super__init__	cassandrar   E_NO_CASSANDRAappconfgetserversportkeyspacetabler   	Q_EXPIRESformat
cqlexpiresgetattrZConsistencyLevelr   read_consistencywrite_consistencyauth_providerauth!E_NO_SUCH_CASSANDRA_AUTH_PROVIDER_cluster_session_write_stmt
_read_stmt	threadingRLock_lock)selfr   r   r   Z	entry_ttlr   kwargsr   expiresZ	read_consZ
write_consr#   Zauth_kwargsZauth_provider_class	__class__r   r   r   Q   sN      zCassandraBackend.__init__Fc                 C   sN  | j dk	rdS | j  z"z| j dk	r4W W dS tjj| jf| j| j	d| j
| _| j| j| _ tjtj| j| jd| _| j| j_tjtj| jd| _| j| j_|rtjtj| jd}| j|_z| j | W n tjk
r   Y nX W n< tjk
r8   | jdk	r&| j  d| _d| _  Y nX W 5 | j  X dS )zjPrepare the connection for action.

        Arguments:
            write (bool): are we a writer?
        N)r   r#   )r   r/   )r   )r'   r,   acquirereleaser   ZclusterZClusterr   r   r#   r   r&   connectr   queryZSimpleStatementQ_INSERT_RESULTr   r   r   r(   r"   Zconsistency_levelQ_SELECT_RESULTr)   r!   Q_CREATE_RESULT_TABLEexecuteZAlreadyExistsZOperationTimedOutshutdown)r-   writeZ	make_stmtr   r   r   _get_connection   sT    


 

	

z CassandraBackend._get_connectionc                 K   sV   | j dd | j| j||t| || j t| |t| | |f dS )z1Store return value and state of an executed task.T)r;   N)	r<   r'   r9   r(   r   encoder   nowZcurrent_task_children)r-   task_idresultstate	tracebackrequestr.   r   r   r   _store_result   s    
zCassandraBackend._store_resultc                 C   s   dS )Nzcassandra://r   )r-   Zinclude_passwordr   r   r   as_uri   s    zCassandraBackend.as_uric              
   C   sf   |    | j| j|f }|s.tjddS |\}}}}}| ||| ||| || |dS )z$Get task meta-data for a task by id.N)statusr@   )r?   rF   r@   	date_donerB   children)	r<   r'   r9   r)   Zoner   ZPENDINGZmeta_from_decodeddecode)r-   r?   resrF   r@   rG   rB   rH   r   r   r   _get_task_meta_for   s    z#CassandraBackend._get_task_meta_forr   c                    s2   |si n|}| | j| j| jd t ||S )N)r   r   r   )updater   r   r   r   
__reduce__)r-   argsr.   r0   r   r   rM      s    zCassandraBackend.__reduce__)NNNNr   )F)NN)T)r   N)__name__
__module____qualname____doc__r   Zsupports_autoexpirer   r<   rD   rE   rK   rM   __classcell__r   r   r0   r   r   C   s   	  /
?   

r   )rR   r*   Zceleryr   Zcelery.exceptionsr   Zcelery.utils.logr   baser   r   Zcassandra.authZcassandra.clusterZcassandra.queryImportError__all__rO   loggerr   r%   r6   r7   r8   r   r   r   r   r   r   r   <module>   s,   
