测试demo 来自官方例子

nats docker-compose file

version: '3.3'
services:
nats:
image: nats
ports:
- "4222:4222"
- "8222:8222"
- "6222:6222"

benthos stream 配置

  • 整体说明
    里面的例子就是数据从文件读取到nats 之后不同的消费者进行nats 消息消费,再到webhook,webhook 标准输出

  • 文件读取说明

input:
type: read_until
read_until:
condition:
type: static
static: false
input:
type: file
file:
path: ./sample.json
restart_input: true
pipeline:
processors:
- type: throttle
throttle:
period: 3s
output:
type: nats
nats:
subject: benthos_messages
urls:
- nats://localhost:4222
  • nats 消费者(三个)
input:
type: nats
nats:
subject: benthos_messages
urls:
- nats://localhost:4222
pipeline:
processors:
- type: filter
filter:
type: jmespath
jmespath:
query: |
keys(@) | contains(@, 'title')
output:
type: http_client
http_client:
url: http://localhost:4195/webhooks/post/customer1
verb: POST
  • webhook 配置
input:
type: broker
broker:
inputs:
- type: http_server
http_server:
path: /post/customer1
processors:
- type: text
text:
operator: prepend
value: "Customer 1 received: "
- type: http_server
http_server:
path: /post/customer2
processors:
- type: text
text:
operator: prepend
value: "Customer 2 received: "
- type: http_server
http_server:
path: /post/customer3
processors:
- type: text
text:
operator: prepend
value: "Customer 3 received: "
output:
type: stdout

运行

  • docker-compose
dokcer-compose up -d
  • 启动benthos,基于文件stream 配置
benthos --streams --streams-dir ./configs

运行效果

benthos stream nats 集成试用-LMLPHP
修改sample.json 测试
benthos stream nats 集成试用-LMLPHP

参考资料

https://github.com/Jeffail/benthos/tree/master/resources/docker/streams

 
 
 
 
05-11 14:47