Make Apache Storm topology use the latest offset from Kafka

I have a KafkaSpout, two bolts to process the data, and two bolts to store the processed data in MongoDB.

I am using Apache Flux to create the topology, reading data from Kafka into the spout. Everything runs fine, but every time I run the topology it processes all the messages in Kafka from the start, and once it has processed all of them it does not wait for more messages and crashes.

How can I make the Storm topology process only the latest messages?

Here is my topology .yaml file:

name: "kafka-topology" components: # MongoDB mapper - id: "block-mapper" className: "org.apache.storm.mongodb.common.mapper.SimpleMongoMapper" configMethods: - name: "withFields" args: # The following are the tuple fields to map to a MongoDB document - ["block"] # MongoDB mapper - id: "transaction-mapper" className: "org.apache.storm.mongodb.common.mapper.SimpleMongoMapper" configMethods: - name: "withFields" args: # The following are the tuple fields to map to a MongoDB document - ["transaction"] - id: "stringScheme" className: "org.apache.storm.kafka.StringScheme" - id: "stringMultiScheme" className: "org.apache.storm.spout.SchemeAsMultiScheme" constructorArgs: - ref: "stringScheme" - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "172.25.33.191:2181" - id: "spoutConfig" className: "org.apache.storm.kafka.SpoutConfig" constructorArgs: # brokerHosts - ref: "zkHosts" # topic - "blockdata" # zkRoot - "" # id - "myId" properties: - name: "scheme" ref: "stringMultiScheme" - name: "ignoreZkOffsets" value: flase config: topology.workers: 1 # ... # spout definitions spouts: - id: "kafka-spout" className: "org.apache.storm.kafka.KafkaSpout" constructorArgs: - ref: "spoutConfig" parallelism: 1 # bolt definitions bolts: - id: "blockprocessing-bolt" className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" constructorArgs: # command line - ["python", "process-bolt.py"] # output fields - ["block"] parallelism: 1 # ... - id: "transprocessing-bolt" className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt" constructorArgs: # command line - ["python", "trans-bolt.py"] # output fields - ["transaction"] parallelism: 1 # ... - id: "mongoBlock-bolt" className: "org.apache.storm.mongodb.bolt.MongoInsertBolt" constructorArgs: - "mongodb://172.25.33.205:27017/testdb" - "block" - ref: "block-mapper" parallelism: 1 # ... - id: "mongoTrans-bolt" className: "org.apache.storm.mongodb.bolt.MongoInsertBolt" constructorArgs: - "mongodb://172.25.33.205:27017/testdb" - "transaction" - ref: "transaction-mapper" parallelism: 1 # ... - id: "log" className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" parallelism: 1 # ... #stream definitions # stream definitions define connections between spouts and bolts. # note that such connections can be cyclical # custom stream groupings are also supported streams: - name: "kafka --> block-Processing" # name isn't used (placeholder for logging, UI, etc.) from: "kafka-spout" to: "blockprocessing-bolt" grouping: type: SHUFFLE - name: "kafka --> transaction-processing" # name isn't used (placeholder for logging, UI, etc.) from: "kafka-spout" to: "transprocessing-bolt" grouping: type: SHUFFLE - name: "block --> mongo" from: "blockprocessing-bolt" to: "mongoBlock-bolt" grouping: type: SHUFFLE - name: "transaction --> mongo" from: "transprocessing-bolt" to: "mongoTrans-bolt" grouping: type: SHUFFLE

I have tried adding properties to the SpoutConfig to fetch only the latest messages, like this:

- id: "spoutConfig" className: "org.apache.storm.kafka.SpoutConfig" constructorArgs: - ref: "zkHosts" - "blockdata" - "" - "myId" properties: - name: "scheme" ref: "stringMultiScheme" - name: "startOffsetTime" ref: "EarliestTime" - name: "forceFromStart" value: false

But it gives an error no matter what I put in the ref for startOffsetTime:

Exception in thread "main" java.lang.IllegalArgumentException: Can not set long field org.apache.storm.kafka.KafkaConfig.startOffsetTime to null value

Best answer

You need to set the startOffsetTime to kafka.api.OffsetRequest.LatestTime. As you can see at https://github.com/apache/storm/tree/64af629a19a82591dbf3428f7fd6b02f39e0723f/external/storm-kafka#kafkaconfig, the default setting will go to the earliest offset available.
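If your Flux version does not let you call the static factory method kafka.api.OffsetRequest.LatestTime() from YAML, the simplest workaround is to set the property to the constant's numeric value: LatestTime() is just the long -1 (EarliestTime() is -2). A minimal sketch of only the spoutConfig component, assuming the rest of your definition stays as it is:

# Sketch only: -1 is the value returned by kafka.api.OffsetRequest.LatestTime(),
# -2 would be EarliestTime(). startOffsetTime only takes effect when no offset
# has already been committed in ZooKeeper (or when ignoreZkOffsets is true).
- id: "spoutConfig"
  className: "org.apache.storm.kafka.SpoutConfig"
  constructorArgs:
    - ref: "zkHosts"
    - "blockdata"
    - ""
    - "myId"
  properties:
    - name: "scheme"
      ref: "stringMultiScheme"
    - name: "startOffsetTime"
      value: -1

If you want the spout to jump to the latest offset even when ZooKeeper already holds a committed offset, you would also set ignoreZkOffsets to true, but keep in mind that the spout will then skip any unread messages on every redeploy.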

The exception you're hitting seems unrelated. It looks like a Curator/Zookeeper incompatibility.

Edit: I think you're hitting this issue https://issues.apache.org/jira/browse/STORM-2978. 1.2.2 should be out soon, please try upgrading once it releases.

Edit edit: If you want to work around it without upgrading, edit the pom for your topology so it includes a dependency on Zookeeper 3.4 and not 3.5.
