The thread hangs at any service I try to call between the two println statements e.g. dataManager.commit(entity). There is no console output at all, it just hangs the thread.
I managed to get the standard synchronised events working from here.
However, what I want is a way to split a list of [Entities] across multiple threads that run simultaneously. My program imports hundreds of thousands of meter readings, and I want to process them in batches in parallel to speed things up.
I was hoping to fetch all the [Meter Readings] from the database, split the list into batches, and run, say, 10 threads simultaneously to process them. I guess what I am asking is: how do I give a generated thread permission to modify entities and then persist them to the database? Are there any examples of this?
Then, you will be able to define asynchronous event handlers as follows:
@Component("demo_AsyncOrderProcessor")
public class AsyncOrderProcessor {

    @Inject
    private Logger log;
    @Inject
    private Persistence persistence;

    @Async
    @EventListener
    @Transactional // automatically manage transaction
    @Authenticated // work on behalf of admin user
    public void process(BatchProcessingEvent event) {
        log.info("Processing {}", event.getSource().getInstanceName());
        EntityManager em = persistence.getEntityManager();
        Order order = em.merge(event.getSource());
        order.setProcessed(true);
    }
}
As for parallel processing, you could use ForkJoinPool with parallel stream instead of asynchronous events:
@Service(AsyncProcessingService.NAME)
public class AsyncProcessingServiceBean implements AsyncProcessingService {

    @Inject
    private Logger log;
    @Inject
    private AsyncOrderProcessor asyncOrderProcessor;

    @Override
    public void process(List<Order> orders) {
        log.info("Start processing");
        SecurityContext securityContext = AppContext.getSecurityContext(); // store current security context
        ForkJoinPool threadPool = new ForkJoinPool(8);
        try {
            // submit(...).get() blocks until the whole parallel stream has finished
            threadPool.submit(() -> {
                orders.parallelStream()
                        .forEach(order -> {
                            AppContext.setSecurityContext(securityContext); // set security context for thread
                            asyncOrderProcessor.process(new BatchProcessingEvent(order));
                        });
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Exception during processing", e);
        } finally {
            threadPool.shutdown(); // release the worker threads of this pool
        }
        log.info("All records are processed");
    }
}
In this case worker bean will be:
@Component("demo_AsyncOrderProcessor")
public class AsyncOrderProcessor {

    @Inject
    private Logger log;
    @Inject
    private Persistence persistence;

    @Transactional
    public void process(BatchProcessingEvent event) {
        log.info("Processing {}", event.getSource().getInstanceName());
        EntityManager em = persistence.getEntityManager();
        Order order = em.merge(event.getSource());
        order.setProcessed(true);
    }
}
This approach lets you wait until all records are processed. Do not forget that you should not share a single transaction across multiple threads; otherwise you have to synchronize access to the entity manager properly.
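If you would rather process the readings in explicit batches, as described in the question, the same idea can be sketched with a plain JDK thread pool. This is only an illustration that uses nothing from the framework: BatchRunner, partition and processInBatches are hypothetical names, and the batch handler is a placeholder. In a real application the handler would presumably set the security context on the worker thread and call a @Transactional bean method, as in the code above, so that each batch runs in its own transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper, not part of the framework.
class BatchRunner {

    // Splits the list into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // Runs batchHandler once per batch on a fixed-size pool and blocks until all batches finish.
    static <T> void processInBatches(List<T> items, int batchSize, int threads,
                                     Consumer<List<T>> batchHandler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<T> batch : partition(items, batchSize)) {
                // Assumption: in the real application the handler opens its own transaction per batch.
                futures.add(pool.submit(() -> batchHandler.accept(batch)));
            }
            for (Future<?> future : futures) {
                future.get(); // rethrows a failure from the batch as ExecutionException
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during processing", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Exception during processing", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> readings = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            readings.add(i);
        }
        AtomicInteger processed = new AtomicInteger();
        // 25 items in batches of 10 on 4 threads gives batches of 10, 10 and 5.
        processInBatches(readings, 10, 4, batch -> processed.addAndGet(batch.size()));
        System.out.println("Processed " + processed.get() + " readings");
    }
}
```

Keeping one transaction per batch means a failure rolls back only that batch, and no entity manager is ever shared between threads.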
Also, let me mention that the framework is open source, so we would be happy to see your contributions: you can always create a pull request and help the CUBA team make the product better. Another useful thing would be posting an article in the Community How to’s section.