---
hide:
  - navigation
---

<!--
 - Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.
-->

# Migration Guide

This guide helps you migrate data from various formats and systems to Apache Iceberg using PyIceberg.

## Overview

Migrating to Iceberg provides numerous benefits:

- **Performance**: Columnar Parquet format with predicate pushdown
- **Reliability**: ACID transactions with snapshot isolation
- **Flexibility**: Schema evolution without breaking queries
- **Time Travel**: Query historical data at any point in time
- **Compatibility**: Works with multiple compute engines

## Migration Strategies

### 1. CSV Migration

CSV is one of the most common formats to migrate from. See the [CSV Migration Example](../notebooks/csv_migration_example.ipynb) for a detailed walkthrough.

#### Basic CSV Migration

```python
import pyarrow.csv as csv_pa
from pyiceberg.catalog import load_catalog

# Read the CSV file into a PyArrow table
csv_data = csv_pa.read_csv("data.csv")

# Create the Iceberg table (identifiers are namespaced: "<namespace>.<table>")
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=csv_data.schema)

# Migrate the data
table.append(csv_data)
```

#### Advanced CSV Migration

- **Schema Enhancement**: Add computed columns during migration
- **Type Conversion**: Ensure proper data types
- **Partitioning**: Organize data by partition keys
- **Data Validation**: Clean and validate data

**Best Practices**:

- Use PyArrow for efficient CSV reading
- Handle missing values explicitly
- Validate data ranges and types
- Consider partitioning for large datasets

### 2. Parquet Migration

Parquet to Iceberg migration is straightforward since Iceberg uses Parquet as its default file format.

#### Basic Parquet Migration

```python
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog

# Read the Parquet file
parquet_data = pq.read_table("data.parquet")

# Create the Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=parquet_data.schema)

# Migrate the data
table.append(parquet_data)
```

#### Advantages of Parquet Migration

- **No conversion needed**: Iceberg uses Parquet natively
- **Schema preservation**: Maintains the existing schema
- **Performance**: Leverages the existing columnar format
- **Metadata**: Preserves existing metadata

### 3. JSON Migration

JSON data requires schema inference and conversion to Iceberg's schema.

#### Basic JSON Migration

```python
import pyarrow.json as pj
from pyiceberg.catalog import load_catalog

# Read newline-delimited JSON (one object per line)
json_data = pj.read_json("data.json")

# Create the Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=json_data.schema)

# Migrate the data
table.append(json_data)
```

#### Considerations for JSON Migration

- **Schema inference**: JSON records may have inconsistent schemas
- **Nested structures**: Handle nested JSON objects
- **Data types**: Ensure proper type conversion
- **Performance**: Reading JSON is slower than reading Parquet

### 4. Hive Table Migration

Migrate existing Hive tables to Iceberg while maintaining compatibility.

#### Hive to Iceberg Migration

```python
from pyiceberg.catalog import load_catalog

# Load the Hive catalog
catalog = load_catalog("hive", uri="thrift://hive-metastore:9083")

# Register a table that already has Iceberg metadata with the catalog.
# Note: register_table does not convert Hive data files; converting an
# existing Hive table in place requires an engine-side procedure such as
# Spark's `migrate` or `snapshot`.
catalog.register_table(
    identifier="database.table_name",
    metadata_location="s3://warehouse/path/to/metadata.json",
)
```

#### Hive Migration Considerations

- **Schema compatibility**: Ensure the Hive schema maps to Iceberg types
- **Partitioning**: Preserve or optimize the partition strategy
- **Data location**: Keep data in its existing location or migrate it
- **Query compatibility**: Test existing queries against the Iceberg table

### 5. Delta Lake Migration

Migrate Delta Lake tables to Iceberg for multi-engine compatibility.

#### Delta Lake to Iceberg Migration

```python
from deltalake import DeltaTable  # pip install deltalake
from pyiceberg.catalog import load_catalog

# Read the Delta Lake table into a PyArrow table
delta_data = DeltaTable("delta_table_path").to_pyarrow_table()

# Create the Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=delta_data.schema)

# Migrate the data
table.append(delta_data)
```

#### Delta Lake Migration Considerations

- **Schema evolution**: Handle Delta Lake schema changes
- **Time travel**: Delta Lake version history is not carried over automatically; only Iceberg snapshots created after migration are queryable
- **Performance**: Compare performance after migration
- **ACID properties**: Both systems support ACID, but the implementations differ

### 6. Database Migration

Migrate data from traditional databases to Iceberg.

#### Database to Iceberg Migration

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog
import some_database_connector  # placeholder for your DB-API driver

# Connect to the database
conn = some_database_connector.connect("database_url")

# Read the data
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_name")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Convert row tuples to a PyArrow table (transpose rows into columns)
cols = list(zip(*rows)) if rows else [[] for _ in columns]
table_data = pa.table({name: list(col) for name, col in zip(columns, cols)})

# Create the Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("default.my_table", schema=table_data.schema)

# Migrate the data
table.append(table_data)
```

#### Database Migration Considerations

- **Data types**: Map database types to Iceberg types
- **Primary keys**: Handle primary key constraints
- **Foreign keys**: Iceberg doesn't enforce foreign keys
- **Indexes**: Plan for query performance without traditional indexes

## Migration Best Practices

### Planning

1. **Assess current data**: Understand data volume, structure, and access patterns
2. **Define migration strategy**: Choose the appropriate migration approach
3. **Plan downtime**: Schedule migration during low-usage periods
4. **Set up monitoring**: Monitor migration progress and data quality

### Data Quality

1. **Validate schemas**: Ensure data types map correctly
2. **Handle nulls**: Decide on a null-handling strategy
3. **Check constraints**: Validate data constraints after migration
4. **Test queries**: Verify query results match expectations

### Performance

1. **Batch size**: Process data in appropriately sized batches
2. **Parallel processing**: Use parallel processing for large datasets
3. **File size optimization**: Target appropriate Iceberg file sizes
4. **Partitioning**: Design the partition strategy based on query patterns

### Validation

1. **Row count validation**: Ensure all rows migrated
2. **Data sampling**: Compare sample data before and after
3. **Query validation**: Test representative queries
4. **Performance validation**: Compare query performance

## Common Migration Challenges

### Schema Mismatches

**Problem**: The source schema doesn't match Iceberg's type system

**Solution**:

```python
import pyarrow as pa

# Explicit type conversion
converted_schema = pa.schema([
    pa.field("id", pa.int64()),      # convert to int64
    pa.field("name", pa.string()),
    pa.field("value", pa.float64()), # convert to float64
])
converted_data = original_data.cast(converted_schema)
```

### Large Dataset Migration

**Problem**: The dataset is too large for memory

**Solution**:

```python
# Process a PyArrow table in fixed-size slices
batch_size = 100_000
for i in range(0, len(data), batch_size):
    batch = data.slice(i, batch_size)
    table.append(batch)
```

### Data Type Conversion

**Problem**: Incompatible data types between systems

**Solution**:

```python
# Best-effort conversion of numeric strings; leaves other values unchanged
def convert_type(value):
    if isinstance(value, str):
        try:
            return int(value)
        except ValueError:
            try:
                return float(value)
            except ValueError:
                return value  # not numeric; keep the original string
    return value
```

### Partitioning Strategy

**Problem**: The optimal partitioning is unclear

**Solution**:

- Analyze query patterns
- Choose low-cardinality columns for partitioning, or apply transforms (e.g. `bucket`, `truncate`, `day`) to high-cardinality ones
- Consider date/time-based partitioning for time-series data
- Test different partitioning strategies

## Post-Migration Steps

### Validation

1. **Data integrity**: Verify data accuracy
2. **Query testing**: Test all critical queries
3. **Performance testing**: Compare query performance
4. **User acceptance**: Get user sign-off

### Optimization

1. **File compaction**: Optimize file sizes
2. **Statistics**: Update table statistics
3. **Z-ordering**: Implement Z-ordering if beneficial
4. **Partitioning**: Refine partitioning based on usage

### Documentation

1. **Update documentation**: Document new table locations
2. **Update queries**: Modify queries to use the Iceberg tables
3. **Train users**: Train users on Iceberg-specific features
4. **Monitor performance**: Set up ongoing performance monitoring

### Cleanup

1. **Archive old data**: Archive or remove source data
2. **Update permissions**: Update access permissions
3. **Clean up resources**: Remove temporary files and resources
4. **Update monitoring**: Update monitoring and alerting

## Tools and Resources

### PyIceberg Features

- **Schema evolution**: Modify schemas without breaking queries
- **Partitioning**: Flexible partitioning strategies
- **Time travel**: Query historical data
- **ACID transactions**: Reliable data operations

### External Tools

- **DuckDB**: High-performance analytics on Iceberg data
- **Spark**: Distributed processing with Iceberg
- **Trino**: SQL query engine with Iceberg support
- **Pandas**: Data analysis with Iceberg integration

### Example Notebooks

- [CSV Migration Example](../notebooks/csv_migration_example.ipynb)
- [DuckDB Integration](../notebooks/duckdb_integration_example.ipynb)
- [Time Travel Queries](../notebooks/time_travel_example.ipynb)

## Getting Help

- **Documentation**: Check the [API documentation](api.md)
- **Community**: Join the [Apache Iceberg community](https://iceberg.apache.org/community/)
- **Issues**: Report bugs on [GitHub Issues](https://github.com/apache/iceberg-python/issues)
- **Examples**: Review the [practical examples](practical-examples.md)

## Conclusion

Migrating to Iceberg provides significant benefits for data management and analytics. By following this guide and leveraging PyIceberg's capabilities, you can successfully migrate your data while minimizing disruption and maximizing the benefits of Iceberg's advanced features.