U
    3d3#                     @   s   d dl mZ G dd deZG dd deZG dd deZG dd	 d	eZG d
d deZG dd deZG dd deZ	G dd deZ
G dd de
ZG dd de
ZdS )    )sixc                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	Stepz!
    Jobflow Step base class
    c                 C   s
   t  dS )z=
        :rtype: str
        :return: URI to the jar
        NNotImplementedself r   1/tmp/pip-unpacked-wheel-d7dsrkjd/boto/emr/step.pyjar   s    zStep.jarc                 C   s
   t  dS )zS
        :rtype: list(str)
        :return: List of arguments for the step
        Nr   r   r   r   r	   args%   s    z	Step.argsc                 C   s
   t  dS )zB
        :rtype: str
        :return: The main class name
        Nr   r   r   r   r	   
main_class,   s    zStep.main_classN)__name__
__module____qualname____doc__r
   r   r   r   r   r   r	   r      s   r   c                   @   s2   e Zd ZdZdddZdd Zdd	 Zd
d ZdS )JarStepz
    Custom jar step
    NTERMINATE_JOB_FLOWc                 C   s4   || _ || _|| _|| _t|tjr*|g}|| _dS )a  
        A elastic mapreduce step that executes a jar

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        N)name_jar_main_classaction_on_failure
isinstancer   string_types	step_args)r   r   r
   r   r   r   r   r   r	   __init__8   s    zJarStep.__init__c                 C   s   | j S Nr   r   r   r   r	   r
   S   s    zJarStep.jarc                 C   s   g }| j r|| j  |S r   )r   extend)r   r   r   r   r	   r   V   s    zJarStep.argsc                 C   s   | j S r   )r   r   r   r   r	   r   ^   s    zJarStep.main_class)Nr   N)r   r   r   r   r   r
   r   r   r   r   r   r	   r   4   s       
r   c                	   @   s:   e Zd ZdZdddZdd Zd	d
 Zdd Zdd ZdS )StreamingStepz
    Hadoop streaming step
    Nr   3/home/hadoop/contrib/streaming/hadoop-streaming.jarc                 C   sX   || _ || _|| _|| _|| _|| _|| _|	| _|
| _|| _	t
|tjrN|g}|| _dS )a  
        A hadoop streaming elastic mapreduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input uri
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        N)r   mapperreducercombinerr   cache_filescache_archivesinputoutputr   r   r   r   r   )r   r   r    r!   r"   r   r#   r$   r   r%   r&   r
   r   r   r	   r   f   s    #zStreamingStep.__init__c                 C   s   | j S r   r   r   r   r   r	   r
      s    zStreamingStep.jarc                 C   s   d S r   r   r   r   r   r	   r      s    zStreamingStep.main_classc                 C   s   g }| j r|| j  |d| jg | jr<|d| jg | jrT|d| jg n|ddg | jrt| jtr| jD ]}|d|f qzn|d| jf | jr|d| jf | j	r| j	D ]}|d|f q| j
r| j
D ]}|d	|f q|S )
Nz-mapperz	-combinerz-reducerz-jobconfzmapred.reduce.tasks=0z-inputz-outputz
-cacheFilez-cacheArchive)r   r   r    r"   r!   r%   r   listr&   r#   r$   )r   r   r%   Z
cache_fileZcache_archiver   r   r	   r      s.    


zStreamingStep.argsc                 C   s<   d| j j| j j| j| j| j| j| j| j| j	| j
| j| jf S )Nz%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r))	__class__r   r   r   r    r!   r   r#   r$   r   r%   r&   r   r   r   r   r	   __repr__   s            zStreamingStep.__repr__)	NNr   NNNNNr   )	r   r   r   r   r   r
   r   r   r)   r   r   r   r	   r   b   s              
3%r   c                       s    e Zd ZdZ fddZ  ZS )ScriptRunnerStepzEs3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jarc                    s   t t| j|| jf| d S r   )superr*   r   ScriptRunnerJar)r   r   kwr(   r   r	   r      s    zScriptRunnerStep.__init__)r   r   r   r,   r   __classcell__r   r   r.   r	   r*      s   r*   c                   @   s   e Zd ZdddgZdS )PigBasez4s3n://us-east-1.elasticmapreduce/libs/pig/pig-script--base-pathz*s3n://us-east-1.elasticmapreduce/libs/pig/Nr   r   r   BaseArgsr   r   r   r	   r0      s    r0   c                       s&   e Zd ZdZdZd fdd	Z  ZS )InstallPigStepz!
    Install pig on emr step
    zInstall Piglatestc                    sD   g }| | j | dg | d|g tt| j| j|d d S )Nz--install-pig--pig-versionsr   )r   r3   r+   r4   r   InstallPigName)r   pig_versionsr   r.   r   r	   r      s
    zInstallPigStep.__init__)r5   )r   r   r   r   r8   r   r/   r   r   r.   r	   r4      s   r4   c                       s&   e Zd ZdZdg f fdd	Z  ZS )PigStepz
    Pig script step
    r5   c                    sR   g }| | j | d|g | ddd|g | | tt| j||d d S )Nr6   z--run-pig-script--args-fr7   )r   r3   r+   r:   r   )r   r   Zpig_filer9   Zpig_argsr   r.   r   r	   r      s    
zPigStep.__init__r   r   r   r   r   r/   r   r   r.   r	   r:      s   r:   c                   @   s   e Zd ZdddgZdS )HiveBasez6s3n://us-east-1.elasticmapreduce/libs/hive/hive-scriptr1   z+s3n://us-east-1.elasticmapreduce/libs/hive/Nr2   r   r   r   r	   r>      s    r>   c                       s&   e Zd ZdZdZd fdd	Z  ZS )InstallHiveStepz"
    Install Hive on EMR step
    zInstall Hiver5   Nc                    s\   g }| | j | dg | d|g |d k	rB| d| g tt| j| j|d d S )Nz--install-hive--hive-versionsz--hive-site=%sr7   )r   r3   r+   r?   r   InstallHiveName)r   hive_versionsZ	hive_siter   r.   r   r	   r     s    zInstallHiveStep.__init__)r5   N)r   r   r   r   rA   r   r/   r   r   r.   r	   r?      s   r?   c                       s"   e Zd ZdZd fdd	Z  ZS )HiveStepz
    Hive script step
    r5   Nc                    sZ   g }| | j | d|g | ddd|g |d k	rB| | tt| j||d d S )Nr@   z--run-hive-scriptr;   r<   r7   )r   r3   r+   rC   r   )r   r   Z	hive_filerB   Z	hive_argsr   r.   r   r	   r     s    
zHiveStep.__init__)r5   Nr=   r   r   r.   r	   rC     s     rC   N)Zboto.compatr   objectr   r   r   r*   r0   r4   r:   r>   r?   rC   r   r   r   r	   <module>   s   .j