High-performance data fetching using Spring Data JPA Stream

Java 8 brought a number of functional goodies to the platform. One of the most useful is the Stream API, which makes it much simpler to process data lazily and, where needed, in parallel. Without it, I can only wish the best of luck to anyone doing the same with ExecutorService.

Like many other frameworks, Spring soon adopted Java Stream. In my opinion, the most notable streaming feature in Spring is Spring Data JPA Stream. It can bring a significant performance boost to data fetching, particularly in databases with millions of records.

In this post, I discuss how to use Spring Boot with Spring Data JPA Stream for high-performance data fetching and pull millions of records from MySQL in a short period of time. But before that, I explain a little about the traditional approaches and their shortcomings.

Traditional large data fetching approaches

#1 Pull all data into memory at once

In most small to medium-sized databases, this is the de facto approach: pull all the necessary data into memory and lazy load the relations. For instance, in a small library system we can pull all the books into memory, use a simple caching mechanism such as memoization, and lazy load the author objects.

When it comes to large databases with millions of records, bringing everything into memory results in an out-of-memory error. In such a case, using a distributed caching system such as Redis or Hazelcast seems reasonable. But what if we just want to read all the data from source A (MySQL, for instance), apply a simple transformation, and put it in source B (Elasticsearch)? In this case caching is not a good solution, as we aim to read the data only once.

#2 Results pagination

A remedy for the out-of-memory error in the previous approach is pagination. We can rely on the database's LIMIT and OFFSET combination to read the rows page by page. This is a very reasonable approach, and Spring Data supports it. All we need is a repository that extends PagingAndSortingRepository (which JpaRepository itself extends). Let’s look at the example below:

import com.madadipouya.jpa.model.Book;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.PagingAndSortingRepository;

public interface BookRepository extends PagingAndSortingRepository<Book, Long> {

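    // PagingAndSortingRepository already declares findAll(Pageable);
    // it is repeated here only to make the return type visible.
    Page<Book> findAll(Pageable pageable);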
}

The return type of our method is Page<Book>, which carries the results as well as the total number of pages. However, keep in mind that knowing the total number of pages comes at a price. For each page it fetches, Spring Data executes an additional count query to calculate the total number of pages. This slows down the process significantly. To speed things up, we can simply declare a query method that returns Slice instead. In this way, we get rid of the extra query.
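To see why a slice can do without the count query, here is a minimal plain-Java sketch of the idea: ask for one row more than the page size and use the extra row only as a "has next" flag. The SliceSketch class, its Slice holder, and the in-memory List standing in for the table are all hypothetical names for illustration; this is a model of the technique, not Spring Data's actual code.

```java
import java.util.ArrayList;
import java.util.List;

final class SliceSketch {

    /** A page of rows plus a flag telling whether more rows follow. */
    static final class Slice<T> {
        final List<T> content;
        final boolean hasNext;

        Slice(List<T> content, boolean hasNext) {
            this.content = content;
            this.hasNext = hasNext;
        }
    }

    /** Hypothetical stand-in for "SELECT ... LIMIT size + 1 OFFSET offset". */
    static <T> Slice<T> fetchSlice(List<T> table, int offset, int size) {
        int from = Math.min(offset, table.size());
        int to = Math.min(from + size + 1, table.size());
        List<T> rows = table.subList(from, to);

        // The extra row is never returned; it only proves that another page exists.
        boolean hasNext = rows.size() > size;
        List<T> content = hasNext ? rows.subList(0, size) : rows;
        return new Slice<>(new ArrayList<>(content), hasNext);
    }
}
```

The caller keeps requesting the next offset while hasNext is true, so the total number of rows never needs to be counted.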

But that is not the end of the story. Pagination has a huge disadvantage in large databases: the deeper we page, the slower each query gets. The database has to read and discard the rows before the offset, so queries with a larger offset take longer to return. So if time is a big constraint, this approach will not work well.
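A bit of arithmetic makes the slowdown concrete. The sketch below uses a rough model in which the database reads and throws away every row before the offset and then reads the rows of the page itself. The class and method names are made up for this illustration, and a real database may do better or worse depending on indexes and the query plan.

```java
final class OffsetPaginationCost {

    /** Rows the database touches to serve every page under the rough model above. */
    static long rowsScannedWithOffset(long totalRows, long pageSize) {
        long scanned = 0;
        for (long offset = 0; offset < totalRows; offset += pageSize) {
            long rowsInPage = Math.min(pageSize, totalRows - offset);
            scanned += offset + rowsInPage; // skipped rows + returned rows
        }
        return scanned;
    }

    public static void main(String[] args) {
        long total = 1_000_000;
        long page = 1_000;
        System.out.println("single pass : " + total);
        System.out.println("offset pages: " + rowsScannedWithOffset(total, page));
    }
}
```

Under this model, reading one million rows in pages of 1,000 touches 500,500,000 rows in total, compared with 1,000,000 for a single pass over the table. The work grows quadratically with the table size, which is why the last pages feel so much slower than the first ones.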

#3 Apache Spark

A reliable big data solution such as Apache Spark is a very suitable candidate for streaming millions of rows easily. It’s resilient and highly scalable. The only disadvantage is that you have to set up a Spark cluster and know exactly what you are doing. I personally highly recommend this approach and believe that the initial time invested in learning it will pay off well in the long run as the ecosystem grows.

Streaming data using Spring Data JPA

However, if you don’t want to invest in Spark, or have other constraints or reservations, there is one last option left: Spring Data JPA Stream, which I discuss in this section.

To make Spring Data JPA Stream work, we need to modify our repository to return Stream<Book>, like this:

import com.madadipouya.jpa.model.Book;
import org.springframework.data.repository.Repository;
import java.util.stream.Stream;

public interface BookRepository extends Repository<Book, Long> {

    Stream<Book> getAll();
}

Note that if we run the code like this, it will not work; it throws a runtime exception. We need to add @Query and @QueryHints on top of the method, like below:

import com.madadipouya.jpa.model.Book;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.jpa.repository.QueryHints;
import org.springframework.data.repository.Repository;
import javax.persistence.QueryHint;
import java.util.stream.Stream;

import static org.hibernate.jpa.QueryHints.HINT_CACHEABLE;
import static org.hibernate.jpa.QueryHints.HINT_FETCH_SIZE;
import static org.hibernate.jpa.QueryHints.HINT_READONLY;

public interface BookRepository extends Repository<Book, Long> {

    @QueryHints(value = {
            // Integer.MIN_VALUE asks the MySQL driver to stream rows one by one
            @QueryHint(name = HINT_FETCH_SIZE, value = "" + Integer.MIN_VALUE),
            @QueryHint(name = HINT_CACHEABLE, value = "false"),
            @QueryHint(name = HINT_READONLY, value = "true")
    })
    @Query("select b from Book b")
    Stream<Book> getAll();
}

If you pay attention, you will see that we disable second-level caching and hint to Hibernate that the entities are read-only. The fetch size of Integer.MIN_VALUE tells the MySQL JDBC driver to stream the result set row by row instead of loading it into memory all at once. In this example, we assume that we just need to read each row once, process it, and save it somewhere else. If your requirements are different, make sure to change those settings accordingly.

Now we can start reading the database records like this:

import com.madadipouya.jpa.repository.BookRepository;
import com.madadipouya.jpa.model.Book;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.stream.Stream;

@Component
public class BookProcessor {
  
  private final BookRepository bookRepository;
  
  public BookProcessor(BookRepository bookRepository) {
    this.bookRepository = bookRepository;
  }
  
  @Transactional(readOnly = true)
  public void processBooks() {
    // try-with-resources closes the stream and releases the underlying cursor
    try (Stream<Book> bookStream = bookRepository.getAll()) {
      bookStream.forEach(book -> {
        // do some processing
      });
    }
  }
}

The @Transactional annotation is necessary to make streaming work; otherwise, Spring throws a runtime exception. In this example, we set readOnly to true since we do not intend to modify the entities.
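The reason a surrounding transaction is needed is that the returned Stream is lazy: rows are pulled from the database only while the stream is being consumed, so the connection has to stay open until we are done, and the stream should be closed afterwards. The plain-Java sketch below mimics that behaviour without any database. The LazyStreamSketch class and its openCursor method are hypothetical stand-ins, with two counters playing the role of the database cursor.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class LazyStreamSketch {

    /** Hypothetical stand-in for a database cursor: rows are produced only on demand. */
    static Stream<Integer> openCursor(int rows, AtomicInteger fetched, AtomicBoolean closed) {
        return Stream.iterate(1, i -> i + 1)
                .limit(rows)
                .peek(i -> fetched.incrementAndGet()) // counts rows actually pulled
                .onClose(() -> closed.set(true));     // runs when the stream is closed
    }

    /** Consumes the stream and closes it, releasing the underlying resource. */
    static int sumAll(int rows, AtomicInteger fetched, AtomicBoolean closed) {
        try (Stream<Integer> stream = openCursor(rows, fetched, closed)) {
            return stream.mapToInt(Integer::intValue).sum();
        }
    }
}
```

Calling openCursor alone fetches nothing; rows are counted only once a terminal operation such as sum runs, and the close handler fires when the try-with-resources block ends.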

But that’s not all. If we run the code for a while, say 30 minutes depending on the heap size, we get an out-of-memory error. That is because Hibernate keeps the entities and references to them in its persistence context. To fix that, we need to detach each entity after processing it so the garbage collector can reclaim the memory.

The final version of the code looks like this:

import com.madadipouya.jpa.repository.BookRepository;
import com.madadipouya.jpa.model.Book;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.persistence.EntityManager;
import java.util.stream.Stream;

@Component
public class BookProcessor {
  
  private final BookRepository bookRepository;
  private final EntityManager entityManager;
  
  public BookProcessor(BookRepository bookRepository, EntityManager entityManager) {
    this.bookRepository = bookRepository;
    this.entityManager = entityManager;
  }
  
  @Transactional(readOnly = true)
  public void processBooks() {
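    // try-with-resources closes the stream and releases the underlying cursor
    try (Stream<Book> bookStream = bookRepository.getAll()) {
      bookStream.forEach(book -> {
        // do some processing
        entityManager.detach(book); // let the garbage collector reclaim the entity
      });
    }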
  }
}
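To see the effect of detaching in isolation, here is a toy model of a persistence context in plain Java. It is only a sketch under a simplifying assumption: the context is modelled as a map that keeps a reference to every loaded entity until that entity is detached. The PersistenceContextSketch class and its methods are hypothetical and not part of JPA or Hibernate.

```java
import java.util.HashMap;
import java.util.Map;

final class PersistenceContextSketch {

    // Toy first-level cache: every loaded entity stays referenced until detached.
    private final Map<Long, Object> managed = new HashMap<>();

    Object load(long id) {
        return managed.computeIfAbsent(id, key -> new byte[1024]); // pretend entity
    }

    void detach(long id) {
        managed.remove(id);
    }

    int managedCount() {
        return managed.size();
    }

    /** Loads rows 1..rows and returns the largest number of entities held at once. */
    static int peakManaged(int rows, boolean detachAfterUse) {
        PersistenceContextSketch context = new PersistenceContextSketch();
        int peak = 0;
        for (long id = 1; id <= rows; id++) {
            context.load(id);
            peak = Math.max(peak, context.managedCount());
            if (detachAfterUse) {
                context.detach(id);
            }
        }
        return peak;
    }
}
```

Without detaching, the number of entities held grows with the number of rows read; with detaching, it stays at one, which is the behaviour the final version of processBooks relies on.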

In this post, we discussed high-performance data fetching using the Spring Data JPA Stream API. If you are interested in how to do batch insertion using Spring Data, have a look at this post.
