Oozie是Hadoop平台上的任务调度系统,可以将不同类型的作业串联起来。Oozie的核心概念是workflow(工作流):一个工作流由若干action(动作节点)组成,每种类型的作业对应一种action。Oozie已经集成了hive、spark、fs(HDFS操作)、distcp等action。有时我们需要扩展自定义的action,加入业务需要的逻辑,这里介绍扩展action的一般步骤。

       这里以扩展livy action为例,其主要功能是通过Livy提交Spark批处理任务(batch),用Spark SQL执行用户的SQL语句。

       首先需要继承ActionExecutor类,并重写其start、check、end、kill、isCompleted方法,在这些方法中完成与Livy server交互的逻辑:

/**
 * Oozie action executor for the custom `livy` action (see livy-action-0.1.xsd).
 *
 * start() submits a Spark batch through LivyService and stores the Livy node and batch session
 * id in the action's tracker URI (`http://<livyNode>/batches/<sessionId>`); check() polls the
 * batch state and reports terminal states back to Oozie.
 *
 * Per-action bookkeeping is kept in maps on the companion object:
 *  - `oozieIdToSessionTuple`: Oozie action id -> (livyNode, sessionId)
 *  - `sessionTupleToLogIndex`: session -> offset of the next Livy log line to fetch
 *  - `sessionTupleIdUpdate`: session -> whether the Spark app id / UI URL was already written
 *    to the action
 */
class LivyActionExecutor extends ActionExecutor("livy") {

	// Explicit `.asScala` for the JDOM child lists instead of implicit collection conversions.
	import scala.collection.JavaConverters._

	private val logger = LoggerFactory.getLogger(classOf[LivyActionExecutor])

	// Oozie XLog logger, used with XLog.STD for messages emitted while handling an action.
	private val LOG = XLog.getLog(getClass)

	// Element names of the livy action XML. The Spark option names below are also used as
	// keys of the options map passed to LivyService.submitTask.
	private val SCRIPT = "script"

	private val PROXY_USER = "proxyUser"

	private val PARAM = "param"

	private val QUEUE = "queue"

	private val CONFIGURATION = "configuration"

	private val DRIVER_MEMORY = "driverMemory"

	private val EXECUTOR_MEMORY = "executorMemory"

	private val EXECUTOR_NUMBERS = "numExecutors"

	/**
	 * Submits a Livy batch built from the user's action configuration, records the session for
	 * later checks, and performs a first status check.
	 *
	 * @throws RuntimeException if LivyService.submitTask returns null
	 */
	override def start(context: ActionExecutor.Context, action: WorkflowAction): Unit = {
		LogUtils.setLogInfo(action)
		LOG.warn(XLog.STD, "oozie workflow start")

		val actionXml = XmlUtils.parseXml(action.getConf)
		val ns = actionXml.getNamespace
		val scriptFile = actionXml.getChildText(SCRIPT, ns)
		val proxyUser = actionXml.getChildText(PROXY_USER, ns)
		val paramsList = parseParams(actionXml)
		val options = parseSubmitOptions(actionXml)

		val livyService = Services.get.get(classOf[LivyService])
		val sessionTuple = livyService.submitTask(scriptFile, proxyUser, options, paramsList)
		if (sessionTuple == null) {
			throw new RuntimeException("submit livy task failed")
		}
		LOG.warn(XLog.STD, s"batch session created : ${sessionTuple._2}, proxyUser: $proxyUser, scriptFile: $scriptFile, wfId: ${action.getId}")
		LivyActionExecutor.oozieIdToSessionTuple += (action.getId -> sessionTuple)
		LivyActionExecutor.sessionTupleToLogIndex += (sessionTuple -> 0)
		LivyActionExecutor.sessionTupleIdUpdate += (sessionTuple -> false)
		// check() parses this URI back into (livyNode, sessionId) when the in-memory maps do not
		// know the action.
		val trackerUri = s"http://${sessionTuple._1}/batches/${sessionTuple._2}"
		context.setStartData(s"livy-batch-${sessionTuple._2}", trackerUri, "-")
		// Presumably gives Livy time to launch the batch before the first state query.
		Thread.sleep(5000)
		check(context, action)
	}

	/** Returns the trimmed text of every `param` element, in document order. */
	private def parseParams(actionXml: Element): List[String] = {
		val ns = actionXml.getNamespace
		actionXml.getChildren(PARAM, ns).asInstanceOf[java.util.List[Element]].asScala
			.map(_.getTextTrim)
			.toList
	}

	/**
	 * Builds the options handed to LivyService.submitTask: every Spark setting (driverMemory,
	 * executorMemory, numExecutors, queue) that is present and non-empty, plus a nested `conf`
	 * map from `configuration/property` when there is at least one property.
	 */
	private def parseSubmitOptions(actionXml: Element): Map[String, AnyRef] = {
		val ns = actionXml.getNamespace
		val sparkOptions: Map[String, AnyRef] = Seq(DRIVER_MEMORY, EXECUTOR_MEMORY, EXECUTOR_NUMBERS, QUEUE)
			.map(key => key -> actionXml.getChildText(key, ns))
			.filter { case (_, value) => StringUtils.isNotEmpty(value) }
			.toMap
		val sparkConf = parseConfiguration(actionXml)
		if (sparkConf.nonEmpty) sparkOptions + ("conf" -> sparkConf) else sparkOptions
	}

	/**
	 * Collects `configuration/property` name/value pairs (a later duplicate name wins).
	 * Returns an empty map when the optional `configuration` element is absent.
	 */
	private def parseConfiguration(actionXml: Element): Map[String, String] = {
		val ns = actionXml.getNamespace
		Option(actionXml.getChild(CONFIGURATION, ns)).fold(Map.empty[String, String]) { configuration =>
			configuration.getChildren("property", ns).asInstanceOf[java.util.List[Element]].asScala
				.map(property => property.getChildText("name", ns) -> property.getChildText("value", ns))
				.toMap
		}
	}

	/**
	 * Ends the action with status OK when its external status is "OK", otherwise with ERROR
	 * (a missing external status also yields ERROR). Then forgets the tracked Livy session.
	 */
	override def end(context: ActionExecutor.Context, action: WorkflowAction): Unit = {
		// `==` is null-safe, unlike calling equals on the possibly-null external status.
		val status =
			if (action.getExternalStatus == "OK") WorkflowAction.Status.OK
			else WorkflowAction.Status.ERROR
		context.setEndData(status, getActionSignal(status))
		releaseSession(action.getId)
	}

	/**
	 * Polls the Livy batch tracked for this action.
	 *
	 *  - If the action is not tracked in memory, its session is rebuilt from the stored tracker URI.
	 *  - Once Livy reports both a Spark application id and UI URL, they are saved on the action as
	 *    its external id / console URL (only once per session).
	 *  - "success" -> OK, "killed" -> KILLED, "error"/"dead"/null state -> FAILED; while the batch is
	 *    "starting" or has failed, new Livy log lines are copied to the Oozie log.
	 *
	 * Exceptions are logged and not rethrown.
	 */
	override def check(context: ActionExecutor.Context, wfAction: WorkflowAction): Unit = {
		try {
			LogUtils.setLogInfo(wfAction)
			val wfId = wfAction.getId
			val action = WorkflowActionQueryExecutor.getInstance.get(WorkflowActionQuery.GET_ACTION, wfId)

			if (!LivyActionExecutor.oozieIdToSessionTuple.contains(wfId)) {
				// Rebuild the session from the tracker URI written by start():
				// http://<livyNode>/batches/<sessionId>
				val trackUri = action.getTrackerUri
				if (StringUtils.isNotEmpty(trackUri)) {
					val livyNode = StringUtils.substringBetween(trackUri, "http://", "/batches/")
					val sessionId = StringUtils.substringAfter(trackUri, "/batches/")
					val sessionTuple = (livyNode, sessionId.toInt)
					LivyActionExecutor.oozieIdToSessionTuple += (wfId -> sessionTuple)
					// NOTE(review): log fetching resumes at offset 200, presumably to skip lines
					// already copied before the session was lost — confirm.
					LivyActionExecutor.sessionTupleToLogIndex += (sessionTuple -> 200)
					LivyActionExecutor.sessionTupleIdUpdate += (sessionTuple -> false)
					logger.info(s"recovery wfId: $wfId, livyNode: $livyNode, sessionId: $sessionId")
				}
			}
			LivyActionExecutor.oozieIdToSessionTuple.get(wfId).foreach { sessionTuple =>
				val livyService = Services.get.get(classOf[LivyService])
				val state = livyService.getTaskState(sessionTuple._1, sessionTuple._2)
				logger.info(s"check livy batch session ${sessionTuple._2} state: $state")

				if (!LivyActionExecutor.sessionTupleIdUpdate.getOrElse(sessionTuple, false)) {
					val tuple3 = livyService.getSessionState(sessionTuple._1, sessionTuple._2)
					if (tuple3 != null) {
						val appId = tuple3._2
						val sparkUiUrl = tuple3._3
						if (StringUtils.isNotEmpty(appId) && StringUtils.isNotEmpty(sparkUiUrl)) {
							// Expose the Spark application as the action's external (child) job id and URL.
							action.setExternalId(appId)
							action.setConsoleUrl(sparkUiUrl)
							WorkflowActionQueryExecutor.getInstance.executeUpdate(WorkflowActionQuery.UPDATE_ACTION, action)
							logger.info(s"WorkflowAction externalId: $appId, consoleUrl: $sparkUiUrl")
							LivyActionExecutor.sessionTupleIdUpdate += (sessionTuple -> true)
						}
					}
				}
				if (state == null) {
					context.setExecutionData("FAILED", null)
				} else {
					state match {
						case "success" => context.setExecutionData("OK", null)
						case "killed" => context.setExecutionData("KILLED", null)
						case "running" => logger.info(s"batch session ${sessionTuple._2} is in state $state")
						case "starting" | "error" | "dead" =>
							val logIndexStart = LivyActionExecutor.sessionTupleToLogIndex.getOrElse(sessionTuple, 0)
							val logs = livyService.getSessionLog(sessionTuple._1, sessionTuple._2, logIndexStart, 200)
							if (logs != null) {
								for (log <- logs) {
									LOG.warn(XLog.STD, log)
								}
								// Advance past the lines just copied so the next poll does not repeat them.
								LivyActionExecutor.sessionTupleToLogIndex += (sessionTuple -> (logIndexStart + logs.size))
							}
							if (StringUtils.equals(state, "error") || StringUtils.equals(state, "dead")) {
								context.setExecutionData("FAILED", null)
							}
						case _ =>
					}
				}
			}
		} catch {
			case e: Exception => logger.error(s"check action error: ", e)
		}
	}

	/**
	 * Kills the Livy batch tracked for this action (if any), sets the external status to KILLED,
	 * and forgets the tracked session.
	 */
	override def kill(context: ActionExecutor.Context, action: WorkflowAction): Unit = {
		val wfId = action.getId
		LivyActionExecutor.oozieIdToSessionTuple.get(wfId).foreach { sessionTuple =>
			val livyService = Services.get.get(classOf[LivyService])
			livyService.killTask(sessionTuple._1, sessionTuple._2)
			logger.info(s"kill livy batch session ${sessionTuple._2}")
			context.setExternalStatus("KILLED")
		}
		releaseSession(wfId)
	}

	/** Drops all in-memory bookkeeping for the given Oozie action id; a no-op if it is not tracked. */
	private def releaseSession(wfId: String): Unit = {
		LivyActionExecutor.oozieIdToSessionTuple.get(wfId).foreach { sessionTuple =>
			LivyActionExecutor.sessionTupleToLogIndex -= sessionTuple
			LivyActionExecutor.sessionTupleIdUpdate -= sessionTuple
		}
		LivyActionExecutor.oozieIdToSessionTuple -= wfId
	}

	/** Always reports the action as completed, whatever the external status string is. */
	override def isCompleted(s: String): Boolean = {
		true
	}
}

    在resources目录中新建livy-action-0.1.xsd文件,定义livy action的XML结构(命名空间为uri:oozie:livy-action:0.1,workflow.xml中的livy节点需使用该命名空间):

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:livy="uri:oozie:livy-action:0.1" elementFormDefault="qualified"
           targetNamespace="uri:oozie:livy-action:0.1">
    <!-- Root element of the custom Oozie "livy" action. -->
    <xs:element name="livy" type="livy:ACTION"/>
    <!--
      Children must appear in the order declared here (xs:sequence).
      queue / driverMemory / executorMemory / numExecutors are optional Spark settings;
      proxyUser and script are required.
    -->
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="queue" type="xs:string" minOccurs="0" maxOccurs="1" />
            <xs:element name="driverMemory" type="xs:string" minOccurs="0" maxOccurs="1" />
            <xs:element name="executorMemory" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="numExecutors" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="proxyUser" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <!-- Script arguments, in document order; no fixed upper bound on their number. -->
            <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <!-- Optional name/value pairs for the Spark configuration. -->
            <xs:element name="configuration" type="livy:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Hadoop-style list of <property><name/><value/><description/></property> entries. -->
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
</xs:schema>

   打包生成jar文件,将其拷贝到oozie安装目录下的lib目录中。

   修改conf目录下的oozie-site.xml文件:

   在oozie.service.SchemaService.wf.ext.schemas中添加livy-action-0.1.xsd

   在oozie.service.ActionService.executor.ext.classes中添加自定义的action类(填写全限定类名,即LivyActionExecutor的包名加类名)

   在oozie.services.ext中添加扩展的service类,这里是LivyService。代码中通过Services.get.get(classOf[LivyService])获取该服务,访问Livy REST接口的逻辑(提交任务、查询状态、获取日志、kill任务)都可以封装在这个类中。

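   重启oozie后,自定义的action即被加载,可以在workflow.xml中使用。例如在action节点中写入:<livy xmlns="uri:oozie:livy-action:0.1"><proxyUser>hadoop</proxyUser><script>hdfs:///path/to/script</script></livy>。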

10-05 13:07