diff --git a/mkdocs/docs/SUMMARY.md b/mkdocs/docs/SUMMARY.md index d268bcc4b0..62e6933ad0 100644 --- a/mkdocs/docs/SUMMARY.md +++ b/mkdocs/docs/SUMMARY.md @@ -26,6 +26,9 @@ - [API](api.md) - [Row Filter Syntax](row-filter-syntax.md) - [Expression DSL](expression-dsl.md) +- [Practical Examples](practical-examples.md) +- [Migration Guide](migration-guide.md) +- [Troubleshooting](troubleshooting.md) - [Contributing](contributing.md) - [Community](community.md) - Releases diff --git a/mkdocs/docs/migration-guide.md b/mkdocs/docs/migration-guide.md new file mode 100644 index 0000000000..b17cb110bf --- /dev/null +++ b/mkdocs/docs/migration-guide.md @@ -0,0 +1,375 @@ +--- +hide: + - navigation +--- + + + +# Migration Guide + +This guide helps you migrate data from various formats and systems to Apache Iceberg using PyIceberg. + +## Overview + +Migrating to Iceberg provides numerous benefits: + +- **Performance**: Columnar Parquet format with predicate pushdown +- **Reliability**: ACID transactions with snapshot isolation +- **Flexibility**: Schema evolution without breaking queries +- **Time Travel**: Query historical data at any point in time +- **Compatibility**: Works with multiple compute engines + +## Migration Strategies + +### 1. CSV Migration + +CSV is one of the most common formats to migrate from. The CSV migration process involves reading CSV files, converting them to Iceberg's schema, and writing the data to Iceberg tables. 
+ +#### Basic CSV Migration + +```python +import pyarrow.csv as csv_pa +from pyiceberg.catalog import load_catalog + +# Read CSV +csv_data = csv_pa.read_csv('data.csv') + +# Create Iceberg table +catalog = load_catalog("my_catalog") +table = catalog.create_table("my_table", schema=csv_data.schema) + +# Migrate data +table.append(csv_data) +``` + +#### Advanced CSV Migration + +- **Schema Enhancement**: Add computed columns during migration +- **Type Conversion**: Ensure proper data types +- **Partitioning**: Organize data by partition keys +- **Data Validation**: Clean and validate data + +**Best Practices**: + +- Use PyArrow for efficient CSV reading +- Handle missing values explicitly +- Validate data ranges and types +- Consider partitioning for large datasets + +### 2. Parquet Migration + +Parquet to Iceberg migration is straightforward since Iceberg uses Parquet as its default file format. + +#### Basic Parquet Migration + +```python +import pyarrow.parquet as pq +from pyiceberg.catalog import load_catalog + +# Read Parquet +parquet_data = pq.read_table('data.parquet') + +# Create Iceberg table +catalog = load_catalog("my_catalog") +table = catalog.create_table("my_table", schema=parquet_data.schema) + +# Migrate data +table.append(parquet_data) +``` + +#### Advantages of Parquet Migration + +- **No conversion needed**: Iceberg uses Parquet natively +- **Schema preservation**: Maintains existing schema +- **Performance**: Leverages existing columnar format +- **Metadata**: Preserves existing metadata + +### 3. JSON Migration + +JSON data requires schema inference and conversion to Iceberg's schema. 
+ +#### Basic JSON Migration + +```python +import pyarrow.json as pj +from pyiceberg.catalog import load_catalog + +# Read JSON +json_data = pj.read_json('data.json') + +# Create Iceberg table +catalog = load_catalog("my_catalog") +table = catalog.create_table("my_table", schema=json_data.schema) + +# Migrate data +table.append(json_data) +``` + +#### Considerations for JSON Migration + +- **Schema inference**: JSON may have inconsistent schemas +- **Nested structures**: Handle nested JSON objects +- **Data types**: Ensure proper type conversion +- **Performance**: JSON is slower than Parquet + +### 4. Hive Table Migration + +Migrate existing Hive tables to Iceberg while maintaining compatibility. + +#### Hive to Iceberg Migration + +```python +from pyiceberg.catalog import load_catalog + +# Load Hive catalog +catalog = load_catalog("hive", uri="thrift://hive-metastore:9083") + +# Register existing Hive table as Iceberg table +catalog.register_table( + identifier="database.table_name", + metadata_location="s3://warehouse/path/to/metadata.json" +) +``` + +#### Hive Migration Considerations + +- **Schema compatibility**: Ensure Hive schema maps to Iceberg types +- **Partitioning**: Preserve or optimize partition strategy +- **Data location**: Keep data in existing location or migrate +- **Query compatibility**: Test existing queries against Iceberg table + +### 5. Delta Lake Migration + +Migrate Delta Lake tables to Iceberg for multi-engine compatibility. 
#### Delta Lake to Iceberg Migration

```python
from deltalake import DeltaTable  # provided by the `deltalake` package
from pyiceberg.catalog import load_catalog

# Read Delta Lake table as a PyArrow table
delta_data = DeltaTable('delta_table_path').to_pyarrow_table()

# Create Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("my_table", schema=delta_data.schema)

# Migrate data
table.append(delta_data)
```

#### Delta Lake Migration Considerations

- **Schema evolution**: Handle Delta Lake schema changes
- **Time travel**: Preserve Delta Lake time travel capabilities
- **Performance**: Compare performance after migration
- **ACID properties**: Both systems support ACID, but implementation differs

### 6. Database Migration

Migrate data from traditional databases to Iceberg.

#### Database to Iceberg Migration

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog
import some_database_connector  # any DB-API 2.0 driver

# Connect to database
conn = some_database_connector.connect('database_url')

# Read data
cursor = conn.cursor()
cursor.execute("SELECT * FROM table_name")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Convert row tuples to a columnar PyArrow table
schema = pa.schema([(col, pa.string()) for col in columns])  # Adjust types as needed
pydict = {col: [str(row[i]) for row in rows] for i, col in enumerate(columns)}
table_data = pa.Table.from_pydict(pydict, schema=schema)

# Create Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("my_table", schema=table_data.schema)

# Migrate data
table.append(table_data)
```

#### Database Migration Considerations

- **Data types**: Map database types to Iceberg types
- **Primary keys**: Handle primary key constraints
- **Foreign keys**: Iceberg doesn't enforce foreign keys
- **Indexes**: Plan for query performance without traditional indexes

## Migration Best Practices

### Planning

1.
**Assess current data**: Understand data volume, structure, and access patterns +2. **Define migration strategy**: Choose appropriate migration approach +3. **Plan downtime**: Schedule migration during low-usage periods +4. **Set up monitoring**: Monitor migration progress and data quality + +### Data Quality + +1. **Validate schemas**: Ensure data types map correctly +2. **Handle nulls**: Decide on null handling strategy +3. **Check constraints**: Validate data constraints after migration +4. **Test queries**: Verify query results match expectations + +### Performance + +1. **Batch size**: Process data in appropriate batch sizes +2. **Parallel processing**: Use parallel processing for large datasets +3. **File size optimization**: Target appropriate Iceberg file sizes +4. **Partitioning**: Design partition strategy based on query patterns + +### Data Quality Validation + +1. **Row count validation**: Ensure all rows migrated +2. **Data sampling**: Compare sample data before and after +3. **Query validation**: Test representative queries +4. 
**Performance validation**: Compare query performance + +## Common Migration Challenges + +### Schema Mismatches + +**Problem**: Source schema doesn't match Iceberg type system + +**Solution**: + +```python +# Explicit type conversion +converted_schema = pa.schema([ + pa.field("id", pa.int64()), # Convert to int64 + pa.field("name", pa.string()), + pa.field("value", pa.float64()) # Convert to float64 +]) +converted_data = original_data.cast(converted_schema) +``` + +### Large Dataset Migration + +**Problem**: Dataset too large for memory + +**Solution**: + +```python +# Process in batches +batch_size = 100000 +for i in range(0, len(data), batch_size): + batch = data.slice(i, batch_size) + table.append(batch) +``` + +### Data Type Conversion + +**Problem**: Incompatible data types between systems + +**Solution**: + +```python +# Custom type conversion +def convert_type(value): + if isinstance(value, str): + try: + return int(value) + except ValueError: + return float(value) + return value +``` + +### Partitioning Strategy + +**Problem**: Optimal partitioning unclear + +**Solution**: + +- Analyze query patterns +- Choose high-cardinality columns for partitioning +- Consider date/time-based partitioning for time-series data +- Test different partitioning strategies + +## Post-Migration Steps + +### Post-Migration Validation + +1. **Data integrity**: Verify data accuracy +2. **Query testing**: Test all critical queries +3. **Performance testing**: Compare query performance +4. **User acceptance**: Get user sign-off + +### Optimization + +1. **File compaction**: Optimize file sizes +2. **Statistics**: Update table statistics +3. **Z-ordering**: Implement Z-ordering if beneficial +4. **Partitioning**: Refine partitioning based on usage + +### Documentation + +1. **Update documentation**: Document new table locations +2. **Update queries**: Modify queries to use Iceberg tables +3. **Train users**: Train users on Iceberg-specific features +4. 
**Monitor performance**: Set up ongoing performance monitoring + +### Cleanup + +1. **Archive old data**: Archive or remove source data +2. **Update permissions**: Update access permissions +3. **Clean up resources**: Remove temporary files and resources +4. **Update monitoring**: Update monitoring and alerting + +## Tools and Resources + +### PyIceberg Features + +- **Schema evolution**: Modify schemas without breaking queries +- **Partitioning**: Flexible partitioning strategies +- **Time travel**: Query historical data +- **ACID transactions**: Reliable data operations + +### External Tools + +- **Spark**: Distributed processing with Iceberg +- **Trino**: SQL query engine with Iceberg support +- **Pandas**: Data analysis with Iceberg integration + +### Additional Resources + +For detailed implementation examples and patterns, see the [practical examples guide](practical-examples.md). + +## Getting Help + +- **Documentation**: Check the [API documentation](api.md) +- **Community**: Join the [Apache Iceberg community](https://iceberg.apache.org/community/) +- **Issues**: Report bugs on [GitHub Issues](https://github.com/apache/iceberg-python/issues) +- **Examples**: Review the [practical examples](practical-examples.md) + +## Conclusion + +Migrating to Iceberg provides significant benefits for data management and analytics. By following this guide and leveraging PyIceberg's capabilities, you can successfully migrate your data while minimizing disruption and maximizing the benefits of Iceberg's advanced features. diff --git a/mkdocs/docs/practical-examples.md b/mkdocs/docs/practical-examples.md new file mode 100644 index 0000000000..0187526132 --- /dev/null +++ b/mkdocs/docs/practical-examples.md @@ -0,0 +1,223 @@ +--- +hide: + - navigation +--- + + + +# Practical Examples + +This guide provides practical guidance for common PyIceberg use cases and implementation patterns. 
+ +## Common Use Cases + +### CSV Migration + +Migrating CSV files to Iceberg tables involves reading CSV data, converting it to Iceberg's schema, and writing it to Iceberg tables. This is one of the most common migration scenarios. + +**Key Steps**: + +1. Read CSV files using PyArrow +2. Convert data types appropriately +3. Create Iceberg table with proper schema +4. Write data to Iceberg table +5. Validate migration success + +**Best Practices**: + +- Use PyArrow for efficient CSV reading +- Handle missing values explicitly +- Validate data ranges and types +- Consider partitioning for large datasets + +### Time Travel Queries + +Iceberg's time travel feature allows you to query historical data and manage table versions through snapshots. + +**Key Concepts**: + +- **Snapshots**: Each commit creates a snapshot with unique ID and timestamp +- **Historical Queries**: Query data as it existed at specific times +- **Rollback**: Revert tables to previous states when needed +- **Audit Trail**: Complete history of all table changes + +**Common Patterns**: + +- Query data as of a specific snapshot ID +- Query data as of a specific timestamp +- List table history to track changes +- Rollback to known good states + +### Data Quality Management + +Implementing data quality checks during and after migration ensures data integrity. 
**Validation Steps**:

- Row count validation
- Data sampling and comparison
- Query validation with representative tests
- Performance comparison

**Common Issues**:

- Schema mismatches between source and target
- Missing or null values
- Duplicate records
- Data type conversion errors

## Implementation Patterns

### Data Migration Pattern

```python
import pyarrow.csv as csv_pa
from pyiceberg.catalog import load_catalog

# Read CSV
csv_data = csv_pa.read_csv('data.csv')

# Create Iceberg table
catalog = load_catalog("my_catalog")
table = catalog.create_table("my_table", schema=csv_data.schema)

# Migrate data
table.append(csv_data)
```

### Time Travel Pattern

```python
# Query historical data
historical_data = table.scan(snapshot_id=old_snapshot_id).to_arrow()

# View table history
for snapshot in table.history():
    print(f"Snapshot: {snapshot.snapshot_id}, Time: {snapshot.timestamp_ms}")
```

### Schema Evolution Pattern

```python
from pyiceberg.types import StringType

# Add a new column to an existing table; field IDs are assigned automatically
with table.update_schema() as update_schema:
    update_schema.add_column("new_column", StringType(), required=False)
```

## Running Examples

### Prerequisites

Install PyIceberg with required dependencies:

```bash
pip install pyiceberg[pyarrow]
```

### Using Make Commands

PyIceberg provides convenient Make commands:

```bash
# Basic PyIceberg examples (no external infrastructure)
make notebook

# Spark integration examples (requires Docker infrastructure)
make notebook-infra
```

### Manual Setup

```bash
# Install Jupyter
pip install jupyter

# Start Jupyter Lab
jupyter lab notebooks/
```

## Best Practices

### Performance

- **Use appropriate file sizes**: Target 128MB-1GB for Iceberg data files
- **Leverage partitioning**: Design partition strategies based on query patterns
- **Use column pruning**: Only select needed columns
- **Filter early**: Apply filters
as early as possible in your queries + +### Data Quality + +- **Validate schemas**: Ensure data types match expectations +- **Handle nulls**: Decide on null handling strategies +- **Test migrations**: Validate data integrity after migration +- **Monitor quality**: Set up data quality checks + +### Production + +- **Error handling**: Implement comprehensive error handling +- **Logging**: Use appropriate logging levels for troubleshooting +- **Testing**: Test examples in non-production environments first +- **Documentation**: Document your customizations and patterns + +## Common Issues + +### Import Errors + +```bash +# Ensure all dependencies are installed +pip install pyiceberg[pyarrow,s3fs] +``` + +### Permission Errors + +```bash +# Check catalog credentials in .pyiceberg.yaml +# Verify file system permissions for warehouse location +``` + +### Memory Issues + +```bash +# Process data in batches for large files +# Use DuckDB for out-of-core processing +``` + +## Getting Help + +- **Documentation**: Check the [API documentation](api.md) +- **Community**: Join the [Apache Iceberg community](https://iceberg.apache.org/community/) +- **Issues**: Report bugs on [GitHub Issues](https://github.com/apache/iceberg-python/issues) + +## Contributing Examples + +We welcome contributions of additional practical examples! When contributing: + +1. **Follow the pattern**: Use existing code examples as templates +2. **Include error handling**: Add appropriate error handling +3. **Add documentation**: Explain the use case and when to use it +4. **Test thoroughly**: Ensure examples work correctly +5. **Document dependencies**: List all required packages + +See the [contributing guide](contributing.md) for more details. 
diff --git a/mkdocs/docs/troubleshooting.md b/mkdocs/docs/troubleshooting.md new file mode 100644 index 0000000000..ea0355fdac --- /dev/null +++ b/mkdocs/docs/troubleshooting.md @@ -0,0 +1,611 @@ +--- +hide: + - navigation +--- + + + +# Troubleshooting Guide + +This guide helps you diagnose and resolve common issues when working with PyIceberg. + +## Installation Issues + +### Import Errors + +**Problem**: `ModuleNotFoundError: No module named 'pyiceberg'` + +**Solution**: + +```bash +# Install PyIceberg +pip install pyiceberg + +# Install with optional dependencies +pip install pyiceberg[pyarrow,s3fs,adlfs] +``` + +**Problem**: `ImportError: cannot import name 'X' from 'pyiceberg'` + +**Solution**: + +```bash +# Ensure you have the latest version +pip install --upgrade pyiceberg + +# Check your installed version +python -c "import pyiceberg; print(pyiceberg.__version__)" +``` + +### Dependency Conflicts + +**Problem**: Version conflicts with other packages + +**Solution**: + +```bash +# Use a virtual environment +python -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate + +# Install with specific versions +pip install pyiceberg==0.6.0 pyarrow==14.0.0 +``` + +## Catalog Connection Issues + +### REST Catalog Connection + +**Problem**: `Connection refused` or `Timeout` when connecting to REST catalog + +**Solution**: + +```yaml +# Check .pyiceberg.yaml configuration +catalog: + my_catalog: + uri: http://rest-catalog:8181/ # Verify URL and port + credential: user:password # Check credentials +``` + +```python +# Test connection with timeout +from pyiceberg.catalog import load_catalog +try: + catalog = load_catalog("my_catalog") + print("Connection successful") +except Exception as e: + print(f"Connection failed: {e}") +``` + +### Hive Metastore Connection + +**Problem**: `ThriftError` or connection issues with Hive Metastore + +**Solution**: + +```yaml +# Check Hive configuration +catalog: + hive: + uri: thrift://hive-metastore:9083 # Verify 
host and port
```

```bash
# Test Hive Metastore connectivity
telnet hive-metastore 9083
# or
nc -zv hive-metastore 9083
```

### AWS S3 Configuration

**Problem**: `Permission denied` or S3 authentication errors

**Solution**:

```yaml
# Check S3 configuration
catalog:
  my_catalog:
    uri: http://rest-catalog:8181/
    warehouse: s3://my-bucket/warehouse
    s3.endpoint: https://s3.amazonaws.com
    s3.access-key-id: YOUR_ACCESS_KEY
    s3.secret-access-key: YOUR_SECRET_KEY
```

```python
# Test S3 connectivity
import boto3

s3 = boto3.client('s3')
try:
    s3.list_buckets()
    print("S3 connection successful")
except Exception as e:
    print(f"S3 connection failed: {e}")
```

## Table Operations Issues

### Table Creation Errors

**Problem**: `TableAlreadyExistsError` when creating a table

**Solution**:

```python
# Check if table exists first
from pyiceberg.exceptions import TableAlreadyExistsError

try:
    table = catalog.create_table("my_table", schema=schema)
except TableAlreadyExistsError:
    # Load existing table instead
    table = catalog.load_table("my_table")
```

**Problem**: `NoSuchNamespaceError` when creating a table

**Solution**:

```python
# Create namespace first
catalog.create_namespace("my_namespace")

# Then create table
table = catalog.create_table("my_namespace.my_table", schema=schema)
```

### Schema Evolution Errors

**Problem**: Schema evolution fails when modifying the schema

**Solution**:

```python
from pyiceberg.types import StringType

# Use the schema evolution API; field IDs are assigned automatically
with table.update_schema() as update_schema:
    update_schema.add_column("new_column", StringType(), required=False)
```

### Data Write Errors

**Problem**: `TypeError` when writing data with incompatible schema

**Solution**:

```python
# Check table schema
table_schema = table.schema()
print(f"Table schema: {table_schema}")

# Convert the Iceberg schema to an Arrow schema and cast the data to match
from pyiceberg.io.pyarrow import schema_to_pyarrow

arrow_schema = schema_to_pyarrow(table_schema)
if data.schema != arrow_schema:
    converted_data = data.cast(arrow_schema)
    table.append(converted_data)
```

## Performance Issues

### Slow Query Performance

**Problem**: Queries are slower than expected

**Solution**:

```python
# Enable debug logging to identify bottlenecks
import logging
logging.basicConfig(level=logging.DEBUG)

# Inspect table metadata, e.g. the snapshot history
print(table.inspect.snapshots())

# Consider partitioning
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="date_day")
)
```

### High Memory Usage

**Problem**: Out of memory errors when processing large datasets

**Solution**:

```python
# Process data in batches
batch_size = 10000
for i in range(0, len(data), batch_size):
    batch = data.slice(i, batch_size)
    table.append(batch)

# Use DuckDB for out-of-core processing
import duckdb
con = duckdb.connect()
result = con.execute("SELECT * FROM my_table").fetchdf()
```

### Slow File I/O

**Problem**: Slow read/write operations

**Solution**:

```python
# Configure the PyArrow-based file I/O implementation for better performance
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "my_catalog",
    **{"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO"}
)
```

## Data Quality Issues

### Missing or Null Values

**Problem**: Unexpected null values in data

**Solution**:

```python
# Check for null values before writing
import pyarrow.compute as pc

null_counts = {}
for field_name in data.schema.names:
    null_mask = pc.is_null(data[field_name])
    null_count = pc.sum(null_mask).as_py()
    null_counts[field_name] = null_count

print(f"Null counts: {null_counts}")

# Handle nulls explicitly (fillna is a pandas operation)
data = data.to_pandas().fillna({"column_name": 
"default_value"}) +``` + +### Data Type Mismatches + +**Problem**: Data type conversion errors + +**Solution**: + +```python +# Explicit type conversion +converted_data = data.cast(pa.schema([ + pa.field("id", pa.int64()), + pa.field("value", pa.float64()), + pa.field("name", pa.string()) +])) +``` + +### Duplicate Data + +**Problem**: Duplicate rows in table + +**Solution**: + +```python +# Remove duplicates using DuckDB +import duckdb +con = duckdb.connect() + +deduped = con.execute(""" + SELECT DISTINCT * FROM table +""").fetchdf() + +# Write deduplicated data back +table.append(pa.Table.from_pandas(deduped)) +``` + +## Time Travel Issues + +### Snapshot Not Found + +**Problem**: `NoSuchSnapshotError` when querying historical data + +**Solution**: + +```python +# List available snapshots +for snapshot in table.history(): + print(f"Snapshot ID: {snapshot.snapshot_id}") + print(f"Timestamp: {snapshot.timestamp_ms}") + +# Use valid snapshot ID +historical_data = table.scan(snapshot_id=valid_snapshot_id).to_arrow() +``` + +### Rollback Failures + +**Problem**: Unable to rollback to previous snapshot + +**Solution**: + +```python +# Check if snapshot exists +snapshot_ids = [s.snapshot_id for s in table.history()] +if target_snapshot_id in snapshot_ids: + # Rollback using table operations + # Note: Actual rollback implementation depends on your use case + print("Snapshot exists, rollback possible") +else: + print("Snapshot not found") +``` + +## Integration Issues + +### DuckDB Integration + +**Problem**: DuckDB cannot read Iceberg files + +**Solution**: + +```python +# Ensure DuckDB can access the data files +import duckdb +con = duckdb.connect() + +# Test file access +test_query = """ + SELECT * FROM read_parquet('path/to/iceberg/data/**/*.parquet') + LIMIT 10 +""" +result = con.execute(test_query).fetchdf() +print(result) +``` + +### Spark Integration + +**Problem**: Spark cannot read Iceberg tables + +**Solution**: + +```scala +// Configure Spark for Iceberg 
+spark.conf.set("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") +spark.conf.set("spark.sql.catalog.my_catalog.type", "rest") +spark.conf.set("spark.sql.catalog.my_catalog.uri", "http://rest-catalog:8181") + +// Test table access +spark.table("my_catalog.database.table").show() +``` + +### Pandas Integration + +**Problem**: Conversion between Iceberg and pandas fails + +**Solution**: + +```python +# Convert Iceberg data to pandas +import pandas as pd + +# Get data as PyArrow +arrow_data = table.scan().to_arrow() + +# Convert to pandas +pandas_df = arrow_data.to_pandas() + +# Handle potential conversion issues +pandas_df = pandas_df.fillna(0) # Handle nulls +pandas_df['date_column'] = pd.to_datetime(pandas_df['date_column']) # Convert dates +``` + +## Logging and Debugging + +### Enable Debug Logging + +**Problem**: Need more information to diagnose issues + +**Solution**: + +```python +# Enable debug logging +import logging +logging.basicConfig(level=logging.DEBUG) + +# Or use environment variable +import os +os.environ['PYICEBERG_LOG_LEVEL'] = 'DEBUG' + +# Or use CLI option +# pyiceberg --log-level DEBUG describe my_table +``` + +### Check Configuration + +**Problem**: Unsure about current configuration + +**Solution**: + +```python +# Check catalog configuration +from pyiceberg.catalog import load_catalog + +catalog = load_catalog("my_catalog") +print(f"Catalog properties: {catalog.properties}") + +# Check table configuration +table = catalog.load_table("my_table") +print(f"Table properties: {table.properties}") +print(f"Table location: {table.location()}") +``` + +### Validate Metadata + +**Problem**: Suspect metadata corruption + +**Solution**: + +```python +# Validate table metadata +table = catalog.load_table("my_table") + +# Check current snapshot +current_snapshot = table.current_snapshot() +print(f"Current snapshot: {current_snapshot}") + +# Check schema +print(f"Schema: {table.schema()}") + +# Check partition spec 
+print(f"Partition spec: {table.spec()}") +``` + +## Common Error Messages + +### `NoSuchTableError` + +**Cause**: Table does not exist in the catalog + +**Solution**: + +```python +# List available tables +tables = catalog.list_tables("namespace") +print(f"Available tables: {tables}") + +# Create table if it doesn't exist +if "my_table" not in tables: + table = catalog.create_table("my_table", schema=schema) +``` + +### `NoSuchNamespaceError` + +**Cause**: Namespace does not exist + +**Solution**: + +```python +# List available namespaces +namespaces = catalog.list_namespaces() +print(f"Available namespaces: {namespaces}") + +# Create namespace if it doesn't exist +if "my_namespace" not in [ns[0] for ns in namespaces]: + catalog.create_namespace("my_namespace") +``` + +### `CommitFailedException` + +**Cause**: Concurrent modification conflict + +**Solution**: + +```python +# Implement retry logic +from pyiceberg.exceptions import CommitFailedException +import time + +max_retries = 3 +for attempt in range(max_retries): + try: + table.overwrite(data) + break + except CommitFailedException: + if attempt < max_retries - 1: + time.sleep(1) # Wait before retry + else: + raise +``` + +## Getting Additional Help + +### Check Documentation + +- [API Documentation](api.md) - Comprehensive API reference +- [Configuration Guide](configuration.md) - Configuration options +- [Practical Examples](practical-examples.md) - Real-world examples + +### Community Resources + +- [Apache Iceberg Community](https://iceberg.apache.org/community/) - Mailing lists and Slack +- [GitHub Issues](https://github.com/apache/iceberg-python/issues) - Report bugs +- [Stack Overflow](https://stackoverflow.com/questions/tagged/apache-iceberg) - Q&A + +### Debug Checklist + +Before seeking help, check: + +- [ ] PyIceberg version and dependencies +- [ ] Catalog configuration in `.pyiceberg.yaml` +- [ ] Network connectivity to catalog and storage +- [ ] File system permissions +- [ ] Available disk space 
- [ ] Memory usage
- [ ] Error messages and stack traces
- [ ] Minimal reproducible example

## Prevention and Best Practices

### Regular Maintenance

```python
# Expire old snapshots
from pyiceberg.table import ExpireSnapshots

expire_snapshots = ExpireSnapshots(table)
expire_snapshots.expire(older_than_ms=timestamp_ms)
expire_snapshots.commit()
```

### Monitoring

```python
# Monitor table state via the current snapshot
def monitor_table(table):
    snapshot = table.current_snapshot()
    print(f"Snapshot ID: {snapshot.snapshot_id}")
    # The summary includes counts such as added and deleted data files
    print(f"Summary: {snapshot.summary}")
```

### Backup and Recovery

```python
# Backup table metadata
metadata_location = table.metadata_location
# Store this location for recovery

# Recover from metadata backup
catalog.register_table(
    identifier="recovered_table",
    metadata_location=metadata_location
)
```

This troubleshooting guide covers the most common issues. For specific problems not covered here, please refer to the community resources or file an issue on GitHub.
diff --git a/notebooks/csv_migration_example.ipynb b/notebooks/csv_migration_example.ipynb new file mode 100644 index 0000000000..8a2ade33e1 --- /dev/null +++ b/notebooks/csv_migration_example.ipynb @@ -0,0 +1,373 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# CSV to Iceberg Migration Example\n", + "\n", + "This notebook demonstrates how to migrate CSV files to Apache Iceberg tables, a common use case when transitioning from traditional data formats to modern table formats.\n", + "\n", + "## Overview\n", + "\n", + "Migrating CSV data to Iceberg provides several benefits:\n", + "- **Better performance**: Columnar Parquet format vs row-based CSV\n", + "- **Schema evolution**: Add/modify columns without breaking existing queries\n", + "- **ACID transactions**: Reliable data operations with rollback support\n", + "- **Time travel**: Query historical data at any point in time\n", + "- **Partitioning**: Efficient data organization for large datasets\n", + "\n", + "## Migration Strategies\n", + "\n", + "1. **Simple migration**: Direct CSV to Iceberg conversion\n", + "2. **Schema evolution**: Enhance schema during migration\n", + "3. **Partitioned migration**: Organize data by partition keys\n", + "4. 
**Incremental migration**: Handle ongoing CSV updates" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Import required libraries\n", + "import csv\n", + "import os\n", + "import shutil\n", + "import tempfile\n", + "\n", + "import pyarrow as pa\n", + "import pyarrow.compute as pc\n", + "import pyarrow.csv as csv_pa\n", + "\n", + "import pyiceberg\n", + "from pyiceberg.catalog import load_catalog\n", + "from pyiceberg.partitioning import PartitionField, PartitionSpec\n", + "from pyiceberg.transforms import IdentityTransform\n", + "\n", + "print(f\"PyIceberg version: {pyiceberg.__version__}\")\n", + "print(f\"PyArrow version: {pa.__version__}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Step 1: Create Sample CSV Data\n\nFirst, let's create sample CSV files that simulate real-world data that needs to be migrated." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a temporary directory for CSV files\n", + "csv_dir = tempfile.mkdtemp(prefix=\"csv_data_\")\n", + "print(f\"CSV directory: {csv_dir}\")\n", + "\n", + "# Create sample sales CSV data\n", + "sales_csv_path = os.path.join(csv_dir, \"sales.csv\")\n", + "\n", + "sales_data = [\n", + " [\"transaction_id\", \"customer_id\", \"product_id\", \"quantity\", \"unit_price\", \"transaction_date\"],\n", + " [\"1\", \"101\", \"501\", \"2\", \"10.00\", \"2024-01-01\"],\n", + " [\"2\", \"102\", \"502\", \"1\", \"25.00\", \"2024-01-02\"],\n", + " [\"3\", \"101\", \"501\", \"3\", \"10.00\", \"2024-01-01\"],\n", + " [\"4\", \"103\", \"503\", \"1\", \"50.00\", \"2024-01-03\"],\n", + " [\"5\", \"102\", \"502\", \"2\", \"25.00\", \"2024-01-02\"],\n", + " [\"6\", \"104\", \"504\", \"1\", \"100.00\", \"2024-01-04\"],\n", + " [\"7\", \"101\", \"501\", \"4\", \"10.00\", \"2024-01-05\"],\n", + " [\"8\", \"105\", \"505\", \"2\", \"75.00\", \"2024-01-03\"],\n", + " 
[\"9\", \"103\", \"503\", \"1\", \"50.00\", \"2024-01-04\"],\n", + " [\"10\", \"102\", \"502\", \"3\", \"25.00\", \"2024-01-05\"],\n", + "]\n", + "\n", + "with open(sales_csv_path, \"w\", newline=\"\") as f:\n", + " writer = csv.writer(f)\n", + " writer.writerows(sales_data)\n", + "\n", + "print(f\"Created sample CSV file: {sales_csv_path}\")\n", + "\n", + "# Display the CSV content\n", + "print(\"\\nCSV content:\")\n", + "with open(sales_csv_path) as f:\n", + " print(f.read())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Step 2: Setup Iceberg Catalog\n\nCreate an Iceberg catalog to store the migrated table." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a temporary warehouse location\n", + "warehouse_path = tempfile.mkdtemp(prefix=\"iceberg_warehouse_\")\n", + "print(f\"Warehouse location: {warehouse_path}\")\n", + "\n", + "# Configure and load the catalog\n", + "catalog = load_catalog(\n", + " \"default\",\n", + " type=\"sql\",\n", + " uri=f\"sqlite:///{warehouse_path}/pyiceberg_catalog.db\",\n", + " warehouse=f\"file://{warehouse_path}\",\n", + ")\n", + "\n", + "print(\"Catalog loaded successfully!\")\n", + "\n", + "# Create a namespace\n", + "catalog.create_namespace(\"default\")\n", + "print(f\"Available namespaces: {list(catalog.list_namespaces())}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Step 3: Read CSV with PyArrow\n\nUse PyArrow to read the CSV file and convert it to a format suitable for Iceberg." 
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read CSV using PyArrow\n", + "csv_table = csv_pa.read_csv(sales_csv_path)\n", + "\n", + "print(\"CSV data loaded with PyArrow:\")\n", + "print(csv_table)\n", + "print(f\"\\nSchema: {csv_table.schema}\")\n", + "print(f\"Total rows: {len(csv_table)}\")\n", + "\n", + "# Convert types if needed (PyArrow infers types, but we can be explicit)\n", + "# For example, ensure transaction_id and customer_id are integers\n", + "csv_table = csv_table.cast(\n", + " pa.schema(\n", + " [\n", + " pa.field(\"transaction_id\", pa.int64()),\n", + " pa.field(\"customer_id\", pa.int64()),\n", + " pa.field(\"product_id\", pa.int64()),\n", + " pa.field(\"quantity\", pa.int64()),\n", + " pa.field(\"unit_price\", pa.float64()),\n", + " pa.field(\"transaction_date\", pa.string()),\n", + " ]\n", + " )\n", + ")\n", + "\n", + "print(\"\\nConverted schema:\")\n", + "print(csv_table.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Step 4: Create Iceberg Table and Migrate Data\n\nCreate the Iceberg table with the CSV schema and migrate the data." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create Iceberg table with the CSV schema\n", + "table = catalog.create_table(\n", + " \"default.sales\",\n", + " schema=csv_table.schema,\n", + ")\n", + "\n", + "print(f\"Created Iceberg table: {table}\")\n", + "print(f\"Table location: {table.location()}\")\n", + "print(f\"Table schema: {table.schema()}\")\n", + "\n", + "# Migrate the data from CSV to Iceberg\n", + "table.append(csv_table)\n", + "print(\"\\nData migration completed!\")\n", + "print(f\"Rows in Iceberg table: {len(table.scan().to_arrow())}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Step 5: Verify Migration\n\nVerify that the data was migrated correctly by querying the Iceberg table." 
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Read the migrated data from Iceberg\n", + "migrated_data = table.scan().to_arrow()\n", + "\n", + "print(\"Data from Iceberg table:\")\n", + "print(migrated_data)\n", + "print(f\"\\nTotal rows: {len(migrated_data)}\")\n", + "print(f\"Schema: {migrated_data.schema}\")\n", + "\n", + "# Compare with original CSV data\n", + "print(\"\\nOriginal CSV rows:\", len(csv_table))\n", + "print(\"Migrated Iceberg rows:\", len(migrated_data))\n", + "print(\"Migration successful:\", len(csv_table) == len(migrated_data))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Advanced Migration: Schema Enhancement\n\nOne of Iceberg's key benefits is schema evolution. Let's enhance the schema during migration by adding computed columns." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Add computed columns to enhance the schema\n", + "enhanced_table = csv_table\n", + "\n", + "# Add total_amount column (quantity * unit_price)\n", + "enhanced_table = enhanced_table.append_column(\n", + " \"total_amount\", pc.multiply(enhanced_table[\"quantity\"], enhanced_table[\"unit_price\"])\n", + ")\n", + "\n", + "print(\"Enhanced schema with computed column:\")\n", + "print(enhanced_table.schema)\n", + "print(\"\\nData with new column:\")\n", + "print(enhanced_table)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a new table with the enhanced schema\n", + "enhanced_table_iceberg = catalog.create_table(\n", + " \"default.sales_enhanced\",\n", + " schema=enhanced_table.schema,\n", + ")\n", + "\n", + "print(f\"Created enhanced table: {enhanced_table_iceberg}\")\n", + "\n", + "# Migrate the enhanced data\n", + "enhanced_table_iceberg.append(enhanced_table)\n", + "print(\"Enhanced data migrated successfully!\")\n", + "print(f\"Rows 
in enhanced table: {len(enhanced_table_iceberg.scan().to_arrow())}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Migration with Partitioning\n\nFor larger datasets, partitioning improves query performance. Let's create a partitioned table." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a partition spec (partition by transaction_date)\n", + "partition_spec = PartitionSpec(\n", + " PartitionField(\n", + " source_id=6, # field ID of transaction_date (Iceberg field IDs are 1-based, assigned in schema order)\n", + " field_id=1000,\n", + " transform=IdentityTransform(),\n", + " name=\"transaction_date\",\n", + " )\n", + ")\n", + "\n", + "# Create a partitioned table\n", + "partitioned_table = catalog.create_table(\"default.sales_partitioned\", schema=enhanced_table.schema, partition_spec=partition_spec)\n", + "\n", + "print(f\"Created partitioned table: {partitioned_table}\")\n", + "print(f\"Partition spec: {partitioned_table.spec()}\")\n", + "print(f\"Partition fields: {list(partitioned_table.spec().fields)}\")\n", + "\n", + "# Migrate data to partitioned table\n", + "partitioned_table.append(enhanced_table)\n", + "print(\"\\nData migrated to partitioned table!\")\n", + "print(f\"Rows: {len(partitioned_table.scan().to_arrow())}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Best Practices for CSV Migration\n\n### Data Quality Checks\n- **Validate CSV structure**: Ensure consistent column names and types\n- **Handle missing values**: Decide on null handling strategy\n- **Check for duplicates**: Identify and handle duplicate records\n- **Validate data ranges**: Ensure values fall within expected ranges\n\n### Schema Design\n- **Use appropriate types**: Choose the most efficient data types\n- **Add computed columns**: Enhance data with derived values during migration\n- **Consider partitioning**: Plan partition strategy for large datasets\n- **Document changes**: Keep track of schema
evolution\n\n### Performance Considerations\n- **Batch size**: Process large CSV files in batches\n- **Memory management**: Be mindful of memory for large files\n- **File size optimization**: Target appropriate Iceberg file sizes (typically 128MB-1GB)\n- **Compression**: Use compression for storage efficiency\n\n### Production Considerations\n- **Incremental updates**: Plan for ongoing CSV updates\n- **Backward compatibility**: Ensure queries work during migration\n- **Monitoring**: Track migration progress and data quality\n- **Rollback plan**: Have a strategy to revert if needed" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Conclusion\n\nThis example demonstrated three approaches to CSV to Iceberg migration:\n\n1. **Simple Migration**: Direct CSV to Iceberg conversion\n2. **Schema Enhancement**: Adding computed columns during migration\n3. **Partitioned Migration**: Organizing data for better performance\n\n### Key Benefits of Migrating to Iceberg\n\n- **Performance**: Columnar Parquet format provides better compression and query performance\n- **Schema Evolution**: Add/modify columns without breaking existing queries\n- **ACID Transactions**: Reliable data operations with rollback support\n- **Time Travel**: Query historical data at any point in time\n- **Partitioning**: Efficient data organization for large datasets\n- **Compatibility**: Works with multiple compute engines (Spark, DuckDB, Trino, etc.)\n\n### Next Steps\n\n- Explore other migration patterns (Parquet, JSON, Avro to Iceberg)\n- Implement incremental migration for ongoing CSV updates\n- Set up monitoring and data quality checks\n- Integrate with your existing data pipeline" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Clean up temporary directories\n", + "try:\n", + " shutil.rmtree(csv_dir)\n", + " print(f\"Cleaned up CSV directory: {csv_dir}\")\n", + "except Exception as e:\n", + " print(f\"CSV cleanup 
warning: {e}\")\n", + "\n", + "try:\n", + " shutil.rmtree(warehouse_path)\n", + " print(f\"Cleaned up warehouse directory: {warehouse_path}\")\n", + "except Exception as e:\n", + " print(f\"Warehouse cleanup warning: {e}\")\n", + "\n", + "print(\"CSV migration example completed successfully!\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/time_travel_example.ipynb b/notebooks/time_travel_example.ipynb new file mode 100644 index 0000000000..963bb616f8 --- /dev/null +++ b/notebooks/time_travel_example.ipynb @@ -0,0 +1,402 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Time Travel Example\n", + "\n", + "This notebook demonstrates Apache Iceberg's time travel capabilities, which allow you to query historical data and roll back to previous table states.\n", + "\n", + "## Overview\n", + "\n", + "Iceberg's time travel feature provides:\n", + "- **Historical queries**: Query data as it existed at any point in time\n", + "- **Rollback
capabilities**: Revert to previous table states\n", + "- **Audit trails**: Track all changes made to the table\n", + "- **Debugging**: Investigate data issues by examining past states\n", + "- **Compliance**: Meet regulatory requirements for data history\n", + "\n", + "## Key Concepts\n", + "\n", + "- **Snapshots**: Each commit to an Iceberg table creates a snapshot\n", + "- **Snapshot IDs**: Unique identifiers for each snapshot\n", + "- **Timestamps**: Each snapshot has a timestamp when it was created\n", + "- **Time travel**: Query data as of a specific snapshot ID or timestamp\n", + "- **Rollback**: Revert the table to a previous snapshot" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Import required libraries\n", + "import shutil\n", + "import tempfile\n", + "import time\n", + "\n", + "import pyarrow as pa\n", + "\n", + "import pyiceberg\n", + "from pyiceberg.catalog import load_catalog\n", + "\n", + "print(f\"PyIceberg version: {pyiceberg.__version__}\")\n", + "print(f\"PyArrow version: {pa.__version__}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Setup: Create Iceberg Table\n\nLet's create a table and add some initial data to establish a baseline." 
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a temporary warehouse location\n", + "warehouse_path = tempfile.mkdtemp(prefix=\"iceberg_warehouse_\")\n", + "print(f\"Warehouse location: {warehouse_path}\")\n", + "\n", + "# Configure and load the catalog\n", + "catalog = load_catalog(\n", + " \"default\",\n", + " type=\"sql\",\n", + " uri=f\"sqlite:///{warehouse_path}/pyiceberg_catalog.db\",\n", + " warehouse=f\"file://{warehouse_path}\",\n", + ")\n", + "\n", + "print(\"Catalog loaded successfully!\")\n", + "\n", + "# Create a namespace\n", + "catalog.create_namespace(\"default\")\n", + "print(f\"Available namespaces: {list(catalog.list_namespaces())}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create initial data\n", + "initial_data = {\n", + " \"id\": [1, 2, 3],\n", + " \"name\": [\"Alice\", \"Bob\", \"Charlie\"],\n", + " \"department\": [\"Engineering\", \"Sales\", \"Marketing\"],\n", + " \"salary\": [100000, 80000, 75000],\n", + "}\n", + "\n", + "initial_table = pa.table(initial_data)\n", + "print(\"Initial data:\")\n", + "print(initial_table)\n", + "\n", + "# Create Iceberg table\n", + "table = catalog.create_table(\n", + " \"default.employees\",\n", + " schema=initial_table.schema,\n", + ")\n", + "\n", + "print(f\"\\nCreated table: {table}\")\n", + "\n", + "# Write initial data (a newly created table has no snapshot until the first commit)\n", + "table.append(initial_table)\n", + "print(f\"Initial snapshot ID: {table.current_snapshot().snapshot_id}\")\n", + "print(f\"Initial data written. Rows: {len(table.scan().to_arrow())}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Capture Initial State\n\nLet's capture the initial snapshot information before making changes."
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Store the initial snapshot information\n", + "initial_snapshot = table.current_snapshot()\n", + "initial_snapshot_id = initial_snapshot.snapshot_id\n", + "initial_timestamp = initial_snapshot.timestamp_ms\n", + "\n", + "print(\"Initial snapshot information:\")\n", + "print(f\"Snapshot ID: {initial_snapshot_id}\")\n", + "print(f\"Timestamp (ms): {initial_timestamp}\")\n", + "print(f\"Timestamp (readable): {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(initial_timestamp / 1000))}\")\n", + "print(f\"Summary: {initial_snapshot.summary}\")\n", + "\n", + "# View table history (table.history() yields log entries with only the ID\n", + "# and timestamp; use table.snapshots() when you also need the summary)\n", + "print(\"\\nTable history:\")\n", + "for snapshot in table.snapshots():\n", + " print(f\" Snapshot ID: {snapshot.snapshot_id}\")\n", + " print(f\" Timestamp: {snapshot.timestamp_ms}\")\n", + " print(f\" Summary: {snapshot.summary}\")\n", + " print()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Make Changes: Add New Data\n\nLet's add new employees to create a second snapshot." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Add a small delay to ensure different timestamps\n", + "time.sleep(1)\n", + "\n", + "# Add new employees\n", + "new_employees = {\n", + " \"id\": [4, 5],\n", + " \"name\": [\"David\", \"Eve\"],\n", + " \"department\": [\"Engineering\", \"Sales\"],\n", + " \"salary\": [95000, 85000],\n", + "}\n", + "\n", + "new_data_table = pa.table(new_employees)\n", + "print(\"New employees to add:\")\n", + "print(new_data_table)\n", + "\n", + "# Append new data\n", + "table.append(new_data_table)\n", + "print(f\"\\nNew data added. 
Total rows: {len(table.scan().to_arrow())}\")\n", + "\n", + "# Capture the new snapshot\n", + "second_snapshot = table.current_snapshot()\n", + "second_snapshot_id = second_snapshot.snapshot_id\n", + "second_timestamp = second_snapshot.timestamp_ms\n", + "\n", + "print(f\"\\nNew snapshot ID: {second_snapshot_id}\")\n", + "print(f\"New timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(second_timestamp / 1000))}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Make Changes: Update Data\n\nLet's update existing employee salaries to create a third snapshot." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Add a small delay\n", + "time.sleep(1)\n", + "\n", + "# Get current data and update salaries\n", + "current_data = table.scan().to_arrow()\n", + "\n", + "# Update salaries for specific employees\n", + "# Create updated data with salary increases\n", + "updated_data = {\n", + " \"id\": [1, 2, 3, 4, 5],\n", + " \"name\": [\"Alice\", \"Bob\", \"Charlie\", \"David\", \"Eve\"],\n", + " \"department\": [\"Engineering\", \"Sales\", \"Marketing\", \"Engineering\", \"Sales\"],\n", + " \"salary\": [110000, 85000, 80000, 95000, 90000], # Increased salaries\n", + "}\n", + "\n", + "updated_table = pa.table(updated_data)\n", + "print(\"Updated employee data:\")\n", + "print(updated_table)\n", + "\n", + "# Overwrite the table with updated data\n", + "table.overwrite(updated_table)\n", + "print(f\"\\nData updated. 
Total rows: {len(table.scan().to_arrow())}\")\n", + "\n", + "# Capture the third snapshot\n", + "third_snapshot = table.current_snapshot()\n", + "third_snapshot_id = third_snapshot.snapshot_id\n", + "third_timestamp = third_snapshot.timestamp_ms\n", + "\n", + "print(f\"\\nThird snapshot ID: {third_snapshot_id}\")\n", + "print(f\"Third timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(third_timestamp / 1000))}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## View Complete Table History\n\nLet's examine the complete history of changes to the table." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# View complete table history; iterate full snapshots (table.snapshots())\n", + "# rather than the history log so the operation summary is available\n", + "print(\"Complete table history:\")\n", + "print(\"=\" * 60)\n", + "for idx, snapshot in enumerate(table.snapshots(), 1):\n", + " print(f\"\\nSnapshot #{idx}:\")\n", + " print(f\" Snapshot ID: {snapshot.snapshot_id}\")\n", + " print(f\" Timestamp: {snapshot.timestamp_ms}\")\n", + " print(f\" Readable time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(snapshot.timestamp_ms / 1000))}\")\n", + " print(f\" Summary: {snapshot.summary}\")\n", + " print(f\" Operation: {snapshot.summary.operation if snapshot.summary else 'unknown'}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Time Travel: Query Historical Data\n\nNow let's query the data as it existed at different points in time."
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Query data as of the initial snapshot (using snapshot ID)\n", + "print(\"Querying data as of initial snapshot:\")\n", + "print(f\"Snapshot ID: {initial_snapshot_id}\")\n", + "initial_data = table.scan(snapshot_id=initial_snapshot_id).to_arrow()\n", + "print(initial_data)\n", + "print(f\"Rows: {len(initial_data)}\")\n", + "\n", + "# Query data as of the second snapshot (after adding new employees)\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"Querying data as of second snapshot (after additions):\")\n", + "print(f\"Snapshot ID: {second_snapshot_id}\")\n", + "second_data = table.scan(snapshot_id=second_snapshot_id).to_arrow()\n", + "print(second_data)\n", + "print(f\"Rows: {len(second_data)}\")\n", + "\n", + "# Query current data (third snapshot)\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"Current data (third snapshot - after updates):\")\n", + "print(f\"Snapshot ID: {third_snapshot_id}\")\n", + "current_data = table.scan().to_arrow()\n", + "print(current_data)\n", + "print(f\"Rows: {len(current_data)}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Time Travel: Query by Timestamp\n\nYou can also query data as of a specific timestamp, not just snapshot ID." 
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Query data as of a specific timestamp (between first and second snapshot)\n", + "# Use a timestamp halfway between first and second snapshot\n", + "middle_timestamp = (initial_timestamp + second_timestamp) // 2\n", + "\n", + "print(\"Querying data as of specific timestamp:\")\n", + "print(f\"Timestamp: {middle_timestamp}\")\n", + "print(f\"Readable time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(middle_timestamp / 1000))}\")\n", + "\n", + "# Note: PyIceberg timestamps are in milliseconds. Resolve the snapshot that\n", + "# was current at that time from the table's history log, then scan it.\n", + "entries = sorted(table.history(), key=lambda entry: entry.timestamp_ms)\n", + "as_of_entry = [entry for entry in entries if entry.timestamp_ms <= middle_timestamp][-1]\n", + "historical_data = table.scan(snapshot_id=as_of_entry.snapshot_id).to_arrow()\n", + "print(\"\\nData at that time:\")\n", + "print(historical_data)\n", + "print(f\"Rows: {len(historical_data)}\")\n", + "\n", + "print(\"\\nNote: This shows the initial state since the chosen timestamp\")\n", + "print(\"falls before the second snapshot was created.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Rollback: Revert to Previous Snapshot\n\nYou can rollback the table to a previous snapshot if needed."
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Demonstrate rollback to the second snapshot\n", + "print(\"Current state before rollback:\")\n", + "current_before_rollback = table.scan().to_arrow()\n", + "print(current_before_rollback)\n", + "print(f\"Rows: {len(current_before_rollback)}\")\n", + "print(f\"Current snapshot ID: {table.current_snapshot().snapshot_id}\")\n", + "\n", + "# Rollback to the second snapshot (before salary updates)\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"Rolling back to second snapshot...\")\n", + "# Here we simulate the rollback by reading the earlier snapshot; an actual\n", + "# rollback would commit a change that restores the table to that state\n", + "\n", + "print(\"\\nData after rollback (simulated by querying second snapshot):\")\n", + "rolled_back_data = table.scan(snapshot_id=second_snapshot_id).to_arrow()\n", + "print(rolled_back_data)\n", + "print(f\"Rows: {len(rolled_back_data)}\")\n", + "\n", + "print(\"\\nNote: In a production scenario, you would use the table's\")\n", + "print(\"rollback capabilities to actually revert the table state.\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Real-World Use Cases\n\nTime travel is invaluable in production scenarios:\n\n### Data Debugging\n- Investigate when data issues occurred\n- Compare states before and after problematic changes\n- Identify root causes of data corruption\n\n### Audit & Compliance\n- Meet regulatory requirements for data history\n- Track all changes for audit trails\n- Provide evidence of data states at specific times\n\n### Machine Learning\n- Access training data from specific time periods\n- Ensure reproducible experiments with historical data\n- Backtest models using historical snapshots\n\n### Data Recovery\n- Recover from accidental deletions or updates\n- Revert to known good states\n- Implement disaster recovery strategies\n\n### Analytics\n- Analyze
trends over time\n- Compare performance across different periods\n- Generate historical reports" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Best Practices & Performance Considerations\n\n### Snapshot Management\n- **Regular cleanup**: Expire old snapshots to save storage\n- **Snapshot retention**: Define retention policies based on compliance needs\n- **Monitoring**: Track snapshot count and storage usage\n- **Documentation**: Document snapshot retention policies\n\n### Performance\n- **Snapshot lookup**: Querying by snapshot ID is faster than timestamp\n- **Metadata caching**: Cache snapshot metadata for frequently accessed snapshots\n- **File pruning**: Delete unused data files from expired snapshots\n- **Storage costs**: Monitor storage growth due to snapshot retention\n\n### Production Considerations\n- **Access control**: Implement proper permissions for time travel queries\n- **Compliance**: Ensure retention policies meet regulatory requirements\n- **Testing**: Test rollback procedures before production use\n- **Monitoring**: Monitor time travel query performance\n- **Documentation**: Document snapshot management procedures" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Conclusion\n\nThis example demonstrated Iceberg's powerful time travel capabilities:\n\n### Key Takeaways\n- **Snapshots**: Each operation creates a snapshot with unique ID and timestamp\n- **Time travel**: Query historical data using snapshot IDs or timestamps\n- **Rollback**: Revert to previous table states when needed\n- **Audit trail**: Complete history of all changes to the table\n- **Production ready**: Essential for debugging, compliance, and data recovery\n\n### When to Use Time Travel\n- **Debugging**: Investigate data issues and their causes\n- **Compliance**: Meet regulatory requirements for data history\n- **Analytics**: Analyze trends and compare historical states\n- **Recovery**: Recover from accidental data changes\n- **ML**: 
Access historical data for model training and testing\n\n### Next Steps\n- Implement snapshot expiration policies\n- Set up monitoring for snapshot management\n- Integrate time travel into your debugging workflows\n- Document snapshot retention and access policies" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": "## Cleanup\n\nLet's clean up the temporary resources created during this example." + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Clean up temporary warehouse directory\n", + "try:\n", + " shutil.rmtree(warehouse_path)\n", + " print(f\"Cleaned up warehouse directory: {warehouse_path}\")\n", + "except Exception as e:\n", + " print(f\"Cleanup warning: {e}\")\n", + "\n", + "print(\"Time travel example completed successfully!\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}
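The timestamp-based time travel shown in the notebook boils down to resolving a target time to the snapshot that was current at that moment. That lookup can be sketched as a small helper over the `(snapshot_id, timestamp_ms)` entries a table's history log provides. The `LogEntry` namedtuple below is a stand-in for PyIceberg's history entries so the sketch runs without a catalog; in practice the `history` list would come from `table.history()`:

```python
from bisect import bisect_right
from collections import namedtuple

# Stand-in for a history log entry (in PyIceberg, table.history() yields
# entries carrying snapshot_id and timestamp_ms).
LogEntry = namedtuple("LogEntry", ["snapshot_id", "timestamp_ms"])


def snapshot_as_of(history, ts_ms):
    """Return the snapshot_id that was current at ts_ms, or None if the
    table had no snapshot yet. `history` must be sorted by timestamp_ms."""
    stamps = [entry.timestamp_ms for entry in history]
    idx = bisect_right(stamps, ts_ms)  # number of commits at or before ts_ms
    return history[idx - 1].snapshot_id if idx else None


# Three commits at 1s, 2s, and 3s (epoch millis, illustrative values)
history = [LogEntry(111, 1_000), LogEntry(222, 2_000), LogEntry(333, 3_000)]

print(snapshot_as_of(history, 1_500))  # -> 111 (first commit still current)
```

The resolved ID can then be passed to `table.scan(snapshot_id=...)` to read the data as of that time; a timestamp earlier than the first commit returns `None`, meaning the table did not exist yet.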