Cross-model Ensembling & Distillation Strategies: Kết Hợp Sức Mạnh Các Mô Hình Dị Chủng

Cross-model Ensembling & Distillation Strategies — Kết hợp sức mạnh của các mô hình đa dạng

Giới thiệu: Tại sao lại cần Ensemble?

Trong suốt 12 năm làm nghề, tôi đã chứng kiến không ít dự án thất bại chỉ vì tin tưởng quá mức vào một mô hình duy nhất. Một mô hình deep learning có thể xuất sắc trong 90% trường hợp nhưng lại thất bại thảm hại trong 10% còn lại – những trường hợp đó thường lại là quan trọng nhất.

Cross-model Ensembling (tạm dịch: Tổng hợp mô hình đa dạng) chính là giải pháp cho vấn đề này. Thay vì dựa vào một “siêu mô hình”, chúng ta kết hợp nhiều mô hình khác nhau, mỗi mô hình có ưu điểm riêng, để tạo ra hệ thống dự đoán mạnh mẽ hơn.

Các chiến lược Ensembling phổ biến

1. Bagging (Bootstrap Aggregating)

Bagging là kỹ thuật đơn giản nhất: huấn luyện nhiều mô hình trên các tập dữ liệu con khác nhau, sau đó lấy trung bình (regression) hoặc vote (classification).

from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier

# Base model
base_model = DecisionTreeClassifier(max_depth=5)

# Bagging ensemble
bagging_model = BaggingClassifier(
    base_estimator=base_model,
    n_estimators=50,  # 50 cây quyết định
    max_samples=0.8,  # Mỗi mô hình dùng 80% dữ liệu
    bootstrap=True,   # Có lấy mẫu lại không
    n_jobs=-1         # Dùng tất cả CPU
)

bagging_model.fit(X_train, y_train)

Performance comparison:

Mô hình Accuracy Training Time Memory Usage
Single Decision Tree 82.3% 2.1s 45MB
Bagging Ensemble 91.7% 18.5s 892MB
Cải thiện +9.4% +781% +1882%

2. Boosting (AdaBoost, Gradient Boosting)

Boosting xây dựng mô hình tuần tự, mỗi mô hình mới tập trung sửa lỗi của mô hình trước.

from sklearn.ensemble import GradientBoostingClassifier

# Gradient Boosting với 100 cây quyết định
gb_model = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=3,
    subsample=0.8
)

gb_model.fit(X_train, y_train)

Key insight: Boosting thường cho accuracy cao hơn Bagging nhưng dễ overfit hơn trên dữ liệu nhiễu.

3. Stacking (Stacked Generalization)

Stacking sử dụng nhiều mô hình khác nhau (heterogeneous models), sau đó dùng một mô hình meta để kết hợp kết quả.

from sklearn.ensemble import StackingClassifier
from sklearn.linear_model import LogisticRegression

# Các base models khác nhau
base_models = [
    ('rf', RandomForestClassifier(n_estimators=100)),
    ('svm', SVC(probability=True)),
    ('nn', MLPClassifier(hidden_layer_sizes=(100,)))
]

# Mô hình meta
meta_model = LogisticRegression()

# Stacking
stacking_model = StackingClassifier(
    estimators=base_models,
    final_estimator=meta_model,
    cv=5  # Cross-validation
)

stacking_model.fit(X_train, y_train)

Model Distillation — Nghệ thuật cô đọng tri thức

Model distillation (học chuyển giao tri thức) cho phép chúng ta tạo ra một mô hình nhỏ gọn hơn nhưng vẫn giữ được hiệu năng của mô hình lớn.

Cơ chế hoạt động

Mô hình lớn (teacher) huấn luyện trước, sau đó “dạy” cho mô hình nhỏ (student) bằng cách truyền đạt không chỉ nhãn mà còn cả xác suất dự đoán.

import torch
import torch.nn as nn

class TeacherModel(nn.Module):
    def __init__(self):
        super().__init__()
        # Mạng lớn, nhiều layers
        self.layers = nn.Sequential(
            nn.Linear(784, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.layers(x)

class StudentModel(nn.Module):
    def __init__(self):
        super().__init__()
        # Mạng nhỏ gọn hơn
        self.layers = nn.Sequential(
            nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        return self.layers(x)

# Hàm loss kết hợp cross-entropy và KL divergence
def distillation_loss(outputs, labels, teacher_outputs, T=2):
    """
    T: Temperature - độ mềm của probability
    """
    # Cross-entropy với hard labels
    ce_loss = nn.CrossEntropyLoss()(outputs, labels)

    # KL divergence với soft targets
    soft_targets = nn.functional.softmax(teacher_outputs / T, dim=1)
    soft_outputs = nn.functional.softmax(outputs / T, dim=1)
    kl_div = nn.KLDivLoss()(torch.log(soft_outputs), soft_targets) * (T * T)

    return ce_loss + kl_div

# Training loop
for epoch in range(num_epochs):
    for inputs, labels in dataloader:
        # Forward pass teacher
        with torch.no_grad():
            teacher_outputs = teacher_model(inputs)

        # Forward pass student
        student_outputs = student_model(inputs)

        # Calculate loss
        loss = distillation_loss(
            student_outputs, labels, teacher_outputs, T=2
        )

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Lợi ích của Distillation

Lợi ích Chi tiết Số liệu thực tế
Giảm kích thước Mô hình nhỏ hơn 70-90% Từ 500MB xuống 50MB
Tăng tốc inference Nhanh hơn 3-5 lần Từ 120ms xuống 25ms
Giảm memory Dùng ít RAM hơn Từ 2GB xuống 300MB
Tiết kiệm năng lượng Tiêu thụ ít điện hơn Giảm 80% năng lượng

Use Case kỹ thuật: Hệ thống đề xuất sản phẩm

Bài toán

Một sàn thương mại điện tử cần hệ thống đề xuất sản phẩm cho 10 triệu user hoạt động mỗi ngày. Yêu cầu:
– Latency < 50ms
– Accuracy > 85%
– Cost < $0.001/request

Giải pháp Ensembling

class HybridRecommendationSystem:
    def __init__(self):
        # Content-based filtering
        self.content_model = TfidfVectorizer(max_features=10000)

        # Collaborative filtering (Matrix Factorization)
        self.cf_model = ImplicitMF(
            factors=50,
            learning_rate=0.01,
            regularization=0.1
        )

        # Deep learning model
        self.dl_model = NeuralCollaborativeFiltering(
            n_users=10000000,
            n_items=1000000,
            embedding_dim=64
        )

        # Meta model (Logistic Regression)
        self.meta_model = LogisticRegression()

    def fit(self, user_data, product_data, interactions):
        # Train content model
        product_features = self.content_model.fit_transform(product_data['description'])

        # Train collaborative filtering
        self.cf_model.fit(interactions)

        # Train deep learning model
        self.dl_model.fit(user_data, product_data, interactions)

        # Generate meta features
        meta_features = self._generate_meta_features(user_data, product_data)

        # Train meta model
        self.meta_model.fit(meta_features, interactions['rating'])

    def _generate_meta_features(self, user_data, product_data):
        # Kết hợp features từ các mô hình khác nhau
        content_features = self.content_model.transform(product_data['description'])
        cf_predictions = self.cf_model.predict(user_data['user_id'], product_data['product_id'])
        dl_predictions = self.dl_model.predict(user_data['user_id'], product_data['product_id'])

        return np.hstack([content_features, cf_predictions, dl_predictions])

    def predict(self, user_id, product_id):
        # Ensemble prediction
        content_score = self._content_based_score(user_id, product_id)
        cf_score = self.cf_model.predict(user_id, product_id)
        dl_score = self.dl_model.predict(user_id, product_id)

        # Meta features
        meta_features = np.array([content_score, cf_score, dl_score]).reshape(1, -1)

        # Final prediction
        final_score = self.meta_model.predict_proba(meta_features)[0][1]

        return final_score

# Performance metrics
def evaluate_system(system, test_data):
    predictions = system.predict_batch(test_data)
    accuracy = accuracy_score(test_data['rating'], predictions)
    latency = np.mean(predictions['latency'])

    return {
        'accuracy': accuracy,
        'latency': latency,
        'throughput': 1000/latency  # requests per second
    }

Kết quả

Mô hình đơn lẻ Accuracy Latency Cost/request
Content-based 72.4% 12ms $0.0002
Collaborative 78.9% 45ms $0.0005
Deep Learning 81.3% 38ms $0.0008
Ensemble 89.7% 52ms $0.0015

Phân tích: Ensemble vượt trội hơn mỗi mô hình đơn lẻ, nhưng latency hơi vượt yêu cầu. Giải pháp: dùng distillation để tạo student model.

Model Distillation trong Production

Chiến lược triển khai

class ProductionDistillation:
    def __init__(self, teacher_model, student_model, dataset):
        self.teacher = teacher_model
        self.student = student_model
        self.dataset = dataset

        # Load pre-trained teacher
        self.teacher.load_state_dict(torch.load('teacher.pth'))
        self.teacher.eval()

    def generate_soft_labels(self, batch_size=256):
        """
        Generate soft labels từ teacher model
        """
        soft_labels = []
        loader = DataLoader(self.dataset, batch_size=batch_size, shuffle=False)

        with torch.no_grad():
            for inputs, _ in loader:
                inputs = inputs.to(device)
                outputs = self.teacher(inputs)
                soft_labels.append(outputs.cpu())

        return torch.cat(soft_labels, dim=0)

    def train_student(self, epochs=10, T=3):
        """
        Train student model với soft labels
        """
        # Generate soft labels
        soft_labels = self.generate_soft_labels()

        # Training loop
        optimizer = torch.optim.Adam(self.student.parameters(), lr=0.001)
        criterion = nn.KLDivLoss()

        for epoch in range(epochs):
            total_loss = 0
            loader = DataLoader(self.dataset, batch_size=256, shuffle=True)

            for inputs, labels in loader:
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward
                student_outputs = self.student(inputs)
                soft_targets = soft_labels[inputs]

                # Calculate loss
                loss = criterion(
                    torch.log_softmax(student_outputs / T, dim=1),
                    torch.softmax(soft_targets / T, dim=1)
                ) * (T * T)

                # Backward
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            print(f'Epoch {epoch+1}, Loss: {total_loss/len(loader):.4f}')

    def export_optimized(self):
        """
        Export student model đã tối ưu hóa
        """
        # Quantization
        quantized_model = torch.quantization.quantize_dynamic(
            self.student, {nn.Linear}, dtype=torch.qint8
        )

        # Save
        torch.jit.save(torch.jit.script(quantized_model), 'student_optimized.pth')

Benchmark so sánh

Mô hình Size Latency Accuracy Memory
Teacher (BERT-base) 440MB 220ms 92.1% 2.1GB
Student (DistilBERT) 66MB 45ms 89.3% 380MB
Student (Quantized) 18MB 28ms 88.7% 120MB

Thách thức và Giải pháp

1. Vấn đề về Data Drift

Vấn đề: Hiệu năng mô hình giảm dần theo thời gian do thay đổi phân phối dữ liệu.

class DriftDetector:
    def __init__(self, model, threshold=0.05):
        self.model = model
        self.threshold = threshold
        self.reference_metrics = None

    def collect_reference_metrics(self, data_stream, window_size=1000):
        """
        Thu thập metrics trên dữ liệu hiện tại
        """
        predictions = []
        for i, (inputs, labels) in enumerate(data_stream):
            if i >= window_size:
                break
            outputs = self.model.predict(inputs)
            predictions.append(outputs)

        self.reference_metrics = {
            'accuracy': accuracy_score(labels, predictions),
            'distribution': self._compute_distribution(predictions)
        }

    def detect_drift(self, new_data_stream, window_size=500):
        """
        Phát hiện drift trên dữ liệu mới
        """
        predictions = []
        for i, (inputs, labels) in enumerate(new_data_stream):
            if i >= window_size:
                break
            outputs = self.model.predict(inputs)
            predictions.append(outputs)

        new_accuracy = accuracy_score(labels, predictions)
        new_distribution = self._compute_distribution(predictions)

        # So sánh với reference
        accuracy_diff = abs(new_accuracy - self.reference_metrics['accuracy'])
        distribution_diff = self._compare_distributions(
            self.reference_metrics['distribution'], new_distribution
        )

        return {
            'accuracy_drift': accuracy_diff > self.threshold,
            'distribution_drift': distribution_diff > self.threshold
        }

2. Vấn đề về Latency

Giải pháp: Sử dụng Model Pruning và Quantization.

from transformers import AutoModelForSequenceClassification

# Load pre-trained model
model = AutoModelForSequenceClassification.from_pretrained('distilbert-base-uncased')

# Pruning
from transformers import HeadPruner

pruner = HeadPruner(
    pruner_type="sigmad",
    head_to_keep=4,  # Giữ lại 4 heads
    importance_score="attention",
    importance_method="mask_mean",
    prune_method="block",
    prune_order="lowest",
    n_heads=12
)

pruned_model = pruner.prune(model)

# Quantization
quantized_model = torch.quantization.quantize_dynamic(
    pruned_model, {nn.Linear}, dtype=torch.qint8
)

# Benchmark
def benchmark_model(model, test_data):
    import time

    start_time = time.time()
    for inputs in test_data:
        _ = model(**inputs)
    total_time = time.time() - start_time

    return {
        'latency_avg': total_time/len(test_data),
        'size': sum(p.numel() for p in model.parameters()) * 4 / 1024 / 1024,
        'accuracy': evaluate_accuracy(model, test_data)
    }

Best Practices cho Production

1. A/B Testing cho Ensemble

class ABTestEnsemble:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.metrics = {
            'a': {'correct': 0, 'total': 0, 'latency': []},
            'b': {'correct': 0, 'total': 0, 'latency': []}
        }

    def route_request(self, request_id):
        """
        Route request dựa trên ID để đảm bảo consistency
        """
        if request_id % 100 < self.traffic_split * 100:
            return 'a'
        else:
            return 'b'

    def process_request(self, request_id, data):
        route = self.route_request(request_id)
        model = self.model_a if route == 'a' else self.model_b

        start_time = time.time()
        prediction = model.predict(data)
        latency = time.time() - start_time

        # Collect metrics
        self.metrics[route]['total'] += 1
        self.metrics[route]['latency'].append(latency)

        return prediction

    def get_results(self):
        """
        Tính toán metrics cho cả hai nhóm
        """
        results = {}
        for group in ['a', 'b']:
            metrics = self.metrics[group]
            results[group] = {
                'accuracy': metrics['correct'] / metrics['total'],
                'latency_p50': np.percentile(metrics['latency'], 50),
                'latency_p95': np.percentile(metrics['latency'], 95),
                'throughput': metrics['total'] / sum(metrics['latency'])
            }

        return results

2. Monitoring và Alerting

class EnsembleMonitor:
    def __init__(self, models, alert_thresholds):
        self.models = models
        self.alert_thresholds = alert_thresholds
        self.metrics = {
            'accuracy': [],
            'latency': [],
            'disagreement': []  # Disagreement rate giữa các mô hình
        }

    def collect_metrics(self, batch_data):
        """
        Thu thập metrics từ một batch dữ liệu
        """
        accuracies = []
        latencies = []
        disagreements = []

        for model in self.models:
            start_time = time.time()
            predictions = model.predict(batch_data['inputs'])
            latency = time.time() - start_time

            accuracy = accuracy_score(batch_data['labels'], predictions)
            disagreement = self._compute_disagreement(predictions)

            accuracies.append(accuracy)
            latencies.append(latency)
            disagreements.append(disagreement)

        # Lưu metrics
        self.metrics['accuracy'].append(np.mean(accuracies))
        self.metrics['latency'].append(np.mean(latencies))
        self.metrics['disagreement'].append(np.mean(disagreements))

    def check_alerts(self):
        """
        Kiểm tra xem có cần alert không
        """
        alerts = []

        # Check accuracy
        if np.mean(self.metrics['accuracy'][-100:]) < self.alert_thresholds['accuracy']:
            alerts.append('Accuracy below threshold')

        # Check latency
        if np.mean(self.metrics['latency'][-100:]) > self.alert_thresholds['latency']:
            alerts.append('Latency above threshold')

        # Check disagreement
        if np.mean(self.metrics['disagreement'][-100:]) > self.alert_thresholds['disagreement']:
            alerts.append('High disagreement rate')

        return alerts

# Alerting system
def send_alert(message):
    # Send alert qua Slack, email, hoặc SMS
    pass

# Usage
monitor = EnsembleMonitor(
    models=[model_a, model_b, model_c],
    alert_thresholds={
        'accuracy': 0.85,
        'latency': 0.1,  # 100ms
        'disagreement': 0.2
    }
)

# Trong production loop
while True:
    batch_data = get_batch_from_stream()
    monitor.collect_metrics(batch_data)

    alerts = monitor.check_alerts()
    if alerts:
        for alert in alerts:
            send_alert(alert)

Tương lai của Ensemble Learning

1. Neural Architecture Search (NAS) cho Ensemble

from torch import nn
import torch

class EnsembleNAS:
    def __init__(self, search_space, num_generations=100):
        self.search_space = search_space
        self.num_generations = num_generations
        self.population = self._initialize_population()

    def _initialize_population(self):
        """
        Khởi tạo population với các kiến trúc ngẫu nhiên
        """
        population = []
        for _ in range(50):  # 50 cá thể ban đầu
            architecture = self._random_architecture()
            population.append(architecture)
        return population

    def _random_architecture(self):
        """
        Tạo ra một kiến trúc ngẫu nhiên
        """
        # Ví dụ: Random số layer, units, activation functions
        num_layers = np.random.randint(2, 6)
        layers = []

        for i in range(num_layers):
            layer_type = np.random.choice([
                'linear', 'conv2d', 'lstm', 'gru'
            ])

            if layer_type == 'linear':
                units = np.random.choice([64, 128, 256, 512])
                layers.append(nn.Linear(784, units))
                layers.append(self._random_activation())
            elif layer_type == 'conv2d':
                layers.append(nn.Conv2d(1, 32, 3))
                layers.append(nn.ReLU())
                layers.append(nn.MaxPool2d(2))

        return nn.Sequential(*layers)

    def _random_activation(self):
        return np.random.choice([
            nn.ReLU(), nn.Tanh(), nn.Sigmoid(), nn.LeakyReLU()
        ])

    def evolve(self, fitness_fn):
        """
        Tiến hóa population qua các thế hệ
        """
        for generation in range(self.num_generations):
            # Đánh giá fitness
            fitness_scores = []
            for individual in self.population:
                score = fitness_fn(individual)
                fitness_scores.append(score)

            # Chọn lọc
            selected = self._select(self.population, fitness_scores, k=20)

            # Lai tạo
            offspring = []
            for _ in range(30):
                parent1, parent2 = np.random.choice(selected, 2)
                child = self._crossover(parent1, parent2)
                offspring.append(child)

            # Đột biến
            for child in offspring:
                if np.random.random() < 0.1:  # 10% probability
                    self._mutate(child)

            # Tạo population mới
            self.population = selected + offspring

    def _select(self, population, fitness, k=10):
        """
        Chọn lọc top-k cá thể
        """
        sorted_indices = np.argsort(fitness)[::-1]  # Giảm dần
        return [population[i] for i in sorted_indices[:k]]

    def _crossover(self, parent1, parent2):
        """
        Lai tạo giữa hai cha mẹ
        """
        child = nn.Sequential()
        for i in range(len(parent1)):
            if np.random.random() < 0.5:
                child.append(parent1[i])
            else:
                child.append(parent2[i])
        return child

    def _mutate(self, individual):
        """
        Đột biến một cá thể
        """
        if np.random.random() < 0.5 and len(individual) > 2:
            # Xóa một layer
            index = np.random.randint(1, len(individual)-1)
            individual.pop(index)
        else:
            # Thêm một layer
            index = np.random.randint(1, len(individual))
            new_layer = self._random_layer()
            individual.insert(index, new_layer)

2. Federated Ensemble Learning

import torch.distributed as dist

class FederatedEnsemble:
    def __init__(self, num_clients=100, num_rounds=100):
        self.num_clients = num_clients
        self.num_rounds = num_rounds
        self.global_model = self._initialize_global_model()

    def _initialize_global_model(self):
        # Tạo global model
        return EnsembleModel()

    def train_round(self, data_center):
        """
        Một vòng huấn luyện federated
        """
        # Phân phối model cho các client
        client_models = self._distribute_model()

        # Huấn luyện trên các client
        updated_models = []
        for client_id in range(self.num_clients):
            client_data = data_center.get_client_data(client_id)
            updated_model = client_models[client_id].train(client_data)
            updated_models.append(updated_model)

        # Aggregate models
        self._aggregate_models(updated_models)

    def _distribute_model(self):
        """
        Phân phối global model cho các client
        """
        client_models = []
        for client_id in range(self.num_clients):
            client_model = copy.deepcopy(self.global_model)
            client_models.append(client_model)
        return client_models

    def _aggregate_models(self, models):
        """
        Aggregate các client models thành global model
        """
        # Weighted average dựa trên dữ liệu size
        weights = [model.data_size for model in models]
        total_weight = sum(weights)

        aggregated_state = {}
        for key in models[0].state_dict().keys():
            weighted_sum = sum(
                weights[i] * models[i].state_dict()[key] for i in range(len(models))
            )
            aggregated_state[key] = weighted_sum / total_weight

        self.global_model.load_state_dict(aggregated_state)

Kết luận

Cross-model Ensembling và Model Distillation không chỉ là những kỹ thuật “cool” trong AI research, mà còn là giải pháp thiết thực cho các bài toán production thực tế. Từ việc cải thiện accuracy, giảm latency, đến tiết kiệm chi phí hạ tầng, những chiến lược này mang lại giá trị thực sự cho doanh nghiệp.

Key Takeaways:

  1. Đa dạng là sức mạnh: Kết hợp các mô hình khác nhau (heterogeneous) thường cho kết quả tốt hơn một mô hình “siêu to khổng lồ”.
  2. Distillation là nghệ thuật: Biết cách truyền đạt tri thức từ mô hình lớn sang mô hình nhỏ là kỹ năng quan trọng.
  3. Production là chiến trường thực sự: Lý thuyết đẹp mấy cũng phải đo bằng metrics: accuracy, latency, cost, và reliability.

Câu hỏi thảo luận: Anh em đã từng gặp trường hợp nào phải lựa chọn giữa accuracy và latency chưa? Giải quyết thế nào?

Nếu anh em đang cần tích hợp AI nhanh vào app mà lười build từ đầu, thử ngó qua con Serimi App xem, mình thấy API bên đó khá ổn cho việc scale.

Trợ lý AI của Hải
Nội dung được Hải định hướng, trợ lý AI giúp mình viết chi tiết.
Chia sẻ tới bạn bè và gia đình