This guide helps you migrate data from various formats and systems to Apache Iceberg using PyIceberg.
Migrating to Iceberg provides numerous benefits:
- Performance: Columnar Parquet format with predicate pushdown
- Reliability: ACID transactions with snapshot isolation
- Flexibility: Schema evolution without breaking queries
- Time Travel: Query the table as it was at any retained snapshot
- Compatibility: Works with multiple compute engines
CSV is one of the most common formats to migrate from. The CSV migration process involves reading CSV files, converting them to Iceberg's schema, and writing the data to Iceberg tables.
import pyarrow.csv as csv_pa
from pyiceberg.catalog import load_catalog

# Read the CSV file into an Arrow table (column types are inferred)
csv_data = csv_pa.read_csv("data.csv")

# Create the Iceberg table from the Arrow schema.
# Identifiers need a namespace; "default" is assumed to exist in the catalog.
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=csv_data.schema)

# Migrate data
table.append(csv_data)

- Schema Enhancement: Add computed columns during migration
- Type Conversion: Ensure proper data types
- Partitioning: Organize data by partition keys
- Data Validation: Clean and validate data
Best Practices:
- Use PyArrow for efficient CSV reading
- Handle missing values explicitly
- Validate data ranges and types
- Consider partitioning for large datasets
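Two of the practices above, handling missing values explicitly and validating types, can be checked before any data is written. The following is a minimal sketch of such a pre-flight pass using only the standard library `csv` module. The function name `profile_csv`, the `parsers` argument, and the set of strings treated as missing are illustrative assumptions, not part of PyArrow or PyIceberg.

```python
import csv
import io
from typing import Callable, Dict, Iterable, TextIO

# Strings treated as missing values (an assumption; adjust to your data).
MISSING = {"", "NULL", "null", "NA", "N/A"}


def profile_csv(
    handle: TextIO,
    parsers: Dict[str, Callable[[str], object]],
    missing: Iterable[str] = MISSING,
) -> Dict[str, Dict[str, int]]:
    """Count missing and unparseable values per column.

    `parsers` maps a column name to a callable (e.g. int, float) that
    raises ValueError when a value does not have the expected type.
    """
    missing_set = set(missing)
    report: Dict[str, Dict[str, int]] = {}
    for row in csv.DictReader(handle):
        for column, raw in row.items():
            if column is None:
                continue  # extra fields beyond the header row
            stats = report.setdefault(column, {"missing": 0, "invalid": 0})
            if raw is None or raw.strip() in missing_set:
                stats["missing"] += 1
                continue
            parser = parsers.get(column)
            if parser is None:
                continue  # no type expectation for this column
            try:
                parser(raw)
            except ValueError:
                stats["invalid"] += 1
    return report


sample = io.StringIO("id,price\n1,9.5\n2,\nx,3.0\n")
report = profile_csv(sample, {"id": int, "price": float})
print(report)
```

If the report shows missing or invalid values, one option is to declare them at read time, for example through the `null_values` and `column_types` settings of `pyarrow.csv.ConvertOptions`, rather than relying on type inference.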
Parquet to Iceberg migration is straightforward since Iceberg uses Parquet as its default file format.
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog

# Read the Parquet file into an Arrow table
parquet_data = pq.read_table("data.parquet")

# Create Iceberg table ("default" namespace is assumed to exist)
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=parquet_data.schema)

# Migrate data
table.append(parquet_data)

- No conversion needed: Iceberg uses Parquet natively
- Schema preservation: Maintains existing schema
- Performance: Leverages existing columnar format
- Metadata: Preserves existing metadata
JSON data requires schema inference and conversion to Iceberg's schema.
import pyarrow.json as pj
from pyiceberg.catalog import load_catalog

# Read newline-delimited JSON (one object per line); the schema is inferred
json_data = pj.read_json("data.json")

# Create Iceberg table ("default" namespace is assumed to exist)
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=json_data.schema)

# Migrate data
table.append(json_data)

- Schema inference: JSON may have inconsistent schemas
- Nested structures: Handle nested JSON objects
- Data types: Ensure proper type conversion
- Performance: JSON is slower than Parquet
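Inconsistent keys and nested objects are easier to handle if records are normalized before they reach Arrow. The following is a minimal sketch using only the standard library. `flatten` and `normalize_records` are hypothetical helper names, and joining nested keys with a dot is just one possible naming convention.

```python
import json
from typing import Any, Dict, List


def flatten(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested objects into dotted column names; lists are kept as-is."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def normalize_records(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse newline-delimited JSON and give every record the same keys."""
    records = [flatten(json.loads(line)) for line in lines if line.strip()]
    # Union of keys in first-seen order, so the column order is stable.
    columns: Dict[str, None] = {}
    for record in records:
        for key in record:
            columns.setdefault(key, None)
    # Keys absent from a record become explicit None values.
    return [{key: record.get(key) for key in columns} for record in records]


lines = [
    '{"id": 1, "user": {"name": "a", "geo": {"country": "NL"}}}',
    '{"id": 2, "tags": ["x"]}',
]
rows = normalize_records(lines)
print(rows)
```

The resulting list of dictionaries has a uniform set of keys, so it can be handed to `pyarrow.Table.from_pylist` to obtain an Arrow table for `table.append`.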
Migrate existing Hive tables to Iceberg while maintaining compatibility.
from pyiceberg.catalog import load_catalog

# Load Hive catalog
catalog = load_catalog("hive", uri="thrift://hive-metastore:9083")

# Register a table whose Iceberg metadata file already exists.
# register_table does not convert a plain Hive table; its data must first be
# rewritten or converted to Iceberg by an engine.
catalog.register_table(
    identifier="database.table_name",
    metadata_location="s3://warehouse/path/to/metadata.json",
)

- Schema compatibility: Ensure Hive schema maps to Iceberg types
- Partitioning: Preserve or optimize partition strategy
- Data location: Keep data in existing location or migrate
- Query compatibility: Test existing queries against Iceberg table
Migrate Delta Lake tables to Iceberg for multi-engine compatibility.
from deltalake import DeltaTable  # delta-rs Python bindings
from pyiceberg.catalog import load_catalog

# Read the current version of the Delta Lake table into an Arrow table
delta_data = DeltaTable("delta_table_path").to_pyarrow_table()

# Create Iceberg table ("default" namespace is assumed to exist)
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=delta_data.schema)

# Migrate data
table.append(delta_data)

- Schema evolution: Handle Delta Lake schema changes
- Time travel: This copy carries over only the current Delta version; Iceberg time travel starts from the first Iceberg snapshot
- Performance: Compare performance after migration
- ACID properties: Both systems support ACID, but implementation differs
Migrate data from traditional databases to Iceberg.
import pyarrow as pa
from pyiceberg.catalog import load_catalog
import some_database_connector  # placeholder for any DB-API 2.0 driver

# Connect to database
conn = some_database_connector.connect("database_url")

# Read data
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_name")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Convert row tuples to column lists; Arrow infers each column's type.
# Pass an explicit schema to from_pydict if inference is not enough.
table_data = pa.Table.from_pydict(
    {name: [row[i] for row in rows] for i, name in enumerate(columns)}
)

# Create Iceberg table ("default" namespace is assumed to exist)
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=table_data.schema)

# Migrate data
table.append(table_data)

- Data types: Map database types to Iceberg types
- Primary keys: Iceberg does not enforce uniqueness, so deduplicate before or during migration
- Foreign keys: Iceberg doesn't enforce foreign keys
- Indexes: Plan for query performance without traditional indexes
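Mapping database types to Iceberg types is usually a lookup plus a few parameterized cases. The table below is an illustrative sketch for common SQL type names, not an official or exhaustive mapping. `iceberg_type_for` is a hypothetical helper, and real databases will need vendor-specific entries.

```python
import re

# Illustrative mapping from common SQL type names to Iceberg primitive types.
SQL_TO_ICEBERG = {
    "BOOLEAN": "boolean",
    "SMALLINT": "int",
    "INTEGER": "int",
    "INT": "int",
    "BIGINT": "long",
    "REAL": "float",
    "DOUBLE PRECISION": "double",
    "DATE": "date",
    "TIME": "time",
    "TIMESTAMP": "timestamp",
    "TIMESTAMP WITH TIME ZONE": "timestamptz",
    "CHAR": "string",
    "VARCHAR": "string",
    "TEXT": "string",
    "BLOB": "binary",
    "UUID": "uuid",
}

# Type name, optionally followed by "(precision)" or "(precision, scale)".
_PARAMS = re.compile(r"^\s*([A-Za-z ]+?)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?\s*$")


def iceberg_type_for(sql_type: str) -> str:
    """Return the Iceberg type name for a SQL type such as 'NUMERIC(10, 2)'."""
    match = _PARAMS.match(sql_type)
    if match is None:
        raise ValueError(f"Unrecognized SQL type: {sql_type!r}")
    base = " ".join(match.group(1).upper().split())
    precision, scale = match.group(2), match.group(3)
    if base in ("NUMERIC", "DECIMAL"):
        if precision is None:
            raise ValueError("Iceberg decimals need an explicit precision")
        return f"decimal({precision}, {scale or 0})"
    try:
        return SQL_TO_ICEBERG[base]
    except KeyError:
        raise ValueError(f"No mapping defined for SQL type: {sql_type!r}") from None


print(iceberg_type_for("numeric(10,2)"))
print(iceberg_type_for("VARCHAR(255)"))
print(iceberg_type_for("timestamp with time zone"))
```

Raising on unknown types, rather than falling back to `string`, is a deliberate choice in this sketch: it surfaces unmapped columns before any data is written.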
- Assess current data: Understand data volume, structure, and access patterns
- Define migration strategy: Choose appropriate migration approach
- Plan downtime: Schedule migration during low-usage periods
- Set up monitoring: Monitor migration progress and data quality
- Validate schemas: Ensure data types map correctly
- Handle nulls: Decide on null handling strategy
- Check constraints: Validate data constraints after migration
- Test queries: Verify query results match expectations
- Batch size: Process data in appropriate batch sizes
- Parallel processing: Use parallel processing for large datasets
- File size optimization: Target appropriate Iceberg file sizes
- Partitioning: Design partition strategy based on query patterns
- Row count validation: Ensure all rows migrated
- Data sampling: Compare sample data before and after
- Query validation: Test representative queries
- Performance validation: Compare query performance
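Row count validation and sample comparison can be sketched as below. The helper is duck-typed: it only assumes the Iceberg table exposes `scan().to_arrow()` and that the result has `num_rows` and `to_pylist()`, as PyIceberg tables and PyArrow tables do. `validate_migration` and the `FakeTable` stand-in used to make the example runnable are hypothetical.

```python
from typing import Any, Dict, List


def validate_migration(source_rows: List[Dict[str, Any]], iceberg_table: Any) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK.

    Reads the whole Iceberg table into memory, so this is only suitable
    for small tables or for a sample of the data.
    """
    migrated = iceberg_table.scan().to_arrow()
    problems: List[str] = []
    if migrated.num_rows != len(source_rows):
        problems.append(
            f"row count mismatch: source={len(source_rows)}, iceberg={migrated.num_rows}"
        )
    # Scan order is not guaranteed, so compare membership rather than position.
    migrated_rows = migrated.to_pylist()
    for row in source_rows:
        if row not in migrated_rows:
            problems.append(f"row missing after migration: {row}")
    return problems


class FakeArrowTable:
    """Stand-in for pyarrow.Table so the sketch runs without a catalog."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = rows
        self.num_rows = len(rows)

    def to_pylist(self) -> List[Dict[str, Any]]:
        return list(self._rows)


class FakeTable:
    """Stand-in for a PyIceberg table: scan().to_arrow() returns the rows."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self._rows = rows

    def scan(self) -> "FakeTable":
        return self

    def to_arrow(self) -> FakeArrowTable:
        return FakeArrowTable(self._rows)


source = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
ok = validate_migration(source, FakeTable(list(reversed(source))))
bad = validate_migration(source, FakeTable(source[:1]))
print(ok)
print(bad)
```

With a real table, the second argument would be the object returned by `catalog.create_table` or `catalog.load_table`.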
Problem: Source schema doesn't match Iceberg type system
Solution:
# Explicit type conversion (assumes original_data is a pyarrow.Table
# with exactly these columns, in this order)
import pyarrow as pa

converted_schema = pa.schema([
    pa.field("id", pa.int64()),       # Convert to int64
    pa.field("name", pa.string()),
    pa.field("value", pa.float64()),  # Convert to float64
])
converted_data = original_data.cast(converted_schema)

Problem: Dataset too large for memory
Solution:
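# Stream the source in batches instead of loading it all at once.
# Assumes a Parquet source and an existing Iceberg `table`.
import pyarrow as pa
import pyarrow.parquet as pq

batch_size = 100_000
for batch in pq.ParquetFile("data.parquet").iter_batches(batch_size=batch_size):
    table.append(pa.Table.from_batches([batch]))  # each append commits a snapshot

Problem: Incompatible data types between systems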
Solution:
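# Custom type conversion: parse numeric strings, leave everything else as-is
def convert_type(value):
    if isinstance(value, str):
        for parser in (int, float):
            try:
                return parser(value)
            except ValueError:
                continue  # not parseable by this parser; try the next one
    return value

Problem: Optimal partitioning unclear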
Solution:
- Analyze query patterns
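- Prefer columns that queries filter on; for high-cardinality columns, use a transform such as bucket or truncate to keep the partition count manageable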
- Consider date/time-based partitioning for time-series data
- Test different partitioning strategies
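One way to compare candidate partition columns is to count how many partitions each would produce on a sample of the data. The following is a minimal sketch, assuming sample rows are available as dictionaries. `partition_counts` is a hypothetical helper, and truncating a timestamp to its calendar date only approximates what Iceberg's `day` transform does.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, Optional


def partition_counts(
    rows: Iterable[Dict[str, Any]],
    candidates: Dict[str, Optional[Callable[[Any], Any]]],
) -> Dict[str, int]:
    """Count distinct partition values per candidate column.

    `candidates` maps a column name to an optional function applied to the
    value first (for example, truncating a timestamp to its date).
    """
    seen: Dict[str, set] = {name: set() for name in candidates}
    for row in rows:
        for name, transform in candidates.items():
            value = row[name]
            seen[name].add(transform(value) if transform else value)
    return {name: len(values) for name, values in seen.items()}


# Twelve synthetic rows spread over three days and two regions.
rows = [
    {
        "user_id": i,
        "region": "eu" if i % 2 else "us",
        "ts": datetime(2024, 1, 1 + i // 4, i % 24),
    }
    for i in range(12)
]
counts = partition_counts(
    rows,
    {"user_id": None, "region": None, "ts": lambda ts: ts.date()},
)
print(counts)
```

In this sample, `user_id` would produce one partition per row, while `region` and the daily `ts` value keep the partition count small. In PyIceberg, the chosen column and transform would then be expressed as a `PartitionSpec` when the table is created.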
- Data integrity: Verify data accuracy
- Query testing: Test all critical queries
- Performance testing: Compare query performance
- User acceptance: Get user sign-off
- File compaction: Optimize file sizes
- Statistics: Update table statistics
- Z-ordering: Implement Z-ordering if beneficial
- Partitioning: Refine partitioning based on usage
- Update documentation: Document new table locations
- Update queries: Modify queries to use Iceberg tables
- Train users: Train users on Iceberg-specific features
- Monitor performance: Set up ongoing performance monitoring
- Archive old data: Archive or remove source data
- Update permissions: Update access permissions
- Clean up resources: Remove temporary files and resources
- Update monitoring: Update monitoring and alerting
- Schema evolution: Modify schemas without breaking queries
- Partitioning: Flexible partitioning strategies
- Time travel: Query historical data
- ACID transactions: Reliable data operations
- Spark: Distributed processing with Iceberg
- Trino: SQL query engine with Iceberg support
- Pandas: Data analysis with Iceberg integration
For detailed implementation examples and patterns, see the practical examples guide.
- Documentation: Check the API documentation
- Community: Join the Apache Iceberg community
- Issues: Report bugs on GitHub Issues
- Examples: Review the practical examples
Migrating to Iceberg provides significant benefits for data management and analytics. By following this guide and leveraging PyIceberg's capabilities, you can successfully migrate your data while minimizing disruption and maximizing the benefits of Iceberg's advanced features.