org.apache.hadoop.hive.ql.optimizer
Class GenMapRedUtils

java.lang.Object
  extended by org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils

public final class GenMapRedUtils
extends Object

General utility functions used by the processor to convert operators into map-reduce tasks.


Method Summary
static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf, Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask)
          Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask.
static void addStatsTask(FileSinkOperator nd, MoveTask mvTask, Task<? extends Serializable> currTask, HiveConf hconf)
          Add the StatsTask as a dependent task of the MoveTask because StatsTask will change the Table/Partition metadata.
static ConditionalTask createCondTask(HiveConf conf, Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork, String inputPath)
          Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
static org.apache.hadoop.fs.Path createMoveTask(Task<? extends Serializable> currTask, boolean chDir, FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask)
          Create and add any dependent move tasks
static void createMRWorkForMergingFiles(FileSinkOperator fsInput, org.apache.hadoop.fs.Path finalName, DependencyCollectionTask dependencyTask, List<Task<MoveWork>> mvTasks, HiveConf conf, Task<? extends Serializable> currTask)
           
static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc, org.apache.hadoop.fs.Path finalName, boolean hasDynamicPartitions)
          Create a block level merge task for RCFiles.
static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema)
           
static String findAlias(MapWork work, Operator<?> operator)
           
static Set<String> findAliases(MapWork work, Operator<?> startOp)
           
static Task<MoveWork> findMoveTask(List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp)
           
static Set<Operator<?>> findTopOps(Operator<?> startOp, Class<?> clazz)
           
static Set<Partition> getConfirmedPartitionsForScan(QBParseInfo parseInfo)
           
static List<org.apache.hadoop.fs.Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
           
static MapredWork getMapRedWork(ParseContext parseCtx)
          create a new plan and return.
static MapredWork getMapRedWorkFromConf(HiveConf conf)
          create a new plan and return.
static void initPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
          Initialize the current plan by adding it to root tasks.
static void initUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp, Task<? extends Serializable> currTask, boolean local)
           
static void initUnionPlan(ReduceSinkOperator op, UnionOperator currUnionOp, GenMRProcContext opProcCtx, Task<? extends Serializable> unionTask)
          Initialize the current union plan.
static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp)
          Returns true iff the current query is an INSERT INTO for the given file sink.
static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp, Task<? extends Serializable> currTask, boolean isInsertTable)
          Returns true iff the fsOp requires a merge
static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc)
          Check whether the table is skewed and stored as directories.
static void joinPlan(Task<? extends Serializable> currTask, Task<? extends Serializable> oldTask, GenMRProcContext opProcCtx)
          Merge the current task into the old task for the reducer
static void joinUnionPlan(GenMRProcContext opProcCtx, UnionOperator currUnionOp, Task<? extends Serializable> currentUnionTask, Task<? extends Serializable> existingTask, boolean local)
           
static void linkMoveTask(FileSinkOperator newOutput, ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf, DependencyCollectionTask dependencyTask)
          Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all possible subtrees branching from the ConditionalTask.
static void linkMoveTask(Task<MoveWork> mvTask, Task<? extends Serializable> task, HiveConf hconf, DependencyCollectionTask dependencyTask)
          Follows the task tree down from task and makes all leaves parents of mvTask
static boolean needsTagging(ReduceWork rWork)
           
static Operator<? extends OperatorDesc> putOpInsertMap(Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext parseCtx)
          Insert the operator into the operator-to-row-resolver map.
static void replaceMapWork(String sourceAlias, String targetAlias, MapWork source, MapWork target)
          Replace the Map-side operator tree associated with targetAlias in target with the Map-side operator tree associated with sourceAlias in source.
static void setKeyAndValueDesc(ReduceWork plan, Operator<? extends OperatorDesc> topOp)
          set key and value descriptor.
static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs)
          Set key and value descriptor
static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task)
          Set the key and value description for all the tasks rooted at the given task.
static void setMapWork(MapWork plan, ParseContext parseCtx, Set<ReadEntity> inputs, PrunedPartitionList partsList, Operator<? extends OperatorDesc> topOp, String alias_id, HiveConf conf, boolean local)
          initialize MapWork
static void setTaskPlan(String alias_id, Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local, GenMRProcContext opProcCtx)
          set the current task in the mapredWork.
static void setTaskPlan(String alias_id, Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local, GenMRProcContext opProcCtx, PrunedPartitionList pList)
          set the current task in the mapredWork.
static void setTaskPlan(String path, String alias, Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local, TableDesc tt_desc)
          set the current task in the mapredWork.
 
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Method Detail

needsTagging

public static boolean needsTagging(ReduceWork rWork)

initPlan

public static void initPlan(ReduceSinkOperator op,
                            GenMRProcContext opProcCtx)
                     throws SemanticException
Initialize the current plan by adding it to root tasks.

Parameters:
op - the reduce sink operator encountered
opProcCtx - processing context
Throws:
SemanticException

initUnionPlan

public static void initUnionPlan(ReduceSinkOperator op,
                                 UnionOperator currUnionOp,
                                 GenMRProcContext opProcCtx,
                                 Task<? extends Serializable> unionTask)
                          throws SemanticException
Initialize the current union plan.

Parameters:
op - the reduce sink operator encountered
opProcCtx - processing context
Throws:
SemanticException

initUnionPlan

public static void initUnionPlan(GenMRProcContext opProcCtx,
                                 UnionOperator currUnionOp,
                                 Task<? extends Serializable> currTask,
                                 boolean local)
                          throws SemanticException
Throws:
SemanticException

joinUnionPlan

public static void joinUnionPlan(GenMRProcContext opProcCtx,
                                 UnionOperator currUnionOp,
                                 Task<? extends Serializable> currentUnionTask,
                                 Task<? extends Serializable> existingTask,
                                 boolean local)
                          throws SemanticException
Throws:
SemanticException

joinPlan

public static void joinPlan(Task<? extends Serializable> currTask,
                            Task<? extends Serializable> oldTask,
                            GenMRProcContext opProcCtx)
                     throws SemanticException
Merge the current task into the old task for the reducer

Parameters:
currTask - the current task for the current reducer
oldTask - the old task for the current reducer
opProcCtx - processing context
Throws:
SemanticException

setTaskPlan

public static void setTaskPlan(String alias_id,
                               Operator<? extends OperatorDesc> topOp,
                               Task<?> task,
                               boolean local,
                               GenMRProcContext opProcCtx)
                        throws SemanticException
set the current task in the mapredWork.

Parameters:
alias_id - current alias
topOp - the top operator of the stack
task - current task
local - whether you need to add to map-reduce or local work
opProcCtx - processing context
Throws:
SemanticException

setTaskPlan

public static void setTaskPlan(String alias_id,
                               Operator<? extends OperatorDesc> topOp,
                               Task<?> task,
                               boolean local,
                               GenMRProcContext opProcCtx,
                               PrunedPartitionList pList)
                        throws SemanticException
set the current task in the mapredWork.

Parameters:
alias_id - current alias
topOp - the top operator of the stack
task - current task
local - whether you need to add to map-reduce or local work
opProcCtx - processing context
pList - pruned partition list. If it is null it will be computed on-the-fly.
Throws:
SemanticException

setMapWork

public static void setMapWork(MapWork plan,
                              ParseContext parseCtx,
                              Set<ReadEntity> inputs,
                              PrunedPartitionList partsList,
                              Operator<? extends OperatorDesc> topOp,
                              String alias_id,
                              HiveConf conf,
                              boolean local)
                       throws SemanticException
initialize MapWork

Parameters:
alias_id - current alias
topOp - the top operator of the stack
plan - map work to initialize
local - whether you need to add to map-reduce or local work
partsList - pruned partition list. If it is null it will be computed on-the-fly.
inputs - read entities for the map work
conf - current instance of hive conf
Throws:
SemanticException

setTaskPlan

public static void setTaskPlan(String path,
                               String alias,
                               Operator<? extends OperatorDesc> topOp,
                               MapWork plan,
                               boolean local,
                               TableDesc tt_desc)
                        throws SemanticException
set the current task in the mapredWork.

Parameters:
alias - current alias
topOp - the top operator of the stack
plan - current plan
local - whether you need to add to map-reduce or local work
tt_desc - table descriptor
Throws:
SemanticException

setKeyAndValueDesc

public static void setKeyAndValueDesc(ReduceWork work,
                                      ReduceSinkOperator rs)
Set key and value descriptor

Parameters:
work - ReduceWork
rs - ReduceSinkOperator

setKeyAndValueDesc

public static void setKeyAndValueDesc(ReduceWork plan,
                                      Operator<? extends OperatorDesc> topOp)
set key and value descriptor.

Parameters:
plan - current plan
topOp - current top operator in the path

setKeyAndValueDescForTaskTree

public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task)
Set the key and value description for all the tasks rooted at the given task. Loops over all the tasks recursively.

Parameters:
task -

getMapRedWork

public static MapredWork getMapRedWork(ParseContext parseCtx)
create a new plan and return.

Returns:
the new plan

getMapRedWorkFromConf

public static MapredWork getMapRedWorkFromConf(HiveConf conf)
Create a new plan and return it. The plan will not contain the name-to-split-sample information from the parse context.

Returns:
the new plan

putOpInsertMap

public static Operator<? extends OperatorDesc> putOpInsertMap(Operator<? extends OperatorDesc> op,
                                                              RowResolver rr,
                                                              ParseContext parseCtx)
Insert the operator into the operator-to-row-resolver map.

Parameters:
op - operator created
rr - row resolver
parseCtx - parse context

createTemporaryTableScanOperator

public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema)

replaceMapWork

public static void replaceMapWork(String sourceAlias,
                                  String targetAlias,
                                  MapWork source,
                                  MapWork target)
Replace the Map-side operator tree associated with targetAlias in target with the Map-side operator tree associated with sourceAlias in source.

Parameters:
sourceAlias -
targetAlias -
source -
target -

createMRWorkForMergingFiles

public static void createMRWorkForMergingFiles(FileSinkOperator fsInput,
                                               org.apache.hadoop.fs.Path finalName,
                                               DependencyCollectionTask dependencyTask,
                                               List<Task<MoveWork>> mvTasks,
                                               HiveConf conf,
                                               Task<? extends Serializable> currTask)
                                        throws SemanticException
Parameters:
fsInput - The FileSink operator.
ctx - The MR processing context.
finalName - the final destination path the merge job should output.
dependencyTask -
mvTasks -
conf -
currTask -
Throws:
SemanticException - Create a map-only merge job using CombineHiveInputFormat for all partitions, with the following operators:

  MR job J0:
      ...
       |
       v
      FileSinkOperator_1 (fsInput)
       |
       v
  Merge job J1:
       |
       v
      TableScan (using CombineHiveInputFormat) (tsMerge)
       |
       v
      FileSinkOperator (fsMerge)

Here pathToPartitionInfo and pathToAlias remain the same, which means the paths do not contain the dynamic partitions (only their parent). So after the dynamic partitions are created (after the first job finishes and before the MoveTask or ConditionalTask starts), we need to change pathToPartitionInfo and pathToAlias to include the dynamic partition directories.

linkMoveTask

public static void linkMoveTask(FileSinkOperator newOutput,
                                ConditionalTask cndTsk,
                                List<Task<MoveWork>> mvTasks,
                                HiveConf hconf,
                                DependencyCollectionTask dependencyTask)
Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all possible subtrees branching from the ConditionalTask.

Parameters:
newOutput -
cndTsk -
mvTasks -
hconf -
dependencyTask -

linkMoveTask

public static void linkMoveTask(Task<MoveWork> mvTask,
                                Task<? extends Serializable> task,
                                HiveConf hconf,
                                DependencyCollectionTask dependencyTask)
Follows the task tree down from task and makes all leaves parents of mvTask

Parameters:
mvTask -
task -
hconf -
dependencyTask -

addDependentMoveTasks

public static void addDependentMoveTasks(Task<MoveWork> mvTask,
                                         HiveConf hconf,
                                         Task<? extends Serializable> parentTask,
                                         DependencyCollectionTask dependencyTask)
Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as well.

Parameters:
mvTask -
hconf -
parentTask -
dependencyTask -

addStatsTask

public static void addStatsTask(FileSinkOperator nd,
                                MoveTask mvTask,
                                Task<? extends Serializable> currTask,
                                HiveConf hconf)
Add the StatsTask as a dependent task of the MoveTask because StatsTask will change the Table/Partition metadata. For atomicity, that metadata should not be changed until the MoveTask has actually put the data in place.

Parameters:
nd - the FileSinkOperator whose results are taken care of by the MoveTask.
mvTask - The MoveTask that moves the FileSinkOperator's results.
currTask - The MapRedTask that the FileSinkOperator belongs to.
hconf - HiveConf

isInsertInto

public static boolean isInsertInto(ParseContext parseCtx,
                                   FileSinkOperator fsOp)
Returns true iff the current query is an INSERT INTO for the given file sink.

Parameters:
parseCtx -
fsOp -
Returns:

createRCFileMergeTask

public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
                                            org.apache.hadoop.fs.Path finalName,
                                            boolean hasDynamicPartitions)
                                     throws SemanticException
Create a block level merge task for RCFiles.

Parameters:
fsInputDesc -
finalName -
Returns:
MergeWork if table is stored as RCFile, null otherwise
Throws:
SemanticException

createCondTask

public static ConditionalTask createCondTask(HiveConf conf,
                                             Task<? extends Serializable> currTask,
                                             MoveWork mvWork,
                                             Serializable mergeWork,
                                             String inputPath)
Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.

Parameters:
conf - HiveConf
currTask - current leaf task
mvWork - MoveWork for the move task
mergeWork - MapredWork for the merge task.
inputPath - the input directory of the merge/move task
Returns:
The conditional task

isSkewedStoredAsDirs

public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc)
Check whether the table is skewed and stored as directories.

Parameters:
fsInputDesc -
Returns:

findMoveTask

public static Task<MoveWork> findMoveTask(List<Task<MoveWork>> mvTasks,
                                          FileSinkOperator fsOp)

isMergeRequired

public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks,
                                      HiveConf hconf,
                                      FileSinkOperator fsOp,
                                      Task<? extends Serializable> currTask,
                                      boolean isInsertTable)
Returns true iff the fsOp requires a merge

Parameters:
mvTasks -
hconf -
fsOp -
currTask -
isInsertTable -
Returns:

createMoveTask

public static org.apache.hadoop.fs.Path createMoveTask(Task<? extends Serializable> currTask,
                                                       boolean chDir,
                                                       FileSinkOperator fsOp,
                                                       ParseContext parseCtx,
                                                       List<Task<MoveWork>> mvTasks,
                                                       HiveConf hconf,
                                                       DependencyCollectionTask dependencyTask)
Create and add any dependent move tasks

Parameters:
currTask -
chDir -
fsOp -
parseCtx -
mvTasks -
hconf -
dependencyTask -
Returns:

getConfirmedPartitionsForScan

public static Set<Partition> getConfirmedPartitionsForScan(QBParseInfo parseInfo)

getInputPathsForPartialScan

public static List<org.apache.hadoop.fs.Path> getInputPathsForPartialScan(QBParseInfo parseInfo,
                                                                          StringBuffer aggregationKey)
                                                                   throws SemanticException
Throws:
SemanticException

findAliases

public static Set<String> findAliases(MapWork work,
                                      Operator<?> startOp)

findTopOps

public static Set<Operator<?>> findTopOps(Operator<?> startOp,
                                          Class<?> clazz)

findAlias

public static String findAlias(MapWork work,
                               Operator<?> operator)


Copyright © 2014 The Apache Software Foundation. All rights reserved.