我的目标是使用AWS Kinesis API创建具有特定名称的Kinesis流(如果尚不存在的话),然后首先写入它是否存在。

到目前为止,这是我想出的。尝试创建流。如果失败并显示代码400,并返回请求ID,则说明该流已经存在。然后写入流以确保它在那里。在Go中:

k := kinesis.New(session.New())
_, err := k.CreateStream(&kinesis.CreateStreamInput{
    ShardCount: aws.Int64(2),
    StreamName: aws.String("stream"),
})
if err != nil {
    if reqerr, ok := err.(awserr.RequestFailure); ok {
        if reqerr.RequestID() == "" {
            log.Fatal("request was not delivered as it has no ID",
                reqerr.Code(),
                reqerr.Message(),
            )
        }
        if reqerr.StatusCode() != 400 {
            log.Fatal("unexpected status code", reqerr.StatusCode())
        }
    } else {
        log.Fatal(err)
    }
}
// Code 400 + requestID does not necessarily mean that the stream exists
// So write to the stream to confirm it exists
_, err = k.PutRecord(&kinesis.PutRecordInput{
    Data:         []byte("Hello Kinesis"),
    PartitionKey: aws.String("partitionkey"),
    StreamName:   aws.String("stream"),
})
if err != nil {
    log.Fatal(err)
}

上面的方法似乎很复杂,更重要的是,我认为它无法与我期望的确切错误有效匹配。在错误消息上进行字符串比较似乎也是一个错误的选择,因为这很容易改变。

我想知道是否有更可靠,更直接的方法来实现这一目标?列出所有可用的要搜索的流是一件很麻烦的事情,因为它是线性搜索,并且涉及带有ExclusiveStartStreamName新值的多个请求。

最佳答案

描述流。如果没有流,请创建流并旋转等待以使其变为 Activity 状态。

创建流后,您将无法立即将其推送到该流。它将首先转换为CREATING,然后过一会(几秒钟)转换为ACTIVE。

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#DescribeStream-instance_method

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#CreateStream-instance_method

您还可以使用ListStreams快速查看所有流的状态:

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#ListStreams-instance_method

关于amazon-web-services - AWS Kinesis : determine whether a named stream exists,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35636659/

10-11 06:44