
R2DBC (Reactive Relational Database Connectivity)

These days, reactive programming, in which data is sent and received asynchronously, is becoming increasingly popular. To follow this paradigm, database requests must also be non-blocking: the caller should not have to wait for them to be resolved.

R2DBC addresses this need: it is based on Reactive Streams, using Publisher and Subscriber to access data asynchronously. Using it through Spring's already familiar reactive programming support makes integration easier than with other, more «rustic» alternatives.
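The Publisher/Subscriber model mentioned above can be illustrated without a database. The JDK ships interfaces equivalent to Reactive Streams in `java.util.concurrent.Flow`, so this sketch uses them (not the R2DBC or Reactor types) to show a subscriber that requests one item at a time and receives simulated rows asynchronously. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ReactiveStreamsSketch {

    // Subscriber that pulls one item at a time (backpressure) and records what it receives
    static class RowSubscriber implements Flow.Subscriber<String> {
        final List<String> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // demand only the first item
        }

        @Override
        public void onNext(String row) {
            received.add(row);
            subscription.request(1); // demand the next item only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    // Publishes simulated rows and returns what the subscriber received, in order
    static List<String> publishRows(List<String> rows) {
        RowSubscriber subscriber = new RowSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            rows.forEach(publisher::submit); // items are buffered and delivered on another thread
        } // close() signals onComplete after the items already submitted
        try {
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishRows(List.of("row-1", "row-2", "row-3")));
    }
}
```

R2DBC drivers follow the same contract with the `org.reactivestreams` interfaces: rows only flow once something subscribes and requests them.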

General configuration

The configuration, as usual with Spring and its starter artifacts, is quite simple.

Start by adding the appropriate starter dependency:

R2DBC dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>

In addition, you need the dependency for the R2DBC driver of the database you want to use (the R2DBC project lists the drivers currently available).

Once the driver dependency has been added, you only need to provide the connection details for your database, either in application.yml or through Java configuration, following the options documented in each driver's repository.

YAML Configuration

spring:
  r2dbc:
    url: r2dbc:<driver>://<host>:<port>/<database_name>
    username: <username>
    password: <password>

Changes

Since this is a different paradigm, some things naturally have to change, not only in the dependencies but also in the code itself.

Entity definition

The usual javax.persistence (JPA) annotations for columns, tables, table identifiers, etc. are no longer the ones to use. Instead, you use Spring Data's own annotations, which are declared slightly differently, as we will see below.

Entity example

import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
 
import lombok.Data;
 
@Data
@Table("SCHEMA.EXAMPLE_ENTITY")
public class ExampleEntity {
 
    @Id
    @Column("ID")
    private Long id;
     
    @Column("COLUMN_NAME")
    private String columnName;
}

The example above maps the class to the EXAMPLE_ENTITY table in the SCHEMA schema, with two attributes:

  1. ID: a unique identifier.
  2. COLUMN_NAME.

Of course, if the attribute names fully match the column names already defined in the database, you do not need these annotations. The same applies to the table and its schema, when the schema is already set at connection level in the configuration.
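When no `@Table` or `@Column` is present, Spring Data's default naming strategy derives names from the class and field names, converting camelCase to snake_case (`columnName` becomes `column_name`). The sketch below is a simplified approximation of that conversion, not Spring's implementation, useful for checking whether an annotation can be dropped; edge cases such as consecutive capitals may be handled differently by Spring. As long as identifiers are not quoted, most databases compare them case-insensitively, so `column_name` would typically match a `COLUMN_NAME` column.

```java
public class NamingSketch {

    // Simplified camelCase -> snake_case conversion, approximating Spring Data's default naming
    static String toSnakeCase(String javaName) {
        StringBuilder result = new StringBuilder();
        for (int i = 0; i < javaName.length(); i++) {
            char c = javaName.charAt(i);
            if (Character.isUpperCase(c) && i > 0) {
                result.append('_'); // word boundary before each inner capital letter
            }
            result.append(Character.toLowerCase(c));
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(toSnakeCase("ExampleEntity")); // example_entity
        System.out.println(toSnakeCase("columnName"));    // column_name
        System.out.println(toSnakeCase("id"));            // id
    }
}
```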

Repository definition

Repositories are defined the same way we are used to with Spring Data JPA; only the interface they extend changes, here ReactiveCrudRepository.

Repositories with Spring Data

@Repository
public interface ExampleEntityRepository extends ReactiveCrudRepository<ExampleEntity, Long>{
}

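Out of the box, this interface provides the usual CRUD methods (save, saveAll, findById, findAll, existsById, count, deleteById, deleteAll, etc.), which return Mono or Flux instead of plain values or collections.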

Use cases

Using ReactiveCrudRepository

Let’s assume the repository above is defined and we now want to fetch a table record by its identifier.

Query by ID

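import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Slf4j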
@Service
public class ExampleEntityServiceImpl implements ExampleEntityService {
 
    @Autowired
    private ExampleEntityRepository exampleEntityRepository;
     
    @Autowired
    private ExampleEntityMapper exampleEntityMapper;
 
    @Override
    public Mono<ExampleEntityDTO> getById(Long id) {
        return exampleEntityRepository.findById(id)
                .map(e -> exampleEntityMapper.mapToDto(e));
    }
}

The example above uses both the repository defined earlier and a mapper that transforms the entity into a DTO, following good practice.
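The article does not show `ExampleEntityMapper` or `ExampleEntityDTO`. The following is a hypothetical plain-Java sketch of what such a mapper could look like; the DTO fields (`id`, `columnName`) are assumptions, and in practice the mapper is often generated by a library such as MapStruct.

```java
public class MapperSketch {

    // Plain stand-in for the ExampleEntity shown earlier (Lombok's @Data would generate these accessors)
    static class ExampleEntity {
        private Long id;
        private String columnName;

        Long getId() { return id; }
        void setId(Long id) { this.id = id; }
        String getColumnName() { return columnName; }
        void setColumnName(String columnName) { this.columnName = columnName; }
    }

    // Hypothetical DTO exposed by the service layer; its fields are an assumption
    static class ExampleEntityDTO {
        private final Long id;
        private final String columnName;

        ExampleEntityDTO(Long id, String columnName) {
            this.id = id;
            this.columnName = columnName;
        }

        Long getId() { return id; }
        String getColumnName() { return columnName; }
    }

    // Hypothetical mapper: copies entity fields into a DTO so persistence types stay inside the service
    static class ExampleEntityMapper {
        ExampleEntityDTO mapToDto(ExampleEntity entity) {
            return new ExampleEntityDTO(entity.getId(), entity.getColumnName());
        }
    }

    public static void main(String[] args) {
        ExampleEntity entity = new ExampleEntity();
        entity.setId(1L);
        entity.setColumnName("value");
        ExampleEntityDTO dto = new ExampleEntityMapper().mapToDto(entity);
        System.out.println(dto.getId() + " " + dto.getColumnName()); // 1 value
    }
}
```

Because mapToDto is a plain synchronous function, it can be passed directly to `Mono.map`, as the service above does.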

Batch execution

In some cases you have to send a huge number of requests to the database and, for efficiency, you decide to group them into batches of N requests each. With synchronous connections (JDBC), one way to do this is through Statement batching, and the same is possible with R2DBC.

To do this, configure the database connection and the client that uses it as follows:

R2DBC configuration in Java

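import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.r2dbc.core.DatabaseClient;

@Configuration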
public class DBConfig extends AbstractR2dbcConfiguration {
     
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get(ConnectionFactoryOptions.parse("r2dbc:<driver>://<host>:<port>/<database_name>")
                .mutate()
                .option(ConnectionFactoryOptions.USER, "<user>")
                .option(ConnectionFactoryOptions.PASSWORD, "<password>")
                .build());
    }
     
    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        return DatabaseClient.builder()
                .connectionFactory(connectionFactory)
                .namedParameters(true)
                .build();
    }
}

The next step is to define a @Repository that uses the client defined in this configuration and implements the desired operations. Here we show an insertion of multiple records into a table.

R2DBC Statement Repository

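import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.util.List;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

@Repository
public class ExampleEntityStatementRepository {
    // Bind markers are driver-specific: "?" works with some drivers (e.g. MySQL), others expect $1, $2 (e.g. PostgreSQL)
    public static final String SQL_STATEMENT = "INSERT INTO SCHEMA.EXAMPLE_ENTITY(ID, COLUMN_NAME) VALUES(?,?)";

    private final DatabaseClient databaseClient;

    public ExampleEntityStatementRepository(DatabaseClient databaseClient) {
        this.databaseClient = databaseClient;
    }

    public Flux<?> insertMultiple(List<ExampleEntityDTO> insertList) {
        if (insertList.isEmpty()) {
            return Flux.empty(); // nothing to bind, so do not execute the statement at all
        }
        return databaseClient.inConnectionMany(connection -> {
            Statement statement = connection.createStatement(SQL_STATEMENT);
            for (int i = 0; i < insertList.size(); i++) {
                if (i > 0) {
                    statement.add(); // save the previous binding set and start a new one
                }
                bindValues(statement, insertList.get(i));
            }
            // Consume each Result so the rows-updated counts are emitted and the connection can be released
            return Flux.from(statement.execute())
                    .flatMap(Result::getRowsUpdated);
        });
    }

    private void bindValues(Statement statement, ExampleEntityDTO exampleEntity) {
        statement
            .bind(0, exampleEntity.getId())
            .bind(1, exampleEntity.getColumnName());
    }
}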

The code above prepares a single Statement with one set of bindings per record. Nothing is sent to the database until the returned Flux is subscribed to, as in the following example:

Statement Subscription

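import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ExampleEntityServiceImpl implements ExampleEntityService {
 
    @Autowired
    private ExampleEntityStatementRepository exampleEntityStatementRepository;
 
    @Override
    public void insertList(List<ExampleEntityDTO> insertList) {
        // subscribe() returns immediately; the insert runs asynchronously
        exampleEntityStatementRepository.insertMultiple(insertList)
                .subscribe(
                        rowsUpdated -> { },
                        error -> log.error("Batch insert failed", error)); // otherwise Reactor only reports it via its default onErrorDropped hook
    }
}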

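When called, this service returns immediately and the Statement is executed asynchronously, instead of making the microservice wait until all N elements are inserted. The caller is not notified when the insert finishes or fails; if it needs to know, return the Flux (or a Mono<Void> obtained with then()) instead of subscribing inside the service.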
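The repository binds the whole list into a single Statement. If you prefer batches of N records, as described at the start of this section, one option is to split the list first and call insertMultiple once per chunk. The JDK-only helper below is a hypothetical sketch of that split; with Reactor you could instead use `Flux.fromIterable(list).buffer(n)` to obtain the chunks as a `Flux<List<T>>`.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {

    // Splits items into consecutive chunks of at most batchSize elements, preserving order
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end))); // copy so each batch is independent of the source list
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        // Each inner list would feed one call to insertMultiple (one Statement)
        System.out.println(partition(ids, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```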

Connection Pool

At this point, it is natural to ask how to configure a connection pool, so that R2DBC can reuse several connections concurrently and take advantage of the machine and the database, provided the database is configured to handle concurrent connections well.

To do this, you simply indicate in the connection URL itself that you want a pool, as follows:

R2DBC connection pool configuration

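// Requires the io.r2dbc:r2dbc-pool dependency; INITIAL_SIZE and MAX_SIZE are
// static imports from io.r2dbc.pool.PoolingConnectionFactoryProvider
@Override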
@Bean
public ConnectionFactory connectionFactory() {
    return ConnectionFactories.get(ConnectionFactoryOptions.parse("r2dbc:pool:<driver>://<host>:<port>/<database_name>")
        .mutate()
        .option(ConnectionFactoryOptions.USER, "<user>")
        .option(ConnectionFactoryOptions.PASSWORD, "<password>")
        .option(INITIAL_SIZE, 10)
        .option(MAX_SIZE, 20)
        .build());
}

As you can see, adding pool to the URL is enough (alternatively, it can be set through options, like the username and password: the DRIVER option set to pool and the PROTOCOL option set to the actual driver). Further parameters can also be configured, such as the initial and maximum number of connections in the pool, 10 and 20 respectively in the example above.

For more information, see the r2dbc-pool documentation.
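To make the meaning of INITIAL_SIZE and MAX_SIZE concrete, here is a deliberately simplified, hypothetical pool model. It is not r2dbc-pool, which is non-blocking and far more complete: connections (plain strings here) are opened up front, more are opened on demand up to the maximum, and once all of them are in use further requests would have to wait (this sketch simply reports failure).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

public class PoolSketch {

    private final int maxSize;
    private final Deque<String> idle = new ArrayDeque<>(); // connections ready to be reused
    private int created;                                   // connections opened so far (idle + in use)

    PoolSketch(int initialSize, int maxSize) {
        if (initialSize < 0 || initialSize > maxSize) {
            throw new IllegalArgumentException("require 0 <= initialSize <= maxSize");
        }
        this.maxSize = maxSize;
        for (int i = 0; i < initialSize; i++) {
            idle.push(open()); // warm up the pool with INITIAL_SIZE connections
        }
    }

    private String open() {
        created++;
        return "connection-" + created; // stand-in for a real database connection
    }

    // Reuses an idle connection, opens a new one while below maxSize, otherwise returns empty
    synchronized Optional<String> tryAcquire() {
        if (!idle.isEmpty()) {
            return Optional.of(idle.pop());
        }
        if (created < maxSize) {
            return Optional.of(open());
        }
        return Optional.empty(); // a real pool would make the caller wait instead
    }

    // Returns a connection to the pool so the next caller can reuse it
    synchronized void release(String connection) {
        idle.push(connection);
    }

    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        PoolSketch pool = new PoolSketch(10, 20);
        System.out.println(pool.createdCount()); // 10 opened up front
        for (int i = 0; i < 20; i++) {
            pool.tryAcquire(); // the first 10 reuse idle connections, the next 10 open new ones
        }
        System.out.println(pool.createdCount());           // 20
        System.out.println(pool.tryAcquire().isPresent()); // false: the pool is exhausted
    }
}
```

The point of the model is that the pool bounds the number of open connections, which is what protects the database from being overloaded.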

Observations

  • R2DBC facilitates asynchronous integration with certain relational databases.
  • It makes sense to use this specification when you want to keep your service's response times low, but bear in mind that R2DBC only makes the client side non-blocking: the queries themselves are still executed by the database.
    • This is why it is convenient to configure it correctly if you also want to reduce the query and insertion times.
    • A bad configuration (for example, of the number of concurrent connections) can even be counterproductive and increase times at the database level.
  • SELECTs involving nested or related entities are harder: Spring Data R2DBC does not map relationships between entities, so such queries have to be written and mapped by hand, which can make some complex developments difficult.
  • It makes the most sense in a service designed for reactivity end to end, using WebFlux, a message broker, etc.
