Làm thế nào để xây dựng hệ thống gợi ý Next Best Action trên trang web?

Mục lục

Giới thiệu về Next Best Action (NBA) Recommendation

Trong thế giới thương mại điện tử, việc dự đoán chính xác hành động tiếp theo của khách hàng là chìa khóa để tăng tỷ lệ chuyển đổi và doanh thu. Hệ thống Next Best Action (NBA) phân tích chuỗi hành vi của người dùng (click, xem, thêm vào giỏ, mua hàng, v.v.) để đưa ra gợi ý tối ưu nhất tại thời điểm thực. Theo báo cáo của Gartner, các doanh nghiệp triển khai NBA có thể tăng doanh thu lên đến 15% và cải thiện trải nghiệm khách hàng đáng kể.

Bài viết này sẽ hướng dẫn chi tiết cách xây dựng hệ thống NBA sử dụng mô hình Sequence Prediction (dự đoán chuỗi), từ kiến trúc tổng quan, lựa chọn công nghệ, đến từng bước triển khai cụ thể, kèm theo timeline, chi phí, rủi ro và checklist go-live. Mục tiêu là để đội ngũ kỹ thuật có thể “cầm lên làm được ngay”.

Kiến trúc tổng quan

Hệ thống NBA gồm các thành phần chính sau:

  1. Data Ingestion: Thu thập sự kiện từ web/app qua Apache Kafka.
  2. Real-time Processing: Spark Streaming xử lý, làm sạch và lưu vào Data Lake (S3) đồng thời cập nhật Feature Store (Redis).
  3. Batch Processing: Airflow chạy job hàng ngày để tổng hợp dữ liệu huấn luyện.
  4. Model Training: Huấn luyện mô hình SASRec (Transformer-based) trên GPU, lưu model vào MLflow.
  5. Model Serving: TorchServe phục vụ dự đoán real-time, được gọi qua API Gateway (FastAPI + Nginx).
  6. Integration: Frontend gọi API để nhận gợi ý, CRM tích hợp để gửi email/SMS.
  7. Monitoring: Prometheus + Grafana giám sát hiệu năng, ELK tập trung logs.

Dưới đây là sơ đồ luồng dữ liệu (text‑art):

+-------------+      +-------------+      +-------------------+
|   Client    | ---> |   Kafka     | ---> | Spark Streaming   |
| (Web/App)   |      | (MSK)       |      | (EMR)             |
+-------------+      +-------------+      +-------------------+
                                           |
                                           v
                                     +------------+
                                     |   S3       |
                                     | (Data Lake)|
                                     +------------+
                                           |
                                           v
+----------------+                  +-------------------+
|   Model        | <--------------- | Feature Store     |
| Serving        |                  | (Redis)           |
| (TorchServe)   |                  +-------------------+
+----------------+                          ^
       ^                                   |
       |                                   |
       v                                   |
+----------------+                  +-------------------+
|   API Gateway  |                  | Training Pipeline |
| (FastAPI/Nginx)|                  | (SageMaker)       |
+----------------+                  +-------------------+
       ^
       |
+-------------+
|   Client    |
| (Web/App)   |
+-------------+

Lựa chọn công nghệ

So sánh các mô hình Sequence Prediction

Mô hình Độ chính xác (NDCG@10) Thời gian huấn luyện Bộ nhớ Phù hợp
LSTM 0.35 8h 2 GB Chuỗi ngắn, ít phức tạp
GRU 0.36 7h 1.8 GB Chuỗi ngắn, cần tốc độ
SASRec 0.42 12h 3 GB Chuỗi dài, độ chính xác cao, SOTA
BERT4Rec 0.44 15h 4 GB Chuỗi dài, yêu cầu tài nguyên lớn

Lựa chọn: SASRec – cân bằng giữa độ chính xác và tài nguyên, dễ triển khai production.

So sánh framework ML

Framework Ưu điểm Nhược điểm
TensorFlow TF Serving ổn định, tooling phong phú Graph mode khó debug
PyTorch Dynamic graph, dễ tùy chỉnh, TorchServe đủ tốt Serving chưa mạnh bằng TF Serving

Lựa chọn: PyTorch + TorchServe (vì team quen PyTorch, dễ tối ưu).

So sánh message broker

Broker Ưu điểm Nhược điểm
Kafka Hiệu năng cao, khả năng mở rộng, cộng đồng lớn Quản lý phức tạp nếu tự host
Kinesis Managed, tích hợp tốt với AWS Chi phí cao hơn, giới hạn throughput

Lựa chọn: Kafka trên AWS MSK (Managed Streaming for Kafka) – giảm ops, vẫn đảm bảo hiệu năng.

So sánh feature store

Công nghệ Ưu điểm Nhược điểm
Redis Tốc độ cao, hỗ trợ nhiều kiểu dữ liệu Không có versioning tự động
Feast Feature store chuyên biệt, versioning Overkill cho dự án nhỏ

Lựa chọn: Redis (Amazon ElastiCache) – đơn giản, real-time, chi phí thấp.

Tổng hợp tech stack

Thành phần Công nghệ chọn
Ngôn ngữ chính Python 3.9
Stream Processing Apache Spark Structured Streaming (EMR)
Message Broker Apache Kafka (AWS MSK)
Data Lake Amazon S3
Feature Store Redis (Amazon ElastiCache)
ML Framework PyTorch 1.12
Model Serving TorchServe
API Gateway FastAPI + Nginx (EC2 hoặc EKS)
Container Orchestration Kubernetes (EKS)
CI/CD GitHub Actions
Monitoring Prometheus + Grafana, ELK Stack
Model Registry MLflow
Workflow Orchestration Apache Airflow (MWAA)

Chi tiết triển khai theo từng giai đoạn

Phase 1: Khởi tạo dự án & Thiết lập môi trường (2 tuần)

Mục tiêu: Xác định yêu cầu, thiết lập repo, CI/CD cơ bản, môi trường dev.

Công việc:

  1. Kick-off meeting với stakeholders – PM.
  2. Thu thập yêu cầu nghiệp vụ (BRD) – BA.
  3. Thiết lập GitHub repository với cấu trúc thư mục chuẩn – Tech Lead.
  4. Cài đặt CI/CD cơ bản (GitHub Actions) cho linting, unit test – DevOps.
  5. Thiết lập môi trường dev local: Docker Compose chứa Kafka, Zookeeper, Spark, Redis – DevOps.
  6. Lập kế hoạch chi tiết dự án (Gantt) – PM.

Phụ thuộc: Không.

Phase 2: Xây dựng Data Pipeline (4 tuần)

Mục tiêu: Thu thập sự kiện real-time, xử lý và lưu trữ.

Công việc:

  1. Thiết kế schema sự kiện (click, view, add_to_cart, purchase) – Data Engineer.
  2. Triển khai Kafka cluster trên AWS MSK – DevOps.
  3. Viết producer SDK (JavaScript) để gửi event từ frontend – Frontend Dev.
  4. Xây dựng Spark Streaming job (PySpark) đọc từ Kafka, làm sạch, chuẩn hóa, ghi S3 – Data Engineer.
  5. Thiết lập Feature Store (Redis) lưu embedding/user profile real-time – Data Engineer.
  6. Xây dựng batch pipeline (Airflow) tổng hợp dữ liệu hàng ngày tạo training dataset – Data Engineer.
  7. Thiết lập monitoring cho pipeline: số event, latency, error – DevOps.

Phụ thuộc: Phase 1 hoàn thành.

Phase 3: Xây dựng và huấn luyện mô hình (6 tuần)

Mục tiêu: Phát triển mô hình SASRec, huấn luyện, đánh giá.

Công việc:

  1. EDA (Exploratory Data Analysis) – Data Scientist.
  2. Feature engineering: mã hóa sản phẩm (item ID), chuẩn hóa thời gian, xử lý missing – Data Scientist.
  3. Lựa chọn kiến trúc và implement SASRec bằng PyTorch – ML Engineer.
  4. Xây dựng data loader cho chuỗi (dạng sliding window) – ML Engineer.
  5. Huấn luyện mô hình trên GPU (AWS SageMaker hoặc EC2 p3) – ML Engineer.
  6. Đánh giá bằng metrics: Precision@k, Recall@k, NDCG@k – Data Scientist.
  7. Tối ưu hyperparameter với Optuna – ML Engineer.
  8. Lưu model vào MLflow Model Registry – ML Engineer.
  9. Viết unit test cho model – ML Engineer.

Phụ thuộc: Phase 2 hoàn thành (có dữ liệu đủ để huấn luyện).

Phase 4: Triển khai Model Serving (3 tuần)

Mục tiêu: Đưa mô hình vào production, cung cấp API real-time.

Công việc:

  1. Thiết lập TorchServe trên Kubernetes cluster (EKS) – DevOps.
  2. Đóng gói model thành .mar file và deploy lên TorchServe – ML Engineer.
  3. Xây dựng API Gateway (FastAPI + Nginx) nhận request, tiền xử lý, gọi TorchServe – Backend Dev.
  4. Triển khai caching (Redis) để giảm tải – Backend Dev.
  5. Thiết lập auto-scaling cho TorchServe dựa trên CPU/GPU – DevOps.
  6. Viết integration test cho API – QA.
  7. Thiết lập monitoring cho model: latency, throughput, error rate – DevOps.

Phụ thuộc: Phase 3 hoàn thành.

Phase 5: Tích hợp với hệ thống hiện tại (2 tuần)

Mục tiêu: Kết nối NBA recommendation vào frontend và CRM.

Công việc:

  1. Cập nhật frontend (React/Next.js) gọi API recommendation tại các điểm: trang chủ, trang sản phẩm, email – Frontend Dev.
  2. Tích hợp với CRM (Salesforce/HubSpot) để gửi gợi ý qua email/SMS – Backend Dev.
  3. Thiết lập A/B testing (Split.io) để đo lường hiệu quả – Data Scientist.
  4. Triển khai feature toggle (LaunchDarkly) để bật/tắt dễ dàng – DevOps.
  5. Kiểm thử end-to-end – QA.

Phụ thuộc: Phase 4 hoàn thành.

Phase 6: Kiểm thử, tối ưu và chuẩn bị go-live (2 tuần)

Mục tiêu: Đảm bảo hệ thống ổn định, đáp ứng SLA.

Công việc:

  1. Load testing với Locust, mô phỏng 10k RPS – DevOps.
  2. Tối ưu hiệu năng: caching, batching inference, model quantization – ML Engineer.
  3. Security audit: kiểm tra OWASP Top 10, xác thực API key – Security Engineer.
  4. Backup và disaster recovery plan – DevOps.
  5. Chuẩn bị rollback plan – DevOps.
  6. Training cho đội vận hành – Tech Lead.
  7. Final review và sign-off – PM.

Phụ thuộc: Phase 5 hoàn thành.

Kế hoạch dự án (Timeline & Gantt chart)

Bảng timeline chi tiết

ID Công việc Phase Người phụ trách Bắt đầu (tuần) Kết thúc (tuần) Phụ thuộc
1.1 Kick-off meeting 1 PM 1.1 1.1
1.2 Thu thập yêu cầu nghiệp vụ 1 BA 1.2 1.3 1.1
1.3 Thiết lập GitHub repo 1 Tech Lead 1.4 1.4
1.4 Cài đặt CI/CD cơ bản 1 DevOps 1.5 1.6 1.3
1.5 Thiết lập môi trường dev (Docker) 1 DevOps 1.7 1.8 1.4
1.6 Lập kế hoạch chi tiết 1 PM 1.9 2.0 1.2
2.1 Thiết kế schema sự kiện 2 Data Engineer 2.1 2.2 1.6
2.2 Triển khai Kafka (MSK) 2 DevOps 2.3 2.4 2.1
2.3 Viết producer SDK (JS) 2 Frontend Dev 2.5 2.6 2.2
2.4 Xây dựng Spark Streaming job 2 Data Engineer 2.7 3.2 2.3
2.5 Thiết lập Feature Store (Redis) 2 Data Engineer 3.3 3.4 2.4
2.6 Xây dựng batch pipeline (Airflow) 2 Data Engineer 3.5 4.0 2.5
2.7 Monitoring pipeline 2 DevOps 4.1 4.2 2.6
3.1 EDA 3 Data Scientist 4.3 4.5 2.6
3.2 Feature engineering 3 Data Scientist 4.6 5.0 3.1
3.3 Implement SASRec (PyTorch) 3 ML Engineer 5.1 5.5 3.2
3.4 Xây dựng data loader 3 ML Engineer 5.6 6.0 3.3
3.5 Huấn luyện mô hình (GPU) 3 ML Engineer 6.1 7.5 3.4
3.6 Đánh giá mô hình 3 Data Scientist 7.6 8.0 3.5
3.7 Tối ưu hyperparameter (Optuna) 3 ML Engineer 8.1 8.5 3.6
3.8 Lưu model vào MLflow 3 ML Engineer 8.6 8.7 3.7
3.9 Unit test cho model 3 ML Engineer 8.8 9.0 3.8
4.1 Thiết lập TorchServe trên EKS 4 DevOps 9.1 9.3 3.9
4.2 Đóng gói model (.mar) và deploy 4 ML Engineer 9.4 9.5 4.1
4.3 Xây dựng API Gateway (FastAPI+Nginx) 4 Backend Dev 9.6 10.2 4.2
4.4 Triển khai caching (Redis) 4 Backend Dev 10.3 10.4 4.3
4.5 Auto-scaling cho TorchServe 4 DevOps 10.5 10.6 4.4
4.6 Integration test cho API 4 QA 10.7 11.0 4.5
4.7 Monitoring model serving 4 DevOps 11.1 11.2 4.6
5.1 Cập nhật frontend gọi API 5 Frontend Dev 11.3 11.5 4.7
5.2 Tích hợp với CRM 5 Backend Dev 11.6 12.0 5.1
5.3 Thiết lập A/B testing (Split.io) 5 Data Scientist 12.1 12.3 5.2
5.4 Triển khai feature toggle 5 DevOps 12.4 12.5 5.3
5.5 Kiểm thử end-to-end 5 QA 12.6 13.0 5.4
6.1 Load testing (Locust) 6 DevOps 13.1 13.3 5.5
6.2 Tối ưu hiệu năng 6 ML Engineer 13.4 13.6 6.1
6.3 Security audit 6 Security Engineer 13.7 14.0 6.2
6.4 Backup & disaster recovery plan 6 DevOps 14.1 14.2 6.3
6.5 Chuẩn bị rollback plan 6 DevOps 14.3 14.4 6.4
6.6 Training vận hành 6 Tech Lead 14.5 14.6 6.5
6.7 Final review & sign-off 6 PM 14.7 15.0 6.6

Gantt chart (text art)

Tuần    : 1   2   3   4   5   6   7   8   9   10  11  12  13  14  15
Phase 1 : [===]
Phase 2 :       [===========]
Phase 3 :                     [===================]
Phase 4 :                                           [=======]
Phase 5 :                                                 [====]
Phase 6 :                                                      [====]

Chi phí dự kiến (3 năm)

Chi phí được chia theo năm, bao gồm nhân sự, hạ tầng cloud, dịch vụ managed, license, và phát sinh.

Hạng mục Mô tả Năm 1 (VND) Năm 2 (VND) Năm 3 (VND)
Nhân sự phát triển 8 người (PM, Tech Lead, Data Engineer, Data Scientist, ML Engineer, Backend, Frontend, DevOps) x 12 tháng 12.450.000.000
Nhân sự vận hành 3 người (DevOps, Data Engineer, ML Engineer) x 12 tháng 4.800.000.000 4.800.000.000
AWS Infrastructure EC2, S3, MSK, ElastiCache, EKS, CloudWatch 1.235.500.000 987.600.000 987.600.000
AWS SageMaker (training) p3.2xlarge spot instances 324.000.000 162.000.000 162.000.000
Managed Services Confluent Cloud (Kafka), MLflow Server, Split.io 480.000.000 480.000.000 480.000.000
Monitoring & Logging Grafana Cloud, ELK Stack (Elastic Cloud) 120.000.000 120.000.000 120.000.000
Chi phí phát sinh Dự phòng 200.000.000 100.000.000 100.000.000
Tổng 14.809.500.000 6.649.600.000 6.649.600.000

Lưu ý: Chi phí trên mang tính tham khảo, có thể thay đổi tùy quy mô và thương lượng với nhà cung cấp.

Tài liệu bàn giao

Dự án phải bàn giao 15 tài liệu sau:

STT Tên tài liệu Người phụ trách Mô tả
1 Yêu cầu nghiệp vụ (BRD) BA Mô tả chi tiết yêu cầu từ business, use cases, KPI.
2 Đặc tả kỹ thuật (Technical Specification) Tech Lead Kiến trúc hệ thống, công nghệ, luồng dữ liệu, API.
3 Thiết kế hệ thống (System Architecture) Tech Lead Sơ đồ chi tiết các thành phần, tương tác.
4 Thiết kế cơ sở dữ liệu (Database Schema) Data Engineer Schema của các bảng trong Data Lake, Redis, v.v.
5 API Specification (OpenAPI/Swagger) Backend Dev Tài liệu API endpoints, request/response, error codes.
6 Hướng dẫn triển khai (Deployment Guide) DevOps Các bước deploy lên môi trường staging/production.
7 Hướng dẫn vận hành (Operation Manual) DevOps Cách monitor, scale, backup, restore.
8 Hướng dẫn xử lý sự cố (Troubleshooting Guide) DevOps Các lỗi thường gặp và cách khắc phục.
9 Mô hình ML (Model Card) Data Scientist Mô tả mô hình, metrics, hyperparameters, limitations.
10 Quy trình huấn luyện và cập nhật mô hình ML Engineer Cách chạy pipeline training, retraining schedule.
11 Kế hoạch kiểm thử (Test Plan & Report) QA Test cases, kết quả test, đánh giá.
12 Đánh giá bảo mật (Security Assessment) Security Engineer Kết quả scan, các lỗ hổng và biện pháp khắc phục.
13 Backup & Recovery Plan DevOps Chiến lược backup, RPO/RTO, recovery steps.
14 Tài liệu đào tạo người dùng (User Training Manual) BA Hướng dẫn sử dụng cho marketing, vận hành.
15 Tài liệu mã nguồn (Source Code Documentation) Tech Lead Giải thích cấu trúc mã nguồn, cách đóng góp.

Rủi ro và phương án dự phòng

Rủi ro Tác động Phương án B Phương án C
Dữ liệu không đủ chất lượng Mô hình kém chính xác Làm sạch dữ liệu kỹ hơn, bổ sung dữ liệu giả lập Sử dụng mô hình đơn giản dựa trên quy tắc (rule-based)
Latency inference cao (>200ms) Trải nghiệm người dùng giảm Tối ưu model quantization, caching kết quả Giảm số lượng item dự đoán, fallback về rule-based
Lưu lượng cao gây nghẽn Kafka Mất dữ liệu sự kiện Tăng số partition, scale consumer, điều chỉnh batch size Chuyển sang Kinesis hoặc dùng buffer tại client
Model drift theo thời gian Độ chính xác giảm Triển khai monitoring và retraining định kỳ (hàng tuần) Fallback về model cũ, cảnh báo manual
Thiếu nhân sự có kinh nghiệm Chậm tiến độ Thuê consultant, training nội bộ cấp tốc Outsource một phần công việc
Ngân sách vượt dự kiến Không đủ tiền hoàn thành Ưu tiên tính năng core, cắt giảm tính năng phụ Tìm nguồn tài trợ thêm, thương lượng giảm chi phí cloud
Lỗi tích hợp với CRM Không gửi được gợi ý qua email/SMS Sử dụng webhook dự phòng, ghi log để gửi sau Tạm thời dùng manual export CSV

KPI đo lường hiệu quả

KPI Mục tiêu Công cụ đo Tần suất
Độ chính xác dự đoán (NDCG@10) >= 0.4 Offline evaluation (MLflow) Hàng tuần
Thời gian phản hồi API (p95) < 200 ms Prometheus + Grafana Liên tục
Tỷ lệ thành công của API 99.9% Prometheus (HTTP status codes) Liên tục
Tăng tỷ lệ chuyển đổi (conversion rate) +10% Google Analytics, A/B test Hàng tuần
Tăng doanh thu từ recommendation +15% BI dashboard (Looker) Hàng tháng
Số lượng sự kiện xử lý/giây 10k events/s Kafka monitoring Liên tục
Thời gian huấn luyện mô hình < 12h MLflow Mỗi lần huấn luyện
Tỷ lệ cache hit > 60% Redis monitoring Liên tục

Checklist Go-live

Security & Compliance

  • [ ] API endpoints có xác thực bằng API Key hoặc JWT.
  • [ ] Dữ liệu cá nhân (PII) được mã hóa ở rest và transit.
  • [ ] Tuân thủ GDPR/CCPA (nếu có khách hàng EU/California).
  • [ ] Firewall cấu hình chỉ cho phép IP cần thiết.
  • [ ] SSL/TLS enabled cho tất cả public endpoints.
  • [ ] Không lưu mật khẩu plaintext, sử dụng Secrets Manager.
  • [ ] Đã quét lỗ hổng bảo mật (OWASP Top 10) và khắc phục.
  • [ ] Có cơ chế rate limiting (ví dụ: Nginx limit_req).
  • [ ] Backup encryption enabled.
  • [ ] Audit logging enabled cho các hành động quan trọng.

Performance & Scalability

  • [ ] Load test đạt SLA (10k RPS, p95 latency <200ms).
  • [ ] Auto-scaling configured cho TorchServe và Spark.
  • [ ] Caching layer (Redis) hoạt động, hit rate >60%.
  • [ ] Database indexes tối ưu (nếu có).
  • [ ] CDN configured cho static assets (nếu cần).
  • [ ] Monitoring alerts set up (CPU, memory, latency, error rate).
  • [ ] Resource limits và requests được đặt trong Kubernetes.
  • [ ] Có kế hoạch mở rộng khi traffic tăng (ví dụ: thêm node).
  • [ ] Docker image size tối ưu (<1GB).
  • [ ] Đã kiểm tra memory leaks (ví dụ: với Valgrind).

Business & Data Accuracy

  • [ ] Mô hình đạt KPI offline (NDCG@10 >=0.4).
  • [ ] A/B test plan ready (Split.io configuration).
  • [ ] Dữ liệu training đại diện cho toàn bộ user base.
  • [ ] Feature engineering đúng (mã hóa item ID, xử lý missing).
  • [ ] Có fallback mechanism khi model fail (rule-based).
  • [ ] Đã validate dữ liệu đầu vào (schema, range).
  • [ ] Có logging đầy đủ để debug (request/response).
  • [ ] BI dashboard tích hợp để đo doanh thu.
  • [ ] Đã test với dữ liệu thực tế (staging traffic mirroring).
  • [ ] Có cơ chế phát hiện drift (monitoring distribution shift).

Payment & Finance

  • [ ] Gợi ý không chứa sản phẩm hết hàng (integration với inventory).
  • [ ] Gợi ý tuân thủ chính sách giá (không hiển thị giá sai).
  • [ ] Gợi ý không vi phạm quy định pháp lý (ví dụ: độ tuổi).
  • [ ] Có cơ chế kiểm soát chiết khấu tự động (nếu có).
  • [ ] Đã tích hợp với hệ thống inventory để cập nhật tồn kho real-time.
  • [ ] Đảm bảo không có lỗi hiển thị giá (format, currency).
  • [ ] Đã test với các chương trình khuyến mãi (discount, flash sale).
  • [ ] Có logging để đối soát doanh thu (mỗi recommendation ghi log).

Monitoring & Rollback

  • [ ] Dashboard Grafana hiển thị key metrics (latency, error, throughput).
  • [ ] Cảnh báo qua email/SMS/PagerDuty khi lỗi.
  • [ ] Có script rollback nhanh (ví dụ: kubectl rollback deployment).
  • [ ] Health check endpoint trả về 200 OK.
  • [ ] Đã test rollback procedure (về phiên bản model cũ).
  • [ ] Backup dữ liệu định kỳ (S3 versioning, RDS snapshot).
  • [ ] Disaster recovery plan documented (multi-AZ, restore từ backup).
  • [ ] Log aggregation (ELK) hoạt động, có thể truy vấn.
  • [ ] Distributed tracing (OpenTelemetry) configured.
  • [ ] Runbook xử lý sự cố đã được viết và chia sẻ.

Mã nguồn và cấu hình minh họa

Dưới đây là một số đoạn mã và cấu hình quan trọng giúp bạn hình dung rõ hơn về cách triển khai.

1. Kafka Producer (JavaScript – dùng trên frontend)

// utils/tracker.js
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'web-client',
  brokers: ['kafka-broker-1:9092', 'kafka-broker-2:9092'],
});

const producer = kafka.producer();

export async function sendUserEvent(eventType, productId, userId) {
  await producer.connect();
  await producer.send({
    topic: 'user-events',
    messages: [
      {
        key: userId,
        value: JSON.stringify({
          event_type: eventType,
          product_id: productId,
          user_id: userId,
          timestamp: Date.now(),
        }),
      },
    ],
  });
  await producer.disconnect();
}

2. Spark Streaming Job (PySpark)

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("NBA-Streaming").getOrCreate()

schema = StructType([
    StructField("event_type", StringType()),
    StructField("product_id", StringType()),
    StructField("user_id", StringType()),
    StructField("timestamp", LongType())
])

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "user-events") \
    .load()

events = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

# Clean and transform
cleaned = events.filter(col("user_id").isNotNull()) \
                .withColumn("event_time", from_unixtime(col("timestamp")/1000))

query = cleaned.writeStream \
    .format("parquet") \
    .option("path", "s3a://nba-data-lake/events/") \
    .option("checkpointLocation", "s3a://nba-data-lake/checkpoints/") \
    .start()

query.awaitTermination()

3. Data Preprocessing (Pandas)

import pandas as pd
import numpy as np

def prepare_sequences(df, max_len=50):
    # df columns: user_id, item_id, timestamp
    df = df.sort_values(['user_id', 'timestamp'])
    # Group by user and create sequence of item_ids
    seq = df.groupby('user_id')['item_id'].apply(list).reset_index()
    seq['seq_len'] = seq['item_id'].apply(len)
    # Pad/truncate sequences
    seq['padded_seq'] = seq['item_id'].apply(lambda x: x[-max_len:] if len(x) >= max_len else [0]*(max_len-len(x)) + x)
    return seq[['user_id', 'padded_seq']]

4. SASRec Model (PyTorch)

import torch
import torch.nn as nn
import torch.nn.functional as F

class SASRec(nn.Module):
    def __init__(self, item_num, hidden_size, num_heads, num_layers, max_len, dropout=0.2):
        super(SASRec, self).__init__()
        self.item_emb = nn.Embedding(item_num, hidden_size, padding_idx=0)
        self.pos_emb = nn.Embedding(max_len, hidden_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads, dropout=dropout)
        self.transformer_enc = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.layer_norm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.max_len = max_len

    def forward(self, input_seqs):
        seqs = input_seqs  # (batch, seq_len)
        positions = torch.arange(0, seqs.size(1), dtype=torch.long, device=seqs.device).unsqueeze(0)
        pos_emb = self.pos_emb(positions)
        item_emb = self.item_emb(seqs)
        seq_emb = item_emb + pos_emb
        seq_emb = self.layer_norm(seq_emb)
        seq_emb = self.dropout(seq_emb)
        # Transformer expects (seq_len, batch, hidden)
        seq_emb = seq_emb.permute(1, 0, 2)
        mask = self.generate_square_subsequent_mask(seqs.size(1)).to(seqs.device)
        output = self.transformer_enc(seq_emb, mask)
        output = output.permute(1, 0, 2)  # (batch, seq_len, hidden)
        return output

    def generate_square_subsequent_mask(self, sz):
        return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

5. Training Loop

from torch.utils.data import DataLoader, Dataset

class SequenceDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = sequences
        self.targets = targets
    def __len__(self):
        return len(self.sequences)
    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]

def train(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for seqs, labels in dataloader:
        seqs, labels = seqs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(seqs)  # (batch, seq_len, hidden)
        # predict next item based on last output
        last_output = outputs[:, -1, :]  # (batch, hidden)
        logits = torch.matmul(last_output, model.item_emb.weight.T)  # (batch, num_items)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

6. Export Model to TorchScript

model.eval()
example_input = torch.randint(0, num_items, (1, max_len)).to(device)
traced_script = torch.jit.trace(model, example_input)
traced_script.save("sasrec_model.pt")

7. FastAPI Endpoint

from fastapi import FastAPI, HTTPException
import torch
import redis
import json

app = FastAPI()
redis_client = redis.Redis(host='redis', port=6379, db=0)

# Load model
model = torch.jit.load("sasrec_model.pt")
model.eval()

@app.post("/recommend")
async def recommend(user_id: str, sequence: list[int], k: int = 5):
    cache_key = f"rec:{user_id}:{hash(tuple(sequence))}"
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    # Convert to tensor
    seq_tensor = torch.tensor([sequence], dtype=torch.long)
    with torch.no_grad():
        output = model(seq_tensor)
        last_output = output[:, -1, :]
        scores = torch.matmul(last_output, model.item_emb.weight.T)
        topk = torch.topk(scores, k).indices.tolist()[0]
    result = {"items": topk}
    redis_client.setex(cache_key, 300, json.dumps(result))  # cache 5 phút
    return result

8. TorchServe Configuration (config.properties)

inference_address=http://0.0.0.0:8080
management_address=http://0.0.0.0:8081
model_store=/home/model-server/model-store
load_models=all
enable_metrics_api=true
metrics_format=prometheus

9. Dockerfile for Model Serving

FROM pytorch/torchserve:latest

COPY sasrec_model.pt /home/model-server/model-store/
COPY config.properties /home/model-server/config.properties

RUN torch-model-archiver --model-name nba --version 1.0 \
    --serialized-file /home/model-server/model-store/sasrec_model.pt \
    --handler /home/model-server/handler.py \
    --export-path /home/model-server/model-store -f

CMD ["torchserve", "--start", "--model-store", "/home/model-server/model-store", "--ts-config", "/home/model-server/config.properties"]

10. docker-compose.yml (Local Development)

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  spark:
    image: bitnami/spark:latest
    command: bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 /app/streaming.py
    volumes:
      - ./spark-app:/app
    depends_on:
      - kafka
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  model-server:
    build: ./model-server
    ports:
      - "8080:8080"
      - "8081:8081"
    depends_on:
      - redis
  api:
    build: ./api
    ports:
      - "8000:8000"
    depends_on:
      - model-server
      - redis

11. Nginx Config (Load Balancer)

upstream model_servers {
    server model-server1:8080;
    server model-server2:8080;
    server model-server3:8080;
}

server {
    listen 80;
    server_name api.nba.example.com;

    location /recommend {
        proxy_pass http://model_servers/recommend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_connect_timeout 60s;
        proxy_read_timeout 60s;
    }
}

12. GitHub Actions CI/CD (deploy to EKS)

name: Deploy to EKS

on:
  push:
    branches:
      - main

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ap-southeast-1
      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1
      - name: Build and push Docker image
        run: |
          docker build -t ${{ secrets.ECR_REPO }}:latest .
          docker push ${{ secrets.ECR_REPO }}:latest
      - name: Update k8s deployment
        run: |
          aws eks update-kubeconfig --name nba-cluster
          kubectl set image deployment/nba-api nba-api=${{ secrets.ECR_REPO }}:latest
          kubectl rollout status deployment/nba-api

13. Prometheus Scrape Config for TorchServe

scrape_configs:
  - job_name: 'torchserve'
    static_configs:
      - targets: ['model-server:8082']

14. Script đối soát dữ liệu (Python)

import pandas as pd
import numpy as np

def check_data_quality(df):
    # Kiểm tra missing values
    missing = df.isnull().sum()
    # Kiểm tra distribution
    stats = df.describe()
    # Kiểm tra duplicate events
    dup = df.duplicated(subset=['user_id', 'timestamp', 'event_type']).sum()
    return {
        'missing': missing,
        'stats': stats,
        'duplicates': dup
    }

Công thức toán học

Mô hình SASRec sử dụng cơ chế self-attention. Hàm loss thường dùng là cross-entropy:

\mathcal{L} = -\sum_{i=1}^{N} y_i \log(\hat{y}_i)

Trong đó (y_i) là one-hot encoding của item đích, (\hat{y}_i) là xác suất dự đoán.

Đánh giá bằng NDCG (Normalized Discounted Cumulative Gain):

\mathrm{NDCG}_k = \frac{DCG_k}{IDCG_k} DCG_k = \sum_{i=1}^{k} \frac{2^{rel_i} - 1}{\log_2(i+1)}

Với (rel_i) là relevance của item tại vị trí (i) (thường là 1 nếu đúng, 0 nếu sai).

Kết luận

Xây dựng hệ thống Next Best Action với Sequence Prediction là một dự án phức tạp nhưng mang lại giá trị lớn. Bài viết đã cung cấp một lộ trình chi tiết từ kiến trúc, công nghệ, timeline, chi phí đến các đoạn mã thực tế. Hy vọng anh em có thể áp dụng ngay vào dự án của mình.

Lưu ý: Các con số chi phí và timeline có thể thay đổi tùy quy mô và điều kiện thực tế. Hãy điều chỉnh cho phù hợp.

Thảo luận: Anh em đã từng triển khai hệ thống NBA chưa? Gặp khó khăn gì? Chia sẻ bên dưới nhé!


Chốt marketing: Nếu anh em đang cần tích hợp AI nhanh vào app mà lười build từ đầu, thử ngó qua con Serimi App xem, mình thấy API bên đó khá ổn cho việc scale.

Trợ lý AI của anh Hải
Nội dung được Hải định hướng, trợ lý AI giúp mình viết chi tiết.
Chia sẻ tới bạn bè và gia đình