+11

Cứ thực hành Airflow dễ hiểu và đơn giản đã, chưa làm gì phức tạp cả

Mở đầu

Tiếp nối bài viết chỉ toàn lý thuyết Bài viết về Airflow cho người mới như mình thì chúng ta đi ngay tới bài thực hành này thôi

Chú ý là mình sẽ thực hành cùng với Python nhé ạ, bài viết sẽ gồm 2 phần thực hành chính

  • Thực hành với các tác vụ đơn giản
  • Thực hành với bài toán đào tạo mô hình Deep Learning

Cài đặt môi trường

  • Trong bài viết của anh Hoàng, có đề cập tới việc setup nhanh chóng với docker-compose, các bạn có thể tham khảo nhé
  • Mình hướng tới một cái gì đó chân chất, dễ hiểu, dễ tiếp cận cho người mới, ít động vào nhiều cái liên quan nên mình sẽ setup tay =)))

Chuẩn bị trước

  • Python: 3.7, 3.8, 3.9, 3.10
  • Minimum memory: 4 gb

Cài đặt Airflow bằng pip

  • Cài đặt các dependencies của Linux:
sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
sudo apt-get install libkrb5-dev
  • Setup đường dẫn tới Airflow
export AIRFLOW_HOME=~/airflow
  • Assign 3 biến environment: AIRFLOW_VERSION, PYTHON_VERSION và CONSTRAINT_URL
export AIRFLOW_VERSION=2.3.3
export PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
  • Khi bạn assign một biến environment bằng lệnh export, biến này chỉ tồn tại trong phiên làm việc hiện tại của terminal. Để biến environment này tồn tại trong các phiên làm việc khác, bạn cần thêm lệnh export vào tệp ~/.bashrc
  • Nhớ lưu lại các thay đổi
source .bashrc
  • Cài đặt Airflow bằng pip
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
  • Khởi chạy các components của airflow
airflow webserver -p 8080
airflow scheduler

Các bạn mở trình duyệt lên, vào localhost:8080 để xem kết quả nhé

image.png

Thử với một tác vụ cơ bản nào

Cứ đi từ dễ nhất trước nhé, để chúng ta hiểu hơn về cấu trúc một file DAG. Thử một chương trình Hello world xem sao. Chúng ta sẽ cùng tạo 1 file my_dag.py có nội dung như sau

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from pytz import timezone
import os 


local_tz = timezone('Asia/Ho_Chi_Minh')


# Định nghĩa các hàm xử lý dữ liệu
def process_data():
    print('process data')
    
def save_data():
    print('save data')
    print()


# Định nghĩa DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 7, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='my_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval='* * * * *'
)

# Định nghĩa các Task
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "Task 1"',
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=process_data,
    dag=dag,
)

task3 = PythonOperator(
    task_id='task3',
    python_callable=save_data,
    dag=dag,
)

# Thiết lập phụ thuộc giữa các Task
task1 >> task2 >> task3

if __name__ == "__main__":
    dag.cli()

Trước khi giải thích các phần trong mã nguồn trên, sẽ có một số lưu ý như sau:

  • Tên file mã nguồn cần trung với dag_id
  • File mã nguồn cần được lưu trong thư mục dags của airflow, mặc định sẽ là AIRFLOW_HOME/dags
  • Mỗi khi tạo 1 file DAG và đưa vào thư mục dags cần khởi động lại airflow webserver

Chúng ta sẽ cùng phân tích từng thành phần có trong file mã nguồn trên

  • ĐỊnh nghĩa DAG: đây là phần chúng ta sẽ định danh 1 số thông tin như dag_id để phân biệt các DAGs, người sở hữu, có phụ thuộc vào DAG nào đó hay không, ngày bắt đầu chạy, số lần thử lại nếu lỗi, thời gian thử lại nếu lỗi, mô tả DAG, thời gian tự lặp lại chạy DAG, ...
# Định nghĩa DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 7, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='my_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval='* * * * *'
)
  • Định nghĩa các Tasks có trong DAG: Tùy vào các loại Task, chúng ta sẽ lựa chọn Operator hoặc Sensor sao cho phù hợp. MÌnh có giới thiệu các Operator và Sensor ở bài 1, các bạn có thể xem lại. Ví dụ mình dùng BashOperator, và lệnh thực thi của mình là echo "Task 1". Thông tin tối thiểu cần định nghĩa cho Task đó là task_id, dag và thực thi cái gì
# Định nghĩa các Task
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "Task 1"',
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=process_data,
    dag=dag,
)

task3 = PythonOperator(
    task_id='task3',
    python_callable=save_data,
    dag=dag,
)
  • Thiết lập phụ thuộc giữa các Tasks: Dưới đây là những kiểu phụ thuộc thông dụng và ký hiệu tương ứng:

    • >>: Tác vụ bên trái phải được thực thi trước khi tác vụ bên phải được thực thi. Đây là kiểu phụ thuộc mặc định giữa các tác vụ trong Airflow.
    • <<: Tác vụ bên phải phải được thực thi trước khi tác vụ bên trái được thực thi.
    • >> và <<: Cả hai tác vụ phải được thực thi trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='all_success': Tất cả các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='one_success': Một trong các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='all_failed': Tất cả các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
    • >> với trigger_rule='one_failed': Một trong các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
    • Ví dụ về trigger_rule:
    t1 >> t2 >> t3
    t3.trigger_rule = 'all_failed'
    
  • Chạy DAG: Bạn có thể sử dụng dag.run() để chạy DAG của bạn trong một ứng dụng Python hoặc sử dụng dag.cli() để chạy các lệnh dòng lệnh Airflow từ DAG của bạn.

Vậy chúng ta cùng xem kết quả nào. Trước hêt nhớ những cái mình vừa note bên trên nhé, sau đó chúng ta sẽ thấy DAG của mình đã xuất hiện trên Webserver

image.png

Chúng ta sẽ chạy thử DAG của mình bằng nút gạt bên trái. Hiện tại code mình đang cho nó 1 phút chạy 1 lần và mỗi task sẽ in ra một dòng chữ. Trạng thái và số lượng những task theo từng trạng thái được hiển thị trong các ô tròn, các bạn trỏ chuột vào đó sẽ hiện ra những cái hint notes

image.png

Những thông tin in ra sẽ được lưu trữ trong log, mặc định ở thư mục AIRFLOW_HOME/logs

Xuất phát từ dân AI thì thử với Deep learning xem sao

image.png

Về nội dung phần này có lẽ mình sẽ nói chung chung một chút. Mình đặt ra một quy trình đơn giản trong việc thiết kế và xây dựng mô hình học máy như hình vẽ trên, trong đó chúng ta giả sử lựa chọn ra 3 kiến trúc mô hình phù hợp cho bài toán của mình, muốn tự động hóa việc training, lựa chọn mô hình có độ chính xác tốt nhất, sau đó đánh giá lại xem nó có hiệu quả hay không

Mình sẽ không trình bày các code liên quan đến training, evaluate quá sâu trong bài viết này

Khởi tạo DAG

Một công đoạn không thể thiếu khi chúng ta triển khai AirFlow

dag = DAG("my_dag", # Dag id
        start_date=datetime(2023, 1 ,1), # start date, the 1st of January 2021 
        schedule_interval='@daily',  # Cron expression, here it is a preset of Airflow, @daily means once every day.
        description='A simple ML flow with DAG',

Training mô hình

Ở đây chúng ta sẽ có 3 task đại diện cho việc training 3 môn hình

def _training_model():
    return random.randint(0, 10)

# Tasks are implemented under the dag object
training_model_A = PythonOperator(
    task_id="training_model_A",
    python_callable=_training_model,
    dag=dag
)
training_model_B = PythonOperator(
    task_id="training_model_B",
    python_callable=_training_model,
    dag=dag
)
training_model_C = PythonOperator(
    task_id="training_model_C",
    python_callable=_training_model,
    dag=dag
)

Kiểm tra xem có mô hình đạt yêu cầu hay không

def _choosing_best_model(ti):
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C'
    ])
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'
    

choosing_best_model = BranchPythonOperator(
    task_id="choosing_best_model",
    python_callable=_choosing_best_model,
    dag=dag
)

Ở đây các bạn sẽ bắt gặp một từ khóa mới, đó là xcom_pull. Các bạn có thể hiểu đơn giản như sau:

  • XCOM (Cross-Communication Messages) là một cơ chế cho phép dữ liệu đổi giữa các tác vụ DAG
  • Hàm _choosing_best_model được sử dụng để lấy thông tin về độ chính xác của 3 task training mô hình A, B, C, nếu 1 trong 3 độ chính xác này đạt một ngưỡng nào đó (ở đây việc training mình cho trả về random 1 giá trị từ 0 đến 10) thì trả về "đạt"

Hậu xử lý khi tối thiểu 1 trong 3 mô hình đã đạt kỳ vọng

Sau khi training 3 mô hình và xảy ra 2 trường hợp là tối thiểu 1 trong 3 mô hình đã đạt kỳ vọng hoặc không có mô hình nào đạt, thì cần có những hành động tiếp theo. Dưới đây mình có ví dụ dễ hiểu 1 chút

accurate = BashOperator(
    task_id="accurate",
    bash_command="echo 'Prediction'"
)
inaccurate = BashOperator(
    task_id="inaccurate",
    bash_command=" echo 'Retraining'"
)

Xác định phụ thuộc cho các task

training_model_tasks = [
    PythonOperator(
        task_id=f"training_model_{model_id}",
        python_callable=_training_model,
        op_kwargs={
            "model": model_id
        }
    ) for model_id in ['A', 'B', 'C']
]
choosing_best_model = BranchPythonOperator(
    task_id="choosing_best_model",
    python_callable=_choosing_best_model
)

training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

Tổng kết

Trên đây mình có giới thiệu qua một chút về việc thực hành với Airflow một cách đơn giản để hiểu quy tình cũng như các bước cơ bản khi làm việc với nó. Thời gian tới khi có dịp làm việc với Airflow nhiều hơn, mình sẽ chia sẻ thêm với các bạn. Cảm ơn mọi người đã đọc đến những dòng cuối này ^^

Tài liệu tham khảo


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.