namespace Workflow.Tests.Engine;

// NOTE(review): the using directives sit inside the file-scoped namespace. Presumably this is
// deliberate so that `TaskStatus` binds to Workflow.Domain.Enums.TaskStatus instead of clashing
// with System.Threading.Tasks.TaskStatus from implicit/global usings — confirm before moving them.
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Workflow.Application.Engine;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Expressions;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
using Microsoft.EntityFrameworkCore;
using Xunit;

/// <summary>
/// Tests for <see cref="ProcessEngine"/> covering start, approval, cc, condition, parallel
/// fork/join, end and sub-process nodes as well as node action hooks.
/// </summary>
/// <remarks>
/// Every test instance gets its own EF Core in-memory database (named with a fresh GUID), so
/// data persisted by one test is not visible to another. xUnit calls <see cref="Dispose"/> after
/// each test, which releases the context and the service provider.
/// </remarks>
public class ProcessEngineTests : IDisposable
{
    private readonly WorkflowDbContext _dbContext;

    // Registrations accumulate here so that several keyed actions can be registered in one test.
    private readonly ServiceCollection _services = new();

    private ServiceProvider _serviceProvider;
    private ProcessEngine _engine;

    public ProcessEngineTests()
    {
        var options = new DbContextOptionsBuilder<WorkflowDbContext>()
            .UseInMemoryDatabase(Guid.NewGuid().ToString())
            .Options;

        _dbContext = new WorkflowDbContext(options);
        _serviceProvider = _services.BuildServiceProvider();
        _engine = CreateEngine();
    }

    /// <summary>Releases the service provider and the database context owned by this test instance.</summary>
    public void Dispose()
    {
        _serviceProvider.Dispose();
        _dbContext.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>Builds an engine over the current context and the current service provider.</summary>
    private ProcessEngine CreateEngine() =>
        new(_dbContext, _serviceProvider, new ConditionEvaluator());

    /// <summary>
    /// Registers <paramref name="action"/> as a keyed singleton under <paramref name="name"/>,
    /// rebuilds the service provider and re-creates the engine so it sees the new registration.
    /// </summary>
    /// <param name="name">Service key; the tests use the action name found in the node config.</param>
    /// <param name="action">The action instance to return for that key.</param>
    private void RegisterKeyedAction(string name, INodeAction action)
    {
        _services.AddKeyedSingleton<INodeAction>(name, (_, _) => action);

        // The provider is immutable once built, so swap in a new one and drop the old.
        _serviceProvider.Dispose();
        _serviceProvider = _services.BuildServiceProvider();
        _engine = CreateEngine();
    }

    // ============================================================
    // Start Node
    // ============================================================

    [Fact]
    public async Task ProcessStartNode_CreatesSingleToken_AndPropagatesToNextNode()
    {
        var startNode = CreateNode(NodeType.Start);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(startNode, nextNode);
        var (_, instance) = PersistSetup([startNode, nextNode], [edge]);

        await _engine.StartAsync(instance);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(nextNode.Id);
        instance.Status.Should().Be(InstanceStatus.Running);
    }

    [Fact]
    public async Task ProcessStartNode_ThrowsWhenNoOutgoingEdge()
    {
        var startNode = CreateNode(NodeType.Start);
        var (_, instance) = PersistSetup([startNode], []);

        var act = () => _engine.StartAsync(instance);

        // NOTE(review): exception type assumed to be BusinessException, matching the other
        // failure tests in this class — confirm against ProcessEngine.StartAsync.
        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*start*edge*");
    }

    // ============================================================
    // Approval Node
    // ============================================================

    [Fact]
    public async Task ProcessApprovalNode_CreatesTaskForAssignee()
    {
        var userId = Guid.NewGuid();
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = $"{{ \"assigneeRule\": \"user:{userId}\" }}";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        var tasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();
        tasks.Should().HaveCount(1);
        tasks[0].AssigneeId.Should().Be(userId);
        tasks[0].Type.Should().Be(TaskType.Approval);
        tasks[0].Status.Should().Be(TaskStatus.Pending);
    }

    [Fact]
    public async Task ProcessApprovalNode_CreatesTaskForRole()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = """{ "assigneeRule": "role:manager" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        var tasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();
        tasks.Should().HaveCount(1);
        tasks[0].AssigneeRole.Should().Be("manager");
        tasks[0].Type.Should().Be(TaskType.Approval);
        tasks[0].Status.Should().Be(TaskStatus.Pending);
    }

    [Fact]
    public async Task CompleteTask_Approved_PropagatesTokenAlongApprovedEdge()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().ContainSingle(t => t.NodeId == nextNode.Id);
    }

    [Fact]
    public async Task CompleteTask_Rejected_PropagatesTokenAlongRejectedEdge()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var rejectNode = CreateNode(NodeType.Approval);
        var rejectedEdge = CreateEdge(approvalNode, rejectNode, EdgeType.Rejected);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, rejectNode], [rejectedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        await _engine.CompleteTaskAsync(task, TaskResult.Rejected);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().ContainSingle(t => t.NodeId == rejectNode.Id);
    }

    [Fact]
    public async Task CompleteTask_Rejected_WithNoRejectEdge_ThrowsBusinessException()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);

        var act = () => _engine.CompleteTaskAsync(task, TaskResult.Rejected);

        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*reject*edge*");
    }

    // ============================================================
    // Cc Node
    // ============================================================

    [Fact]
    public async Task ProcessCcNode_CreatesCcTasks_ForAllRecipients()
    {
        var user1 = Guid.NewGuid();
        var user2 = Guid.NewGuid();
        var user3 = Guid.NewGuid();
        var ccNode = CreateNode(NodeType.Cc);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(ccNode, nextNode);
        ccNode.Config = $"{{ \"recipients\": [\"{user1}\", \"{user2}\", \"{user3}\"] }}";
        var (_, instance) = PersistSetup([ccNode, nextNode], [edge]);
        var token = PersistToken(instance, ccNode);

        await _engine.ProcessNodeAsync(instance, token, ccNode);

        var tasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id && t.Type == TaskType.Cc)
            .ToListAsync();
        tasks.Should().HaveCount(3);
        tasks.Should().OnlyContain(t => t.Status == TaskStatus.Pending);
        tasks.Select(t => t.AssigneeId).Should().BeEquivalentTo(new Guid?[] { user1, user2, user3 });
    }

    [Fact]
    public async Task ProcessCcNode_PropagatesImmediatelyWithoutWaiting()
    {
        var user1 = Guid.NewGuid();
        var ccNode = CreateNode(NodeType.Cc);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(ccNode, nextNode);
        ccNode.Config = $"{{ \"recipients\": [\"{user1}\"] }}";
        var (_, instance) = PersistSetup([ccNode, nextNode], [edge]);
        var token = PersistToken(instance, ccNode);

        await _engine.ProcessNodeAsync(instance, token, ccNode);

        // Only tokens sitting on the next node are counted; the state of the cc token itself
        // is not part of this assertion.
        var activeTokens = await GetTokensAsync(instance, TokenStatus.Active);
        activeTokens.Should().ContainSingle(t => t.NodeId == nextNode.Id);

        // The cc task is still pending even though the token has already moved on.
        var ccTasks = await _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id && t.Type == TaskType.Cc)
            .ToListAsync();
        ccTasks.Should().HaveCount(1);
        ccTasks[0].Status.Should().Be(TaskStatus.Pending);
    }

    // ============================================================
    // Condition Gateway (Exclusive Gateway)
    // ============================================================

    [Fact]
    public async Task ProcessConditionNode_TakesMatchingBranch()
    {
        var conditionNode = CreateNode(NodeType.Condition);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 1000");
        var edgeB = CreateEdge(conditionNode, branchB, condition: "amount <= 1000");
        var (_, instance) = PersistSetup([conditionNode, branchA, branchB], [edgeA, edgeB]);
        instance.Variables = """{ "amount": 1500 }""";
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, conditionNode);

        await _engine.ProcessNodeAsync(instance, token, conditionNode);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(branchA.Id);
    }

    [Fact]
    public async Task ProcessConditionNode_NoMatchingBranch_ThrowsBusinessException()
    {
        var conditionNode = CreateNode(NodeType.Condition);
        var branchA = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 5000");
        var (_, instance) = PersistSetup([conditionNode, branchA], [edgeA]);
        instance.Variables = """{ "amount": 100 }""";
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, conditionNode);

        var act = () => _engine.ProcessNodeAsync(instance, token, conditionNode);

        await act.Should().ThrowAsync<BusinessException>()
            .WithMessage("*condition*match*");
    }

    [Fact]
    public async Task ProcessConditionNode_FirstMatchingBranchWins_WhenMultipleMatch()
    {
        var conditionNode = CreateNode(NodeType.Condition);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var branchC = CreateNode(NodeType.Approval);
        // With amount = 1500 both edge A and edge B evaluate to true; A has the lower order.
        var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 1000", order: 1);
        var edgeB = CreateEdge(conditionNode, branchB, condition: "amount > 500", order: 2);
        var edgeC = CreateEdge(conditionNode, branchC, condition: "amount > 2000", order: 3);
        var (_, instance) = PersistSetup(
            [conditionNode, branchA, branchB, branchC],
            [edgeA, edgeB, edgeC]);
        instance.Variables = """{ "amount": 1500 }""";
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, conditionNode);

        await _engine.ProcessNodeAsync(instance, token, conditionNode);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(branchA.Id);
    }

    // ============================================================
    // Parallel Gateway (Fork)
    // ============================================================

    [Fact]
    public async Task ProcessParallelFork_CreatesOneTokenPerOutgoingEdge()
    {
        var forkNode = CreateNode(NodeType.Parallel);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(forkNode, branchA);
        var edgeB = CreateEdge(forkNode, branchB);
        var (_, instance) = PersistSetup([forkNode, branchA, branchB], [edgeA, edgeB]);
        var token = PersistToken(instance, forkNode);

        await _engine.ProcessNodeAsync(instance, token, forkNode);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().HaveCount(2);
        tokens.Select(t => t.NodeId).Should().BeEquivalentTo(new[] { branchA.Id, branchB.Id });
    }

    [Fact]
    public async Task ProcessParallelFork_With3Edges_Creates3Tokens()
    {
        var forkNode = CreateNode(NodeType.Parallel);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var branchC = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(forkNode, branchA);
        var edgeB = CreateEdge(forkNode, branchB);
        var edgeC = CreateEdge(forkNode, branchC);
        var (_, instance) = PersistSetup(
            [forkNode, branchA, branchB, branchC],
            [edgeA, edgeB, edgeC]);
        var token = PersistToken(instance, forkNode);

        await _engine.ProcessNodeAsync(instance, token, forkNode);

        var tokens = await GetTokensAsync(instance, TokenStatus.Active);
        tokens.Should().HaveCount(3);
        tokens.Select(t => t.NodeId)
            .Should().BeEquivalentTo(new[] { branchA.Id, branchB.Id, branchC.Id });
    }

    // ============================================================
    // Parallel Gateway (Join)
    // ============================================================

    [Fact]
    public async Task ProcessParallelJoin_WaitsForAllTokens_WhenNotAllArrived()
    {
        var joinNode = CreateNode(NodeType.Parallel);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var branchC = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(branchA, joinNode);
        var edgeB = CreateEdge(branchB, joinNode);
        var edgeC = CreateEdge(branchC, joinNode);
        var (_, instance) = PersistSetup(
            [joinNode, branchA, branchB, branchC],
            [edgeA, edgeB, edgeC]);

        // Three incoming edges but only two tokens at the join: the first is persisted purely
        // for its presence in the database, the second is the one handed to the engine.
        _ = PersistToken(instance, joinNode, TokenStatus.Active);
        var arrivingToken = PersistToken(instance, joinNode, TokenStatus.Active);

        await _engine.ProcessNodeAsync(instance, arrivingToken, joinNode);

        var activeTokens = await GetTokensAsync(instance, TokenStatus.Active);
        activeTokens.Should().HaveCount(2);
        activeTokens.Should().OnlyContain(t => t.NodeId == joinNode.Id);
    }

    [Fact]
    public async Task ProcessParallelJoin_MergesAllTokens_WhenAllArrived()
    {
        var joinNode = CreateNode(NodeType.Parallel);
        var nextNode = CreateNode(NodeType.Approval);
        var branchA = CreateNode(NodeType.Approval);
        var branchB = CreateNode(NodeType.Approval);
        var edgeA = CreateEdge(branchA, joinNode);
        var edgeB = CreateEdge(branchB, joinNode);
        var edgeOut = CreateEdge(joinNode, nextNode);
        var (_, instance) = PersistSetup(
            [joinNode, nextNode, branchA, branchB],
            [edgeA, edgeB, edgeOut]);

        // Two incoming edges and two tokens at the join: everything has arrived.
        _ = PersistToken(instance, joinNode, TokenStatus.Active);
        var arrivingToken = PersistToken(instance, joinNode, TokenStatus.Active);

        await _engine.ProcessNodeAsync(instance, arrivingToken, joinNode);

        var consumedTokens = await GetTokensAsync(instance, TokenStatus.Consumed);
        consumedTokens.Should().HaveCount(2);

        var activeTokens = await GetTokensAsync(instance, TokenStatus.Active);
        activeTokens.Should().HaveCount(1);
        activeTokens[0].NodeId.Should().Be(nextNode.Id);
    }

    // ============================================================
    // End Node
    // ============================================================

    [Fact]
    public async Task ProcessEndNode_ConsumesToken()
    {
        var endNode = CreateNode(NodeType.End);
        var (_, instance) = PersistSetup([endNode], []);
        var token = PersistToken(instance, endNode);

        await _engine.ProcessNodeAsync(instance, token, endNode);

        token.Status.Should().Be(TokenStatus.Consumed);
    }

    [Fact]
    public async Task ProcessEndNode_CompletesInstance_WhenAllTokensConsumed()
    {
        var endNode = CreateNode(NodeType.End);
        var (_, instance) = PersistSetup([endNode], []);
        instance.Status = InstanceStatus.Running;
        await _dbContext.SaveChangesAsync();
        var token = PersistToken(instance, endNode);

        await _engine.ProcessNodeAsync(instance, token, endNode);

        instance.Status.Should().Be(InstanceStatus.Completed);
    }

    [Fact]
    public async Task ProcessEndNode_DoesNotCompleteInstance_WhenOtherTokensActive()
    {
        var endNode = CreateNode(NodeType.End);
        var approvalNode = CreateNode(NodeType.Approval);
        var (_, instance) = PersistSetup([endNode, approvalNode], []);
        instance.Status = InstanceStatus.Running;
        await _dbContext.SaveChangesAsync();
        var endToken = PersistToken(instance, endNode, TokenStatus.Active);
        var activeToken = PersistToken(instance, approvalNode, TokenStatus.Active);

        await _engine.ProcessNodeAsync(instance, endToken, endNode);

        endToken.Status.Should().Be(TokenStatus.Consumed);
        activeToken.Status.Should().Be(TokenStatus.Active);
        instance.Status.Should().Be(InstanceStatus.Running);
    }

    // ============================================================
    // SubProcess
    // ============================================================

    [Fact]
    public async Task ProcessSubProcessNode_CreatesChildInstance()
    {
        var subDefId = Guid.NewGuid();
        var subProcessNode = CreateNode(NodeType.SubProcess);
        subProcessNode.Config = $"{{ \"definitionId\": \"{subDefId}\" }}";
        var (_, instance) = PersistSetup([subProcessNode], []);
        var token = PersistToken(instance, subProcessNode);

        await _engine.ProcessNodeAsync(instance, token, subProcessNode);

        var childInstance = await _dbContext.WorkflowInstances
            .FirstOrDefaultAsync(i => i.ParentInstanceId == instance.Id);
        childInstance.Should().NotBeNull();
        childInstance!.DefinitionId.Should().Be(subDefId);
        childInstance.Status.Should().Be(InstanceStatus.Running);
    }

    [Fact]
    public async Task SubProcessComplete_PropagatesTokenInParent()
    {
        var subProcessNode = CreateNode(NodeType.SubProcess);
        var nextNode = CreateNode(NodeType.Approval);
        var edge = CreateEdge(subProcessNode, nextNode);
        subProcessNode.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";
        var (_, parentInstance) = PersistSetup([subProcessNode, nextNode], [edge]);
        var parentToken = PersistToken(parentInstance, subProcessNode);

        // A child that is already completed and linked back to the parent's waiting token.
        var childInstance = new WorkflowInstance
        {
            Id = Guid.NewGuid(),
            DefinitionId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
            ParentInstanceId = parentInstance.Id,
            ParentTokenId = parentToken.Id,
            Status = InstanceStatus.Completed,
            Variables = "{}",
        };
        _dbContext.WorkflowInstances.Add(childInstance);
        await _dbContext.SaveChangesAsync();

        await _engine.HandleSubProcessCompletionAsync(childInstance);

        var tokens = await GetTokensAsync(parentInstance, TokenStatus.Active);
        tokens.Should().HaveCount(1);
        tokens[0].NodeId.Should().Be(nextNode.Id);
    }

    // ============================================================
    // Node Actions (Hooks)
    // ============================================================

    [Fact]
    public async Task ProcessApprovalNode_ExecutesOnEnterAction()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onEnter": "send-notification" }""";
        var (_, instance) = PersistSetup([approvalNode], []);
        var token = PersistToken(instance, approvalNode);
        var actionExecuted = false;
        RegisterKeyedAction("send-notification", new TestAction(() => actionExecuted = true));

        await _engine.ProcessNodeAsync(instance, token, approvalNode);

        actionExecuted.Should().BeTrue("the onEnter action should be executed when entering an approval node");
    }

    [Fact]
    public async Task CompleteTask_Approved_ExecutesOnApprovedAction()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onApproved": "log-approval" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);
        var actionExecuted = false;
        RegisterKeyedAction("log-approval", new TestAction(() => actionExecuted = true));

        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        actionExecuted.Should().BeTrue("the onApproved action should be executed when task is approved");
    }

    [Fact]
    public async Task CompleteTask_Rejected_ExecutesOnRejectedAction()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var rejectNode = CreateNode(NodeType.Approval);
        var rejectedEdge = CreateEdge(approvalNode, rejectNode, EdgeType.Rejected);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onRejected": "notify-rejection" }""";
        var (_, instance) = PersistSetup([approvalNode, rejectNode], [rejectedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);
        var actionExecuted = false;
        RegisterKeyedAction("notify-rejection", new TestAction(() => actionExecuted = true));

        await _engine.CompleteTaskAsync(task, TaskResult.Rejected);

        actionExecuted.Should().BeTrue("the onRejected action should be executed when task is rejected");
    }

    [Fact]
    public async Task ActionFailure_DoesNotBlockTokenPropagation()
    {
        var approvalNode = CreateNode(NodeType.Approval);
        var nextNode = CreateNode(NodeType.Approval);
        var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
        approvalNode.Config = """{ "assigneeRule": "role:user", "onApproved": "failing-action" }""";
        var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
        var token = PersistToken(instance, approvalNode);
        var task = PersistTask(instance, token, approvalNode);
        RegisterKeyedAction("failing-action", new ThrowingAction());

        await _engine.CompleteTaskAsync(task, TaskResult.Approved);

        var activeTokens = await GetTokensAsync(instance, TokenStatus.Active);
        activeTokens.Should().ContainSingle(t => t.NodeId == nextNode.Id);
    }

    // ============================================================
    // Test Helpers
    // ============================================================

    /// <summary>Loads all tokens of <paramref name="instance"/> that have the given status.</summary>
    private Task<List<WorkflowToken>> GetTokensAsync(WorkflowInstance instance, TokenStatus status)
    {
        return _dbContext.WorkflowTokens
            .Where(t => t.InstanceId == instance.Id && t.Status == status)
            .ToListAsync();
    }

    /// <summary>Creates an unsaved node of the given type with a new id and an empty JSON config.</summary>
    private static WorkflowNode CreateNode(NodeType type)
    {
        return new WorkflowNode
        {
            Id = Guid.NewGuid(),
            NodeType = type,
            Config = "{}",
        };
    }

    /// <summary>Creates an unsaved edge from <paramref name="source"/> to <paramref name="target"/>.</summary>
    /// <param name="source">Node the edge leaves.</param>
    /// <param name="target">Node the edge enters.</param>
    /// <param name="edgeType">Edge type; <see cref="EdgeType.Normal"/> when omitted.</param>
    /// <param name="condition">Optional condition expression stored on the edge.</param>
    /// <param name="order">Value stored in the edge's <c>Order</c> property.</param>
    private static WorkflowEdge CreateEdge(
        WorkflowNode source,
        WorkflowNode target,
        EdgeType? edgeType = null,
        string? condition = null,
        int order = 0)
    {
        return new WorkflowEdge
        {
            Id = Guid.NewGuid(),
            SourceNodeId = source.Id,
            TargetNodeId = target.Id,
            EdgeType = edgeType ?? EdgeType.Normal,
            Condition = condition,
            Order = order,
        };
    }

    /// <summary>
    /// Saves a new definition containing the given nodes and edges, plus a pending instance of
    /// that definition with empty variables.
    /// </summary>
    /// <returns>The saved definition and instance.</returns>
    private (WorkflowDefinition Definition, WorkflowInstance Instance) PersistSetup(
        List<WorkflowNode> nodes,
        List<WorkflowEdge> edges)
    {
        var definition = new WorkflowDefinition { Id = Guid.NewGuid() };

        foreach (var node in nodes)
        {
            node.DefinitionId = definition.Id;
            _dbContext.WorkflowNodes.Add(node);
        }

        foreach (var edge in edges)
        {
            edge.DefinitionId = definition.Id;
            _dbContext.WorkflowEdges.Add(edge);
        }

        _dbContext.WorkflowDefinitions.Add(definition);

        var instance = new WorkflowInstance
        {
            Id = Guid.NewGuid(),
            DefinitionId = definition.Id,
            Status = InstanceStatus.Pending,
            Variables = "{}",
        };
        _dbContext.WorkflowInstances.Add(instance);
        _dbContext.SaveChanges();

        return (definition, instance);
    }

    /// <summary>Saves a token for <paramref name="instance"/> positioned on <paramref name="node"/>.</summary>
    private WorkflowToken PersistToken(
        WorkflowInstance instance,
        WorkflowNode node,
        TokenStatus status = TokenStatus.Active)
    {
        var token = new WorkflowToken
        {
            Id = Guid.NewGuid(),
            InstanceId = instance.Id,
            NodeId = node.Id,
            Status = status,
        };
        _dbContext.WorkflowTokens.Add(token);
        _dbContext.SaveChanges();
        return token;
    }

    /// <summary>Saves a pending approval task linked to the given instance, token and node.</summary>
    private WorkflowTask PersistTask(
        WorkflowInstance instance,
        WorkflowToken token,
        WorkflowNode node)
    {
        var task = new WorkflowTask
        {
            Id = Guid.NewGuid(),
            InstanceId = instance.Id,
            TokenId = token.Id,
            NodeId = node.Id,
            Type = TaskType.Approval,
            Status = TaskStatus.Pending,
        };
        _dbContext.WorkflowTasks.Add(task);
        _dbContext.SaveChanges();
        return task;
    }

    /// <summary>Node action that invokes a callback and completes synchronously.</summary>
    private class TestAction : INodeAction
    {
        private readonly Action _callback;

        public TestAction(Action callback)
        {
            _callback = callback;
        }

        public Task ExecuteAsync(NodeActionContext context)
        {
            _callback();
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Node action that always fails. The exception is thrown synchronously from
    /// <see cref="ExecuteAsync"/> (the method is not <c>async</c>), not via a faulted task.
    /// </summary>
    private class ThrowingAction : INodeAction
    {
        public Task ExecuteAsync(NodeActionContext context)
        {
            throw new InvalidOperationException("Simulated action failure");
        }
    }
}