Skip to content

garfado/Breweries

Repository files navigation

Breweries Pipeline

Pipeline de ETL para processamento de dados de cervejarias usando Python, Apache Airflow e Docker.

Summary

  1. Introdução

  2. Passo a Passo Docker

  1. Implementação da DAG

  2. Camadas do Pipeline

  1. Estrutura do Repositório
  2. Dependências do Projeto
  3. Objetivo







Introdução

Este repositório contém um estudo de caso sobre um banco de dados de cervejarias por localização, onde você pode examinar todo o fluxo de trabalho de engenharia de dados. este projeto suporta todas as ferramentas abaixo:

  • Airflow Airflow for orchestration
  • Docker Docker
  • Pyspark Pyspark
  • Databricks DataBricks
  • Python Python for API requests
  • GCP GCP
  • Azure







Passo a Passo Docker

Vamos Navegar pelo projeto:

   cd /breweries_pipeline/

⚠️ Atenção: Se você estiver executando este projeto no Windows usando o VS Code, certifique-se de que o arquivo start_airflow.sh esteja configurado com finais de linha no formato LF (Line Feed) para evitar erros.


**Inicializando o containers Docker :**
docker-compose up -d

** Criando seu docker-compose.yaml
version: '3.8' 
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres_db_volume:/var/lib/postgresql/data

  webserver:
    image: apache/airflow:2.3.3
    environment:
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__WEBSERVER__DEFAULT_UI_USER: admin
      AIRFLOW__WEBSERVER__DEFAULT_UI_PASSWORD: admin
      AIRFLOW__WEBSERVER__SECRET_KEY: '102030'
      AIRFLOW__WEBSERVER__BASE_URL: 'http://webserver:8080'  # URL base definida
      PYTHONPATH: /opt/airflow/dags
      TZ: America/Sao_Paulo  # Ajuste do fuso horário
    depends_on:
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./data:/opt/airflow/data
      - ./logs:/opt/airflow/logs  # Montagem dos logs
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.3.3
    environment:
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__WEBSERVER__SECRET_KEY: '102030'
      AIRFLOW__WEBSERVER__BASE_URL: 'http://webserver:8080'  # URL base alinhada com o webserver
      PYTHONPATH: /opt/airflow/dags
      TZ: America/Sao_Paulo  # Ajuste do fuso horário
    depends_on:
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./data:/opt/airflow/data
      - ./logs:/opt/airflow/logs  # Montagem dos logs para acesso pelo scheduler
    command: scheduler

volumes:
  postgres_db_volume:

**Verificando se o containers do Airflow e a Base de Dados estão inicializada:*
docker ps

Para acessar o Airflow container:

docker exec -it breweries_pipeline-webserver-1 sh

**Criar usuario do Airflow : Airflow container, Siga os passos abaixo:**
<br/>
This will create an admin user with the username admin and password admin.
```bash
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

Para sair Airflow container:

exit

procedimento para desativar Docker containers: you can stop and remove the containers:

docker-compose down







Configuração Docker Compose

para ter acesso ao Airflow Container:

docker exec -it breweries_pipeline-webserver-1 sh

** Localizar o arquivo principal de configuracao do Airflow:**

find / -name "airflow.cfg"

Editando o arquivo de configuração do Airflow:

vi /path/to/airflow.cfg

** Um breve exemplo de como desabilitar uma DAGs manualmente via linha de comando:**

sed -i 's/load_examples = True/load_examples = False/' /path/to/airflow.cfg

** Restart Airflow:**

exit
docker restart breweries_pipeline-webserver-1

Executar Airflow-DAG


Se voce deseja criar uma DAG de exemplo, altere os exemplos atuais, abaixo um exemplo.
docker exec -it breweries_pipeline-webserver-1 sh
sed -i 's/load_examples = False/load_examples = True/' /path/to/airflow.cfg
exit
docker restart breweries_pipeline-webserver-1

Implementação da DAG

Os scripts principais da DAG e de cada etapa do pipeline estão nos seguintes arquivos: Script principal que controla a DAG breweries_dag.py

Seguencia principal DAG breweries

image

Preview Breweries DAG. image


Camadas do Pipeline

Este projeto implementa um pipeline de ETL utilizando Airflow para processar dados de cervejarias. O objetivo é transformar dados de uma API em diferentes camadas do pipeline:

  • Bronze: Dados brutos extraídos da API.
  • Silver: Dados limpos e tratados.
  • Gold: Dados finais prontos para análises e visualizações.

Camada Bronze

Este script realiza a extração de dados da API pública de cervejarias (https://api.openbrewerydb.org/breweries) e salva os dados brutos no formato JSON na camada Bronze. Ele inclui tratamento de erros, como falhas de conexão ou problemas no formato da resposta, e implementa um sistema de tentativas com backoff para garantir maior resiliência. O arquivo gerado é salvo com um timestamp único no diretório data/bronze. Nome do Script fetch_breweries.py


Camada Silver

Este script transforma os dados brutos extraídos da camada Bronze para a camada Silver. Ele realiza limpeza, tratamento e padronização dos dados como: Seleção de colunas relevantes. Tratamento de valores ausentes, substituindo por padrões como "unknown". Ajustes no formato de texto (ex.: capitalização de estados e cidades). Conversão de tipos de dados, como longitude e latitude. Os dados tratados são salvos em um arquivo Parquet no diretório data/silver, acumulando os dados ao longo das execuções e eliminando duplicatas com base no identificador único (id). Além disso, inclui uma coluna de data_ingestao para rastrear a data da última ingestão. Nome do script que faz o tratamento de dadoa Camada Silver transform_breweries.py

Preview Camada Dataframe Silver - script - check_dataframe_silver.py

image

image


Camada Gold

Este script realiza a agregação dos dados processados na camada Silver, consolidando informações para análises finais: Busca o arquivo mais recente na camada Silver: Identifica o arquivo Parquet mais recente que contém os dados já limpos e transformados. Realiza a agregação: Agrupa as cervejarias por estado e tipo, gerando uma contagem do número de cervejarias em cada categoria. Salva os dados agregados na camada Gold: Os resultados são exportados em formato Parquet, com um nome único baseado no timestamp. Objetivo: Preparar os dados em um formato consolidado, pronto para consumo em análises ou visualizações.

Preview Camada Dataframe Gold - script - check_dataframe_gold.py

image

Preview Schemma de Dados image


Estrutura do Repositório:

  • ├── dags/
  • │ ├── breweries_dag.py
  • │ ├── scripts/
  • │ ├── fetch_breweries.py
  • │ ├── transform_breweries.py
  • │ ├── aggregate_breweries.py
  • ├── data/
  • │ ├── bronze/
  • │ ├── silver/
  • │ ├── gold/
  • ├── docker-compose.yaml
  • ├── README.md

Dependências do Projeto:

  • pandas==1.5.3
  • requests==2.28.2
  • apache-airflow==2.6.0

Objetivo

  • Aqui estão os pontos importantes que pode justificar este projeto entrar em produção:

  • Modularidade e Reutilização de Código: Estruturamos o código para que cada tarefa na pipeline seja modular e reutilizável, facilitando manutenções e futuras adaptações.

  • Automação Inteligente com a lógica de buscar automaticamente o arquivo mais recente na camada Bronze e acumular dados na Silver, tornamos o processo mais robusto e menos sujeito a erros, uma otimização que agrega grande valor ao desafio.

  • Mecanismos de Controle e Logging: Com as implementações de logging e validação, foi criada uma camada extra de monitoramento que permite entender o status de cada etapa, além de capturar e tratar erros de forma mais amigável. Isso traz confiabilidade e torna o sistema mais sustentável.

  • Customização Avançada com Airflow: foi aplicado no Airflow os parametros XComs e controle de dependências e catchup para evitar reprocessamentos automáticos indesejados, com isto a plataforma vai ser aplicada em produção de forma segura.

  • Em resumo, o desafio busca um tratamento de dados e integração entre linguagns e orquestradores e integração com cloud, foi criado um pipeline robusto e adaptável pronto para um grande tratamento de dados e tranformação tudo de forma automatica.

  • Este pipeline, pode ser aplicado em ambientes com GCP - Azure e AWS.