namespace Workflow.Tests.Engine;

// The using directives deliberately stay AFTER the file-scoped namespace declaration.
// Directives at namespace scope are searched before compilation-unit/global usings, so a
// simple name such as TaskStatus binds to the project's enum (presumably declared in
// Workflow.Domain.Enums) instead of clashing with System.Threading.Tasks.TaskStatus.
// NOTE(review): confirm before hoisting these above the namespace declaration.
using FluentAssertions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Workflow.Application.Engine;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Domain.Expressions;
using Workflow.Infrastructure.Persistence;
using Xunit;

/// <summary>
/// Tests for <see cref="ProcessEngine"/>: start, approval, cc, condition, parallel fork/join,
/// end and sub-process nodes, node action hooks, and duplicate-processing guards.
/// Every test runs against its own uniquely named EF Core in-memory database.
/// </summary>
public class ProcessEngineTests : IDisposable
{
    private readonly WorkflowDbContext _dbContext;
    private IServiceProvider _serviceProvider;
    private ProcessEngine _engine;

    /// <summary>
    /// Creates an isolated in-memory database and an engine backed by an empty service provider.
    /// </summary>
    public ProcessEngineTests()
    {
        var options = new DbContextOptionsBuilder<WorkflowDbContext>()
            .UseInMemoryDatabase(Guid.NewGuid().ToString())
            .Options;

        _dbContext = new WorkflowDbContext(options);
        _serviceProvider = new ServiceCollection().BuildServiceProvider();
        _engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());
    }

    /// <summary>
    /// Releases the service provider and the DbContext; xUnit invokes this after each test.
    /// </summary>
    public void Dispose()
    {
        (_serviceProvider as IDisposable)?.Dispose();
        _dbContext.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Replaces the current service provider with one built from <paramref name="services"/>
    /// and recreates the engine so that it resolves from the new provider.
    /// </summary>
    /// <param name="services">Registrations the engine under test should see.</param>
    private void UseServices(IServiceCollection services)
    {
        // The previous provider is no longer reachable once replaced, so release it here.
        (_serviceProvider as IDisposable)?.Dispose();

        _serviceProvider = services.BuildServiceProvider();
        _engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());
    }

    /// <summary>
    /// Registers <paramref name="action"/> as a keyed <see cref="INodeAction"/> singleton under
    /// <paramref name="name"/> and rebuilds the engine on top of that registration.
    /// </summary>
    /// <param name="name">Service key; the tests use the action name from the node config.</param>
    /// <param name="action">Action instance returned for that key.</param>
    private void RegisterKeyedAction(string name, INodeAction action)
    {
        var services = new ServiceCollection();
        services.AddKeyedSingleton<INodeAction>(name, (_, _) => action);
        UseServices(services);
    }

    // ============================================================
    // Start Node
    // ============================================================

    [Fact]
    public async Task ProcessStartNode_CreatesSingleToken_AndPropagatesToNextNode()
    {
        var startNode = CreateNode(NodeType.Start);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(startNode, nextNode);
        var (_, instance) = PersistSetup([startNode, nextNode], [edge]);

        await _engine.StartAsync(instance);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(nextNode.Id);
        instance.Status.Should().Be(InstanceStatus.Running);
    }

    [Fact]
    public async Task ProcessStartNode_ThrowsWhenNoOutgoingEdge()
    {
        var startNode = CreateNode(NodeType.Start);
        var (_, instance) = PersistSetup([startNode], []);

        var act = () => _engine.StartAsync(instance);

        // NOTE(review): the expected exception type is not named by this test; BusinessException
        // is assumed to match the other failure-path tests - confirm against ProcessEngine.
        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*start*edge*");
    }

    // ============================================================
    // Approval Node
    // ============================================================

    [Fact]
    public async Task ProcessApprovalNode_CreatesTaskForAssignee()
    {
        var userId = Guid.NewGuid();
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = $$"""{ "assigneeRule": "user:{{userId}}" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        var tasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();
        tasks.Should().HaveCount(1);
        tasks[0].AssigneeId.Should().Be(userId);
        tasks[0].Type.Should().Be(TaskType.Approval);
        tasks[0].Status.Should().Be(TaskStatus.Pending);
    }

    [Fact]
    public async Task ProcessApprovalNode_CreatesTaskForRole()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = """{ "assigneeRule": "role:manager" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        var tasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();
        tasks.Should().HaveCount(1);
        tasks[0].AssigneeRole.Should().Be("manager");
        tasks[0].Type.Should().Be(TaskType.Approval);
        tasks[0].Status.Should().Be(TaskStatus.Pending);
    }

    [Fact]
    public async Task ProcessApprovalNode_SetsDueAtFromTimeoutMinutesConfig()
    {
        var userId = Guid.NewGuid();
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = $$"""{ "assigneeRule": "user:{{userId}}", "timeoutMinutes": 1440 }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        var before = DateTime.UtcNow;
        await _engine.ProcessNodeAsync(instance, token, approvalNode);
        var after = DateTime.UtcNow;

        var task = await _dbContext.WorkflowTasks.SingleAsync(t => t.InstanceId == instance.Id);
        task.DueAt.Should().NotBeNull();

        // DueAt must fall between before+1440min and after+1440min; the extra second
        // tolerates execution time.
        task.DueAt.Should()
            .BeOnOrAfter(before.AddMinutes(1440))
            .And.BeOnOrBefore(after.AddMinutes(1440).AddSeconds(1));
    }

    [Fact]
    public async Task ProcessApprovalNode_NoTimeoutConfig_DueAtIsNull()
    {
        var userId = Guid.NewGuid();
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = $$"""{ "assigneeRule": "user:{{userId}}" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        var task = await _dbContext.WorkflowTasks.SingleAsync(t => t.InstanceId == instance.Id);
        task.DueAt.Should().BeNull();
    }

    [Fact]
    public async Task ProcessApprovalNode_TriggersTaskArrivedNotificationWhenServiceRegistered()
    {
        // Register the capture under the INotificationService interface (not its concrete
        // type) and verify that a notification fires when the task arrives.
        var capture = new NotificationCaptureService();
        var services = new ServiceCollection();
        services.AddSingleton<Workflow.Application.Notifications.INotificationService>(capture);
        UseServices(services);

        var userId = Guid.NewGuid();
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = $$"""{ "assigneeRule": "user:{{userId}}" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        capture.ArrivedTasks.Should().HaveCount(1);
        capture.ArrivedTasks[0].AssigneeId.Should().Be(userId);
    }

    [Fact]
    public async Task CompleteTask_Approved_PropagatesTokenAlongApprovedEdge()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().ContainSingle(t => t.NodeId == nextNode.Id);
    }

    [Fact]
    public async Task CompleteTask_Rejected_PropagatesTokenAlongRejectedEdge()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var rejectNode = CreateNode(NodeType.Approval);
        var rejectedEdge = CreateEdge(approvalNode, rejectNode, EdgeType.Rejected);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, rejectNode], [rejectedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        await _engine.CompleteTaskAsync(task, TaskResult.Rejected);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().ContainSingle(t => t.NodeId == rejectNode.Id);
    }

    [Fact]
    public async Task CompleteTask_Rejected_WithNoRejectEdge_ThrowsBusinessException()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        var act = () => _engine.CompleteTaskAsync(task, TaskResult.Rejected);

        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*reject*edge*");
    }

    // ============================================================
    // Cc Node
    // ============================================================

    [Fact]
    public async Task ProcessCcNode_CreatesCcTasks_ForAllRecipients()
    {
        var user1 = Guid.NewGuid();
        var user2 = Guid.NewGuid();
        var user3 = Guid.NewGuid();
        var ccNode = CreateNode(NodeType.Cc);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(ccNode, nextNode);
        ccNode.Config = $$"""{ "recipients": ["{{user1}}", "{{user2}}", "{{user3}}"] }""";
        var (_, instance) = PersistSetup([ccNode, nextNode], [edge]);
        var token = PersistToken(instance, ccNode);

        await _engine.ProcessNodeAsync(instance, token, ccNode);

        var tasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id && t.Type == TaskType.Cc)
            .ToListAsync();
        tasks.Should().HaveCount(3);
        tasks.Should().OnlyContain(t => t.Status == TaskStatus.Pending);
        tasks.Select(t => t.AssigneeId).Should().BeEquivalentTo(new Guid?[] { user1, user2, user3 });
    }

    [Fact]
    public async Task ProcessCcNode_PropagatesImmediatelyWithoutWaiting()
    {
        var user1 = Guid.NewGuid();
        var ccNode = CreateNode(NodeType.Cc);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(ccNode, nextNode);
        ccNode.Config = $$"""{ "recipients": ["{{user1}}"] }""";
        var (_, instance) = PersistSetup([ccNode, nextNode], [edge]);
        var token = PersistToken(instance, ccNode);

        await _engine.ProcessNodeAsync(instance, token, ccNode);

        var propagatedTokens = await GetActiveTokensAsync(instance, nextNode);
        propagatedTokens.Should().HaveCount(1);

        // The cc task is still pending even though the token has already moved on.
        var ccTasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id && t.Type == TaskType.Cc)
            .ToListAsync();
        ccTasks.Should().HaveCount(1);
        ccTasks[0].Status.Should().Be(TaskStatus.Pending);
    }

    // ============================================================
    // Condition Gateway (Exclusive Gateway)
    // ============================================================

    [Fact]
    public async Task ProcessConditionNode_TakesMatchingBranch()
    {
        var conditionNode = CreateNode(NodeType.Condition);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 1000");
        var edgeB = CreateEdge(conditionNode, branchB, condition: "amount <= 1000");
        var (_, instance) = PersistSetup([conditionNode, branchA, branchB], [edgeA, edgeB]);
        instance.Variables = """{ "amount": 1500 }""";
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, conditionNode);

        await _engine.ProcessNodeAsync(instance, token, conditionNode);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(branchA.Id);
    }

    [Fact]
    public async Task ProcessConditionNode_NoMatchingBranch_ThrowsBusinessException()
    {
        var conditionNode = CreateNode(NodeType.Condition);
        var branchA = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 5000");
        var (_, instance) = PersistSetup([conditionNode, branchA], [edgeA]);
        instance.Variables = """{ "amount": 100 }""";
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, conditionNode);

        var act = () => _engine.ProcessNodeAsync(instance, token, conditionNode);

        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*condition*match*");
    }

    [Fact]
    public async Task ProcessConditionNode_FirstMatchingBranchWins_WhenMultipleMatch()
    {
        var conditionNode = CreateNode(NodeType.Condition);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var branchC = CreateNode(NodeType.Approval);

        // With amount = 1500, edges A and B both match; the test expects A (order 1) to win.
        var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 1000", order: 1);
        var edgeB = CreateEdge(conditionNode, branchB, condition: "amount > 500", order: 2);
        var edgeC = CreateEdge(conditionNode, branchC, condition: "amount > 2000", order: 3);
        var (_, instance) = PersistSetup(
            [conditionNode, branchA, branchB, branchC],
            [edgeA, edgeB, edgeC]);
        instance.Variables = """{ "amount": 1500 }""";
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, conditionNode);

        await _engine.ProcessNodeAsync(instance, token, conditionNode);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(branchA.Id);
    }

    // ============================================================
    // Parallel Gateway (Fork)
    // ============================================================

    [Fact]
    public async Task ProcessParallelFork_CreatesOneTokenPerOutgoingEdge()
    {
        var forkNode = CreateNode(NodeType.Parallel);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(forkNode, branchA);
        var edgeB = CreateEdge(forkNode, branchB);
        var (_, instance) = PersistSetup([forkNode, branchA, branchB], [edgeA, edgeB]);
        var token = PersistToken(instance, forkNode);

        await _engine.ProcessNodeAsync(instance, token, forkNode);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().HaveCount(2);
        tokens.Select(t => t.NodeId).Should().BeEquivalentTo(new[] { branchA.Id, branchB.Id });
    }

    [Fact]
    public async Task ProcessParallelFork_With3Edges_Creates3Tokens()
    {
        var forkNode = CreateNode(NodeType.Parallel);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var branchC = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(forkNode, branchA);
        var edgeB = CreateEdge(forkNode, branchB);
        var edgeC = CreateEdge(forkNode, branchC);
        var (_, instance) = PersistSetup(
            [forkNode, branchA, branchB, branchC],
            [edgeA, edgeB, edgeC]);
        var token = PersistToken(instance, forkNode);

        await _engine.ProcessNodeAsync(instance, token, forkNode);

        var tokens = await GetActiveTokensAsync(instance);
        tokens.Should().HaveCount(3);
        tokens.Select(t => t.NodeId)
            .Should().BeEquivalentTo(new[] { branchA.Id, branchB.Id, branchC.Id });
    }

    // ============================================================
    // Parallel Gateway (Join)
    // ============================================================

    [Fact]
    public async Task ProcessParallelJoin_WaitsForAllTokens_WhenNotAllArrived()
    {
        var joinNode = CreateNode(NodeType.Parallel);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var branchC = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(branchA, joinNode);
        var edgeB = CreateEdge(branchB, joinNode);
        var edgeC = CreateEdge(branchC, joinNode);
        var (_, instance) = PersistSetup(
            [joinNode, branchA, branchB, branchC],
            [edgeA, edgeB, edgeC]);

        // Only two of the three incoming branches have delivered a token to the join.
        PersistToken(instance, joinNode, TokenStatus.Active);
        var arrivingToken = PersistToken(instance, joinNode, TokenStatus.Active);

        await _engine.ProcessNodeAsync(instance, arrivingToken, joinNode);

        var activeTokens = await GetActiveTokensAsync(instance);
        activeTokens.Should().HaveCount(2);
        activeTokens.Should().OnlyContain(t => t.NodeId == joinNode.Id);
    }

    [Fact]
    public async Task ProcessParallelJoin_MergesAllTokens_WhenAllArrived()
    {
        var joinNode = CreateNode(NodeType.Parallel);
        var nextNode = CreateNode(NodeType.Approval);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(branchA, joinNode);
        var edgeB = CreateEdge(branchB, joinNode);
        var edgeOut = CreateEdge(joinNode, nextNode);
        var (_, instance) = PersistSetup(
            [joinNode, nextNode, branchA, branchB],
            [edgeA, edgeB, edgeOut]);

        // Both incoming branches have delivered a token to the join.
        PersistToken(instance, joinNode, TokenStatus.Active);
        var arrivingToken = PersistToken(instance, joinNode, TokenStatus.Active);

        await _engine.ProcessNodeAsync(instance, arrivingToken, joinNode);

        var consumedTokens = await _dbContext.WorkflowTokens
            .Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Consumed)
            .ToListAsync();
        consumedTokens.Should().HaveCount(2);

        var activeTokens = await GetActiveTokensAsync(instance);
        activeTokens.Should().HaveCount(1);
        activeTokens[0].NodeId.Should().Be(nextNode.Id);
    }

    // ============================================================
    // Parallel Fork -> Approvals -> Join: full pipeline
    // ============================================================

    /// <summary>
    /// End to end: a parallel fork splits into two approval branches, both are approved and
    /// converge on a join; the join merges and advances downstream only once both tokens have
    /// arrived. The other tests cover fork and join separately, not the whole chain.
    /// </summary>
    [Fact]
    public async Task ParallelFork_BothBranchesApproved_JoinsAndPropagatesOnce()
    {
        var forkNode = CreateNode(NodeType.Parallel);
        var approvalA = CreateNode(NodeType.Approval);
        approvalA.Config = """{ "assigneeRule": "role:userA" }""";
        var approvalB = CreateNode(NodeType.Approval);
        approvalB.Config = """{ "assigneeRule": "role:userB" }""";
        var joinNode = CreateNode(NodeType.Parallel);
        var afterJoin = CreateNode(NodeType.End);

        var edgeForkA = CreateEdge(forkNode, approvalA);
        var edgeForkB = CreateEdge(forkNode, approvalB);
        var edgeAJoin = CreateEdge(approvalA, joinNode);
        var edgeBJoin = CreateEdge(approvalB, joinNode);
        var edgeJoinOut = CreateEdge(joinNode, afterJoin);

        var (_, instance) = PersistSetup(
            [forkNode, approvalA, approvalB, joinNode, afterJoin],
            [edgeForkA, edgeForkB, edgeAJoin, edgeBJoin, edgeJoinOut]);
        var forkToken = PersistToken(instance, forkNode);

        // Fork: one token is sent to each of the two approval nodes.
        await _engine.ProcessNodeAsync(instance, forkToken, forkNode);

        var activeAfterFork = await GetActiveTokensAsync(instance);
        activeAfterFork.Should().HaveCount(2);

        // Approve branch A: its token advances to the join, which must keep waiting.
        var taskA = await _dbContext.WorkflowTasks.FirstAsync(t => t.NodeId == approvalA.Id);
        await _engine.CompleteTaskAsync(taskA, TaskResult.Approved);

        // Branch A's token waits at the join while branch B is still open, so two tokens
        // remain active (one at the join, one at approvalB).
        var activeAfterA = await GetActiveTokensAsync(instance);
        activeAfterA.Should().HaveCount(2);
        activeAfterA.Should().Contain(t => t.NodeId == approvalB.Id);
        instance.Status.Should().NotBe(InstanceStatus.Completed);

        // Approve branch B: the second token reaches the join and satisfies the merge condition.
        var taskB = await _dbContext.WorkflowTasks.FirstAsync(t => t.NodeId == approvalB.Id);
        await _engine.CompleteTaskAsync(taskB, TaskResult.Approved);

        // The merged token enters the End node and is consumed, so no active token is left
        // and the instance completes.
        var activeAfterB = await GetActiveTokensAsync(instance);
        activeAfterB.Should().BeEmpty("End 节点应已消费 join 合并后的 token");
        instance.Status.Should().Be(InstanceStatus.Completed);

        // Key invariant: both tokens that entered the join have been consumed (merged).
        var joinTokens = await _dbContext.WorkflowTokens
            .Where(t => t.InstanceId == instance.Id && t.NodeId == joinNode.Id)
            .ToListAsync();
        joinTokens.Should().OnlyContain(t => t.Status == TokenStatus.Consumed);
        joinTokens.Should().HaveCount(2);
    }

    // ============================================================
    // End Node
    // ============================================================

    [Fact]
    public async Task ProcessEndNode_ConsumesToken()
    {
        var endNode = CreateNode(NodeType.End);
        var (_, instance) = PersistSetup([endNode], []);
        var token = PersistToken(instance, endNode);

        await _engine.ProcessNodeAsync(instance, token, endNode);

        token.Status.Should().Be(TokenStatus.Consumed);
    }

    [Fact]
    public async Task ProcessEndNode_CompletesInstance_WhenAllTokensConsumed()
    {
        var endNode = CreateNode(NodeType.End);
        var (_, instance) = PersistSetup([endNode], []);
        instance.Status = InstanceStatus.Running;
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, endNode);

        await _engine.ProcessNodeAsync(instance, token, endNode);

        instance.Status.Should().Be(InstanceStatus.Completed);
    }

    [Fact]
    public async Task ProcessEndNode_DoesNotCompleteInstance_WhenOtherTokensActive()
    {
        var endNode = CreateNode(NodeType.End);
        var approvalNode = CreateNode(NodeType.Approval);
        var (_, instance) = PersistSetup([endNode, approvalNode], []);
        instance.Status = InstanceStatus.Running;
        await _dbContext.SaveChangesAsync();
        var endToken = PersistToken(instance, endNode, TokenStatus.Active);
        var activeToken = PersistToken(instance, approvalNode, TokenStatus.Active);

        await _engine.ProcessNodeAsync(instance, endToken, endNode);

        endToken.Status.Should().Be(TokenStatus.Consumed);
        activeToken.Status.Should().Be(TokenStatus.Active);
        instance.Status.Should().Be(InstanceStatus.Running);
    }

    // ============================================================
    // SubProcess
    // ============================================================

    [Fact]
    public async Task ProcessSubProcessNode_CreatesChildInstance()
    {
        var subDefId = Guid.NewGuid();
        var subProcessNode = CreateNode(NodeType.SubProcess);
        subProcessNode.Config = $$"""{ "definitionId": "{{subDefId}}" }""";
        var (_, instance) = PersistSetup([subProcessNode], []);
        var token = PersistToken(instance, subProcessNode);

        await _engine.ProcessNodeAsync(instance, token, subProcessNode);

        var childInstance = await _dbContext.WorkflowInstances
            .FirstOrDefaultAsync(i => i.ParentInstanceId == instance.Id);
        childInstance.Should().NotBeNull();
        childInstance!.DefinitionId.Should().Be(subDefId);
        childInstance.Status.Should().Be(InstanceStatus.Running);
    }

    [Fact]
    public async Task SubProcessComplete_PropagatesTokenInParent()
    {
        var subProcessNode = CreateNode(NodeType.SubProcess);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(subProcessNode, nextNode);
        subProcessNode.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";
        var (_, parentInstance) = PersistSetup([subProcessNode, nextNode], [edge]);
        var parentToken = PersistToken(parentInstance, subProcessNode);
        var childInstance = PersistCompletedChildInstance(
            parentInstance,
            parentToken,
            Guid.Parse("00000000-0000-0000-0000-000000000001"));

        await _engine.HandleSubProcessCompletionAsync(childInstance);

        var tokens = await GetActiveTokensAsync(parentInstance);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(nextNode.Id);
    }

    // ============================================================
    // SubProcess boundaries
    // ============================================================

    /// <summary>
    /// Boundary: a SubProcess node configured without a definitionId must be rejected explicitly.
    /// </summary>
    [Fact]
    public async Task ProcessSubProcessNode_WithoutDefinitionId_ThrowsBusinessException()
    {
        var subProcessNode = CreateNode(NodeType.SubProcess);
        subProcessNode.Config = "{}"; // no definitionId
        var (_, instance) = PersistSetup([subProcessNode], []);
        var token = PersistToken(instance, subProcessNode);

        var act = () => _engine.ProcessNodeAsync(instance, token, subProcessNode);

        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*definitionId*");
    }

    /// <summary>
    /// Boundary: when a sub-process completes but its node has no outgoing edge, the parent
    /// token must be consumed rather than silently left hanging. Pins the current behaviour of
    /// HandleSubProcessCompletionAsync.
    /// </summary>
    [Fact]
    public async Task SubProcessComplete_WithNoOutgoingEdge_ConsumesWaitingToken()
    {
        var subProcessNode = CreateNode(NodeType.SubProcess);
        subProcessNode.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";
        var (_, parentInstance) = PersistSetup([subProcessNode], []);
        var parentToken = PersistToken(parentInstance, subProcessNode);
        var childInstance = PersistCompletedChildInstance(
            parentInstance,
            parentToken,
            Guid.Parse("00000000-0000-0000-0000-000000000001"));

        await _engine.HandleSubProcessCompletionAsync(childInstance);

        // The parent token must be consumed; it may not remain Active.
        var refreshedToken = await _dbContext.WorkflowTokens.FindAsync(parentToken.Id);
        refreshedToken!.Status.Should().Be(TokenStatus.Consumed);
    }

    // ============================================================
    // Node Actions (Hooks)
    // ============================================================

    [Fact]
    public async Task ProcessApprovalNode_ExecutesOnEnterAction()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onEnter": "send-notification" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);
        var actionExecuted = false;
        RegisterKeyedAction("send-notification", new TestAction(() => actionExecuted = true));

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        actionExecuted.Should().BeTrue("the onEnter action should be executed when entering an approval node");
    }

    [Fact]
    public async Task CompleteTask_Approved_ExecutesOnApprovedAction()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onApproved": "log-approval" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);
        var actionExecuted = false;
        RegisterKeyedAction("log-approval", new TestAction(() => actionExecuted = true));

        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        actionExecuted.Should().BeTrue("the onApproved action should be executed when task is approved");
    }

    [Fact]
    public async Task CompleteTask_Rejected_ExecutesOnRejectedAction()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var rejectNode = CreateNode(NodeType.Approval);
        var rejectedEdge = CreateEdge(approvalNode, rejectNode, EdgeType.Rejected);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onRejected": "notify-rejection" }""";
        var (_, instance) = PersistSetup([approvalNode, rejectNode], [rejectedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);
        var actionExecuted = false;
        RegisterKeyedAction("notify-rejection", new TestAction(() => actionExecuted = true));

        await _engine.CompleteTaskAsync(task, TaskResult.Rejected);

        actionExecuted.Should().BeTrue("the onRejected action should be executed when task is rejected");
    }

    [Fact]
    public async Task ActionFailure_DoesNotBlockTokenPropagation()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onApproved": "failing-action" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);
        RegisterKeyedAction("failing-action", new ThrowingAction());

        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        var tokens = await GetActiveTokensAsync(instance, nextNode);
        tokens.Should().HaveCount(1);
    }

    // ============================================================
    // Token Duplicate-Propagation Guard (idempotency)
    // ============================================================

    /// <summary>
    /// Boundary: a task that has already been completed (approved) must not be completable a
    /// second time; otherwise the token would be propagated twice, producing an extra
    /// downstream token and possibly duplicate tasks.
    /// </summary>
    [Fact]
    public async Task CompleteTask_AlreadyCompletedTask_ThrowsOrIsIdempotent()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        // First completion: propagates normally to nextNode.
        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        var activeTokensAfterFirst = await GetActiveTokensAsync(instance, nextNode);
        activeTokensAfterFirst.Should().HaveCount(1);

        // Second completion of the same task: this test requires a throw.
        // NOTE(review): the expected exception type is not named by this test; BusinessException
        // is assumed to match the other failure-path tests - confirm against ProcessEngine.
        var secondAct = () => _engine.CompleteTaskAsync(task, TaskResult.Approved);
        await secondAct.Should().ThrowAsync<BusinessException>();

        // Still exactly one active downstream token: nothing was propagated twice.
        var activeTokensAfterSecond = await GetActiveTokensAsync(instance, nextNode);
        activeTokensAfterSecond.Should().HaveCount(1);
    }

    /// <summary>
    /// Boundary: the token of an Approval node must not be processed twice by ProcessNodeAsync;
    /// otherwise several approval tasks would be created for the same node.
    /// </summary>
    [Fact]
    public async Task ProcessApprovalNode_TokenAlreadyProcessed_ThrowsOrIsIdempotent()
    {
        var userId = Guid.NewGuid();
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = $$"""{ "assigneeRule": "user:{{userId}}" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        // First pass: creates exactly one task.
        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        var tasksAfterFirst = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();
        tasksAfterFirst.Should().HaveCount(1);

        // Second pass over the same token: this test requires a throw and no second task.
        // NOTE(review): the expected exception type is not named by this test; BusinessException
        // is assumed to match the other failure-path tests - confirm against ProcessEngine.
        var secondAct = () => _engine.ProcessNodeAsync(instance, token, approvalNode);
        await secondAct.Should().ThrowAsync<BusinessException>();

        var tasksAfterSecond = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();
        tasksAfterSecond.Should().HaveCount(1);
    }

    // ============================================================
    // Test Helpers
    // ============================================================

    /// <summary>
    /// Builds an unsaved node of the given type with a fresh id and an empty JSON config.
    /// </summary>
    private static WorkflowNode CreateNode(NodeType type)
    {
        return new WorkflowNode
        {
            Id = Guid.NewGuid(),
            NodeType = type,
            Config = "{}",
        };
    }

    /// <summary>
    /// Builds an unsaved edge from <paramref name="source"/> to <paramref name="target"/>.
    /// </summary>
    /// <param name="source">Node the edge leaves.</param>
    /// <param name="target">Node the edge enters.</param>
    /// <param name="edgeType">Edge type; <see cref="EdgeType.Normal"/> when omitted.</param>
    /// <param name="condition">Optional condition expression stored on the edge.</param>
    /// <param name="order">Value stored in the edge's Order property.</param>
    private static WorkflowEdge CreateEdge(
        WorkflowNode source,
        WorkflowNode target,
        EdgeType? edgeType = null,
        string? condition = null,
        int order = 0)
    {
        return new WorkflowEdge
        {
            Id = Guid.NewGuid(),
            SourceNodeId = source.Id,
            TargetNodeId = target.Id,
            EdgeType = edgeType ?? EdgeType.Normal,
            Condition = condition,
            Order = order,
        };
    }

    /// <summary>
    /// Persists a new definition owning the given nodes and edges, plus a Pending instance of
    /// that definition with empty variables.
    /// </summary>
    /// <returns>The saved definition and instance.</returns>
    private (WorkflowDefinition Definition, WorkflowInstance Instance) PersistSetup(
        List<WorkflowNode> nodes,
        List<WorkflowEdge> edges)
    {
        var definition = new WorkflowDefinition { Id = Guid.NewGuid() };

        foreach (var node in nodes)
        {
            node.DefinitionId = definition.Id;
            _dbContext.WorkflowNodes.Add(node);
        }

        foreach (var edge in edges)
        {
            edge.DefinitionId = definition.Id;
            _dbContext.WorkflowEdges.Add(edge);
        }

        _dbContext.WorkflowDefinitions.Add(definition);

        var instance = new WorkflowInstance
        {
            Id = Guid.NewGuid(),
            DefinitionId = definition.Id,
            Status = InstanceStatus.Pending,
            Variables = "{}",
        };
        _dbContext.WorkflowInstances.Add(instance);
        _dbContext.SaveChanges();

        return (definition, instance);
    }

    /// <summary>
    /// Persists a token for <paramref name="instance"/> positioned at <paramref name="node"/>.
    /// </summary>
    private WorkflowToken PersistToken(
        WorkflowInstance instance,
        WorkflowNode node,
        TokenStatus status = TokenStatus.Active)
    {
        var token = new WorkflowToken
        {
            Id = Guid.NewGuid(),
            InstanceId = instance.Id,
            NodeId = node.Id,
            Status = status,
        };
        _dbContext.WorkflowTokens.Add(token);
        _dbContext.SaveChanges();
        return token;
    }

    /// <summary>
    /// Persists a Pending approval task bound to the given instance, token and node.
    /// </summary>
    private WorkflowTask PersistTask(
        WorkflowInstance instance,
        WorkflowToken token,
        WorkflowNode node)
    {
        var task = new WorkflowTask
        {
            Id = Guid.NewGuid(),
            InstanceId = instance.Id,
            TokenId = token.Id,
            NodeId = node.Id,
            Type = TaskType.Approval,
            Status = TaskStatus.Pending,
        };
        _dbContext.WorkflowTasks.Add(task);
        _dbContext.SaveChanges();
        return task;
    }

    /// <summary>
    /// Persists an already Completed child instance linked to <paramref name="parent"/> and
    /// <paramref name="parentToken"/>.
    /// </summary>
    private WorkflowInstance PersistCompletedChildInstance(
        WorkflowInstance parent,
        WorkflowToken parentToken,
        Guid definitionId)
    {
        var child = new WorkflowInstance
        {
            Id = Guid.NewGuid(),
            DefinitionId = definitionId,
            ParentInstanceId = parent.Id,
            ParentTokenId = parentToken.Id,
            Status = InstanceStatus.Completed,
            Variables = "{}",
        };
        _dbContext.WorkflowInstances.Add(child);
        _dbContext.SaveChanges();
        return child;
    }

    /// <summary>
    /// Loads the Active tokens of <paramref name="instance"/>, optionally restricted to those
    /// positioned at <paramref name="atNode"/>.
    /// </summary>
    private Task<List<WorkflowToken>> GetActiveTokensAsync(
        WorkflowInstance instance,
        WorkflowNode? atNode = null)
    {
        var query = _dbContext.WorkflowTokens
            .Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active);

        if (atNode is not null)
        {
            query = query.Where(t => t.NodeId == atNode.Id);
        }

        return query.ToListAsync();
    }

    /// <summary>Node action that invokes a callback so a test can observe that it ran.</summary>
    private class TestAction : INodeAction
    {
        private readonly Action _callback;

        public TestAction(Action callback)
        {
            _callback = callback;
        }

        public Task ExecuteAsync(NodeActionContext context)
        {
            _callback();
            return Task.CompletedTask;
        }
    }

    /// <summary>Node action that always throws, simulating a failing hook.</summary>
    private class ThrowingAction : INodeAction
    {
        public Task ExecuteAsync(NodeActionContext context)
        {
            throw new InvalidOperationException("Simulated action failure");
        }
    }

    /// <summary>
    /// Notification capture for tests: records arrived/approved/rejected task notifications so
    /// tests can assert that the engine raised them. Urge and timeout notifications are ignored.
    /// </summary>
    private class NotificationCaptureService : Workflow.Application.Notifications.INotificationService
    {
        public List<WorkflowTask> ArrivedTasks { get; } = [];

        public List<WorkflowTask> ApprovedTasks { get; } = [];

        public List<WorkflowTask> RejectedTasks { get; } = [];

        public Task NotifyTaskArrivedAsync(WorkflowTask task, CancellationToken ct = default)
        {
            ArrivedTasks.Add(task);
            return Task.CompletedTask;
        }

        public Task NotifyTaskApprovedAsync(WorkflowTask task, CancellationToken ct = default)
        {
            ApprovedTasks.Add(task);
            return Task.CompletedTask;
        }

        public Task NotifyTaskRejectedAsync(WorkflowTask task, CancellationToken ct = default)
        {
            RejectedTasks.Add(task);
            return Task.CompletedTask;
        }

        public Task NotifyTaskUrgedAsync(WorkflowTask task, CancellationToken ct = default)
            => Task.CompletedTask;

        public Task NotifyTaskTimeoutAsync(WorkflowTask task, bool autoApproved, CancellationToken ct = default)
            => Task.CompletedTask;
    }
}