Pipeline de ETL para processamento de dados de cervejarias usando Python, Apache Airflow e Docker.
Este repositório contém um estudo de caso sobre um banco de dados de cervejarias por localização, onde você pode examinar todo o fluxo de trabalho de engenharia de dados. este projeto suporta todas as ferramentas abaixo:
Vamos Navegar pelo projeto:
cd /breweries_pipeline/**Inicializando o containers Docker :**
docker-compose up -d** Criando seu docker-compose.yaml
version: '3.8'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_db_volume:/var/lib/postgresql/data
webserver:
image: apache/airflow:2.3.3
environment:
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__WEBSERVER__DEFAULT_UI_USER: admin
AIRFLOW__WEBSERVER__DEFAULT_UI_PASSWORD: admin
AIRFLOW__WEBSERVER__SECRET_KEY: '102030'
AIRFLOW__WEBSERVER__BASE_URL: 'http://webserver:8080' # URL base definida
PYTHONPATH: /opt/airflow/dags
TZ: America/Sao_Paulo # Ajuste do fuso horário
depends_on:
- postgres
volumes:
- ./dags:/opt/airflow/dags
- ./data:/opt/airflow/data
- ./logs:/opt/airflow/logs # Montagem dos logs
ports:
- "8080:8080"
command: webserver
scheduler:
image: apache/airflow:2.3.3
environment:
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__WEBSERVER__SECRET_KEY: '102030'
AIRFLOW__WEBSERVER__BASE_URL: 'http://webserver:8080' # URL base alinhada com o webserver
PYTHONPATH: /opt/airflow/dags
TZ: America/Sao_Paulo # Ajuste do fuso horário
depends_on:
- postgres
volumes:
- ./dags:/opt/airflow/dags
- ./data:/opt/airflow/data
- ./logs:/opt/airflow/logs # Montagem dos logs para acesso pelo scheduler
command: scheduler
volumes:
postgres_db_volume:
**Verificando se o containers do Airflow e a Base de Dados estão inicializada:*
docker psPara acessar o Airflow container:
docker exec -it breweries_pipeline-webserver-1 sh
**Criar usuario do Airflow : Airflow container, Siga os passos abaixo:**
<br/>
This will create an admin user with the username admin and password admin.
```bash
airflow users create \
--username admin \
--password admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.comPara sair Airflow container:
exitprocedimento para desativar Docker containers: you can stop and remove the containers:
docker-compose downpara ter acesso ao Airflow Container:
docker exec -it breweries_pipeline-webserver-1 sh** Localizar o arquivo principal de configuracao do Airflow:**
find / -name "airflow.cfg"Editando o arquivo de configuração do Airflow:
vi /path/to/airflow.cfg** Um breve exemplo de como desabilitar uma DAGs manualmente via linha de comando:**
sed -i 's/load_examples = True/load_examples = False/' /path/to/airflow.cfg** Restart Airflow:**
exit
docker restart breweries_pipeline-webserver-1Se voce deseja criar uma DAG de exemplo, altere os exemplos atuais, abaixo um exemplo.
docker exec -it breweries_pipeline-webserver-1 sh
sed -i 's/load_examples = False/load_examples = True/' /path/to/airflow.cfg
exit
docker restart breweries_pipeline-webserver-1Os scripts principais da DAG e de cada etapa do pipeline estão nos seguintes arquivos: Script principal que controla a DAG breweries_dag.py
Seguencia principal DAG breweries
Este projeto implementa um pipeline de ETL utilizando Airflow para processar dados de cervejarias. O objetivo é transformar dados de uma API em diferentes camadas do pipeline:
- Bronze: Dados brutos extraídos da API.
- Silver: Dados limpos e tratados.
- Gold: Dados finais prontos para análises e visualizações.
Este script realiza a extração de dados da API pública de cervejarias (https://api.openbrewerydb.org/breweries) e salva os dados brutos no formato JSON na camada Bronze. Ele inclui tratamento de erros, como falhas de conexão ou problemas no formato da resposta, e implementa um sistema de tentativas com backoff para garantir maior resiliência. O arquivo gerado é salvo com um timestamp único no diretório data/bronze. Nome do Script fetch_breweries.py
Este script transforma os dados brutos extraídos da camada Bronze para a camada Silver. Ele realiza limpeza, tratamento e padronização dos dados como: Seleção de colunas relevantes. Tratamento de valores ausentes, substituindo por padrões como "unknown". Ajustes no formato de texto (ex.: capitalização de estados e cidades). Conversão de tipos de dados, como longitude e latitude. Os dados tratados são salvos em um arquivo Parquet no diretório data/silver, acumulando os dados ao longo das execuções e eliminando duplicatas com base no identificador único (id). Além disso, inclui uma coluna de data_ingestao para rastrear a data da última ingestão. Nome do script que faz o tratamento de dadoa Camada Silver transform_breweries.py
Preview Camada Dataframe Silver - script - check_dataframe_silver.py
Este script realiza a agregação dos dados processados na camada Silver, consolidando informações para análises finais: Busca o arquivo mais recente na camada Silver: Identifica o arquivo Parquet mais recente que contém os dados já limpos e transformados. Realiza a agregação: Agrupa as cervejarias por estado e tipo, gerando uma contagem do número de cervejarias em cada categoria. Salva os dados agregados na camada Gold: Os resultados são exportados em formato Parquet, com um nome único baseado no timestamp. Objetivo: Preparar os dados em um formato consolidado, pronto para consumo em análises ou visualizações.
Preview Camada Dataframe Gold - script - check_dataframe_gold.py
- ├── dags/
- │ ├── breweries_dag.py
- │ ├── scripts/
- │ ├── fetch_breweries.py
- │ ├── transform_breweries.py
- │ ├── aggregate_breweries.py
- ├── data/
- │ ├── bronze/
- │ ├── silver/
- │ ├── gold/
- ├── docker-compose.yaml
- ├── README.md
- pandas==1.5.3
- requests==2.28.2
- apache-airflow==2.6.0
-
Aqui estão os pontos importantes que pode justificar este projeto entrar em produção:
-
Modularidade e Reutilização de Código: Estruturamos o código para que cada tarefa na pipeline seja modular e reutilizável, facilitando manutenções e futuras adaptações.
-
Automação Inteligente com a lógica de buscar automaticamente o arquivo mais recente na camada Bronze e acumular dados na Silver, tornamos o processo mais robusto e menos sujeito a erros, uma otimização que agrega grande valor ao desafio.
-
Mecanismos de Controle e Logging: Com as implementações de logging e validação, foi criada uma camada extra de monitoramento que permite entender o status de cada etapa, além de capturar e tratar erros de forma mais amigável. Isso traz confiabilidade e torna o sistema mais sustentável.
-
Customização Avançada com Airflow: foi aplicado no Airflow os parametros XComs e controle de dependências e catchup para evitar reprocessamentos automáticos indesejados, com isto a plataforma vai ser aplicada em produção de forma segura.
-
Em resumo, o desafio busca um tratamento de dados e integração entre linguagns e orquestradores e integração com cloud, foi criado um pipeline robusto e adaptável pronto para um grande tratamento de dados e tranformação tudo de forma automatica.
-
Este pipeline, pode ser aplicado em ambientes com GCP - Azure e AWS.





