Freshness & Schema Tracking¶
Monitor data staleness and track schema evolution over time.
Freshness Quick Start¶
from duckguard import connect
from datetime import timedelta
data = connect("orders.csv")
# Simple freshness check
print(data.freshness.age_human) # "2 hours ago"
print(data.freshness.is_fresh) # True
# Custom threshold
if not data.is_fresh(timedelta(hours=6)):
    print("Data is stale!")
Freshness Methods¶
DuckGuard checks freshness via file modification time or timestamp columns:
File Modification Time¶
from datetime import timedelta
from duckguard.freshness import FreshnessMonitor
monitor = FreshnessMonitor(threshold=timedelta(hours=6))
result = monitor.check_file_mtime("data.csv")
print(result.last_modified) # datetime
print(result.age_human) # "3 hours ago"
print(result.is_fresh) # True/False
print(result.method) # FreshnessMethod.FILE_MTIME
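To make the idea behind a modification-time check concrete, here is a minimal standard-library sketch. It is not DuckGuard's implementation; the helper names `file_age` and `is_file_fresh` are hypothetical, and the rule that a file is fresh when its age is at most the threshold is an assumption.

```python
import os
import tempfile
from datetime import datetime, timedelta, timezone


def file_age(path: str) -> timedelta:
    """Return how long ago the file at `path` was last modified."""
    modified = datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc)
    return datetime.now(timezone.utc) - modified


def is_file_fresh(path: str, threshold: timedelta) -> bool:
    """Assumed rule: a file is fresh when its age does not exceed the threshold."""
    return file_age(path) <= threshold


# Demo: create a temporary file, check it, then backdate its mtime by 7 hours.
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
    tmp.write(b"id,amount\n1,9.99\n")
    demo_path = tmp.name

fresh_now = is_file_fresh(demo_path, timedelta(hours=6))

seven_hours_ago = datetime.now(timezone.utc).timestamp() - 7 * 3600
os.utime(demo_path, (seven_hours_ago, seven_hours_ago))
fresh_after_backdating = is_file_fresh(demo_path, timedelta(hours=6))

os.remove(demo_path)
print(fresh_now, fresh_after_backdating)
```

The backdating step shows why mtime is only a proxy: any process that touches the file resets its apparent age, even if the rows inside have not changed.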
Column Timestamp¶
result = monitor.check_column_timestamp(data, "updated_at")
# Uses MAX(updated_at) to determine freshness
# Use MIN instead (oldest record)
result = monitor.check_column_timestamp(data, "created_at", use_max=False)
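The difference between the MAX and MIN variants can be illustrated without DuckGuard. The sketch below uses the standard-library `sqlite3` module as a stand-in engine; the `orders` table, its rows, the fixed `now` value, and the helper `column_timestamp` are all made up for illustration.

```python
import sqlite3
from datetime import datetime, timedelta
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, created_at TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [
        (1, "2024-01-01T08:00:00", "2024-01-02T10:00:00"),
        (2, "2024-01-02T09:00:00", "2024-01-03T09:30:00"),
        (3, "2024-01-03T07:00:00", "2024-01-03T07:15:00"),
    ],
)


def column_timestamp(conn, table: str, column: str, use_max: bool = True) -> Optional[datetime]:
    """Return the newest (MAX) or oldest (MIN) timestamp in a column."""
    aggregate = "MAX" if use_max else "MIN"
    # Identifiers are interpolated into the SQL, so pass trusted names only.
    row = conn.execute(f'SELECT {aggregate}("{column}") FROM "{table}"').fetchone()
    return datetime.fromisoformat(row[0]) if row[0] is not None else None


now = datetime(2024, 1, 3, 12, 0, 0)  # fixed clock so the demo is reproducible
threshold = timedelta(hours=6)

newest = column_timestamp(conn, "orders", "updated_at")
oldest = column_timestamp(conn, "orders", "created_at", use_max=False)

print(now - newest <= threshold)  # newest update is 2.5 hours old
print(now - oldest <= threshold)  # oldest record is more than two days old
```

MAX answers "when did anything last change?", while MIN answers "how old is the oldest record still present?", which is the stricter question for retention-style checks.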
Auto-Detection¶
monitor.check() picks the best method automatically:
- Local file → uses file mtime
- Dataset with timestamp column → auto-detects columns like updated_at, created_at, timestamp
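Name-based detection of a timestamp column could look roughly like the sketch below. The candidate list comes from the names mentioned above, but the priority order, the case-insensitive matching, and the function `detect_timestamp_column` are assumptions rather than DuckGuard's actual rules.

```python
from typing import Optional, Sequence

# Assumed priority order: prefer the column most likely to reflect recent writes.
CANDIDATE_NAMES = ("updated_at", "created_at", "timestamp")


def detect_timestamp_column(columns: Sequence[str]) -> Optional[str]:
    """Return the first column whose name matches a known candidate, ignoring case."""
    by_lowercase = {name.lower(): name for name in columns}
    for candidate in CANDIDATE_NAMES:
        if candidate in by_lowercase:
            return by_lowercase[candidate]  # keep the original spelling
    return None


print(detect_timestamp_column(["id", "created_at", "updated_at"]))
print(detect_timestamp_column(["id", "Created_At", "amount"]))
print(detect_timestamp_column(["id", "amount"]))
```

When no candidate matches, a caller would need to name the column explicitly, as in the check_column_timestamp calls shown earlier.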
FreshnessResult¶
result = monitor.check(data)
result.source # Data source path
result.last_modified # datetime or None
result.age_seconds # Float or None
result.age_human # "2 hours ago"
result.is_fresh # True/False
result.threshold_seconds # Configured threshold
result.method # FILE_MTIME, COLUMN_MAX, etc.
result.to_dict() # JSON-serializable dict
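The relationship between age_seconds and age_human can be pictured with a small formatter. This is only a sketch: the function `humanize_age`, its unit boundaries, and the "unknown" and "just now" strings are assumptions, and DuckGuard's own wording may differ.

```python
from typing import Optional


def humanize_age(age_seconds: Optional[float]) -> str:
    """Turn an age in seconds into a coarse phrase such as '2 hours ago'."""
    if age_seconds is None:
        return "unknown"
    seconds = int(age_seconds)
    # Use the largest unit that fits at least once, rounding down.
    for unit, size in (("day", 86400), ("hour", 3600), ("minute", 60)):
        if seconds >= size:
            count = seconds // size
            plural = "" if count == 1 else "s"
            return f"{count} {unit}{plural} ago"
    return "just now"


print(humanize_age(7200))
print(humanize_age(90000))
print(humanize_age(None))
```

Because the phrase rounds down, comparisons against a threshold should use the numeric age, not the human-readable string.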
CLI¶
duckguard freshness data.csv
duckguard freshness data.csv --max-age 6h
duckguard freshness data.csv --column updated_at
duckguard freshness data.csv --format json
Schema Tracking¶
Capture schema snapshots over time and detect changes.
Capture a Snapshot¶
from duckguard import connect
from duckguard.schema_history import SchemaTracker
tracker = SchemaTracker()
data = connect("data.csv")
snapshot = tracker.capture(data)
print(f"Captured {snapshot.column_count} columns, {snapshot.row_count} rows")
View History¶
history = tracker.get_history(data.source, limit=10)
for snap in history:
    print(f"{snap.captured_at}: {snap.column_count} columns")
Get Latest Snapshot¶
latest = tracker.get_latest(data.source)
if latest:
    for col in latest.columns:
        print(f"{col.name}: {col.dtype} (nullable: {col.nullable})")
Detect Changes¶
from duckguard.schema_history import SchemaChangeAnalyzer
analyzer = SchemaChangeAnalyzer()
# Compare current schema against the last snapshot
# (uses tracker internally)
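Conceptually, change detection compares the column set and types of two snapshots. The sketch below shows that comparison on plain dictionaries. It is a generic illustration, not the SchemaChangeAnalyzer API; `diff_schemas` and the sample schemas are hypothetical.

```python
from typing import Dict, List

Schema = Dict[str, str]  # column name -> data type


def diff_schemas(previous: Schema, current: Schema) -> Dict[str, List]:
    """Report columns that were added, removed, or changed type between two schemas."""
    added = sorted(set(current) - set(previous))
    removed = sorted(set(previous) - set(current))
    type_changed = sorted(
        (name, previous[name], current[name])
        for name in set(previous) & set(current)
        if previous[name] != current[name]
    )
    return {"added": added, "removed": removed, "type_changed": type_changed}


previous = {"id": "INTEGER", "amount": "DOUBLE", "status": "VARCHAR"}
current = {"id": "BIGINT", "amount": "DOUBLE", "updated_at": "TIMESTAMP"}

changes = diff_schemas(previous, current)
print(changes)
```

Removed columns and type changes are usually the ones that break downstream consumers, while added columns tend to be harmless, so a report often treats the two groups differently.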
CLI¶
duckguard schema data.csv # Show current schema
duckguard schema data.csv --action capture # Capture snapshot
duckguard schema data.csv --action history # View history
duckguard schema data.csv --action changes # Detect changes
Schema Snapshot Structure¶
Each snapshot captures:
- Column name and data type
- Nullable flag
- Column position (ordering)
- Row count at capture time
- Timestamp of capture
Snapshots are stored in a local SQLite database and compared for drift detection.
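To show how the fields above could be stored in SQLite, here is a minimal sketch with the standard-library `sqlite3` module. The table name `schema_snapshots`, its columns, the JSON encoding of the column list, and both helper functions are assumptions for illustration; DuckGuard's actual storage layout is not documented here.

```python
import json
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE schema_snapshots (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source TEXT NOT NULL,
        captured_at TEXT NOT NULL,
        row_count INTEGER NOT NULL,
        columns_json TEXT NOT NULL
    )
    """
)


def save_snapshot(conn, source, columns, row_count):
    """Store one snapshot; `columns` is a list of dicts describing each column."""
    captured_at = datetime.now(timezone.utc).isoformat()
    cursor = conn.execute(
        "INSERT INTO schema_snapshots (source, captured_at, row_count, columns_json) "
        "VALUES (?, ?, ?, ?)",
        (source, captured_at, row_count, json.dumps(columns)),
    )
    conn.commit()
    return cursor.lastrowid


def latest_snapshot(conn, source):
    """Return the most recently stored snapshot for a source, or None."""
    row = conn.execute(
        "SELECT captured_at, row_count, columns_json FROM schema_snapshots "
        "WHERE source = ? ORDER BY id DESC LIMIT 1",
        (source,),
    ).fetchone()
    if row is None:
        return None
    captured_at, row_count, columns_json = row
    return {"captured_at": captured_at, "row_count": row_count, "columns": json.loads(columns_json)}


first = [{"name": "id", "dtype": "INTEGER", "nullable": False, "position": 0}]
second = first + [{"name": "amount", "dtype": "DOUBLE", "nullable": True, "position": 1}]

save_snapshot(conn, "data.csv", first, row_count=100)
save_snapshot(conn, "data.csv", second, row_count=120)

latest = latest_snapshot(conn, "data.csv")
print(latest["row_count"], [col["name"] for col in latest["columns"]])
```

Storing each column's position alongside its name and type lets a later comparison notice reordering as well as additions and removals.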