Este proyecto implementa un pipeline de procesamiento de datos en tiempo real y por lotes utilizando servicios de AWS. El objetivo es ingerir, almacenar, procesar y analizar datos meteorológicos simulados.
El flujo de datos sigue el siguiente recorrido:
-
Generación de Datos:
generate_weather_data.py: Genera datos sintéticos de estaciones meteorológicas (temperatura, humedad, presión, viento) en formato JSON.kinesis.py: Lee estos datos y actúa como productor, enviando registros al Kinesis Data Stream.
-
Ingesta (Kinesis & Firehose):
- Amazon Kinesis Data Stream: Recibe los datos en tiempo real.
- Amazon Kinesis Data Firehose: Consume los datos del stream para entregarlos a Amazon S3.
- AWS Lambda (
firehose.py): Firehose invoca esta función para transformar los datos antes de guardarlos. La función:- Decodifica los registros.
- Extrae el
station_idy la fecha (date) del timestamp. - Añade estos campos como metadatos de partición para que Firehose organice los archivos en S3 dinámicamente (
raw/weather/station_id=.../date=.../).
-
Almacenamiento (Amazon S3):
- Se utiliza un bucket S3 (
datalake-weather-stations-...) como Data Lake. - Carpeta
raw/: Almacena los datos crudos particionados. - Carpeta
processed/: Almacena los datos procesados y agregados.
- Se utiliza un bucket S3 (
-
Catalogación (AWS Glue Crawler):
- Un Glue Crawler (
weather-raw-crawler) escanea la carpetaraw/en S3. - Infiere el esquema de los datos (JSON) y crea una tabla (
weather) en la base de datos de Glue (weather_db). Esto hace que los datos sean consultables inmediatamente.
- Un Glue Crawler (
-
Procesamiento (AWS Glue ETL & Spark):
- AWS Glue Job: Ejecuta un script de PySpark (
weather_aggregation.py). - Lee los datos crudos del Catálogo de Datos.
- Realiza agregaciones diarias (promedios de temperatura, humedad, etc.) agrupando por estación y fecha.
- Escribe los resultados en formato Parquet (optimizado para consultas) en la carpeta
processed/weather_daily/.
- AWS Glue Job: Ejecuta un script de PySpark (
-
Análisis (Amazon Athena):
- Se utiliza Athena para realizar consultas SQL estándar sobre los datos crudos y procesados almacenados en S3, aprovechando las tablas definidas en el Glue Data Catalog.
El despliegue de la infraestructura se orquesta mediante el script script.ps1.
-
Amazon Kinesis Data Streams:
- Stream:
weather-stream - Capacidad: 1 Shard (suficiente para la demostración).
- Stream:
-
Amazon Kinesis Data Firehose:
- Delivery Stream:
weather-delivery-stream - Fuente: Kinesis Data Stream.
- Transformación: Lambda activada.
- Particionamiento Dinámico: Habilitado. Organiza los objetos en S3 basándose en claves extraídas por la Lambda (Partition Projection).
- Delivery Stream:
-
AWS Glue:
- Base de Datos:
weather_db - Crawler: Actualiza la metadata de los datos crudos.
- Job ETL:
weather-daily-aggregation(Workers G.1X).
- Base de Datos:
generate_weather_data.py: Crea el datasetweather_data.json.kinesis.py: Script en Python (boto3) que simula el envío de datos de sensores al stream.firehose.py: Código de la función Lambda de transformación.weather_aggregation.py: Script ETL de Spark para el Glue Job.script.ps1: Script de PowerShell que utiliza AWS CLI para crear y configurar todos los recursos en la nube (S3, IAM Roles, Kinesis, Lambda, Glue, etc.).
- Configurar Credenciales: Asegurarse de tener AWS CLI configurado (
aws configure) con permisos adecuados. - Desplegar Infraestructura: Ejecutar
.\script.ps1. Esto creará todos los recursos necesarios en AWS. - Generar Datos: Ejecutar
python generate_weather_data.pypara crear el archivo local de datos. - Enviar Datos: Ejecutar
python kinesis.pypara empezar a transmitir datos al Kinesis Stream. - Verificar Ingesta: Revisar la consola de S3 (carpeta
raw/) o usar Athena para consultar la tablaweather. - Ejecutar ETL: Iniciar el Glue Job desde la consola o CLI para generar los agregados diarios en
processed/.