Skip to content

Commit d171efe

Browse files
committed
remove registerConsumerForJMX() method as not needed
1 parent 9b1551b commit d171efe

File tree

3 files changed

+4
-15
lines changed

3 files changed

+4
-15
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ Currently, an example implemented destination is ElasticSearch. One batch of mes
2222

2323
* starting offset positions can be specified via configuration properties
2424
* IConsumerWorker interface-based consumers allow you to :
25+
2526
** customize how offsets are exposed to other systems - like JMX/monitoring or external storage
27+
2628
** customize when offsets are committed to Kafka - allowing you to specify your own logic/rules for re-processing of potentially failed events/batchs
29+
2730
* IBatchMessageProcessor interface-based batch processors can be customized to process each batch of messages into any destination
2831

2932

src/main/java/org/elasticsearch/kafka/indexer/jobs/ConsumerWorker.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import javax.annotation.Resource;
1515

16-
import org.apache.commons.lang3.StringUtils;
1716
import org.apache.kafka.clients.consumer.ConsumerConfig;
1817
import org.apache.kafka.clients.consumer.ConsumerRecord;
1918
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,8 +27,6 @@
2827
import org.elasticsearch.kafka.indexer.service.IBatchMessageProcessor;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
31-
import org.springframework.beans.factory.ObjectFactory;
32-
import org.springframework.beans.factory.annotation.Autowired;
3330
import org.springframework.beans.factory.annotation.Value;
3431

3532
public class ConsumerWorker implements AutoCloseable, IConsumerWorker {
@@ -70,7 +67,6 @@ public void initConsumerInstance(int consumerInstanceId) {
7067
kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId);
7168
kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
7269
consumer = new KafkaConsumer<>(kafkaProperties);
73-
registerConsumerForJMX();
7470
logger.info(
7571
"Created ConsumerWorker with properties: consumerClientId={}, consumerInstanceName={}, kafkaTopic={}, kafkaProperties={}",
7672
consumerClientId, consumerInstanceName, kafkaTopic, kafkaProperties);
@@ -162,10 +158,6 @@ public void exposeOffsetPosition(Map<TopicPartition, OffsetAndMetadata> previous
162158
// NO OP
163159
}
164160

165-
public void registerConsumerForJMX() {
166-
// NO OP
167-
}
168-
169161
private void commitOffsetsIfNeeded(boolean shouldCommitThisPoll, Map<TopicPartition, OffsetAndMetadata> partitionOffsetMap) {
170162
try {
171163
if (shouldCommitThisPoll) {

src/main/java/org/elasticsearch/kafka/indexer/jobs/IConsumerWorker.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,7 @@ public interface IConsumerWorker extends Runnable {
1717
* @param previousPollEndPosition
1818
*/
1919
public void exposeOffsetPosition(Map<TopicPartition, OffsetAndMetadata> previousPollEndPosition);
20-
21-
/**
22-
* this method can be overwritten in a custom implementation of the Consumer -
23-
* to register this consumer with a JMX exporting service
24-
*/
25-
public void registerConsumerForJMX();
26-
20+
2721
/**
2822
* Creates Kafka properties and an instance of a KafkaConsumer
2923
*

0 commit comments

Comments
 (0)