Catalyst

Catalyst是与Spark解耦的一个独立库,是一个impl-free的运行计划的生成和优化框架。

眼下与Spark Core还是耦合的。对此user邮件组里有人对此提出疑问,见mail

以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程。

整理对Spark SQL的理解-LMLPHP

Catalyst定位

其它系统假设想基于Spark做一些类sql、标准sql甚至其它查询语言的查询,须要基于Catalyst提供的解析器、运行计划树结构、逻辑运行计划的处理规则体系等类体系来实现运行计划的解析、生成、优化、映射工作。

相应上图中,主要是左側的TreeNodelib及中间三次转化过程中涉及到的类结构都是Catalyst提供的。至于右側物理运行计划映射生成过程。物理运行计划基于成本的优化模型,详细物理算子的运行都由系统自己实现。

Catalyst现状

在解析器方面提供的是一个简单的scala写的sql parser,支持语义有限,并且应该是标准sql的。

在规则方面,提供的优化规则是比較基础的(和Pig/Hive比没有那么丰富)。只是一些优化规则事实上是要涉及到详细物理算子的。所以部分规则须要在系统方那自己制定和实现(如spark-sql里的SparkStrategy)。

Catalyst也有自己的一套数据类型。

以下介绍Catalyst里几套重要的类结构。

TreeNode体系

TreeNode是Catalyst运行计划表示的数据结构,是一个树结构,具备一些scala collection的操作能力和树遍历能力。

这棵树一直在内存里维护,不会dump到磁盘以某种格式的文件存在。且不管在映射逻辑运行计划阶段还是优化逻辑运行计划阶段,树的改动是以替换已有节点的方式进行的。

TreeNode。内部带一个children: Seq[BaseType]表示孩子节点。具备foreach、map、collect等针对节点操作的方法。以及transformDown(默认。前序遍历)、transformUp这种遍历树上节点,对匹配节点实施变化的方法。

提供UnaryNode,BinaryNode, LeafNode三种trait。即非叶子节点同意有一个或两个子节点。

TreeNode提供的是范型。

TreeNode有两个子类继承体系,QueryPlan和Expression。QueryPlan以下是逻辑和物理运行计划两个体系,前者在Catalyst里有详细实现,后者须要在系统自己实现。Expression是表达式体系。后面章节都会展开介绍。

整理对Spark SQL的理解-LMLPHP

Tree的transformation实现:

传入PartialFunction[TreeType,TreeType],假设与操作符匹配,则节点会被结果替换掉,否则节点不会变动。整个过程是对children递归运行的。

运行计划表示模型

逻辑运行计划

QueryPlan继承自TreeNode,内部带一个output: Seq[Attribute],具备transformExpressionDown、transformExpressionUp方法。

在Catalyst中,QueryPlan的主要子类体系是LogicalPlan,即逻辑运行计划表示。其物理运行计划表示由使用方实现(spark-sql项目中)。

LogicalPlan继承自QueryPlan,内部带一个reference:Set[Attribute],主要方法为resolve(name:String): Option[NamedeExpression],用于分析生成相应的NamedExpression。

LogicalPlan有很多详细子类,也分为UnaryNode, BinaryNode, LeafNode三类。详细在org.apache.spark.sql.catalyst.plans.logical路径下。

整理对Spark SQL的理解-LMLPHP

逻辑运行计划实现

LeafNode主要子类是Command体系:

整理对Spark SQL的理解-LMLPHP

各command的语义能够从子类名字看出,代表的是系统能够运行的non-query命令。如DDL。

UnaryNode的子类:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcGVsaWNr/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />

BinaryNode的子类:

整理对Spark SQL的理解-LMLPHP

物理运行计划

还有一方面。物理运行计划节点在详细系统里实现,比方spark-sqlproject里的SparkPlan继承体系。

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcGVsaWNr/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />

物理运行计划实现

每一个子类都要实现execute()方法,大致有以下实现子类(不全)。

LeadNode的子类:

整理对Spark SQL的理解-LMLPHP

UnaryNode的子类:

整理对Spark SQL的理解-LMLPHP

BinaryNode的子类:

整理对Spark SQL的理解-LMLPHP

提到物理运行计划,还要提一下Catalyst提供的分区表示模型

运行计划映射

Catalyst还提供了一个QueryPlanner[Physical <: TreeNode[PhysicalPlan]]抽象类。须要子类制定一批strategies: Seq[Strategy],其apply方法也是相似依据制定的详细策略来把逻辑运行计划算子映射成物理运行计划算子。由于物理运行计划的节点是在详细系统里实现的,所以QueryPlanner及里面的strategies也须要在详细系统里实现。

整理对Spark SQL的理解-LMLPHP

在spark-sql项目中,SparkStrategies继承了QueryPlanner[SparkPlan],内部制定了LeftSemiJoin, HashJoin,PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct等几种策略,每种策略接受的都是一个LogicalPlan,生成的是Seq[SparkPlan],每一个SparkPlan理解为详细RDD的算子操作。

比方在BasicOperators这个Strategy里。以match-case匹配的方式处理了非常多基本算子(能够一对一直接映射成RDD算子),例如以下:

case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
case logical.Aggregate(group, agg, child) =>
execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil
case logical.Sample(fraction, withReplacement, seed, child) =>
execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil

Expression体系

Expression。即表达式,指不须要运行引擎计算,而能够直接计算或处理的节点,包含Cast操作,Projection操作,四则运算,逻辑操作符运算等。

详细能够參考org.apache.spark.sql.expressionspackage下的类。

Rules体系

凡是须要处理运行计划树(Analyze过程。Optimize过程。SparkStrategy过程),实施规则匹配和节点处理的,都须要继承RuleExecutor[TreeType]抽象类。

RuleExecutor内部提供了一个Seq[Batch],里面定义的是该RuleExecutor的处理步骤。每一个Batch代表着一套规则,配备一个策略,该策略说明了迭代次数(一次还是多次)。

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

Rule[TreeType <: TreeNode[_]]是一个抽象类,子类须要复写apply(plan: TreeType)方法来制定处理逻辑。

RuleExecutor的apply(plan: TreeType): TreeType方法会依照batches顺序和batch内的Rules顺序,对传入的plan里的节点迭代处理,处理逻辑为由详细Rule子类实现。

Hive相关

Hive支持方式

Spark SQL对hive的支持是单独的spark-hive项目,对Hive的支持包含HQL查询、hive metaStore信息、hive SerDes、hive UDFs/UDAFs/ UDTFs,相似Shark。

仅仅有在HiveContext下通过hive api获得的数据集,才干够使用hql进行查询。其hql的解析依赖的是org.apache.hadoop.hive.ql.parse.ParseDriver类的parse方法,生成Hive AST。

实际上sql和hql。并非一起支持的。能够理解为hql是独立支持的,能被hql查询的数据集必须读取自hive api。下图中的parquet、json等其它文件支持仅仅发生在sql环境下(SQLContext)。

整理对Spark SQL的理解-LMLPHP

Hive on Spark

Hive官方提出了Hive onSpark的JIRA

Shark结束之后,拆分为两个方向:

整理对Spark SQL的理解-LMLPHP

Spark SQL里如今对Hive的支持,体如今复用了Hive的meta store数据、hql解析、UDFs、SerDes。在运行DDL和某些简单命令的时候。调的是hive客户端。hql翻译前会处理一些与query主体无关的set, cache, addfile等命令,然后调用ParserDriver翻译hql,并把AST转换成Catalyst的LogicalPlan。兴许优化、物理运行计划翻译及运行过程,与Sql一样使用的是Catalyst提供的内容,运行引擎是Spark。在整个结合过程中,ASTNode映射成LogicalPlan是重点。

整理对Spark SQL的理解-LMLPHP

而Hive社区的Hive on Spark会如何实现。详细參考jira里的设计文档

与Shark对照,

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcGVsaWNr/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />

Shark多依赖了Hive的运行计划相关模块以及CLI。

CLI和JDBC部分是Spark SQL兴许打算支持的。

Shark额外提供的对Table数据行转列、序列化、压缩存内存的模块,也被拿到了Spark Sql的sqlproject里。

以上说明了Shark与Spark SQL Hive的差别,对Shark这个项目继承性的理解。

而Spark SQL Hive与Hive社区 Hive on Spark的差别须要详细參考jira里的设计文档,我也还没有读过。

spark-hiveproject

整理对Spark SQL的理解-LMLPHP

解析过程

HiveQl.parseSql()把hql解析成logicalPlan。

解析过程,提取出一些command。包含:

²  set key=value

²  cache table

²  uncache table

²  add jar

²  add file

²  dfs

²  source

然后由Hive的ParseDriver把hql解析成AST,得到ASTNode。

def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))

把Node转化为Catalyst的LogicalPlan。转化逻辑较复杂,也是Sparksql对hql支持的最关键部分。详见HiveQl.nodeToPlan(node: Node):LogicalPlan方法。

大致转换逻辑包含:

处理TOK_EXPLAIN和TOK_DESCTABLE

处理TOK_CREATETABLE,包含创建表时候一系列表的设置TOK_XXX

处理TOK_QUERY。包含TOK_SELECT,TOK_WHERE,TOK_GROUPBY,TOK_HAVING。TOK_SORTEDBY。TOK_LIMIT等等。对FROM后面跟的语句进行nodeToRelation处理。

处理TOK_UNION

对Hive AST树结构和表示不熟悉。所以此处略过。

Analyze过程

metadata交互

Catalog类为HiveMetastoreCatalog,通过hive的conf生成client(org.apache.hadoop.hive.ql.metadata.Hive,用于与MetaStore通信,获得metadata以及进行DDL操作)。catalog的lookupRelation方法里面,client.getTable()得到表信息,client.getAddPartitionsForPruner()得到分区信息。

udf相关

FunctionRegistry类为HiveFunctionRegistry,依据方法名,通过hive的相关类去查询该方法,检查是否具有该方法,是UDF,还是UDAF(aggregation)。或是UDTF(table)。

这里仅仅做已有udf的查询。不做新方法的include。

与Catalyst的Expression相应继承关系例如以下:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcGVsaWNr/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />

Inspector相关

HiveInspectors提供了几个映射数据类型和ObjectInspetor子类的方法,包含PrimitiveObjectInspector,ListObjectInspector,MapObjectInspector,StructObjectInspector四种

Optimizer过程

在做优化前,会尝试对之前生成的逻辑运行计划进行createtabl操作,由于运行的hql可能是“CREATE TABLE XXX”,这部分处理在HiveMetastoreCatalog的CreateTables单例里。继承了Rule[LogicalPlan]。

以及PreInsertionCasts处理,也是HiveMetastoreCatalog里的单例,继承了Rule[LogicalPlan]。

之后的optimizer过程同SQLContext里,用的是同一个Catalyst提供的Optimizer类。

Planner及运行过程

HiveContext继承自SQLContext,其QueryExecution也继承自SQLContext的QueryExecution。兴许运行计划优化、物理运行计划翻译、处理及运行过程同SQL的处理逻辑是一致的。

翻译物理运行计划的时候。hive planner里制定了些特定的策略,与SparkPlanner稍有不同。

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcGVsaWNr/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />

多了Scripts,DataSinks,HiveTableScans和HiveCommandStrategy四种处理物理运行计划的策略(见HiveStrategies)。

1.  Scripts,用于处理那种hive命令行运行脚本的情况。实现方式是使用ProcessBuilder新起一个JVM进程的方式,用”/bin/bash –c scripts”的方式运行脚本并获取输出流数据,转化为Catalyst Row数据格式。

2.  DataSinks,用于把数据写入到Hive表的情况。

里面涉及到一些hive读写的数据格式转化、序列化、读配置等工作。最后通过SparkContext的runJob接口,提交作业。

3.  HiveTableScans。用于对hive table进行扫描,支持使用谓词的分区裁剪(Partition pruning predicates are detected and applied)。

4.  HiveCommandStrategy,用于运行native command和describe command。我理解是这种命令是直接调hive客户端单机运行的,由于可能仅仅与meta data打交道。

toRDD: RDD[Row]处理也有少许差别,返回RDD[Row]的时候,对每一个元素做了一次拷贝。

SQL Core

Spark SQL的核心是把已有的RDD,带上Schema信息,然后注冊成相似sql里的”Table”。对其进行sql查询。这里面主要分两部分,一是生成SchemaRD。二是运行查询。

生成SchemaRDD

假设是spark-hive项目,那么读取metadata信息作为Schema、读取hdfs上数据的过程交给Hive完毕,然后依据这俩部分生成SchemaRDD,在HiveContext下进行hql()查询。

对于Spark SQL来说。

数据方面,RDD能够来自不论什么已有的RDD。也能够来自支持的第三方格式。如json file、parquet file。

SQLContext下会把带case class的RDD隐式转化为SchemaRDD

implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
new SchemaRDD(this,
SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))

ExsitingRdd单例里会反射出case class的attributes,并把RDD的数据转化成Catalyst的GenericRow,最后返回RDD[Row]。即一个SchemaRDD。这里的详细转化逻辑能够參考ExsitingRdd的productToRowRdd和convertToCatalyst方法。

之后能够进行SchemaRDD提供的注冊table操作、针对Schema复写的部分RDD转化操作、DSL操作、saveAs操作等等。

Row和GenericRow是Catalyst里的行表示模型

Row用Seq[Any]来表示values,GenericRow是Row的子类,用数组表示values。Row支持数据类型包含Int, Long, Double, Float, Boolean, Short, Byte, String。支持按序数(ordinal)读取某一个列的值。读取前须要做isNullAt(i: Int)的推断。

各自都有Mutable类,提供setXXX(i: int, value: Any)改动某序数上的值。

层次结构

整理对Spark SQL的理解-LMLPHP

下图大致对照了Pig,Spark SQL。Shark在实现层次上的差别,仅做參考。

整理对Spark SQL的理解-LMLPHP

整理对Spark SQL的理解-LMLPHP

查询流程

SQLContext里对sql的一个解析和运行流程:

1.  第一步parseSql(sql: String),simple sql parser做词法语法解析。生成LogicalPlan。

2.  第二步analyzer(logicalPlan),把做完词法语法解析的运行计划进行初步分析和映射,

眼下SQLContext内的Analyzer由Catalyst提供,定义例如以下:

new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)

catalog为SimpleCatalog,catalog是用来注冊table和查询relation的。

而这里的FunctionRegistry不支持lookupFunction方法。所以该analyzer不支持Function注冊,即UDF。

Analyzer内定义了几批规则:

  val batches: Seq[Batch] = Seq(
Batch("MultiInstanceRelations", Once,
NewRelationInstances),
Batch("CaseInsensitiveAttributeReferences", Once,
(if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
typeCoercionRules :_*),
Batch("Check Analysis", Once,
CheckResolution),
Batch("AnalysisOperators", fixedPoint,
EliminateAnalysisOperators)
)

3.  从第二步得到的是初步的logicalPlan。接下来第三步是optimizer(plan)。

Optimizer里面也是定义了几批规则。会按序对运行计划进行优化操作。

  val batches =
Batch("Combine Limits", FixedPoint(100),
CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100),
NullPropagation,
ConstantFolding,
LikeSimplification,
BooleanSimplification,
SimplifyFilters,
SimplifyCasts,
SimplifyCaseConversionExpressions) ::
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughJoin,
ColumnPruning) :: Nil

4.  优化后的运行计划,还要丢给SparkPlanner处理,里面定义了一些策略,目的是依据逻辑运行计划树生成最后能够运行的物理运行计划树。即得到SparkPlan。

    val strategies: Seq[Strategy] =
CommandStrategy(self) ::
TakeOrdered ::
PartialAggregation ::
LeftSemiJoin ::
HashJoin ::
InMemoryScans ::
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil

5.  在终于真正运行物理运行计划前,最后还要进行两次规则,SQLContext里定义这个过程叫prepareForExecution,这个步骤是额外添加的,直接new RuleExecutor[SparkPlan]进行的。

    val batches =
Batch("Add exchange", Once, AddExchange(self)) ::
Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil

6.  最后调用SparkPlan的execute()运行计算。

这个execute()在每种SparkPlan的实现里定义。一般都会递归调用children的execute()方法,所以会触发整棵Tree的计算。

其它特性

内存列存储

SQLContext下cache/uncache table的时候会调用列存储模块。

该模块借鉴自Shark,目的是当把表数据cache在内存的时候做行转列操作。以便压缩。

实现类

InMemoryColumnarTableScan类是SparkPlan LeafNode的实现。即是一个物理运行计划。

传入一个SparkPlan(确认了的物理运行计)和一个属性序列,内部包含一个行转列、触发计算并cache的过程(且是lazy的)。

ColumnBuilder针对不同的数据类型(boolean, byte, double, float, int, long, short, string)由不同的子类把数据写到ByteBuffer里,即包装Row的每一个field。生成Columns。与其相应的ColumnAccessor是訪问column,将其转回Row。

CompressibleColumnBuilder和CompressibleColumnAccessor是带压缩的行列转换builder,其ByteBuffer内部存储结构例如以下

 *    .--------------------------- Column type ID (4 bytes)
* | .----------------------- Null count N (4 bytes)
* | | .------------------- Null positions (4 x N bytes, empty if null count is zero)
* | | | .------------- Compression scheme ID (4 bytes)
* | | | | .--------- Compressed non-null elements
* V V V V V
* +---+---+-----+---+---------+
* | | | ... | | ... ... |
* +---+---+-----+---+---------+
* \-----------/ \-----------/
* header body

CompressionScheme子类是不同的压缩实现

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcGVsaWNr/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" />

都是scala实现的。未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比較每种压缩。选择压缩率最小的(若仍大于0.8就不压缩了)。

这里的估算逻辑,来自子类实现的gatherCompressibilityStats方法。

Cache逻辑

cache之前,须要先把本次cache的table的物理运行计划生成出来。

在cache这个过程里,InMemoryColumnarTableScan并没有触发运行。可是生成了以InMemoryColumnarTableScan为物理运行计划的SparkLogicalPlan,并存成table的plan。

事实上在cache的时候,首先去catalog里寻找这个table的信息和table的运行计划,然后会进行运行(运行到物理运行计划生成),然后把这个table再放回catalog里维护起来。这个时候的运行计划已经是终于要运行的物理运行计划了。可是此时Columner模块相关的转换等操作都是没有触发的。

真正的触发还是在execute()的时候。同其它SparkPlan的execute()方法触发场景是一样的。

Uncache逻辑

UncacheTable的时候,除了删除catalog里的table信息之外。还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法。得到RDD集合,并进行了unpersist()操作。

cacheColumnBuffers主要做了把RDD每一个partition里的ROW的每一个Field存到了ColumnBuilder内。

UDF(暂不支持)

如前面对SQLContext里Analyzer的分析,其FunctionRegistry没有实现lookupFunction。

在spark-hive项目里,HiveContext里是实现了FunctionRegistry这个trait的,事实上现为HiveFunctionRegistry,实现逻辑见org.apache.spark.sql.hive.hiveUdfs

Parquet支持

JSON支持

SQLContext下,添加了jsonFile的读取方法。并且眼下看,代码里实现的是hadoop textfile的读取,也就是这份json文件应该是在HDFS上的。详细这份json文件的加载,InputFormat是TextInputFormat,key class是LongWritable。value class是Text,最后得到的是value部分的那段String内容,即RDD[String]。

除了jsonFile,还支持jsonRDD,样例:

http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets

读取json文件之后,转换成SchemaRDD。JsonRDD.inferSchema(RDD[String])里有详细的解析json和映射出schema的过程,最后得到该json的LogicalPlan。

Json的解析使用的是FasterXML/jackson-databind库,GitHub地址wiki

把数据映射成Map[String, Any]

Json的支持丰富了Spark SQL数据接入场景。

JDBC支持

Jdbc support branchis under going

SQL92

Spark SQL眼下的SQL语法支持情况见SqlParser类。目标是支持SQL92??

1. 基本应用上,sql server 和oracle都遵循sql 92语法标准

2. 实际应用中大家都会超出以上标准。使用各家数据库厂商都提供的丰富的自己定义标准函数库和语法。

3. 微软sql server的sql 扩展叫T-SQL(Transcate SQL).

4. Oracle 的sql 扩展叫PL-SQL.

存在问题

大家能够跟进社区邮件列表,兴许待整理。

http://apache-spark-developers-list.1001551.n3.nabble.com/sparkSQL-thread-safe-td7263.html

http://apache-spark-user-list.1001560.n3.nabble.com/Supported-SQL-syntax-in-Spark-SQL-td9538.html

总结

以上整理了对Spark SQL各个模块的实现情况。代码结构。运行流程以及自己对Spark SQL的理解。
理解有偏差的地方欢迎交流讨论 :)
全文完 :)
05-11 22:20