
Commit f4043f5

Add the ability to distinguish between recoverable and non-recoverable exceptions when executing pre-commit hooks. Do not fail if the number of recoverable exceptions does not exceed the configured limit. Added full unit tests for the new functionality.
1 parent 3451a83 commit f4043f5
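
The retry behavior added in this commit is driven by two new Spring properties, injected via @Value in ConsumerWorker (see the ConsumerWorker.java diff below): kafka.consumer.poll.retry.limit (default 5) and kafka.consumer.poll.retry.delay.interval.ms (default 1000). A minimal sketch of how they might be set, assuming a standard .properties config file (the file name here is hypothetical):

# hypothetical consumer.properties fragment
# max number of times to re-run beforeCommitCallBack() after a ConsumerRecoverableException
kafka.consumer.poll.retry.limit=5
# delay, in milliseconds, between retry attempts
kafka.consumer.poll.retry.delay.interval.ms=1000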

File tree

5 files changed: +445 additions, -60 deletions


build.gradle

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ group = 'org.elasticsearch.kafka.indexer'
 version = release + '.' + build
 mainClassName = "org.elasticsearch.kafka.indexer.KafkaESIndexerProcess"

-description = "kafka-elasticsearch-consumer"
+description = "kafka-es-consumer"
 def artifact_provider_url = hasProperty('artifact_provider_url') ? artifact_provider_url : System.getenv('artifact_provider_url')
 def artifact_provider_user = hasProperty('artifact_provider_user') ? artifact_provider_user : System.getenv('artifact_provider_user')
 def artifact_provider_org = hasProperty('artifact_provider_org') ? artifact_provider_org : System.getenv('artifact_provider_org')
src/main/java/org/elasticsearch/kafka/indexer/exception/ConsumerNonRecoverableException.java

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/**
+ * @author marinapopova
+ * Sep 25, 2015
+ */
+package org.elasticsearch.kafka.indexer.exception;
+
+public class ConsumerNonRecoverableException extends Exception {
+
+    /**
+     *
+     */
+    public ConsumerNonRecoverableException() {
+        // TODO Auto-generated constructor stub
+    }
+
+    /**
+     * @param message
+     */
+    public ConsumerNonRecoverableException(String message) {
+        super(message);
+        // TODO Auto-generated constructor stub
+    }
+
+    /**
+     * @param cause
+     */
+    public ConsumerNonRecoverableException(Throwable cause) {
+        super(cause);
+        // TODO Auto-generated constructor stub
+    }
+
+    /**
+     * @param message
+     * @param cause
+     */
+    public ConsumerNonRecoverableException(String message, Throwable cause) {
+        super(message, cause);
+        // TODO Auto-generated constructor stub
+    }
+
+    /**
+     * @param message
+     * @param cause
+     * @param enableSuppression
+     * @param writableStackTrace
+     */
+    public ConsumerNonRecoverableException(String message, Throwable cause,
+            boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+        // TODO Auto-generated constructor stub
+    }
+
+}

src/main/java/org/elasticsearch/kafka/indexer/exception/IndexerESException.java renamed to src/main/java/org/elasticsearch/kafka/indexer/exception/ConsumerRecoverableException.java

Lines changed: 6 additions & 6 deletions
@@ -4,27 +4,27 @@
  */
 package org.elasticsearch.kafka.indexer.exception;

-public class IndexerESException extends Exception {
+public class ConsumerRecoverableException extends Exception {

     /**
      *
      */
-    public IndexerESException() {
+    public ConsumerRecoverableException() {
         // TODO Auto-generated constructor stub
     }

     /**
      * @param message
      */
-    public IndexerESException(String message) {
+    public ConsumerRecoverableException(String message) {
         super(message);
         // TODO Auto-generated constructor stub
     }

     /**
      * @param cause
      */
-    public IndexerESException(Throwable cause) {
+    public ConsumerRecoverableException(Throwable cause) {
         super(cause);
         // TODO Auto-generated constructor stub
     }
@@ -33,7 +33,7 @@ public IndexerESException(Throwable cause) {
      * @param message
      * @param cause
      */
-    public IndexerESException(String message, Throwable cause) {
+    public ConsumerRecoverableException(String message, Throwable cause) {
         super(message, cause);
         // TODO Auto-generated constructor stub
     }
@@ -44,7 +44,7 @@ public IndexerESException(String message, Throwable cause) {
      * @param enableSuppression
      * @param writableStackTrace
      */
-    public IndexerESException(String message, Throwable cause,
+    public ConsumerRecoverableException(String message, Throwable cause,
             boolean enableSuppression, boolean writableStackTrace) {
         super(message, cause, enableSuppression, writableStackTrace);
         // TODO Auto-generated constructor stub

src/main/java/org/elasticsearch/kafka/indexer/jobs/ConsumerWorker.java

Lines changed: 173 additions & 53 deletions
@@ -13,6 +13,7 @@

 import javax.annotation.Resource;

+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -24,6 +25,8 @@
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.elasticsearch.kafka.indexer.CommonKafkaUtils;
 import org.elasticsearch.kafka.indexer.FailedEventsLogger;
+import org.elasticsearch.kafka.indexer.exception.ConsumerNonRecoverableException;
+import org.elasticsearch.kafka.indexer.exception.ConsumerRecoverableException;
 import org.elasticsearch.kafka.indexer.service.IBatchMessageProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +35,10 @@
 public class ConsumerWorker implements AutoCloseable, IConsumerWorker {

     private static final Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
+    @Value("${kafka.consumer.poll.retry.limit:5}")
+    private int pollRetryLimit;
+    @Value("${kafka.consumer.poll.retry.delay.interval.ms:1000}")
+    private long pollRetryIntervalMs;
     @Value("${kafka.consumer.source.topic:testTopic}")
     private String kafkaTopic;
     @Value("${application.id:app1}")
@@ -47,7 +54,7 @@ public class ConsumerWorker implements AutoCloseable, IConsumerWorker {
     private OffsetLoggingCallbackImpl offsetLoggingCallback;
     private IBatchMessageProcessor batchMessageProcessor;

-    private KafkaConsumer<String, String> consumer;
+    private Consumer<String, String> consumer;
     private AtomicBoolean running = new AtomicBoolean(false);
     private int consumerInstanceId;

@@ -80,57 +87,7 @@ public void run() {
             consumer.subscribe(Arrays.asList(kafkaTopic), offsetLoggingCallback);
             batchMessageProcessor.onStartup(consumerInstanceId);
             while (running.get()) {
-                boolean isPollFirstRecord = true;
-                int numProcessedMessages = 0;
-                int numFailedMessages = 0;
-                int numMessagesInBatch = 0;
-                long pollStartMs = 0L;
-                logger.debug("consumerInstanceId={}; about to call consumer.poll() ...", consumerInstanceId);
-                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollIntervalMs));
-                batchMessageProcessor.onPollBeginCallBack(consumerInstanceId);
-                for (ConsumerRecord<String, String> record : records) {
-                    numMessagesInBatch++;
-                    logger.debug("consumerInstanceId={}; received record: partition: {}, offset: {}, value: {}",
-                            consumerInstanceId, record.partition(), record.offset(), record.value());
-                    if (isPollFirstRecord) {
-                        isPollFirstRecord = false;
-                        logger.info("Start offset for partition {} in this poll : {}", record.partition(), record.offset());
-                        pollStartMs = System.currentTimeMillis();
-                    }
-                    try {
-                        boolean processedOK = batchMessageProcessor.processMessage(record, consumerInstanceId);
-                        if (processedOK) {
-                            numProcessedMessages++;
-                        } else {
-                            FailedEventsLogger.logFailedEvent("Failed to process event: ",
-                                    record.value(), record.offset());
-                            numFailedMessages++;
-                        }
-                    } catch (Exception e) {
-                        FailedEventsLogger.logFailedEventWithException(e.getMessage(), record.value(), record.offset(), e);
-                        numFailedMessages++;
-                    }
-                }
-                long endOfPollLoopMs = System.currentTimeMillis();
-                batchMessageProcessor.onPollEndCallback(consumerInstanceId);
-                if (numMessagesInBatch > 0) {
-                    Map<TopicPartition, OffsetAndMetadata> previousPollEndPosition = getPreviousPollEndPosition();
-                    boolean shouldCommitThisPoll = batchMessageProcessor.beforeCommitCallBack(consumerInstanceId, previousPollEndPosition);
-                    long afterProcessorCallbacksMs = System.currentTimeMillis();
-                    commitOffsetsIfNeeded(shouldCommitThisPoll, previousPollEndPosition);
-                    long afterOffsetsCommitMs = System.currentTimeMillis();
-                    exposeOffsetPosition(previousPollEndPosition);
-                    logger.info(
-                            "Last poll snapshot: numMessagesInBatch: {}, numProcessedMessages: {}, numFailedMessages: {}, " +
-                            "timeToProcessLoop: {}ms, timeInMessageProcessor: {}ms, timeToCommit: {}ms, totalPollTime: {}ms",
-                            numMessagesInBatch, numProcessedMessages, numFailedMessages,
-                            endOfPollLoopMs - pollStartMs,
-                            afterProcessorCallbacksMs - endOfPollLoopMs,
-                            afterOffsetsCommitMs - afterProcessorCallbacksMs,
-                            afterOffsetsCommitMs - pollStartMs);
-                } else {
-                    logger.info("No messages received during this poll");
-                }
+                processPoll();
             }
         } catch (WakeupException e) {
             logger.warn("ConsumerWorker [consumerInstanceId={}] got WakeupException - exiting ...", consumerInstanceId, e);
@@ -148,7 +105,142 @@ public void run() {
             consumer.close();
         }
     }
-
+
+    /**
+     * Perform one call to the Kafka consumer's poll() and process all events from this poll.
+     * @throws Exception
+     */
+    public void processPoll() throws Exception {
+        logger.debug("consumerInstanceId={}; about to call consumer.poll() ...", consumerInstanceId);
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollIntervalMs));
+        batchMessageProcessor.onPollBeginCallBack(consumerInstanceId);
+        boolean isPollFirstRecord = true;
+        int numProcessedMessages = 0;
+        int numFailedMessages = 0;
+        int numMessagesInBatch = 0;
+        long pollStartMs = 0L;
+        for (ConsumerRecord<String, String> record : records) {
+            numMessagesInBatch++;
+            logger.debug("consumerInstanceId={}; received record: partition: {}, offset: {}, value: {}",
+                    consumerInstanceId, record.partition(), record.offset(), record.value());
+            if (isPollFirstRecord) {
+                isPollFirstRecord = false;
+                logger.info("Start offset for partition {} in this poll : {}", record.partition(), record.offset());
+                pollStartMs = System.currentTimeMillis();
+            }
+            try {
+                boolean processedOK = batchMessageProcessor.processMessage(record, consumerInstanceId);
+                if (processedOK) {
+                    numProcessedMessages++;
+                } else {
+                    FailedEventsLogger.logFailedEvent("Failed to process event: ", record.value(), record.offset());
+                    numFailedMessages++;
+                }
+            } catch (Exception e) {
+                FailedEventsLogger.logFailedEventWithException(e.getMessage(), record.value(), record.offset(), e);
+                numFailedMessages++;
+            }
+        }
+        long endOfPollLoopMs = System.currentTimeMillis();
+        // deprecate this method and move its logic to beforeCommitCallBack() in those consumers that use it
+        batchMessageProcessor.onPollEndCallback(consumerInstanceId);
+        if (numMessagesInBatch > 0) {
+            Map<TopicPartition, OffsetAndMetadata> previousPollEndPosition = getPreviousPollEndPosition();
+            boolean shouldCommitThisPoll = performCallbackWithRetry(records, previousPollEndPosition);
+            long afterProcessorCallbacksMs = System.currentTimeMillis();
+            commitOffsetsIfNeeded(shouldCommitThisPoll, previousPollEndPosition);
+            long afterOffsetsCommitMs = System.currentTimeMillis();
+            exposeOffsetPosition(previousPollEndPosition);
+            logger.info(
+                    "Last poll snapshot: numMessagesInBatch: {}, numProcessedMessages: {}, numFailedMessages: {}, " +
+                    "timeToProcessLoop: {}ms, timeInMessageProcessor: {}ms, timeToCommit: {}ms, totalPollTime: {}ms",
+                    numMessagesInBatch, numProcessedMessages, numFailedMessages,
+                    endOfPollLoopMs - pollStartMs,
+                    afterProcessorCallbacksMs - endOfPollLoopMs,
+                    afterOffsetsCommitMs - afterProcessorCallbacksMs,
+                    afterOffsetsCommitMs - pollStartMs);
+        } else {
+            logger.info("No messages received during this poll");
+        }
+    }
+
+    /**
+     * After each poll() we call the batchMessageProcessor.beforeCommitCallBack() method to run custom
+     * business/application logic and to decide whether the current poll()'s offsets should be committed or not.
+     * If a recoverable exception happens during this operation - try to re-process the poll() events and
+     * re-do the beforeCommitCallBack() operation, up to the configured number of times.
+     * Examples of recoverable exceptions:
+     * --- intermittent timeout exceptions from Cassandra, Postgres, or any other third party called during this operation
+     *
+     * In such cases, in order to function properly, the batchMessageProcessor.beforeCommitCallBack() implementation has to
+     * throw an instance of ConsumerRecoverableException;
+     *
+     * if some other, non-recoverable exception happens - an instance of some other Exception should be thrown;
+     * it will cause the consumer to shut down.
+     *
+     * WARNING!!! It is very important to make sure that the event processing (the batchMessageProcessor.processMessage() method)
+     * is IDEMPOTENT - meaning that it can safely re-process the same events multiple times.
+     *
+     * @param records
+     * @param previousPollEndPosition
+     * @return whether the offsets of the current poll() should be committed
+     * @throws Exception
+     */
+    public boolean performCallbackWithRetry(
+            ConsumerRecords<String, String> records,
+            Map<TopicPartition, OffsetAndMetadata> previousPollEndPosition) throws Exception {
+        boolean shouldCommitThisPoll = true;
+        int retryAttempt = 0;
+        // only catch recoverable exceptions and try to re-process all records from the current poll();
+        // any other Exception thrown from this method will be propagated up and will cause the consumer to shut down
+        boolean keepRetrying = true;
+        while (keepRetrying) {
+            try {
+                shouldCommitThisPoll = batchMessageProcessor.beforeCommitCallBack(consumerInstanceId, previousPollEndPosition);
+                // no errors - exit this method
+                keepRetrying = false;
+            } catch (ConsumerRecoverableException e) {
+                // ignore this exception - it is recoverable - if the retry limit is not reached
+                retryAttempt++;
+                if (retryAttempt > pollRetryLimit) {
+                    keepRetrying = false;
+                    logger.error("FAILED to retry poll() - reached the limit of retry attempts: retryAttempt = {} out of {};" +
+                            " will throw ConsumerNonRecoverableException and shut down; error: {}",
+                            retryAttempt, pollRetryLimit, e.getMessage());
+                    throw new ConsumerNonRecoverableException(e.getMessage() + ": failed after retrying");
+                } else {
+                    logger.warn("Re-trying poll() after getting ConsumerRecoverableException: {}; retryAttempt = {} out of {};" +
+                            " will sleep for {}ms before re-trying",
+                            e.getMessage(), retryAttempt, pollRetryLimit, pollRetryIntervalMs);
+                    // sleep for the configured delay and re-process events from the last poll() again
+                    Thread.sleep(pollRetryIntervalMs);
+                    reprocessPollEvents(retryAttempt, records);
+                }
+            }
+        }
+        return shouldCommitThisPoll;
+    }
+
+    public void reprocessPollEvents(int retryAttempt, ConsumerRecords<String, String> records) {
+        int numProcessedMessages = 0;
+        int numFailedMessages = 0;
+        // do not log failed events when re-processing
+        for (ConsumerRecord<String, String> record : records) {
+            try {
+                boolean processedOK = batchMessageProcessor.processMessage(record, consumerInstanceId);
+                if (processedOK) {
+                    numProcessedMessages++;
+                } else {
+                    numFailedMessages++;
+                }
+            } catch (Exception e) {
+                numFailedMessages++;
+            }
+        }
+        logger.info("Poll re-processing snapshot, retryAttempt={}: numProcessedMessages: {}, numFailedMessages: {}",
+                retryAttempt, numProcessedMessages, numFailedMessages);
+    }
+
     /**
      * this method can be overwritten (implemented) in your own ConsumerManager
      * if you want to expose custom JMX metrics
@@ -218,4 +310,32 @@ public void setOffsetLoggingCallback(OffsetLoggingCallbackImpl offsetLoggingCall
         this.offsetLoggingCallback = offsetLoggingCallback;
     }

+    public int getPollRetryLimit() {
+        return pollRetryLimit;
+    }
+
+    public void setPollRetryLimit(int pollRetryLimit) {
+        this.pollRetryLimit = pollRetryLimit;
+    }
+
+    public long getPollRetryIntervalMs() {
+        return pollRetryIntervalMs;
+    }
+
+    public void setPollRetryIntervalMs(long pollRetryIntervalMs) {
+        this.pollRetryIntervalMs = pollRetryIntervalMs;
+    }
+
+    public void setConsumer(Consumer<String, String> consumer) {
+        this.consumer = consumer;
+    }
+
+    public void setConsumerInstanceId(int consumerInstanceId) {
+        this.consumerInstanceId = consumerInstanceId;
+    }
+
+    public void setKafkaTopic(String kafkaTopic) {
+        this.kafkaTopic = kafkaTopic;
+    }
+
 }
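
For illustration, here is a minimal sketch of a pre-commit callback that follows the contract described in the performCallbackWithRetry() javadoc above: throw ConsumerRecoverableException for intermittent failures (ConsumerWorker then sleeps, re-processes the poll's records, and retries up to pollRetryLimit times), and let any other exception propagate to shut the consumer down. The EventStore dependency and its flush() method are hypothetical stand-ins for a real third-party call (Cassandra, Postgres, etc.); only the exception type and the callback shape come from this commit.

package org.elasticsearch.kafka.indexer.example;

import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.elasticsearch.kafka.indexer.exception.ConsumerRecoverableException;

public class RetryAwareCommitCallback {

    // hypothetical external store whose calls may time out intermittently
    interface EventStore {
        void flush() throws TimeoutException;
    }

    private final EventStore eventStore;

    public RetryAwareCommitCallback(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    // same shape as the batchMessageProcessor.beforeCommitCallBack() call made from ConsumerWorker
    public boolean beforeCommitCallBack(int consumerInstanceId,
            Map<TopicPartition, OffsetAndMetadata> previousPollEndPosition) throws Exception {
        try {
            eventStore.flush(); // e.g. persist the batch to Cassandra/Postgres
            return true;        // OK to commit this poll's offsets
        } catch (TimeoutException e) {
            // recoverable: ConsumerWorker will sleep, re-process the poll's records,
            // and call this method again, up to the configured retry limit
            throw new ConsumerRecoverableException("timeout flushing events", e);
        }
        // any other exception propagates up and shuts the consumer down
    }
}

Because the whole poll is re-processed before each retry, any side effects in processMessage() must tolerate duplicates, as the javadoc's idempotency warning stresses.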
