From b4e74186f9bdd64d6625349aa7a2684286f78eb2 Mon Sep 17 00:00:00 2001 From: zstan Date: Wed, 28 Jan 2026 08:47:53 +0300 Subject: [PATCH 01/17] IGNITE-27678 Same partitions on different nodes can hold different updates if writeThrough is enabled --- .../IgniteControlUtilityTestSuite2.java | 4 +- .../IdleVerifyCheckWithWriteThroughTest.java | 452 ++++++++++++++++++ .../communication/GridIoMessageFactory.java | 3 + .../dht/GridDhtTxFinishFuture.java | 36 +- .../distributed/dht/GridDhtTxRemote.java | 9 +- .../dht/GridDhtTxSalvageMessage.java | 56 +++ .../GridDhtPartitionsExchangeFuture.java | 1 + .../dht/topology/GridDhtLocalPartition.java | 4 +- .../near/GridNearTxFinishFuture.java | 7 + .../store/GridCacheStoreManagerAdapter.java | 13 + .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 18 + .../cache/transactions/IgniteTxManager.java | 54 ++- .../IgniteTxRemoteStateAdapter.java | 2 +- .../transactions/IgniteTxRemoteStateImpl.java | 4 +- ...SynchronizationModesMultithreadedTest.java | 18 + .../IgniteCacheTxRecoveryRollbackTest.java | 14 +- 17 files changed, 679 insertions(+), 18 deletions(-) create mode 100644 modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index c0adb33e5b2a9..946ef9e73e665 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -33,6 +33,7 @@ import org.apache.ignite.util.GridCommandHandlerPropertiesTest; import 
org.apache.ignite.util.GridCommandHandlerScheduleIndexRebuildTest; import org.apache.ignite.util.GridCommandHandlerTracingConfigurationTest; +import org.apache.ignite.util.IdleVerifyCheckWithWriteThroughTest; import org.apache.ignite.util.IdleVerifyDumpTest; import org.apache.ignite.util.MetricCommandTest; import org.apache.ignite.util.PerformanceStatisticsCommandTest; @@ -76,7 +77,8 @@ SecurityCommandHandlerPermissionsTest.class, - IdleVerifyDumpTest.class + IdleVerifyDumpTest.class, + IdleVerifyCheckWithWriteThroughTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java new file mode 100644 index 0000000000000..f1d54bc0227d9 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.util; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import 
org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; + +/** */ +public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest { + /** Node kill trigger. */ + private static CountDownLatch nodeKillLatch; + + /** Node left on backup. 
*/ + private static CountDownLatch nodeLeftRegisteredOnBackup; + + /** */ + @Parameterized.Parameter(1) + public Boolean withPersistence; + + /** */ + @Parameterized.Parameter(2) + public TransactionConcurrency conc; + + /** */ + private static final String CORRECT_VERIFY_MSG = "The check procedure has finished, no conflicts have been found."; + + /** */ + @Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}, concMode={2}") + public static Collection parameters() { + return cartesianProduct( + List.of(CLI_CMD_HND), + List.of(true, false), + List.of(OPTIMISTIC, PESSIMISTIC) + ); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + persistenceEnable(withPersistence); + + if (withPersistence) + cleanPersistenceDir(); + + nodeKillLatch = new CountDownLatch(1); + nodeLeftRegisteredOnBackup = new CountDownLatch(1); + + MapCacheStore.salvagedLatch = new CountDownLatch(1); + MapCacheStore.txCoordStoreLatch = new CountDownLatch(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try { + for (Ignite node : G.allGrids()) { + Collection txs = ((IgniteEx)node).context().cache().context().tm().activeTransactions(); + + assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty()); + } + } + finally { + stopAllGrids(); + + super.afterTest(); + } + } + + /** {@inheritDoc} */ + @Override protected boolean persistenceEnable() { + return withPersistence; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCommunicationSpi(new TestRecordingCommunicationSpi()); + } + + /** Test scenario: + *
<ul> + * <li>Start 3 nodes [node0, node1, node2].</li> + * <li>Initialize put operation into transactional cache where [node1] holds primary partition for such insertion.</li> + * <li>Kill [node1] right after tx PREPARE stage is completed (it triggers tx recovery procedure).</li> + * </ul>
+ * + * @see IgniteTxManager#salvageTx(IgniteInternalTx) + */ + @Test + public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exception { + // sequential start is important here + IgniteEx nodeCoord = startGrid(0); + // near node + IgniteEx nodePrimary = startGrid(1); + // backup node + IgniteEx nodeBackup = startGrid(2); + + int firstVal = 0; + int secondVal = 1; + + nodeCoord.cluster().state(ClusterState.ACTIVE); + + CacheConfiguration ccfgWithWriteThrough = createCache(DEFAULT_CACHE_NAME, true); + IgniteCache cache = nodeCoord.createCache(ccfgWithWriteThrough); + + Integer primaryKey = primaryKey(nodePrimary.cache(DEFAULT_CACHE_NAME)); + + try (Transaction tx = nodeCoord.transactions().txStart()) { + cache.put(primaryKey, firstVal); + + tx.commit(); + } + + sqlVisibilityCheck(List.of(nodeCoord, nodeBackup), primaryKey, firstVal); + + nodeCoord.cluster().state(ClusterState.INACTIVE); + + GridMessageListener lsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof GridNearTxPrepareResponse) { + IgniteTxManager txMgr = nodeBackup.context().cache().context().tm(); + Collection txs = txMgr.activeTransactions(); + + assertEquals(1, txs.size()); + IgniteInternalTx idleTx = txs.iterator().next(); + assertFalse(idleTx.local()); + + Map activeTx = GridTestUtils.getFieldValue(txMgr, "idMap"); + assertEquals(1, activeTx.size()); + + nodeKillLatch.countDown(); + + U.awaitQuiet(nodeLeftRegisteredOnBackup); + + // let`s wait until all discovery events have been processed on backup node. + doSleep(1000); + + MapCacheStore.txCoordStoreLatch.countDown(); + } + } + }; + + nodeCoord.context().io().removeMessageListener(GridTopic.TOPIC_CACHE); // Remove old cache listener. + nodeCoord.context().io().addMessageListener(GridTopic.TOPIC_CACHE, lsnr); // Register as first listener. + nodeCoord.context().cache().context().io().start0(); // Register cache listener again. 
+ + nodeCoord.cluster().state(ClusterState.ACTIVE); + + nodeCoord.context().event().addDiscoveryEventListener(new BeforeRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); + nodeBackup.context().event().addDiscoveryEventListener(new BeforeBackupRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); + + IgniteInternalFuture stopFut = GridTestUtils.runAsync(() -> { + nodeKillLatch.await(); + nodePrimary.close(); + }); + + injectTestSystemOut(); + + try (Transaction tx = nodeCoord.transactions().txStart(conc, READ_COMMITTED)) { + cache.put(primaryKey, secondVal); + + tx.commit(); + } + catch (Throwable th) { + fail("Unexpected exception: " + th); + } + + stopFut.get(getTestTimeout()); + + awaitPartitionMapExchange(); + + sqlVisibilityCheck(List.of(nodeCoord, nodeBackup), primaryKey, secondVal); + + int cacheSize = cache.size(); + assertEquals(1, cacheSize); + + int locSize = -1; + Object peeked = null; + + for (Ignite g : G.allGrids()) { + final Affinity aff = affinity(g.cache(DEFAULT_CACHE_NAME)); + + boolean primary = aff.isPrimary(g.cluster().localNode(), primaryKey); + + Object peekedCurr; + + if (primary) + peekedCurr = g.cache(DEFAULT_CACHE_NAME).localPeek(primaryKey, CachePeekMode.PRIMARY); + else + peekedCurr = g.cache(DEFAULT_CACHE_NAME).localPeek(primaryKey, CachePeekMode.BACKUP); + + if (peeked == null) + peeked = peekedCurr; + else { + assertEquals(peeked, peekedCurr); + assertNotNull(peeked); + } + + int locSizeCurr; + + if (primary) + locSizeCurr = g.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY); + else + locSizeCurr = g.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.BACKUP); + + if (locSize == -1) + locSize = locSizeCurr; + else { + assertEquals(locSize, locSizeCurr); + } + + assertNotNull("grid instance: " + g.name(), g.cache(DEFAULT_CACHE_NAME).get(primaryKey)); + } + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + + String out = testOut.toString(); + + // partVerHash can be different 
+ if (withPersistence) { + Assert.assertThat(out, anyOf(is(containsString("updateCntr=[lwm=2, missed=[], hwm=2], " + + "partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + Assert.assertThat(out, anyOf(is(containsString("updateCntr=[lwm=2, missed=[], hwm=2], " + + "partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + } + else { + Assert.assertThat(out, anyOf(is(containsString("consistentId=gridCommandHandlerTest0, " + + "updateCntr=1, partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + Assert.assertThat(out, anyOf(is(containsString("consistentId=gridCommandHandlerTest2, " + + "updateCntr=1, partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + } + testOut.reset(); + + if (withPersistence) { + stopAllGrids(); + startGridsMultiThreaded(3); + + awaitPartitionMapExchange(true, true, null); + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + out = testOut.toString(); + + Pattern regexCorrectCheck = Pattern.compile(CORRECT_VERIFY_MSG); + boolean correctOut = regexCorrectCheck.matcher(out).find(); + + // partVerHash are different, thus only regex check here + String regexCheck = "Partition instances: \\[PartitionHashRecord" + + ".*?consistentId=%s, updateCntr=\\[lwm=2, missed=\\[\\], hwm=2\\], partitionState=OWNING, size=1"; + Pattern part0Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest0")); + Pattern part1Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest1")); + Pattern part2Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest2")); + + boolean matches = + part0Pattern.matcher(out).find() && + part1Pattern.matcher(out).find() && + part2Pattern.matcher(out).find(); + + assertTrue(out, matches || correctOut); + } + } + + /** */ + private void sqlVisibilityCheck(List nodes, int keyToCheck, int referal) { + for (Ignite node : nodes) { + Object ret = 
node.compute(node.cluster().forLocal()).call(new IgniteCallable<>() { + /** */ + @SuppressWarnings({"UnusedDeclaration"}) + @IgniteInstanceResource + private Ignite instance; + + /** */ + @Override public Integer call() { + String selectSql = "SELECT VAL FROM " + DEFAULT_CACHE_NAME + " WHERE ID=" + keyToCheck; + + List> res = instance.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery(selectSql)).getAll(); + + return (int)res.get(0).get(0); + } + }); + + assertEquals("Unexpected result on node: " + node.name(), referal, ret); + } + } + + /** */ + private CacheConfiguration createCache(String cacheName, boolean writeThrough) { + CacheConfiguration ccfg = new CacheConfiguration<>(cacheName); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setQueryEntities(List.of(queryEntity(cacheName))); + + if (writeThrough) { + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(MapCacheStore::new); + } + + return ccfg; + } + + /** */ + private QueryEntity queryEntity(String cacheName) { + var entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Integer.class.getName()); + entity.addQueryField("ID", Integer.class.getName(), null); + entity.addQueryField("VAL", Integer.class.getName(), null); + entity.setKeyFieldName("ID"); + entity.setValueFieldName("VAL"); + entity.setTableName(cacheName); + + return entity; + } + + /** */ + public static class MapCacheStore extends CacheStoreAdapter { + /** Store map. */ + private static final Map map = new ConcurrentHashMap<>(); + + /** */ + static CountDownLatch salvagedLatch; + + /** */ + static CountDownLatch txCoordStoreLatch; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... 
args) { + for (Map.Entry e : map.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + Object val = map.get(key); + + salvagedLatch.countDown(); + + return val; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry e) { + map.put(e.getKey(), e.getValue()); + + txCoordStoreLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + map.remove(key); + } + } + + /** */ + private static class BeforeRecoveryListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + U.awaitQuiet(MapCacheStore.txCoordStoreLatch); + } + + /** {@inheritDoc} */ + @Override public int order() { + return -1; + } + } + + /** */ + private static class BeforeBackupRecoveryListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + nodeLeftRegisteredOnBackup.countDown(); + } + + /** {@inheritDoc} */ + @Override public int order() { + return -1; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b284cab5ddac1..c9f23c40bafe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -125,6 +125,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequestSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponseSerializer; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessageSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequestSerializer; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; @@ -489,6 +491,7 @@ public GridIoMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register(116, GridNearSingleGetRequest::new, new GridNearSingleGetRequestSerializer()); factory.register(117, GridNearSingleGetResponse::new, new GridNearSingleGetResponseSerializer()); factory.register(118, CacheContinuousQueryBatchAck::new, new CacheContinuousQueryBatchAckSerializer()); + factory.register(119, GridDhtTxSalvageMessage::new, new GridDhtTxSalvageMessageSerializer()); // [120..123] - DR factory.register(125, GridNearAtomicSingleUpdateRequest::new, new GridNearAtomicSingleUpdateRequestSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 98c580203c210..51fef4ac6f634 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -473,7 +473,7 @@ private boolean finish(boolean commit, if (isNull(cctx.discovery().getAlive(n.id()))) { log.error("Unable to send message (node left topology): " + n); - fut.onNodeLeft(); + fut.onNodeLeft(n.id()); } else { cctx.tm().sendTransactionMessage(n, req, tx, tx.ioPolicy()); @@ -687,6 +687,40 @@ void onResult(Throwable e) { onDone(e); } + /** */ + private void 
onNodeLeft(UUID nodeId) { + // Cause in common case #onNodeLeft() completes with no error it`s necessary to send salvage message. + if (tx.storeWriteThrough()) { + Map> txNodes = tx.transactionNodes(); + + if (txNodes != null) { + Collection backups = txNodes.get(nodeId); + + if (!F.isEmpty(backups)) { + GridDhtTxSalvageMessage salvageReq = null; + + for (UUID backupId : backups) { + ClusterNode backup = cctx.discovery().node(backupId); + + if (!backup.isLocal()) { + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); + + try { + cctx.io().send(backup, salvageReq, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send " + GridDhtTxSalvageMessage.class.getName() + " message.", e); + } + } + } + } + } + } + + onNodeLeft(); + } + /** */ void onNodeLeft() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index e60041e34ed06..c2ad54b41b910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -226,12 +226,7 @@ public GridDhtTxRemote( /** {@inheritDoc} */ @Override public Collection masterNodeIds() { - Collection res = new ArrayList<>(2); - - res.add(nearNodeId); - res.add(nodeId); - - return res; + return List.of(nearNodeId, nodeId); } /** {@inheritDoc} */ diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java new file mode 100644 index 0000000000000..cee9ea6e3bf15 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** Salvage tx. */ +public class GridDhtTxSalvageMessage extends GridCacheMessage { + /** */ + @Order(0) + GridCacheVersion ver; + + /** Empty constructor. */ + public GridDhtTxSalvageMessage() { + // No-op. + } + + /** + * @param ver Global transaction identifier within cluster, assigned by transaction coordinator. + */ + public GridDhtTxSalvageMessage(GridCacheVersion ver) { + this.ver = ver; + } + + /** Tx version. 
*/ + public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** */ + @Override public short directType() { + return 119; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c269af4923eb7..04464ca09f5a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -272,6 +272,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final Map fullMsgs = new ConcurrentHashMap<>(); /** */ + @SuppressWarnings("unused") @GridToStringInclude private volatile IgniteInternalFuture partReleaseFut; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 7ae4141ad5be5..6bb2d4f31e0f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -1198,7 +1198,9 @@ public void clearDeferredDeletes() { while (true) { long state = this.state.get(); - assert getPartState(state) != EVICTED; + GridDhtPartitionState partState = getPartState(state); + + assert partState != EVICTED : partState; if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1))) return; diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 307bdad29e444..9f427b7f1f02e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.tracing.MTC; @@ -983,6 +984,7 @@ ClusterNode primary() { }); GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true); + GridDhtTxSalvageMessage salvageReq = null; for (UUID backupId : backups) { ClusterNode backup = cctx.discovery().node(backupId); @@ -996,6 +998,11 @@ ClusterNode primary() { else { try { cctx.io().send(backup, req, tx.ioPolicy()); + + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.xidVersion()); + + cctx.io().send(backup, salvageReq, tx.ioPolicy()); } catch (ClusterTopologyCheckedException ignored) { mini.onNodeLeft(backupId, discoThread); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 6221f9682dcd2..f63ea1b07fe92 100644 
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -964,6 +964,11 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp lsnr.onSessionStart(locSes); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); + + throw e; + } catch (Exception e) { throw new IgniteCheckedException("Failed to start store session: " + e, e); } @@ -984,6 +989,14 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws store.sessionEnd(!threwEx); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); + + if (!threwEx) + throw U.cast(e); + else + throw e; + } catch (Exception e) { if (!threwEx) throw U.cast(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 193eb2a4e8ae0..1529d53b5ce8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -628,7 +628,7 @@ protected void uncommit() { /** * @return Finalization status. 
*/ - @Override @Nullable public FinalizationStatus finalizationStatus() { + @Override public FinalizationStatus finalizationStatus() { return finalizing; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 10f38a3d105e1..71544ea6fceb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; @@ -223,6 +224,9 @@ public IgniteTxHandler(GridCacheSharedContext ctx) { ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) -> processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg)); + ctx.io().addCacheHandler(GridDhtTxSalvageMessage.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxSalvageRequest((GridDhtTxSalvageMessage)msg)); + ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, (UUID nodeId, GridCacheMessage msg) -> processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg)); @@ -1341,6 +1345,20 @@ private void processDhtTxOnePhaseCommitAckRequest(final 
UUID nodeId, } } + /** + * @param req Request. + */ + private void processDhtTxSalvageRequest(GridDhtTxSalvageMessage req) { + for (IgniteInternalTx active : ctx.tm().activeTransactions()) { + if (active.nearXidVersion().equals(req.version())) { + // GridDhtTxLocal possible + if (active instanceof GridDhtTxRemote) { + ctx.tm().salvageTx(active); + } + } + } + } + /** * @param nodeId Node ID. * @param req Request. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 9990cb67a0f95..474b5da2347fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -44,6 +44,7 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.failure.FailureContext; @@ -80,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; @@ 
-3137,7 +3139,26 @@ private TxRecoveryInitRunnable(ClusterNode node) { ", failedNodeId=" + evtNodeId + ']'); for (final IgniteInternalTx tx : activeTransactions()) { - if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) + Map> txNodes = tx.transactionNodes(); + + if (tx.storeWriteThrough() && txNodes != null + && tx.near() && txNodes.containsKey(evtNodeId) + && (tx.state() == PREPARING || tx.state() == PREPARED + || tx.state() == COMMITTING || tx.state() == COMMITTED)) { + // Send a message, tx is applied on near node and can be processed on backup if postponed. + sendTxSalvage(tx, evtNodeId); + } + + if (tx.storeWriteThrough() && !tx.masterNodeIds().contains(cctx.localNodeId()) + && tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED) { + boolean fullSyncedOp = tx.writeEntries().stream().map(e -> + cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit); + + if (!fullSyncedOp) + salvageTx(tx, RECOVERY_FINISH); + // Delay a commit, on backup. It will be raised further after near or coord. node will confirm it. + } + else if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { // Invalidate transactions. salvageTx(tx, RECOVERY_FINISH); @@ -3180,6 +3201,37 @@ else if (tx.state() == MARKED_ROLLBACK || tx.setRollbackOnly()) } } + /** + * Salvage progress can be postponed in special case when {@link CacheConfiguration#setWriteThrough} is enabled + * and current node is backup. It is required to eliminate situations when a backup node reads data before the primary + * continues with the transaction commit. 
+ * + * @see CacheConfiguration#setWriteThrough + */ + private void sendTxSalvage(IgniteInternalTx tx, UUID evtNodeId) { + Collection involvedNodes = tx.transactionNodes().get(evtNodeId); + + if (involvedNodes != null) { + GridDhtTxSalvageMessage salvageReq = null; + + for (UUID nodeId : involvedNodes) { + if (tx.masterNodeIds().contains(nodeId)) + continue; + + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); + + try { + cctx.io().send(nodeId, salvageReq, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + log.warning("Failed to send salvage message [failedNodeId=" + evtNodeId + + ", nodeId=" + nodeId + ']', e); + } + } + } + } + /** * @param tx Tx. * @param failedNode Failed node. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 60248017ee116..d1826f7cc5a64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -33,7 +33,7 @@ */ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState { /** Active cache IDs. 
*/ - private GridIntList activeCacheIds = new GridIntList(); + private final GridIntList activeCacheIds = new GridIntList(); /** {@inheritDoc} */ @Override public boolean implicitSingle() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java index 409cffab11aef..7556efe6fafa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java @@ -40,11 +40,11 @@ public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter { /** Read set. */ @GridToStringInclude - protected Map readMap; + protected final Map readMap; /** Write map. */ @GridToStringInclude - protected Map writeMap; + protected final Map writeMap; /** * @param readMap Read map. 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java index 7d05a9bde65ad..b6ddfeef0c7d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.Cache; @@ -37,7 +38,10 @@ import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -93,6 +97,20 @@ public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends Gri assertTrue(grid(SRVS + i).configuration().isClientMode()); } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + for (Ignite node : G.allGrids()) { + IgniteTxManager tm = ((IgniteEx)node).context().cache().context().tm(); + + ConcurrentMap uncommited = + GridTestUtils.getFieldValue(tm, IgniteTxManager.class, 
"uncommitedSalvageTx"); + + assertTrue("Unfinished uncommited [node=" + node.name() + ", size=" + uncommited.size() + ']', uncommited.isEmpty()); + } + + super.afterTest(); + } + /** * @throws Exception If failed. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java index 937b51cd4dee7..1babfd6b61759 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; @@ -36,12 +37,14 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import 
org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -86,9 +89,14 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { try { for (Ignite node : G.allGrids()) { - Collection txs = ((IgniteKernal)node).context().cache().context().tm().activeTransactions(); + IgniteTxManager tm = ((IgniteEx)node).context().cache().context().tm(); + Collection txs = tm.activeTransactions(); + + ConcurrentMap uncommited = + GridTestUtils.getFieldValue(tm, IgniteTxManager.class, "uncommitedSalvageTx"); assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty()); + assertTrue("Unfinished uncommited [node=" + node.name() + ']', uncommited.isEmpty()); } } finally { @@ -379,7 +387,7 @@ private void txWithStore(final TransactionConcurrency concurrency, boolean write final IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { log.info("Start put"); clientCache.put(key, 2); From fb9de7f0fc0921215b593805cb4eab0451ddd20d Mon Sep 17 00:00:00 2001 From: zstan Date: Sun, 5 Apr 2026 13:32:08 +0300 Subject: [PATCH 02/17] fix --- .../cache/distributed/near/GridNearTxFinishFuture.java | 8 +++++--- .../processors/cache/transactions/IgniteTxHandler.java | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 9f427b7f1f02e..6e8c24979a956 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -999,10 +999,12 @@ ClusterNode primary() { try { cctx.io().send(backup, req, tx.ioPolicy()); - if (salvageReq == null) - salvageReq = new GridDhtTxSalvageMessage(tx.xidVersion()); + if (tx.storeWriteThrough()) { + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.xidVersion()); - cctx.io().send(backup, salvageReq, tx.ioPolicy()); + cctx.io().send(backup, salvageReq, tx.ioPolicy()); + } } catch (ClusterTopologyCheckedException ignored) { mini.onNodeLeft(backupId, discoThread); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 71544ea6fceb1..1013201946386 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1354,6 +1354,7 @@ private void processDhtTxSalvageRequest(GridDhtTxSalvageMessage req) { // GridDhtTxLocal possible if (active instanceof GridDhtTxRemote) { ctx.tm().salvageTx(active); + break; } } } From 0585650bf49666ac093a3ca539e33865e9beb174 Mon Sep 17 00:00:00 2001 From: zstan Date: Sun, 5 Apr 2026 18:37:11 +0300 Subject: [PATCH 03/17] no break + event --- .../processors/cache/transactions/IgniteTxHandler.java | 1 - .../processors/cache/transactions/IgniteTxManager.java | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 1013201946386..71544ea6fceb1 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1354,7 +1354,6 @@ private void processDhtTxSalvageRequest(GridDhtTxSalvageMessage req) { // GridDhtTxLocal possible if (active instanceof GridDhtTxRemote) { ctx.tm().salvageTx(active); - break; } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 474b5da2347fd..8384200251475 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -3140,6 +3140,7 @@ private TxRecoveryInitRunnable(ClusterNode node) { for (final IgniteInternalTx tx : activeTransactions()) { Map> txNodes = tx.transactionNodes(); + boolean localInMaster = tx.masterNodeIds().contains(cctx.localNodeId()); if (tx.storeWriteThrough() && txNodes != null && tx.near() && txNodes.containsKey(evtNodeId) @@ -3149,7 +3150,11 @@ private TxRecoveryInitRunnable(ClusterNode node) { sendTxSalvage(tx, evtNodeId); } - if (tx.storeWriteThrough() && !tx.masterNodeIds().contains(cctx.localNodeId()) + if (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId)) { + if (localInMaster || tx.eventNodeId().equals(evtNodeId)) + salvageTx(tx, RECOVERY_FINISH); + } + else if (tx.storeWriteThrough() && !localInMaster && tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED) { boolean fullSyncedOp = tx.writeEntries().stream().map(e -> cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit); From da2af74dc3568f70aca1b2ede05208b626d0f8ab Mon Sep 17 00:00:00 2001 From: zstan Date: Sun, 5 Apr 2026 18:45:57 +0300 Subject: [PATCH 04/17] fix 
npe --- .../processors/cache/distributed/dht/GridDhtTxFinishFuture.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 51fef4ac6f634..53123251ab6fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -702,7 +702,7 @@ private void onNodeLeft(UUID nodeId) { for (UUID backupId : backups) { ClusterNode backup = cctx.discovery().node(backupId); - if (!backup.isLocal()) { + if (backup != null && !backup.isLocal()) { if (salvageReq == null) salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); From 8651672823438c11bc26d4a148ffada2ee3eed9d Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 6 Apr 2026 09:10:15 +0300 Subject: [PATCH 05/17] tx manager --- .../cache/transactions/IgniteTxManager.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 8384200251475..6aeeabacf8058 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -38,6 +38,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import 
org.apache.ignite.IgniteException; @@ -3140,7 +3141,6 @@ private TxRecoveryInitRunnable(ClusterNode node) { for (final IgniteInternalTx tx : activeTransactions()) { Map> txNodes = tx.transactionNodes(); - boolean localInMaster = tx.masterNodeIds().contains(cctx.localNodeId()); if (tx.storeWriteThrough() && txNodes != null && tx.near() && txNodes.containsKey(evtNodeId) @@ -3150,17 +3150,13 @@ private TxRecoveryInitRunnable(ClusterNode node) { sendTxSalvage(tx, evtNodeId); } - if (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId)) { - if (localInMaster || tx.eventNodeId().equals(evtNodeId)) - salvageTx(tx, RECOVERY_FINISH); - } - else if (tx.storeWriteThrough() && !localInMaster - && tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED) { - boolean fullSyncedOp = tx.writeEntries().stream().map(e -> - cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit); + Supplier fullSyncedOp = () -> tx.writeEntries().stream().map(e -> + cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit); - if (!fullSyncedOp) - salvageTx(tx, RECOVERY_FINISH); + if (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId) && tx.eventNodeId().equals(evtNodeId)) + salvageTx(tx, RECOVERY_FINISH); + else if (tx.storeWriteThrough() && !tx.masterNodeIds().contains(cctx.localNodeId()) + && tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED && fullSyncedOp.get()) { // Delay a commit, on backup. It will be raised further after near or coord. node will confirm it. 
} else if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) From b47be919082e8b87ad64f47bf590586856ac1a67 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 6 Apr 2026 12:16:44 +0300 Subject: [PATCH 06/17] ligthter tests --- .../IdleVerifyCheckWithWriteThroughTest.java | 48 ++----------------- 1 file changed, 5 insertions(+), 43 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java index f1d54bc0227d9..d56a091f62b73 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -29,10 +29,8 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.cluster.ClusterState; @@ -67,6 +65,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -240,47 +239,10 @@ public void 
testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc awaitPartitionMapExchange(); - sqlVisibilityCheck(List.of(nodeCoord, nodeBackup), primaryKey, secondVal); - - int cacheSize = cache.size(); - assertEquals(1, cacheSize); - - int locSize = -1; - Object peeked = null; - - for (Ignite g : G.allGrids()) { - final Affinity aff = affinity(g.cache(DEFAULT_CACHE_NAME)); - - boolean primary = aff.isPrimary(g.cluster().localNode(), primaryKey); - - Object peekedCurr; - - if (primary) - peekedCurr = g.cache(DEFAULT_CACHE_NAME).localPeek(primaryKey, CachePeekMode.PRIMARY); - else - peekedCurr = g.cache(DEFAULT_CACHE_NAME).localPeek(primaryKey, CachePeekMode.BACKUP); - - if (peeked == null) - peeked = peekedCurr; - else { - assertEquals(peeked, peekedCurr); - assertNotNull(peeked); - } - - int locSizeCurr; - - if (primary) - locSizeCurr = g.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.PRIMARY); - else - locSizeCurr = g.cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.BACKUP); - - if (locSize == -1) - locSize = locSizeCurr; - else { - assertEquals(locSize, locSizeCurr); - } - - assertNotNull("grid instance: " + g.name(), g.cache(DEFAULT_CACHE_NAME).get(primaryKey)); + for (int nodeIdx : List.of(0, 2)) { + IgniteEx g = grid(nodeIdx); + IgniteCache cacheInner = g.cache(DEFAULT_CACHE_NAME); + waitForCondition(() -> secondVal == (int)cacheInner.get(primaryKey), 1_000); } assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); From d2bb726d2795838ee98e94fd5c58f50417e224f2 Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 6 Apr 2026 14:39:55 +0300 Subject: [PATCH 07/17] fix --- .../IdleVerifyCheckWithWriteThroughTest.java | 12 +++++------ .../cache/transactions/IgniteTxManager.java | 20 +++++++++++-------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java 
b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java index d56a091f62b73..39cb60ddd4d72 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -239,12 +239,6 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc awaitPartitionMapExchange(); - for (int nodeIdx : List.of(0, 2)) { - IgniteEx g = grid(nodeIdx); - IgniteCache cacheInner = g.cache(DEFAULT_CACHE_NAME); - waitForCondition(() -> secondVal == (int)cacheInner.get(primaryKey), 1_000); - } - assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); String out = testOut.toString(); @@ -264,6 +258,12 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc } testOut.reset(); + for (int nodeIdx : List.of(0, 2)) { + IgniteEx g = grid(nodeIdx); + IgniteCache cacheInner = g.cache(DEFAULT_CACHE_NAME); + waitForCondition(() -> secondVal == (int)cacheInner.get(primaryKey), 1_000); + } + if (withPersistence) { stopAllGrids(); startGridsMultiThreaded(3); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 6aeeabacf8058..639c52e463554 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -3219,15 +3219,19 @@ private void sendTxSalvage(IgniteInternalTx tx, UUID evtNodeId) { if (tx.masterNodeIds().contains(nodeId)) continue; - if (salvageReq == null) - salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); + ClusterNode involvedNode = 
cctx.discovery().node(nodeId); - try { - cctx.io().send(nodeId, salvageReq, tx.ioPolicy()); - } - catch (IgniteCheckedException e) { - log.warning("Failed to send salvage message [failedNodeId=" + evtNodeId + - ", nodeId=" + nodeId + ']', e); + if (involvedNode != null && !involvedNode.isLocal()) { + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); + + try { + cctx.io().send(nodeId, salvageReq, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + log.warning("Failed to send salvage message [failedNodeId=" + evtNodeId + + ", nodeId=" + nodeId + ']', e); + } } } } From ef55749bfa028ef9170e3d5d9e3436c0cd1736de Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 9 Apr 2026 18:05:38 +0300 Subject: [PATCH 08/17] last failed test fix --- .../processors/cache/distributed/dht/GridDhtTxLocal.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 93ec7a05543e4..67b32d38f86b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -460,8 +460,12 @@ else if (!lockFut.isDone()) { logTxFinishErrorSafe(log, commit, e); // Treat heuristic exception as critical. 
- if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) + if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) { + if (storeWriteThrough() && local()) + salvageTx(); + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } err = e; } From 911999a3561ed8706213d899ae95380ea6e5ad33 Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 10 Apr 2026 14:08:59 +0300 Subject: [PATCH 09/17] IndexingSpiQueryTxSelfTest fix --- .../query/IndexingSpiQueryTxSelfTest.java | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index 8b158292d524f..968c5b600cfe0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -22,13 +22,13 @@ import java.util.concurrent.Callable; import javax.cache.Cache; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; -import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.indexing.IndexingQueryFilter; @@ -36,11 +36,14 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import 
org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionHeuristicException; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.junit.Test; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + /** * Indexing Spi transactional query test */ @@ -75,7 +78,7 @@ public void testIndexingSpiWithTxClient() throws Exception { assertNotNull(client.cache(DEFAULT_CACHE_NAME)); - doTestIndexingSpiWithTx(client, 0); + doTestIndexingSpiWithTx(client, 0, false); } /** */ @@ -83,7 +86,7 @@ public void testIndexingSpiWithTxClient() throws Exception { public void testIndexingSpiWithTxLocal() throws Exception { IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME); - doTestIndexingSpiWithTx(ignite, 0); + doTestIndexingSpiWithTx(ignite, 0, true); } /** */ @@ -91,13 +94,13 @@ public void testIndexingSpiWithTxLocal() throws Exception { public void testIndexingSpiWithTxNotLocal() throws Exception { IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME); - doTestIndexingSpiWithTx(ignite, 1); + doTestIndexingSpiWithTx(ignite, 1, false); } /** * @throws Exception If failed. 
*/ - private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception { + private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic) throws Exception { final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); final IgniteTransactions txs = ignite.transactions(); @@ -107,7 +110,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception System.out.println("Run in transaction: " + concurrency + " " + isolation); GridTestUtils.assertThrowsWithCause(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { Transaction tx; try (Transaction tx0 = tx = txs.txStart(concurrency, isolation)) { @@ -120,9 +123,19 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception return null; } - }, IgniteTxHeuristicCheckedException.class); - - checkFutures(); + }, heuristic ? TransactionHeuristicException.class : IgniteException.class); + + assertTrue( + waitForCondition(() -> { + try { + checkFutures(); + return true; + } + catch (AssertionError err) { + return false; + } + }, 5_000) + ); } } } From 71150a2e7954209fa7b709366032ba4c1b6d08a0 Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 10 Apr 2026 15:29:25 +0300 Subject: [PATCH 10/17] code style --- .../query/IndexingSpiQueryTxSelfTest.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index 968c5b600cfe0..e1fb2719ca942 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -126,15 +126,16 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean 
heuristic }, heuristic ? TransactionHeuristicException.class : IgniteException.class); assertTrue( - waitForCondition(() -> { - try { - checkFutures(); - return true; - } - catch (AssertionError err) { - return false; - } - }, 5_000) + waitForCondition(() -> + { + try { + checkFutures(); + return true; + } + catch (AssertionError err) { + return false; + } + }, 5_000) ); } } From 3941b806456c0ac201108eb6bfbb85e240424969 Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 10 Apr 2026 15:36:18 +0300 Subject: [PATCH 11/17] code style --- .../processors/cache/query/IndexingSpiQueryTxSelfTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index e1fb2719ca942..cbbd7704fdfbc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -126,8 +126,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic }, heuristic ? 
TransactionHeuristicException.class : IgniteException.class); assertTrue( - waitForCondition(() -> - { + waitForCondition(() -> { try { checkFutures(); return true; From ce3028e808963442136919326fd914e2efa141e7 Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 15 May 2026 18:15:04 +0300 Subject: [PATCH 12/17] improve test --- .../IdleVerifyCheckWithWriteThroughTest.java | 47 ++----------------- .../ignite/internal/CoreMessagesProvider.java | 2 + .../dht/GridDhtTxSalvageMessage.java | 5 -- .../query/IndexingSpiQueryTxSelfTest.java | 6 ++- 4 files changed, 11 insertions(+), 49 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java index 39cb60ddd4d72..eb1d050a4a34b 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -31,7 +31,6 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; @@ -52,8 +51,6 @@ import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.lang.IgniteCallable; -import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -65,7 +62,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; -import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -170,7 +166,7 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc nodeCoord.cluster().state(ClusterState.ACTIVE); - CacheConfiguration ccfgWithWriteThrough = createCache(DEFAULT_CACHE_NAME, true); + CacheConfiguration ccfgWithWriteThrough = configureCache(DEFAULT_CACHE_NAME); IgniteCache cache = nodeCoord.createCache(ccfgWithWriteThrough); Integer primaryKey = primaryKey(nodePrimary.cache(DEFAULT_CACHE_NAME)); @@ -181,8 +177,6 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc tx.commit(); } - sqlVisibilityCheck(List.of(nodeCoord, nodeBackup), primaryKey, firstVal); - nodeCoord.cluster().state(ClusterState.INACTIVE); GridMessageListener lsnr = new GridMessageListener() { @@ -258,12 +252,6 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc } testOut.reset(); - for (int nodeIdx : List.of(0, 2)) { - IgniteEx g = grid(nodeIdx); - IgniteCache cacheInner = g.cache(DEFAULT_CACHE_NAME); - waitForCondition(() -> secondVal == (int)cacheInner.get(primaryKey), 1_000); - } - if (withPersistence) { stopAllGrids(); startGridsMultiThreaded(3); @@ -293,41 +281,16 @@ public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exc } /** */ - private void sqlVisibilityCheck(List nodes, int keyToCheck, int referal) { - for (Ignite node : nodes) { - Object ret = node.compute(node.cluster().forLocal()).call(new IgniteCallable<>() { - /** */ - @SuppressWarnings({"UnusedDeclaration"}) - @IgniteInstanceResource - 
private Ignite instance; - - /** */ - @Override public Integer call() { - String selectSql = "SELECT VAL FROM " + DEFAULT_CACHE_NAME + " WHERE ID=" + keyToCheck; - - List> res = instance.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery(selectSql)).getAll(); - - return (int)res.get(0).get(0); - } - }); - - assertEquals("Unexpected result on node: " + node.name(), referal, ret); - } - } - - /** */ - private CacheConfiguration createCache(String cacheName, boolean writeThrough) { + private CacheConfiguration configureCache(String cacheName) { CacheConfiguration ccfg = new CacheConfiguration<>(cacheName); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg.setCacheMode(CacheMode.REPLICATED); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ccfg.setQueryEntities(List.of(queryEntity(cacheName))); - if (writeThrough) { - ccfg.setReadThrough(true); - ccfg.setWriteThrough(true); - ccfg.setCacheStoreFactory(MapCacheStore::new); - } + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(MapCacheStore::new); return ccfg; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index f1f7883ea1961..ac71bcb57e3f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import 
org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest; @@ -571,6 +572,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(StatisticsRequest.class); withNoSchema(StatisticsResponse.class); withNoSchema(CacheContinuousQueryBatchAck.class); + withNoSchema(GridDhtTxSalvageMessage.class); withSchema(CacheContinuousQueryEntry.class); // [11200 - 11300]: Compute, distributed process messages. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java index cee9ea6e3bf15..ae2d0715cb829 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java @@ -48,9 +48,4 @@ public GridCacheVersion version() { @Override public boolean addDeploymentInfo() { return addDepInfo; } - - /** */ - @Override public short directType() { - return 119; - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index cbbd7704fdfbc..3ac57d95fe693 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -125,7 +125,9 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic } }, heuristic ? 
TransactionHeuristicException.class : IgniteException.class); - assertTrue( + checkFutures(); + +/* assertTrue( waitForCondition(() -> { try { checkFutures(); @@ -135,7 +137,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic return false; } }, 5_000) - ); + );*/ } } } From d22297981534df3a90e9f32b0346cb50c00d8ad8 Mon Sep 17 00:00:00 2001 From: zstan Date: Fri, 15 May 2026 18:15:23 +0300 Subject: [PATCH 13/17] debug needs --- .../processors/cache/distributed/dht/GridDhtTxLocal.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 67b32d38f86b9..d9a1388d77109 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -461,8 +461,8 @@ else if (!lockFut.isDone()) { // Treat heuristic exception as critical. 
if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) { - if (storeWriteThrough() && local()) - salvageTx(); +/* if (storeWriteThrough() && local()) + salvageTx();*/ cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); } From 6d6231c1445eae19027f5a90048850d2984d1f1a Mon Sep 17 00:00:00 2001 From: zstan Date: Mon, 18 May 2026 08:38:43 +0300 Subject: [PATCH 14/17] fix --- .../processors/cache/query/IndexingSpiQueryTxSelfTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index 3ac57d95fe693..cbbd7704fdfbc 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -125,9 +125,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic } }, heuristic ? 
TransactionHeuristicException.class : IgniteException.class); - checkFutures(); - -/* assertTrue( + assertTrue( waitForCondition(() -> { try { checkFutures(); @@ -137,7 +135,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic return false; } }, 5_000) - );*/ + ); } } } From 88ef5cd8c389e335cfbd983b991a11ca142697bf Mon Sep 17 00:00:00 2001 From: zstan Date: Wed, 20 May 2026 08:38:44 +0300 Subject: [PATCH 15/17] copilot issue --- .../cache/distributed/dht/GridDhtTxFinishFuture.java | 2 +- .../processors/cache/distributed/dht/GridDhtTxLocal.java | 4 ++-- .../processors/cache/query/IndexingSpiQueryTxSelfTest.java | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 53123251ab6fd..a2befc5d99987 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -493,7 +493,7 @@ private boolean finish(boolean commit, catch (IgniteCheckedException e) { // Fail the whole thing. 
if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft(); + fut.onNodeLeft(n.id()); else { if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index d9a1388d77109..67b32d38f86b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -461,8 +461,8 @@ else if (!lockFut.isDone()) { // Treat heuristic exception as critical. if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) { -/* if (storeWriteThrough() && local()) - salvageTx();*/ + if (storeWriteThrough() && local()) + salvageTx(); cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index cbbd7704fdfbc..2c345f773e103 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -125,6 +125,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic } }, heuristic ? 
TransactionHeuristicException.class : IgniteException.class); + // Cause asynced salvage in action assertTrue( waitForCondition(() -> { try { From b626bc79e1ea02c1bbfeab27d1e8b1f87304b72a Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 21 May 2026 09:31:50 +0300 Subject: [PATCH 16/17] minor fix --- .../processors/cache/transactions/IgniteTxHandler.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 71544ea6fceb1..df7aed0a34ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1350,11 +1350,8 @@ private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId, */ private void processDhtTxSalvageRequest(GridDhtTxSalvageMessage req) { for (IgniteInternalTx active : ctx.tm().activeTransactions()) { - if (active.nearXidVersion().equals(req.version())) { - // GridDhtTxLocal possible - if (active instanceof GridDhtTxRemote) { + if (active.nearXidVersion().equals(req.version()) && active instanceof GridDhtTxRemote) { ctx.tm().salvageTx(active); - } } } } From 74775e866fc04960c86a98d0e779acf7ff1d97de Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 21 May 2026 15:10:07 +0300 Subject: [PATCH 17/17] style --- .../internal/processors/cache/transactions/IgniteTxHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index df7aed0a34ef7..805f2be4135b0 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1351,7 +1351,7 @@ private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId, private void processDhtTxSalvageRequest(GridDhtTxSalvageMessage req) { for (IgniteInternalTx active : ctx.tm().activeTransactions()) { if (active.nearXidVersion().equals(req.version()) && active instanceof GridDhtTxRemote) { - ctx.tm().salvageTx(active); + ctx.tm().salvageTx(active); } } }