Abstract

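This article traces through the code to analyze the overall flow from taking a batch of Pig Latin as input to producing a physical execution plan. What that plan looks like depends on the launcher engine: usually it is an MR plan, but it can also be a set of Spark RDD operators.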

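It does not go into detail on how the AST is parsed, how ANTLR is used, how the logical plan is mapped, how the logical plan is optimized, or how the MR plan is split into MR jobs. Instead, it follows the key transformation steps (methods and classes) from a batch of Pig DSL input to the actual plan that is ready to run.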

Walking Through Execution Plan Generation

The entry point is the main function of the Main class:

/**
* The Main-Class for the Pig Jar that will provide a shell and setup a classpath appropriate
* for executing Jar files. Warning, this method calls System.exit().
*
* @param args
* -jar can be used to add additional jar files (colon separated). - will start a
* shell. -e will execute the rest of the command line as if it was input to the
* shell.
* @throws IOException
*/
public static void main(String args[]) {
// add win HADOOP_HOME
// make sure you have "winutils.exe" under /bin
// if not, download one from https://github.com/srccodes/hadoop-common-2.2.0-bin/tree/master/bin // entrance for local debug
// better try: -x spark -e cmds
System.exit(run(args, null));
}

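Main -> GruntParser: this is the first step.

Main first performs some parameter initialization (execution mode, input type detection, class initialization, and so on), then uses GruntParser to parse the input Pig Latin script.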

After grunt.exec(), the next key step is parsing inside GruntParser.

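parse() in GruntParser relies on the PigScriptParser.jj file (a JavaCC grammar). I could not step into the generated code in detail; what it ultimately produces is a syntax tree.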

When a Dump statement is encountered, execution enters GruntParser's processDump method.
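
Ordinary assignments in Pig do not build or run anything on their own; a plan is only compiled and executed once a DUMP (or STORE) is reached, which is why processDump is where the rest of the pipeline kicks in. The sketch below is a minimal toy model of that lazy dispatch, assuming one statement per call. ToyGruntParser is hypothetical, and its processDump is only a stand-in that records which alias would be materialized, not the real GruntParser signature.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of grunt-style lazy dispatch: assignments are only buffered,
// and a DUMP statement is what triggers plan building and execution.
// ToyGruntParser is hypothetical; processDump here is a stand-in, not Pig's signature.
class ToyGruntParser {
    private final List<String> buffered = new ArrayList<>();
    private final List<String> triggered = new ArrayList<>();

    /** Feeds one statement, without its trailing ';'. */
    void process(String stmt) {
        String s = stmt.trim();
        if (s.toLowerCase().startsWith("dump ")) {
            processDump(s.substring(5).trim());
        } else if (!s.isEmpty()) {
            buffered.add(s); // e.g. "raw = LOAD ...": registered, nothing runs yet
        }
    }

    /** Stand-in for processDump: records which alias would be materialized. */
    private void processDump(String alias) {
        triggered.add(alias + " <- " + buffered.size() + " buffered statements");
    }

    List<String> triggeredRuns() {
        return triggered;
    }

    public static void main(String[] args) {
        ToyGruntParser parser = new ToyGruntParser();
        String script = "raw = LOAD 'excite-small.log' AS (user, time, query);"
                + "clean1 = FILTER raw BY query IS NOT NULL;"
                + "dump clean1;";
        for (String stmt : script.split(";")) {
            parser.process(stmt);
        }
        System.out.println(parser.triggeredRuns()); // [clean1 <- 2 buffered statements]
    }
}
```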

Once this step finishes, syntax-level parsing is complete.

PigServer -> QueryParserDriver. PigServer's parseQuery method calls QueryParserDriver's parse(query) method, which returns the logical plan.

QueryParserDriver -> LogicalPlanGenerator. The actual generation of the logical plan is done by LogicalPlanGenerator.
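
To make "a graph of logical operators linked by alias" concrete, here is a toy sketch. It assumes only single-input statements of the form `alias = OP input ...` and uses a regex instead of the ANTLR-based tree walker; ToyLogicalPlan is hypothetical, and only the LO* operator names mirror Pig's logical operators (several of them appear in the logical plan in the example below).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy logical plan builder: each single-input "alias = OP input ..." statement
// becomes a node that points at the alias it reads from. Real Pig builds LO*
// operators with an ANTLR-generated tree walker; this regex version is hypothetical.
class ToyLogicalPlan {
    private static final Pattern STMT =
            Pattern.compile("(\\w+)\\s*=\\s*(\\w+)\\s+(\\S+).*");
    // Keyword -> logical operator name (names mirror Pig's LO* classes).
    private static final Map<String, String> LO_NAMES = Map.of(
            "LOAD", "LOLoad", "FILTER", "LOFilter", "FOREACH", "LOForEach",
            "DISTINCT", "LODistinct", "GROUP", "LOCogroup", "ORDER", "LOSort");

    private final Map<String, String> opOf = new LinkedHashMap<>();
    private final Map<String, String> inputOf = new HashMap<>();

    void add(String stmt) {
        Matcher m = STMT.matcher(stmt.trim());
        if (!m.matches()) throw new IllegalArgumentException("unsupported: " + stmt);
        String alias = m.group(1);
        String op = m.group(2).toUpperCase();
        String loName = LO_NAMES.get(op);
        if (loName == null) throw new IllegalArgumentException("unsupported operator: " + op);
        // Reusing an alias would create a self-loop in this simple map-based graph.
        if (opOf.containsKey(alias)) throw new IllegalArgumentException("alias reuse not supported: " + alias);
        if (!op.equals("LOAD")) {
            String input = m.group(3);
            if (!opOf.containsKey(input)) throw new IllegalArgumentException("unknown alias: " + input);
            inputOf.put(alias, input);
        }
        opOf.put(alias, loName);
    }

    /** Walks from a sink back to its source, the order the explain output prints. */
    List<String> chainFrom(String sink) {
        List<String> chain = new ArrayList<>();
        for (String a = sink; a != null; a = inputOf.get(a)) {
            chain.add(a + ": " + opOf.get(a));
        }
        return chain;
    }

    public static void main(String[] args) {
        ToyLogicalPlan plan = new ToyLogicalPlan();
        plan.add("raw = LOAD 'excite-small.log' AS (user, time, query)");
        plan.add("clean1 = FILTER raw BY query IS NOT NULL");
        plan.add("ngramed2 = DISTINCT clean1");
        System.out.println(plan.chainFrom("ngramed2"));
        // [ngramed2: LODistinct, clean1: LOFilter, raw: LOLoad]
    }
}
```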

At this point, the logical plan has been generated.

PigServer -> HExecutionEngine. The next steps are optimizing the logical plan and generating the physical plan.

HExecutionEngine.compile(LogicalPlan, Properties) first optimizes the logical plan.

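During initialization, HExecutionEngine assembles different combinations of optimization strategies for different situations (for example, by disabling certain rules).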

The PlanOptimizer optimization process was covered in detail in my earlier article on logical plan optimization.
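
As a rough picture of what "combining rules and disabling some of them" means, the sketch below applies a set of named rules repeatedly until the plan stops changing or an iteration cap is hit. The plan is reduced to a source-to-sink list of operator names, and the two rules only borrow the names of Pig optimizer rules (PushUpFilter, MergeForEach) with deliberately simplified logic: for instance, there is no check that the filter only uses columns the ForEach passes through. ToyPlanOptimizer is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Toy fixed-point optimizer: apply every enabled rule until the plan stops
// changing or maxIter is reached. Rule names echo Pig rules; logic is simplified.
class ToyPlanOptimizer {
    private final Map<String, UnaryOperator<List<String>>> rules = new LinkedHashMap<>();
    private final Set<String> disabled;
    private final int maxIter;

    ToyPlanOptimizer(Set<String> disabled, int maxIter) {
        this.disabled = disabled;
        this.maxIter = maxIter;
        rules.put("PushUpFilter", ToyPlanOptimizer::pushUpFilter);
        rules.put("MergeForEach", ToyPlanOptimizer::mergeForEach);
    }

    List<String> optimize(List<String> plan) {
        List<String> current = new ArrayList<>(plan);
        for (int i = 0; i < maxIter; i++) {
            List<String> before = current;
            for (Map.Entry<String, UnaryOperator<List<String>>> rule : rules.entrySet()) {
                if (!disabled.contains(rule.getKey())) {
                    current = rule.getValue().apply(current); // rules return new lists
                }
            }
            if (current.equals(before)) break; // fixed point reached
        }
        return current;
    }

    // Swap the first "ForEach, Filter" pair so the filter runs earlier.
    static List<String> pushUpFilter(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals("ForEach") && out.get(i + 1).equals("Filter")) {
                Collections.swap(out, i, i + 1);
                break;
            }
        }
        return out;
    }

    // Collapse the first pair of adjacent ForEach operators into one.
    static List<String> mergeForEach(List<String> plan) {
        List<String> out = new ArrayList<>(plan);
        for (int i = 0; i + 1 < out.size(); i++) {
            if (out.get(i).equals("ForEach") && out.get(i + 1).equals("ForEach")) {
                out.remove(i + 1);
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> plan = List.of("Load", "ForEach", "ForEach", "Filter", "Store");
        System.out.println(new ToyPlanOptimizer(Set.of(), 10).optimize(plan));
        // [Load, Filter, ForEach, Store]
        System.out.println(new ToyPlanOptimizer(Set.of("PushUpFilter"), 10).optimize(plan));
        // [Load, ForEach, Filter, Store]
    }
}
```

Disabling a rule simply removes it from each pass, so the same input converges to a different plan.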

Next, the physical plan is generated.

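This is done mainly by LogToPhyTranslationVisitor. As it walks the nodes of the logical plan, calling accept() on each Op triggers the matching overloaded visit(Op) method of LogToPhyTranslationVisitor, which maps each logical plan step to the corresponding physical plan steps.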
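
The accept()/visit() pair is classic double dispatch: each operator knows its own concrete type and calls the matching visit overload on the visitor, so the visitor never needs an instanceof chain. Below is a minimal sketch with two operators; all Toy* types are hypothetical stand-ins that only echo Pig's LO*/PO* naming.

```java
import java.util.ArrayList;
import java.util.List;

// Toy double dispatch in the style of logical-to-physical translation.
// Names echo Pig's LO*/PO* operators, but every Toy* type here is hypothetical.
interface ToyLogicalOp {
    void accept(ToyVisitor v);
}

interface ToyVisitor {
    void visit(ToyLOLoad op);
    void visit(ToyLOFilter op);
}

class ToyLOLoad implements ToyLogicalOp {
    final String path;
    ToyLOLoad(String path) { this.path = path; }

    // Inside ToyLOLoad, 'this' is statically a ToyLOLoad, so visit(ToyLOLoad) is chosen.
    public void accept(ToyVisitor v) { v.visit(this); }
}

class ToyLOFilter implements ToyLogicalOp {
    final String condition;
    ToyLOFilter(String condition) { this.condition = condition; }

    public void accept(ToyVisitor v) { v.visit(this); }
}

class ToyLogToPhyVisitor implements ToyVisitor {
    private final List<String> physicalPlan = new ArrayList<>();

    public void visit(ToyLOLoad op) { physicalPlan.add("POLoad(" + op.path + ")"); }
    public void visit(ToyLOFilter op) { physicalPlan.add("POFilter(" + op.condition + ")"); }

    /** Walks the operators source-first; each accept() routes to the right visit(). */
    static List<String> translate(List<ToyLogicalOp> ops) {
        ToyLogToPhyVisitor visitor = new ToyLogToPhyVisitor();
        for (ToyLogicalOp op : ops) {
            op.accept(visitor);
        }
        return visitor.physicalPlan;
    }

    public static void main(String[] args) {
        List<ToyLogicalOp> ops = List.of(new ToyLOLoad("excite-small.log"),
                new ToyLOFilter("NonURLDetector(query)"));
        System.out.println(translate(ops));
        // [POLoad(excite-small.log), POFilter(NonURLDetector(query))]
    }
}
```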

Next comes launchPlan, which starts a Launcher according to the configuration. For example, with the Spark engine configured, it starts SparkLauncher.

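SparkLauncher translates each step of the physical plan into RDD operations (each step maps directly to an operator), runs them, and returns the result as SparkStats, which contains OutputInfo (including the location of the result files, among other things).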
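
The "one physical operator, one RDD operator" translation can be pictured as a table of converters keyed by operator type. In this toy sketch a List<String> stands in for an RDD and java.util.stream plays the role of filter/map/distinct; ToySparkTranslator, the converter signature, and the use of PO* names as table keys are assumptions for illustration. Pig on Spark's real converters operate on actual RDDs and are considerably more involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Toy analogue of translating each physical operator into one data-parallel
// operator. A List<String> stands in for an RDD; a converter table keyed by
// operator type does the translation. Keys and signature are hypothetical.
class ToySparkTranslator {
    /** A physical operator: its type plus one string argument. */
    static final class PhyOp {
        final String type;
        final String arg;

        PhyOp(String type, String arg) {
            this.type = type;
            this.arg = arg;
        }
    }

    private static final Map<String, BiFunction<List<String>, String, List<String>>> CONVERTERS =
            new HashMap<>();

    static {
        // Filter ~ RDD.filter: keep rows that contain the argument.
        CONVERTERS.put("POFilter", (rows, arg) ->
                rows.stream().filter(r -> r.contains(arg)).collect(Collectors.toList()));
        // ForEach with a ToLower-style UDF ~ RDD.map (argument unused here).
        CONVERTERS.put("POForEach", (rows, arg) ->
                rows.stream().map(String::toLowerCase).collect(Collectors.toList()));
        // Distinct ~ RDD.distinct (this toy also keeps first-seen order).
        CONVERTERS.put("PODistinct", (rows, arg) ->
                rows.stream().distinct().collect(Collectors.toList()));
    }

    /** Applies the converter for each operator, source first. */
    static List<String> run(List<String> input, List<PhyOp> plan) {
        List<String> current = input;
        for (PhyOp op : plan) {
            BiFunction<List<String>, String, List<String>> converter = CONVERTERS.get(op.type);
            if (converter == null) {
                throw new UnsupportedOperationException("no converter for " + op.type);
            }
            current = converter.apply(current, op.arg);
        }
        return current;
    }

    public static void main(String[] args) {
        List<String> out = run(
                List.of("Pig Latin", "pig latin", "Spark"),
                List.of(new PhyOp("POFilter", "atin"),
                        new PhyOp("POForEach", ""),
                        new PhyOp("PODistinct", "")));
        System.out.println(out); // [pig latin]
    }
}
```

A table lookup keeps the translation loop independent of the operator set: supporting a new operator means registering one more converter.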

For how Pig on Spark implements SparkLauncher and translates physical plan operators, see my blog post on GitHub, which can serve as a guide when reading the code.

The other path launches MapReduceLauncher, where MRCompiler translates the physical plan into an MR plan. The core of this translation is in the compile(PO) method.
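
The rough shape of this translation is visible in the MR plan at the end of this article: each shuffle splits the pipeline into a map side ending in Local Rearrange and a reduce side starting at Package, and a further shuffle starts a new job that reloads a temporary file written with Store. The sketch below mimics just that shape for a linear pipeline of operator names. ToyMRCompiler and the "tmpN" file names are hypothetical; the real MRCompiler handles many more cases, such as the extra sampling job for ORDER BY seen in the plan.

```java
import java.util.ArrayList;
import java.util.List;

// Toy cut of a linear physical pipeline into MapReduce jobs, mimicking only the
// shape of the MR plan: operators up to a Local Rearrange run map-side, the
// Global Rearrange is the shuffle itself, and operators from Package on run
// reduce-side. A second shuffle closes the job with a Store to a temp file and
// starts a new job that Loads it. ToyMRCompiler and "tmpN" names are hypothetical.
class ToyMRCompiler {
    static final class MRJob {
        final List<String> map = new ArrayList<>();
        final List<String> reduce = new ArrayList<>();

        @Override
        public String toString() {
            return "map=" + map + " reduce=" + reduce;
        }
    }

    static List<MRJob> compile(List<String> pipeline) {
        List<MRJob> jobs = new ArrayList<>();
        MRJob job = new MRJob();
        jobs.add(job);
        boolean inReduce = false;
        int tmpCount = 0;
        for (String op : pipeline) {
            if (op.equals("GlobalRearrange")) {
                inReduce = true; // the shuffle is implicit in MapReduce
                continue;
            }
            if (op.equals("LocalRearrange") && inReduce) {
                String tmp = "tmp" + tmpCount++;
                job.reduce.add("Store(" + tmp + ")"); // close the current job
                job = new MRJob();
                jobs.add(job);
                job.map.add("Load(" + tmp + ")");     // next job reads it back
                inReduce = false;
            }
            (inReduce ? job.reduce : job.map).add(op);
        }
        return jobs;
    }

    public static void main(String[] args) {
        List<String> pipeline = List.of("Load", "Filter", "ForEach",
                "LocalRearrange", "GlobalRearrange", "Package", "ForEach",
                "LocalRearrange", "GlobalRearrange", "Package", "ForEach", "Store");
        compile(pipeline).forEach(System.out::println);
        // map=[Load, Filter, ForEach, LocalRearrange] reduce=[Package, ForEach, Store(tmp0)]
        // map=[Load(tmp0), LocalRearrange] reduce=[Package, ForEach, Store]
    }
}
```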

Translating to an MR plan takes two main steps.

First, MRCompiler.compile() translates the physical plan into a MapReduce plan.

Second, JobControlCompiler.compile() takes the MROperPlan as input and returns a JobControl, which manages the queue of MR jobs.
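
Once the MR plan exists, its jobs have to be submitted in dependency order: a job can only run after every job whose output it reads has finished. The toy sketch below groups jobs into "waves" that could be submitted together, using Kahn's topological sort. ToyJobQueue is hypothetical and only models this ordering constraint, not Hadoop's JobControl API. It assumes every job appears as a key in the dependency map; the demo uses data dependencies read off the temp files in the example MR plan below.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy dependency-ordered job submission: a job is ready only once every job it
// depends on has finished. waves() groups jobs that could run together, using
// Kahn's topological sort. Hypothetical helper, not Hadoop's JobControl API.
class ToyJobQueue {
    /** deps maps each job to the jobs whose output it needs. */
    static List<List<String>> waves(Map<String, List<String>> deps) {
        Map<String, Integer> pending = new TreeMap<>();          // unmet dependency counts
        Map<String, List<String>> dependents = new HashMap<>();  // reverse edges
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            pending.put(e.getKey(), e.getValue().size());
            for (String d : e.getValue()) {
                if (!deps.containsKey(d)) throw new IllegalArgumentException("undeclared job: " + d);
                dependents.computeIfAbsent(d, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Integer> e : pending.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<List<String>> result = new ArrayList<>();
        int done = 0;
        while (!ready.isEmpty()) {
            result.add(ready);
            done += ready.size();
            List<String> next = new ArrayList<>();
            for (String finished : ready) {
                for (String j : dependents.getOrDefault(finished, List.of())) {
                    if (pending.merge(j, -1, Integer::sum) == 0) next.add(j);
                }
            }
            Collections.sort(next);
            ready = next;
        }
        if (done != pending.size()) throw new IllegalStateException("dependency cycle");
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new HashMap<>();
        deps.put("scope-72", List.of());
        deps.put("scope-80", List.of("scope-72"));
        deps.put("scope-83", List.of("scope-80"));
        deps.put("scope-86", List.of("scope-83"));             // sampler reads scope-83 output
        deps.put("scope-103", List.of("scope-83", "scope-86")); // sort needs data + quantiles
        System.out.println(waves(deps));
        // [[scope-72], [scope-80], [scope-83], [scope-86], [scope-103]]
    }
}
```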

That is the overall flow from the input script to the output execution plan.

It is fairly rough, but it covers all the key steps.

Example

For clarity, here is an example I ran myself, including the Pig Latin script, the logical plan, the physical plan, and the MR plan.

Pig Latin

REGISTER D:/tutorial.jar;

raw = LOAD 'D:/github/flare-spork/tutorial/data/excite-small.log' USING PigStorage('\t') AS (user, time, query);

clean1 = FILTER raw BY org.apache.pig.tutorial.NonURLDetector(query);

clean2 = FOREACH clean1 GENERATE user, time, org.apache.pig.tutorial.ToLower(query) as query;

houred = FOREACH clean2 GENERATE user, org.apache.pig.tutorial.ExtractHour(time) as hour, query;

ngramed1 = FOREACH houred GENERATE user, hour, flatten(org.apache.pig.tutorial.NGramGenerator(query)) as ngram;

ngramed2 = DISTINCT ngramed1;

hour_frequency1 = GROUP ngramed2 BY (ngram, hour);

hour_frequency2 = FOREACH hour_frequency1 GENERATE flatten($0), COUNT($1) as count;

uniq_frequency1 = GROUP hour_frequency2 BY group::ngram;

uniq_frequency2 = FOREACH uniq_frequency1 GENERATE flatten($0), flatten(org.apache.pig.tutorial.ScoreGenerator($1));

uniq_frequency3 = FOREACH uniq_frequency2 GENERATE $1 as hour, $0 as ngram, $2 as score, $3 as count, $4 as mean;

filtered_uniq_frequency = FILTER uniq_frequency3 BY score > 2.0;

ordered_uniq_frequency = ORDER filtered_uniq_frequency BY hour, score;

dump ordered_uniq_frequency;

Logical plan

#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
ordered_uniq_frequency: (Name: LOSort Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| |
| hour:(Name: Project Type: chararray Uid: 22 Input: 0 Column: hour)
| |
| score:(Name: Project Type: double Uid: 23 Input: 0 Column: score)
|
|---filtered_uniq_frequency: (Name: LOFilter Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| |
| (Name: GreaterThan Type: boolean Uid: 29)
| |
| |---score:(Name: Project Type: double Uid: 23 Input: 0 Column: score)
| |
| |---(Name: Constant Type: double Uid: 28)
|
|---uniq_frequency3: (Name: LOForEach Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| |
| (Name: LOGenerate[false,false,false,false,false] Schema: hour#22:chararray,ngram#9:chararray,score#23:double,count#24:long,mean#25:double)
| | |
| | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour:(Name: Project Type: chararray Uid: 22 Input: 0 Column: (*))
| | |
| | group:(Name: Project Type: chararray Uid: 9 Input: 1 Column: (*))
| | |
| | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score:(Name: Project Type: double Uid: 23 Input: 2 Column: (*))
| | |
| | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count:(Name: Project Type: long Uid: 24 Input: 3 Column: (*))
| | |
| | org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean:(Name: Project Type: double Uid: 25 Input: 4 Column: (*))
| |
| |---(Name: LOInnerLoad[1] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray)
| |
| |---(Name: LOInnerLoad[0] Schema: group#9:chararray)
| |
| |---(Name: LOInnerLoad[2] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double)
| |
| |---(Name: LOInnerLoad[3] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long)
| |
| |---(Name: LOInnerLoad[4] Schema: org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double)
|
|---uniq_frequency2: (Name: LOForEach Schema: group#9:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double)
| |
| (Name: LOGenerate[true,true] Schema: group#9:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::hour#22:chararray,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::score#23:double,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::count#24:long,org.apache.pig.tutorial.scoregenerator_hour_frequency2_27::mean#25:double)
| | |
| | group:(Name: Project Type: chararray Uid: 9 Input: 0 Column: (*))
| | |
| | (Name: UserFunc(org.apache.pig.tutorial.ScoreGenerator) Type: bag Uid: 20)
| | |
| | |---hour_frequency2:(Name: Project Type: bag Uid: 16 Input: 1 Column: (*))
| |
| |---(Name: LOInnerLoad[0] Schema: group#9:chararray)
| |
| |---hour_frequency2: (Name: LOInnerLoad[1] Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long)
|
|---uniq_frequency1: (Name: LOCogroup Schema: group#9:chararray,hour_frequency2#16:bag{#31:tuple(group::ngram#9:chararray,group::hour#6:chararray,count#14:long)})
| |
| group::ngram:(Name: Project Type: chararray Uid: 9 Input: 0 Column: group::ngram)
|
|---hour_frequency2: (Name: LOForEach Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long)
| |
| (Name: LOGenerate[true,false] Schema: group::ngram#9:chararray,group::hour#6:chararray,count#14:long)
| | |
| | group:(Name: Project Type: tuple Uid: 10 Input: 0 Column: (*))
| | |
| | (Name: UserFunc(org.apache.pig.builtin.COUNT) Type: long Uid: 14)
| | |
| | |---ngramed2:(Name: Project Type: bag Uid: 11 Input: 1 Column: (*))
| |
| |---(Name: LOInnerLoad[0] Schema: group#10:tuple(ngram#9:chararray,hour#6:chararray))
| |
| |---ngramed2: (Name: LOInnerLoad[1] Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
|
|---hour_frequency1: (Name: LOCogroup Schema: group#10:tuple(ngram#9:chararray,hour#6:chararray),ngramed2#11:bag{#30:tuple(user#1:bytearray,hour#6:chararray,ngram#9:chararray)})
| |
| ngram:(Name: Project Type: chararray Uid: 9 Input: 0 Column: ngram)
| |
| hour:(Name: Project Type: chararray Uid: 6 Input: 0 Column: hour)
|
|---ngramed2: (Name: LODistinct Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
|
|---ngramed1: (Name: LOForEach Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
| |
| (Name: LOGenerate[false,false,true] Schema: user#1:bytearray,hour#6:chararray,ngram#9:chararray)
| | |
| | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*))
| | |
| | hour:(Name: Project Type: chararray Uid: 6 Input: 1 Column: (*))
| | |
| | (Name: UserFunc(org.apache.pig.tutorial.NGramGenerator) Type: bag Uid: 7)
| | |
| | |---query:(Name: Project Type: chararray Uid: 5 Input: 2 Column: (*))
| |
| |---(Name: LOInnerLoad[user] Schema: user#1:bytearray)
| |
| |---(Name: LOInnerLoad[hour] Schema: hour#6:chararray)
| |
| |---(Name: LOInnerLoad[query] Schema: query#5:chararray)
|
|---houred: (Name: LOForEach Schema: user#1:bytearray,hour#6:chararray,query#5:chararray)
| |
| (Name: LOGenerate[false,false,false] Schema: user#1:bytearray,hour#6:chararray,query#5:chararray)
| | |
| | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*))
| | |
| | (Name: UserFunc(org.apache.pig.tutorial.ExtractHour) Type: chararray Uid: 6)
| | |
| | |---time:(Name: Project Type: bytearray Uid: 2 Input: 1 Column: (*))
| | |
| | query:(Name: Project Type: chararray Uid: 5 Input: 2 Column: (*))
| |
| |---(Name: LOInnerLoad[user] Schema: user#1:bytearray)
| |
| |---(Name: LOInnerLoad[time] Schema: time#2:bytearray)
| |
| |---(Name: LOInnerLoad[query] Schema: query#5:chararray)
|
|---clean2: (Name: LOForEach Schema: user#1:bytearray,time#2:bytearray,query#5:chararray)
| |
| (Name: LOGenerate[false,false,false] Schema: user#1:bytearray,time#2:bytearray,query#5:chararray)
| | |
| | user:(Name: Project Type: bytearray Uid: 1 Input: 0 Column: (*))
| | |
| | time:(Name: Project Type: bytearray Uid: 2 Input: 1 Column: (*))
| | |
| | (Name: UserFunc(org.apache.pig.tutorial.ToLower) Type: chararray Uid: 5)
| | |
| | |---query:(Name: Project Type: bytearray Uid: 3 Input: 2 Column: (*))
| |
| |---(Name: LOInnerLoad[user] Schema: user#1:bytearray)
| |
| |---(Name: LOInnerLoad[time] Schema: time#2:bytearray)
| |
| |---(Name: LOInnerLoad[query] Schema: query#3:bytearray)
|
|---clean1: (Name: LOFilter Schema: user#1:bytearray,time#2:bytearray,query#3:bytearray)
| |
...

Physical plan

|
|---ordered_uniq_frequency: POSort[bag]() - scope-70
| |
| Project[chararray][0] - scope-68
| |
| Project[double][2] - scope-69
|
|---uniq_frequency3: New For Each(false,false,false,false,false)[bag] - scope-67
| |
| Project[chararray][1] - scope-57
| |
| Project[chararray][0] - scope-59
| |
| Project[double][2] - scope-61
| |
| Project[long][3] - scope-63
| |
| Project[double][4] - scope-65
|
|---filtered_uniq_frequency: Filter[bag] - scope-53
| |
| Greater Than[boolean] - scope-56
| |
| |---Project[double][2] - scope-54
| |
| |---Constant(2.0) - scope-55
|
|---uniq_frequency2: New For Each(true,true)[bag] - scope-52
| |
| Project[chararray][0] - scope-47
| |
| POUserFunc(org.apache.pig.tutorial.ScoreGenerator)[bag] - scope-50
| |
| |---Project[bag][1] - scope-49
|
|---uniq_frequency1: Package[tuple]{chararray} - scope-44
|
|---uniq_frequency1: Global Rearrange[tuple] - scope-43
|
|---uniq_frequency1: Local Rearrange[tuple]{chararray}(false) - scope-45
| |
| Project[chararray][0] - scope-46
|
|---hour_frequency2: New For Each(true,false)[bag] - scope-42
| |
| Project[tuple][0] - scope-37
| |
| POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-40
| |
| |---Project[bag][1] - scope-39
|
|---hour_frequency1: Package[tuple]{tuple} - scope-33
|
|---hour_frequency1: Global Rearrange[tuple] - scope-32
|
|---hour_frequency1: Local Rearrange[tuple]{tuple}(false) - scope-34
| |
| Project[chararray][2] - scope-35
| |
| Project[chararray][1] - scope-36
|
|---ngramed2: PODistinct[bag] - scope-31
|
|---ngramed1: New For Each(false,false,true)[bag] - scope-30
| |
| Project[bytearray][0] - scope-23
| |
| Project[chararray][1] - scope-25
| |
| POUserFunc(org.apache.pig.tutorial.NGramGenerator)[bag] - scope-28
| |
| |---Project[chararray][2] - scope-27
|
|---houred: New For Each(false,false,false)[bag] - scope-22
| |
| Project[bytearray][0] - scope-14
| |
| POUserFunc(org.apache.pig.tutorial.ExtractHour)[chararray] - scope-18
| |
| |---Cast[chararray] - scope-17
| |
| |---Project[bytearray][1] - scope-16
| |
| Project[chararray][2] - scope-20
|
|---clean2: New For Each(false,false,false)[bag] - scope-13
| |
| Project[bytearray][0] - scope-5
| |
| Project[bytearray][1] - scope-7
| |
| POUserFunc(org.apache.pig.tutorial.ToLower)[chararray] - scope-11
| |
| |---Cast[chararray] - scope-10
| |
| |---Project[bytearray][2] - scope-9
|
|---clean1: Filter[bag] - scope-1
| |
| POUserFunc(org.apache.pig.tutorial.NonURLDetector)[boolean] - scope-4
| |
| |---Cast[chararray] - scope-3
| |
| |---Project[bytearray][2] - scope-2
|
|---raw: Load(D:/github/flare-spork/tutorial/data/excite-small.log:PigStorage(' ')) - scope-0
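
In the physical plan above, each GROUP appears as a Local Rearrange -> Global Rearrange -> Package triple. As a hedged reading of what each stage contributes, the toy sketch below pairs every tuple with its key, brings equal keys together (sorted, as a MapReduce shuffle would deliver them), and emits one (group, bag) per key. ToyGroupBy and its method names are hypothetical, and it handles a single input only (no COGROUP of several relations).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;

// Toy semantics of Local Rearrange -> Global Rearrange -> Package for a
// single-input GROUP. Tuples are List<String>; all names are hypothetical.
class ToyGroupBy {
    // Local Rearrange: pair every tuple with its grouping key.
    static List<Map.Entry<String, List<String>>> localRearrange(
            List<List<String>> tuples, Function<List<String>, String> keyOf) {
        List<Map.Entry<String, List<String>>> out = new ArrayList<>();
        for (List<String> t : tuples) {
            out.add(new AbstractMap.SimpleEntry<>(keyOf.apply(t), t));
        }
        return out;
    }

    // Global Rearrange: bring all pairs with the same key together (the shuffle).
    static SortedMap<String, List<List<String>>> globalRearrange(
            List<Map.Entry<String, List<String>>> pairs) {
        SortedMap<String, List<List<String>>> grouped = new TreeMap<>();
        for (Map.Entry<String, List<String>> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Package: emit one (group, bag) result per key.
    static List<String> pack(SortedMap<String, List<List<String>>> grouped) {
        List<String> out = new ArrayList<>();
        grouped.forEach((key, bag) -> out.add("(" + key + ", " + bag + ")"));
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> tuples = List.of(
                List.of("u1", "10", "pig"), List.of("u2", "11", "spark"), List.of("u3", "10", "pig"));
        // GROUP ... BY field 2 (the ngram column in this toy data)
        pack(globalRearrange(localRearrange(tuples, t -> t.get(2)))).forEach(System.out::println);
        // (pig, [[u1, 10, pig], [u3, 10, pig]])
        // (spark, [[u2, 11, spark]])
    }
}
```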

MR plan

# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-72
Map Plan
Local Rearrange[tuple]{tuple}(true) - scope-74
| |
| Project[tuple][*] - scope-73
|
|---ngramed1: New For Each(false,false,true)[bag] - scope-30
| |
| Project[bytearray][0] - scope-23
| |
| Project[chararray][1] - scope-25
| |
| POUserFunc(org.apache.pig.tutorial.NGramGenerator)[bag] - scope-28
| |
| |---Project[chararray][2] - scope-27
|
|---houred: New For Each(false,false,false)[bag] - scope-22
| |
| Project[bytearray][0] - scope-14
| |
| POUserFunc(org.apache.pig.tutorial.ExtractHour)[chararray] - scope-18
| |
| |---Cast[chararray] - scope-17
| |
| |---Project[bytearray][1] - scope-16
| |
| Project[chararray][2] - scope-20
|
|---clean2: New For Each(false,false,false)[bag] - scope-13
| |
| Project[bytearray][0] - scope-5
| |
| Project[bytearray][1] - scope-7
| |
| POUserFunc(org.apache.pig.tutorial.ToLower)[chararray] - scope-11
| |
| |---Cast[chararray] - scope-10
| |
| |---Project[bytearray][2] - scope-9
|
|---clean1: Filter[bag] - scope-1
| |
| POUserFunc(org.apache.pig.tutorial.NonURLDetector)[boolean] - scope-4
| |
| |---Cast[chararray] - scope-3
| |
| |---Project[bytearray][2] - scope-2
|
|---raw: Load(D:/github/flare-spork/tutorial/data/excite-small.log:PigStorage(' ')) - scope-0--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp-1456742965:org.apache.pig.impl.io.InterStorage) - scope-78
|
|---New For Each(true)[bag] - scope-77
| |
| Project[tuple][0] - scope-76
|
|---Package[tuple]{tuple} - scope-75--------
Global sort: false
---------------- MapReduce node scope-80
Map Plan
hour_frequency1: Local Rearrange[tuple]{tuple}(false) - scope-34
| |
| Project[chararray][2] - scope-35
| |
| Project[chararray][1] - scope-36
|
|---Load(file:/tmp/temp1620254926/tmp-1456742965:org.apache.pig.impl.io.InterStorage) - scope-79--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp2077335416:org.apache.pig.impl.io.InterStorage) - scope-81
|
|---hour_frequency2: New For Each(true,false)[bag] - scope-42
| |
| Project[tuple][0] - scope-37
| |
| POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-40
| |
| |---Project[bag][1] - scope-39
|
|---hour_frequency1: Package[tuple]{tuple} - scope-33--------
Global sort: false
---------------- MapReduce node scope-83
Map Plan
uniq_frequency1: Local Rearrange[tuple]{chararray}(false) - scope-45
| |
| Project[chararray][0] - scope-46
|
|---Load(file:/tmp/temp1620254926/tmp2077335416:org.apache.pig.impl.io.InterStorage) - scope-82--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.io.InterStorage) - scope-84
|
|---uniq_frequency3: New For Each(false,false,false,false,false)[bag] - scope-67
| |
| Project[chararray][1] - scope-57
| |
| Project[chararray][0] - scope-59
| |
| Project[double][2] - scope-61
| |
| Project[long][3] - scope-63
| |
| Project[double][4] - scope-65
|
|---filtered_uniq_frequency: Filter[bag] - scope-53
| |
| Greater Than[boolean] - scope-56
| |
| |---Project[double][2] - scope-54
| |
| |---Constant(2.0) - scope-55
|
|---uniq_frequency2: New For Each(true,true)[bag] - scope-52
| |
| Project[chararray][0] - scope-47
| |
| POUserFunc(org.apache.pig.tutorial.ScoreGenerator)[bag] - scope-50
| |
| |---Project[bag][1] - scope-49
|
|---uniq_frequency1: Package[tuple]{chararray} - scope-44--------
Global sort: false
---------------- MapReduce node scope-86
Map Plan
ordered_uniq_frequency: Local Rearrange[tuple]{chararray}(false) - scope-91
| |
| Constant(all) - scope-90
|
|---New For Each(false,false)[tuple] - scope-89
| |
| Project[chararray][0] - scope-87
| |
| Project[double][2] - scope-88
|
|---Load(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.impl.io.InterStorage','100')) - scope-85--------
Reduce Plan
Store(file:/tmp/temp1620254926/tmp-586682361:org.apache.pig.impl.io.InterStorage) - scope-101
|
|---New For Each(false)[tuple] - scope-100
| |
| POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - scope-99
| |
| |---Project[tuple][*] - scope-98
|
|---New For Each(false,false)[tuple] - scope-97
| |
| Constant(-1) - scope-96
| |
| ordered_uniq_frequency: POSort[bag]() - scope-70
| | |
| | Project[chararray][0] - scope-94
| | |
| | Project[double][1] - scope-95
| |
| |---Project[bag][1] - scope-93
|
|---Package[tuple]{chararray} - scope-92--------
Global sort: false
---------------- MapReduce node scope-103
Map Plan
ordered_uniq_frequency: Local Rearrange[tuple]{tuple}(false) - scope-104
| |
| Project[chararray][0] - scope-68
| |
| Project[double][2] - scope-69
|
|---Load(file:/tmp/temp1620254926/tmp-26634357:org.apache.pig.impl.io.InterStorage) - scope-102--------
Reduce Plan
ordered_uniq_frequency: Store(file:/tmp/temp1620254926/tmp-225116343:org.apache.pig.impl.io.InterStorage) - scope-71
|
|---New For Each(true)[tuple] - scope-107
| |
| Project[bag][1] - scope-106
|
|---PackageLite[tuple]{tuple} - scope-105--------
Global sort: true
Quantile file: file:/tmp/temp1620254926/tmp-586682361
----------------
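
The last two MR jobs above implement ORDER BY: scope-86 loads a sample through RandomSampleLoader and runs FindQuantiles to write the quantile file, and scope-103 (Global sort: true) sorts using that file. The idea can be sketched as range partitioning: choose cut points from a sorted sample, then send each key to the reducer whose range contains it, so every key on reducer i is no greater than any key on reducer i+1 and the sorted reducer outputs concatenate into a total order. This is a simplified toy (ToyQuantiles is hypothetical), not Pig's FindQuantiles or its partitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy range partitioning behind a sampled global sort: pick (n - 1) cut points
// from a sorted sample, then route each key by binary search. Reducer ranges are
// (-inf, c0), [c0, c1), ..., [c_{n-2}, +inf). Hypothetical, simplified helper.
class ToyQuantiles {
    static double[] cutPoints(List<Double> sample, int numReducers) {
        if (numReducers < 1 || sample.isEmpty()) {
            throw new IllegalArgumentException("need a non-empty sample and >= 1 reducer");
        }
        List<Double> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        double[] cuts = new double[numReducers - 1];
        for (int i = 1; i < numReducers; i++) {
            cuts[i - 1] = sorted.get(i * sorted.size() / numReducers); // evenly spaced ranks
        }
        return cuts;
    }

    static int partition(double key, double[] cuts) {
        int idx = Arrays.binarySearch(cuts, key);
        // Exact hit on cut i goes to reducer i + 1; a miss returns -(insertionPoint) - 1.
        return idx >= 0 ? idx + 1 : -(idx + 1);
    }

    public static void main(String[] args) {
        double[] cuts = cutPoints(List.of(8.0, 3.0, 5.0, 1.0, 7.0, 2.0, 6.0, 4.0), 4);
        System.out.println(Arrays.toString(cuts)); // [3.0, 5.0, 7.0]
        for (double key : new double[] {2.5, 3.0, 6.0, 9.0}) {
            System.out.println(key + " -> reducer " + partition(key, cuts));
        }
    }
}
```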

The end :)

05-24 15:14