R2DBC (Reactive Relational Database Connectivity)
A día de hoy se está popularizando cada vez más la programación reactiva, donde los datos se envían y reciben de forma asíncrona. Es por ello que surge la necesidad de ejecutar peticiones a base de datos que no queden a la espera de ser resueltas para obedecer al paradigma anterior.
R2DBC pone solución a la necesidad comentada, pues se basa en los Reactive Streams haciendo uso de Publisher y Subscriber para permitir el acceso a datos de forma asíncrona. El uso de las funcionalidades ya conocidas de Spring para la programación reactiva facilita su integración en comparación con otras alternativas más «rústicas».
Configuración general
La configuración, como bien nos tiene acostumbrados Spring con sus artefactos starter, es bastante sencilla.
Simplemente empieza indicando que use la dependencia adecuada:
Dependencia R2DBC
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
Además de indicar la dependencia del driver de base de datos que quieres usar, entre los que se encuentran a día de hoy:
- H2 (
io.r2dbc:r2dbc-h2
) - MariaDB (
org.mariadb:r2dbc-mariadb
) - Microsoft SQL Server (
io.r2dbc:r2dbc-mssql
) - MySQL (
dev.miku:r2dbc-mysql
) - jasync-sql MySQL (
com.github.jasync-sql:jasync-r2dbc-mysql
) - Postgres (
io.r2dbc:r2dbc-postgresql
) - Oracle (
com.oracle.database.r2dbc:oracle-r2dbc
)
Una vez añadida la dependencia del driver, siguiendo las opciones que se indican en sus repositorios únicamente tendrás que añadir los datos de la conexión con tu BD vía application.yml o como configuración de Java, según se desee.
Configuración YAML
spring:
r2dbc:
url: r2dbc:<driver>://<host>:<port>/<database_name>
username: <username>
password: <password>
Cambios
Es natural que, al tratarse de un paradigma distinto, haya cosas que deban cambiar y no solo a nivel de dependencias. En el código como tal, también.
Definición de entidades
Las anotaciones conocidas de javax para indicar columnas, tablas, identificadores de tablas, etc., ya no son las que hay que usar. Ahora hay que hacer uso de las de Spring, que se definen de una forma ligeramente diferente como vemos a continuación.
Ejemplo de entidad
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
import lombok.Data;
@Data
@Table("SCHEMA.EXAMPLE_ENTITY")
public class ExampleEntity {
@Id
@Column("ID")
private Long id;
@Column("COLUMN_NAME")
private String columnName;
}
El anterior ejemplo conectaría con la tabla EXAMPLE_ENTITY con el esquema SCHEMA y tendría dos atributos:
- ID: que sería el identificador único.
- COLUMN_NAME.
Por supuesto, si los nombres de los atributos concuerdan totalmente con los definidos en la base de datos de forma previa, no hay que usar estas anotaciones. Ocurre lo mismo con la tabla y el esquema de la misma, en caso de que se defina a nivel de conexión con la base de datos en la configuración.
Definición de repositorios
Como ya nos tiene acostumbrados Spring con JPA, los repositorios se definen de igual forma pero únicamente cambiando la extensión a usar.
Repositorios con Spring Data
@Repository
public interface ExampleEntityRepository extends ReactiveCrudRepository<ExampleEntity, Long>{
}
Por defecto, esta interfaz implementaría todos los métodos conocidos, consultables a través de su API.
Casos de uso
Uso de ReactiveCrudRepository
Supongamos que hemos definido el repositorio comentado previamente y queremos hacer una consulta de un registro de la tabla por identificador.
Consulta por ID
@Slf4j
@Service
public class ExampleEntityServiceImpl implements ExampleEntityService {
@Autowired
private ExampleEntityRepository exampleEntityRepository;
@Autowired
private ExampleEntityMapper exampleEntityMapper;
@Override
public Mono<ExampleEntityDTO> getById(Long id) {
return exampleEntityRepository.findById(id)
.map(e -> exampleEntityMapper.mapToDto(e));
}
}
En el ejemplo anterior se está haciendo uso del repository definido previamente, y de un mapper que se encargaría de transformar la entidad a un DTO, como dictan las buenas prácticas.
Ejecución por lotes
Existen casos donde hay que hacer un elevado número de peticiones simultáneas a la base de datos y por eficiencia se decide hacerlo por lotes de N peticiones. Con las conexiones síncronas una de las formas de hacerlo es a través de Statements, y en el caso de R2DBC también es posible.
Para ello es necesario definir mediante configuración la conexión de la base de datos y el cliente que la consume de la siguiente forma:
Configuración R2DBC en Java
@Configuration
public class DBConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get(ConnectionFactoryOptions.parse("r2dbc:<driver>://<host>:<port>/<database_name>")
.mutate()
.option(ConnectionFactoryOptions.USER, "<user>")
.option(ConnectionFactoryOptions.PASSWORD, "<password>")
.build());
}
@Bean
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.builder()
.connectionFactory(connectionFactory)
.namedParameters(true)
.build();
}
}
El siguiente paso sería definir un @Repository que hiciera uso de esta conexión definida en configuración, donde se implementarían las operaciones deseadas. En este caso, vamos a mostrarte un ejemplo de inserción de múltiples registros en una tabla.
R2DBC Statement Repository
@Repository
public class ExampleEntityStatementRepository {
public static final String SQL_STATEMENT = "INSERT INTO SCHEMA.EXAMPLE_ENTITY(ID, COLUMN_VALUE) VALUES(?,?)";
public Flux<?> insertMultiple(List<ExampleEntityDTO> insertList) {
return databaseClient.inConnectionMany(connection -> {
Statement statement = connection.createStatement(SQL_STATEMENT);
insertList.forEach(exampleEntity -> {
addStatement(statement, exampleEntity);
});
return Flux.from(statement.execute());
});
}
private void addStatement(Statement statement, ExampleEntityDTO exampleEntity) {
statement
.bind(0, exampleEntity.getId())
.bind(1, exampleEntity.getColumnName())
.add();
}
}
El código anterior estaría preparando una colección asíncrona de Statements para posteriormente poder ser lanzado, como en el ejemplo siguiente:
Suscripción de Statements
@Service
public class ExampleEntityServiceImpl implements ExampleEntityService {
@Autowired
private ExampleEntityStatementRepository exampleEntityStatementRepository;
@Override
public void insertList(List<ExampleEntityDTO> insertList) {
exampleEntityStatementRepository.insertMultiple(insertList).subscribe();
}
}
El anterior servicio al ser consumido generaría un Statement que se ejecutaría de forma asíncrona, no dando lugar a la espera del microservicio para resolver esa petición de inserción de N elementos.
Pool de conexiones
Como es natural, en este punto conviene preguntarse cómo configurar un pool de conexiones para lanzar varios hilos gestionados por R2DBC, aprovechando así la potencia de la máquina y la base de datos, en caso de que este correctamente configurada para ser óptima con múltiples hilos.
Para ello simplemente hay que indicar en la propia conexión que se está haciendo uso de un pool, de la siguiente forma:
Configuración de pool de conexiones R2DBC
@Override
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get(ConnectionFactoryOptions.parse("r2dbc:pool:<driver>://<host>:<port>/<database_name>")
.mutate()
.option(ConnectionFactoryOptions.USER, "<user>")
.option(ConnectionFactoryOptions.PASSWORD, "<password>")
.option(INITIAL_SIZE, 10)
.option(MAX_SIZE, 20)
.build());
}
Como puedes observar, simplemente indicando pool sería suficiente (también puede hacerse mediante una opción como los parámetros de usuario y contraseña). Adicionalmente también pueden configurarse más parámetros de interés, como el número mínimo y máximo de hilos que tendrá configurada la conexión, que en el caso anterior serían 10 y 20 respectivamente.
Para obtener más información, puedes visitar su documentación.
Observaciones
- R2DBC facilita la integración asíncrona con ciertas bases de datos relacionales.
- Tiene todo el sentido del mundo hacer uso de esta especificación cuando quieras evitar altos tiempos en tu servicio, pero hay que tener en cuenta que las peticiones enviadas a base de datos las gestiona la propia base de datos.
- Es por ello que conviene hacer una correcta configuración de la misma si también queremos disminuir los tiempos de consulta e inserción.
- Puede llegar incluso a ser contraproducente, dada una mala configuración de THREADs, y hacer que los tiempos incrementen a nivel de base de datos.
- Presenta problemas haciendo SELECT cuando se hacen consultas anidadas internas, lo cual puede dificultar ciertos desarrollos más complejos.
- Conviene integrarlo con un servicio que esté pensado para usar reactividad, con webflux, broker de mensajería, etc.