Asyncronous Events hanging

Trying to figure out how to create Asyncronous events.

I have the following as a component:

 @Async
@EventListener
public void handleNewIcpEvent(IcpCreatedEvent icpCreatedEvent) {
    System.out.println("Event Created");
    ScheduledExecutorService executor = Executors
            .newSingleThreadScheduledExecutor();
    executor.submit(new SecurityContextAwareRunnable(() -> {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Before commit");
        dataManager.commit(icpCreatedEvent.getIcp());
        System.out.println("after commit");
    }));

}

Which is being called through the entity listener:

 @Override
public void onAfterInsert(Icp entity, Connection connection) {
    //registryClientService.updateIcpAddress(entity);
        publisher.publishEvent(new IcpCreatedEvent(entity, "String"));
}

The thread hangs at any service I try to call between the two println statements e.g. dataManager.commit(entity). There is no console output at all, it just hangs the thread.

any ideas of what I am doing wrong?

Hi!

Do you want to publish events from core to REST? Do you have any problems with synchronous event? Does it work?

If you try to publish events between tiers then Spring events will not help you. I’d recommend that you take a look at the global-events addon.

Hi Yuriy,

I managed to get the standard synchronised events working from here.

However, what I want is a way to split a list of [Entites] into multiple threads to run simultaneously. My program imports hundreds of thousands of meter readings and I want to process them in batches simultaneously to speed things up.

I was hoping to do something whereby I get all the [Meter Readings] from the database then split the list into batches and run say 10x threads simultaneously to process them. I guess what I am asking is how do I give a generated thread permissions to modify entities and then persist them to the database? Is there any examples of this?

In order to enable @Async edit your spring.xml file in core module:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/beans/spring-task-4.3.xsd">

    <!-- Annotation-based beans -->
    <context:component-scan base-package="com.company.demo"/>

    <!-- Enable @Async -->

    <task:annotation-driven />
    <task:executor id="demo_AsyncExecutor"
                   pool-size="7-42"
                   queue-capacity="11"/>
</beans>

Then, you will be able to define asynchronous event handlers as follows:

@Component("demo_AsyncOrderProcessor")
public class AsyncOrderProcessor {
    @Inject
    private Logger log;
    @Inject
    private Persistence persistence;

    @Async
    @EventListener
    @Transactional // automatically manage transaction
    @Authenticated // work on behalf of admin user
    public void process(BatchProcessingEvent event) {
        log.info("Processing {}", event.getSource().getInstanceName());

        EntityManager em = persistence.getEntityManager();

        Order order = em.merge(event.getSource());

        order.setProcessed(true);
    }
}

See the complete demo project: GitHub - cuba-labs/async-spring-events: Spring Async events with CUBA Platform

1 Like

As for parallel processing, you could use ForkJoinPool with parallel stream instead of asynchronous events:

@Service(AsyncProcessingService.NAME)
public class AsyncProcessingServiceBean implements AsyncProcessingService {

    @Inject
    private Logger log;
    @Inject
    private AsyncOrderProcessor asyncOrderProcessor;

    @Override
    public void process(List<Order> orders) {
        log.info("Start processing");

        SecurityContext securityContext = AppContext.getSecurityContext(); // store current security context

        ForkJoinPool threadPool = new ForkJoinPool(8);
        try {
            threadPool.submit(() -> {
                orders.parallelStream()
                        .forEach(order -> {
                            AppContext.setSecurityContext(securityContext); // set security context for thread

                            asyncOrderProcessor.process(new BatchProcessingEvent(order));
                        });
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Exception during processing", e);
        }

        log.info("Waits until all records are processed");
    }
}

In this case worker bean will be:

@Component("demo_AsyncOrderProcessor")
public class AsyncOrderProcessor {
    @Inject
    private Logger log;
    @Inject
    private Persistence persistence;

    @Transactional
    public void process(BatchProcessingEvent event) {
        log.info("Processing {}", event.getSource().getInstanceName());

        EntityManager em = persistence.getEntityManager();

        Order order = em.merge(event.getSource());

        order.setProcessed(true);
    }
}

It enables you to wait until all records are processed. Do not forger that you should not use single transaction in multiple threads or you have to synchronize access to entity manager properly.

2 Likes

Awesome!

Thanks for that. Should probably add this all to the docs.

Cheers

2 years have past since this topic and I run into the same problem.

Please add this to application events documentation. There is no mentioning there whatsoever regarding @Async or how to enable it.

Hi @tom.monnier,

This is a standard functionality of Spring Framework, not directly related to CUBA.

Also, let me mention that the framework is open source, therefore we would be happy to see your contributions - you always can create a pull request and help CUBA team to make the product better. Another useful thing would be posting an article in the Community How to’s section.

Regards,
Aleksey