Workshops

Workshop sobre la Plataforma. Parte 2: Ingesta de datos

En la primera parte de este workshop hablamos sobre los componentes que se iban a utilizar y dimos de alta dos ontologías. En esta segunda parte nos centraremos en la ingesta de datos desde un portal Open Data utilizando el Flow Engine.

Vamos a crear dos flujos de información, uno por cada servicio, que consultará la información, la transformará al formato semántico de la ontología definida, y la almacenará en la ontología correspondiente.
Para crear los flujos accede al Flow Engine de la Plataforma desde el menú Development > My Flows

Menú.

En caso de no haber creado un dominio previamente, nos aparecerá un listado vacío.

En este caso, tendrás que crear una instancia del motor de flujos pulsando el botón «Create». Se abrirá una pantalla desde donde podrás dar una identificación a tu instancia y crearla pulsando en «New».

Esto instanciará tu motor de flujos, que puedes arrancar pulsando en «Start»:

Una vez arrancado, accede al editor de flujos pulsando en «View it»:

Se abrirá otra pantalla con el modo el editor de flujos:

Flujo de ingesta de Inventario de estaciones

A continuación, programarás un flujo que diariamente se conectará al servicio Open Data, consultará las estaciones inventariadas, convertirá dicha información al modelo semántico de la Plataforma (ontología BicingBCN_Inventory) y la almacenará en la ontología.

Para realizar este lanzamiento periódico del flujo, programa la ejecución periódica del flujo mediante un nodo de tipo inject. Para implementarlo, localiza el nodo en la paleta y arrástralo al grid.

A continuación, haz doble clic sobre el nodo para acceder a su configuración, y confíguralo que se ejecute todos los días a las 06:00. Hecho esto, pulsa en «Done».

Para llevar a cabo la invocación al servicio Open Data, la siguiente etapa en el flujo es recuperar la información de las estaciones desde el servicio Open Data. Para ello usa un nodo de tipo HTTP Request, que debes localizar también en la paleta de nodos y arrastrar hasta el grid.

Seguidamente, haz doble clic sobre el mismo para establecer su configuración, indicando la URL a la que tiene que invocar y pulsa en «Done».

Finalmente establece la conexión con el nodo anterior para conectarlo al flujo.

Una vez conectado, puedes probar el flujo verificando que la salida del nodo HTTP request es la respuesta del servicio Open Data. Para ello, conéctale un nodo de tipo debug. Este nodo muestra en la consola de depuración de la herramienta el mensaje de salida del nodo que se le conecta.

A continuación despliega el flujo pulsando Deploy y puedes probarlo habilitando la pestaña debug  y pulsando el botón del nodo inject.

Hecho esto, el siguiente paso será la conversión de la información al modelo semántico de la ontología. La información recuperada del servicio Open Data es una lista de estaciones con el siguiente formato cada una:

{
   "station_id": 1,
   "name": "GRAN VIA CORTS CATALANES, 760",
   "physical_configuration": "ELECTRICBIKESTATION",
   "lat": 41.3979779,
   "lon": 2.1801069,
   "altitude": 16.0,
   "address": "GRAN VIA CORTS CATALANES, 760",
   "post_code": "08013",
   "capacity": 46,
   "nearby_distance": 1000.0
}

Y para almacenarlas en la plataforma hay que convertirla al modelo definido en la ontología BicingBCN_Inventory, donde las instancias deben tener la siguiente estructura:

Para ello, en el flujo, convierte la respuesta del servicio Open Data, que es un string con el JSON de la respuesta, a un modelo de objetos JSON puro. Para ello, conecta un nodo de tipo JSON, que hace precisamente esto, convertir la respuesta del servicio en un array de objetos JSON.

Tras ello, procesa el array de estaciones con el formato del servicio Open Data, convirtiéndolo en un array de estaciones con el formato de la ontología BicingBCN_Inventory. Para ello utiliza un nodo de tipo Function, que permite programar lógica en lenguaje JavaScript. Haz doble clic sobre el nodo de tipo Function recién añadido para configurarlo, incluyendo el siguiente código, que realiza el procesamiento de transformación indicado anteriormente y pulsamos «Done»:

var stations=msg.payload.data.stations;
var gsmaStations=[];
for(i=0;i<stations.length;i++){
var station ={BikeHireDockingStation : {
id: ""+stations[i].station_id,
type: "BikeHireDockingStation",
name: ""+stations[i].name,
category: ""+stations[i].physical_configuration,
status: "ON",
availableBikeNumber: stations[i].capacity,
freeSlotNumber: stations[i].capacity,
totalSlotNumber: stations[i].capacity,
location: {
coordinates: [stations[i].lon, stations[i].lat],
type: "Point"
},
address: ""+stations[i].address,
description: ""+stations[i].physical_configuration,
dateModified: "03-04-2019",
ownership:"Ayto Barcelona"
}
};
gsmaStations[i]=station;
}
msg.payload=gsmaStations;
return msg;

Tu flujo final debería parecerse a esto:

El siguiente paso consistirá en realizar la inserción en Plataforma. Por tanto, una vez convertida la información al formato de la ontología, ya se puede almacenar en la Plataforma. Incluye en el flujo un nodo de tipo onesaitplatform-hazhacemos doble clic sobre el nodo, configura la ontología BicingBCN_Inventory como destino de la información y pulsa «Done».

Añade un nodo tipo debug para visualizar el resultado de la inserción en la pestaña de debug, que al tratarse de una inserción, es el numero de elementos insertados en la Plataforma. Tras desplegar y probar lo que se obtiene, puedes ver tu flujo terminado:

A continuación, procederás al borrado de registros previos; como viste al iniciar el flujo, se trata de un flujo que se ejecutará periódicamente todos los días, por lo que en cada ejecución se debería actualizar el inventario. La forma más sencilla es borrando previamente las estaciones de la última ejecución del flujo y volviendo a cargar las de la ejecución actual. Para ello, añadirás otro nodo al flujo para el borrado previo de estaciones justo al lanzarse el flujo.

Para ello, añade un nodo de tipo onesaitplatform-query-static y conéctalo al nodo inicial (aprovecha también para añadir un nodo de tipo debug para visualizar el resultado).

Haz doble clic sobre el nodo onesaitplatform-query-static y configura la ontología que quieres borrar:  BicingBCN_Inventory y la sentencia SQL:  delete from BicingBCN_Inventory. Ahora pulsa «Done».

Vuelve a desplegar y comprueba en la pestaña de debug que previamente se borra la información anterior y posteriormente se almacena la nueva.

Flujo de ingesta de estado de estaciones

El flujo de ingesta del estado de las estaciones es muy parecido al anterior. Requiere de los mismos nodos, solo que cambiando el servicio origen de los datos y la ontología destino BicingBCN_Status, por lo no entraremos tanto en detalle.

Empieza creando un nuevo flujo. Para ello, pulsa el botón  para añadir una nueva pestaña con un nuevo grid en blanco para definir otro flujo.

Nuevamente, programa un lanzamiento periódico del flujo. Para ello, prepara la ejecución periódica del flujo mediante un nodo de tipo inject. Como se trata del estado de las estaciones, configura el nodo para que se ejecute cada 15 minutos.

La siguiente etapa en el flujo es invocar el servicio Open Data para recuperar la información de estado de las estaciones desde el servicio Open Data.

Seguidamente, llevarás a cabo la conversión de la información al modelo semántico de la ontología. La información recuperada del servicio Open Data es la lista de estaciones con su estado, por lo que para almacenarlas en la plataforma hay que convertirla al modelo definido en la ontología BicingBCN_Status.

Para ello, vuelve a utilizar el nodo JSON para convertir el string en un JSON puro y añade un nodo de tipo Function con el siguiente código:


var stations=msg.payload.data.stations;
var ontologyStations=[];
for(i=0;i<stations.length;i++){
var station= {BicingBCN_Status: {
station_id:""+stations[i].station_id,
numBikesAvailable:stations[i].num_bikes_available,
numBikesMechanical:stations[i].num_bikes_available_types.mechanical,
numBikesElectric:stations[i].num_bikes_available_types.ebike,
status:stations[i].status
}
};
ontologyStations[i]=station;
}
 
msg.payload=ontologyStations;
return msg;

A continuación, llevarás a cabo la inserción en la Plataforma. Incluye en el flujo un nodo de tipo onesaitplatform-insert y configúralo para que almacene la información en la ontologia BicingBCN_Status.

Añade un nodo tipo debug para visualizar el resultado de la inserción en la pestaña de debug, que al tratarse de una inserción, es el número de elementos insertados en plataforma. Tras desplegar y probar, puedes ver:

Borrado de registros previos: Del mismo modo que en el flujo anterior, tienes que borrar la información previamente insertada en cada ejecución del flujo, de manera que configures el borrado de la ontología BicingBCN_Status con un nodo onesaitplatform-query-static.

Despliega y verifica la respuesta en la pestaña de debug.

Para terminar, realiza una verificación de inserción, comprobando en la Plataforma que la información se ha insertado de forma correcta en la ontología. Para ello, accede a Tools > Query tool y lanza una sentencia de consulta sobre la ontología BicingBCN_Status:

Aquí termina esta segunda parte del workshop. Próximamente seguiremos con la siguiente parte: el Visor GIS.

Esperamos que lo hayas encontrado interesante y no dudes en dejarnos un comentario con tus impresiones o dudas.

✍🏻 Author(s)

Deja una respuesta