current position:Home>Python ETL tool

Python ETL tool

2022-02-02 07:16:33 Programmer Zeng Zeng

pyetl It's pure. python Developed ETL frame , comparison sqoop, datax And so on. ETL Tools ,pyetl You can add... To each field udf function , Make the data conversion process more flexible , Compared with professional ETL Tools pyetl A lightweight , pure python Code operation , More in line with developers' habits

install

pip3 install pyetl

Examples of use

database Data synchronization between tables

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()
 Copy code 

Database table to hive Table synchronization

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()
 Copy code 

Database table synchronization es

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="tartget")
Task(reader, writer).start()
 Copy code 

The field names of the original table and the target table are different , Need to add field mapping add to

#  Original table source contain uuid,full_name Field 
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
#  Target table target contain id,name Field 
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns Configure the field mapping relationship between the target table and the original table 
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()
 Copy code 

Field udf mapping , Perform rule verification on the field 、 Data standardization 、 Data cleaning, etc

# functions Configuration field udf mapping , as follows id Turn the string ,name Remove the space before and after 
functions={"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()
 Copy code 

Inherit Task Class flexible extension ETL Mission

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
  reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
  writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
  
  def get_columns(self):
    """ Generate field mapping configuration by function , More flexible use """
    #  The following example takes out the field mapping configuration in the database and returns the conversion dictionary type 
    sql = "select columns from task where name='new_task'"
    columns = self.writer.db.read_one(sql)["columns"]
    return json.loads(columns)
   
  def get_functions(self):
    """ Generate the field by function udf mapping """
    #  The following example converts each field type to a string 
    return {col: str for col in self.columns}
   
  def apply_function(self, record):
    """ The of an entire piece of data in a data stream udf"""
    record["flag"] = int(record["id"]) % 2
    return record

  def before(self):
    """ What to do before the task starts ,  Such as initializing the task table , Create target table, etc """
    sql = "create table destination_table(id int, name varchar(100))"
    self.writer.db.execute(sql)
  
  def after(self):
    """ Actions to be performed after the task is completed , Such as updating task status, etc """
    sql = "update task set status='done' where name='new_task'"
    self.writer.db.execute(sql)

NewTask().start()
 Copy code 

At present, it has been realized Reader and Writer list

Reader Introduce
DatabaseReader Support reading of all relational databases
FileReader Structured text data reading , Such as csv file
ExcelReader Excel Table file read
Writer Introduce
DatabaseWriter Support the writing of all relational databases
ElasticSearchWriter Batch write data to es Indexes
HiveWriter Batch insert hive surface
HiveWriter2 Load data Mode import hive surface ( recommend )
FileWriter Write data to a text file

summary

This is about python ETL Tools pyetl This is the end of the article

copyright notice
author[Programmer Zeng Zeng],Please bring the original link to reprint, thank you.
https://en.pythonmana.com/2022/02/202202020716298154.html

Random recommended