Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion AnnService/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ include_directories(${Zstd}/lib)
file(GLOB_RECURSE HDR_FILES ${AnnService}/inc/Core/*.h ${AnnService}/inc/Helper/*.h)
file(GLOB_RECURSE SRC_FILES ${AnnService}/src/Core/*.cpp ${AnnService}/src/Helper/*.cpp)

# Include Socket sources in core lib for PostingRouter
file(GLOB SOCKET_HDR_FILES ${AnnService}/inc/Socket/*.h)
file(GLOB SOCKET_SRC_FILES ${AnnService}/src/Socket/*.cpp)
list(APPEND HDR_FILES ${SOCKET_HDR_FILES})
list(APPEND SRC_FILES ${SOCKET_SRC_FILES})

set(SPDK_LIBRARIES "")
if (SPDK)
set(Spdk ${PROJECT_SOURCE_DIR}/ThirdParty/spdk/build)
Expand Down Expand Up @@ -73,7 +79,7 @@ endif()
add_library (SPTAGLib SHARED ${SRC_FILES} ${HDR_FILES} ${TiKV_PROTO_SOURCES})
target_link_libraries (SPTAGLib DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_shared ${NUMA_LIBRARY} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES})
add_library (SPTAGLibStatic STATIC ${SRC_FILES} ${HDR_FILES} ${TiKV_PROTO_SOURCES})
target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES})
target_link_libraries (SPTAGLibStatic DistanceUtils ${RocksDB_LIBRARIES} ${uring_LIBRARIES} libzstd_static ${NUMA_LIBRARY_STATIC} ${TBB_LIBRARIES} ${SPDK_LIBRARIES} ${TiKV_LIBRARIES} ${Boost_LIBRARIES})

if (MSVC)
# SPANNIndex.cpp can exceed COFF section limits in Debug without /bigobj.
Expand Down
21 changes: 13 additions & 8 deletions AnnService/inc/Core/Common/FineGrainedLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ namespace SPTAG
{
return idx;
}

// Bucket index for the internal mutex-sharded unordered_map of
// per-posting locks. Exposed for callers that need an array sized
// to BucketCount and indexed by the same granularity as the lock
// pool (e.g. ExtraDynamicSearcher::m_remoteBucketLocked).
static inline unsigned BucketIndex(SizeType idx)
{
unsigned key = static_cast<unsigned>(idx);
return ((unsigned)(key * 99991) + _rotl(key, 2) + 101) & BucketMask;
}

static const int BucketMask = 32767;
static const int BucketCount = BucketMask + 1;
private:
struct Bucket {
std::mutex mutex;
Expand All @@ -76,14 +89,6 @@ namespace SPTAG
return *iter->second;
}

static inline unsigned BucketIndex(SizeType idx)
{
unsigned key = static_cast<unsigned>(idx);
return ((unsigned)(key * 99991) + _rotl(key, 2) + 101) & BucketMask;
}

static const int BucketMask = 32767;
static const int BucketCount = BucketMask + 1;
mutable std::unique_ptr<Bucket[]> m_buckets;
};
}
Expand Down
12 changes: 12 additions & 0 deletions AnnService/inc/Core/Common/IVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ namespace SPTAG
virtual uint8_t GetVersion(const SizeType& key) = 0;
virtual uint8_t GetVersion(const SizeType& key, VersionReadPolicy policy) { return GetVersion(key); }
virtual void SetVersion(const SizeType& key, const uint8_t& version) = 0;

/// Batch SetVersion: apply (vids[i] -> versions[i]) for all i.
/// Default impl is a per-VID loop. TiKV-backed maps override this
/// to group writes by chunk so N records in the same chunk only
/// trigger 1 ReadChunk + 1 WriteChunk RPC pair
virtual void SetVersionBatch(const std::vector<SizeType>& vids, const std::vector<uint8_t>& versions)
{
size_t n = std::min(vids.size(), versions.size());
for (size_t i = 0; i < n; i++) {
SetVersion(vids[i], versions[i]);
}
}
/// Increment the version of a VID.
/// @param expectedOld If not 0xff, the caller asserts the current version should be this value.
/// If TiKV already holds (expectedOld+1)&0x7f, treat as success (another node did the same increment).
Expand Down
55 changes: 55 additions & 0 deletions AnnService/inc/Core/Common/TiKVVersionMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,61 @@ namespace SPTAG
else if (oldVal != 0xfe && version == 0xfe) m_deleted++;
}

// Group writes by chunk: 1 ReadChunk + N byte-modifications + 1 WriteChunk
// per chunk, instead of N × (ReadChunk + WriteChunk). Bypasses the LRU
// cache because runs that exercise this path always have
// VersionCacheMaxChunks=0; reading TiKV directly removes a layer of
// bookkeeping (cache invalidate-on-write) we no longer benefit from.
void SetVersionBatch(const std::vector<SizeType>& vids, const std::vector<uint8_t>& versions) override
{
size_t n = std::min(vids.size(), versions.size());
if (n == 0) return;
const SizeType localCount = m_count.load();

// Group (idx into vids/versions) by chunk id.
std::unordered_map<SizeType, std::vector<size_t>> byChunk;
byChunk.reserve(n);
for (size_t i = 0; i < n; i++) {
SizeType vid = vids[i];
if (vid < 0 || vid >= localCount) continue;
byChunk[ChunkId(vid)].push_back(i);
}
if (byChunk.empty()) return;

long deletedDelta = 0;
for (auto& kv : byChunk) {
SizeType cid = kv.first;
auto& idxs = kv.second;
std::lock_guard<std::mutex> lock(ChunkMutex(cid));
std::string chunk = ReadChunk(cid);
if (chunk.empty()) {
chunk.assign(m_chunkSize, static_cast<char>(0xff));
}
bool dirty = false;
for (size_t i : idxs) {
SizeType vid = vids[i];
uint8_t newVal = versions[i];
int offset = ChunkOffset(vid);
if (offset < 0 || offset >= (int)chunk.size()) continue;
uint8_t oldVal = static_cast<uint8_t>(chunk[offset]);
if (oldVal == newVal) continue;
if (oldVal == 0xfe && newVal != 0xfe) deletedDelta--;
else if (oldVal != 0xfe && newVal == 0xfe) deletedDelta++;
chunk[offset] = static_cast<char>(newVal);
dirty = true;
}
if (dirty) {
auto ret = WriteChunk(cid, chunk);
if (ret != ErrorCode::Success) {
SPTAGLIB_LOG(Helper::LogLevel::LL_Error,
"TiKVVersionMap::SetVersionBatch: WriteChunk failed chunk=%d layer=%d\n",
cid, m_layer);
}
}
}
if (deletedDelta != 0) m_deleted += deletedDelta;
}

bool IncVersion(const SizeType& key, uint8_t* newVersion, uint8_t expectedOld = 0xff) override
{
if (key < 0 || key >= m_count.load()) {
Expand Down
93 changes: 93 additions & 0 deletions AnnService/inc/Core/SPANN/Distributed/ConsistentHashRing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#pragma once

#include "inc/Core/Common.h"
#include <cstdint>
#include <map>
#include <set>

namespace SPTAG::SPANN {

/// Consistent hash ring for distributing headIDs across compute nodes.
/// Uses virtual nodes (vnodes) for balanced distribution.
/// When nodes are added/removed, only ~1/N of keys are remapped.
class ConsistentHashRing {
public:
explicit ConsistentHashRing(int vnodeCount = 150)
: m_vnodeCount(vnodeCount) {}

/// Add a physical node to the ring with its virtual nodes.
void AddNode(int nodeIndex) {
for (int i = 0; i < m_vnodeCount; i++) {
uint32_t h = HashVNode(nodeIndex, i);
m_ring[h] = nodeIndex;
}
m_nodes.insert(nodeIndex);
}

/// Remove a physical node and all its virtual nodes from the ring.
void RemoveNode(int nodeIndex) {
for (int i = 0; i < m_vnodeCount; i++) {
uint32_t h = HashVNode(nodeIndex, i);
m_ring.erase(h);
}
m_nodes.erase(nodeIndex);
}

/// Find the owner node for a given key (headID).
/// Returns -1 if the ring is empty.
int GetOwner(SizeType headID) const {
if (m_ring.empty()) return -1;
uint32_t h = HashKey(headID);
auto it = m_ring.lower_bound(h);
if (it == m_ring.end()) it = m_ring.begin();
return it->second;
}

bool Empty() const { return m_ring.empty(); }
size_t NodeCount() const { return m_nodes.size(); }
bool HasNode(int nodeIndex) const { return m_nodes.count(nodeIndex) > 0; }
const std::set<int>& GetNodes() const { return m_nodes; }
int GetVNodeCount() const { return m_vnodeCount; }

private:
static uint32_t HashKey(SizeType headID) {
uint32_t hash = 2166136261u; // FNV-1a offset basis
uint32_t val = static_cast<uint32_t>(headID);
for (int i = 0; i < 4; i++) {
hash ^= (val >> (i * 8)) & 0xFF;
hash *= 16777619u; // FNV prime
}
return hash;
}

static uint32_t HashVNode(int nodeIndex, int vnodeIdx) {
// Raw FNV-1a on tiny nodeIndex (1, 2, 3) produces a
// pathologically biased ring (71.9% vs 28.1% for nodes 1/2 with
// 150 vnodes). Pre-mix nodeIndex through Knuth's golden-ratio
// multiplier so small node IDs become full-spectrum uint32 values
// before they hit FNV's accumulator. Validated to give ≈50/50
// for K=2 and stay within ±15% of even split for K up to 8.
uint32_t saltedVnode =
static_cast<uint32_t>(vnodeIdx) ^
(static_cast<uint32_t>(nodeIndex) * 2654435761u);
uint32_t hash = 2166136261u;
auto mix = [&](uint32_t v) {
for (int i = 0; i < 4; i++) {
hash ^= (v >> (i * 8)) & 0xFF;
hash *= 16777619u;
}
};
mix(saltedVnode);
mix(static_cast<uint32_t>(nodeIndex));
return hash;
}

int m_vnodeCount;
std::map<uint32_t, int> m_ring; // hash position → nodeIndex
std::set<int> m_nodes; // active physical node indices
};

} // namespace SPTAG::SPANN
Loading