Data Conditioning Pipeline Design

Date: December 4, 2025
Status: Design Phase
Related: LIST_LOADER_DESIGN.md, cbapp GITHUB_ISSUES.md


Overview

Rather than sending raw CSV data directly from cbtenant to cbapp, implement a data conditioning pipeline that cleans, validates, deduplicates, and enriches data before database insertion. This approach:

✅ Improves data quality at the source
✅ Reduces database load and storage waste
✅ Enables fast progress bars (numpy/pandas are fast)
✅ Supports future AI-powered enrichment
✅ Keeps cbapp schema clean and focused
✅ Makes testing easier (pure functions)


Architecture

┌──────────────────────────────────────────────────────────────────┐
│                    Tenant Manager (cbtenant)                      │
│                                                                   │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────────────┐   │
│  │  CSV Upload │───▶│ ListLoader   │───▶│ DataConditioner  │   │
│  │  (raw file) │    │ Agent        │    │ (clean/dedupe)   │   │
│  └─────────────┘    │ - AI Mapping │    └────────┬─────────┘   │
│                     └──────────────┘             │              │
│                                                  │              │
│                                    Conditioned Data              │
│                                    (pandas DataFrame)            │
│                                                  │              │
│                                                  ▼              │
│                                    ┌──────────────────────┐    │
│                                    │  Import Service      │    │
│                                    │  - Batch to cbapp    │    │
│                                    │  - Progress tracking │    │
│                                    └──────────┬───────────┘    │
└───────────────────────────────────────────────┼────────────────┘
                                                │ HTTP API
                                                │ POST /persons/batch
┌──────────────────────────────────────────────────────────────────┐
│                       Tenant App (cbapp)                          │
│                                                                   │
│  ┌─────────────────┐    ┌──────────────────┐                    │
│  │  POST           │───▶│  DuckDB          │                    │
│  │  /persons/batch │    │  person table    │                    │
│  │  (clean data)   │    │  (high quality)  │                    │
│  └─────────────────┘    └──────────────────┘                    │
└──────────────────────────────────────────────────────────────────┘

Pipeline Stages

1. Parse & Load

Input: Raw CSV file
Output: pandas DataFrame
Operations:
  • Detect encoding (UTF-8, Latin-1, etc.)
  • Detect delimiter (comma, tab, semicolon)
  • Parse CSV to DataFrame
  • Handle quoted fields
  • Detect data types

def load_csv(file_path: str) -> pd.DataFrame:
    """Load CSV with auto-detected encoding and delimiter."""
    # sep=None with the python engine lets pandas sniff the delimiter
    # (comma, tab, semicolon); try UTF-8 first, then fall back to Latin-1
    try:
        df = pd.read_csv(file_path, encoding='utf-8', sep=None, engine='python')
    except UnicodeDecodeError:
        df = pd.read_csv(file_path, encoding='latin-1', sep=None, engine='python')

    return df

2. Apply AI Mappings

Input: Raw DataFrame + FieldMappings
Output: Renamed DataFrame with target-schema columns
Operations:
  • Rename columns based on AI mappings
  • Map unmapped columns into custom_fields
  • Drop columns marked as "skip"
  • Preserve the original row data in a JSON column

def apply_mappings(df: pd.DataFrame, mappings: list[FieldMapping]) -> pd.DataFrame:
    """Apply AI-suggested field mappings."""
    rename_map = {}
    custom_field_map = {}
    skip_cols = []

    for mapping in mappings:
        if mapping.target == 'skip':
            skip_cols.append(mapping.source)
        elif mapping.target.startswith('custom_fields.'):
            field_name = mapping.target.replace('custom_fields.', '')
            custom_field_map[mapping.source] = field_name
        else:
            rename_map[mapping.source] = mapping.target

    # Rename standard fields
    df_mapped = df.rename(columns=rename_map)

    # Fold custom-field columns into a single dict per row, then drop the originals
    if custom_field_map:
        custom_cols = list(custom_field_map.keys())
        df_mapped['custom_fields'] = df[custom_cols].apply(
            lambda row: {custom_field_map[k]: v for k, v in row.items() if pd.notna(v)},
            axis=1
        )
        df_mapped = df_mapped.drop(columns=custom_cols)

    # Drop columns marked as "skip"
    df_mapped = df_mapped.drop(columns=skip_cols, errors='ignore')

    # Preserve the original row data (one dict per row)
    df_mapped['original_data'] = df.to_dict('records')

    return df_mapped

3. Clean & Normalize

Input: Mapped DataFrame
Output: Cleaned DataFrame
Operations:
  • Phone number normalization
  • Email validation and cleaning
  • Name cleaning (title case, whitespace)
  • Address standardization
  • Remove invalid/empty rows

import re


class DataCleaner:
    """Clean and normalize data."""

    def clean_phones(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize phone numbers to digits only."""
        phone_cols = ['home_phone', 'cell_phone']
        for col in phone_cols:
            if col in df.columns:
                # Remove all non-digits (NaN entries pass through the .str accessor)
                df[col] = df[col].str.replace(r'[^0-9]', '', regex=True)
                # Keep only 10-digit (US) numbers; everything else becomes None
                df[col] = df[col].apply(
                    lambda x: x if pd.notna(x) and len(x) == 10 else None
                )
        return df

    def clean_emails(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate and clean emails."""
        if 'email' in df.columns:
            # Strip whitespace
            df['email'] = df['email'].str.strip().str.lower()
            # Basic validation
            df['email'] = df['email'].apply(
                lambda x: x if '@' in str(x) and '.' in str(x) else None
            )
        return df

    def clean_names(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean and normalize names."""
        name_cols = ['first_name', 'last_name']
        for col in name_cols:
            if col in df.columns:
                # Title case
                df[col] = df[col].str.title()
                # Remove extra whitespace
                df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
                df[col] = df[col].str.strip()
        return df

    def clean_addresses(self, df: pd.DataFrame) -> pd.DataFrame:
        """Standardize addresses."""
        if 'address1' in df.columns:
            # Common abbreviations
            abbreviations = {
                r'\bStreet\b': 'St',
                r'\bAvenue\b': 'Ave',
                r'\bBoulevard\b': 'Blvd',
                r'\bRoad\b': 'Rd',
                r'\bDrive\b': 'Dr',
                r'\bApartment\b': 'Apt',
                r'\bSuite\b': 'Ste',
            }
            for pattern, abbr in abbreviations.items():
                df['address1'] = df['address1'].str.replace(
                    pattern, abbr, regex=True, flags=re.IGNORECASE
                )
        return df

    def clean_all(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run all cleaning operations."""
        df = self.clean_phones(df)
        df = self.clean_emails(df)
        df = self.clean_names(df)
        df = self.clean_addresses(df)
        return df

4. Deduplicate (Within File)

Input: Cleaned DataFrame
Output: Deduplicated DataFrame + duplicate report
Operations:
  • Detect duplicates based on strategy (email, phone, name+address)
  • Mark preferred record (most complete data)
  • Optionally merge fields from duplicates
  • Generate duplicate report

class Deduplicator:
    """Detect and remove duplicates within import file."""

    def deduplicate_by_email(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[dict]]:
        """Remove duplicates by email address."""
        if 'email' not in df.columns:
            return df, []

        # Find duplicates
        email_counts = df['email'].value_counts()
        duplicates = email_counts[email_counts > 1].index.tolist()

        duplicate_report = []
        rows_to_drop = []

        for email in duplicates:
            dup_rows = df[df['email'] == email]

            # Choose most complete record (most non-null fields)
            best_idx = dup_rows.notna().sum(axis=1).idxmax()

            # Mark others for removal
            for idx in dup_rows.index:
                if idx != best_idx:
                    rows_to_drop.append(idx)
                    duplicate_report.append({
                        'email': email,
                        'kept': best_idx,
                        'removed': idx,
                        'reason': 'duplicate email'
                    })

        # Remove duplicates
        df_clean = df.drop(rows_to_drop)

        return df_clean, duplicate_report

    def deduplicate_by_phone(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[dict]]:
        """Remove duplicates by cell phone, mirroring the email strategy."""
        if 'cell_phone' not in df.columns:
            return df, []

        phone_counts = df['cell_phone'].value_counts()
        duplicates = phone_counts[phone_counts > 1].index.tolist()

        duplicate_report = []
        rows_to_drop = []

        for phone in duplicates:
            dup_rows = df[df['cell_phone'] == phone]

            # Keep the most complete record (most non-null fields)
            best_idx = dup_rows.notna().sum(axis=1).idxmax()

            for idx in dup_rows.index:
                if idx != best_idx:
                    rows_to_drop.append(idx)
                    duplicate_report.append({
                        'phone': phone,
                        'kept': best_idx,
                        'removed': idx,
                        'reason': 'duplicate phone'
                    })

        return df.drop(rows_to_drop), duplicate_report

    def deduplicate_by_name_address(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[dict]]:
        """Remove duplicates by name + address."""
        # Create composite key
        df['_dup_key'] = (
            df['first_name'].str.lower() + '|' +
            df['last_name'].str.lower() + '|' +
            df['zip'].fillna('')
        )

        # Find duplicates
        key_counts = df['_dup_key'].value_counts()
        duplicates = key_counts[key_counts > 1].index.tolist()

        duplicate_report = []
        rows_to_drop = []

        for key in duplicates:
            dup_rows = df[df['_dup_key'] == key]
            best_idx = dup_rows.notna().sum(axis=1).idxmax()

            for idx in dup_rows.index:
                if idx != best_idx:
                    rows_to_drop.append(idx)
                    duplicate_report.append({
                        'key': key,
                        'kept': best_idx,
                        'removed': idx,
                        'reason': 'duplicate name+zip'
                    })

        df_clean = df.drop(rows_to_drop).drop(columns=['_dup_key'])
        return df_clean, duplicate_report

5. Add Metadata

Input: Deduplicated DataFrame
Output: DataFrame with import metadata
Operations:
  • Add import_source
  • Add import_date
  • Add import_batch_id
  • Add import_filename
  • Ensure original_data is JSON

def add_metadata(
    df: pd.DataFrame,
    filename: str,
    batch_id: str,
    source: str = "csv_upload"
) -> pd.DataFrame:
    """Add import metadata columns."""
    df['import_source'] = source
    df['import_date'] = datetime.now()
    df['import_batch_id'] = batch_id
    df['import_filename'] = filename

    # Ensure original_data is JSON string
    if 'original_data' in df.columns:
        df['original_data'] = df['original_data'].apply(json.dumps)

    return df

6. Add Auto-Tags

Input: DataFrame with metadata
Output: DataFrame with tags column
Operations:
  • Generate date-based tag (import-YYYY-MM-DD)
  • Generate source-based tag (source:csv)
  • Generate batch-based tag (batch:imp_abc123)
  • Add custom tags from user options

def add_auto_tags(
    df: pd.DataFrame,
    batch_id: str,
    source: str = "csv_upload",
    custom_tags: list[str] | None = None
) -> pd.DataFrame:
    """Add auto-generated tags."""
    today = datetime.now().date()

    # Generate auto tags
    auto_tags = [
        f"import-{today}",
        f"source:{source}",
        f"batch:{batch_id}"
    ]

    # Add custom tags
    if custom_tags:
        auto_tags.extend(custom_tags)

    # Add tags column: one independent list per row (avoid sharing a single list object)
    df['tags'] = [list(auto_tags) for _ in range(len(df))]

    return df

7. Validate Schema

Input: Final DataFrame
Output: Validation report
Operations:
  • Check required fields (first_name, last_name)
  • Validate data types
  • Check constraints (whip_status values)
  • Generate warnings for missing optional fields

class SchemaValidator:
    """Validate data against cbapp Person schema."""

    REQUIRED_FIELDS = ['first_name', 'last_name']

    WHIP_STATUS_VALUES = [
        'Hard Support', 'Soft Support', 'Undecided',
        'Soft Oppose', 'Hard Opposed', 'Unknown'
    ]

    def validate(self, df: pd.DataFrame) -> dict:
        """Validate DataFrame against Person schema."""
        errors = []
        warnings = []

        # Check required fields
        for field in self.REQUIRED_FIELDS:
            if field not in df.columns:
                errors.append(f"Missing required field: {field}")
            else:
                null_count = df[field].isna().sum()
                if null_count > 0:
                    errors.append(f"{null_count} rows missing {field}")

        # Validate whip_status if present
        if 'whip_status' in df.columns:
            invalid = ~df['whip_status'].isin(self.WHIP_STATUS_VALUES)
            invalid_count = invalid.sum()
            if invalid_count > 0:
                warnings.append(
                    f"{invalid_count} rows have invalid whip_status"
                )

        # Check contact info (aggregate count, to avoid one warning per row on large files)
        contact_fields = ['email', 'cell_phone', 'home_phone']
        present = [f for f in contact_fields if f in df.columns]
        missing_contact = df[present].isna().all(axis=1).sum() if present else len(df)
        if missing_contact > 0:
            warnings.append(f"{missing_contact} rows have no contact info")

        return {
            'valid': len(errors) == 0,
            'error_count': len(errors),
            'warning_count': len(warnings),
            'errors': errors,
            'warnings': warnings
        }

Complete Pipeline Class

class DataConditioner:
    """Full data conditioning pipeline."""

    def __init__(self):
        self.cleaner = DataCleaner()
        self.deduplicator = Deduplicator()
        self.validator = SchemaValidator()

    def condition(
        self,
        file_path: str,
        mappings: list[FieldMapping],
        options: dict
    ) -> dict:
        """
        Run full conditioning pipeline.

        Args:
            file_path: Path to CSV file
            mappings: AI-generated field mappings
            options: Import options (duplicate_strategy, tags, etc.)

        Returns:
            dict with conditioned DataFrame and reports
        """
        # 1. Load CSV (module-level helper from stage 1)
        df = load_csv(file_path)

        # 2. Apply mappings (module-level helper from stage 2)
        df = apply_mappings(df, mappings)

        # 3. Clean data
        df = self.cleaner.clean_all(df)

        # 4. Deduplicate
        strategy = options.get('duplicate_strategy', 'email')
        if strategy == 'email':
            df, dup_report = self.deduplicator.deduplicate_by_email(df)
        elif strategy == 'phone':
            df, dup_report = self.deduplicator.deduplicate_by_phone(df)
        elif strategy == 'name_address':
            df, dup_report = self.deduplicator.deduplicate_by_name_address(df)
        else:
            dup_report = []

        # 5. Add metadata
        batch_id = options.get('batch_id', f"imp_{uuid.uuid4().hex[:12]}")
        df = add_metadata(df, Path(file_path).name, batch_id)

        # 6. Add tags
        custom_tags = options.get('tags', [])
        df = add_auto_tags(df, batch_id, custom_tags=custom_tags)

        # 7. Validate
        validation = self.validator.validate(df)

        return {
            'data': df,
            'total_rows': len(df),
            'duplicate_report': dup_report,
            'duplicates_removed': len(dup_report),
            'validation': validation,
            'batch_id': batch_id
        }

    def to_json_records(self, df: pd.DataFrame) -> list[dict]:
        """Convert DataFrame to JSON records for API."""
        # Replace NaN with None
        df = df.where(pd.notna(df), None)
        return df.to_dict('records')

Integration with List Loader

Location: cbtenant/api/routes/list_loader.py

@router.post("/{job_id}/condition")
async def condition_data(
    tenant_id: str,
    job_id: str,
    options: ConditionOptions,
    current_user: dict = Depends(get_current_user),
):
    """
    Run data conditioning pipeline.

    This step happens AFTER AI mapping but BEFORE import.
    """
    with get_db_context() as session:
        job = get_job_or_404(session, job_id, tenant_id)

        # Get mappings
        mappings = json.loads(job.mappings)

        # Run conditioning
        conditioner = DataConditioner()
        result = conditioner.condition(
            job.file_path,
            mappings=[FieldMapping(**m) for m in mappings],
            options=options.dict()
        )

        # Store conditioned data (as CSV or JSON)
        conditioned_path = f"/tmp/conditioned_{job_id}.csv"
        result['data'].to_csv(conditioned_path, index=False)

        # Update job
        job.conditioned_path = conditioned_path
        job.total_rows = result['total_rows']
        job.duplicates_removed = result['duplicates_removed']
        job.validation_result = json.dumps(result['validation'])
        job.status = 'conditioned'
        session.commit()

        return {
            'job_id': job_id,
            'status': 'conditioned',
            'total_rows': result['total_rows'],
            'duplicates_removed': result['duplicates_removed'],
            'validation': result['validation'],
            'preview': result['data'].head(5).to_dict('records')
        }

Updated Import Flow

1. Upload CSV
2. AI Mapping (ListLoaderAgent)
3. User Adjusts Mappings (optional)
4. Condition Data (NEW) ← DataConditioner
   - Clean, dedupe, validate
   - Show preview of changes
5. User Reviews Conditioning Report
   - X duplicates removed
   - Y rows cleaned
   - Z warnings
6. Execute Import
   - Send conditioned data to cbapp
   - POST /persons/batch (see the sketch below)
7. Show Results
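
Step 6 is the hand-off to cbapp. Below is a minimal sketch of what the Import Service's batch call might look like; it assumes httpx plus a hypothetical CBAPP_URL, BATCH_SIZE, and {"persons": [...]} payload shape, so adjust it to the real /persons/batch contract in cbapp.

import httpx
import pandas as pd

CBAPP_URL = "http://localhost:8001"  # hypothetical cbapp base URL
BATCH_SIZE = 500                     # hypothetical rows per request

def import_conditioned(df: pd.DataFrame, conditioner: DataConditioner) -> int:
    """Send conditioned records to cbapp in batches (step 6 of the flow)."""
    records = conditioner.to_json_records(df)
    # Timestamps aren't JSON-serializable; stringify import_date defensively
    for rec in records:
        if rec.get('import_date') is not None:
            rec['import_date'] = str(rec['import_date'])

    imported = 0
    for start in range(0, len(records), BATCH_SIZE):
        batch = records[start:start + BATCH_SIZE]
        # Payload shape is an assumption; adjust to the real /persons/batch contract
        resp = httpx.post(f"{CBAPP_URL}/persons/batch", json={"persons": batch})
        resp.raise_for_status()
        imported += len(batch)
    return imported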

Progress Bar Support

Since conditioning uses pandas/numpy (fast), we can show real-time progress:

def condition_with_progress(
    self,
    file_path: str,
    mappings: list[FieldMapping],
    options: dict,
    progress_callback: callable = None
) -> dict:
    """Condition data with progress updates."""

    if progress_callback:
        progress_callback(0, "Loading CSV...")

    df = load_csv(file_path)

    if progress_callback:
        progress_callback(20, "Applying mappings...")

    df = apply_mappings(df, mappings)

    if progress_callback:
        progress_callback(40, "Cleaning data...")

    df = self.cleaner.clean_all(df)

    if progress_callback:
        progress_callback(60, "Removing duplicates...")

    df, dup_report = self.deduplicator.deduplicate_by_email(df)

    if progress_callback:
        progress_callback(80, "Adding metadata...")

    df = add_metadata(df, Path(file_path).name, options['batch_id'])
    df = add_auto_tags(df, options['batch_id'])

    validation = self.validator.validate(df)

    if progress_callback:
        progress_callback(100, "Validation complete!")

    return {
        'data': df,
        'total_rows': len(df),
        'duplicate_report': dup_report,
        'validation': validation
    }
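
A usage sketch showing how a caller might wire progress_callback, assuming condition_with_progress is a method on DataConditioner. The callback here just prints; in practice it would more likely update the job record or push over a WebSocket. The file path, empty mappings list, and batch_id are placeholders.

def print_progress(percent: int, message: str) -> None:
    """Minimal callback: log pipeline progress to stdout."""
    print(f"[{percent:3d}%] {message}")

conditioner = DataConditioner()
result = conditioner.condition_with_progress(
    "members.csv",                        # placeholder upload path
    mappings=[],                          # would come from the AI mapping step
    options={"batch_id": "imp_demo123"},  # placeholder batch id
    progress_callback=print_progress,
)
print(f"Conditioned {result['total_rows']} rows, "
      f"{len(result['duplicate_report'])} duplicates removed")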

Configuration File

Location: cbtenant/config/data_conditioning.yaml

# Data conditioning configuration

cleaning:
  phone:
    normalize: true
    validate_length: 10
    country_code: "1"  # US

  email:
    lowercase: true
    validate: true
    remove_invalid: true

  names:
    title_case: true
    remove_extra_whitespace: true

  addresses:
    abbreviate_street_types: true
    standardize_apt_format: true

deduplication:
  default_strategy: "email"  # email, phone, name_address, name_zip
  merge_fields: false         # Merge data from duplicates
  prefer_most_complete: true  # Keep record with most data

metadata:
  include_original_data: true
  original_data_format: "json"

tagging:
  auto_tag: true
  tag_patterns:
    - "import-{date}"
    - "source:{source}"
    - "batch:{batch_id}"

validation:
  strict_mode: false          # Fail on warnings
  required_fields:
    - first_name
    - last_name
  require_contact_info: false # email, phone, or address
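
Nothing in the pipeline reads this file yet. A minimal loader sketch, assuming PyYAML and the path above, that DataConditioner could use to pick defaults such as the duplicate strategy:

import yaml

def load_conditioning_config(path: str = "cbtenant/config/data_conditioning.yaml") -> dict:
    """Load the conditioning config, falling back to sensible defaults if absent."""
    try:
        with open(path) as f:
            return yaml.safe_load(f) or {}
    except FileNotFoundError:
        return {"deduplication": {"default_strategy": "email"}}

config = load_conditioning_config()
default_strategy = config.get("deduplication", {}).get("default_strategy", "email")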

Testing Plan

Unit Tests

  • Test each cleaning function independently
  • Test deduplication strategies
  • Test metadata addition
  • Test validation logic
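
A sketch of this unit-test style, assuming pytest and the planned cbtenant/agents/data_conditioner.py module from Next Steps:

import pandas as pd

from cbtenant.agents.data_conditioner import DataCleaner, Deduplicator  # planned module

def test_clean_phones_normalizes_and_drops_short_numbers():
    df = pd.DataFrame({"cell_phone": ["(555) 123-4567", "123"]})
    cleaned = DataCleaner().clean_phones(df)
    assert cleaned.loc[0, "cell_phone"] == "5551234567"  # formatting stripped
    assert pd.isna(cleaned.loc[1, "cell_phone"])         # too short, dropped

def test_deduplicate_by_email_keeps_most_complete_row():
    df = pd.DataFrame({
        "email": ["a@example.com", "a@example.com"],
        "first_name": ["Ann", None],
    })
    deduped, report = Deduplicator().deduplicate_by_email(df)
    assert len(deduped) == 1
    assert len(report) == 1
    assert deduped.iloc[0]["first_name"] == "Ann"        # richer record wins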

Integration Tests

  • Full pipeline with sample CSV
  • Various duplicate scenarios
  • Malformed data handling
  • Large file performance (10k rows)

Performance Benchmarks

  • 1k rows: < 1 second
  • 10k rows: < 10 seconds
  • 100k rows: < 60 seconds
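
These targets could be checked with a small timing harness on synthetic data. A sketch, with illustrative row counts and columns:

import time

import pandas as pd

from cbtenant.agents.data_conditioner import DataCleaner  # planned module

def benchmark_cleaning(rows: int = 10_000) -> float:
    """Time DataCleaner.clean_all on a synthetic DataFrame of the given size."""
    df = pd.DataFrame({
        "first_name": ["  jane "] * rows,
        "last_name": ["DOE"] * rows,
        "email": ["Jane.Doe@Example.COM "] * rows,
        "cell_phone": ["(555) 123-4567"] * rows,
    })
    start = time.perf_counter()
    DataCleaner().clean_all(df)
    return time.perf_counter() - start

print(f"10k rows cleaned in {benchmark_cleaning():.2f}s")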

Benefits Summary

✅ Data Quality: Clean data at source, not in database
✅ Performance: Fast numpy operations, progress bars work
✅ Extensibility: Easy to add new cleaning rules
✅ Testing: Pure functions, easy to test
✅ User Experience: Preview changes before import
✅ Database Health: Only clean data reaches cbapp
✅ Future-Proof: Foundation for AI enrichment


Next Steps

  1. Create cbtenant/agents/data_conditioner.py
  2. Add conditioning route to List Loader API
  3. Update frontend to show conditioning step
  4. Create configuration file
  5. Write tests
  6. Update cbapp to accept metadata columns (Issue #2)
  7. Deploy and test with real data

Questions

  1. Should we store conditioned CSV or send directly from memory?
     Recommendation: Store temporarily for retry/debug

  2. How to handle partial failures in conditioning?
     Recommendation: Continue with valid rows, report invalid

  3. Should users be able to disable specific cleaning steps?
     Recommendation: Yes, via options parameter

  4. Maximum file size for conditioning?
     Recommendation: 100MB or 100k rows, then use streaming

Ready to implement!