Hello, in Spring Boot I was able to set up my consumer config using the code below. The values from application.yaml are loaded automatically, and the SSL config is then added manually.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka listener configuration that builds its consumer settings on top of the
 * properties exposed by the injected {@link ConsumerFactory}.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Injected consumer factory; its configuration properties seed the map built in {@link #consumerConfig()}. */
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    /**
     * Builds the consumer configuration: a copy of the injected factory's properties
     * with the idle-connection and session timeouts overridden.
     *
     * @return a new mutable map holding the resulting consumer properties
     */
    public Map<String, Object> consumerConfig() {
        // The injected factory is cast to DefaultKafkaConsumerFactory; any other implementation fails here.
        DefaultKafkaConsumerFactory<String, String> defaultFactory =
                (DefaultKafkaConsumerFactory<String, String>) consumerFactory;
        Map<String, Object> merged = new HashMap<>(defaultFactory.getConfigurationProperties());

        merged.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
        merged.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 12000);

        // Each SSL password entry is re-mapped to its current value cast to String;
        // an entry without a value ends up unmapped.
        String[] passwordKeys = {
            SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
            SslConfigs.SSL_KEY_PASSWORD_CONFIG
        };
        for (String passwordKey : passwordKeys) {
            merged.compute(passwordKey, (key, current) -> (String) current);
        }
        return merged;
    }

    /**
     * Creates the listener container factory, backed by a new
     * {@link DefaultKafkaConsumerFactory} built from {@link #consumerConfig()}.
     *
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConsumerFactory<String, String> customizedFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfig());
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(customizedFactory);
        return containerFactory;
    }
}
However, when I try to do the same in Cuba, I get this error:
Error creating bean with name 'kafkaConsumerConfig': Unsatisfied dependency expressed through field 'consumerFactory'
Is there a way to achieve this in Cuba? It would make our code more concise.
Thanks!