Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_ME
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS = 5;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS = 6;
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS;

static constexpr auto DATA_LAKE_TABLE_STATE_SNAPSHOT_PROTOCOL_VERSION = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
sort_order_id = sort_order_id_value.safeGet<Int32>();
}

const auto record_count = getValueFromRowByName(row_index, c_data_file_record_count, TypeIndex::Int64).safeGet<Int64>();
const auto file_size_in_bytes = getValueFromRowByName(row_index, c_data_file_file_size_in_bytes, TypeIndex::Int64).safeGet<Int64>();

switch (content_type)
{
case FileContentType::DATA: {
Expand All @@ -253,7 +256,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
/*lower_reference_data_file_path_ = */ std::nullopt,
/*upper_reference_data_file_path_ = */ std::nullopt,
/*equality_ids*/ std::nullopt,
sort_order_id);
sort_order_id,
record_count,
file_size_in_bytes);
}
case FileContentType::POSITION_DELETE: {
/// reference_file_path can be absent in schema for some reason, though it is present in specification: https://iceberg.apache.org/spec/#manifests
Expand Down Expand Up @@ -296,7 +301,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
lower_reference_data_file_path,
upper_reference_data_file_path,
/*equality_ids*/ std::nullopt,
/*sort_order_id = */ std::nullopt);
/*sort_order_id = */ std::nullopt,
record_count,
file_size_in_bytes);
}
case FileContentType::EQUALITY_DELETE: {
std::vector<Int32> equality_ids;
Expand Down Expand Up @@ -325,7 +332,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
/*lower_reference_data_file_path_ = */ std::nullopt,
/*upper_reference_data_file_path_ = */ std::nullopt,
equality_ids,
/*sort_order_id = */ std::nullopt);
/*sort_order_id = */ std::nullopt,
record_count,
file_size_in_bytes);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ DEFINE_ICEBERG_FIELD_COMPOUND(data_file, lower_bounds);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, upper_bounds);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, referenced_data_file);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, sort_order_id);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, record_count);
DEFINE_ICEBERG_FIELD_COMPOUND(data_file, file_size_in_bytes);

/// Fallback defaults for snapshot retention policy when table properties are absent.
/// These values follow the Java reference implementation; the Iceberg spec does not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(
data_manifest_file_entry_->sequence_number,
data_manifest_file_entry_->parsed_entry->file_format,
/* position_deletes_objects */ {},
/* equality_deletes_objects */ {}}
/* equality_deletes_objects */ {},
data_manifest_file_entry_->parsed_entry->record_count,
data_manifest_file_entry_->parsed_entry->file_size_in_bytes}
{
}

Expand Down Expand Up @@ -137,6 +139,27 @@ void IcebergObjectSerializableInfo::serializeForClusterFunctionProtocol(WriteBuf
}
}
}
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS)
{
if (record_count.has_value())
{
writeVarUInt(1, out);
writeVarInt(*record_count, out);
}
else
{
writeVarUInt(0, out);
}
if (file_size_in_bytes.has_value())
{
writeVarUInt(1, out);
writeVarInt(*file_size_in_bytes, out);
}
else
{
writeVarUInt(0, out);
}
}
}

void IcebergObjectSerializableInfo::deserializeForClusterFunctionProtocol(ReadBuffer & in, size_t protocol_version)
Expand Down Expand Up @@ -193,6 +216,33 @@ void IcebergObjectSerializableInfo::deserializeForClusterFunctionProtocol(ReadBu
}
}
}
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_FILE_STATS)
{
size_t has_record_count = 0;
readVarUInt(has_record_count, in);
if (has_record_count)
{
Int64 value = 0;
readVarInt(value, in);
record_count = value;
}
else
{
record_count = std::nullopt;
}
size_t has_file_size = 0;
readVarUInt(has_file_size, in);
if (has_file_size)
{
Int64 value = 0;
readVarInt(value, in);
file_size_in_bytes = value;
}
else
{
file_size_in_bytes = std::nullopt;
}
}
}

void IcebergObjectSerializableInfo::checkVersion(size_t protocol_version) const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ struct IcebergObjectSerializableInfo
String file_format;
std::vector<Iceberg::PositionDeleteObject> position_deletes_objects;
std::vector<Iceberg::EqualityDeleteObject> equality_deletes_objects;
std::optional<Int64> record_count;
std::optional<Int64> file_size_in_bytes;

void serializeForClusterFunctionProtocol(WriteBuffer & out, size_t protocol_version) const;
void deserializeForClusterFunctionProtocol(ReadBuffer & in, size_t protocol_version);
Expand Down
10 changes: 9 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ struct ParsedManifestFileEntry : boost::noncopyable
/// Data file is sorted with this sort_order_id (can be read from metadata.json)
std::optional<Int32> sort_order_id;

/// File-level statistics from Iceberg manifest (required fields per spec)
Int64 record_count;
Int64 file_size_in_bytes;

ParsedManifestFileEntry(
FileContentType content_type_,
String file_path_key_,
Expand All @@ -110,7 +114,9 @@ struct ParsedManifestFileEntry : boost::noncopyable
std::optional<String> lower_reference_data_file_path_,
std::optional<String> upper_reference_data_file_path_,
std::optional<std::vector<Int32>> equality_ids_,
std::optional<Int32> sort_order_id_)
std::optional<Int32> sort_order_id_,
Int64 record_count_,
Int64 file_size_in_bytes_)
: content_type(content_type_)
, file_path_key(std::move(file_path_key_))
, row_number(row_number_)
Expand All @@ -125,6 +131,8 @@ struct ParsedManifestFileEntry : boost::noncopyable
, upper_reference_data_file_path(std::move(upper_reference_data_file_path_))
, equality_ids(std::move(equality_ids_))
, sort_order_id(sort_order_id_)
, record_count(record_count_)
, file_size_in_bytes(file_size_in_bytes_)
{
}
};
Expand Down
20 changes: 20 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Storages/HivePartitioningUtils.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/DataLakes/DeletionVectorTransform.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
Expand Down Expand Up @@ -97,6 +98,23 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
}

/// Log per-file Iceberg statistics (record_count / file_size_in_bytes) taken from
/// the manifest entry of the given data object, if the object comes from an Iceberg
/// table. Both fields are optional on the serializable info (older cluster protocol
/// versions do not transmit them), so each is logged only when present.
/// No-op for non-Iceberg objects and for builds without Avro support, since Iceberg
/// metadata parsing requires Avro.
/// NOTE(review): LOG_TEST appears to be the only consumer of these stats so far —
/// confirm whether they are meant to feed the planner/estimates later.
void logIcebergFileStats(const ObjectInfoPtr & object_info, const LoggerPtr & log)
{
#if USE_AVRO
    if (auto iceberg_object = std::dynamic_pointer_cast<IcebergDataObjectInfo>(object_info))
    {
        const auto & info = iceberg_object->info;
        if (info.record_count.has_value())
            LOG_TEST(log, "Iceberg record_count for '{}': {}", object_info->getPath(), *info.record_count);
        if (info.file_size_in_bytes.has_value())
            LOG_TEST(log, "Iceberg file_size_in_bytes for '{}': {}", object_info->getPath(), *info.file_size_in_bytes);
    }
#else
    UNUSED(object_info);
    UNUSED(log);
#endif
}

StorageObjectStorageSource::StorageObjectStorageSource(
String name_,
ObjectStoragePtr object_storage_,
Expand Down Expand Up @@ -878,6 +896,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
object_info->getObjectMetadata()->size_bytes,
object_info->getFileFormat().value_or(configuration->getFormat()));

logIcebergFileStats(object_info, log);

bool use_native_reader_v3 = format_settings.has_value()
? format_settings->parquet.use_native_reader_v3
: context_->getSettingsRef()[Setting::input_format_parquet_use_native_reader_v3];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#include <IO/WriteBufferFromString.h>
#include <gtest/gtest.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeTableStateSnapshot.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergTableStateSnapshot.h>
#include <Core/ProtocolDefines.h>

using namespace DB;

Expand Down Expand Up @@ -60,3 +62,97 @@ TEST(DatalakeStateSerde, DataLakeStateSerde)
}
}


/// Round-trip test: a fully populated IcebergObjectSerializableInfo — including the
/// new optional per-file statistics — must survive serialization followed by
/// deserialization at the current cluster protocol version.
TEST(DatalakeStateSerde, IcebergObjectSerializableInfoRoundTrip)
{
    Iceberg::IcebergObjectSerializableInfo original;
    original.data_object_file_path_key = "s3://bucket/path/to/file.parquet";
    original.underlying_format_read_schema_id = 42;
    original.schema_id_relevant_to_iterator = 7;
    original.sequence_number = 123456;
    original.file_format = "PARQUET";
    original.position_deletes_objects = {{"s3://bucket/deletes/pos1.parquet", "PARQUET", "s3://bucket/path/to/file.parquet"}};
    original.equality_deletes_objects = {{"s3://bucket/deletes/eq1.parquet", "PARQUET", std::vector<Int32>{1, 2, 3}, 42}};
    original.record_count = 100500;
    original.file_size_in_bytes = 999888777;

    String serialized;
    {
        WriteBufferFromString out{serialized};
        original.serializeForClusterFunctionProtocol(out, DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION);
    }

    ReadBufferFromMemory in(serialized.data(), serialized.size());
    Iceberg::IcebergObjectSerializableInfo restored;
    restored.deserializeForClusterFunctionProtocol(in, DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION);

    ASSERT_TRUE(in.eof());
    ASSERT_EQ(restored.data_object_file_path_key, original.data_object_file_path_key);
    ASSERT_EQ(restored.underlying_format_read_schema_id, original.underlying_format_read_schema_id);
    ASSERT_EQ(restored.schema_id_relevant_to_iterator, original.schema_id_relevant_to_iterator);
    ASSERT_EQ(restored.sequence_number, original.sequence_number);
    ASSERT_EQ(restored.file_format, original.file_format);
    ASSERT_EQ(restored.position_deletes_objects.size(), 1);
    ASSERT_EQ(restored.equality_deletes_objects.size(), 1);
    ASSERT_TRUE(restored.record_count.has_value());
    ASSERT_EQ(*restored.record_count, 100500);
    ASSERT_TRUE(restored.file_size_in_bytes.has_value());
    ASSERT_EQ(*restored.file_size_in_bytes, 999888777);
}


/// Backward-compatibility test: when serializing at protocol version 5 (before the
/// file-stats fields were introduced), record_count / file_size_in_bytes must not be
/// written, and a version-5 deserialization must leave them unset.
TEST(DatalakeStateSerde, IcebergObjectSerializableInfoWithoutFileStats)
{
    Iceberg::IcebergObjectSerializableInfo original;
    original.data_object_file_path_key = "s3://bucket/path/to/file.parquet";
    original.underlying_format_read_schema_id = 1;
    original.schema_id_relevant_to_iterator = 1;
    original.sequence_number = 0;
    original.file_format = "PARQUET";
    original.record_count = 42;
    original.file_size_in_bytes = 1024;

    const size_t old_version = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_EXCLUDED_ROWS;

    String serialized;
    {
        WriteBufferFromString out{serialized};
        original.serializeForClusterFunctionProtocol(out, old_version);
    }

    ReadBufferFromMemory in(serialized.data(), serialized.size());
    Iceberg::IcebergObjectSerializableInfo restored;
    restored.deserializeForClusterFunctionProtocol(in, old_version);

    ASSERT_TRUE(in.eof());
    ASSERT_EQ(restored.data_object_file_path_key, original.data_object_file_path_key);
    ASSERT_FALSE(restored.record_count.has_value());
    ASSERT_FALSE(restored.file_size_in_bytes.has_value());
}


/// Round-trip test for the degenerate case: explicitly-nullopt statistics must stay
/// nullopt after a serialize/deserialize cycle at the current protocol version.
TEST(DatalakeStateSerde, IcebergObjectSerializableInfoNulloptFileStats)
{
    Iceberg::IcebergObjectSerializableInfo original;
    original.data_object_file_path_key = "s3://bucket/path/to/file.parquet";
    original.underlying_format_read_schema_id = 1;
    original.schema_id_relevant_to_iterator = 1;
    original.sequence_number = 0;
    original.file_format = "PARQUET";
    original.record_count = std::nullopt;
    original.file_size_in_bytes = std::nullopt;

    String serialized;
    {
        WriteBufferFromString out{serialized};
        original.serializeForClusterFunctionProtocol(out, DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION);
    }

    ReadBufferFromMemory in(serialized.data(), serialized.size());
    Iceberg::IcebergObjectSerializableInfo restored;
    restored.deserializeForClusterFunctionProtocol(in, DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION);

    ASSERT_TRUE(in.eof());
    ASSERT_FALSE(restored.record_count.has_value());
    ASSERT_FALSE(restored.file_size_in_bytes.has_value());
}

Loading
Loading