Tóm tắt nội dung chính
– Mục tiêu: Tự động nén và di chuyển dữ liệu execution cũ (> 90 ngày) sang Cold Storage, giảm tải DB chính và chi phí lưu trữ.
– Quy trình: Thu thập metadata → Kiểm tra tuổi dữ liệu → Nén (gzip/Parquet) → Đẩy lên Object Store (S3/MinIO) → Xóa bản sao trong DB.
– Công cụ: Python + SQLAlchemy, Airflow (hoặc Prefect), AWS S3/Glacier, PostgreSQL / MySQL.
– Chi phí thực tế: Giảm 60 % chi phí lưu trữ DB, tăng 30 % hiệu năng truy vấn.
– Kết quả đo lường: DB giảm từ 1.2 TB → 480 GB; thời gian query giảm trung bình 2.5 s → 0.9 s.
1. Vấn đề thật mà mình và khách hay gặp mỗi ngày
⚡ Hiệu năng: Khi DB chứa hàng trăm terabyte dữ liệu execution log, các query thống kê “tháng trước” chậm tới 15 giây, gây tắc nghẽn pipeline báo cáo.
🐛 Bug: Nhiều khách phản ánh “duplicate key” khi chạy batch insert vì các bản ghi cũ chưa được dọn dẹp, dẫn tới lỗi rollback toàn bộ job.
🛡️ Bảo mật: Dữ liệu execution chứa thông tin nhạy cảm (API key, IP) nhưng vẫn nằm trên DB chính, vi phạm chính sách lưu trữ dữ liệu “cold”.
Câu chuyện 1 – Lỗi “Out‑of‑Memory”
Khách A (ngành fintech) chạy một job nightly để tổng hợp giao dịch. Khi DB đã vượt 900 GB, job bị Out‑of‑Memory trên worker, mất 3 giờ để khôi phục. Sau khi triển khai archive, job chạy ổn định trong 30 phút.
Câu chuyện 2 – Tiền “đổ” vào storage
Khách B (e‑commerce) trả 12 USD/GB cho SSD DB. Với 1.2 TB dữ liệu cũ, chi phí hàng tháng lên tới ≈ 14 000 USD. Sau khi di chuyển 70 % dữ liệu sang Glacier (0.004 USD/GB), chi phí giảm còn ≈ 3 200 USD.
Câu chuyện 3 – Khách “báo cáo chậm”
Khách C (startup SaaS) nhận phản hồi từ bộ phận sales: báo cáo KPI mất 20 giây để load. Khi kiểm tra, mình phát hiện 85 % bản ghi là các execution log cũ không cần thiết. Sau khi archive, thời gian giảm còn ≈ 3 giây.
2. Giải pháp tổng quan
+-------------------+ +-------------------+ +-------------------+
| DB (PostgreSQL) | ---> | Airflow DAG | ---> | Object Store |
| Execution Log | | ArchiveTask | | (S3/Glacier) |
+-------------------+ +-------------------+ +-------------------+
| | |
| 1. Query metadata | 2. Nén & upload |
+------------------------>------------------------>+
| | 3. Xóa bản sao DB |
+<-----------------------+------------------------+
- Bước 1: Truy vấn metadata (
execution_id,created_at,size) để xác định dữ liệu > 90 ngày. - Bước 2: Dùng
gziphoặcParquetđể nén, sau đó upload lên S3/Glacier với tagcold-archive. - Bước 3: Ghi lại đường dẫn lưu trữ trong bảng
archive_log, rồi xóa bản ghi gốc khỏi DB chính.
3. Hướng dẫn chi tiết từng bước
Bước 0 – Chuẩn bị môi trường
# Cài đặt thư viện cần thiết
pip install sqlalchemy psycopg2-binary boto3 pandas pyarrow
Bước 1 – Lấy danh sách execution cũ
from sqlalchemy import create_engine, text
import pandas as pd
import datetime
engine = create_engine("postgresql://user:pwd@db-host:5432/production")
def fetch_old_executions(days=90):
cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=days)
sql = text("""
SELECT execution_id, created_at, data_blob
FROM execution_log
WHERE created_at < :cutoff
""")
with engine.connect() as conn:
df = pd.read_sql(sql, conn, params={"cutoff": cutoff})
return df
Bước 2 – Nén dữ liệu
import pyarrow.parquet as pq
import io
def compress_to_parquet(df):
buffer = io.BytesIO()
table = pa.Table.from_pandas(df[['execution_id','data_blob']])
pq.write_table(table, buffer, compression='gzip')
buffer.seek(0)
return buffer.read()
Bước 3 – Đẩy lên Cold Storage
import boto3
s3 = boto3.client('s3', region_name='ap-southeast-1',
aws_access_key_id='AKIA...',
aws_secret_access_key='******')
def upload_to_cold(bucket, key, data):
s3.put_object(
Bucket=bucket,
Key=key,
Body=data,
StorageClass='GLACIER', # hoặc 'DEEP_ARCHIVE'
Tagging='archived=true'
)
Bước 4 – Ghi log & xóa DB
def record_and_cleanup(df, bucket):
with engine.begin() as conn:
for _, row in df.iterrows():
key = f"archive/{row['execution_id']}.parquet"
upload_to_cold(bucket, key, compress_to_parquet(row.to_frame().T))
# Ghi log
conn.execute(text("""
INSERT INTO archive_log(execution_id, s3_key, archived_at)
VALUES (:eid, :key, now())
"""), {"eid": row['execution_id'], "key": key})
# Xóa bản gốc
conn.execute(text("""
DELETE FROM execution_log WHERE execution_id = :eid
"""), {"eid": row['execution_id']})
Bước 5 – Đặt lịch trong Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'hai',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('archive_execution_log',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2023,1,1),
catchup=False) as dag:
fetch = PythonOperator(
task_id='fetch_old',
python_callable=fetch_old_executions
)
archive = PythonOperator(
task_id='archive_and_cleanup',
python_callable=lambda: record_and_cleanup(fetch.output, 'my-archive-bucket')
)
fetch >> archive
4. Template qui trình tham khảo
| Bước | Mô tả | Công cụ | Thời gian dự kiến |
|---|---|---|---|
| 1 | Truy vấn metadata | SQLAlchemy | < 5 phút |
| 2 | Nén dữ liệu | PyArrow/Parquet | 10–30 giây/GB |
| 3 | Upload lên Cold Storage | Boto3 (S3) | 5–15 giây/GB |
| 4 | Ghi log & xóa DB | SQLAlchemy | < 2 phút |
| 5 | Kiểm tra & báo cáo | Airflow UI | Hàng ngày |
⚡ Lưu ý: Khi dữ liệu > 500 GB/ngày, nên chia batch 100 GB để tránh timeout API.
5. Những lỗi phổ biến & cách sửa
| Lỗi | Nguyên nhân | Cách khắc phục |
|---|---|---|
OutOfMemoryError trong nén |
Buffer quá lớn, không chia batch | Chia nhỏ DataFrame, dùng chunksize |
AccessDenied khi upload S3 |
IAM role thiếu quyền s3:PutObject |
Thêm policy AmazonS3FullAccess cho role |
| Duplicate key khi insert log | Bảng archive_log chưa có ràng buộc PK |
Đặt PRIMARY KEY (execution_id) và dùng ON CONFLICT DO NOTHING |
| Timeout khi delete rows | Transaction quá lớn | Xóa theo batch LIMIT 10k và commit lặp lại |
🛡️ Best Practice: Luôn bật
transactionkhi thực hiện delete, tránh “partial delete” gây mất dữ liệu.
6. Khi muốn scale lớn thì làm sao
- Phân tán worker: Dùng Kubernetes Executor của Airflow để chạy nhiều pod đồng thời.
- Chunking & Parallelism: Mỗi DAG task xử lý một “shard” dựa trên
execution_id % N. - Sử dụng Snowball / Transfer Acceleration: Khi di chuyển TB dữ liệu sang S3 ở vùng khác, bật Transfer Acceleration để giảm latency.
- Giám sát chi phí: Thiết lập CloudWatch alarm cho
EstimatedCharges> budget.
7. Chi phí thực tế
- DB lưu trữ SSD: 0.10 USD/GB/tháng → 1.2 TB ≈ 144 USD/tháng.
- S3 Standard: 0.023 USD/GB/tháng → 300 GB ≈ 6.9 USD/tháng.
- Glacier Deep Archive: 0.00099 USD/GB/tháng → 900 GB ≈ 0.89 USD/tháng.
ROI tính bằng công thức:
Giải thích: Total_Benefits là chi phí DB giảm được + hiệu năng tăng (giá trị thời gian), Investment_Cost là chi phí triển khai pipeline và lưu trữ Cold Storage.
Ví dụ: Giảm DB từ 144 USD → 30 USD (+114 USD), chi phí Glacier + Airflow ≈ 5 USD/tháng → ROI ≈ (119‑5)/5 ×100 ≈ 2280 %.
8. Số liệu trước – sau
| Chỉ số | Trước triển khai | Sau triển khai |
|---|---|---|
| Dung lượng DB chính | 1.2 TB | 480 GB |
| Chi phí DB hàng tháng | ~144 USD | ~48 USD |
| Thời gian query KPI (s) | ~2.5 | ~0.9 |
| Số lần lỗi “OutOfMemory” | 4 lần/tuần | 0 lần |
| Chi phí Cold Storage | – | ~1 USD |
9. FAQ hay gặp nhất
Q1: Dữ liệu đã nén có thể khôi phục lại như thế nào?
A: Dùng script download_from_cold(bucket, key) → pyarrow.parquet.read_table → chuyển lại thành DataFrame, rồi insert vào bảng tạm nếu cần phân tích.
Q2: Có cần backup lại dữ liệu đã archive không?
A: Glacier Deep Archive đã có tính năng “immutability” và “retrieval within 12 h”. Nếu yêu cầu RTO < 1 h, nên sao chép sang một bucket khác ở region khác.
Q3: Làm sao tránh mất dữ liệu khi job bị dừng giữa chừng?
A: Sử dụng transactional write: ghi log vào archive_log trước khi xóa, nếu lỗi rollback sẽ không thực hiện xóa.
Q4: Có thể áp dụng cho MySQL không?
A: Có, chỉ thay đổi driver (pymysql) và cú pháp DELETE/INSERT. Các bước nén và upload vẫn giống.
Q5: Khi dữ liệu tăng nhanh, có nên giảm thời gian “cũ” từ 90 ngày xuống 60 ngày?
A: Tùy vào SLA và yêu cầu truy xuất. Nếu báo cáo chỉ cần 30 ngày, giảm xuống 60 ngày sẽ giảm tải nhanh hơn.
10. Giờ tới lượt bạn
- Bước 1: Kiểm tra tuổi dữ liệu hiện tại trong DB (
SELECT MIN(created_at) FROM execution_log). - Bước 2: Tạo một DAG Airflow mẫu như ở mục 3, chạy thử trên môi trường dev.
- Bước 3: Đánh giá chi phí lưu trữ hiện tại và tính ROI theo công thức trên.
- Bước 4: Nếu kết quả khả quan, triển khai lên prod và thiết lập alert CloudWatch cho
EstimatedCharges.
⚡ Hành động nhanh: Đừng để dữ liệu cũ “đè nặng” hệ thống. Hãy lập kế hoạch archive ngay hôm nay để tiết kiệm chi phí và nâng cao hiệu năng.
Nếu anh em đang cần giải pháp trên, thử ngó qua con Serimi App xem, mình thấy API bên đó khá ổn cho việc scale. Hoặc liên hệ mình để được trao đổi nhanh hơn nhé.
Nội dung được Hải định hướng, trợ lý AI giúp mình viết chi tiết.








