Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 82 additions & 8 deletions src/data/managers/rbac-cache-version-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
const BaseManager = require('./base-manager')
const models = require('../models')
const RbacCacheVersion = models.RbacCacheVersion
const logger = require('../../logger')

class RbacCacheVersionManager extends BaseManager {
getEntity () {
Expand All @@ -33,22 +34,95 @@ class RbacCacheVersionManager extends BaseManager {
return cacheVersion.version || 0
}

_getModelOptions (transaction) {
return transaction && transaction.fakeTransaction
? {}
: { transaction: transaction }
}

_extractAffectedRows (updateResult) {
if (Array.isArray(updateResult)) {
return Number(updateResult[0] || 0)
}
return Number(updateResult || 0)
}

_isUniqueConstraintError (error) {
return error && error.name === 'SequelizeUniqueConstraintError'
}

_isVersionOverflowError (error) {
if (!error || error.name !== 'SequelizeDatabaseError') {
return false
}
const message = (error.message || '').toLowerCase()
return message.includes('out of range for type bigint') ||
message.includes('bigint value is out of range') ||
message.includes('integer overflow') ||
message.includes('numeric value out of range')
}

async _incrementVersionAtomic (transaction) {
return this.getEntity().update(
{ version: models.Sequelize.literal('version + 1') },
{
where: { id: 1 },
...this._getModelOptions(transaction)
}
)
}

async _resetVersionToOne (transaction) {
const updateResult = await this.getEntity().update(
{ version: 1 },
{
where: { id: 1 },
...this._getModelOptions(transaction)
}
)

if (this._extractAffectedRows(updateResult) === 0) {
try {
await this.create({ id: 1, version: 1 }, transaction)
} catch (error) {
if (!this._isUniqueConstraintError(error)) {
throw error
}
}
}
}

/**
* Increment cache version
* This should be called whenever any RBAC resource (Role, RoleBinding, ServiceAccount) is modified
* @param {Object} transaction - Database transaction
* @returns {Promise<void>}
*/
async incrementVersion (transaction) {
const cacheVersion = await this.findOne({ id: 1 }, transaction)
try {
const updateResult = await this._incrementVersionAtomic(transaction)
const affectedRows = this._extractAffectedRows(updateResult)

if (cacheVersion) {
// Update existing version
const newVersion = (cacheVersion.version || 0) + 1
await this.update({ id: 1 }, { version: newVersion }, transaction)
} else {
// Create initial version if it doesn't exist
await this.create({ id: 1, version: 1 }, transaction)
if (affectedRows === 0) {
try {
await this.create({ id: 1, version: 1 }, transaction)
} catch (error) {
if (this._isUniqueConstraintError(error)) {
await this._incrementVersionAtomic(transaction)
} else {
throw error
}
}
}
} catch (error) {
if (!this._isVersionOverflowError(error)) {
throw error
}

logger.warn(`RBAC cache version overflow detected. Resetting version to 1. Error: ${error.message}`)
await this._resetVersionToOne(transaction)
await this._incrementVersionAtomic(transaction)
logger.info('RBAC cache version reset and increment completed successfully')
}
}

Expand Down
Loading