En esta cuarta y última semana del curso, se aborda el tema de la orquestación, un concepto fundamental en el ámbito de DataOps. La orquestación está íntimamente relacionada con los pilares de automatización, observabilidad y monitoreo, que se exploraron en semanas anteriores. A lo largo de esta semana, se profundizará en cómo implementar la orquestación en las canalizaciones de datos, utilizando herramientas como Airflow.
| Herramienta | Descripción | Uso en la Industria |
|---|---|---|
| Airflow | Plataforma de orquestación para canalizaciones de datos | Herramienta número 1 |
| Otras | Varias herramientas disponibles en el mercado | Dependiente del contexto |
La orquestación es un componente esencial en el ciclo de vida de la ingeniería de datos. Esta semana se centrará en cómo aplicar los conceptos aprendidos en DataOps a través de la práctica con Airflow, preparando a los participantes para su uso en el entorno laboral.
Acompáñame en el siguiente vídeo para comenzar con la práctica.
En este documento se presenta un resumen sobre la automatización de canalizaciones de datos utilizando la herramienta Cron, así como sus limitaciones y el contexto en el que se utiliza. Se explican los conceptos básicos de Cron y se proporcionan ejemplos de cómo se pueden programar tareas de canalización de datos.
Cron es una utilidad de línea de comandos que permite programar la ejecución de tareas en momentos específicos. Introducido en la década de 1970, permite automatizar tareas mediante la creación de "Cron Jobs".
Un Cron Job se define mediante una serie de cinco números seguidos del comando a ejecutar. La estructura es la siguiente:
* * * * * comando_a_ejecutar
Donde los cinco números representan: 1. Minuto (0-59) 2. Hora (0-23) 3. Día del mes (1-31) 4. Mes (1-12) 5. Día de la semana (0-6, donde 0 es domingo)
Se puede usar un asterisco (*) para indicar que no hay restricciones en ese valor.
Un ejemplo de un Cron Job que imprime "Feliz Año Nuevo" a medianoche del 1 de enero sería:
00 01 01 * * echo "Feliz Año Nuevo"
Supongamos que se tiene una canalización de datos que ingiere datos de una API REST. A continuación, se presentan ejemplos de cómo se podrían programar las tareas utilizando Cron:
| Tarea | Cron Job | Descripción |
|---|---|---|
| Ingestión de datos de API | 00 * * * * python ingest_from_rest_api.py |
Ejecuta la ingesta a medianoche todos los días. |
| Transformación de datos | 01 * * * * python transform_api_data.py |
Realiza la transformación a la 1 AM todos los días. |
| Ingestión de datos de base de datos | 00 * * * * python ingest_from_database.py |
Ejecuta la ingesta a medianoche todos los días. |
| Combinación de datos | 02 * * * * python combine_api_and_database.py |
Combina datos a las 2 AM todos los días. |
Aunque Cron puede ser útil para tareas simples, tiene varias limitaciones: - Fallas en la ejecución: Si una tarea no se ejecuta o produce resultados inesperados, puede fallar toda la canalización. - Falta de monitoreo: Sin alertas o monitoreo, es difícil saber si algo salió mal hasta que se presenta un problema. - Dificultad para depurar: Identificar la causa de un fallo puede requerir pruebas y depuración extensivas.
Aunque la programación con Cron puede ser una forma intuitiva de automatizar canalizaciones de datos, no es recomendable para configuraciones complejas debido a sus limitaciones. En el siguiente video, se explorarán herramientas de orquestación más avanzadas que han evolucionado en los últimos años, como Apache Airflow, que ofrecen mejores capacidades de monitoreo y gestión de tareas.
¡Nos vemos en el siguiente video!
La orquestación de datos ha sido una capacidad clave en el procesamiento de datos, pero su accesibilidad ha cambiado significativamente en la última década. Este documento resume los puntos clave discutidos en el curso sobre la evolución de las herramientas de orquestación, con un enfoque especial en Apache Airflow.
Crear soluciones internas era complicado y costoso.
Desarrollo de Herramientas:
Aunque Airflow es popular, no es la única herramienta disponible. Existen otras alternativas que están ganando terreno:
| Herramienta | Descripción |
|---|---|
| Luigi | Proyecto de orquestación de código abierto. |
| Conductor | Herramienta de orquestación que ofrece características únicas. |
| Prefect | Ofrece soluciones de orquestación más escalables que Airflow. |
| Dagster | Proporciona capacidades integradas para pruebas de calidad de datos. |
| Mage | Enfocado en transformaciones de datos y calidad. |
La orquestación de datos está en constante evolución, y aunque Airflow es la herramienta más utilizada actualmente, es fundamental estar abierto a nuevas herramientas que puedan ofrecer mejores soluciones en el futuro. Acompáñame en el siguiente video para profundizar en los detalles de la orquestación y la práctica.
La orquestación de canalizaciones de datos es un proceso crucial en la gestión de flujos de datos, que permite la configuración de tareas y dependencias de manera eficiente. A continuación, se presentan los conceptos y componentes clave relacionados con la orquestación.
La orquestación implica la gestión de tareas en una canalización de datos, permitiendo la configuración de dependencias, monitoreo de tareas, alertas y planes alternativos en caso de fallos. A diferencia de la programación civil de Cron, la orquestación ofrece un control más robusto sobre el flujo de datos.
| Característica | Cron | Orquestación |
|---|---|---|
| Control de tareas | Limitado | Avanzado |
| Monitoreo | No disponible | Disponible |
| Alertas | No disponible | Configurable |
| Manejo de dependencias | No soportado | Soportado |
En Airflow, los DAG se definen mediante programación en Python. A continuación, se muestra un ejemplo básico de cómo se puede definir un DAG:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
dag = DAG('mi_dag', default_args=default_args, schedule_interval='@daily')
tarea1 = DummyOperator(task_id='tarea1', dag=dag)
tarea2 = DummyOperator(task_id='tarea2', dag=dag)
tarea1 >> tarea2 # tarea2 depende de tarea1
Los DAG pueden ser programados para ejecutarse:
Para esperar a que un archivo esté disponible en un bucket de S3, se puede utilizar un sensor en Airflow:
from airflow.sensors.s3_key_sensor import S3KeySensor
sensor = S3KeySensor(
task_id='esperar_archivo',
bucket_name='mi_bucket',
bucket_key='myfile.csv',
dag=dag
)
La orquestación también permite:
La orquestación de canalizaciones de datos es esencial para garantizar un flujo de datos eficiente y controlado. En los próximos videos, se explorarán los pasos necesarios para configurar la orquestación de canalizaciones de datos utilizando Airflow.
¡Te espero en el próximo video!
En este documento se presenta una introducción a Apache Airflow, sus componentes principales y cómo interactúan entre sí para la ejecución de DAGs (Directed Acyclic Graphs). Al final de esta serie de vídeos, se espera que el lector esté preparado para construir un DAG simple en Airflow.
Apache Airflow es una plataforma de programación de flujos de trabajo que permite a los usuarios definir, programar y monitorear tareas. Los DAGs son la forma en que se estructuran las tareas en Airflow, y varios componentes trabajan en conjunto para su ejecución.
A continuación se describen los componentes clave de Airflow:
| Componente | Descripción |
|---|---|
| Servidor Web | Ejecuta la interfaz de usuario (IU) de Airflow, donde se pueden visualizar y gestionar los DAGs. |
| Planificador | Supervisa los DAGs y las tareas, activando las tareas según un horario o dependencias. |
| Trabajadores | Ejecutan las tareas que han sido activadas por el planificador. |
| Base de Datos de Metadatos | Almacena el estado de los DAGs y las tareas. |
| Directorio DAG | Carpeta donde se almacenan los scripts Python que definen los DAGs. |
Las tareas pueden ser activadas de varias maneras: - Manual: A través de la IU. - Programación: Basada en un horario definido. - Eventos: Utilizando el programador de Airflow.
El planificador verifica cada minuto si hay tareas listas para ser activadas, y una vez que identifica una tarea, la coloca en una cola para su ejecución.
Los estados de las tareas cambian a medida que se ejecutan: 1. Programada: La tarea está lista para ser ejecutada. 2. En Cola: La tarea ha sido colocada en la cola para su ejecución. 3. En Ejecución: La tarea está siendo ejecutada por un trabajador. 4. Éxito/Fallo: La tarea ha finalizado, ya sea con éxito o con error.
Cuando se utiliza un servicio administrado como Amazon Managed Workflows for Apache Airflow (MWAA), todos los componentes de Airflow se crean y gestionan automáticamente. Por ejemplo: - Bucket de Amazon S3: Utilizado como directorio DAG. - Base de Datos Aurora PostgreSQL: Utilizada como base de datos de metadatos.
En los laboratorios de esta semana, se guiará al usuario para: 1. Configurar el entorno de Airflow. 2. Escribir DAGs como scripts de Python. 3. Subir los scripts al bucket de S3. 4. Abrir la IU para revisar los DAGs creados.
Comprender la arquitectura y los componentes de Airflow es fundamental para solucionar problemas y optimizar el uso de esta herramienta. En el siguiente vídeo, se revisarán algunas características de la IU de Airflow.
La interfaz de usuario (UI) de Airflow es una herramienta fundamental para supervisar y gestionar los DAG (Directed Acyclic Graphs) y sus tareas. A través de esta interfaz, los usuarios pueden obtener información sobre el estado de sus ejecuciones, solucionar problemas y revisar el historial de ejecuciones.
Al abrir la interfaz de usuario, se accede a la Dag View, donde se puede encontrar una lista de todos los DAG creados. A continuación se detallan los elementos clave que se pueden observar en esta vista:
| Elemento | Descripción |
|---|---|
| Identificador del DAG | Nombre único del DAG. |
| Etiqueta | Etiquetas personalizadas asignadas al DAG. |
| Propietario | Usuario responsable del DAG. |
| Programación | Frecuencia con la que se ejecuta el DAG. |
| Última Ejecución | Fecha y hora de la última ejecución del DAG. |
| Estado | Estado actual del DAG (en cola, en ejecución, completado, fallido). |
| Tareas | Estado de las tareas individuales dentro del DAG. |
En la vista de DAG, se pueden realizar las siguientes acciones:
Al hacer clic en el identificador de un DAG, se accede a la Vista de Cuadrícula, que proporciona información más detallada sobre las ejecuciones y sus tareas:
En la parte derecha de la vista de cuadrícula, se encuentran cuatro pestañas:
Duración mínima, media y máxima.
Gráfico: Visualización de la estructura del DAG y sus dependencias.
Registros: Mensajes de error de tareas específicas, útiles para la depuración.
Diagrama de Gantt: Muestra la duración exacta de cada tarea en una ejecución específica, ayudando a identificar cuellos de botella.
Código: Muestra el código correspondiente al DAG, asegurando que esté sincronizado con el directorio de DAG.
La interfaz de usuario de Airflow ofrece múltiples funciones que facilitan la gestión y supervisión de los DAG. En este video, se han cubierto las funciones básicas que se utilizarán con mayor frecuencia. En el siguiente video, se explorarán los pasos para crear un DAG sencillo.
¡Nos vemos en el próximo video!
En este documento, se describen los pasos para crear un DAG (Directed Acyclic Graph) simple utilizando Apache Airflow, centrándonos en un proceso ETL (Extracción, Transformación y Carga). A continuación, se detallan los conceptos básicos y la configuración necesaria para implementar un DAG en Airflow.
Un DAG es una representación gráfica de un flujo de trabajo que consiste en tareas interconectadas. En este caso, el DAG que se creará tendrá tres tareas principales:
Para crear un DAG en Airflow, se debe seguir el siguiente procedimiento:
mi_primer_dag.py.Importar los paquetes necesarios:
python
from airflow import DAG
from datetime import datetime
Definir la instancia del DAG utilizando un administrador de contexto:
python
with DAG(
dag_id='mi_primer_dag',
description='Un DAG simple para un proceso ETL',
tags=['data_engineering_team'],
schedule_interval='0 8 * * *', # Ejecutar todos los días a las 8 AM
start_date=datetime(2023, 1, 1),
catchup=False
) as dag:
| Parámetro | Descripción |
|---|---|
dag_id |
Identificador único del DAG. |
description |
Descripción del DAG que aparece en la interfaz de usuario. |
tags |
Lista de etiquetas para filtrar los DAGs en la interfaz de usuario. |
schedule_interval |
Define cuándo se ejecutará el DAG (expresión cron o timedelta). |
start_date |
Fecha de inicio para la ejecución del DAG. |
catchup |
Si es True, ejecuta intervalos perdidos cuando el DAG se reanuda. |
Las tareas se definen utilizando operadores de Airflow. Los operadores encapsulan la lógica de las tareas. Para este ejemplo, se utilizará el operador Python.
Importar el operador Python:
python
from airflow.operators.python import PythonOperator
Definir las tareas: ```python def extraer_datos(): print("Extrayendo datos...")
def transformar_datos(): print("Transformando datos...")
def cargar_datos(): print("Cargando datos...")
tarea_extraccion = PythonOperator( task_id='extract', python_callable=extraer_datos, dag=dag )
tarea_transformacion = PythonOperator( task_id='transform', python_callable=transformar_datos, dag=dag )
tarea_carga = PythonOperator( task_id='load', python_callable=cargar_datos, dag=dag ) ```
Las dependencias entre las tareas se definen utilizando el operador de bitwise (>>):
tarea_extraccion >> tarea_transformacion >> tarea_carga
Esto significa que la tarea de extracción debe completarse antes de que comience la tarea de transformación, y la tarea de transformación debe completarse antes de que comience la tarea de carga.
Ahora se ha configurado un DAG simple en Airflow con tres tareas: extracción, transformación y carga. En el siguiente laboratorio, se tendrá la oportunidad de crear otro DAG y cargar el script en el kit de herramientas Airflow S3, donde se podrá supervisar el DAG a través de la interfaz de usuario de Airflow.
Para más información, se explorarán conceptos adicionales como XCOM y variables en el próximo video.
En este curso, se exploran las funcionalidades avanzadas de Apache Airflow, centrándose en la creación de DAGs (Directed Acyclic Graphs), la interacción con la interfaz de usuario y la solución de problemas. Se aprenderá a utilizar XCom para pasar datos entre tareas y a crear variables globales en la interfaz de usuario de Airflow.
Información almacenada en XCom: | Campo | Descripción | |----------------|--------------------------------------------------| | Clave | Nombre de la variable XCom | | Valor | Valor almacenado | | Marca de tiempo| Momento en que se creó la variable | | ID de DAG | Identificador del DAG donde se creó la variable | | ID de tarea | Identificador de la tarea de origen |
extract_from_apiUsa xcom_push para almacenar el valor calculado.
Tarea 2: print_data
xcom_pull para recuperar el valor de la primera tarea.variable.get para recuperar valores.deserialize_JSON=true para obtener un diccionario.En el siguiente laboratorio, se practicará la creación de variables en la interfaz de usuario de Airflow y el uso de XCom para pasar datos entre tareas. Se recomienda revisar los materiales de lectura para mejorar la calidad del código en DAGs.
En este documento, se presentará la API de TaskFlow de Airflow, una nueva forma de definir DAGs (Directed Acyclic Graphs) que simplifica la escritura de código, especialmente cuando se utilizan muchas funciones de Python. Se abordarán los conceptos clave, ejemplos y diferencias con el paradigma tradicional.
La API de TaskFlow fue introducida en Airflow 2.0 y tiene como objetivo facilitar la creación de DAGs mediante el uso de decoradores. Esto permite una escritura más concisa y clara del código, sin reemplazar el paradigma tradicional, sino complementándolo.
| Característica | Paradigma Tradicional | API de TaskFlow |
|---|---|---|
| Definición de DAG | Instanciación de un objeto DAG | Uso del decorador @dag |
| Definición de tareas | Uso de operadores de Python | Uso del decorador @task |
| Manejo de dependencias | Uso de operadores de cambio de bits | Llamada a funciones directamente |
| Uso de XCom | xcom_push y xcom_pull |
Declaraciones de retorno y parámetros |
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False)
def mi_dag():
@task
def tarea_extraccion():
# Lógica de extracción de datos
return datos
@task
def tarea_transformacion(datos):
# Lógica de transformación de datos
return datos_transformados
@task
def tarea_carga(datos_transformados):
# Lógica de carga de datos
pass
datos = tarea_extraccion()
datos_transformados = tarea_transformacion(datos)
tarea_carga(datos_transformados)
mi_dag_instance = mi_dag()
@task
def tarea_extraccion():
datos = {"clave": "valor"}
return datos # Almacena datos en XCom
@task
def tarea_transformacion(datos):
# Lógica de transformación
return datos_transformados
@task
def tarea_carga(datos_transformados):
# Lógica de carga
pass
datos = tarea_extraccion()
datos_transformados = tarea_transformacion(datos)
tarea_carga(datos_transformados)
La API de TaskFlow en Airflow proporciona una forma más sencilla y clara de definir DAGs y tareas, facilitando la programación y el mantenimiento del código. Se recomienda a los usuarios familiarizarse con esta nueva API y considerar su uso en proyectos futuros.
En este documento se presenta un resumen sobre las diferentes opciones de orquestación de tareas en AWS, centrándose en Apache Airflow y otros servicios disponibles en la plataforma.
El uso de Airflow para organizar canalizaciones de datos es una habilidad valiosa que se puede aplicar en diversas herramientas de orquestación. Este resumen explora las opciones de implementación de Airflow en AWS y otros servicios de orquestación como AWS Glue y AWS Step Functions.
Proporciona control total sobre la configuración y escalado, pero requiere gestión de la infraestructura.
Amazon Managed Workflows for Apache Airflow (MWAA):
Contiene tareas, rastreadores y activadores que deben ser creados previamente.
Ejecución:
Organiza varios servicios y estados de AWS en flujos de trabajo denominados máquinas de estado.
Funcionalidades:
| Herramienta | Tipo | Ventajas | Desventajas |
|---|---|---|---|
| Apache Airflow | Código abierto / Gestionado (MWAA) | Flexibilidad, ecosistema de complementos | Requiere gestión de infraestructura (en código abierto) |
| AWS Glue | Servicio ETL | Diseñado para procesos de ETL, sin servidor | Limitado a flujos de trabajo de ETL |
| AWS Step Functions | Sin servidor | Amplia integración con servicios de AWS | Puede ser menos flexible para flujos de trabajo complejos |
La elección de la herramienta de orquestación dependerá de los requisitos específicos y de lo que se busque optimizar. Es importante mantenerse actualizado sobre las herramientas emergentes en el panorama de la orquestación para tomar decisiones informadas en la arquitectura de datos.
Este resumen proporciona una visión general de las opciones de orquestación en AWS, destacando las características y consideraciones de cada herramienta.
Este documento resume el contenido del segundo curso de la especialización en ingeniería de datos. A lo largo de este curso, se han abordado diversos temas relacionados con la ingesta de datos, operaciones y orquestación, proporcionando una base sólida para el desarrollo de habilidades en el campo de la ingeniería de datos.
¡Buen trabajo y éxito en el próximo curso!