GoldenGate for Big Data - Streaming de Datos de Oracle Database a Kafka Server

Por Francisco Riccio y Juan Guizado
Publicado en Febrero 2018


Introducción

Este artículo tiene como objetivo explicar todas las consideraciones y pasos a seguir para realizar una implementación de replicación de datos desde una base de datos Oracle 12cR2 hacia Kafka.
Antes de iniciar la explicación de la implementación, se explicará algunos conceptos que permitirán un mayor entendimiento.


Kafka

Kafka es un sistema de almacenamiento publicador/subscriptor distribuido, particionado y replicado; permitiendo construir plataformas de procesamiento en tiempo real. El producto cuenta con diversos casos de uso, los cuales están documentados en: https://kafka.apache.org/uses

El sistema almacena los datos mediante tópicos, siendo este capaz de mantener múltiples mensajes y publicarlos.

En ambientes productivos es altamente recomendable utilizar un esquema de cluster, donde cada instancia de Kafka es conocida como Broker.

Kafka utiliza Zookeeper, el cual es un programa que permite gestionar los diferentes Brokers asignados a un cluster, enviando notificaciones a los servidores Kafka por diferentes eventos como: tópicos creados o eliminados, Brokers caídos o reinicializados, actualización constante de la lista de nodos del cluster vivos, etc. Es importante resaltar que cada servidor Kafka debe ser capaz de mantener una sesión con Zookeeper. Debemos recordar que Kafka no puede trabajar sin Zookeeper.

Mayor información de Kafka está disponible en: https://kafka.apache.org/documentation/


Oracle GoldenGate for Big Data

Oracle GoldenGate es una solución que nos permite replicar datos entre base de datos relacionales (RDBMS) como: Oracle Database, MySQL, MS SQL Server, DB2, entre otros.

Existe otra solución llamada Oracle GoldenGate for Big Data, el cual permite copiar diversas transacciones hacia nuestros sistemas de Big Data en tiempo real.

En la actualidad es capaz de trabajar con: Hadoop Distributed File System (HDFS), Apache HBase, Flume, Apache Kafka, Cassandra, Mongo DB y cualquier repositorio que permita realizar conexiones vía JDBC.

Cabe resaltar que ambos productos cuentan con su propio instalador, ambos disponibles en: http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html

A continuación, se detalla una descripción básica de los servidores que forman parte del escenario a replicar dentro de la implementación que se realizará.

Característica

Servidor Oracle Database

Servidor Kafka

Versión de S.O

Oracle Linux 7.3 x64 bits

Oracle Linux 7.3 x64 bits

Versión de Software BD

Oracle Database 12.2.0.1

Kafka 1.0

Hostname

oracle

kafka

IP Interna

10.186.121.202

10.186.121.197

 



Finalmente nuestro despliegue de streaming de datos con GoldenGate estará configurado de la siguiente manera:

GoldenGate 12c

GoldenGate for Big Data 12c

Componentes - Origen

Componentes - Destino

1 Manager (MGR)
1 Extractor (EXT1)
1 Datapump (DP1)
1 Trail File (ta)

1 Manager (MGR)
1 Replicat (REP1)
1 Trail File (ta)



Gráficamente tendremos el siguiente proceso de Streaming:




Implementación

A continuación se detallará todos los pasos que se realizarán para replicar una tabla llamada Cliente de un Oracle Database 12cR2 hacia un tópico Cliente en Apache Kafka.

Los pasos de la implementación fueron divididos en las siguientes 4 secciones:

  • Instalación de Apache Kafka
  • Oracle GoldenGate 12c en Oracle Database 12cR2
  • Oracle GoldenGate for Big Data
  • Validación de Streaming de Datos


I. Instalación de Apache Kafka

1. Java OpenSDK

Antes de realizar el despliegue de Apache Kafka y Oracle GoldenGate for Big Data debemos contar con Java OpenJDK 1.8 instalado en el servidor.

Procedemos a instalar Java OpenJDK:
root@kafka ~]# yum install java

Validamos:



2. Instalación y Configuración de Apache Kafka Server

Procedemos a descargar Apache Kafka en la siguiente dirección: https://kafka.apache.org/downloads

Procedemos a descomprimirlo:
tar -xvf kafka_2.12-1.0.0.tgz


3. Configuramos Zookeeper Server

Dentro de la carpeta descomprimida tendremos 2 carpetas: carpeta bin, el cual tendrá una serie de utilitarios que nos permitirán inicializar o detener diversos servicios y carpeta config que contendrá las configuraciones de Zookeeper y Kafka.


Iniciamos el servicio de Zookeeper:
[root@kafka bin]# sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties


4. Configuración e Inicialización de Kafka Server

Antes de iniciar Kafka Server podríamos afinar la cantidad de memoria que este utilizará modificando el script de subida kafka-server-start.sh.

Ejemplo: En la configuración que se desplegará, Kafka no utilizará más de 512 MB de memoria RAM.


Asimismo es importante validar que nuestro Kafka Server pueda presentar un listener y conectarse a Zookeeper Server.

La configuración de estos 2 puntos se realiza en el siguiente archivo: config/server.properties

Se presenta su contenido:




Procedemos a iniciar Kafka Server


[root@kafka bin]# nohup sh kafka-server-start.sh ../config/server.properties &


5. Creación del Tópico Cliente y Validaciones

Para crear un tópico en Kafka contamos con un utilitario llamado kafka-topics el cual se encuentra en la carpeta bin.

A continuación se creará el tópico Cliente:

[root@kafka bin]# sh kafka-topics.sh --create --zookeeper localhost:2181        --replication-factor 1  --partitions 1 --topic Cliente


Existen múltiples parámetros al momento de crear un tópico, recomiendo revisar las diferentes opciones documentadas en: https://kafka.apache.org/documentation/#topicconfigs, principalmente el parámetro retention.ms.

Si deseamos eliminar un tópico lo realizamos a través del mismo utilitario.

Ejemplo: Si deseamos eliminar el tópico Cliente.

[root@kafka bin]# sh kafka-topics.sh --delete --zookeeper localhost:2181 --topic Cliente


Con la finalidad de listar todos los tópicos ejecutamos el siguiente comando:

[root@kafka bin]# sh kafka-topics.sh --list --zookeeper localhost:2181


Si deseamos obtener información a detalle de un tópico específico ejecutamos el siguiente comando:

[root@kafka bin]# sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic Cliente


La visualización de los datos contenidos en un tópico se puede obtener con el siguiente comando:

[root@kafka bin]# sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


Como se puede apreciar, en todos los casos nos conectamos al servicio de Zookeeper por el puerto 2181 para ejecutar la operación que necesitamos sobre algún tópico.



II. Oracle GoldenGate 12c en Oracle Database 12cR2

Para nuestra implementación contamos con una base de datos con el nombre ORCL y en modo ARCHIVELOG.

A partir de este escenario iniciaremos los pasos de configuración.

1. Configuración de GoldenGate en la Base de Datos

A continuación se detalla los comandos a ejecutarse.



Como parte del procedimiento se debe crear un usuario en la base de datos y cuyos permisos están documentados en:

https://docs.oracle.com/goldengate/1212/gg-winux/GIORA/user_assignment.htm#GIORA546


Finalmente configuramos el parámetro ENABLE_GOLDENGATE_REPLICATION en TRUE.




2. Instalación de Oracle GoldenGate 12c

Los instaladores son descargados en el siguiente url:
http://www.oracle.com/technetwork/middleware/goldengate/downloads/index.html

Una vez descargado en el servidor de base de datos procedemos a instalarlo.




3. Configuración del Proceso Extractor de GoldenGate

Previo a la configuración del extractor se creará una tabla que será leída por GoldenGate para copiar sus datos en el tópico de Kafka en tiempo real, en caso la tabla exista se deberá omitir este paso.

Creamos un usuario y la tabla Cliente.
Finalmente le entregamos el último permiso al usuario.


Posterior a este paso, validamos que el proceso Manager de GoldenGate está en ejecución:



Procedemos a agregar Log Supplemental a la tabla Cliente.


GGSCI> dblogin userid gg@orcl, password oracle
GGSCI> add trandata friccio.cliente


Configuramos el Extractor EXT1

La configuración del EXT1 es la siguiente:
extract EXT1 (Nombre del Extractor)
USERID gg@orcl, PASSWORD oracle (Usuario que creamos)
exttrail dirdat/ta (Archivo que contendrá los cambios ocurridos en la tabla)
table friccio.Cliente (Nombre de la tabla a capturar sus cambios)



4. Configuración del Proceso Datapump de GoldenGate


La configuración del EXT1 es la siguiente:
extract DP1 (Nombre del Datapump)
passthru (Solicitamos que no ejecute ninguna validación)
rmthost 10.186.121.197, mgrport 7801 (IP y puerto del Manager destino)
rmttrail dirdat/ta (Nombre del archivo a crear en el servidor destino)
table FRICCIO.CLIENTE; (Cambios sobre la tabla especificada para almacenarlo en el archivo remoto)


Por último iniciamos los servicios tanto del extractor como del datapump.





III. Oracle GoldenGate for Big Data

1. Instalación de Oracle GoldenGate for Big Data

Descargado el producto procedemos a descomprimirlo.



Luego ejecutamos los siguientes comandos sobre la carpeta descomprimida

./ggsci
GGSCI> create subdirs 


Antes de iniciar creando los procesos de Manager y Replicat debemos configurar una lista de variables de ambientes en la sesión del usuario siempre antes de iniciar servicios.

Para la implementación realizada se configuró las siguientes variables:

export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
export JRE_HOME=/usr/lib/jvm/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:/root/gg/lib:/u01/kafka_2.12-1.0.0/libs
export LD_LIBRARY_PATH=/usr/lib/jvm/jre-1.8.0/lib/amd64/libjsig.so:/usr/lib/jvm/jre-1.8.0/
lib/amd64/server/libjvm.so:/usr/lib/jvm/jre-1.8.0/lib/amd64/server:/usr/lib/jvm/jre-1.8.0/lib/amd64
export PATH=$JAVA_HOME/bin:/u01/kafka_2.12-1.0.0/bin:$PATH:./ 


Nota: Usted debe validar las correctas rutas acorde a su configuración. En caso haya iniciado el componente Manager y modifica alguna variable de ambiente, deberá reiniciar el componente con la afinidad que los cambios realizados sean considerados.


2. Configuración del componente Manager

A diferencia de Oracle GoldenGate que crea por defecto un Manager, aquí es necesario realizarlo manualmente.


GGSCI> PORT 7801 (Especificamos el puerto con que atenderá el Manager)


3. Configuración del Proceso Replicat de GoldenGate for Big Data

GoldenGate for Big Data viene incluido con una serie de carpetas, entre las cuales se encuentra AdapterExamples, la cual viene con una serie de ejemplos que nos servirá para guiarnos en la configuración.

En nuestro caso copiaremos los ejemplos para Kafka y los colocaremos dentro de la carpeta dirprm (directorio de configuración de todos los componentes de GoldenGate).



Finalmente los archivos: custom_kafka_producer.properties y kafka.props son aquellos que modificaremos para configurar nuestro Replicat.



Configuración del archivo: custom_kafka_producer.properties.

Debemos definir la lista de Nodos Kafka.



Configuración del archivo: kafka.props

En el archivo se deben realizar diferentes cambios:

  • gg.handler.kafkahandler.format = Se especifica el formato con que se almacenará los datos en el tópico, siendo algunos valores posibles: delimitedtext, json, avro_row, avro_op.
    Avro es un formato donde los valores están delimitados pero almacenados en binario siendo uno de los formatos más preferidos.
  • gg.log.level = Se especifica el nivel de detalle que se almacenará sobre los eventos ocurridos en el replicador. En caso de incidencias y no es posible determinar el error se deberá cambiar a DEBUG.
  • gg.classpath = Se indicará donde se encuentran las librerías que permitirán conectar al replicador hacia el servicio Kafka.


Una vez iniciando la replicación, en tiempo real se creará un Schema llamado MySchemaTopic que contendrá la metadata de los datos copiados en el tópico. El nombre del schema puede ser cambiado en el parámetro: gg.handler.kafkahandler.SchemaTopicName.

Mayor información sobre los parámetros y configuraciones se encuentran en el siguiente url: https://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449


Configuramos el Replicador REP1

replicat REP1 (Nombre del Replicador)
getenv (JAVA_HOME)    (Opcional)
getenv (CLASSPATH)    (Opcional)
getenv (PATH)    (Opcional)
getenv (LD_LIBRARY_PATH)    (Opcional)
targetdb libfile    libggjava.so SET property=dirprm/kafka.props
MAP friccio.cliente, TARGET mySchemaTopic.Cliente; 
(Nombre del Schema y del Tópico)


Procedemos a registrar el trafile que leerá el replicador para comenzarlo a insertar en el tópico cliente. No olvidar que el archivo leído es copiado por el Datapump en tiempo real.

Posterior procedemos a iniciar los servicios del replicador.





IV. Validación de Streaming de Datos

A continuación insertaremos 2 filas en la tabla Cliente en la base de datos Oracle y se puede apreciar que los datos llegan en formato AVRO como también la fila actualizada que se realizó con la sentencia UPDATE.



Si revisamos las estadísticas del replicador veremos que se han realizado 2 operaciones de insert y 1 de update.



También se puede apreciar que el schema fue creado automáticamente (carpeta dirdef).



El contenido del archivo es el siguiente:

[root@kafka dirdef]# cat MYSCHEMATOPIC.CLIENTE.avsc
      {
      "type" : "record",
      "name" : "CLIENTE",
      "namespace" :    "MYSCHEMATOPIC",
      "fields" : [ {
      "name" : "table",
      "type" : "string"
      }, {
      "name" : "op_type",
      "type" : "string"
      }, {
      "name" : "op_ts",
      "type" : "string"
      }, {
      "name" :    "current_ts",
      "type" : "string"
      }, {
      "name" : "pos",
      "type" : "string"
      }, {
      "name" :    "primary_keys",
      "type" : {
      "type" : "array",
      "items" : "string"
      }
      }, {
      "name" : "tokens",
      "type" : {
      "type" : "map",
      "values" : "string"
      },
      "default" : { }
      }, {
      "name" : "before",
      "type" : [ "null", {
      "type" : "record",
      "name" : "columns",
      "fields" : [ {
      "name" : "COD",
      "type" : [    "null", "double" ],
      "default" : null
      }, {
      "name" :    "COD_isMissing",
      "type" :    "boolean"
      }, {
      "name" :    "NOMBRE",
      "type" : [    "null", "string" ],
      "default" : null
      }, {
      "name" :    "NOMBRE_isMissing",
      "type" :    "boolean"
      }, {
      "name" : "FNAC",
      "type" : [    "null", "string" ],
      "default" : null
      }, {
      "name" :    "FNAC_isMissing",
      "type" :    "boolean"
      } ]
      } ],
      "default" : null
      }, {
      "name" : "after",
      "type" : [ "null",    "columns" ],
      "default" : null
      } ]
      }


Ahora cambiaremos el valor del parámetro gg.handler.kafkahandler.format a json en el archivo kafka.props y reiniciaremos el replicador.



Insertaremos otro registro en la tabla Cliente de la base de datos Oracle.



Se puede apreciar que el nuevo valor está registrado en el Tópico Cliente en formato JSON como también se puede visualizar que es almacenado el tipo de operación (Insert) y la hora de la transacción.

Finalmente al momento de cambiar al formato a JSON, en tiempo real se ha creado un nuevo schema como se puede apreciar:



En caso que necesitemos copiar datos BLOB desde la base de datos Oracle hacia nuestro servidor Kafka debemos revisar la siguiente documentación:

My Oracle Support (MOS) Nota: How To Synchronize BLOB Field For OGG Adapter KAFKA [ID 2327418.1].



Conclusión

En este artículo hemos aprendido a realizar todos los pasos requeridos para implementar un streaming de datos desde nuestras base de datos relacionales, particularmente en este caso Oracle Database 12c pero extendiéndose también hacia MySQL Server, SQL Server, DB2 entre otros hacia repositorios de Big Data como Kafka en este caso.

Los próximos pasos que deberían continuar a la implementación es la explotación de los datos en Kafka para realizar diversos análisis que puedan servir para la toma de decisiones de negocio que nuestras empresas requieren.



Francisco Riccio, actualmente se desempeña como Arquitecto de Soluciones en Oracle Perú y es instructor de cursos oficiales de certificación Oracle. Es un Oracle Certified Professional en productos de Oracle Application, Base de Datos, Cloud & Virtualización.

 

Juan Pablo Guizado, actualmente es Arquitecto de soluciones en Oracle Perú hace 5 años con certificaciones en Metodologías Agiles (Scrum Master ),  tiene experiencia de más 15 años trabajando con soluciones Oracle,  ha sido  Senior Sales Consultant de Middleware con amplios conocimientos  en Servidores de Aplicaciones Weblogic,  Plataformas de Integración y soluciones de Seguridad adicionalmente es  instructor de cursos oficiales de Oracle. Ha diseñado arquitecturas de aplicaciones Java para empresas importantes en el país con soluciones de  capa media.

Este artículo ha sido revisado por el equipo de productos Oracle y se encuentra en cumplimiento de las normas y prácticas para el uso de los productos Oracle.