This guide provides practical guidance for common PyIceberg use cases and implementation patterns.
Migrating CSV files to Iceberg involves reading the CSV data, converting it to match the target table's schema, and writing it into an Iceberg table. This is one of the most common migration scenarios.
Key Steps:
- Read CSV files using PyArrow
- Convert data types appropriately
- Create Iceberg table with proper schema
- Write data to Iceberg table
- Validate migration success
Best Practices:
- Use PyArrow for efficient CSV reading
- Handle missing values explicitly (see the sketch after this list)
- Validate data ranges and types
- Consider partitioning for large datasets
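As a rough sketch of the type- and null-handling points above, PyArrow's `ConvertOptions` can pin column types and declare which strings count as null at read time. The column names (`id`, `amount`), null markers, and range check below are hypothetical placeholders:

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.csv as csv_pa

# Pin column types instead of relying on inference, and treat common
# placeholder strings as nulls ("id" and "amount" are hypothetical columns)
convert_options = csv_pa.ConvertOptions(
    column_types={"id": pa.int64(), "amount": pa.float64()},
    null_values=["", "NA", "null"],
    strings_can_be_null=True,
)
csv_data = csv_pa.read_csv("data.csv", convert_options=convert_options)

# Quick range check before writing (assumes at least one non-null amount)
assert pc.min(csv_data["amount"]).as_py() >= 0, "unexpected negative amounts"
```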
Iceberg's time travel feature allows you to query historical data and manage table versions through snapshots.
Key Concepts:
- Snapshots: Each commit creates a snapshot with unique ID and timestamp
- Historical Queries: Query data as it existed at specific times
- Rollback: Revert tables to previous states when needed
- Audit Trail: Complete history of all table changes
Common Patterns:
- Query data as of a specific snapshot ID
- Query data as of a specific timestamp (see the sketch after this list)
- List table history to track changes
- Rollback to known good states
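As a sketch of the timestamp pattern (assuming `table` is an Iceberg table loaded from your catalog): the snapshot log entries returned by `table.history()` carry `snapshot_id` and `timestamp_ms`, so you can pick the last snapshot at or before a target time and scan it. The cutoff date is an arbitrary example value:

```python
from datetime import datetime, timezone

# Target point in time (arbitrary example value)
as_of_ms = int(datetime(2024, 1, 1, tzinfo=timezone.utc).timestamp() * 1000)

# Find the most recent snapshot at or before the target timestamp
candidates = [s for s in table.history() if s.timestamp_ms <= as_of_ms]
if not candidates:
    raise ValueError("No snapshot exists at or before the requested time")
snapshot_id = max(candidates, key=lambda s: s.timestamp_ms).snapshot_id

# Scan the table as it existed at that snapshot
historical = table.scan(snapshot_id=snapshot_id).to_arrow()
```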
Implementing data quality checks during and after migration ensures data integrity.
Validation Steps:
- Row count validation (see the sketch after this list)
- Data sampling and comparison
- Query validation with representative tests
- Performance comparison
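A minimal sketch of the first two validation steps, assuming `csv_data` is the source PyArrow table, `table` is the migrated Iceberg table, and `id` is a hypothetical stable sort key:

```python
# Row count validation: the Iceberg table should contain every source row
target = table.scan().to_arrow()
assert csv_data.num_rows == target.num_rows, (
    f"row count mismatch: {csv_data.num_rows} != {target.num_rows}"
)

# Data sampling: compare a small slice of both sides on a stable sort key
source_sample = csv_data.sort_by("id").slice(0, 100)
target_sample = target.select(csv_data.column_names).sort_by("id").slice(0, 100)
assert source_sample.equals(target_sample), "sampled rows differ between source and target"
```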
Common Issues:
- Schema mismatches between source and target
- Missing or null values
- Duplicate records
- Data type conversion errors
Example: migrating a CSV file to an Iceberg table:

```python
import pyarrow.csv as csv_pa
from pyiceberg.catalog import load_catalog

# Read CSV
csv_data = csv_pa.read_csv("data.csv")

# Create Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("my_table", schema=csv_data.schema)

# Migrate data
table.append(csv_data)
```
Example: querying historical data and viewing the table history:

```python
# Query historical data
historical_data = table.scan(snapshot_id=old_snapshot_id).to_arrow()

# View table history
for snapshot in table.history():
    print(f"Snapshot: {snapshot.snapshot_id}, Time: {snapshot.timestamp_ms}")
```
Example: adding a new column to an existing table:

```python
from pyiceberg.types import StringType

# Add new column to existing table (field IDs are assigned automatically)
with table.update_schema() as update_schema:
    update_schema.add_column("new_column", StringType(), required=False)
```

Install PyIceberg with required dependencies:
```bash
pip install pyiceberg[pyarrow]
```

PyIceberg provides convenient Make commands:

```bash
# Basic PyIceberg examples (no external infrastructure)
make notebook

# Spark integration examples (requires Docker infrastructure)
make notebook-infra
```

Alternatively, run the notebooks directly with Jupyter:

```bash
# Install Jupyter
pip install jupyter

# Start Jupyter Lab
jupyter lab notebooks/
```

Best practices:

- Use appropriate file sizes: Target 128MB-1GB for Iceberg data files
- Leverage partitioning: Design partition strategies based on query patterns
- Use column pruning: Only select needed columns
- Filter early: Apply filters as early as possible in your queries (see the sketch below)
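To illustrate column pruning and early filtering, `table.scan()` accepts `selected_fields` and a `row_filter`, so only the needed columns and rows are materialized. The column names and threshold here are hypothetical:

```python
from pyiceberg.expressions import GreaterThanOrEqual

# Only read the needed columns and push the filter into the scan
result = table.scan(
    row_filter=GreaterThanOrEqual("amount", 100.0),
    selected_fields=("user_id", "amount"),
).to_arrow()
```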
- Validate schemas: Ensure data types match expectations
- Handle nulls: Decide on null handling strategies (see the sketch after this list)
- Test migrations: Validate data integrity after migration
- Monitor quality: Set up data quality checks
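For the null-handling point, one approach is to apply an explicit fill strategy per column before appending. A sketch using PyArrow compute, with `amount` as a hypothetical column and `0.0` as an assumed business default:

```python
import pyarrow.compute as pc

# Decide on an explicit null strategy: here, missing amounts become 0.0
filled = csv_data.set_column(
    csv_data.schema.get_field_index("amount"),
    "amount",
    pc.fill_null(csv_data["amount"], 0.0),
)
table.append(filled)
```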
- Error handling: Implement comprehensive error handling (see the sketch after this list)
- Logging: Use appropriate logging levels for troubleshooting
- Testing: Test examples in non-production environments first
- Documentation: Document your customizations and patterns
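A minimal error-handling and logging sketch around an append, reusing `table` and `csv_data` from the migration example:

```python
import logging

logger = logging.getLogger("migration")
logging.basicConfig(level=logging.INFO)

try:
    table.append(csv_data)
    logger.info("Appended %d rows", csv_data.num_rows)
except Exception:
    # Iceberg commits are atomic, so a failed append leaves no partial data behind
    logger.exception("Migration append failed")
    raise
```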
Troubleshooting tips:

```bash
# Ensure all dependencies are installed
pip install pyiceberg[pyarrow,s3fs]
```

- Check catalog credentials in .pyiceberg.yaml
- Verify file system permissions for warehouse location
- Process data in batches for large files
- Use DuckDB for out-of-core processing

Getting help:

- Documentation: Check the API documentation
- Community: Join the Apache Iceberg community
- Issues: Report bugs on GitHub Issues
We welcome contributions of additional practical examples! When contributing:
- Follow the pattern: Use existing code examples as templates
- Include error handling: Add appropriate error handling
- Add documentation: Explain the use case and when to use it
- Test thoroughly: Ensure examples work correctly
- Document dependencies: List all required packages
See the contributing guide for more details.