Using spring-integration in Cuba Platform

Is it possible to use spring-integration in a CUBA Platform project?

For example, I slightly rewrote the original sample to decouple it from stdin.

@SpringBootApplication
public class Application {

	private static final Log LOGGER = LogFactory.getLog(Application.class);

	/**
	 * Application entry point: hands control to Spring Boot, using this class
	 * as the primary configuration source.
	 *
	 * @param args command-line arguments, passed through to {@code SpringApplication.run}
	 */
	public static void main(final String... args) {
		SpringApplication.run(Application.class, args);
	}

	/**
	 * Builds the Paho client factory shared by the inbound and outbound adapters.
	 * The connection options point at the local broker ({@code tcp://localhost:1883})
	 * and use the {@code guest}/{@code guest} credentials.
	 *
	 * @return a client factory carrying the connection options above
	 */
	@Bean
	public MqttPahoClientFactory mqttClientFactory() {
		// Prepare the connection settings first, then attach them to the factory.
		MqttConnectOptions connectOptions = new MqttConnectOptions();
		connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
		connectOptions.setUserName("guest");
		connectOptions.setPassword("guest".toCharArray());

		DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
		clientFactory.setConnectionOptions(connectOptions);
		return clientFactory;
	}

	// publisher
	/**
	 * Outbound (publishing) MQTT handler. It registers with the client id
	 * {@code siSamplePublisher}, is switched to async mode, and uses
	 * {@code siSampleTopic} as its default topic.
	 *
	 * @return the configured outbound message handler
	 */
	@Bean
	public MessageHandler mqttOutbound() {
		MqttPahoMessageHandler publisher =
				new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
		publisher.setDefaultTopic("siSampleTopic");
		publisher.setAsync(true);
		return publisher;
	}

	/**
	 * Message source that produces its payload by invoking {@code nextInt}
	 * on a dedicated {@link Random} instance.
	 *
	 * @return a method-invoking message source backed by a {@code Random}
	 */
	@Bean
	public MessageSource<?> randomIntSource() {
		Random generator = new Random();
		MethodInvokingMessageSource randomSource = new MethodInvokingMessageSource();
		randomSource.setObject(generator);
		randomSource.setMethodName("nextInt");
		return randomSource;
	}

	/**
	 * Publishing flow: polls {@link #randomIntSource()} with a fixed delay of 1000,
	 * appends {@code " sent to MQTT"} to the payload's string form, and passes the
	 * result to {@link #mqttOutbound()}.
	 *
	 * @return the assembled integration flow
	 */
	@Bean
	public IntegrationFlow someOtherFlow() {
		MessageSource<?> source = randomIntSource();
		return IntegrationFlows
				.from(source, endpoint -> endpoint.poller(Pollers.fixedDelay(1000)))
				.transform(payload -> payload.toString() + " sent to MQTT")
				.handle(mqttOutbound())
				.get();
	}

	// consumer

	/**
	 * Consuming flow: every message produced by {@link #mqttInbound()} is handed
	 * to the logging handler built by {@link #logger()}.
	 *
	 * @return the assembled integration flow
	 */
	@Bean
	public IntegrationFlow mqttInFlow() {
		MessageProducerSupport inbound = mqttInbound();
		LoggingHandler sink = logger();
		return IntegrationFlows.from(inbound).handle(sink).get();
	}

	/**
	 * Creates the handler used to log incoming messages: level {@code INFO},
	 * logger name {@code siSample}.
	 *
	 * @return a new logging handler (not registered as a bean)
	 */
	private LoggingHandler logger() {
		LoggingHandler infoLogger = new LoggingHandler("INFO");
		infoLogger.setLoggerName("siSample");
		return infoLogger;
	}

	/**
	 * Inbound (subscribing) MQTT adapter. It registers with the client id
	 * {@code siSampleConsumer} and subscribes to the {@code "#"} topic filter,
	 * with QoS 1, a completion timeout of 5000 and the default Paho converter.
	 *
	 * @return the configured message-driven channel adapter
	 */
	@Bean
	public MessageProducerSupport mqttInbound() {
		MqttPahoMessageDrivenChannelAdapter subscriber =
				new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", mqttClientFactory(), "#");
		subscriber.setQos(1);
		subscriber.setConverter(new DefaultPahoMessageConverter());
		subscriber.setCompletionTimeout(5000);
		return subscriber;
	}

  1. Every second, the publisher emits a message with a random int to the MQTT broker.
  2. The consumer listens to the broker and writes incoming messages to stdout.
  3. The broker simply runs on localhost as a Docker container:
docker run --rm -it -p 1883:1883 eclipse-mosquitto

Full sources on github: GitHub - aleax/spring-integration-sample-mqtt

But it is built on top of Spring Boot.

Here is what I would like to do in a CUBA-based project.

A service runs at the middleware level. It connects to the broker, listens for MQTT messages, and handles them; for example, it saves them to entities using the standard CUBA approach.

Is this approach possible?

Hi,

You can use spring-integration in a CUBA project.
However CUBA at the moment is not integrated into Spring Boot ecosystem, it is built on top of core Spring framework.

So you will have to use XML configuration to declare your spring-integration objects.

  1. determine the spring-integration version compatible with the Spring framework version used by the CUBA version you’re using (e.g. CUBA 7.1 uses Spring 5.1.6).

  2. declare a dependency on the spring-integration artifact in build.gradle

  3. declare spring-integration objects in the spring.xml file of the core module

  4. put the logic related to messaging in the core module

  5. use Authentication annotation and/or methods to authenticate the code in the endpoints that receive messages
    System Authentication - CUBA Platform. Developer’s Manual

The project below is not exactly your task, but it shows how to use other spring-* side modules in a CUBA project:

2 Likes

Thank you for your advice.
I had already written a small working example before I read your post.
I used the @EnableIntegration and @Configuration annotations to port the example from “Spring Boot” to “CUBA Platform”.

package me.aleax.cuba.samples.mqtt.core;

import com.haulmont.cuba.core.global.DataManager;
import com.haulmont.cuba.core.global.Metadata;
import com.haulmont.cuba.security.app.Authentication;
import me.aleax.cuba.samples.mqtt.entity.MqttMsg;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
import org.springframework.integration.handler.MethodInvokingMessageHandler;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.util.Random;

/**
 * Spring Integration configuration for the CUBA core (middleware) module.
 *
 * <p>Declares two flows against the MQTT broker at {@code tcp://localhost:1883}:
 * <ul>
 *   <li>{@link #mqttInFlow()} subscribes to the {@code "#"} topic filter and passes
 *       each message to {@link #SaveMsg(String)}, which stores it as an
 *       {@code MqttMsg} entity;</li>
 *   <li>{@link #someOtherFlow()} polls a random-int source with a fixed delay of
 *       1000 and publishes the value to {@code siSampleTopic}.</li>
 * </ul>
 */
@Component(MqttSub.NAME)
@EnableIntegration
@Configuration
public class MqttSub {
    public static final String NAME = "mqtt_MqttSub";

    @Inject
    private Logger log;

    @Inject
    DataManager dataManager;

    @Inject
    Authentication authentication;

    @Inject
    private Metadata metadata;

    /**
     * Builds the Paho client factory shared by the inbound and outbound adapters.
     *
     * @return a factory configured for {@code tcp://localhost:1883} with the
     *         {@code guest}/{@code guest} credentials
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        options.setUserName("guest");
        options.setPassword("guest".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Inbound MQTT adapter: client id {@code siSampleConsumer}, topic filter
     * {@code "#"}, QoS 1, completion timeout 5000, default Paho converter.
     *
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
                mqttClientFactory(), "#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }

    /**
     * Consuming flow: routes every message from {@link #mqttInbound()} to
     * {@link #messageHandler()}.
     *
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .handle(messageHandler())
                .get();
    }

    /**
     * Creates a handler that invokes {@link #SaveMsg(String)} on this object.
     * The target method is looked up by name, so the string literal must stay
     * in sync with the method name.
     *
     * @return a new method-invoking message handler
     */
    public MethodInvokingMessageHandler messageHandler() {
        return new MethodInvokingMessageHandler(this, "SaveMsg");
    }

    /**
     * Logs the received payload and persists it as a new {@code MqttMsg} entity.
     *
     * <p>The persistence work runs between {@code authentication.begin()} and
     * {@code authentication.end()}. The {@code end()} call sits in a
     * {@code finally} block so the authentication session is always released,
     * even if creating or committing the entity throws.
     *
     * @param message the MQTT payload
     */
    public void SaveMsg(String message) {
        log.info("val = {}", message);

        authentication.begin();
        try {
            MqttMsg entity = metadata.create(MqttMsg.class);
            entity.setMsg(message);
            dataManager.commit(entity);
        } finally {
            // Must run on every path; otherwise a failed commit would leave the
            // system session bound to the listener thread.
            authentication.end();
        }
    }

    /**
     * Message source that produces its payload by invoking {@code nextInt}
     * on a dedicated {@link Random} instance.
     *
     * @return a method-invoking message source backed by a {@code Random}
     */
    @Bean
    public MessageSource<?> randomIntSource() {
        MethodInvokingMessageSource source = new MethodInvokingMessageSource();
        source.setObject(new Random());
        source.setMethodName("nextInt");
        return source;
    }

    /**
     * Publishing flow: polls {@link #randomIntSource()} with a fixed delay of 1000,
     * appends {@code " sent to MQTT"} to the payload's string form, and passes the
     * result to {@link #mqttOutbound()}.
     *
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow someOtherFlow() {
        return IntegrationFlows.from(randomIntSource(),
                e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(p -> p.toString() + " sent to MQTT")
                .handle(mqttOutbound())
                .get();
    }

    /**
     * Outbound MQTT handler: client id {@code siSamplePublisher}, async mode,
     * default topic {@code siSampleTopic}.
     *
     * @return the configured outbound message handler
     */
    @Bean
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("siSampleTopic");
        return messageHandler;
    }

}

Full example on github: GitHub - aleax/cuba-spring-integration-mqtt-sample

1 Like