Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

Overview

Olá!

Esse é o meu primeiro repo tratando de fim a fim, uma pipeline de dados abertos do governo brasileiro relacionado a compras de contrato e cronogramas anuais com spark, em pyspark e SQL!

O código se encontra aqui e o dado pode ser obtido por meio desse link

from pyspark.sql import SparkSession

##################################################### VARIABLES #####################################################

PATH_LANDING_ZONE_CSV = '../datalake/landing/comprasnet-contratos-anual-cronogramas-latest.csv'
PATH_PROCESSING_ZONE = '../datalake/processing'
PATH_CURATED_ZONE = '../datalake/curated'

##################################################### QUERY #########################################################

QUERY = """ 

WITH tmp as (
  SELECT 
    cast(id as integer) as id,
    cast(contrato_id as integer) as contrato_id,
    tipo,
    numero,
    receita_despesa,
    observacao,
    mesref,
    anoref,
    cast(vencimento as date) as vencimento,
    retroativo,
    cast(valor as decimal (10,2)) as valor,
    year(vencimento) as year,
    month(vencimento) as month,
    dayofmonth(vencimento) as day
  FROM 
    df
)
SELECT
  *
FROM 
  tmp
WHERE   
  year = 2021 OR 
  year = 2022
ORDER BY
  year desc

"""

##################################################### SCRIPT #########################################################

def csv_to_parquet(spark, path_csv, path_parquet):
  df = spark.read.option('header', True).csv(path_csv)
  return df.write.mode('overwrite').format('parquet').save(path_parquet)

def create_view(spark, path_parquet):
  df = spark.read.parquet(path_parquet) 
  df.createOrReplaceTempView('df')

def write_curated(spark, path_curated):
 
  df2 = spark.sql(QUERY)
    
  (
      df2
      .orderBy('year', ascending=False)
      .orderBy('month', ascending=False)
      .orderBy('day', ascending=False)
      .write.partitionBy('year','month','day')
      .mode('overwrite')
      .format('parquet')
      .save(path_curated)
  )


if __name__ == "__main__":
  
  spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
  )

  spark.sparkContext.setLogLevel("ERROR")
  
  csv_to_parquet(spark, PATH_LANDING_ZONE_CSV, PATH_PROCESSING_ZONE)

  create_view(spark, PATH_PROCESSING_ZONE)
  
  write_curated(spark, PATH_CURATED_ZONE )
  • Basicamente, extraimos os dados para a zona landing, depois, escrevemos o mesmo dado em diferente formato na zona processing, no caso parquet, por se tratar de um formato otimizado e mais leve.
  • Após, criamos uma view do dado recém salvo na zona processing, já em parquet, que otimiza a leitura do spark, aplicamos uma query de transformação que enriquece o schema do dado e seleciona apenas os dados de 2021 e 2022, já pronto para ser consumido.
  • E por fim, escrevemos na zona curated o dado já tratado, enriquecido, particionado por ano, mês e dia e pronto para consumo.

Para rodar o script, basicamente você pode fazer no terminal:

spark-submit etl.py

Você também encontrará o mesmo código e ideia de ETL em notebooks, em versão pyspark ou spark-sql.

Espero que gostem!

Qualquer dúvida, entrar em contato pelo LinkedIn.

:)

Owner
Henrique de Paula
Games e tech!
Henrique de Paula
Gaussian Process Optimization using GPy

End of maintenance for GPyOpt Dear GPyOpt community! We would like to acknowledge the obvious. The core team of GPyOpt has moved on, and over the past

Sheffield Machine Learning Software 847 Dec 19, 2022
Simple, fast, and parallelized symbolic regression in Python/Julia via regularized evolution and simulated annealing

Parallelized symbolic regression built on Julia, and interfaced by Python. Uses regularized evolution, simulated annealing, and gradient-free optimization.

Miles Cranmer 924 Jan 03, 2023
MooGBT is a library for Multi-objective optimization in Gradient Boosted Trees.

MooGBT is a library for Multi-objective optimization in Gradient Boosted Trees. MooGBT optimizes for multiple objectives by defining constraints on sub-objective(s) along with a primary objective. Th

Swiggy 66 Dec 06, 2022
Coursera Machine Learning - Python code

Coursera Machine Learning This repository contains python implementations of certain exercises from the course by Andrew Ng. For a number of assignmen

Jordi Warmenhoven 859 Dec 10, 2022
Free MLOps course from DataTalks.Club

MLOps Zoomcamp Our MLOps Zoomcamp course Sign up here: https://airtable.com/shrCb8y6eTbPKwSTL (it's not automated, you will not receive an email immed

DataTalksClub 4.6k Dec 31, 2022
CinnaMon is a Python library which offers a number of tools to detect, explain, and correct data drift in a machine learning system

CinnaMon is a Python library which offers a number of tools to detect, explain, and correct data drift in a machine learning system

Zelros 67 Dec 28, 2022
List of Data Science Cheatsheets to rule the world

Data Science Cheatsheets List of Data Science Cheatsheets to rule the world. Table of Contents Business Science Business Science Problem Framework Dat

Favio André Vázquez 11.7k Dec 30, 2022
The easy way to combine mlflow, hydra and optuna into one machine learning pipeline.

mlflow_hydra_optuna_the_easy_way The easy way to combine mlflow, hydra and optuna into one machine learning pipeline. Objective TODO Usage 1. build do

shibuiwilliam 9 Sep 09, 2022
A Python implementation of the Robotics Toolbox for MATLAB

Robotics Toolbox for Python A Python implementation of the Robotics Toolbox for MATLAB® GitHub repository Documentation Wiki (examples and details) Sy

Peter Corke 1.2k Jan 07, 2023
Real-time domain adaptation for semantic segmentation

Advanced-Machine-Learning This repository contains the code for the project Real

Andrea Cavallo 1 Jan 30, 2022
This jupyter notebook project was completed by me and my friend using the dataset from Kaggle

ARM This jupyter notebook project was completed by me and my friend using the dataset from Kaggle. The world Happiness 2017, which ranks 155 countries

1 Jan 23, 2022
Repository for DCA0305, an undergraduate course about Machine Learning Workflows and Pipelines

Federal University of Rio Grande do Norte Technology Center Department of Computer Engineering and Automation Machine Learning Based Systems Design Re

Ivanovitch Silva 81 Oct 18, 2022
LibTraffic is a unified, flexible and comprehensive traffic prediction library based on PyTorch

LibTraffic is a unified, flexible and comprehensive traffic prediction library, which provides researchers with a credibly experimental tool and a convenient development framework. Our library is imp

432 Jan 05, 2023
A toolkit for making real world machine learning and data analysis applications in C++

dlib C++ library Dlib is a modern C++ toolkit containing machine learning algorithms and tools for creating complex software in C++ to solve real worl

Davis E. King 11.6k Jan 02, 2023
Mixing up the Invariant Information clustering architecture, with self supervised concepts from SimCLR and MoCo approaches

Self Supervised clusterer Combined IIC, and Moco architectures, with some SimCLR notions, to get state of the art unsupervised clustering while retain

Bendidi Ihab 9 Feb 13, 2022
Built various Machine Learning algorithms (Logistic Regression, Random Forest, KNN, Gradient Boosting and XGBoost. etc)

Built various Machine Learning algorithms (Logistic Regression, Random Forest, KNN, Gradient Boosting and XGBoost. etc). Structured a custom ensemble model and a neural network. Found a outperformed

Chris Yuan 1 Feb 06, 2022
Lightweight Machine Learning Experiment Logging 📖

Simple logging of statistics, model checkpoints, plots and other objects for your Machine Learning Experiments (MLE). Furthermore, the MLELogger comes with smooth multi-seed result aggregation and co

Robert Lange 65 Dec 08, 2022
Turning images into '9-pan' palettes using KMeans clustering from sklearn.

img2palette Turning images into '9-pan' palettes using KMeans clustering from sklearn. Requirements We require: Pillow, for opening and processing ima

Samuel Vidovich 2 Jan 01, 2022
Automated Machine Learning Pipeline for tabular data. Designed for predictive maintenance applications, failure identification, failure prediction, condition monitoring, etc.

Automated Machine Learning Pipeline for tabular data. Designed for predictive maintenance applications, failure identification, failure prediction, condition monitoring, etc.

Amplo 10 May 15, 2022
Regularization and Feature Selection in Least Squares Temporal Difference Learning

Regularization and Feature Selection in Least Squares Temporal Difference Learning Description This is Python implementations of Least Angle Regressio

Mina Parham 0 Jan 18, 2022