Framework for creating efficient data processing pipelines

Related tags

Miscellaneousaqueduct
Overview

Aqueduct

Framework for creating efficient data processing pipelines.

Contact

Feel free to ask questions in telegram t.me/avito-ml

Key Features

  • Increase RPS (Requests Per Second) for your service
  • All optimisations in one library
  • Uses shared memory for transfer big data between processes

Get started

Simple example how to start with aqueduct using aiohttp. For better examples see examples

web.Application: app = web.Application() app['flow'] = Flow( FlowStep(SumHandler()), ) app.router.add_post('/sum', SumView) app['flow'].start() return app if __name__ == '__main__': web.run_app(prepare_app()) ">
from aiohttp import web
from aqueduct import Flow, FlowStep, BaseTaskHandler, BaseTask


class MyModel:
    """This is CPU bound model example."""
    
    def process(self, number):
        return sum(i * i for i in range(number))

class Task(BaseTask):
    """Container to send arguments to model."""
    def __init__(self, number):
        super().__init__()
        self.number = number
        self.sum = None  # result will be here
    
class SumHandler(BaseTaskHandler):
    """With aqueduct we need to wrap you're model."""
    def __init__(self):
        self._model = None

    def on_start(self):
        """Runs in child process, so memory no memory consumption in parent process."""
        self._model = MyModel()

    def handle(self, *tasks: Task):
        """List of tasks because it can be batching."""
        for task in tasks:
            task.sum = self._model.process(task.number)

            
class SumView(web.View):
    """Simple aiohttp-view handler"""

    async def post(self):
        number = await self.request.read()
        task = Task(int(number))
        await self.request.app['flow'].process(task)
        return web.json_response(data={'result': task.sum})


def prepare_app() -> web.Application:
    app = web.Application()

    app['flow'] = Flow(
        FlowStep(SumHandler()),
    )
    app.router.add_post('/sum', SumView)

    app['flow'].start()
    return app


if __name__ == '__main__':
    web.run_app(prepare_app())
    

Batching

Aqueduct supports the ability to process tasks with batches. Default batch size is one.

np.array: """Always says that there is a cat in the image. The image is represented by a one-dimensional array. The model spends less time for processing batch of images due to GPU optimizations. It's emulated with BATCH_REDUCTION_FACTOR coefficient. """ batch_size = images.shape[0] if batch_size == 1: time.sleep(self.IMAGE_PROCESS_TIME) else: time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR) return np.ones(batch_size, dtype=bool) class CatDetectorHandler(BaseTaskHandler): def handle(self, *tasks: ArrayFieldTask): images = np.array([task.array for task in tasks]) predicts = CatDetector().predict(images) for task, predict in zip(tasks, predicts): task.result = predict def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]: return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)] async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]): await asyncio.gather(*(flow.process(task) for task in tasks)) tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE)) flow_with_batch_handler.start() # checks if no one result assert not any(task.result for task in tasks_batch) # task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME, ) # checks if all results were set assert all(task.result for task in tasks_batch) await flow_with_batch_handler.stop() # if we have batch size more than tasks number, we can limit batch accumulation time # with timeout parameter for processing time optimization tasks_batch = get_tasks_batch() flow_with_batch_handler = Flow( FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01) ) flow_with_batch_handler.start() await asyncio.wait_for( process_tasks(flow_with_batch_handler, tasks_batch), timeout=CatDetector.BATCH_PROCESS_TIME + 0.01, ) await flow_with_batch_handler.stop() ">
import asyncio
import time
from typing import List

import numpy as np

from aqueduct.flow import Flow, FlowStep
from aqueduct.handler import BaseTaskHandler
from aqueduct.task import BaseTask

# this constant needs just for example
TASKS_BATCH_SIZE = 20


class ArrayFieldTask(BaseTask):
    def __init__(self, array: np.array, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.array = array
        self.result = None


class CatDetector:
    """GPU model emulator that predicts the presence of the cat in the image."""
    IMAGE_PROCESS_TIME = 0.01
    BATCH_REDUCTION_FACTOR = 0.7
    OVERHEAD_TIME = 0.02
    BATCH_PROCESS_TIME = IMAGE_PROCESS_TIME * TASKS_BATCH_SIZE * BATCH_REDUCTION_FACTOR + OVERHEAD_TIME

    def predict(self, images: np.array) -> np.array:
        """Always says that there is a cat in the image.

        The image is represented by a one-dimensional array.
        The model spends less time for processing batch of images due to GPU optimizations. It's emulated
        with BATCH_REDUCTION_FACTOR coefficient.
        """
        batch_size = images.shape[0]
        if batch_size == 1:
            time.sleep(self.IMAGE_PROCESS_TIME)
        else:
            time.sleep(self.IMAGE_PROCESS_TIME * batch_size * self.BATCH_REDUCTION_FACTOR)
        return np.ones(batch_size, dtype=bool)


class CatDetectorHandler(BaseTaskHandler):
    def handle(self, *tasks: ArrayFieldTask):
        images = np.array([task.array for task in tasks])
        predicts = CatDetector().predict(images)
        for task, predict in zip(tasks, predicts):
            task.result = predict


def get_tasks_batch(batch_size: int = TASKS_BATCH_SIZE) -> List[BaseTask]:
    return [ArrayFieldTask(np.array([1, 2, 3])) for _ in range(batch_size)]


async def process_tasks(flow: Flow, tasks: List[ArrayFieldTask]):
    await asyncio.gather(*(flow.process(task) for task in tasks))


tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(FlowStep(CatDetectorHandler(), batch_size=TASKS_BATCH_SIZE))
flow_with_batch_handler.start()

# checks if no one result
assert not any(task.result for task in tasks_batch)
# task handling takes 0.16 secs that is less than sequential task processing with 0.22 secs
await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME,
)
# checks if all results were set
assert all(task.result for task in tasks_batch)

await flow_with_batch_handler.stop()

# if we have batch size more than tasks number, we can limit batch accumulation time 
# with timeout parameter for processing time optimization
tasks_batch = get_tasks_batch()
flow_with_batch_handler = Flow(
    FlowStep(CatDetectorHandler(), batch_size=2*TASKS_BATCH_SIZE, batch_timeout=0.01)
)
flow_with_batch_handler.start()

await asyncio.wait_for(
    process_tasks(flow_with_batch_handler, tasks_batch), 
    timeout=CatDetector.BATCH_PROCESS_TIME + 0.01,
)

await flow_with_batch_handler.stop()

Sentry

The implementation allows you to receive logger events from the workers and the main process. To integrate with Sentry, you need to write something like this:

import logging
import os

from raven import Client
from raven.handlers.logging import SentryHandler
from raven.transport.http import HTTPTransport

from aqueduct.logger import log


if os.getenv('SENTRY_ENABLED') is True:
    dsn = os.getenv('SENTRY_DSN')
    sentry_handler = SentryHandler(client=Client(dsn=dsn, transport=HTTPTransport), level=logging.ERROR)
    log.addHandler(sentry_handler)
Owner
avito.tech
avito.ru engineering team open source projects
avito.tech
ToDoListAndroid - To-do list application created using Kivymd

ToDoListAndroid To-do list application created using Kivymd. Version 1.0.0 (1/Jan/2022). Planned to do next: -Add setting (theme selector, etc) -Add f

AghnatHs 1 Jan 01, 2022
Машинное обучение на ФКН ВШЭ

Курс "Машинное обучение" на ФКН ВШЭ Конспекты лекций, материалы семинаров и домашние задания (теоретические, практические, соревнования) по курсу "Маш

Evgeny Sokolov 2.2k Jan 04, 2023
A Guide for Feature Engineering and Feature Selection, with implementations and examples in Python.

Feature Engineering & Feature Selection A comprehensive guide [pdf] [markdown] for Feature Engineering and Feature Selection, with implementations and

Yimeng.Zhang 968 Dec 29, 2022
This repository contains all the data analytics projects that I've worked on in python.

93_Python_Data_Analytics_Projects This repository contains all the data analytics projects that I've worked on in python. No. Name 01 001_Cervical_Can

Milaan Parmar / Милан пармар / _米兰 帕尔马 267 Jan 06, 2023
sawa (ꦱꦮ) is an open source programming language, an interpreter to be precise, where you can write python code using javanese character.

ꦱꦮ sawa (ꦱꦮ) is an open source programming language, an interpreter to be precise, where you can write python code using javanese character. sawa iku

Rony Lantip 307 Jan 07, 2023
This program goes thru reddit, finds the most mentioned tickers and uses Vader SentimentIntensityAnalyzer to calculate the ticker compound value.

This program goes thru reddit, finds the most mentioned tickers and uses Vader SentimentIntensityAnalyzer to calculate the ticker compound value.

195 Dec 13, 2022
Prometheus exporter for chess.com player data

chess-exporter Prometheus exporter for chess.com player data implemented via chess.com's published data API and Prometheus Python Client Example use c

Mário Uhrík 7 Feb 28, 2022
RDFLib is a Python library for working with RDF, a simple yet powerful language for representing information.

RDFLib RDFLib is a pure Python package for working with RDF. RDFLib contains most things you need to work with RDF, including: parsers and serializers

RDFLib 1.8k Jan 02, 2023
Stop ask your soraka to ult you, just ult yourself

Lollo's auto-ultimate script Are you tired of your low elo friend who can't ult you with soraka when you ask for it? Use Useless Support and just ult

9 Oct 20, 2022
Procscan is a quick and dirty python script used to look for potentially dangerous api call patterns in a Procmon PML file.

PROCSCAN Procscan is a quick and dirty python script used to look for potentially dangerous api call patterns in a Procmon PML file. Installation git

Daniel Santos 9 Sep 02, 2022
Extrator de dados do jupiterweb

Extrator de dados do jupiterweb O programa é composto de dois arquivos: Um constando apenas de classes complementares que representam as unidades e as

Bruno Aricó 2 Nov 28, 2022
Poetry workspace plugin for Python monorepos.

poetry-workspace-plugin Poetry workspace plugin for Python monorepos. Inspired by Yarn Workspaces. Adds a new subcommand group, poetry workspace, whic

Jack Smith 74 Jan 01, 2023
Just imagine normal bancho, but you can have multiple profiles and funorange speed up maps ranked

Local osu! server Just imagine normal bancho, but you can have multiple profiles and funorange speed up maps ranked (coming soon)! Windows Setup Insta

Cover 25 Nov 15, 2022
Run unpatched binaries on Nix/NixOS

Run unpatched binaries on Nix/NixOS

Thiago Kenji Okada 160 Jan 08, 2023
GEGVL: Google Earth Based Geoscience Video Library

Google Earth Based Geoscience Video Library is transforming to Server Based. The

3 Feb 11, 2022
Set of scripts that schedules employees for shifts throughout the week based on availability, shift times, and shift necessities

Automatic-Scheduler Set of scripts that schedules employees for shifts throughout the week based on availability, shift times, and shift necessities *

Matthew 1 May 01, 2022
Reverse the infix string. Note that while reversing the string you must interchange left and right parentheses

Reverse the infix string. Note that while reversing the string you must interchange left and right parentheses. Obtain the postfix expression of the infix expression Step 1.Reverse the postfix expres

Sazzad Hossen 1 Jan 04, 2022
Password manager using MySQL and Python 3.10.2

Password Manager Password manager using MySQL and Python 3.10.2 Installation Install my-project with github git clone https://github.com/AyaanSiddiq

1 Feb 18, 2022
Reproduction repository for the MDX 2021 Hybrid Demucs model

Submission This is the submission for MDX 2021 Track A, for Track B go to the track_b branch. Submission Summary Submission ID: 151378 Submitter: defo

Alexandre Défossez 62 Dec 18, 2022
Dev-meme - A repository that contains memes just for people like us

A repository that contains memes just for people like us. Coders are constantly

Padmashree Jha 4 Oct 31, 2022