问题

我有一条管道链:

class PipelineA(base_handler.PipelineBase):
  def run(self, *args):
    # do something

class PipelineB(base_handler.PipelineBase):
  def run(self, *args):
    # do something


class EntryPipeline(base_handler.PipelineBase):
  def run(self):

    if some_condition():
      self.abort("Condition failed. Pipeline aborted!")

    yield PipelineA()

    mr_output = yield mapreduce_pipeline.MapreducePipeline(
      # mapreduce configs here
      # ...
    )

    yield PipelineB(mr_output)

p = EntryPipeline()
p.start()


EntryPipeline中,我正在测试一些条件,然后再启动PipelineAMapreducePipelinePipelineB。如果条件失败,我想中止EntryPipeline和所有后续管道。

问题


什么是优美的管道中止? self.abort()是正确的方法还是我需要sys.exit()
如果要在PipelineA中进行流产怎么办?例如PipelineA成功启动,但是阻止随后的管道(MapreducePipelinePipelineB)启动。




编辑:

我最终将条件语句移到了EntryPipeline之外,因此仅当条件为true时才开始整个操作。否则,我认为尼克的答案是正确的。

最佳答案

由于文档当前说“ TODO:谈论显式中止和重试”

我们将必须阅读源代码:

https://github.com/GoogleCloudPlatform/appengine-pipelines/blob/master/python/src/pipeline/pipeline.py#L703

  def abort(self, abort_message=''):
    """Mark the entire pipeline up to the root as aborted.
    Note this should only be called from *outside* the context of a running
    pipeline. Synchronous and generator pipelines should raise the 'Abort'
    exception to cause this behavior during execution.

    Args:
      abort_message: Optional message explaining why the abort happened.

    Returns:
      True if the abort signal was sent successfully; False if the pipeline
      could not be aborted for any reason.
    """


因此,如果您拥有不属于自己的some_pipeline的句柄,则可以调用some_pipeline.abort()...,但是如果您想中止自己,则需要提高Abort()...,这会冒泡到顶部杀死整棵树

关于python - 如何优雅地中止App Engine管道?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29979036/

10-12 17:42