Data Conditioning Pipeline Design¶
Date: December 4, 2025
Status: Design Phase
Related: LIST_LOADER_DESIGN.md, cbapp GITHUB_ISSUES.md
Overview¶
Rather than sending raw CSV data directly from cbtenant to cbapp, implement a data conditioning pipeline that cleans, validates, deduplicates, and enriches data before database insertion. This approach:
✅ Improves data quality at the source
✅ Reduces database load and storage waste
✅ Enables fast progress bars (numpy/pandas are fast)
✅ Supports future AI-powered enrichment
✅ Keeps cbapp schema clean and focused
✅ Makes testing easier (pure functions)
Architecture¶
┌──────────────────────────────────────────────────────────────────┐
│ Tenant Manager (cbtenant) │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ CSV Upload │───▶│ ListLoader │───▶│ DataConditioner │ │
│ │ (raw file) │ │ Agent │ │ (clean/dedupe) │ │
│ └─────────────┘ │ - AI Mapping │ └────────┬─────────┘ │
│ └──────────────┘ │ │
│ │ │
│ Conditioned Data │
│ (pandas DataFrame) │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Import Service │ │
│ │ - Batch to cbapp │ │
│ │ - Progress tracking │ │
│ └──────────┬───────────┘ │
└───────────────────────────────────────────────┼────────────────┘
│ HTTP API
│ POST /persons/batch
▼
┌──────────────────────────────────────────────────────────────────┐
│ Tenant App (cbapp) │
│ │
│ ┌─────────────────┐ ┌──────────────────┐ │
│ │ POST │───▶│ DuckDB │ │
│ │ /persons/batch │ │ person table │ │
│ │ (clean data) │ │ (high quality) │ │
│ └─────────────────┘ └──────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
Pipeline Stages¶
1. Parse & Load¶
Input: Raw CSV file
Output: pandas DataFrame
Operations:
- Detect encoding (UTF-8, Latin-1, etc.)
- Detect delimiter (comma, tab, semicolon)
- Parse CSV to DataFrame
- Handle quoted fields
- Detect data types
import pandas as pd


def load_csv(file_path: str) -> pd.DataFrame:
    """Load CSV with encoding and delimiter auto-detection."""
    # sep=None with the python engine sniffs the delimiter (comma, tab, semicolon)
    try:
        df = pd.read_csv(file_path, encoding='utf-8', sep=None, engine='python')
    except UnicodeDecodeError:
        # Fall back to Latin-1 for legacy exports
        df = pd.read_csv(file_path, encoding='latin-1', sep=None, engine='python')
    return df
2. Apply AI Mappings¶
Input: Raw DataFrame + FieldMappings
Output: Renamed DataFrame with target schema columns
Operations:
- Rename columns based on AI mappings
- Map unmapped columns to custom_fields
- Drop columns marked as "skip"
- Preserve original data in JSON column
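The code below assumes a FieldMapping object produced by the ListLoader agent (see LIST_LOADER_DESIGN.md) with source and target attributes. The real model lives with the agent; a minimal stand-in for these sketches could be:

from dataclasses import dataclass


@dataclass
class FieldMapping:
    """Minimal stand-in for the agent's mapping model (illustrative only)."""
    source: str   # column name in the uploaded CSV
    target: str   # Person field, 'custom_fields.<name>', or 'skip'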
def apply_mappings(df: pd.DataFrame, mappings: list[FieldMapping]) -> pd.DataFrame:
    """Apply AI-suggested field mappings."""
    rename_map = {}
    custom_field_map = {}
    skip_cols = []
    for mapping in mappings:
        if mapping.target == 'skip':
            skip_cols.append(mapping.source)
        elif mapping.target.startswith('custom_fields.'):
            field_name = mapping.target.replace('custom_fields.', '')
            custom_field_map[mapping.source] = field_name
        else:
            rename_map[mapping.source] = mapping.target
    # Rename standard fields
    df_mapped = df.rename(columns=rename_map)
    # Collapse custom fields into a single dict column
    if custom_field_map:
        custom_cols = list(custom_field_map.keys())
        df_mapped['custom_fields'] = df[custom_cols].apply(
            lambda row: {custom_field_map[k]: v for k, v in row.items() if pd.notna(v)},
            axis=1
        )
        df_mapped = df_mapped.drop(columns=custom_cols)
    # Drop columns the user chose to skip
    if skip_cols:
        df_mapped = df_mapped.drop(columns=skip_cols, errors='ignore')
    # Preserve the original row data for auditing
    df_mapped['original_data'] = df.to_dict('records')
    return df_mapped
3. Clean & Normalize¶
Input: Mapped DataFrame
Output: Cleaned DataFrame
Operations:
- Phone number normalization
- Email validation and cleaning
- Name cleaning (title case, whitespace)
- Address standardization
- Remove invalid/empty rows
import re  # used by clean_addresses below


class DataCleaner:
    """Clean and normalize data."""

    def clean_phones(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize phone numbers to digits only."""
        phone_cols = ['home_phone', 'cell_phone']
        for col in phone_cols:
            if col in df.columns:
                # Cast to string, then remove all non-digits
                digits = df[col].astype('string').str.replace(r'[^0-9]', '', regex=True)
                # Keep only 10-digit (US) numbers; everything else becomes null
                df[col] = digits.where(digits.str.len().eq(10).fillna(False))
        return df
def clean_emails(self, df: pd.DataFrame) -> pd.DataFrame:
"""Validate and clean emails."""
if 'email' in df.columns:
# Strip whitespace
df['email'] = df['email'].str.strip().str.lower()
# Basic validation
df['email'] = df['email'].apply(
lambda x: x if '@' in str(x) and '.' in str(x) else None
)
return df
def clean_names(self, df: pd.DataFrame) -> pd.DataFrame:
"""Clean and normalize names."""
name_cols = ['first_name', 'last_name']
for col in name_cols:
if col in df.columns:
# Title case
df[col] = df[col].str.title()
# Remove extra whitespace
df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
df[col] = df[col].str.strip()
return df
def clean_addresses(self, df: pd.DataFrame) -> pd.DataFrame:
"""Standardize addresses."""
if 'address1' in df.columns:
# Common abbreviations
abbreviations = {
r'\bStreet\b': 'St',
r'\bAvenue\b': 'Ave',
r'\bBoulevard\b': 'Blvd',
r'\bRoad\b': 'Rd',
r'\bDrive\b': 'Dr',
r'\bApartment\b': 'Apt',
r'\bSuite\b': 'Ste',
}
for pattern, abbr in abbreviations.items():
df['address1'] = df['address1'].str.replace(
pattern, abbr, regex=True, flags=re.IGNORECASE
)
return df
def clean_all(self, df: pd.DataFrame) -> pd.DataFrame:
"""Run all cleaning operations."""
df = self.clean_phones(df)
df = self.clean_emails(df)
df = self.clean_names(df)
df = self.clean_addresses(df)
return df
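A quick illustration of the cleaner on a toy row (values are made up; the results in the comments follow the rules above):

sample = pd.DataFrame({
    'first_name': ['  jane '],
    'last_name': ['DOE'],
    'email': ['  Jane.Doe@Example.COM '],
    'cell_phone': ['(555) 123-4567'],
    'address1': ['123 Main Street Apartment 4'],
})
cleaned = DataCleaner().clean_all(sample)
# first_name='Jane', last_name='Doe', email='jane.doe@example.com',
# cell_phone='5551234567', address1='123 Main St Apt 4'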
4. Deduplicate (Within File)¶
Input: Cleaned DataFrame
Output: Deduplicated DataFrame + duplicate report
Operations:
- Detect duplicates based on strategy (email, phone, name+address)
- Mark preferred record (most complete data)
- Optionally merge fields from duplicates
- Generate duplicate report
class Deduplicator:
"""Detect and remove duplicates within import file."""
def deduplicate_by_email(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[dict]]:
"""Remove duplicates by email address."""
if 'email' not in df.columns:
return df, []
# Find duplicates
email_counts = df['email'].value_counts()
duplicates = email_counts[email_counts > 1].index.tolist()
duplicate_report = []
rows_to_drop = []
for email in duplicates:
dup_rows = df[df['email'] == email]
# Choose most complete record (most non-null fields)
best_idx = dup_rows.notna().sum(axis=1).idxmax()
# Mark others for removal
for idx in dup_rows.index:
if idx != best_idx:
rows_to_drop.append(idx)
duplicate_report.append({
'email': email,
'kept': best_idx,
'removed': idx,
'reason': 'duplicate email'
})
# Remove duplicates
df_clean = df.drop(rows_to_drop)
return df_clean, duplicate_report
    def deduplicate_by_phone(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[dict]]:
        """Remove duplicates by phone number."""
        # Same approach as deduplicate_by_email, keyed on cell_phone (not yet implemented);
        # returning the frame unchanged keeps the pipeline runnable in the meantime.
        return df, []
def deduplicate_by_name_address(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list[dict]]:
"""Remove duplicates by name + address."""
        # Create composite key (missing values become empty strings)
        df['_dup_key'] = (
            df['first_name'].fillna('').str.lower() + '|' +
            df['last_name'].fillna('').str.lower() + '|' +
            df['zip'].fillna('').astype(str)
        )
# Find duplicates
key_counts = df['_dup_key'].value_counts()
duplicates = key_counts[key_counts > 1].index.tolist()
duplicate_report = []
rows_to_drop = []
for key in duplicates:
dup_rows = df[df['_dup_key'] == key]
best_idx = dup_rows.notna().sum(axis=1).idxmax()
for idx in dup_rows.index:
if idx != best_idx:
rows_to_drop.append(idx)
duplicate_report.append({
'key': key,
'kept': best_idx,
'removed': idx,
'reason': 'duplicate name+zip'
})
df_clean = df.drop(rows_to_drop).drop(columns=['_dup_key'])
return df_clean, duplicate_report
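For example, two rows sharing an email collapse to the more complete one, and the report records what was dropped (illustrative values):

df = pd.DataFrame([
    {'first_name': 'Jane', 'last_name': 'Doe', 'email': 'jane@example.com', 'cell_phone': None},
    {'first_name': 'Jane', 'last_name': 'Doe', 'email': 'jane@example.com', 'cell_phone': '5551234567'},
])
deduped, report = Deduplicator().deduplicate_by_email(df)
# len(deduped) == 1 (the row with the phone number wins)
# report == [{'email': 'jane@example.com', 'kept': 1, 'removed': 0, 'reason': 'duplicate email'}]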
5. Add Metadata¶
Input: Deduplicated DataFrame
Output: DataFrame with import metadata
Operations:
- Add import_source
- Add import_date
- Add import_batch_id
- Add import_filename
- Ensure original_data is JSON
import json
from datetime import datetime


def add_metadata(
    df: pd.DataFrame,
    filename: str,
    batch_id: str,
    source: str = "csv_upload"
) -> pd.DataFrame:
    """Add import metadata columns."""
    df['import_source'] = source
    df['import_date'] = datetime.now()
    df['import_batch_id'] = batch_id
    df['import_filename'] = filename
    # Ensure original_data is a JSON string (default=str covers dates and other
    # non-serializable values)
    if 'original_data' in df.columns:
        df['original_data'] = df['original_data'].apply(
            lambda record: json.dumps(record, default=str)
        )
    return df
6. Add Auto-Tags¶
Input: DataFrame with metadata
Output: DataFrame with tags column
Operations:
- Generate date-based tag (import-YYYY-MM-DD)
- Generate source-based tag (source:csv)
- Generate batch-based tag (batch:imp_abc123)
- Add custom tags from user options
def add_auto_tags(
    df: pd.DataFrame,
    batch_id: str,
    source: str = "csv_upload",
    custom_tags: list[str] | None = None
) -> pd.DataFrame:
"""Add auto-generated tags."""
today = datetime.now().date()
# Generate auto tags
auto_tags = [
f"import-{today}",
f"source:{source}",
f"batch:{batch_id}"
]
# Add custom tags
if custom_tags:
auto_tags.extend(custom_tags)
# Add tags column (list of strings)
df['tags'] = [auto_tags] * len(df)
return df
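For example (the date tag reflects the day the import actually runs):

tagged = add_auto_tags(df, batch_id="imp_abc123", custom_tags=["volunteers"])
# tagged['tags'] -> each row gets
# ['import-2025-12-04', 'source:csv_upload', 'batch:imp_abc123', 'volunteers']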
7. Validate Schema¶
Input: Final DataFrame
Output: Validation report
Operations:
- Check required fields (first_name, last_name)
- Validate data types
- Check constraints (whip_status values)
- Generate warnings for missing optional fields
class SchemaValidator:
"""Validate data against cbapp Person schema."""
REQUIRED_FIELDS = ['first_name', 'last_name']
WHIP_STATUS_VALUES = [
'Hard Support', 'Soft Support', 'Undecided',
'Soft Oppose', 'Hard Opposed', 'Unknown'
]
def validate(self, df: pd.DataFrame) -> dict:
"""Validate DataFrame against Person schema."""
errors = []
warnings = []
# Check required fields
for field in self.REQUIRED_FIELDS:
if field not in df.columns:
errors.append(f"Missing required field: {field}")
else:
null_count = df[field].isna().sum()
if null_count > 0:
errors.append(f"{null_count} rows missing {field}")
# Validate whip_status if present
        if 'whip_status' in df.columns:
            # Only flag non-null values outside the allowed set
            invalid = df['whip_status'].notna() & ~df['whip_status'].isin(self.WHIP_STATUS_VALUES)
invalid_count = invalid.sum()
if invalid_count > 0:
warnings.append(
f"{invalid_count} rows have invalid whip_status"
)
# Check contact info
contact_fields = ['email', 'cell_phone', 'home_phone']
for idx, row in df.iterrows():
has_contact = any(pd.notna(row.get(f)) for f in contact_fields)
if not has_contact:
warnings.append(f"Row {idx}: No contact info")
return {
'valid': len(errors) == 0,
'error_count': len(errors),
'warning_count': len(warnings),
'errors': errors,
'warnings': warnings
}
Complete Pipeline Class¶
import uuid
from pathlib import Path


class DataConditioner:
    """Full data conditioning pipeline."""

    def __init__(self):
        self.cleaner = DataCleaner()
        self.deduplicator = Deduplicator()
        self.validator = SchemaValidator()
def condition(
self,
file_path: str,
mappings: list[FieldMapping],
options: dict
) -> dict:
"""
Run full conditioning pipeline.
Args:
file_path: Path to CSV file
mappings: AI-generated field mappings
options: Import options (duplicate_strategy, tags, etc.)
Returns:
dict with conditioned DataFrame and reports
"""
        # 1. Load CSV (module-level helper defined above)
        df = load_csv(file_path)
        # 2. Apply mappings
        df = apply_mappings(df, mappings)
# 3. Clean data
df = self.cleaner.clean_all(df)
# 4. Deduplicate
strategy = options.get('duplicate_strategy', 'email')
if strategy == 'email':
df, dup_report = self.deduplicator.deduplicate_by_email(df)
elif strategy == 'name_address':
df, dup_report = self.deduplicator.deduplicate_by_name_address(df)
else:
dup_report = []
# 5. Add metadata
batch_id = options.get('batch_id', f"imp_{uuid.uuid4().hex[:12]}")
df = add_metadata(df, Path(file_path).name, batch_id)
# 6. Add tags
custom_tags = options.get('tags', [])
df = add_auto_tags(df, batch_id, custom_tags=custom_tags)
# 7. Validate
validation = self.validator.validate(df)
return {
'data': df,
'total_rows': len(df),
'duplicate_report': dup_report,
'duplicates_removed': len(dup_report),
'validation': validation,
'batch_id': batch_id
}
    def to_json_records(self, df: pd.DataFrame) -> list[dict]:
        """Convert DataFrame to JSON records for API."""
        # Cast to object first so NaN is reliably replaced with None
        df = df.astype(object).where(pd.notna(df), None)
        return df.to_dict('records')
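Putting it together, a typical call might look like this (the path and options are illustrative; mappings come from the ListLoader agent):

conditioner = DataConditioner()
result = conditioner.condition(
    "/tmp/uploads/contacts.csv",   # hypothetical upload path
    mappings=mappings,             # FieldMapping list from the ListLoader agent
    options={"duplicate_strategy": "email", "tags": ["volunteers"]},
)
print(result["total_rows"], result["duplicates_removed"], result["validation"]["valid"])
records = conditioner.to_json_records(result["data"])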
Integration with List Loader¶
Location: cbtenant/api/routes/list_loader.py
@router.post("/{job_id}/condition")
async def condition_data(
tenant_id: str,
job_id: str,
options: ConditionOptions,
current_user: dict = Depends(get_current_user),
):
"""
Run data conditioning pipeline.
This step happens AFTER AI mapping but BEFORE import.
"""
with get_db_context() as session:
job = get_job_or_404(session, job_id, tenant_id)
# Get mappings
mappings = json.loads(job.mappings)
# Run conditioning
conditioner = DataConditioner()
result = conditioner.condition(
job.file_path,
mappings=[FieldMapping(**m) for m in mappings],
options=options.dict()
)
# Store conditioned data (as CSV or JSON)
conditioned_path = f"/tmp/conditioned_{job_id}.csv"
result['data'].to_csv(conditioned_path, index=False)
# Update job
job.conditioned_path = conditioned_path
job.total_rows = result['total_rows']
job.duplicates_removed = result['duplicates_removed']
job.validation_result = json.dumps(result['validation'])
job.status = 'conditioned'
session.commit()
return {
'job_id': job_id,
'status': 'conditioned',
'total_rows': result['total_rows'],
'duplicates_removed': result['duplicates_removed'],
'validation': result['validation'],
'preview': result['data'].head(5).to_dict('records')
}
Updated Import Flow¶
1. Upload CSV
↓
2. AI Mapping (ListLoaderAgent)
↓
3. User Adjusts Mappings (optional)
↓
4. Condition Data (NEW) ← DataConditioner
- Clean, dedupe, validate
- Show preview of changes
↓
5. User Reviews Conditioning Report
- X duplicates removed
- Y rows cleaned
- Z warnings
↓
6. Execute Import
- Send conditioned data to cbapp
- POST /persons/batch
↓
7. Show Results
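The import step itself is covered elsewhere, but as a rough sketch, the Import Service could batch the conditioned records to cbapp roughly like this (httpx client, batch size, and payload shape are assumptions, not the cbapp contract):

import httpx


def send_batches(records: list[dict], base_url: str, batch_size: int = 500) -> None:
    """Send conditioned records to cbapp in fixed-size batches (illustrative sketch)."""
    with httpx.Client(base_url=base_url, timeout=30.0) as client:
        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]
            # Payload shape is an assumption; align with the real /persons/batch contract
            response = client.post("/persons/batch", json={"persons": batch})
            response.raise_for_status()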
Progress Bar Support¶
Since conditioning uses pandas/numpy (fast), we can show real-time progress:
def condition_with_progress(
self,
file_path: str,
mappings: list[FieldMapping],
options: dict,
progress_callback: callable = None
) -> dict:
"""Condition data with progress updates."""
if progress_callback:
progress_callback(0, "Loading CSV...")
df = self.load_csv(file_path)
if progress_callback:
progress_callback(20, "Applying mappings...")
df = self.apply_mappings(df, mappings)
if progress_callback:
progress_callback(40, "Cleaning data...")
df = self.cleaner.clean_all(df)
if progress_callback:
progress_callback(60, "Removing duplicates...")
df, dup_report = self.deduplicator.deduplicate_by_email(df)
if progress_callback:
progress_callback(80, "Adding metadata...")
df = add_metadata(df, Path(file_path).name, options['batch_id'])
df = add_auto_tags(df, options['batch_id'])
    validation = self.validator.validate(df)
    if progress_callback:
        progress_callback(100, "Validation complete!")
return {
'data': df,
'total_rows': len(df),
'duplicate_report': dup_report,
'validation': validation
}
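Any callable accepting (percent, message) works as the callback; for example, a simple console reporter (purely illustrative):

def print_progress(percent: int, message: str) -> None:
    """Simple console progress reporter."""
    print(f"[{percent:3d}%] {message}")


conditioner = DataConditioner()
result = conditioner.condition_with_progress(
    "/tmp/uploads/contacts.csv",   # hypothetical path
    mappings=mappings,
    options={"batch_id": "imp_abc123"},
    progress_callback=print_progress,
)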
Configuration File¶
Location: cbtenant/config/data_conditioning.yaml
# Data conditioning configuration
cleaning:
phone:
normalize: true
validate_length: 10
country_code: "1" # US
email:
lowercase: true
validate: true
remove_invalid: true
names:
title_case: true
remove_extra_whitespace: true
addresses:
abbreviate_street_types: true
standardize_apt_format: true
deduplication:
default_strategy: "email" # email, phone, name_address, name_zip
merge_fields: false # Merge data from duplicates
prefer_most_complete: true # Keep record with most data
metadata:
include_original_data: true
original_data_format: "json"
tagging:
auto_tag: true
tag_patterns:
- "import-{date}"
- "source:{source}"
- "batch:{batch_id}"
validation:
strict_mode: false # Fail on warnings
required_fields:
- first_name
- last_name
require_contact_info: false # email, phone, or address
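Nothing in the pipeline reads this file yet; a small loader might look like this (PyYAML assumed, path as above):

import yaml
from pathlib import Path

CONFIG_PATH = Path("cbtenant/config/data_conditioning.yaml")


def load_conditioning_config() -> dict:
    """Load the conditioning config, falling back to an empty dict if missing."""
    if not CONFIG_PATH.exists():
        return {}
    with CONFIG_PATH.open() as f:
        return yaml.safe_load(f) or {}


config = load_conditioning_config()
default_strategy = config.get("deduplication", {}).get("default_strategy", "email")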
Testing Plan¶
Unit Tests¶
- Test each cleaning function independently (see the pytest sketch after this list)
- Test deduplication strategies
- Test metadata addition
- Test validation logic
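For example, a minimal pytest for the phone cleaner (the module path follows the planned cbtenant/agents/data_conditioner.py; values are illustrative):

import pandas as pd

from cbtenant.agents.data_conditioner import DataCleaner  # assumed module path


def test_clean_phones_normalizes_and_drops_invalid():
    df = pd.DataFrame({'cell_phone': ['(555) 123-4567', '12345', None]})
    cleaned = DataCleaner().clean_phones(df)
    assert cleaned['cell_phone'].tolist()[0] == '5551234567'
    assert pd.isna(cleaned['cell_phone'].iloc[1])   # too short -> null
    assert pd.isna(cleaned['cell_phone'].iloc[2])   # missing stays null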
Integration Tests¶
- Full pipeline with sample CSV
- Various duplicate scenarios
- Malformed data handling
- Large file performance (10k rows)
Performance Benchmarks¶
- 1k rows: < 1 second
- 10k rows: < 10 seconds
- 100k rows: < 60 seconds
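These numbers are targets, not measurements; once the pipeline exists, a quick check could look like this (the fixture path and mappings are placeholders):

import time

conditioner = DataConditioner()
start = time.perf_counter()
result = conditioner.condition(
    "tests/fixtures/sample_10k.csv",   # hypothetical fixture
    mappings,
    {"duplicate_strategy": "email"},
)
elapsed = time.perf_counter() - start
print(f"Conditioned {result['total_rows']} rows in {elapsed:.2f}s")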
Benefits Summary¶
✅ Data Quality: Clean data at the source, not in the database
✅ Performance: Fast numpy operations, progress bars work
✅ Extensibility: Easy to add new cleaning rules
✅ Testing: Pure functions, easy to test
✅ User Experience: Preview changes before import
✅ Database Health: Only clean data reaches cbapp
✅ Future-Proof: Foundation for AI enrichment
Next Steps¶
- Create cbtenant/agents/data_conditioner.py
- Add conditioning route to List Loader API
- Update frontend to show conditioning step
- Create configuration file
- Write tests
- Update cbapp to accept metadata columns (Issue #2)
- Deploy and test with real data
Questions¶
- Should we store the conditioned CSV or send directly from memory?
  Recommendation: Store temporarily for retry/debug.
- How should we handle partial failures in conditioning?
  Recommendation: Continue with valid rows, report invalid ones.
- Should users be able to disable specific cleaning steps?
  Recommendation: Yes, via the options parameter.
- What is the maximum file size for conditioning?
  Recommendation: 100MB or 100k rows, then switch to streaming.
Ready to implement!