Kafka Client is a vanilla java library that makes it easy to consume data from kafka, a list of features:
- Parallel consuming
- Consuming retry
- Consuming failover
- Designed to be easy to mock and test
- Designed to support slow consumers without kafka re balancing
- Designed to high throughput usage
- Individual record consuming
- Batch records consuming
- Frameworkless, but easily configurable to some when wanted
- Commits managed for you based on behavior
- Low CPU usage
compile("com.mageddo.rapids-kafka-client:rapids-kafka-client:2.0.6")
ConsumerConfig.<String, String>builder()
.prop(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(GROUP_ID_CONFIG, "stocks")
.topics("stock_changed")
.recoverCallback(ctx -> {
// here you can send the message to another topic, send a SMS, etc.
log.info("status=recovering, value={}", ctx.record().value());
})
.callback((ctx, record) -> {
log.info("status=consumed, value={}", record.value());
})
.build()
.consume()
.waitFor();
public static void main(String[] args) {
final ConsumerStarter consumerStarter = ConsumerStarter.start(defaultConfig(), Arrays.asList(
new StockConsumer() // and many other consumers
));
consumerStarter.waitFor();
// consumerStarter.stop();
}
static ConsumerConfig defaultConfig() {
return ConsumerConfig.builder()
.prop(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
.prop(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.prop(GROUP_ID_CONFIG, "my-group-id")
.build();
}
static class StockConsumer implements Consumer {
ConsumeCallback<String, String> consume() {
return (callbackContext, record) -> {
System.out.printf("message from kafka: %s\n", record.value());
};
}
@Override
public ConsumerConfig<String, String> config() {
return ConsumerConfig
.<String, String>builder()
.topics("stocks_events")
.consumers(1)
.callback(this.consume())
.build();
}
}