-
Notifications
You must be signed in to change notification settings - Fork 7
[NAE-2448] Migration for NAE v7 phase 2 #451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
renczesstefan
wants to merge
7
commits into
release/7.0.0-rev10
Choose a base branch
from
NAE-2448
base: release/7.0.0-rev10
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
1e6fcf4
Introduce migration tests for `nae_2432` case handling
renczesstefan 6f0b4ad
Replace `getNewestVersionByIdentifier` with `getDefaultVersionByIdent…
renczesstefan 3a21556
[NAE-2446] Migration for NAE v7
renczesstefan 06dd0f4
[NAE-2446] Migration for NAE v7
renczesstefan 4bca04f
[NAE-2446] Migration for NAE v7
renczesstefan f02edd9
[NAE-2446] Migration for NAE v7
renczesstefan 3b9c60e
[NAE-2448] Migration for NAE v7 phase 2
renczesstefan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
608 changes: 608 additions & 0 deletions
608
...on-engine/src/main/groovy/com/netgrif/application/engine/migration/MigrationHelper.groovy
Large diffs are not rendered by default.
Oops, something went wrong.
375 changes: 375 additions & 0 deletions
375
...in/groovy/com/netgrif/application/engine/migration/helpers/AbstractMigrationHelper.groovy
Large diffs are not rendered by default.
Oops, something went wrong.
470 changes: 470 additions & 0 deletions
470
...c/main/groovy/com/netgrif/application/engine/migration/helpers/CaseMigrationHelper.groovy
Large diffs are not rendered by default.
Oops, something went wrong.
536 changes: 536 additions & 0 deletions
536
...in/groovy/com/netgrif/application/engine/migration/helpers/PetriNetMigrationHelper.groovy
Large diffs are not rendered by default.
Oops, something went wrong.
281 changes: 281 additions & 0 deletions
281
...c/main/groovy/com/netgrif/application/engine/migration/helpers/TaskMigrationHelper.groovy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,281 @@ | ||
| package com.netgrif.application.engine.migration.helpers | ||
|
|
||
| import com.netgrif.application.engine.adapter.spring.workflow.domain.QTask | ||
| import com.netgrif.application.engine.configuration.properties.MigrationProperties | ||
| import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskMappingService | ||
| import com.netgrif.application.engine.elastic.service.interfaces.IElasticTaskService | ||
| import com.netgrif.application.engine.migration.model.MigrationErrorPolicy | ||
| import com.netgrif.application.engine.objects.petrinet.domain.PetriNet | ||
| import com.netgrif.application.engine.objects.petrinet.domain.Transition | ||
| import com.netgrif.application.engine.objects.petrinet.domain.roles.ProcessRole | ||
| import com.netgrif.application.engine.objects.workflow.domain.Case | ||
| import com.netgrif.application.engine.objects.workflow.domain.Task | ||
| import com.netgrif.application.engine.objects.workflow.domain.TaskPair | ||
| import com.netgrif.application.engine.petrinet.service.interfaces.IPetriNetService | ||
| import com.netgrif.application.engine.workflow.service.interfaces.ITaskService | ||
| import com.querydsl.core.types.Predicate | ||
| import groovy.util.logging.Slf4j | ||
| import org.springframework.data.mongodb.core.BulkOperations | ||
| import org.springframework.data.mongodb.core.MongoTemplate | ||
| import org.springframework.data.mongodb.core.query.Criteria | ||
| import org.springframework.data.mongodb.core.query.Query | ||
| import org.springframework.stereotype.Component | ||
|
|
||
| /** | ||
| * A helper class for managing task migrations. | ||
| * This class extends {@link AbstractMigrationHelper} and provides methods for updating, iterating, | ||
| * and manipulating {@link Task} entities in bulk during migration processes. | ||
| * It integrates with MongoDB and uses the {@link MongoTemplate} for data operations and | ||
| * {@link IPetriNetService} for interacting with PetriNet services. | ||
| */ | ||
| @Slf4j | ||
| @Component | ||
| class TaskMigrationHelper extends AbstractMigrationHelper<Task> { | ||
|
|
||
| /** | ||
| * Service for handling Petri Net operations. | ||
| * | ||
| * This service is used to access and interact with Petri Net tasks, | ||
| * such as retrieving the latest version of a Petri Net by its identifier | ||
| * during task migrations. | ||
| */ | ||
| protected final IPetriNetService petriNetService | ||
|
|
||
| /** | ||
| * Service for handling task operations. | ||
| * | ||
| * This service provides methods for managing task entities, | ||
| * including finding, saving, and reloading tasks during migration processes. | ||
| */ | ||
| protected final ITaskService taskService | ||
|
|
||
| /** | ||
| * Service for handling Elasticsearch task indexing operations. | ||
| * | ||
| * This service is used to index task documents into Elasticsearch, | ||
| * enabling full-text search and analytics capabilities for tasks. | ||
| */ | ||
| protected final IElasticTaskService elasticTaskService | ||
|
|
||
| /** | ||
| * Service for mapping task entities to Elasticsearch documents. | ||
| * | ||
| * This service transforms task domain objects into their Elasticsearch | ||
| * representation before indexing, ensuring proper field mapping and data structure. | ||
| */ | ||
| protected final IElasticTaskMappingService elasticTaskMappingService | ||
|
|
||
| /** | ||
| * Constructs a new TaskMigrationHelper with the specified MongoTemplate. | ||
| * | ||
| * @param mongoTemplate the {@link MongoTemplate} to use for interacting with MongoDB | ||
| */ | ||
| TaskMigrationHelper(MongoTemplate mongoTemplate, | ||
| MigrationProperties migrationProperties, | ||
| IPetriNetService petriNetService, | ||
| ITaskService taskService, | ||
| IElasticTaskService elasticTaskService, | ||
| IElasticTaskMappingService elasticTaskMappingService) { | ||
| super(Task.class, mongoTemplate, migrationProperties) | ||
| this.petriNetService = petriNetService | ||
| this.taskService = taskService | ||
| this.elasticTaskService = elasticTaskService | ||
| this.elasticTaskMappingService = elasticTaskMappingService | ||
| } | ||
|
|
||
| /** | ||
| * Returns the page size for the task migration process. | ||
| * | ||
| * The page size is configured in the {@link MigrationProperties.TaskMigrationProperties} and determines | ||
| * the number of tasks processed in a single batch during migration operations. | ||
| * | ||
| * @return an integer indicating the configured page size | ||
| */ | ||
| @Override | ||
| int getPageSize() { | ||
| return migrationProperties.tasks.pageSize | ||
| } | ||
|
|
||
| /** | ||
| * Prepares a set of bulk operations for tasks during the migration process. | ||
| * | ||
| * This method is called for each individual {@link Task} document that needs to be updated. | ||
| * It executes the provided {@code update} closure to modify the task and | ||
| * prepares a bulk replacement operation to save the changes to the database. | ||
| * | ||
| * @param document the {@link Task} document to be updated | ||
| * @param update a {@link Closure} that defines the update logic to be applied to the {@link Task} | ||
| * @param bulkOperations the {@link BulkOperations} object used to queue the MongoDB operations for batch execution | ||
| */ | ||
| @Override | ||
| void prepareOperations(Task document, Closure update, BulkOperations bulkOperations) { | ||
| log.debug("Updating task with ID ${document.stringId}") | ||
| update(document) | ||
| bulkOperations.replaceOne(Query.query(Criteria.where("_id").is(document.getObjectId())), document) | ||
| } | ||
|
|
||
| /** | ||
| * Resolves and returns the unique identifier of the given task document. | ||
| * | ||
| * This method extracts the string representation of the task's identifier, | ||
| * which is used for logging and tracking purposes during migration operations. | ||
| * | ||
| * @param document the {@link Task} document whose identifier should be resolved | ||
| * @return a {@link String} representing the unique identifier of the task | ||
| */ | ||
| @Override | ||
| String resolveId(Task document) { | ||
| return document.getStringId() | ||
| } | ||
|
|
||
| /** | ||
| * Updates all tasks filtered by filter Predicate. Update closure is called on each filtered task. | ||
| * @param update Instance of Closure, which should contain code that will be executed for every Task matched by filter | ||
| * @param filter Instance of Predicate, to filter which tasks should be updated | ||
| */ | ||
| void updateTasks(Closure update, Predicate filter, MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting updateTasks with filter: ${filter.toString()}") | ||
| log.info("Updating tasks with filter ${filter.toString()} and update ${update.toString()}") | ||
| log.trace("Converting filter to query and calling iterate") | ||
| iterate(update, null, toQuery(filter), 0, getPageSize(), errorPolicy) | ||
| } | ||
|
|
||
| /** | ||
| * Iterates all tasks filtered by filter Predicate. Update closure is called on each filtered task. PageProcessed closure is called after each page iteration. | ||
| * @param update Instance of Closure, which should contain code that will be executed for every Task matched by filter (changes made to Task will not be saved automatically, for that use updateCases method) | ||
| * @param sleepFor Optional attribute to set sleep time (in milliseconds) to sleep for after each iterated page. Default 0ms | ||
| * @param filter Instance of Predicate, to filter which tasks should be iterated | ||
| */ | ||
| void iterateTasks(Closure update, Closure pageProcessed = null, long sleepFor = 0, Predicate filter, | ||
| MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting iterateTasks with filter: ${filter.toString()}, sleepFor: ${sleepFor}ms") | ||
| log.trace("Converting filter to query and calling iterate with pageProcessed closure") | ||
| iterate(update, pageProcessed, toQuery(filter), sleepFor, getPageSize(), errorPolicy) | ||
| } | ||
|
|
||
| /** | ||
| * Updates all tasks of a given process. | ||
| * @param update Instance of Closure, which should contain code that will be executed for every Task matched by filter | ||
| * @param processIdentifier identifier of PetriNet, to filter which tasks should be updated | ||
| * @param pageSize Optional attribute to set page size. Default page size 100 | ||
| */ | ||
| void updateTasksCursor(Closure update, String processIdentifier, int pageSize = 100, | ||
| MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting updateTasksCursor for processIdentifier: ${processIdentifier}, pageSize: ${pageSize}") | ||
| String processId = petriNetService.getDefaultVersionByIdentifier(processIdentifier).stringId | ||
| Query query = new Query(Criteria.where("processId").is(processId)) | ||
| log.trace("Created query for processId: ${processId}, calling iterate") | ||
| iterate(update, null, query, 0, pageSize as int, errorPolicy) | ||
| } | ||
|
|
||
| /** | ||
| * Updates specific tasks of a given process. | ||
| * @param update Instance of Closure, which should contain code that will be executed for every Task matched by filter | ||
| * @param processIdentifier identifier of PetriNet, to filter which tasks should be updated | ||
| * @param transitionIds List of transition IDs to limit filter to specific transitions of given processIdentifier | ||
| * @param pageSize Optional attribute to set page size. Default page size 100 | ||
| */ | ||
| void updateSpecificTasksCursor(Closure update, String processIdentifier, List<String> transitionIds, int pageSize = 100, | ||
| MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting updateSpecificTasksCursor for processIdentifier: ${processIdentifier}, transitionIds: ${transitionIds}, pageSize: ${pageSize}") | ||
| String processId = petriNetService.getDefaultVersionByIdentifier(processIdentifier).stringId | ||
| Query query = new Query(Criteria.where("processId").is(processId)) | ||
| query.addCriteria(Criteria.where("transitionId").in(transitionIds)) | ||
| log.trace("Created query with criteria for processId: ${processId} and transitionIds: ${transitionIds}, calling iterate") | ||
| iterate(update, null, query, 0, pageSize as int, errorPolicy) | ||
| } | ||
|
|
||
| /** | ||
| * Update all tasks. | ||
| * @param update Instance of Closure, which should contain code that will be executed for every Task | ||
| * @param pageSize Optional attribute to set page size. Default page size 100.0 | ||
| */ | ||
| void updateAllTasksCursor(Closure update, int pageSize = 100, MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting updateAllTasksCursor with pageSize: ${pageSize}") | ||
| log.trace("Calling iterate with empty query to process all tasks") | ||
| iterate(update, null, new Query(), 0, pageSize as int, errorPolicy) | ||
| } | ||
|
|
||
| /** | ||
| * Reloads tasks of provided case via TaskService, | ||
| * handles useCase.petriNet internally | ||
| * @param useCase Instance of Case for which tasks will be reloaded | ||
| * @param net Instance of Petri Net, it needs to match processIdentifier of useCase | ||
| */ | ||
| void reloadTasks(Case useCase, PetriNet net) { | ||
| log.debug("Starting reloadTasks for case: ${useCase.stringId}, net identifier: ${net.identifier}") | ||
| PetriNetMigrationHelper.setPetriNet(useCase, net) | ||
| log.trace("Set PetriNet for case, calling taskService.reloadTasks") | ||
| taskService.reloadTasks(useCase, false) | ||
| } | ||
|
|
||
| /** | ||
| * Indexes provided task in elasticsearch | ||
| * @param task Instance of Task that will be indexed into elasticsearch index | ||
| */ | ||
| void elasticTaskIndex(Task task, MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting elasticTaskIndex for task: ${task.stringId}") | ||
| try { | ||
| log.trace("Transforming and indexing task: ${task.stringId} into elasticsearch") | ||
| elasticTaskService.indexNow(elasticTaskMappingService.transform(task)) | ||
| } catch (Exception e) { | ||
| String message = "Failed to index $task.stringId" | ||
| log.error(message, e) | ||
| handleMigrationError(errorPolicy, "elasticTaskIndex", type, task.stringId, message, e) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Adds role with permissions to existing tasks of net | ||
| * @param role ProcessRole that will be added to transitions | ||
| * @param net Instance of Petri Net of updated transitions | ||
| * @param transitionIds List of transition IDs the role will be added to | ||
| * @param permissions Map of permissions for the role | ||
| */ | ||
| void addRoleToExistingTasks(ProcessRole role, PetriNet net, List<String> transitionIds, Map<String, Boolean> permissions) { | ||
| log.debug("Starting addRoleToExistingTasks for role: ${role.getName()}, net: ${net.identifier}, transitionIds: ${transitionIds}") | ||
| log.trace("Calling updateTasks to add role with permissions: ${permissions}") | ||
| updateTasks({ Task task -> | ||
| log.trace("Add role '${role.getName()}' with roleId=${role.getImportId()} to transitionId=${task.getTransitionId()} in task ${task.stringId}") | ||
| task.addRole(role.getStringId(), permissions) | ||
| }, QTask.task.transitionId.in(transitionIds) & QTask.task.processId.eq(net.getStringId())) | ||
| } | ||
|
|
||
| /** | ||
| * Updates permissions on existing tasks filtered by relevantTransitionIds | ||
| * @param useCase Instance of Case | ||
| * @param net Instance of Petri Net, it needs to match processIdentifier of useCase | ||
| * @param relevantTransitionIds List of transition IDs for permissions update | ||
| */ | ||
| void updateTasksPermissions(Case useCase, PetriNet net, List<String> relevantTransitionIds, MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting updateTasksPermissions for case: ${useCase.stringId}, net: ${net.identifier}, relevantTransitionIds: ${relevantTransitionIds}") | ||
| useCase.tasks.findAll { it.transition in relevantTransitionIds }.each { taskPair -> | ||
| log.trace("Processing task permissions for transition: ${taskPair.transition} in case: ${useCase.stringId}") | ||
| updateTaskPermissions(useCase, taskPair, net, errorPolicy) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Updates permissions on existing task | ||
| * @param useCase Instance of Case | ||
| * @param taskPair TaskPair object of updated Task | ||
| * @param net Instance of Petri Net, it needs to match processIdentifier of useCase | ||
| */ | ||
| void updateTaskPermissions(Case useCase, TaskPair taskPair, PetriNet net, MigrationErrorPolicy errorPolicy = defaultErrorPolicy()) { | ||
| log.debug("Starting updateTaskPermissions for case: ${useCase.stringId}, task transition: ${taskPair.transition}") | ||
| try { | ||
| Transition newTransition = net.getTransition(taskPair.transition) | ||
| Task oldTask = taskService.findOne(taskPair.task) | ||
| log.trace("Updating task roles and permissions for task: ${oldTask.stringId}") | ||
| oldTask.setProcessId(net.stringId) | ||
| oldTask.setRoles(newTransition.roles) | ||
| oldTask.setNegativeViewRoles(newTransition.negativeViewRoles) | ||
| oldTask.resolveViewRoles() | ||
| taskService.save(oldTask) | ||
| } catch (Exception e) { | ||
| String message = "Failed to update task permissions $useCase.stringId $taskPair.transition" | ||
| log.error(message, e) | ||
| handleMigrationError(errorPolicy, "updateTaskPermissions", type, taskPair?.task?.toString(), message, e) | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Propagate facade error policy in
addRoleToExistingTasks.This path always falls back to
defaultErrorPolicy()because it callsupdateTasks(...)without an explicit policy. Calls wrapped inMigrationHelper.withErrorPolicy(...)won’t honor the caller’s policy here.Suggested fix
🤖 Prompt for AI Agents