При создании хранилища данных, для их перемещения в это хранилище обязательно встанет вопрос об ETLએ (от англ. Extract, Transform, Load — дословно «извлечение, преобразование, загрузка»). Первоначально данные извлекаются из массивов различных источников. Затем необходимо сделать преобразования в формат, который нужен для использования данных в дальнейшем, и, наконец, происходит загрузка в свое хранилище данных.
Есть масса способов организовать это процесс, и один из них — использование языка программирования Python. Сообщество Python создало ряд инструментов для контроля над процессом и облегчения вашей жизни с ETL. Если есть время, деньги и терпение, использование Python обеспечит оптимизацию конвейера ETL в точном соответствии с потребностями вашего бизнеса.
Исходя из этих соображений, вот вам лучшие инструменты Python ETL на 2021 год. Некоторые из них позволяют управлять каждым этапом процесса ETL, в то время как другие превосходны только на отдельных этапах. Для того, что бы было легче сравнивать они разделены на группы.
Для анализа и машинного обучения данных много требуется. Можно было бы собрать их самостоятельно, но это крайне утомительно. Тут на помощь приходят готовые датасеты в самых разных категориях:
Системы управления рабочим процессом — Workflow management systems (WMS)
Сначала мы рассмотрим инструменты Python meta-ETL. Системы управления рабочим процессом (WMS) позволяют планировать, организовывать и отслеживать любые повторяющиеся задачи в вашем бизнесе. Таким образом, вы можете использовать WMS для настройки и запуска рабочих процессов ETL.
Apache Airflow
С помощью Airflow вы строите рабочие процессы как
Вот простой DAG, адаптированный из учебника для начинающих, который запускает пару простых команд bash каждый день:
from datetime import timedelta # Объект, который мы используем для инициализации DAG from airflow import DAG # Нам это нужно для работы! from airflow.operators.bash_operator import BashOperator # Dict, содержащий аргументы по умолчанию для DAG, исключен для краткости default_args = dict(...) # Инициализировать DAY dag = DAG( ‘tutorial’, default_args=default_args, description=‘A simple DAG’, schedule_interval=timedelta(days=1) ) # Инициализировать задачи оператора bash, которые мы хотим выполнить t1 = BashOperator( task_id=‘print_date’, bash_command=‘date’, dag=dag) t2 = BashOperator( task_id=‘sleep’, depends_on_past=False, bash_command=‘sleep 5’, retries=3, dag=dag) # t2 зависит от успешного выполнения t1 t1 >> t2
Airflow — это Ferrari инструментов Python ETL. Он действительно может все. Но за такую расширяемость приходится платить. Это может быть немного сложно для начинающих пользователей (несмотря на их отличную документацию и учебные пособия) и может быть больше, чем вам нужно прямо сейчас. Если вы хотите немедленно запустить процесс ETL, может быть лучше выбрать что-нибудь попроще. Но если у вас есть время и деньги, ваш единственный предел — ваше воображение, если вы работаете с Airflow.
Luigi
Luigi поставляется с веб-интерфейсом, который позволяет пользователю визуализировать задачи и обрабатывать зависимости. Концептуально он похож на GNU Make, но предназначен не только для Hadoop (хотя и упрощает работу с Hadoop). Кроме того, создавать рабочие процессы довольно просто, поскольку все они являются просто классами Python.
Вот схема того, как выглядит типичная задача (адаптировано из
import luigi class MyTask(luigi.Task): # Параметры для этой задачи param = luigi.Parameter(default=42) # Другие задачи, от которых зависит def requires(self): return SomeOtherTask(self.param) # Бизнес-логика задачи def run(self): with self.output().open('w') as f: f.write('hello world!') # Куда пишет вывод def output(self): return luigi.LocalTarget(f'/tmp/foo/bar-{self.param}.txt') if __name__ == '__main__': luigi.run()
Хотя пакет регулярно обновляется, он не так активно развивается, как Airflow, а
Обработка данных
Ядром ETL является обработка данных. Хотя с этим справляется множество инструментов Python, некоторые из них специально разработаны для этой задачи. Давайте посмотрим на возможные варианты:
pandas
Вот пример, в котором мы извлекаем данные из файла CSV, применяем некоторые преобразования данных и загружаем их в базу данных PostgreSQL:
# Стандартный импорт import pandas as pd import numpy as np # Библиотека для взаимодействия с движком базы данных PostgreSQL import pg8000 # Извлечь из файла csv в DataFrame df = pd.read_csv('my_csv.csv') # Новый DataFrame - это сумма столбцов из первого DataFrame df_sum = df.apply(np.sum, axis=0) # Загрузить новый DataFrame в базу данных PostgreSQL con = pg8000.connect('postgres', password='secret_word') df_sum.to_sql(name='my_table', con=con)
Однако есть загвоздка. Pandas разработан в первую очередь как инструмент анализа данных. Таким образом, он делает все в памяти и может работать довольно медленно, если вы работаете с большими данными. Это был бы хороший выбор для создания экспериментального конвейера ETL, но если вы хотите запустить в производство большой конвейер ETL, этот инструмент, вероятно, не для вас.
Spark
Вот пример кода, показывающий, как инициализировать сеанс Spark, читать в CSV, применять некоторые преобразования и записывать в другой CSV:
from pyspark.sql import SparkSession # Инициализировать SparkSession spark = SparkSession.builder.appName('foo').getOrCreate() # Извлечь данные из файла csv df = spark.read.csv('input_csv.csv') # Преобразование с использованием пользовательской функции, определенной в другом месте df = apply_data_transforms(df) # Записать в файл csv df.write.csv('output_csv.csv')
Очевидно, Spark может делать гораздо больше, чем просто читать и писать в файлы CSV, но это дает вам представление о его интуитивно понятном API. Подумайте о Spark, если вам нужна скорость и объем операций с данными.
petl
Python ETL (petl) — это инструмент, в основе которого лежит простота использования и удобство. Если вы работаете со смешанными качественными, незнакомыми и разнородными данными, petl создан для вас! С помощью petl вы можете создавать таблицы на Python из различных источников данных (CSV, XLS, HTML, TXT, JSON и т.д.) И выводить их в желаемый формат хранения.
Petl ориентирован только на ETL. Таким образом, он более эффективен, чем pandas, поскольку он не загружает базу данных в память каждый раз, когда выполняет строку кода. С другой стороны, он не включает дополнительных функций, таких как встроенный анализ данных или визуализация.
Вот пример того, как читать в нескольких файлах CSV,объедините их вместе и запишите в новый файл CSV:
import petl # Распаковать два CSV-файла в две таблицы table1 = petl.fromcsv('input_csv1.csv') table2 = petl.fromcsv('input_csv2.csv') # Объединяем таблицы вместе, чтобы создать новую таблицу table3 = petl.cat(table1, table2) # Записываем новую таблицу в csv petl.tocsv(table3, 'output_csv.csv')
Petl все еще находится в активной разработке, но существует расширенная библиотека
ETL‑фреймворки
Мы обсудили некоторые инструменты, которые можно комбинировать для создания собственного решения Python ETL (например, Airflow и Spark). Но теперь давайте посмотрим на инструменты Python, которые могут обрабатывать каждый шаг процесса извлечения-преобразования-загрузки.
Bonobo
Если вам нравится работать с Python, вы не хотите изучать новый API и хотите создавать полусложные масштабируемые конвейеры ETL,
Bonobo имеет инструменты ETL для создания конвейеров данных, которые могут обрабатывать несколько источников данных параллельно, и имеет расширение SQLAlchemy (в настоящее время в альфа-версии), которое позволяет подключать конвейер напрямую к базам данных SQL.
Эта структура должна быть доступна для всех, кто имеет базовый уровень владения Python, и включает в себя визуализатор графа процесса ETL, который упрощает отслеживание вашего процесса. Кроме того, вы можете начать работу в течение 10 минут благодаря превосходно написанному руководству.
Вот базовый конвейер Bonobo ETL, адаптированный из учебника. Обратите внимание, что все это просто функция или генератор Python.
import bonobo def extract(): # Данные, с которыми будет работать наш конвейер ETL yield 'hello' yield 'world!' def transform(*args): # Делает входные данные из функции extract () заглавными yield (map(str.title, args)) def load(*args): # Печатает (теперь заглавные) вводимые данные на экран print(*args)
Вы можете связать эти функции вместе в виде графика (исключенного здесь для краткости) и запустить его в командной строке как простой файл Python, например, $ python my_etl_job.py
.
Одна из проблем заключается в том, что Bonobo еще не до версии 1.0, и их Github не обновлялся с июля 2019 года. Кроме того,
Mara
Если вы взглянули на Airflow и считаете, что он слишком сложен для того, что вам нужно, и вам не нравится идея писать всю логику ETL самостоятельно,
Mara снижает сложность вашего конвейера ETL, делая некоторые предположения. Вот некоторые из них: 1) у вас должен быть PostgreSQL в качестве механизма обработки данных, 2) вы используете декларативный код Python для определения ваших конвейеров интеграции данных, 3) вы используете командную строку в качестве основного инструмента для взаимодействия с вашими базами данных, и 4) вы используете их красиво оформленный веб-интерфейс (который можно вставить в любое приложение Flask) в качестве основного инструмента для проверки, запуска и отладки ваших конвейеров.
Вот демонстрационный Mara-конвейер, который трижды проверяет локальный хост:
from mara_pipelines.commands.bash import RunBash from mara_pipelines.pipelines import Pipeline, Task # Инициализировать объект конвейера pipeline = Pipeline( id='demo', description='A pipeline that pings localhost 3 times') # Добавить задачу в конвейер - трижды пинговать localhost pipeline.add(Task(id='ping_localhost', description='Pings localhost', commands=[RunBash('ping -c 3 localhost')]))
Обратите внимание, что документация все еще находится в стадии разработки, и что Mara изначально не работает в Windows. Однако он все еще находится в активной разработке, поэтому, если вы хотите что-то среднее между двумя крайностями, упомянутыми выше, попробуйте Mara.
Pygrametl
В приведенном ниже примере мы создаем FactTable для книжного магазина с подробным описанием того, какие книги были проданы и в какое время:
from pygrametl.tables import Dimension, FactTable # Информация о проданной книге book_dimension = Dimension( name='book', key='bookid', attributes=['book', 'genre']) # Информация о том, когда книга была продана time_dimension = Dimension( name='time', key='timeid', attributes=['day', 'month', 'year']) # Объедините информацию о книге и времени в таблицу фактов, измеряющую общий объем продаж fact_table = FactTable( name='facttable', keyrefs=['bookid', 'timeid'], measures=['sale'])
Одним из потенциальных недостатков является то, что эта библиотека существует уже более десяти лет, но еще не приобрела широкой популярности. Это может указывать на то, что на практике это не так удобно. Однако pygrametl работает как в CPython, так и в Jython, поэтому он может быть хорошим выбором, если у вас есть существующий код Java и/или драйверы JDBC в конвейере обработки ETL.
Маленький, но мощный
Следующие ниже инструменты Python ETL не являются полноценными решениями ETL, но предназначены для выполнения тяжелой работы на определенной части процесса. Если вы смешиваете много инструментов, подумайте о добавлении одного из следующих.
odo
Это может быть наградой за лучшую маленькую библиотеку ETL на свете.
Функция принимает два аргумента odo (источник, цель) и преобразует источник в цель. Итак, чтобы преобразовать кортеж (1, 2, 3) в список, выполните:
>>> from odo import odo >>> odo((1, 2, 3), list) [1, 2, 3]
Или для перехода между HDF5 и PostgreSQL выполните:
>>> odo('myfile.hdf5::/data', 'postgresql://user:pass@host::my-table') Table('my-table', MetaData(bind=Engine(postgresql://user:****@host)), ...)
Odo работает под капотом, соединяя разные типы данных через
Более того, odo использует собственные возможности загрузки CSV баз данных на основе SQL, которые значительно быстрее, чем при использовании чистого Python. Документация показывает, что Odo в 11 раз быстрее, чем чтение вашего CSV-файла в pandas, а затем его отправка в базу данных. Если вы обнаружите, что загружаете много данных из CSV в базы данных SQL, odo может стать для вас инструментом ETL.
Обратите внимание, что
ETLAlchemy
Этот легкий инструмент Python ETL позволяет выполнять миграцию между любыми двумя типами СУБД всего за 4 строки кода. ETLAlchemy может перенести вас от MySQL к SQLite, от SQL Server к Postgres или любой другой разновидности комбинаций.
Вот код в действии:
from etlalchemy import ETLAlchemySource, ETLAlchemyTarget source = ETLAlchemySource("mssql+pyodbc://username:password@DSN_NAME") target = ETLAlchemyTarget("mysql://username:password@hostname/db_name", drop_database=True) target.addSource(source) target.migrate()
Довольно просто, да?
Последний раз
Riko
Основное внимание
>>> from riko.modules import fetch # stream is an iterator of dictionaries containing info from # the RSS feed - a number of blog articles >>> stream = fetch.pipe(conf={'url': 'https://news.ycombinator.com/rss'}) # Get the first item (blog article) in the iterator of dictionaries >>> item = next(stream) # Inspect information about the blog article >>> item['title'], item['author'], item['id'] ('Gravity doesn’t care about quantum spin', 'Chris Lee', 'http://arstechnica.com/?p=924009')
(Результаты будут отличаться от приведенных выше, поскольку фид обновляется несколько раз в день).
Riko все еще находится в стадии разработки, поэтому, если вы ищете движок потоковой обработки, это может быть вашим ответом.
Locopy
Capital One создал мощный инструмент Python ETL с Locopy, который позволяет легко (раз) загружать и копировать данные в Redshift или Snowflake. API прост, понятен и выполняет свою работу.
Например, вот как вы можете загрузить данные из Redshift в CSV:
# Библиотека для взаимодействия с движком базы данных PostgreSQL import pg8000 import locopy # Информация вашего профиля my_profile = "some_profile_with_valid_tokens" # Настроить конфигурацию экземпляра Redshift redshift_config = locopy.Redshift(dbapi=pg8000, config_yaml="config.yml", profile=my_profile) with redshift_config as redshift: redshift.unload_and_copy( query="SELECT * FROM schema.table", s3_bucket="my_s3_bucket", export_path="my_output_destination.csv")
Он все еще активно поддерживается, поэтому, если вы ищете специально инструмент, который упрощает ETL с Redshift и Snowflake, обратите внимание на locopy.
Инструменты, проверенные временем
Вот список инструментов, которые мы рекомендовали в прошлом, но сейчас они не находятся в активной разработке. Возможно, вам удастся их использовать в краткосрочной перспективе, но мы не советуем вам создавать что-либо большого размера из-за присущей им нестабильности из-за отсутствия разработки.
Bubbles
Bubbles — это популярная среда Python ETL, упрощающая создание конвейеров ETL. Bubbles написан на Python, но не зависит от технологий. Он настроен для работы с объектами данных — представлениями наборов данных, являющихся ETL, — чтобы максимизировать гибкость в конвейере ETL пользователя. Если ваш конвейер ETL имеет много узлов с форматом -зависимое поведение, Пузыри могут быть решением для вас. Репозиторий Github не подвергался активной разработке с 2015 года, поэтому некоторые функции могут быть устаревшими.
mETL
mETL — это инструмент Python ETL, который автоматически создает файл YAML для извлечения данных из заданного файла и загрузки их в базу данных SQL. Это немного больше рук-по сравнению с некоторыми другими описанными здесь пакетами, но может работать с широким спектром источников данных и целевых объектов, включая стандартные плоские файлы, таблицы Google и полный набор диалектов SQL (включая Microsoft SQL Server). Недавние обновления предоставили некоторые настройки для обхода замедления, вызванного некоторыми драйверами Python SQL,так что это может быть пакет для вас, если вам нравится, что ваш процесс ETL похож на Python, но быстрее.
Carry
Carry — это пакет Python, который объединяет SQLAlchemy и Pandas. Это полезно для перехода между CSV и общими типами реляционных баз данных, включая Microsoft SQL Server, PostgreSQL, SQLite, Oracle и другие. Используя Carry,несколько таблиц могут быть перенесены параллельно, и в процессе могут выполняться сложные преобразования данных. Carry может автоматически создавать и сохранять представления на основе перенесенных данных SQL для использования в будущем.
Etlpy
Etlpy — это библиотека Python, предназначенная для оптимизации конвейера ETL, который включает в себя парсинг веб-страниц и очистку данных.Однако большая часть документации на китайском языке, поэтому он может не подойти вам, если вы не говорите по-китайски или не привыкли полагаться на Google Translate. Etlpy предоставляет графический интерфейс для разработки поисковых роботов / парсеров и инструментов для очистки данных. Создав инструмент, вы можете сохранить его как файл XML и передать его в механизм etlpy., который, по-видимому, предоставляет словарь Python в качестве вывода. Это может быть ваш выбор, если вы хотите извлечь большой объем данных, использовать для этого графический интерфейс и говорить по-китайски.
Более простая альтернатива
Использование инструментов Python ETL — один из способов настроить вашу инфраструктуру ETL. Однако, как и в случае со всеми проектами кодирования, это может быть дорогостоящим, трудоемким и полным неожиданных проблем.
Если вы просто хотите синхронизировать, хранить и легко получать доступ к своим данным, Panoply для вас. Вместо того, чтобы тратить недели на кодирование конвейера ETL на Python, сделать это за несколько минут и щелкнуть мышью с Panoply. Это единственный инструмент конвейера данных, который легко помещает все ваши бизнес-данные в одно место, предоставляет всем сотрудникам неограниченный доступ, который им нужен, и не требует обслуживания. Кроме того, у Panoply есть встроенное хранилище, поэтому вам не нужно манипулировать несколькими поставщиками, чтобы обеспечить поток данных.
По материалам
Лучшие инструменты Python ETL на 2021 год, опубликовано К ВВ, лицензия — Creative Commons Attribution-NonCommercial 4.0 International.
Респект и уважуха