Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieve messages from kafka topic #66

Open
chan71 opened this issue Feb 27, 2019 · 1 comment
Open

Retrieve messages from kafka topic #66

chan71 opened this issue Feb 27, 2019 · 1 comment

Comments

@chan71
Copy link

chan71 commented Feb 27, 2019

To pump the messages in the DH kafka queue to a spark instance, is there a way to access the notifications sent to DH can be retried from a kafka topic? The topics listed in kafka are;

  • __consumer_offsets
  • request_topic
  • response_topic_AkLAqJAJ
  • response_topic_auth_AkLAqJAI
  • subscription_update
    I assume that request_topic is where the messages are being sent. When kafka-node module is used in a nodejs client, I keep getting the following error.
$ DEBUG=* node index.js
Wed Feb 27 2019 14:43:01 GMT+1100 (AEDT) [INFO   ] [kafka-test] initializing consumer..
  kafka-node:KafkaClient Connect attempt 1 +0ms
  kafka-node:KafkaClient Trying to connect to host: localhost port: 9092 +3ms
  kafka-node:KafkaClient createBroker localhost 9092 +1ms
Wed Feb 27 2019 14:43:01 GMT+1100 (AEDT) [INFO   ] [kafka-test] waiting connection...
Wed Feb 27 2019 14:43:01 GMT+1100 (AEDT) [INFO   ] [kafka-test] initializing producer..
  kafka-node:KafkaClient Connect attempt 1 +8ms
  kafka-node:KafkaClient Trying to connect to host: localhost port: 9092 +0ms
  kafka-node:KafkaClient createBroker localhost 9092 +0ms
Wed Feb 27 2019 14:43:01 GMT+1100 (AEDT) [INFO   ] [kafka-test] waiting connection...
  kafka-node:KafkaClient Sending versions request to localhost:9092 +7ms
  kafka-node:KafkaClient broker socket connected {"host":"localhost","port":"9092"} +3ms
  kafka-node:KafkaClient connected to socket, trying to load initial metadata +2ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper localhost:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
  kafka-node:KafkaClient Sending versions request to localhost:9092 +1ms
  kafka-node:KafkaClient broker socket connected {"host":"localhost","port":"9092"} +0ms
  kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +0ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper localhost:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
  kafka-node:KafkaClient Received versions response from localhost:9092 +8ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":6,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":0,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
  kafka-node:KafkaClient broker is now ready +1ms
  kafka-node:KafkaClient Received versions response from localhost:9092 +8ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":6,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":1},"deleteTopics":{"min":0,"max":1,"usable":false},"describeConfigs":{"min":0,"max":0,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
  kafka-node:KafkaClient broker is now ready +0ms
  kafka-node:KafkaClient updating metadatas +17ms
  kafka-node:Consumer consumer ready +0ms
  kafka-node:KafkaClient updating metadatas +6ms
Wed Feb 27 2019 14:43:01 GMT+1100 (AEDT) [INFO   ] [kafka-test] producer ready
  kafka-node:KafkaClient compressing messages if needed +1ms
  kafka-node:KafkaClient createBroker 9419344c8533 9092 +3ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper 9419344c8533:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
  kafka-node:KafkaClient updating metadatas +5ms
  kafka-node:KafkaClient createBroker 9419344c8533 9092 +2ms
  kafka-node:KafkaClient waitUntilReady [BrokerWrapper 9419344c8533:9092 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +2ms
  kafka-node:Consumer connection closed +9ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 9419344c8533:9092 +1s
  kafka-node:KafkaClient createBroker 9419344c8533 9092 +0ms
  kafka-node:KafkaClient consumer-node reconnecting to 9419344c8533:9092 +1ms
  kafka-node:KafkaClient createBroker 9419344c8533 9092 +0ms
  kafka-node:Consumer connection closed +3ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 9419344c8533:9092 +1s
  kafka-node:KafkaClient createBroker 9419344c8533 9092 +0ms
  kafka-node:KafkaClient consumer-node reconnecting to 9419344c8533:9092 +0ms
  kafka-node:KafkaClient createBroker 9419344c8533 9092 +0ms
  kafka-node:Consumer connection closed +3ms

Changing the suggested environment variable as stated in here worked but the DH admin and other components were broken. Is there a way to do this without writing a plugin to receive the message from the topic and putting them back in a different kafka instance?

@chan71
Copy link
Author

chan71 commented Feb 27, 2019

Debugging the kafka-node source code revealed that loadMetadataForTopics() method returns containerid (e.g. 9419344c8533) and 9092 to which the subsequent calls tries to connect to. Probably the master node returns a worker node and port for the client to connect to send the messages. The attached screenshot will show the exact location and debug info.

screen shot 2019-02-27 at 10 16 52 pm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant