Kafka Exception Handler in Spring Boot

Shiksha Engineering
Oct 4, 2023

Author : Rohit Kumar Upadhyay

Problem statement

In Spring Boot services, we use the @ControllerAdvice and @ExceptionHandler annotations to handle exceptions across the application, which gives us one centralized exception handler. Spring MVC routes exceptions thrown while processing a web request to the matching handler method and maps the result to the API response. However, this approach only works within the context of the dispatcher servlet and therefore does not apply to Kafka consumers, which run outside of that context. We need a solution for handling Kafka consumer exceptions that internally reuses the same centralized exception handling methods defined in the CustomExceptionHandler class.

Let’s understand the problem with an example.

We have a consumer in our application:

@Service
public class Consumer {

    @KafkaListener(topics = "exceptionHandler", groupId = "exceptionHandlerGroup")
    public void listen(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws Exception {
        // Simulate a failure while processing the message
        throw new RuntimeException();
    }
}

This is our centralized exception handler class for the REST API:

@Configuration
@ControllerAdvice
public class CustomExceptionHandler {

    @ExceptionHandler(RuntimeException.class)
    @ResponseBody
    public ResponseEntity<?> handleRunTimeException(RuntimeException exception) {
        logException(exception);
        return new ResponseEntity<>("response message", HttpStatus.INTERNAL_SERVER_ERROR);
    }

    @ExceptionHandler(HttpRequestMethodNotSupportedException.class)
    @ResponseBody
    public ResponseEntity<?> handleHttpRequestMethodNotSupportedException(HttpServletRequest request,
            Exception exception) {
        logException(exception);
        return new ResponseEntity<>("response message", HttpStatus.METHOD_NOT_ALLOWED);
    }
}

Because @ControllerAdvice and @ExceptionHandler only take effect within the context of the dispatcher servlet, the RuntimeException thrown by the consumer above never reaches handleRunTimeException.

So we have to implement a solution that handles consumer exceptions by using our CustomExceptionHandler class.

Solution

First, we will develop a component that loads all the exception handler methods declared in the CustomExceptionHandler class into a Map. In this Map, the name of the exception class is the key and the reference to the corresponding handler method is the value. This lets our consumer exception handler look up the right method and execute it.

@Component
public class KafkaExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaExceptionHandler.class);

    // Exception class name -> handler method declared in a @ControllerAdvice class
    private static final Map<String, Method> exceptionMethodMapping = new HashMap<>();

    @PostConstruct
    private void init() {
        logger.info("KafkaExceptionHandler init method started");
        // false: skip the default @Component filters and match only @ControllerAdvice classes
        ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AnnotationTypeFilter(ControllerAdvice.class));
        Set<BeanDefinition> definitions = scanner.findCandidateComponents("com.test.exception.*");
        try {
            for (BeanDefinition bd : definitions) {
                final Class<?> clazz = Class.forName(bd.getBeanClassName());
                loadExceptionMethodMapping(clazz.getMethods());
            }
        } catch (Exception e) {
            logger.error("exception while loading exception handler classes for kafka, definitions: {}",
                    definitions, e);
        }
        logger.info("exception to handler method mapping: {}", exceptionMethodMapping);
    }

    // Registers every @ExceptionHandler method under the name of each exception type it declares
    private void loadExceptionMethodMapping(Method[] methods) {
        for (final Method m : methods) {
            ExceptionHandler annotation = m.getAnnotation(ExceptionHandler.class);
            if (annotation == null) {
                continue;
            }
            for (Class<? extends Throwable> type : annotation.value()) {
                exceptionMethodMapping.put(type.getName(), m);
            }
        }
    }
}

@Component makes sure that Spring loads this class as a bean in the IoC container, and @PostConstruct makes sure that the init method runs once the bean has been created and its dependencies injected.
The init method uses reflection to scan all the methods of the @ControllerAdvice classes in our centralized exception handler package and loads the ones annotated with @ExceptionHandler into the exceptionMethodMapping Map.
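
One caveat with loading every @ExceptionHandler method: some handlers, such as handleHttpRequestMethodNotSupportedException above, take extra parameters (HttpServletRequest) that a Kafka consumer cannot supply. A minimal sketch of filtering such methods out at load time follows. It runs on the plain JDK; the @Handles annotation, SampleAdvice, and HandlerRegistry are hypothetical stand-ins for Spring's @ExceptionHandler and our CustomExceptionHandler, not part of the original code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Spring's @ExceptionHandler, so the sketch runs without Spring.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {
    Class<? extends Throwable>[] value();
}

// Hypothetical stand-in for CustomExceptionHandler.
class SampleAdvice {
    @Handles(RuntimeException.class)
    public String handleRuntime(RuntimeException ex) {
        return "runtime";
    }

    // Needs a second argument that a Kafka consumer cannot provide.
    @Handles(UnsupportedOperationException.class)
    public String handleWithRequest(Object request, Exception ex) {
        return "needs request";
    }
}

class HandlerRegistry {
    // Registers only handlers that can be invoked with the exception alone.
    static Map<String, Method> load(Class<?> adviceClass) {
        Map<String, Method> mapping = new HashMap<>();
        for (Method m : adviceClass.getMethods()) {
            Handles h = m.getAnnotation(Handles.class);
            if (h == null) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            for (Class<? extends Throwable> type : h.value()) {
                // Exactly one parameter, and it must accept the declared exception type.
                if (params.length == 1 && params[0].isAssignableFrom(type)) {
                    mapping.put(type.getName(), m);
                }
            }
        }
        return mapping;
    }
}
```

With this check, HandlerRegistry.load(SampleAdvice.class) keeps only the RuntimeException entry. Without it, the two-parameter handler would be registered and then fail at invocation time, when only the exception is passed.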

Now we will use AOP to detect exceptions thrown in our consumers and trigger an advice method that receives the exception as a parameter. That method needs access to exceptionMethodMapping so it can look up the handler method to execute.

We will use the same class, KafkaExceptionHandler, for AOP. Aspect-oriented programming is a way to handle cross-cutting concerns in an application, and Spring supports it through Spring AOP. To use it, we put the @Aspect annotation on the class and the annotation

@AfterThrowing( pointcut = "@annotation(org.springframework.kafka.annotation.KafkaListener)", throwing = "ex")

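on the method.
throwing names the advice method parameter that receives the thrown exception.
pointcut tells Spring which methods to intercept, here every method annotated with @KafkaListener.
@AfterThrowing tells Spring to run the advice only when an intercepted method exits by throwing an exception. The advice does not swallow the exception, which still propagates to the caller.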
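
To make the after-throwing behaviour concrete, the sketch below imitates it with a plain JDK dynamic proxy: the "advice" observes the exception and the exception still reaches the caller. This is an analogy and not Spring AOP itself; MessageListener, AfterThrowingProxy, and wrap are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface standing in for a @KafkaListener bean.
interface MessageListener {
    void listen(String message);
}

class AfterThrowingProxy {
    // Records the exceptions seen by the "advice" so the effect is observable.
    static final List<Throwable> seen = new ArrayList<>();

    static MessageListener wrap(MessageListener target) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                Throwable cause = e.getCause();
                seen.add(cause); // the after-throwing step: observe the exception
                throw cause;     // then rethrow it; the exception is not swallowed
            }
        };
        return (MessageListener) Proxy.newProxyInstance(
                MessageListener.class.getClassLoader(),
                new Class<?>[] {MessageListener.class},
                handler);
    }
}
```

Calling listen on the wrapped listener with a body that throws adds the exception to seen and then raises the same exception to the caller, which mirrors what happens between our advice and the Kafka listener infrastructure.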

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3.x

import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

@Aspect
@Component
public class KafkaExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaExceptionHandler.class);

    // Exception class name -> handler method declared in a @ControllerAdvice class
    private static final Map<String, Method> exceptionMethodMapping = new HashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @AfterThrowing(pointcut = "@annotation(org.springframework.kafka.annotation.KafkaListener)", throwing = "ex")
    public void afterThrowingExceptionAdvice(Throwable ex) {
        logger.error("exception thrown in consumer", ex);
        try {
            Method excMethod = exceptionMethodMapping.get(ex.getClass().getName());
            if (excMethod != null) {
                // Handler methods are instance methods, so invoke them on the Spring-managed bean
                excMethod.invoke(applicationContext.getBean(excMethod.getDeclaringClass()), ex);
            } else {
                DefaultLogException.logException(ex);
            }
        } catch (Exception exe) {
            logger.error("exception thrown in consumer exception handler", exe);
            DefaultLogException.logException(ex);
        }
    }

    @PostConstruct
    private void init() {
        logger.info("KafkaExceptionHandler init method started");
        // false: skip the default @Component filters and match only @ControllerAdvice classes
        ClassPathScanningCandidateComponentProvider scanner = new ClassPathScanningCandidateComponentProvider(false);
        scanner.addIncludeFilter(new AnnotationTypeFilter(ControllerAdvice.class));
        Set<BeanDefinition> definitions = scanner.findCandidateComponents("com.test.exception.*");
        try {
            for (BeanDefinition bd : definitions) {
                final Class<?> clazz = Class.forName(bd.getBeanClassName());
                loadExceptionMethodMapping(clazz.getMethods());
            }
        } catch (Exception e) {
            logger.error("exception while loading exception handler classes for kafka, definitions: {}",
                    definitions, e);
        }
        logger.info("exception to handler method mapping: {}", exceptionMethodMapping);
    }

    // Registers every @ExceptionHandler method under the name of each exception type it declares
    private void loadExceptionMethodMapping(Method[] methods) {
        for (final Method m : methods) {
            ExceptionHandler annotation = m.getAnnotation(ExceptionHandler.class);
            if (annotation == null) {
                continue;
            }
            for (Class<? extends Throwable> type : annotation.value()) {
                exceptionMethodMapping.put(type.getName(), m);
            }
        }
    }
}

The afterThrowingExceptionAdvice method is triggered whenever an exception escapes a consumer method, and it receives that exception as its Throwable parameter.
We take the exception class name from the Throwable object and use it to get the handler method reference from the map.

Method excMethod = exceptionMethodMapping.get(ex.getClass().getName());
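
Because the map is keyed by the exact class name, an IllegalArgumentException thrown by a consumer would not find the RuntimeException handler. One possible refinement, sketched here on the assumption that the map stays keyed by class name, walks up the superclass chain until a key matches. HandlerLookup and find are hypothetical names, and the map values are plain strings only to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class HandlerLookup {
    // Returns the value registered for the exception's class or, failing that, its nearest superclass.
    static <V> Optional<V> find(Map<String, V> mapping, Throwable ex) {
        for (Class<?> type = ex.getClass(); type != null; type = type.getSuperclass()) {
            V handler = mapping.get(type.getName());
            if (handler != null) {
                return Optional.of(handler);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put(RuntimeException.class.getName(), "handleRunTimeException");

        // IllegalArgumentException extends RuntimeException, so the lookup falls back to that entry.
        System.out.println(find(mapping, new IllegalArgumentException("bad input")));
        // A plain Exception has no registered ancestor in this map, so nothing is found.
        System.out.println(find(mapping, new Exception("checked")));
    }
}
```

In the advice, the same loop could replace the single exceptionMethodMapping.get(...) call, with the empty case falling through to the default logging branch.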

To execute this method, we need an instance (bean) of the class that declares it, because instance methods can only be invoked on an object of that class.

excMethod.invoke(applicationContext.getBean(excMethod.getDeclaringClass()),ex);

We get the bean from applicationContext, which is Spring's IoC container.
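
One detail worth knowing about Method.invoke: if the handler method itself throws, the exception arrives wrapped in an InvocationTargetException, so the catch block in the advice sees the wrapper and not the original failure. A small JDK-only sketch of invoking a handler reflectively and unwrapping the cause follows; SampleHandler, ReflectiveInvoker, and invokeHandler are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical handler class standing in for CustomExceptionHandler.
class SampleHandler {
    public String handle(RuntimeException ex) {
        return "handled: " + ex.getMessage();
    }

    public String failing(RuntimeException ex) {
        throw new IllegalStateException("handler itself failed");
    }
}

class ReflectiveInvoker {
    // Invokes the handler on the given instance; if the handler throws, rethrows its exception unwrapped.
    static Object invokeHandler(Object bean, Method handler, Throwable ex) throws Throwable {
        try {
            return handler.invoke(bean, ex);
        } catch (InvocationTargetException wrapped) {
            throw wrapped.getCause();
        }
    }
}
```

Unwrapping is optional in our advice, since it only logs the failure, but it keeps the logged stack trace focused on the handler's own exception.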

Flow diagram

By implementing this, exceptions thrown in Kafka consumers are routed through the same centralized handlers as REST API exceptions in our Spring Boot application. Consumer failures are handled and logged consistently, which improves the reliability of our microservices system.
