Persistence : howto handle large data sets

Helllo Cuba Team,

I would like to know if there is recommendation howto handle large data sets?

Optimizations can take place on different levels, but even if you perform an optimal query (with limiting parameters) and receive a critically large result set, you want parallel processing in segments / batches.

While reading EclipseLink’s documentation I found that EclipseLink recommends using Streamed Cursors when handling large resultsets.

According to the documentation : Cursored streams provide the ability to read back a query result set from the database in manageable subsets, and to scroll through the result set stream.

Your paging mechanism in the generic entity browser has a similar behaviour, except the transaction handling.

Imagine you want to implement an scheduled task which loads a large amount (> 250.000) of entities (like customers) for reason of performing some internal logic (like calculating the order statistics etc).

What would you suggest to master the challenge with large amounts of data?

1 Like

Hi Mike,

This is an interesting question, and I have created a sample application demonstrating two approaches.

The app is here: GitHub - cuba-labs/large-dataset: How to handle large data sets
It works with local PostgreSQL database, so if you want to run it you should have PostgreSQL installed.

It has two entities: Order and Customer, and a service that calculates volume of sales by months.

If you open Orders browser, you can see three buttons:

  • “Generate data”: creates 10,000 customers and 500,000 orders for them in the database

  • “Process method 1” and “Process method 2” calculate volumes using the first and the second approaches (see below).

First approach

Firstly we load a list of identifiers of all objects we are going to process - all orders in this case. It can be done in a single transaction and put into a list in memory because identifiers are small and the total size of loaded data even for large datasets (500K records) can easily fit into memory.

private List<UUID> loadIdList() {
    List<UUID> idList;
    try (Transaction tx = persistence.createTransaction(new TransactionParams().setReadOnly(true))) {
        TypedQuery<UUID> query = persistence.getEntityManager().createQuery(
                "select o.id from sample$Order o", UUID.class);
        idList = query.getResultList();
        tx.commit();
    }
    return idList;
}

Then we process objects in batches, splitting the list of IDs and loading batches of objects in separate transactions.

public Map<String, BigDecimal> process1() {
    log.info("Processing method 1");

    List<UUID> idList = loadIdList();

    HashMap<String, BigDecimal> result = new HashMap<>();

    AtomicInteger counter = new AtomicInteger();
    idList.stream()
            .collect(Collectors.groupingBy(o -> counter.getAndIncrement() / 100))
            .forEach((chunk, chunkIds) -> {
                try (Transaction tx = persistence.createTransaction(new TransactionParams().setReadOnly(true))) {
                    TypedQuery<Order> query = persistence.getEntityManager().createQuery(
                            "select o from sample$Order o where o.id in ?1", Order.class);
                    query.setParameter(1, chunkIds);
                    List<Order> orders = query.getResultList();
                    for (Order order : orders) {
                        processOrder(order.getDate(), order.getAmount(), result);
                    }
                    tx.commit();
                }
                log.info("Processed orders: " + chunk * 100);
            });

    return result;
}

On my laptop with default settings for Tomcat JVM and for PostgreSQL database the processing of 500,000 records is done in 10…12 seconds.

Second approach

Here I use EclipseLink’s CursoredStream in a single transaction.

public Map<String, BigDecimal> process2() {
    log.info("Processing method 2");

    HashMap<String, BigDecimal> result = new HashMap<>();

    try (Transaction tx = persistence.createTransaction(new TransactionParams().setReadOnly(true))) {
        UnitOfWork unitOfWork = persistence.getEntityManager().getDelegate().unwrap(UnitOfWork.class);

        ReadAllQuery query = new ReadAllQuery(Order.class);
        query.useCursoredStream();
        CursoredStream stream = (CursoredStream) unitOfWork.executeQuery(query);

        int i = 0;
        while (!stream.atEnd()) {
            Order order = (Order) stream.read();
            processOrder(order.getDate(), order.getAmount(), result);
            stream.releasePrevious();

            i++;
            if (i % 100 == 0) {
                log.info("Processed orders: " + i);
            }
        }
        stream.close();

        tx.commit();
    }

    return result;
}

Unfortunately, the processing is stopped on about 200,000 records with “GC overhead limit exceeded” exception, which indicates that too much data is loaded and not released. Perhaps I’m doing something wrong, would be great if someone gives a hint for how to use it correctly.

2 Likes

Hello Konstantin,

thank you very much for your effort, you’ve discovered pretty cool approaches.

You can fix your second approach by adding

            stream.releasePrevious();
    public Map<String, BigDecimal> process2() {
        log.info("Processing method 2");

        HashMap<String, BigDecimal> result = new HashMap<>();

        try (Transaction tx = persistence.createTransaction(new TransactionParams().setReadOnly(true))) {
            UnitOfWork unitOfWork = persistence.getEntityManager().getDelegate().unwrap(UnitOfWork.class);

            ReadAllQuery query = new ReadAllQuery(Order.class);
            query.useCursoredStream();
            CursoredStream stream = (CursoredStream) unitOfWork.executeQuery(query);

            int i = 0;
            while (!stream.atEnd()) {
                Order order = (Order) stream.read();
                processOrder(order.getDate(), order.getAmount(), result);
                stream.releasePrevious();

                i++;
                if (i % 100 == 0) {
                    log.info("Processed orders: " + i);
                }
                // https://www.eclipse.org/eclipselink/api/2.1/org/eclipse/persistence/queries/CursoredStream.html#releasePrevious()

                // release all objects read in so far. This should be performed when reading in a large collection of objects in order to preserve memory.
                stream.releasePrevious();
            }
            stream.close();

            tx.commit();
        }

        return result;
    }

I started with CursoredStream approach but then I decided to use the splitting because I need smaller transaction context.

Hi Mike,

Thanks for your comments, but stream.releasePrevious() is already present in my code. Probably it is not enough.

Oh sorry, this was my mistake. I’ve completely overlooked it.

I had the same problem within a similar context and could fix it with releasePrevious.

May I found in ECLIPSE WIKI USING ADVANCED QUERY API (ELUG) the reason for the facing java.lang.OutOfMemoryError: GC overhead limit exceeded.

Note: The releasePrevious message is optional. This releases any previously read objects and frees system memory. Even though released objects are removed from the cursored stream storage, **they may remain in the identity map**.

Yes, probably this approach is not for really large datasets. Anyway, the shorter are transactions - the better for overall database performance.

This is not really acceptable. When performing batch processing with mutation it needs to be in a single transaction, in case of rollback, or you need to add lots of other state in order to manually manage the transaction (restart, etc.).

often it needs to be implemented with a native SQL where the update command modifies the records, or a stored procedure.

it seems the streaming query support is broken, as it should not run out of memory

The problem is that the ORM wants to try and track the changes to be efficient, and ensure correct locking/versioning semantics.

For large result set processing, this sort of needs to be disabled (or the ‘object map/cache’ needs to be a persistable/db like structure

Hi, I managed to fix processing method 2 by setting dontMaintainCache on query:

query.useCursoredStream();
query.dontMaintainCache();

From Using Advanced Query API (ELUG)

When you execute a batch-type operation, use the dontMaintainCache method with a cursored stream. A batch operation performs simple operations on large numbers of objects and then discards the objects. Cursored streams create the required objects only as needed, and the dontMaintainCache ensures that these transient objects are not cached.

2 Likes