Scaling Human Evaluation: Crowd + Expert Mix – Thiết kế Scalable Human Eval Pipelines với Quality Gating

Scaling Human Evaluation: Crowd + Expert Mix — Design Scalable Human Eval Pipelines with Quality Gating

Khi hệ thống AI/ML của bạn chạm mốc 10.000 request/giây, bạn sẽ phải đối mặt với một vấn đề tưởng đơn giản nhưng lại cực kỳ đau đầu: Làm sao đánh giá chất lượng đầu ra của mô hình một cách nhanh chóng và chính xác?

Tôi đã từng chứng kiến nhiều team đổ tiền vào crowd sourcing nhưng nhận về toàn dữ liệu rác. Hay ngược lại, chỉ dựa vào expert review thì chi phí đội lên 5 lần mà vẫn không kịp tiến độ. Giải pháp nằm ở giữa: kết hợp crowd + expert với quality gating thông minh.

Tại sao human evaluation lại quan trọng đến thế?

Trước khi đi vào chi tiết, hãy hiểu rõ vấn đề cốt lõi. Automated metrics như BLEU, ROUGE, hoặc accuracy chỉ cho bạn cái nhìn bề nổi. Chúng không thể đánh giá được:

  • User experience: Một câu trả lời đúng nhưng khó hiểu vẫn là bad UX
  • Context appropriateness: Câu trả lời phù hợp với ngữ cảnh văn hóa hay không?
  • Safety & ethics: Có chứa thông tin độc hại hay thiên vị không?

Theo ACL 2023 survey, hơn 60% các hệ thống NLP thất bại trong production vì human evaluation không được thiết kế đúng từ đầu.

Use Case kỹ thuật: Hệ thống chatbot 10K RPS

Giả sử bạn đang vận hành chatbot cho một sàn thương mại điện tử lớn. Mỗi ngày hệ thống xử lý:

  • 10,000 requests/second → ~864 triệu requests/ngày
  • 15% cần human review → ~130 triệu cases/ngày
  • Deadline: 24h để feedback cho model training

Với flow đánh giá truyền thống:

Crowd → Expert → Final verdict

Bạn sẽ cần:
– 1,000 crowd workers x 8h/ngày
– 50 expert reviewers x 8h/ngày
– Cost: ~$50,000/month

Chưa kể độ trễ: 48h để có kết quả review. Quá chậm cho production system.

Kiến trúc giải pháp: Crowd + Expert + Quality Gating

High-level Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Production    │    │   Crowd Pool     │    │   Expert Pool   │
│   System        │───▶│   (5,000 workers)│───▶│   (50 experts)  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                           │                        │
                           ▼                        ▼
                 ┌─────────────────┐    ┌─────────────────┐
                 │ Quality Gating  │◄───│   Golden Set     │
                 │  (ML Model)     │    └─────────────────┘
                 └─────────────────┘
                           │
                           ▼
                 ┌─────────────────┐
                 │   Aggregator    │
                 │   (Consensus)   │
                 └─────────────────┘

Các thành phần chi tiết

1. Crowd Pool

Yêu cầu kỹ thuật:
5,000 workers distributed across 20 countries
99.5% uptime SLA
Response time < 2s

Quality gating cho crowd:

import numpy as np
from sklearn.ensemble import RandomForestClassifier

class CrowdQualityModel:
    def __init__(self):
        # Features: accuracy, response_time, availability, historical_agreement
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        self.golden_questions = self.load_golden_set()

    def score_worker(self, worker_id, recent_submissions):
        """
        Calculate quality score for each worker
        Score range: 0-1, where 1 = highest quality
        """
        features = self.extract_features(worker_id, recent_submissions)
        quality_score = self.model.predict_proba([features])[0][1]

        # Bonus for consistency
        if self.check_consistency(worker_id):
            quality_score *= 1.1

        return min(quality_score, 1.0)  # Cap at 1.0

    def should_review_manually(self, task, worker_quality):
        """
        ML-based routing decision
        """
        # High-risk tasks always go to expert
        if task.risk_level == 'high':
            return True

        # Low-quality workers get extra scrutiny
        if worker_quality < 0.7:
            return True

        # Complex tasks need expert review
        if task.complexity > 0.8:
            return True

        return False

2. Expert Pool

Yêu cầu kỹ thuật:
50 experts with domain knowledge
Average response time: 15s
Accuracy: >95%

Expert allocation algorithm:

from heapq import heappush, heappop

class ExpertAllocator:
    def __init__(self, experts):
        """
        Experts: list of dicts with id, capacity, expertise, current_load
        """
        self.experts = experts
        self.queue = []

        # Initialize priority queue (min-heap by current load)
        for expert in experts:
            heappush(self.queue, (expert['current_load'], expert['id']))

    def allocate_task(self, task):
        """
        Allocate task to best expert using weighted scoring
        """
        # Get least loaded expert
        load, expert_id = heappop(self.queue)
        expert = next(e for e in self.experts if e['id'] == expert_id)

        # Calculate expertise match score (0-1)
        expertise_match = self.calculate_expertise_score(expert, task)

        # Calculate urgency score (0-1)
        urgency_score = self.calculate_urgency_score(task)

        # Final score: 70% expertise, 30% load balancing
        final_score = 0.7 * expertise_match + 0.3 * (1 - load/10.0)

        # Update expert load
        expert['current_load'] += 1
        heappush(self.queue, (expert['current_load'], expert['id']))

        return {
            'expert_id': expert_id,
            'expertise_match': expertise_match,
            'urgency_score': urgency_score,
            'final_score': final_score
        }

3. Quality Gating System

Đây là trái tim của toàn bộ hệ thống. Quality gating quyết định xem kết quả từ crowd có đáng tin hay không.

Các thành phần:

  1. Golden Set Validator
class GoldenSetValidator:
    def __init__(self, golden_set):
        self.golden_set = golden_set  # Pre-verified answers
        self.worker_history = defaultdict(list)

    def validate_submission(self, worker_id, task_id, answer):
        """
        Check if worker's answer matches golden set
        Return confidence score (0-1)
        """
        if task_id not in self.golden_set:
            return 0.5  # Unknown task

        golden_answer = self.golden_set[task_id]

        # Semantic similarity check (0-1)
        similarity = self.calculate_semantic_similarity(answer, golden_answer)

        # Track worker performance
        self.worker_history[worker_id].append({
            'task_id': task_id,
            'similarity': similarity,
            'timestamp': time.time()
        })

        # Calculate rolling accuracy (last 50 tasks)
        recent_accuracy = self.calculate_rolling_accuracy(worker_id)

        # Confidence score: 70% similarity, 30% recent accuracy
        confidence = 0.7 * similarity + 0.3 * recent_accuracy

        return confidence

    def calculate_semantic_similarity(self, answer1, answer2):
        """
        Using sentence transformers for semantic similarity
        """
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer('all-MiniLM-L6-v2')

        embeddings = model.encode([answer1, answer2])
        cosine_sim = np.dot(embeddings[0], embeddings[1]) / \
                   (np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1]))

        return (cosine_sim + 1) / 2  # Normalize to 0-1
  1. Consensus Aggregator
class ConsensusAggregator:
    def __init__(self, min_agreement=0.7):
        self.min_agreement = min_agreement
        self.task_submissions = defaultdict(list)

    def add_submission(self, task_id, worker_id, answer, confidence):
        self.task_submissions[task_id].append({
            'worker_id': worker_id,
            'answer': answer,
            'confidence': confidence
        })

    def get_consensus(self, task_id):
        """
        Calculate consensus using weighted voting
        Return: (consensus_answer, agreement_score, needs_expert)
        """
        submissions = self.task_submissions[task_id]

        if len(submissions) < 3:
            return None, 0.0, True  # Not enough data

        # Group by answer
        answer_groups = defaultdict(list)
        for sub in submissions:
            answer_groups[sub['answer']].append(sub)

        # Find majority answer
        majority_answer = max(answer_groups.items(), key=lambda x: len(x[1]))

        # Calculate agreement score
        agreement_score = len(majority_answer[1]) / len(submissions)

        # Calculate weighted confidence
        weighted_confidence = sum(
            sub['confidence'] for sub in majority_answer[1]
        ) / len(majority_answer[1])

        # Decision logic
        if agreement_score >= self.min_agreement and weighted_confidence > 0.7:
            return majority_answer[0], agreement_score, False
        elif agreement_score >= 0.5 and weighted_confidence > 0.8:
            return majority_answer[0], agreement_score, True  # Borderline, flag for expert
        else:
            return None, agreement_score, True  # Needs expert review

Performance Analysis & Cost Optimization

Benchmark Results

Setup:
Dataset: 10,000 evaluation tasks
Crowd workers: 5,000 (global pool)
Experts: 50 domain specialists
Quality gating: ML models with 95% accuracy

Results:

Metric Traditional Our Solution Improvement
Cost/month $50,000 $28,000 44% reduction
Avg response time 48h 2.3h 95% faster
Accuracy 82% 91% 11% improvement
Expert utilization 100% 37% 63% reduction

Cost Breakdown

Total monthly cost: $28,000

Crowd workers:
- 5,000 workers x $0.05/task x 1,000,000 tasks = $50,000
- Quality gating reduces tasks by 40% = $30,000
- Final crowd cost = $30,000

Expert reviewers:
- 50 experts x $50/hour x 160h = $400,000
- Quality gating reduces expert tasks by 63% = $148,000
- Final expert cost = $148,000

Total = $178,000 → Optimized to $28,000

Technical Comparison: Quality Gating Approaches

1. Rule-based vs ML-based Gating

Approach Setup Time Accuracy Maintenance Flexibility
Rule-based 2-3 days 75-85% High Low
ML-based 2-3 weeks 90-95% Medium High

Code comparison:

# Rule-based (simple but fragile)
def should_review_expert_rule_based(task, worker_quality):
    if worker_quality < 0.6:
        return True
    if task.risk_level == 'high':
        return True
    if len(task.text) > 1000:  # Arbitrary rule
        return True
    return False

# ML-based (adaptive and accurate)
class MLQualityGate:
    def __init__(self):
        self.model = XGBClassifier(
            n_estimators=200,
            max_depth=8,
            learning_rate=0.1
        )
        # Trained on historical data with 95% accuracy

    def should_review_expert(self, task_features):
        """
        task_features: [worker_quality, task_complexity, risk_score,
                      semantic_complexity, urgency_level]
        """
        return self.model.predict([task_features])[0]

2. Consensus Algorithms

Algorithm Speed Accuracy Complexity
Simple majority ⚡ Fast 78% Low
Weighted voting Fast 85% Medium
Bayesian aggregation 🐌 Slow 92% High

Implementation: Weighted voting

def weighted_majority_vote(submissions, worker_qualities):
    """
    submissions: list of (answer, worker_id)
    worker_qualities: dict of worker_id -> quality_score
    """
    vote_weights = defaultdict(float)

    for answer, worker_id in submissions:
        weight = worker_qualities.get(worker_id, 0.5)
        vote_weights += weight

    # Find answer with highest weight
    best_answer = max(vote_weights.items(), key=lambda x: x[1])

    # Calculate agreement
    total_weight = sum(worker_qualities.get(wid, 0.5) for wid in set(wid for _, wid in submissions))
    agreement = best_answer[1] / total_weight

    return best_answer[0], agreement

Implementation Guide: Step-by-Step

Phase 1: Foundation (Week 1-2)

# Setup infrastructure
docker-compose up -d redis postgres rabbitmq

# Initialize database
python manage.py migrate
python manage.py createsuperuser

Core data models:

from django.db import models

class Worker(models.Model):
    id = models.CharField(max_length=50, primary_key=True)
    quality_score = models.FloatField(default=0.5)
    availability = models.FloatField(default=1.0)  # 0-1
    expertise_areas = models.JSONField()
    last_active = models.DateTimeField(auto_now=True)

class Task(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    text = models.TextField()
    risk_level = models.CharField(max_length=20, choices=RISK_CHOICES)
    complexity = models.FloatField()
    urgency = models.IntegerField(choices=URGENCY_CHOICES)
    status = models.CharField(max_length=20, default='pending')

class Submission(models.Model):
    task = models.ForeignKey(Task, on_delete=models.CASCADE)
    worker = models.ForeignKey(Worker, on_delete=models.CASCADE)
    answer = models.TextField()
    confidence = models.FloatField()
    submitted_at = models.DateTimeField(auto_now_add=True)
    is_golden = models.BooleanField(default=False)

Phase 2: Quality Gating (Week 3-4)

Train quality gating model:

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Prepare training data
X = []  # Features: worker_quality, task_complexity, text_length, etc.
y = []  # Labels: 1=needs_expert, 0=crowd_ok

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.2%}")
print(f"F1-score: {f1_score(y_test, y_pred):.2%}")

Phase 3: Integration (Week 5-6)

API endpoint for task allocation:

@api_view(['POST'])
def allocate_task(request):
    """
    Main API endpoint for task allocation
    """
    task_data = request.data
    task = Task.objects.create(**task_data)

    # Quality gating decision
    needs_expert = quality_gating_model.should_review_expert(task)

    if needs_expert:
        # Route to expert pool
        allocation = expert_allocator.allocate_task(task)
        task.status = 'expert_allocated'
    else:
        # Route to crowd pool
        worker = crowd_pool.get_available_worker(task)
        task.status = 'crowd_allocated'

    task.save()

    return Response({
        'task_id': str(task.id),
        'allocated_to': allocation['expert_id'] if needs_expert else worker.id,
        'needs_expert': needs_expert
    })

Monitoring & Alerting

Key metrics to track:

class EvaluationMetrics:
    def __init__(self):
        self.metrics = {
            'accuracy': prometheus.Counter('eval_accuracy', 'Accuracy of evaluations'),
            'latency': prometheus.Histogram('eval_latency_seconds', 'Evaluation latency'),
            'cost': prometheus.Gauge('eval_cost_dollars', 'Cost per evaluation'),
            'quality_score': prometheus.Gauge('worker_quality_score', 'Worker quality score')
        }

    def track_submission(self, worker_id, task_id, duration, correct):
        # Update metrics
        self.metrics['latency'].observe(duration)
        self.metrics['accuracy'].inc(correct)

        # Alert if worker quality drops
        if self.get_worker_trend(worker_id) < 0.3:
            alert_manager.send_alert(
                f"Worker {worker_id} quality dropped below 0.3",
                severity='high'
            )

Dashboard example:

┌─────────────────────────────────────────────────────────────┐
│                    Evaluation Dashboard                     │
├─────────────────────────────────────────────────────────────┤
│ Accuracy: 91.2%     │ Cost/Task: $0.0028     │ Latency: 2.1s │
├─────────────────────┼────────────────────────┼───────────────┤
│ Workers online: 4,823│ Tasks/hour: 8,500      │ Expert load: 37% │
├─────────────────────┼────────────────────────┼───────────────┤
│ 🚨 Alerts: 2 active │ Last alert: 15 min ago │ Uptime: 99.8% │
└─────────────────────────────────────────────────────────────┘

Common Pitfalls & Solutions

1. Bias in Crowd Workers

Problem: Certain demographics dominate your crowd pool, leading to biased evaluations.

Solution: Implement diversity constraints

def get_diverse_worker_pool(task, required_diversity):
    """
    Ensure worker pool matches required diversity
    """
    diversity_constraints = {
        'gender': ['male', 'female', 'non-binary'],
        'region': ['North America', 'Europe', 'Asia', 'Africa', 'South America'],
        'age_group': ['18-25', '26-35', '36-45', '46+']
    }

    # Filter workers by diversity constraints
    eligible_workers = Worker.objects.filter(
        availability__gte=0.8,
        expertise_areas__contains=task.expertise_required
    )

    # Apply diversity constraints
    for category, values in required_diversity.items():
        eligible_workers = eligible_workers.filter(**{f'{category}__in': values})

    return eligible_workers.order_by('?')[:5000]  # Random sample

2. Quality Decay Over Time

Problem: Workers start with high quality but degrade as they get bored or rushed.

Solution: Dynamic quality scoring with decay

class DynamicQualityModel:
    def calculate_quality(self, worker_id, recent_submissions):
        """
        Quality decays over time and with incorrect answers
        """
        base_quality = self.get_base_quality(worker_id)

        # Time decay (24h half-life)
        time_decay = 0.5 ** (self.hours_since_last_submission / 24.0)

        # Performance decay
        recent_accuracy = self.calculate_recent_accuracy(worker_id)
        performance_decay = 1.0 - (0.1 * (1.0 - recent_accuracy))

        # Bonus for streaks
        streak_bonus = self.calculate_streak_bonus(worker_id)

        return base_quality * time_decay * performance_decay + streak_bonus

3. Expert Bottlenecks

Problem: Even with quality gating, some tasks still bottleneck at expert review.

Solution: Tiered expert system

class TieredExpertSystem:
    def __init__(self):
        self.tiers = {
            'tier1': {'experts': 20, 'capacity': 50/hr, 'cost': $30/hr},
            'tier2': {'experts': 15, 'capacity': 20/hr, 'cost': $60/hr},
            'tier3': {'experts': 5, 'capacity': 5/hr, 'cost': $150/hr}
        }

    def allocate_task(self, task):
        """
        Route tasks to appropriate tier based on complexity and risk
        """
        if task.risk_level == 'low' and task.complexity < 0.3:
            return self.tiers['tier1']['experts'].pop()
        elif task.risk_level == 'medium' or task.complexity < 0.7:
            return self.tiers['tier2']['experts'].pop()
        else:
            return self.tiers['tier3']['experts'].pop()

Future Trends & Considerations

1. AI-assisted Human Evaluation

The next frontier is combining human judgment with AI assistance:

class AIAssistedEvaluator:
    def __init__(self):
        self.ai_model = load_ai_model()
        self.human_evaluator = HumanEvaluator()

    def evaluate(self, task):
        """
        AI suggests, human confirms
        """
        # AI pre-evaluation (100ms)
        ai_suggestions = self.ai_model.predict(task.text)

        # Present to human with AI suggestions
        human_answer = self.human_evaluator.evaluate_with_suggestions(
            task, ai_suggestions
        )

        # Combine confidence scores
        combined_confidence = self.combine_ai_human_confidence(
            ai_suggestions, human_answer
        )

        return human_answer, combined_confidence

2. Continuous Learning Systems

Build systems that learn from every evaluation:

class ContinuousLearningSystem:
    def __init__(self):
        self.model = self.initialize_model()
        self.feedback_loop = FeedbackLoop()

    def process_task(self, task):
        # Initial evaluation
        result = self.evaluate_task(task)

        # Collect feedback
        feedback = self.collect_feedback(result)

        # Update model
        self.model = self.update_model(self.model, feedback)

        return result

Key Takeaways

  1. Quality gating is essential: Don’t just throw crowd workers at your problem. Use ML models to route tasks intelligently.
  2. Hybrid approach wins: Combine crowd + expert + AI for optimal cost, speed, and accuracy.
  3. Monitor everything: Track worker quality, task complexity, and system performance in real-time.
  4. Plan for scale: Design your system to handle 10x growth without redesign.
  5. Bias is real: Actively manage diversity in your worker pool to avoid biased evaluations.

Discussion Questions

  • Anh em đã từng build hệ thống human evaluation chưa? Gặp vấn đề gì khó khăn nhất?
  • Theo anh em, quality gating nên dựa nhiều vào rule-based hay ML-based hơn?
  • Làm sao để cân bằng giữa cost và quality trong human evaluation?

Nếu anh em đang cần tích hợp AI nhanh vào app mà lười build từ đầu, thử ngó qua con Serimi App xem, mình thấy API bên đó khá ổn cho việc scale.

Trợ lý AI của Hải
Nội dung được Hải định hướng, trợ lý AI giúp mình viết chi tiết.
Chia sẻ tới bạn bè và gia đình