本文介绍了如何扇出AWS kinesis流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想将输入的AWS Kinesis流成扇形散布/链接/复制到N个新的Kinesis流,以使写入输入Kinesis的每条记录都将出现在N条流中的每条中./p>

是否有 AWS服务或开源解决方案?

如果有现成的解决方案,我不希望编写代码来做到这一点. AWS Kinesis Firehose 是没有解决方案,因为它无法输出到运动学.如果运行起来不会太昂贵,也许是AWS Lambda解决方案?

解决方案

有两种方法可以散布Amazon Kinesis流:

  • 使用 Amazon Kinesis Analytics 将记录复制到其他流中
  • 触发一个 AWS Lambda 函数以将记录复制到另一个流中

选项1:使用Amazon Kinesis Analytics进行扇出

您可以使用 Amazon Kinesis Analytics 生成来自现有流的新流.

来自 Amazon Kinesis Analytics文档 :

应用代码" 部分:

我设法实现了以下目的:

  • 创建了三个流:input,output1,output2
  • 创建了两个Amazon Kinesis Analytics应用程序:copy1,copy2

Amazon Kinesis Analytics SQL应用程序如下所示:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

此代码创建一个(将其视为连续的select语句),该泵从input流中进行选择并输出到output1流.我创建了另一个相同的应用程序,输出到output2流.

为了测试,我将数据发送到了input流:

#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data={}
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

我让它运行一段时间,然后使用以下代码显示输出:

from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]

输出为:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

我再次为output2运行它,并显示了相同的输出.

选项2:使用AWS Lambda

如果您要扇出许多流,则更有效的方法可能是创建一个AWS Lambda函数:

  • 由Amazon Kinesis流记录触发
  • 将记录写入到多个Amazon Kinesis输出"流

您甚至可以让Lambda函数根据命名约定自动发现输出流(例如,任何名为app-output-*的流).

I'd like to fanout/chain/replicate an Input AWS Kinesis stream To N new Kinesis streams, So that each record written to the input Kinesis will appear in each of the N streams.

Is there an AWS service or an open source solution?

I prefer not to write code to do that if there's a ready-made solution. AWS Kinesis firehose is a no solution because it can't output to kinesis. Perhaps a AWS Lambda solution if that won't be too expensive to run?

解决方案

There are two ways you could accomplish fan-out of an Amazon Kinesis stream:

  • Use Amazon Kinesis Analytics to copy records to additional streams
  • Trigger an AWS Lambda function to copy records to another stream

Option 1: Using Amazon Kinesis Analytics to fan-out

You can use Amazon Kinesis Analytics to generate a new stream from an existing stream.

From the Amazon Kinesis Analytics documentation:

Fan-out is mentioned in the Application Code section:

I managed to implement this as follows:

  • Created three streams: input, output1, output2
  • Created two Amazon Kinesis Analytics applications: copy1, copy2

The Amazon Kinesis Analytics SQL application looks like this:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

This code creates a pump (think of it as a continual select statement) that selects from the input stream and outputs to the output1 stream. I created another identical application that outputs to the output2 stream.

To test, I sent data to the input stream:

#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data={}
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

I let it run for a while, then displayed the output using this code:

from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]

The output was:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

I ran it again for output2 and the identical output was shown.

Option 2: Using AWS Lambda

If you are fanning-out to many streams, a more efficient method might be to create an AWS Lambda function:

  • Triggered by Amazon Kinesis stream records
  • That writes records to multiple Amazon Kinesis 'output' streams

You could even have the Lambda function self-discover the output streams based on a naming convention (eg any stream named app-output-*).

这篇关于如何扇出AWS kinesis流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-24 16:33