work-flow/tests/Workflow.Tests/Engine/ProcessEngineTests.cs
向宁 9f878286e7 feat: form versioning, notification center, scheduler and webhooks
- Add FormDefinitionVersion with compare/versions endpoints and schema differ
- Add Notification entity, endpoints and application features
- Add Scheduler (timeout) and WebhookDispatcher services
- Add FormDataValidator/FieldPermissionEvaluator/ReactionEvaluator
- Add workflow task mark-read, CC support and SystemUserContext
- Add EF migrations for form versions and notifications
- Add unit tests for form schema, notifications, scheduler and serialization
2026-06-14 15:03:11 +08:00

1050 lines
40 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

namespace Workflow.Tests.Engine;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using Workflow.Application.Engine;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Expressions;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
using Microsoft.EntityFrameworkCore;
using Xunit;
public class ProcessEngineTests
{
private readonly WorkflowDbContext _dbContext;
private IServiceProvider _serviceProvider;
private ProcessEngine _engine;
public ProcessEngineTests()
{
var options = new DbContextOptionsBuilder<WorkflowDbContext>()
.UseInMemoryDatabase(Guid.NewGuid().ToString())
.Options;
_dbContext = new WorkflowDbContext(options);
_serviceProvider = new ServiceCollection().BuildServiceProvider();
_engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());
}
private void RegisterKeyedAction(string name, INodeAction action)
{
var services = new ServiceCollection();
services.AddKeyedSingleton<INodeAction>(name, (_, _) => action);
_serviceProvider = services.BuildServiceProvider();
_engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());
}
// ============================================================
// Start Node
// ============================================================
[Fact]
public async Task ProcessStartNode_CreatesSingleToken_AndPropagatesToNextNode()
{
var startNode = CreateNode(NodeType.Start);
var nextNode = CreateNode(NodeType.Approval);
var edge = CreateEdge(startNode, nextNode);
var (_, instance) = PersistSetup([startNode, nextNode], [edge]);
await _engine.StartAsync(instance);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().HaveCount(1);
tokens[0].NodeId.Should().Be(nextNode.Id);
instance.Status.Should().Be(InstanceStatus.Running);
}
[Fact]
public async Task ProcessStartNode_ThrowsWhenNoOutgoingEdge()
{
var startNode = CreateNode(NodeType.Start);
var (_, instance) = PersistSetup([startNode], []);
var act = () => _engine.StartAsync(instance);
await act.Should().ThrowAsync<BusinessException>()
.WithMessage("*start*edge*");
}
// ============================================================
// Approval Node
// ============================================================
[Fact]
public async Task ProcessApprovalNode_CreatesTaskForAssignee()
{
var userId = Guid.NewGuid();
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = $"{{ \"assigneeRule\": \"user:{userId}\" }}";
var (_, instance) = PersistSetup([approvalNode], []);
var token = PersistToken(instance, approvalNode);
await _engine.ProcessNodeAsync(instance, token, approvalNode);
var tasks = await _dbContext.WorkflowTasks
.Where(t => t.InstanceId == instance.Id)
.ToListAsync();
tasks.Should().HaveCount(1);
tasks[0].AssigneeId.Should().Be(userId);
tasks[0].Type.Should().Be(TaskType.Approval);
tasks[0].Status.Should().Be(TaskStatus.Pending);
}
[Fact]
public async Task ProcessApprovalNode_CreatesTaskForRole()
{
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = """{ "assigneeRule": "role:manager" }""";
var (_, instance) = PersistSetup([approvalNode], []);
var token = PersistToken(instance, approvalNode);
await _engine.ProcessNodeAsync(instance, token, approvalNode);
var tasks = await _dbContext.WorkflowTasks
.Where(t => t.InstanceId == instance.Id)
.ToListAsync();
tasks.Should().HaveCount(1);
tasks[0].AssigneeRole.Should().Be("manager");
tasks[0].Type.Should().Be(TaskType.Approval);
tasks[0].Status.Should().Be(TaskStatus.Pending);
}
[Fact]
public async Task ProcessApprovalNode_SetsDueAtFromTimeoutMinutesConfig()
{
var userId = Guid.NewGuid();
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = """{ "assigneeRule": "user:""" + userId + "\", \"timeoutMinutes\": 1440 }";
var (_, instance) = PersistSetup([approvalNode], []);
var token = PersistToken(instance, approvalNode);
var before = DateTime.UtcNow;
await _engine.ProcessNodeAsync(instance, token, approvalNode);
var after = DateTime.UtcNow;
var task = await _dbContext.WorkflowTasks.SingleAsync(t => t.InstanceId == instance.Id);
task.DueAt.Should().NotBeNull();
// DueAt 应在 before+1440min 到 after+1440min 之间(容忍执行耗时)
task.DueAt.Should().BeOnOrAfter(before.AddMinutes(1440)).And.BeOnOrBefore(after.AddMinutes(1440).AddSeconds(1));
}
[Fact]
public async Task ProcessApprovalNode_NoTimeoutConfig_DueAtIsNull()
{
var userId = Guid.NewGuid();
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = """{ "assigneeRule": "user:""" + userId + "\" }";
var (_, instance) = PersistSetup([approvalNode], []);
var token = PersistToken(instance, approvalNode);
await _engine.ProcessNodeAsync(instance, token, approvalNode);
var task = await _dbContext.WorkflowTasks.SingleAsync(t => t.InstanceId == instance.Id);
task.DueAt.Should().BeNull();
}
[Fact]
public async Task ProcessApprovalNode_TriggersTaskArrivedNotificationWhenServiceRegistered()
{
// 注册 INotificationService验证任务到达时通知被触发
var capture = new NotificationCaptureService();
var services = new ServiceCollection();
services.AddSingleton<Workflow.Application.Notifications.INotificationService>(capture);
_serviceProvider = services.BuildServiceProvider();
_engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());
var userId = Guid.NewGuid();
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = """{ "assigneeRule": "user:""" + userId + "\" }";
var (_, inst) = PersistSetup([approvalNode], []);
var tok = PersistToken(inst, approvalNode);
await _engine.ProcessNodeAsync(inst, tok, approvalNode);
capture.ArrivedTasks.Should().HaveCount(1);
capture.ArrivedTasks[0].AssigneeId.Should().Be(userId);
}
[Fact]
public async Task CompleteTask_Approved_PropagatesTokenAlongApprovedEdge()
{
var approvalNode = CreateNode(NodeType.Approval);
var nextNode = CreateNode(NodeType.Approval);
var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
approvalNode.Config = """{ "assigneeRule": "role:user" }""";
var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
await _engine.CompleteTaskAsync(task, TaskResult.Approved);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().ContainSingle(t => t.NodeId == nextNode.Id);
}
[Fact]
public async Task CompleteTask_Rejected_PropagatesTokenAlongRejectedEdge()
{
var approvalNode = CreateNode(NodeType.Approval);
var rejectNode = CreateNode(NodeType.Approval);
var rejectedEdge = CreateEdge(approvalNode, rejectNode, EdgeType.Rejected);
approvalNode.Config = """{ "assigneeRule": "role:user" }""";
var (_, instance) = PersistSetup([approvalNode, rejectNode], [rejectedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
await _engine.CompleteTaskAsync(task, TaskResult.Rejected);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().ContainSingle(t => t.NodeId == rejectNode.Id);
}
[Fact]
public async Task CompleteTask_Rejected_WithNoRejectEdge_ThrowsBusinessException()
{
var approvalNode = CreateNode(NodeType.Approval);
var nextNode = CreateNode(NodeType.Approval);
var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
approvalNode.Config = """{ "assigneeRule": "role:user" }""";
var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
var act = () => _engine.CompleteTaskAsync(task, TaskResult.Rejected);
await act.Should().ThrowAsync<BusinessException>()
.WithMessage("*reject*edge*");
}
// ============================================================
// Cc Node
// ============================================================
[Fact]
public async Task ProcessCcNode_CreatesCcTasks_ForAllRecipients()
{
var user1 = Guid.NewGuid();
var user2 = Guid.NewGuid();
var user3 = Guid.NewGuid();
var ccNode = CreateNode(NodeType.Cc);
var nextNode = CreateNode(NodeType.Approval);
var edge = CreateEdge(ccNode, nextNode);
ccNode.Config = $"{{ \"recipients\": [\"{user1}\", \"{user2}\", \"{user3}\"] }}";
var (_, instance) = PersistSetup([ccNode, nextNode], [edge]);
var token = PersistToken(instance, ccNode);
await _engine.ProcessNodeAsync(instance, token, ccNode);
var tasks = await _dbContext.WorkflowTasks
.Where(t => t.InstanceId == instance.Id && t.Type == TaskType.Cc)
.ToListAsync();
tasks.Should().HaveCount(3);
tasks.Should().OnlyContain(t => t.Status == TaskStatus.Pending);
tasks.Select(t => t.AssigneeId).Should().BeEquivalentTo(new Guid?[] { user1, user2, user3 });
}
[Fact]
public async Task ProcessCcNode_PropagatesImmediatelyWithoutWaiting()
{
var user1 = Guid.NewGuid();
var ccNode = CreateNode(NodeType.Cc);
var nextNode = CreateNode(NodeType.Approval);
var edge = CreateEdge(ccNode, nextNode);
ccNode.Config = $"{{ \"recipients\": [\"{user1}\"] }}";
var (_, instance) = PersistSetup([ccNode, nextNode], [edge]);
var token = PersistToken(instance, ccNode);
await _engine.ProcessNodeAsync(instance, token, ccNode);
var propagatedTokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id
&& t.NodeId == nextNode.Id
&& t.Status == TokenStatus.Active)
.ToListAsync();
propagatedTokens.Should().HaveCount(1);
var ccTasks = await _dbContext.WorkflowTasks
.Where(t => t.InstanceId == instance.Id && t.Type == TaskType.Cc)
.ToListAsync();
ccTasks.Should().HaveCount(1);
ccTasks[0].Status.Should().Be(TaskStatus.Pending);
}
// ============================================================
// Condition Gateway (Exclusive Gateway)
// ============================================================
[Fact]
public async Task ProcessConditionNode_TakesMatchingBranch()
{
var conditionNode = CreateNode(NodeType.Condition);
var branchA = CreateNode(NodeType.Approval);
var branchB = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 1000");
var edgeB = CreateEdge(conditionNode, branchB, condition: "amount <= 1000");
var (_, instance) = PersistSetup([conditionNode, branchA, branchB], [edgeA, edgeB]);
instance.Variables = """{ "amount": 1500 }""";
await _dbContext.SaveChangesAsync();
var token = PersistToken(instance, conditionNode);
await _engine.ProcessNodeAsync(instance, token, conditionNode);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().HaveCount(1);
tokens[0].NodeId.Should().Be(branchA.Id);
}
[Fact]
public async Task ProcessConditionNode_NoMatchingBranch_ThrowsBusinessException()
{
var conditionNode = CreateNode(NodeType.Condition);
var branchA = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 5000");
var (_, instance) = PersistSetup([conditionNode, branchA], [edgeA]);
instance.Variables = """{ "amount": 100 }""";
await _dbContext.SaveChangesAsync();
var token = PersistToken(instance, conditionNode);
var act = () => _engine.ProcessNodeAsync(instance, token, conditionNode);
await act.Should().ThrowAsync<BusinessException>()
.WithMessage("*condition*match*");
}
[Fact]
public async Task ProcessConditionNode_FirstMatchingBranchWins_WhenMultipleMatch()
{
var conditionNode = CreateNode(NodeType.Condition);
var branchA = CreateNode(NodeType.Approval);
var branchB = CreateNode(NodeType.Approval);
var branchC = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(conditionNode, branchA, condition: "amount > 1000", order: 1);
var edgeB = CreateEdge(conditionNode, branchB, condition: "amount > 500", order: 2);
var edgeC = CreateEdge(conditionNode, branchC, condition: "amount > 2000", order: 3);
var (_, instance) = PersistSetup(
[conditionNode, branchA, branchB, branchC],
[edgeA, edgeB, edgeC]);
instance.Variables = """{ "amount": 1500 }""";
await _dbContext.SaveChangesAsync();
var token = PersistToken(instance, conditionNode);
await _engine.ProcessNodeAsync(instance, token, conditionNode);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().HaveCount(1);
tokens[0].NodeId.Should().Be(branchA.Id);
}
// ============================================================
// Parallel Gateway (Fork)
// ============================================================
[Fact]
public async Task ProcessParallelFork_CreatesOneTokenPerOutgoingEdge()
{
var forkNode = CreateNode(NodeType.Parallel);
var branchA = CreateNode(NodeType.Approval);
var branchB = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(forkNode, branchA);
var edgeB = CreateEdge(forkNode, branchB);
var (_, instance) = PersistSetup([forkNode, branchA, branchB], [edgeA, edgeB]);
var token = PersistToken(instance, forkNode);
await _engine.ProcessNodeAsync(instance, token, forkNode);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().HaveCount(2);
tokens.Select(t => t.NodeId).Should().BeEquivalentTo(new[] { branchA.Id, branchB.Id });
}
[Fact]
public async Task ProcessParallelFork_With3Edges_Creates3Tokens()
{
var forkNode = CreateNode(NodeType.Parallel);
var branchA = CreateNode(NodeType.Approval);
var branchB = CreateNode(NodeType.Approval);
var branchC = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(forkNode, branchA);
var edgeB = CreateEdge(forkNode, branchB);
var edgeC = CreateEdge(forkNode, branchC);
var (_, instance) = PersistSetup(
[forkNode, branchA, branchB, branchC],
[edgeA, edgeB, edgeC]);
var token = PersistToken(instance, forkNode);
await _engine.ProcessNodeAsync(instance, token, forkNode);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().HaveCount(3);
tokens.Select(t => t.NodeId)
.Should().BeEquivalentTo(new[] { branchA.Id, branchB.Id, branchC.Id });
}
// ============================================================
// Parallel Gateway (Join)
// ============================================================
[Fact]
public async Task ProcessParallelJoin_WaitsForAllTokens_WhenNotAllArrived()
{
var joinNode = CreateNode(NodeType.Parallel);
var branchA = CreateNode(NodeType.Approval);
var branchB = CreateNode(NodeType.Approval);
var branchC = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(branchA, joinNode);
var edgeB = CreateEdge(branchB, joinNode);
var edgeC = CreateEdge(branchC, joinNode);
var (_, instance) = PersistSetup(
[joinNode, branchA, branchB, branchC],
[edgeA, edgeB, edgeC]);
var token1 = PersistToken(instance, joinNode, TokenStatus.Active);
var token2 = PersistToken(instance, joinNode, TokenStatus.Active);
await _engine.ProcessNodeAsync(instance, token2, joinNode);
var activeTokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
activeTokens.Should().HaveCount(2);
activeTokens.Should().OnlyContain(t => t.NodeId == joinNode.Id);
}
[Fact]
public async Task ProcessParallelJoin_MergesAllTokens_WhenAllArrived()
{
var joinNode = CreateNode(NodeType.Parallel);
var nextNode = CreateNode(NodeType.Approval);
var branchA = CreateNode(NodeType.Approval);
var branchB = CreateNode(NodeType.Approval);
var edgeA = CreateEdge(branchA, joinNode);
var edgeB = CreateEdge(branchB, joinNode);
var edgeOut = CreateEdge(joinNode, nextNode);
var (_, instance) = PersistSetup(
[joinNode, nextNode, branchA, branchB],
[edgeA, edgeB, edgeOut]);
var token1 = PersistToken(instance, joinNode, TokenStatus.Active);
var token2 = PersistToken(instance, joinNode, TokenStatus.Active);
await _engine.ProcessNodeAsync(instance, token2, joinNode);
var consumedTokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Consumed)
.ToListAsync();
consumedTokens.Should().HaveCount(2);
var activeTokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
activeTokens.Should().HaveCount(1);
activeTokens[0].NodeId.Should().Be(nextNode.Id);
}
// ============================================================
// End Node
// ============================================================
[Fact]
public async Task ProcessEndNode_ConsumesToken()
{
var endNode = CreateNode(NodeType.End);
var (_, instance) = PersistSetup([endNode], []);
var token = PersistToken(instance, endNode);
await _engine.ProcessNodeAsync(instance, token, endNode);
token.Status.Should().Be(TokenStatus.Consumed);
}
[Fact]
public async Task ProcessEndNode_CompletesInstance_WhenAllTokensConsumed()
{
var endNode = CreateNode(NodeType.End);
var (_, instance) = PersistSetup([endNode], []);
instance.Status = InstanceStatus.Running;
await _dbContext.SaveChangesAsync();
var token = PersistToken(instance, endNode);
await _engine.ProcessNodeAsync(instance, token, endNode);
instance.Status.Should().Be(InstanceStatus.Completed);
}
[Fact]
public async Task ProcessEndNode_DoesNotCompleteInstance_WhenOtherTokensActive()
{
var endNode = CreateNode(NodeType.End);
var approvalNode = CreateNode(NodeType.Approval);
var (_, instance) = PersistSetup([endNode, approvalNode], []);
instance.Status = InstanceStatus.Running;
await _dbContext.SaveChangesAsync();
var endToken = PersistToken(instance, endNode, TokenStatus.Active);
var activeToken = PersistToken(instance, approvalNode, TokenStatus.Active);
await _engine.ProcessNodeAsync(instance, endToken, endNode);
endToken.Status.Should().Be(TokenStatus.Consumed);
activeToken.Status.Should().Be(TokenStatus.Active);
instance.Status.Should().Be(InstanceStatus.Running);
}
// ============================================================
// SubProcess
// ============================================================
[Fact]
public async Task ProcessSubProcessNode_CreatesChildInstance()
{
var subDefId = Guid.NewGuid();
var subProcessNode = CreateNode(NodeType.SubProcess);
subProcessNode.Config = $"{{ \"definitionId\": \"{subDefId}\" }}";
var (_, instance) = PersistSetup([subProcessNode], []);
var token = PersistToken(instance, subProcessNode);
await _engine.ProcessNodeAsync(instance, token, subProcessNode);
var childInstance = await _dbContext.WorkflowInstances
.FirstOrDefaultAsync(i => i.ParentInstanceId == instance.Id);
childInstance.Should().NotBeNull();
childInstance!.DefinitionId.Should().Be(subDefId);
childInstance.Status.Should().Be(InstanceStatus.Running);
}
[Fact]
public async Task SubProcessComplete_PropagatesTokenInParent()
{
var subProcessNode = CreateNode(NodeType.SubProcess);
var nextNode = CreateNode(NodeType.Approval);
var edge = CreateEdge(subProcessNode, nextNode);
subProcessNode.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";
var (_, parentInstance) = PersistSetup([subProcessNode, nextNode], [edge]);
var parentToken = PersistToken(parentInstance, subProcessNode);
var childInstance = new WorkflowInstance
{
Id = Guid.NewGuid(),
DefinitionId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
ParentInstanceId = parentInstance.Id,
ParentTokenId = parentToken.Id,
Status = InstanceStatus.Completed,
Variables = "{}",
};
_dbContext.WorkflowInstances.Add(childInstance);
await _dbContext.SaveChangesAsync();
await _engine.HandleSubProcessCompletionAsync(childInstance);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == parentInstance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
tokens.Should().HaveCount(1);
tokens[0].NodeId.Should().Be(nextNode.Id);
}
// ============================================================
// Node Actions (Hooks)
// ============================================================
[Fact]
public async Task ProcessApprovalNode_ExecutesOnEnterAction()
{
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = """{ "assigneeRule": "role:user", "onEnter": "send-notification" }""";
var (_, instance) = PersistSetup([approvalNode], []);
var token = PersistToken(instance, approvalNode);
var actionExecuted = false;
RegisterKeyedAction("send-notification", new TestAction(() => actionExecuted = true));
await _engine.ProcessNodeAsync(instance, token, approvalNode);
actionExecuted.Should().BeTrue("the onEnter action should be executed when entering an approval node");
}
[Fact]
public async Task CompleteTask_Approved_ExecutesOnApprovedAction()
{
var approvalNode = CreateNode(NodeType.Approval);
var nextNode = CreateNode(NodeType.Approval);
var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
approvalNode.Config = """{ "assigneeRule": "role:user", "onApproved": "log-approval" }""";
var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
var actionExecuted = false;
RegisterKeyedAction("log-approval", new TestAction(() => actionExecuted = true));
await _engine.CompleteTaskAsync(task, TaskResult.Approved);
actionExecuted.Should().BeTrue("the onApproved action should be executed when task is approved");
}
[Fact]
public async Task CompleteTask_Rejected_ExecutesOnRejectedAction()
{
var approvalNode = CreateNode(NodeType.Approval);
var rejectNode = CreateNode(NodeType.Approval);
var rejectedEdge = CreateEdge(approvalNode, rejectNode, EdgeType.Rejected);
approvalNode.Config = """{ "assigneeRule": "role:user", "onRejected": "notify-rejection" }""";
var (_, instance) = PersistSetup([approvalNode, rejectNode], [rejectedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
var actionExecuted = false;
RegisterKeyedAction("notify-rejection", new TestAction(() => actionExecuted = true));
await _engine.CompleteTaskAsync(task, TaskResult.Rejected);
actionExecuted.Should().BeTrue("the onRejected action should be executed when task is rejected");
}
[Fact]
public async Task ActionFailure_DoesNotBlockTokenPropagation()
{
var approvalNode = CreateNode(NodeType.Approval);
var nextNode = CreateNode(NodeType.Approval);
var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
approvalNode.Config = """{ "assigneeRule": "role:user", "onApproved": "failing-action" }""";
var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
RegisterKeyedAction("failing-action", new ThrowingAction());
await _engine.CompleteTaskAsync(task, TaskResult.Approved);
var tokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active
&& t.NodeId == nextNode.Id)
.ToListAsync();
tokens.Should().HaveCount(1);
}
// ============================================================
// Test Helpers
// ============================================================
private static WorkflowNode CreateNode(NodeType type)
{
return new WorkflowNode
{
Id = Guid.NewGuid(),
NodeType = type,
Config = "{}",
};
}
// ============================================================
// Parallel Fork → Approvals → Join: full pipeline
// ============================================================
/// <summary>
/// 端到端:并行 fork 分出两条审批分支,两条均审批通过后汇聚到 join
/// join 仅在两个 token 都到达时才合并并推进到下游。
/// 这是 fork/join 最常出问题的真实路径(既有测试分别测 fork 与 join未覆盖整条链
/// </summary>
[Fact]
public async Task ParallelFork_BothBranchesApproved_JoinsAndPropagatesOnce()
{
var forkNode = CreateNode(NodeType.Parallel);
var approvalA = CreateNode(NodeType.Approval);
approvalA.Config = """{ "assigneeRule": "role:userA" }""";
var approvalB = CreateNode(NodeType.Approval);
approvalB.Config = """{ "assigneeRule": "role:userB" }""";
var joinNode = CreateNode(NodeType.Parallel);
var afterJoin = CreateNode(NodeType.End);
var edgeForkA = CreateEdge(forkNode, approvalA);
var edgeForkB = CreateEdge(forkNode, approvalB);
var edgeAJoin = CreateEdge(approvalA, joinNode);
var edgeBJoin = CreateEdge(approvalB, joinNode);
var edgeJoinOut = CreateEdge(joinNode, afterJoin);
var (_, instance) = PersistSetup(
[forkNode, approvalA, approvalB, joinNode, afterJoin],
[edgeForkA, edgeForkB, edgeAJoin, edgeBJoin, edgeJoinOut]);
var forkToken = PersistToken(instance, forkNode);
// Fork分出两个 token 到两个审批节点
await _engine.ProcessNodeAsync(instance, forkToken, forkNode);
var activeAfterFork = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
activeAfterFork.Should().HaveCount(2);
// 完成分支 A 的审批 → token 推进到 join但 join 应等待
var taskA = await _dbContext.WorkflowTasks.FirstAsync(t => t.NodeId == approvalA.Id);
await _engine.CompleteTaskAsync(taskA, TaskResult.Approved);
var activeAfterA = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
// 分支 A 的 token 已进入 join 等待;分支 B 仍未完成 → 仍 2 个 active1 在 join1 在 approvalB
activeAfterA.Should().HaveCount(2);
activeAfterA.Should().Contain(t => t.NodeId == approvalB.Id);
instance.Status.Should().NotBe(InstanceStatus.Completed);
// 完成分支 B 的审批 → 第二个 token 到达 join满足合并条件
var taskB = await _dbContext.WorkflowTasks.FirstAsync(t => t.NodeId == approvalB.Id);
await _engine.CompleteTaskAsync(taskB, TaskResult.Approved);
// join 合并后产生 1 个 token → 进入 End 节点被消费 → 无残留 Active token实例完成。
// 关键不变量:两个 join 入口 token 都被消费(已合并),且实例确实完成。
var activeAfterB = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active)
.ToListAsync();
activeAfterB.Should().BeEmpty("End 节点应已消费 join 合并后的 token");
instance.Status.Should().Be(InstanceStatus.Completed);
// join 处的两个入口 token 均应已消费
var joinTokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.NodeId == joinNode.Id)
.ToListAsync();
joinTokens.Should().OnlyContain(t => t.Status == TokenStatus.Consumed);
joinTokens.Should().HaveCount(2);
}
// ============================================================
// SubProcess boundaries
// ============================================================
/// <summary>
/// 边界SubProcess 节点未配置 definitionId 时必须明确拒绝。
/// </summary>
[Fact]
public async Task ProcessSubProcessNode_WithoutDefinitionId_ThrowsBusinessException()
{
var subProcessNode = CreateNode(NodeType.SubProcess);
subProcessNode.Config = "{}"; // 无 definitionId
var (_, instance) = PersistSetup([subProcessNode], []);
var token = PersistToken(instance, subProcessNode);
var act = () => _engine.ProcessNodeAsync(instance, token, subProcessNode);
await act.Should().ThrowAsync<BusinessException>()
.WithMessage("*definitionId*");
}
/// <summary>
/// 边界SubProcess 完成但其节点没有出边时,父 token 应被消费(不残留),
/// 而非静默挂着。锁定 HandleSubProcessCompletionAsync 的当前行为。
/// </summary>
[Fact]
public async Task SubProcessComplete_WithNoOutgoingEdge_ConsumesWaitingToken()
{
var subProcessNode = CreateNode(NodeType.SubProcess);
subProcessNode.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";
var (_, parentInstance) = PersistSetup([subProcessNode], []);
var parentToken = PersistToken(parentInstance, subProcessNode);
var childInstance = new WorkflowInstance
{
Id = Guid.NewGuid(),
DefinitionId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
ParentInstanceId = parentInstance.Id,
ParentTokenId = parentToken.Id,
Status = InstanceStatus.Completed,
Variables = "{}",
};
_dbContext.WorkflowInstances.Add(childInstance);
await _dbContext.SaveChangesAsync();
await _engine.HandleSubProcessCompletionAsync(childInstance);
// 父 token 必须被消费,不可残留 Active
var refreshedToken = await _dbContext.WorkflowTokens.FindAsync(parentToken.Id);
refreshedToken!.Status.Should().Be(TokenStatus.Consumed);
}
// ============================================================
// Token Duplicate-Propagation Guard (idempotency)
// ============================================================
/// <summary>
/// 边界:已完成(已审批)的任务不可被再次完成,否则会重复推进 token
/// 产生额外的下游 token 并可能创建重复任务。必须保持幂等/拒绝。
/// </summary>
[Fact]
public async Task CompleteTask_AlreadyCompletedTask_ThrowsOrIsIdempotent()
{
var approvalNode = CreateNode(NodeType.Approval);
var nextNode = CreateNode(NodeType.Approval);
var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);
approvalNode.Config = """{ "assigneeRule": "role:user" }""";
var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
var token = PersistToken(instance, approvalNode);
var task = PersistTask(instance, token, approvalNode);
// 第一次完成:正常推进到 nextNode
await _engine.CompleteTaskAsync(task, TaskResult.Approved);
var activeTokensAfterFirst = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active && t.NodeId == nextNode.Id)
.ToListAsync();
activeTokensAfterFirst.Should().HaveCount(1);
// 第二次完成同一任务:必须抛错(或幂等不产生新 token绝不可产生第二个下游 token
var secondAct = () => _engine.CompleteTaskAsync(task, TaskResult.Approved);
await secondAct.Should().ThrowAsync<BusinessException>();
// 仍只有 1 个活跃下游 token未被重复推进
var activeTokensAfterSecond = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active && t.NodeId == nextNode.Id)
.ToListAsync();
activeTokensAfterSecond.Should().HaveCount(1);
}
/// <summary>
/// 边界:同一 Approval 节点的 token 不可被 ProcessNodeAsync 重复处理,
/// 否则会在同一节点创建多份审批任务。第二次处理必须拒绝(或幂等)。
/// </summary>
[Fact]
public async Task ProcessApprovalNode_TokenAlreadyProcessed_ThrowsOrIsIdempotent()
{
var userId = Guid.NewGuid();
var approvalNode = CreateNode(NodeType.Approval);
approvalNode.Config = $"{{ \"assigneeRule\": \"user:{userId}\" }}";
var (_, instance) = PersistSetup([approvalNode], []);
var token = PersistToken(instance, approvalNode);
// 第一次处理:创建 1 个任务
await _engine.ProcessNodeAsync(instance, token, approvalNode);
var tasksAfterFirst = await _dbContext.WorkflowTasks
.Where(t => t.InstanceId == instance.Id)
.ToListAsync();
tasksAfterFirst.Should().HaveCount(1);
// 第二次处理同一 token必须拒绝或幂等不可创建第二个任务
var secondAct = () => _engine.ProcessNodeAsync(instance, token, approvalNode);
await secondAct.Should().ThrowAsync<BusinessException>();
var tasksAfterSecond = await _dbContext.WorkflowTasks
.Where(t => t.InstanceId == instance.Id)
.ToListAsync();
tasksAfterSecond.Should().HaveCount(1);
}
private static WorkflowEdge CreateEdge(
WorkflowNode source,
WorkflowNode target,
EdgeType? edgeType = null,
string? condition = null,
int order = 0)
{
return new WorkflowEdge
{
Id = Guid.NewGuid(),
SourceNodeId = source.Id,
TargetNodeId = target.Id,
EdgeType = edgeType ?? EdgeType.Normal,
Condition = condition,
Order = order,
};
}
private (WorkflowDefinition Definition, WorkflowInstance Instance) PersistSetup(
List<WorkflowNode> nodes,
List<WorkflowEdge> edges)
{
var definition = new WorkflowDefinition { Id = Guid.NewGuid() };
foreach (var node in nodes)
{
node.DefinitionId = definition.Id;
_dbContext.WorkflowNodes.Add(node);
}
foreach (var edge in edges)
{
edge.DefinitionId = definition.Id;
_dbContext.WorkflowEdges.Add(edge);
}
_dbContext.WorkflowDefinitions.Add(definition);
var instance = new WorkflowInstance
{
Id = Guid.NewGuid(),
DefinitionId = definition.Id,
Status = InstanceStatus.Pending,
Variables = "{}",
};
_dbContext.WorkflowInstances.Add(instance);
_dbContext.SaveChanges();
return (definition, instance);
}
private WorkflowToken PersistToken(
WorkflowInstance instance,
WorkflowNode node,
TokenStatus status = TokenStatus.Active)
{
var token = new WorkflowToken
{
Id = Guid.NewGuid(),
InstanceId = instance.Id,
NodeId = node.Id,
Status = status,
};
_dbContext.WorkflowTokens.Add(token);
_dbContext.SaveChanges();
return token;
}
private WorkflowTask PersistTask(
WorkflowInstance instance,
WorkflowToken token,
WorkflowNode node)
{
var task = new WorkflowTask
{
Id = Guid.NewGuid(),
InstanceId = instance.Id,
TokenId = token.Id,
NodeId = node.Id,
Type = TaskType.Approval,
Status = TaskStatus.Pending,
};
_dbContext.WorkflowTasks.Add(task);
_dbContext.SaveChanges();
return task;
}
private class TestAction : INodeAction
{
private readonly Action _callback;
public TestAction(Action callback)
{
_callback = callback;
}
public Task ExecuteAsync(NodeActionContext context)
{
_callback();
return Task.CompletedTask;
}
}
private class ThrowingAction : INodeAction
{
public Task ExecuteAsync(NodeActionContext context)
{
throw new InvalidOperationException("Simulated action failure");
}
}
/// <summary>测试用通知捕获器:记录任务到达/通过/驳回通知,便于断言引擎触发了通知。</summary>
private class NotificationCaptureService : Workflow.Application.Notifications.INotificationService
{
public List<WorkflowTask> ArrivedTasks { get; } = [];
public List<WorkflowTask> ApprovedTasks { get; } = [];
public List<WorkflowTask> RejectedTasks { get; } = [];
public Task NotifyTaskArrivedAsync(WorkflowTask task, CancellationToken ct = default)
{ ArrivedTasks.Add(task); return Task.CompletedTask; }
public Task NotifyTaskApprovedAsync(WorkflowTask task, CancellationToken ct = default)
{ ApprovedTasks.Add(task); return Task.CompletedTask; }
public Task NotifyTaskRejectedAsync(WorkflowTask task, CancellationToken ct = default)
{ RejectedTasks.Add(task); return Task.CompletedTask; }
public Task NotifyTaskUrgedAsync(WorkflowTask task, CancellationToken ct = default) => Task.CompletedTask;
public Task NotifyTaskTimeoutAsync(WorkflowTask task, bool autoApproved, CancellationToken ct = default) => Task.CompletedTask;
}
}