diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index c0adb33e5b2a9..946ef9e73e665 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -33,6 +33,7 @@ import org.apache.ignite.util.GridCommandHandlerPropertiesTest; import org.apache.ignite.util.GridCommandHandlerScheduleIndexRebuildTest; import org.apache.ignite.util.GridCommandHandlerTracingConfigurationTest; +import org.apache.ignite.util.IdleVerifyCheckWithWriteThroughTest; import org.apache.ignite.util.IdleVerifyDumpTest; import org.apache.ignite.util.MetricCommandTest; import org.apache.ignite.util.PerformanceStatisticsCommandTest; @@ -76,7 +77,8 @@ SecurityCommandHandlerPermissionsTest.class, - IdleVerifyDumpTest.class + IdleVerifyDumpTest.class, + IdleVerifyCheckWithWriteThroughTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java new file mode 100644 index 0000000000000..eb1d050a4a34b --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/IdleVerifyCheckWithWriteThroughTest.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; + +/** */ +public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest { + /** Node kill trigger. */ + private static CountDownLatch nodeKillLatch; + + /** Node left on backup. 
*/ + private static CountDownLatch nodeLeftRegisteredOnBackup; + + /** */ + @Parameterized.Parameter(1) + public Boolean withPersistence; + + /** */ + @Parameterized.Parameter(2) + public TransactionConcurrency conc; + + /** */ + private static final String CORRECT_VERIFY_MSG = "The check procedure has finished, no conflicts have been found."; + + /** */ + @Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}, concMode={2}") + public static Collection parameters() { + return cartesianProduct( + List.of(CLI_CMD_HND), + List.of(true, false), + List.of(OPTIMISTIC, PESSIMISTIC) + ); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + persistenceEnable(withPersistence); + + if (withPersistence) + cleanPersistenceDir(); + + nodeKillLatch = new CountDownLatch(1); + nodeLeftRegisteredOnBackup = new CountDownLatch(1); + + MapCacheStore.salvagedLatch = new CountDownLatch(1); + MapCacheStore.txCoordStoreLatch = new CountDownLatch(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + try { + for (Ignite node : G.allGrids()) { + Collection txs = ((IgniteEx)node).context().cache().context().tm().activeTransactions(); + + assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty()); + } + } + finally { + stopAllGrids(); + + super.afterTest(); + } + } + + /** {@inheritDoc} */ + @Override protected boolean persistenceEnable() { + return withPersistence; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCommunicationSpi(new TestRecordingCommunicationSpi()); + } + + /** Test scenario: + * + * + * @see IgniteTxManager#salvageTx(IgniteInternalTx) + */ + @Test + public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exception { + // sequential start is important here + 
IgniteEx nodeCoord = startGrid(0); + // near node + IgniteEx nodePrimary = startGrid(1); + // backup node + IgniteEx nodeBackup = startGrid(2); + + int firstVal = 0; + int secondVal = 1; + + nodeCoord.cluster().state(ClusterState.ACTIVE); + + CacheConfiguration ccfgWithWriteThrough = configureCache(DEFAULT_CACHE_NAME); + IgniteCache cache = nodeCoord.createCache(ccfgWithWriteThrough); + + Integer primaryKey = primaryKey(nodePrimary.cache(DEFAULT_CACHE_NAME)); + + try (Transaction tx = nodeCoord.transactions().txStart()) { + cache.put(primaryKey, firstVal); + + tx.commit(); + } + + nodeCoord.cluster().state(ClusterState.INACTIVE); + + GridMessageListener lsnr = new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (msg instanceof GridNearTxPrepareResponse) { + IgniteTxManager txMgr = nodeBackup.context().cache().context().tm(); + Collection txs = txMgr.activeTransactions(); + + assertEquals(1, txs.size()); + IgniteInternalTx idleTx = txs.iterator().next(); + assertFalse(idleTx.local()); + + Map activeTx = GridTestUtils.getFieldValue(txMgr, "idMap"); + assertEquals(1, activeTx.size()); + + nodeKillLatch.countDown(); + + U.awaitQuiet(nodeLeftRegisteredOnBackup); + + // let`s wait until all discovery events have been processed on backup node. + doSleep(1000); + + MapCacheStore.txCoordStoreLatch.countDown(); + } + } + }; + + nodeCoord.context().io().removeMessageListener(GridTopic.TOPIC_CACHE); // Remove old cache listener. + nodeCoord.context().io().addMessageListener(GridTopic.TOPIC_CACHE, lsnr); // Register as first listener. + nodeCoord.context().cache().context().io().start0(); // Register cache listener again. 
+ + nodeCoord.cluster().state(ClusterState.ACTIVE); + + nodeCoord.context().event().addDiscoveryEventListener(new BeforeRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); + nodeBackup.context().event().addDiscoveryEventListener(new BeforeBackupRecoveryListener(), EVT_NODE_FAILED, EVT_NODE_LEFT); + + IgniteInternalFuture stopFut = GridTestUtils.runAsync(() -> { + nodeKillLatch.await(); + nodePrimary.close(); + }); + + injectTestSystemOut(); + + try (Transaction tx = nodeCoord.transactions().txStart(conc, READ_COMMITTED)) { + cache.put(primaryKey, secondVal); + + tx.commit(); + } + catch (Throwable th) { + fail("Unexpected exception: " + th); + } + + stopFut.get(getTestTimeout()); + + awaitPartitionMapExchange(); + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + + String out = testOut.toString(); + + // partVerHash can be different + if (withPersistence) { + Assert.assertThat(out, anyOf(is(containsString("updateCntr=[lwm=2, missed=[], hwm=2], " + + "partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + Assert.assertThat(out, anyOf(is(containsString("updateCntr=[lwm=2, missed=[], hwm=2], " + + "partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + } + else { + Assert.assertThat(out, anyOf(is(containsString("consistentId=gridCommandHandlerTest0, " + + "updateCntr=1, partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + Assert.assertThat(out, anyOf(is(containsString("consistentId=gridCommandHandlerTest2, " + + "updateCntr=1, partitionState=OWNING, size=1")), is(containsString(CORRECT_VERIFY_MSG)))); + } + testOut.reset(); + + if (withPersistence) { + stopAllGrids(); + startGridsMultiThreaded(3); + + awaitPartitionMapExchange(true, true, null); + + assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify")); + out = testOut.toString(); + + Pattern regexCorrectCheck = Pattern.compile(CORRECT_VERIFY_MSG); + 
boolean correctOut = regexCorrectCheck.matcher(out).find(); + + // partVerHash are different, thus only regex check here + String regexCheck = "Partition instances: \\[PartitionHashRecord" + + ".*?consistentId=%s, updateCntr=\\[lwm=2, missed=\\[\\], hwm=2\\], partitionState=OWNING, size=1"; + Pattern part0Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest0")); + Pattern part1Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest1")); + Pattern part2Pattern = Pattern.compile(String.format(regexCheck, "gridCommandHandlerTest2")); + + boolean matches = + part0Pattern.matcher(out).find() && + part1Pattern.matcher(out).find() && + part2Pattern.matcher(out).find(); + + assertTrue(out, matches || correctOut); + } + } + + /** */ + private CacheConfiguration configureCache(String cacheName) { + CacheConfiguration ccfg = new CacheConfiguration<>(cacheName); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setCacheMode(CacheMode.REPLICATED); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setQueryEntities(List.of(queryEntity(cacheName))); + + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(MapCacheStore::new); + + return ccfg; + } + + /** */ + private QueryEntity queryEntity(String cacheName) { + var entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Integer.class.getName()); + entity.addQueryField("ID", Integer.class.getName(), null); + entity.addQueryField("VAL", Integer.class.getName(), null); + entity.setKeyFieldName("ID"); + entity.setValueFieldName("VAL"); + entity.setTableName(cacheName); + + return entity; + } + + /** */ + public static class MapCacheStore extends CacheStoreAdapter { + /** Store map. 
*/ + private static final Map map = new ConcurrentHashMap<>(); + + /** */ + static CountDownLatch salvagedLatch; + + /** */ + static CountDownLatch txCoordStoreLatch; + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + for (Map.Entry e : map.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + Object val = map.get(key); + + salvagedLatch.countDown(); + + return val; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry e) { + map.put(e.getKey(), e.getValue()); + + txCoordStoreLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + map.remove(key); + } + } + + /** */ + private static class BeforeRecoveryListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + U.awaitQuiet(MapCacheStore.txCoordStoreLatch); + } + + /** {@inheritDoc} */ + @Override public int order() { + return -1; + } + } + + /** */ + private static class BeforeBackupRecoveryListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + nodeLeftRegisteredOnBackup.countDown(); + } + + /** {@inheritDoc} */ + @Override public int order() { + return -1; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index f1f7883ea1961..ac71bcb57e3f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -96,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest; @@ -571,6 +572,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(StatisticsRequest.class); withNoSchema(StatisticsResponse.class); withNoSchema(CacheContinuousQueryBatchAck.class); + withNoSchema(GridDhtTxSalvageMessage.class); withSchema(CacheContinuousQueryEntry.class); // [11200 - 11300]: Compute, distributed process messages. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 98c580203c210..a2befc5d99987 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -473,7 +473,7 @@ private boolean finish(boolean commit, if (isNull(cctx.discovery().getAlive(n.id()))) { log.error("Unable to send message (node left topology): " + n); - fut.onNodeLeft(); + fut.onNodeLeft(n.id()); } else { cctx.tm().sendTransactionMessage(n, req, tx, tx.ioPolicy()); @@ -493,7 +493,7 @@ private boolean finish(boolean commit, catch (IgniteCheckedException e) { // Fail the whole thing. 
if (e instanceof ClusterTopologyCheckedException) - fut.onNodeLeft(); + fut.onNodeLeft(n.id()); else { if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, failed to send request dht [txId=" + tx.nearXidVersion() + @@ -687,6 +687,40 @@ void onResult(Throwable e) { onDone(e); } + /** + * Handles the departure of a node the finish request could not be delivered to. If the transaction uses a + * write-through store, a {@link GridDhtTxSalvageMessage} with the near xid version is sent to every known, + * non-local node mapped to {@code nodeId} in the transaction nodes map (send failures are logged, not rethrown). + * Afterwards delegates to {@link #onNodeLeft()}. + * + * @param nodeId ID of the node that left. + */ + private void onNodeLeft(UUID nodeId) { + // Since #onNodeLeft() normally completes without an error, the salvage message has to be sent explicitly. + if (tx.storeWriteThrough()) { + Map> txNodes = tx.transactionNodes(); + + if (txNodes != null) { + Collection backups = txNodes.get(nodeId); + + if (!F.isEmpty(backups)) { + GridDhtTxSalvageMessage salvageReq = null; + + for (UUID backupId : backups) { + ClusterNode backup = cctx.discovery().node(backupId); + + if (backup != null && !backup.isLocal()) { + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); + + try { + cctx.io().send(backup, salvageReq, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send " + GridDhtTxSalvageMessage.class.getName() + " message.", e); + } + } + } + } + } + } + + onNodeLeft(); + } + /** */ void onNodeLeft() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 93ec7a05543e4..67b32d38f86b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -460,8 +460,12 @@ else if (!lockFut.isDone()) { logTxFinishErrorSafe(log, commit, e); // Treat heuristic exception as critical. 
- if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) + if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) { + if (storeWriteThrough() && local()) + salvageTx(); + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } err = e; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index e60041e34ed06..c2ad54b41b910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -226,12 +226,7 @@ public GridDhtTxRemote( /** {@inheritDoc} */ @Override public Collection masterNodeIds() { - Collection res = new ArrayList<>(2); - - res.add(nearNodeId); - res.add(nodeId); - - return res; + return List.of(nearNodeId, nodeId); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java new file mode 100644 index 0000000000000..ae2d0715cb829 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxSalvageMessage.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** Salvage tx. */ +public class GridDhtTxSalvageMessage extends GridCacheMessage { + /** */ + @Order(0) + GridCacheVersion ver; + + /** Empty constructor. */ + public GridDhtTxSalvageMessage() { + // No-op. + } + + /** + * @param ver Global transaction identifier within cluster, assigned by transaction coordinator. + */ + public GridDhtTxSalvageMessage(GridCacheVersion ver) { + this.ver = ver; + } + + /** Tx version. 
*/ + public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 733f9b0adc462..a7ef3531a0cba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -272,6 +272,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final Map fullMsgs = new ConcurrentHashMap<>(); /** */ + @SuppressWarnings("unused") @GridToStringInclude private volatile IgniteInternalFuture partReleaseFut; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index 1c565177cbd51..34ab379e525ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -1194,7 +1194,9 @@ public void clearDeferredDeletes() { while (true) { long state = this.state.get(); - assert getPartState(state) != EVICTED; + GridDhtPartitionState partState = getPartState(state); + + assert partState != EVICTED : partState; if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1))) return; diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 307bdad29e444..6e8c24979a956 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.tracing.MTC; @@ -983,6 +984,7 @@ ClusterNode primary() { }); GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true); + GridDhtTxSalvageMessage salvageReq = null; for (UUID backupId : backups) { ClusterNode backup = cctx.discovery().node(backupId); @@ -996,6 +998,13 @@ ClusterNode primary() { else { try { cctx.io().send(backup, req, tx.ioPolicy()); + + if (tx.storeWriteThrough()) { + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.xidVersion()); + + cctx.io().send(backup, salvageReq, tx.ioPolicy()); + } } catch (ClusterTopologyCheckedException ignored) { mini.onNodeLeft(backupId, discoThread); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 
b175df09a09bc..115fc73f4c42f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -964,6 +964,11 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp lsnr.onSessionStart(locSes); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); + + throw e; + } catch (Exception e) { throw new IgniteCheckedException("Failed to start store session: " + e, e); } @@ -984,6 +989,14 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws store.sessionEnd(!threwEx); } } + catch (RuntimeException e) { + U.error(log, "Exception raised during the notification of cache store session listeners: ", e); + + if (!threwEx) + throw U.cast(e); + else + throw e; + } catch (Exception e) { if (!threwEx) throw U.cast(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 193eb2a4e8ae0..1529d53b5ce8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -628,7 +628,7 @@ protected void uncommit() { /** * @return Finalization status. 
*/ - @Override @Nullable public FinalizationStatus finalizationStatus() { + @Override public FinalizationStatus finalizationStatus() { return finalizing; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 10f38a3d105e1..805f2be4135b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; @@ -223,6 +224,9 @@ public IgniteTxHandler(GridCacheSharedContext ctx) { ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) -> processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg)); + ctx.io().addCacheHandler(GridDhtTxSalvageMessage.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxSalvageRequest((GridDhtTxSalvageMessage)msg)); + ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, (UUID nodeId, GridCacheMessage msg) -> processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg)); @@ -1341,6 +1345,17 @@ private void processDhtTxOnePhaseCommitAckRequest(final 
UUID nodeId, } } + /** + * Salvages every active {@link GridDhtTxRemote} transaction whose near xid version equals the version + * carried by the message. + * + * @param req Salvage request. + */ + private void processDhtTxSalvageRequest(GridDhtTxSalvageMessage req) { + for (IgniteInternalTx active : ctx.tm().activeTransactions()) { + if (active.nearXidVersion().equals(req.version()) && active instanceof GridDhtTxRemote) { + ctx.tm().salvageTx(active); + } + } + } + /** * @param nodeId Node ID. * @param req Request. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index cd2704daf9007..e0834206f9a3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -38,12 +38,14 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.failure.FailureContext; @@ -80,6 +82,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxSalvageMessage; import 
org.apache.ignite.internal.processors.cache.distributed.dht.TransactionAttributesAwareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; @@ -3123,7 +3126,26 @@ private TxRecoveryInitRunnable(ClusterNode node) { ", failedNodeId=" + evtNodeId + ']'); for (final IgniteInternalTx tx : activeTransactions()) { - if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) + Map> txNodes = tx.transactionNodes(); + + if (tx.storeWriteThrough() && txNodes != null + && tx.near() && txNodes.containsKey(evtNodeId) + && (tx.state() == PREPARING || tx.state() == PREPARED + || tx.state() == COMMITTING || tx.state() == COMMITTED)) { + // Send a salvage message: the tx is applied on the near node and can be processed on a backup if postponed. + sendTxSalvage(tx, evtNodeId); + } + + Supplier fullSyncedOp = () -> tx.writeEntries().stream().map(e -> + cctx.cacheContext(e.cacheId())).allMatch(GridCacheContext::syncCommit); + + if (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId) && tx.eventNodeId().equals(evtNodeId)) + salvageTx(tx, RECOVERY_FINISH); + else if (tx.storeWriteThrough() && !tx.masterNodeIds().contains(cctx.localNodeId()) + && tx.nodeId().equals(evtNodeId) && tx.state() == PREPARED && fullSyncedOp.get()) { + // Delay the commit on the backup. It will be triggered later, once the near or coordinator node confirms it. + } + else if ((tx.near() && !tx.local() && tx.originatingNodeId().equals(evtNodeId)) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { // Invalidate transactions. salvageTx(tx, RECOVERY_FINISH); @@ -3166,6 +3188,41 @@ else if (tx.state() == MARKED_ROLLBACK || tx.setRollbackOnly()) } } + /** + * Salvage progress can be postponed in the special case when {@link CacheConfiguration#setWriteThrough} is enabled + * and the current node is a backup.
It is required to eliminate situations when a backup node reads data before the primary + * continues with the transaction commit. + * + * @see CacheConfiguration#setWriteThrough + */ + private void sendTxSalvage(IgniteInternalTx tx, UUID evtNodeId) { + Collection involvedNodes = tx.transactionNodes().get(evtNodeId); + + if (involvedNodes != null) { + GridDhtTxSalvageMessage salvageReq = null; + + for (UUID nodeId : involvedNodes) { + if (tx.masterNodeIds().contains(nodeId)) + continue; + + ClusterNode involvedNode = cctx.discovery().node(nodeId); + + if (involvedNode != null && !involvedNode.isLocal()) { + if (salvageReq == null) + salvageReq = new GridDhtTxSalvageMessage(tx.nearXidVersion()); + + try { + cctx.io().send(nodeId, salvageReq, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + log.warning("Failed to send salvage message [failedNodeId=" + evtNodeId + + ", nodeId=" + nodeId + ']', e); + } + } + } + } + } + /** * @param tx Tx. * @param failedNode Failed node. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 60248017ee116..d1826f7cc5a64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -33,7 +33,7 @@ */ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState { /** Active cache IDs.
*/ - private GridIntList activeCacheIds = new GridIntList(); + private final GridIntList activeCacheIds = new GridIntList(); /** {@inheritDoc} */ @Override public boolean implicitSingle() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java index 409cffab11aef..7556efe6fafa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java @@ -40,11 +40,11 @@ public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter { /** Read set. */ @GridToStringInclude - protected Map readMap; + protected final Map readMap; /** Write map. */ @GridToStringInclude - protected Map writeMap; + protected final Map writeMap; /** * @param readMap Read map. 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java index 7d05a9bde65ad..b6ddfeef0c7d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.Cache; @@ -37,7 +38,10 @@ import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -93,6 +97,20 @@ public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends Gri assertTrue(grid(SRVS + i).configuration().isClientMode()); } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + for (Ignite node : G.allGrids()) { + IgniteTxManager tm = ((IgniteEx)node).context().cache().context().tm(); + + ConcurrentMap uncommited = + GridTestUtils.getFieldValue(tm, IgniteTxManager.class, 
"uncommitedSalvageTx"); + + assertTrue("Unfinished uncommited [node=" + node.name() + ", size=" + uncommited.size() + ']', uncommited.isEmpty()); + } + + super.afterTest(); + } + /** * @throws Exception If failed. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java index 937b51cd4dee7..1babfd6b61759 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.integration.CacheLoaderException; @@ -36,12 +37,14 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import 
org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -86,9 +89,14 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { try { for (Ignite node : G.allGrids()) { - Collection txs = ((IgniteKernal)node).context().cache().context().tm().activeTransactions(); + IgniteTxManager tm = ((IgniteEx)node).context().cache().context().tm(); + Collection txs = tm.activeTransactions(); + + ConcurrentMap uncommited = + GridTestUtils.getFieldValue(tm, IgniteTxManager.class, "uncommitedSalvageTx"); assertTrue("Unfinished txs [node=" + node.name() + ", txs=" + txs + ']', txs.isEmpty()); + assertTrue("Unfinished uncommited [node=" + node.name() + ']', uncommited.isEmpty()); } } finally { @@ -379,7 +387,7 @@ private void txWithStore(final TransactionConcurrency concurrency, boolean write final IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME); IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { log.info("Start put"); clientCache.put(key, 2); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index 8b158292d524f..2c345f773e103 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -22,13 +22,13 @@ import java.util.concurrent.Callable; import javax.cache.Cache; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; 
import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; -import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.indexing.IndexingQueryFilter; @@ -36,11 +36,14 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionHeuristicException; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.junit.Test; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + /** * Indexing Spi transactional query test */ @@ -75,7 +78,7 @@ public void testIndexingSpiWithTxClient() throws Exception { assertNotNull(client.cache(DEFAULT_CACHE_NAME)); - doTestIndexingSpiWithTx(client, 0); + doTestIndexingSpiWithTx(client, 0, false); } /** */ @@ -83,7 +86,7 @@ public void testIndexingSpiWithTxClient() throws Exception { public void testIndexingSpiWithTxLocal() throws Exception { IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME); - doTestIndexingSpiWithTx(ignite, 0); + doTestIndexingSpiWithTx(ignite, 0, true); } /** */ @@ -91,13 +94,13 @@ public void testIndexingSpiWithTxLocal() throws Exception { public void testIndexingSpiWithTxNotLocal() throws Exception { IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME); - doTestIndexingSpiWithTx(ignite, 1); + doTestIndexingSpiWithTx(ignite, 1, false); } /** * @throws Exception If failed. 
*/ - private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception { + private void doTestIndexingSpiWithTx(IgniteEx ignite, int key, boolean heuristic) throws Exception { final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); final IgniteTransactions txs = ignite.transactions(); @@ -107,7 +110,7 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception System.out.println("Run in transaction: " + concurrency + " " + isolation); GridTestUtils.assertThrowsWithCause(new Callable() { - @Override public Void call() throws Exception { + @Override public Void call() { Transaction tx; try (Transaction tx0 = tx = txs.txStart(concurrency, isolation)) { @@ -120,9 +123,20 @@ private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception return null; } - }, IgniteTxHeuristicCheckedException.class); - - checkFutures(); + }, heuristic ? TransactionHeuristicException.class : IgniteException.class); + + // Salvage is asynchronous, so wait until the futures check passes. + assertTrue( + waitForCondition(() -> { + try { + checkFutures(); + return true; + } + catch (AssertionError err) { + return false; + } + }, 5_000) + ); } } }