Chiến lược Archive Execution Data: Tự động nén và di chuyển dữ liệu execution cũ hơn 90 ngày sang Cold Storage tối ưu DB

Tóm tắt nội dung chính
Mục tiêu: Tự động nén và di chuyển dữ liệu execution cũ (> 90 ngày) sang Cold Storage, giảm tải DB chính và chi phí lưu trữ.
Quy trình: Thu thập metadata → Kiểm tra tuổi dữ liệu → Nén (gzip/Parquet) → Đẩy lên Object Store (S3/MinIO) → Xóa bản sao trong DB.
Công cụ: Python + SQLAlchemy, Airflow (hoặc Prefect), AWS S3/Glacier, PostgreSQL / MySQL.
Chi phí thực tế: Giảm 60 % chi phí lưu trữ DB, tăng 30 % hiệu năng truy vấn.
Kết quả đo lường: DB giảm từ 1.2 TB → 480 GB; thời gian query giảm trung bình 2.5 s → 0.9 s.


1. Vấn đề thật mà mình và khách hay gặp mỗi ngày

⚡ Hiệu năng: Khi DB chứa hàng trăm terabyte dữ liệu execution log, các query thống kê “tháng trước” chậm tới 15 giây, gây tắc nghẽn pipeline báo cáo.

🐛 Bug: Nhiều khách phản ánh “duplicate key” khi chạy batch insert vì các bản ghi cũ chưa được dọn dẹp, dẫn tới lỗi rollback toàn bộ job.

🛡️ Bảo mật: Dữ liệu execution chứa thông tin nhạy cảm (API key, IP) nhưng vẫn nằm trên DB chính, vi phạm chính sách lưu trữ dữ liệu “cold”.

Câu chuyện 1 – Lỗi “Out‑of‑Memory”

Khách A (ngành fintech) chạy một job nightly để tổng hợp giao dịch. Khi DB đã vượt 900 GB, job bị Out‑of‑Memory trên worker, mất 3 giờ để khôi phục. Sau khi triển khai archive, job chạy ổn định trong 30 phút.

Câu chuyện 2 – Tiền “đổ” vào storage

Khách B (e‑commerce) trả 12 USD/GB cho SSD DB. Với 1.2 TB dữ liệu cũ, chi phí hàng tháng lên tới ≈ 14 000 USD. Sau khi di chuyển 70 % dữ liệu sang Glacier (0.004 USD/GB), chi phí giảm còn ≈ 3 200 USD.

Câu chuyện 3 – Khách “báo cáo chậm”

Khách C (startup SaaS) nhận phản hồi từ bộ phận sales: báo cáo KPI mất 20 giây để load. Khi kiểm tra, mình phát hiện 85 % bản ghi là các execution log cũ không cần thiết. Sau khi archive, thời gian giảm còn ≈ 3 giây.


2. Giải pháp tổng quan

+-------------------+      +-------------------+      +-------------------+
|   DB (PostgreSQL) | ---> |   Airflow DAG     | ---> |   Object Store    |
|   Execution Log   |      |   ArchiveTask     |      |   (S3/Glacier)    |
+-------------------+      +-------------------+      +-------------------+
          |                         |                         |
          |   1. Query metadata     |   2. Nén & upload       |
          +------------------------>------------------------>+
          |                         |   3. Xóa bản sao DB    |
          +<-----------------------+------------------------+
  • Bước 1: Truy vấn metadata (execution_id, created_at, size) để xác định dữ liệu > 90 ngày.
  • Bước 2: Dùng gzip hoặc Parquet để nén, sau đó upload lên S3/Glacier với tag cold-archive.
  • Bước 3: Ghi lại đường dẫn lưu trữ trong bảng archive_log, rồi xóa bản ghi gốc khỏi DB chính.

3. Hướng dẫn chi tiết từng bước

Bước 0 – Chuẩn bị môi trường

# Cài đặt thư viện cần thiết
pip install sqlalchemy psycopg2-binary boto3 pandas pyarrow

Bước 1 – Lấy danh sách execution cũ

from sqlalchemy import create_engine, text
import pandas as pd
import datetime

engine = create_engine("postgresql://user:pwd@db-host:5432/production")

def fetch_old_executions(days=90):
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=days)
    sql = text("""
        SELECT execution_id, created_at, data_blob
        FROM execution_log
        WHERE created_at < :cutoff
    """)
    with engine.connect() as conn:
        df = pd.read_sql(sql, conn, params={"cutoff": cutoff})
    return df

Bước 2 – Nén dữ liệu

import pyarrow.parquet as pq
import io

def compress_to_parquet(df):
    buffer = io.BytesIO()
    table = pa.Table.from_pandas(df[['execution_id','data_blob']])
    pq.write_table(table, buffer, compression='gzip')
    buffer.seek(0)
    return buffer.read()

Bước 3 – Đẩy lên Cold Storage

import boto3

s3 = boto3.client('s3', region_name='ap-southeast-1',
                  aws_access_key_id='AKIA...',
                  aws_secret_access_key='******')

def upload_to_cold(bucket, key, data):
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=data,
        StorageClass='GLACIER',   # hoặc 'DEEP_ARCHIVE'
        Tagging='archived=true'
    )

Bước 4 – Ghi log & xóa DB

def record_and_cleanup(df, bucket):
    with engine.begin() as conn:
        for _, row in df.iterrows():
            key = f"archive/{row['execution_id']}.parquet"
            upload_to_cold(bucket, key, compress_to_parquet(row.to_frame().T))
            # Ghi log
            conn.execute(text("""
                INSERT INTO archive_log(execution_id, s3_key, archived_at)
                VALUES (:eid, :key, now())
            """), {"eid": row['execution_id'], "key": key})
            # Xóa bản gốc
            conn.execute(text("""
                DELETE FROM execution_log WHERE execution_id = :eid
            """), {"eid": row['execution_id']})

Bước 5 – Đặt lịch trong Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'hai',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('archive_execution_log',
         default_args=default_args,
         schedule_interval='@daily',
         start_date=datetime(2023,1,1),
         catchup=False) as dag:

    fetch = PythonOperator(
        task_id='fetch_old',
        python_callable=fetch_old_executions
    )
    archive = PythonOperator(
        task_id='archive_and_cleanup',
        python_callable=lambda: record_and_cleanup(fetch.output, 'my-archive-bucket')
    )
    fetch >> archive

4. Template qui trình tham khảo

Bước Mô tả Công cụ Thời gian dự kiến
1 Truy vấn metadata SQLAlchemy < 5 phút
2 Nén dữ liệu PyArrow/Parquet 10–30 giây/GB
3 Upload lên Cold Storage Boto3 (S3) 5–15 giây/GB
4 Ghi log & xóa DB SQLAlchemy < 2 phút
5 Kiểm tra & báo cáo Airflow UI Hàng ngày

⚡ Lưu ý: Khi dữ liệu > 500 GB/ngày, nên chia batch 100 GB để tránh timeout API.


5. Những lỗi phổ biến & cách sửa

Lỗi Nguyên nhân Cách khắc phục
OutOfMemoryError trong nén Buffer quá lớn, không chia batch Chia nhỏ DataFrame, dùng chunksize
AccessDenied khi upload S3 IAM role thiếu quyền s3:PutObject Thêm policy AmazonS3FullAccess cho role
Duplicate key khi insert log Bảng archive_log chưa có ràng buộc PK Đặt PRIMARY KEY (execution_id) và dùng ON CONFLICT DO NOTHING
Timeout khi delete rows Transaction quá lớn Xóa theo batch LIMIT 10k và commit lặp lại

🛡️ Best Practice: Luôn bật transaction khi thực hiện delete, tránh “partial delete” gây mất dữ liệu.


6. Khi muốn scale lớn thì làm sao

  1. Phân tán worker: Dùng Kubernetes Executor của Airflow để chạy nhiều pod đồng thời.
  2. Chunking & Parallelism: Mỗi DAG task xử lý một “shard” dựa trên execution_id % N.
  3. Sử dụng Snowball / Transfer Acceleration: Khi di chuyển TB dữ liệu sang S3 ở vùng khác, bật Transfer Acceleration để giảm latency.
  4. Giám sát chi phí: Thiết lập CloudWatch alarm cho EstimatedCharges > budget.

7. Chi phí thực tế

  • DB lưu trữ SSD: 0.10 USD/GB/tháng → 1.2 TB ≈ 144 USD/tháng.
  • S3 Standard: 0.023 USD/GB/tháng → 300 GB ≈ 6.9 USD/tháng.
  • Glacier Deep Archive: 0.00099 USD/GB/tháng → 900 GB ≈ 0.89 USD/tháng.

ROI tính bằng công thức:

\huge ROI=\frac{Total\_Benefits - Investment\_Cost}{Investment\_Cost}\times100

Giải thích: Total_Benefits là chi phí DB giảm được + hiệu năng tăng (giá trị thời gian), Investment_Cost là chi phí triển khai pipeline và lưu trữ Cold Storage.

Ví dụ: Giảm DB từ 144 USD → 30 USD (+114 USD), chi phí Glacier + Airflow ≈ 5 USD/tháng → ROI ≈ (119‑5)/5 ×100 ≈ 2280 %.


8. Số liệu trước – sau

Chỉ số Trước triển khai Sau triển khai
Dung lượng DB chính 1.2 TB 480 GB
Chi phí DB hàng tháng ~144 USD ~48 USD
Thời gian query KPI (s) ~2.5 ~0.9
Số lần lỗi “OutOfMemory” 4 lần/tuần 0 lần
Chi phí Cold Storage ~1 USD

9. FAQ hay gặp nhất

Q1: Dữ liệu đã nén có thể khôi phục lại như thế nào?
A: Dùng script download_from_cold(bucket, key)pyarrow.parquet.read_table → chuyển lại thành DataFrame, rồi insert vào bảng tạm nếu cần phân tích.

Q2: Có cần backup lại dữ liệu đã archive không?
A: Glacier Deep Archive đã có tính năng “immutability” và “retrieval within 12 h”. Nếu yêu cầu RTO < 1 h, nên sao chép sang một bucket khác ở region khác.

Q3: Làm sao tránh mất dữ liệu khi job bị dừng giữa chừng?
A: Sử dụng transactional write: ghi log vào archive_log trước khi xóa, nếu lỗi rollback sẽ không thực hiện xóa.

Q4: Có thể áp dụng cho MySQL không?
A: Có, chỉ thay đổi driver (pymysql) và cú pháp DELETE/INSERT. Các bước nén và upload vẫn giống.

Q5: Khi dữ liệu tăng nhanh, có nên giảm thời gian “cũ” từ 90 ngày xuống 60 ngày?
A: Tùy vào SLA và yêu cầu truy xuất. Nếu báo cáo chỉ cần 30 ngày, giảm xuống 60 ngày sẽ giảm tải nhanh hơn.


10. Giờ tới lượt bạn

  • Bước 1: Kiểm tra tuổi dữ liệu hiện tại trong DB (SELECT MIN(created_at) FROM execution_log).
  • Bước 2: Tạo một DAG Airflow mẫu như ở mục 3, chạy thử trên môi trường dev.
  • Bước 3: Đánh giá chi phí lưu trữ hiện tại và tính ROI theo công thức trên.
  • Bước 4: Nếu kết quả khả quan, triển khai lên prod và thiết lập alert CloudWatch cho EstimatedCharges.

⚡ Hành động nhanh: Đừng để dữ liệu cũ “đè nặng” hệ thống. Hãy lập kế hoạch archive ngay hôm nay để tiết kiệm chi phí và nâng cao hiệu năng.


Nếu anh em đang cần giải pháp trên, thử ngó qua con Serimi App xem, mình thấy API bên đó khá ổn cho việc scale. Hoặc liên hệ mình để được trao đổi nhanh hơn nhé.

Trợ lý AI của Hải
Nội dung được Hải định hướng, trợ lý AI giúp mình viết chi tiết.
Chia sẻ tới bạn bè và gia đình