// Commit summary:
// - Add FormDefinitionVersion with compare/versions endpoints and schema differ
// - Add Notification entity, endpoints and application features
// - Add Scheduler (timeout) and WebhookDispatcher services
// - Add FormDataValidator/FieldPermissionEvaluator/ReactionEvaluator
// - Add workflow task mark-read, CC support and SystemUserContext
// - Add EF migrations for form versions and notifications
// - Add unit tests for form schema, notifications, scheduler and serialization
//
// File: 1050 lines, 40 KiB, C#
namespace Workflow.Tests.Engine;
|
||
|
||
using FluentAssertions;
|
||
using Microsoft.Extensions.DependencyInjection;
|
||
using Moq;
|
||
using Workflow.Application.Engine;
|
||
using Workflow.Domain.Entities;
|
||
using Workflow.Domain.Enums;
|
||
using Workflow.Domain.Expressions;
|
||
using Workflow.Domain.Exceptions;
|
||
using Workflow.Infrastructure.Persistence;
|
||
using Microsoft.EntityFrameworkCore;
|
||
using Xunit;
|
||
|
||
public class ProcessEngineTests
|
||
{
|
||
private readonly WorkflowDbContext _dbContext;
|
||
private IServiceProvider _serviceProvider;
|
||
private ProcessEngine _engine;
|
||
|
||
/// <summary>
/// Builds the test fixture: an EF Core in-memory <see cref="WorkflowDbContext"/> with a
/// randomly named database, an empty service provider, and a <see cref="ProcessEngine"/>
/// wired to both.
/// </summary>
public ProcessEngineTests()
{
    // A random database name keeps this fixture's in-memory store separate from any other.
    var databaseName = Guid.NewGuid().ToString();
    var optionsBuilder = new DbContextOptionsBuilder<WorkflowDbContext>();
    optionsBuilder.UseInMemoryDatabase(databaseName);

    _dbContext = new WorkflowDbContext(optionsBuilder.Options);

    // No services are registered by default; individual tests swap the provider when needed.
    _serviceProvider = new ServiceCollection().BuildServiceProvider();
    _engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());
}
|
||
|
||
/// <summary>
/// Replaces the fixture's service provider with one that exposes <paramref name="action"/>
/// as a keyed <see cref="INodeAction"/> singleton, then rebuilds the engine on top of it.
/// </summary>
/// <param name="name">Service key under which the action is registered.</param>
/// <param name="action">Action instance returned for that key.</param>
/// <remarks>
/// The new provider contains only this single registration, so anything registered on the
/// previous provider is no longer visible to the engine after this call.
/// </remarks>
private void RegisterKeyedAction(string name, INodeAction action)
{
    var services = new ServiceCollection();
    services.AddKeyedSingleton<INodeAction>(name, (_, _) => action);

    var previousProvider = _serviceProvider;
    _serviceProvider = services.BuildServiceProvider();
    _engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());

    // Only the engine that was just discarded still pointed at the previous provider,
    // so release it here instead of leaking it for the rest of the test.
    (previousProvider as IDisposable)?.Dispose();
}
|
||
|
||
// ============================================================
|
||
// Start Node
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Starting an instance whose start node has one outgoing edge leaves exactly one active
/// token on the successor node and the instance in the Running state.
/// </summary>
[Fact]
public async Task ProcessStartNode_CreatesSingleToken_AndPropagatesToNextNode()
{
    // Arrange: Start --> Approval
    var start = CreateNode(NodeType.Start);
    var successor = CreateNode(NodeType.Approval);
    var startToSuccessor = CreateEdge(start, successor);
    var (_, instance) = PersistSetup([start, successor], [startToSuccessor]);

    // Act
    await _engine.StartAsync(instance);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeTokens.Should().HaveCount(1);
    activeTokens[0].NodeId.Should().Be(successor.Id);
    instance.Status.Should().Be(InstanceStatus.Running);
}
|
||
|
||
/// <summary>
/// Starting an instance whose start node has no outgoing edge fails with a
/// <see cref="BusinessException"/> whose message mentions the start node and the edge.
/// </summary>
[Fact]
public async Task ProcessStartNode_ThrowsWhenNoOutgoingEdge()
{
    // Arrange: a lone start node, no edges at all.
    var lonelyStart = CreateNode(NodeType.Start);
    var (_, instance) = PersistSetup([lonelyStart], []);

    // Act
    var starting = () => _engine.StartAsync(instance);

    // Assert
    await starting.Should()
        .ThrowAsync<BusinessException>()
        .WithMessage("*start*edge*");
}
|
||
|
||
// ============================================================
|
||
// Approval Node
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Processing an approval node configured with a <c>user:&lt;id&gt;</c> assignee rule creates
/// a single pending approval task assigned to that user.
/// </summary>
[Fact]
public async Task ProcessApprovalNode_CreatesTaskForAssignee()
{
    // Arrange
    var assigneeId = Guid.NewGuid();
    var node = CreateNode(NodeType.Approval);
    node.Config = $$"""{ "assigneeRule": "user:{{assigneeId}}" }""";

    var (_, instance) = PersistSetup([node], []);
    var token = PersistToken(instance, node);

    // Act
    await _engine.ProcessNodeAsync(instance, token, node);

    // Assert
    var createdTasks = await (
        from t in _dbContext.WorkflowTasks
        where t.InstanceId == instance.Id
        select t).ToListAsync();

    createdTasks.Should().HaveCount(1);

    var created = createdTasks[0];
    created.AssigneeId.Should().Be(assigneeId);
    created.Type.Should().Be(TaskType.Approval);
    created.Status.Should().Be(TaskStatus.Pending);
}
|
||
|
||
/// <summary>
/// Processing an approval node configured with a <c>role:manager</c> assignee rule creates
/// a single pending approval task whose assignee role is "manager".
/// </summary>
[Fact]
public async Task ProcessApprovalNode_CreatesTaskForRole()
{
    // Arrange
    var node = CreateNode(NodeType.Approval);
    node.Config = """{ "assigneeRule": "role:manager" }""";

    var (_, instance) = PersistSetup([node], []);
    var token = PersistToken(instance, node);

    // Act
    await _engine.ProcessNodeAsync(instance, token, node);

    // Assert
    var createdTasks = await (
        from t in _dbContext.WorkflowTasks
        where t.InstanceId == instance.Id
        select t).ToListAsync();

    createdTasks.Should().HaveCount(1);

    var created = createdTasks[0];
    created.AssigneeRole.Should().Be("manager");
    created.Type.Should().Be(TaskType.Approval);
    created.Status.Should().Be(TaskStatus.Pending);
}
|
||
|
||
/// <summary>
/// An approval node configured with <c>timeoutMinutes: 1440</c> produces a task whose
/// <c>DueAt</c> lies 1440 minutes after the moment the node was processed.
/// </summary>
[Fact]
public async Task ProcessApprovalNode_SetsDueAtFromTimeoutMinutesConfig()
{
    // Arrange
    var assigneeId = Guid.NewGuid();
    var node = CreateNode(NodeType.Approval);
    node.Config = $$"""{ "assigneeRule": "user:{{assigneeId}}", "timeoutMinutes": 1440 }""";

    var (_, instance) = PersistSetup([node], []);
    var token = PersistToken(instance, node);

    // Act: bracket the call with timestamps so the expected window can be derived.
    var startedAt = DateTime.UtcNow;
    await _engine.ProcessNodeAsync(instance, token, node);
    var finishedAt = DateTime.UtcNow;

    // Assert
    var created = await _dbContext.WorkflowTasks.SingleAsync(t => t.InstanceId == instance.Id);
    created.DueAt.Should().NotBeNull();

    // DueAt must fall between startedAt+1440min and finishedAt+1440min
    // (plus one second of slack to tolerate execution time).
    var earliestDue = startedAt.AddMinutes(1440);
    var latestDue = finishedAt.AddMinutes(1440).AddSeconds(1);
    created.DueAt.Should().BeOnOrAfter(earliestDue).And.BeOnOrBefore(latestDue);
}
|
||
|
||
/// <summary>
/// An approval node without a <c>timeoutMinutes</c> setting produces a task whose
/// <c>DueAt</c> is left null.
/// </summary>
[Fact]
public async Task ProcessApprovalNode_NoTimeoutConfig_DueAtIsNull()
{
    // Arrange: assignee rule only, no timeout.
    var assigneeId = Guid.NewGuid();
    var node = CreateNode(NodeType.Approval);
    node.Config = $$"""{ "assigneeRule": "user:{{assigneeId}}" }""";

    var (_, instance) = PersistSetup([node], []);
    var token = PersistToken(instance, node);

    // Act
    await _engine.ProcessNodeAsync(instance, token, node);

    // Assert
    var created = await _dbContext.WorkflowTasks.SingleAsync(t => t.InstanceId == instance.Id);
    created.DueAt.Should().BeNull();
}
|
||
|
||
/// <summary>
/// When an <c>INotificationService</c> is registered, processing an approval node reports
/// exactly one arrived task to it, carrying the configured assignee.
/// </summary>
[Fact]
public async Task ProcessApprovalNode_TriggersTaskArrivedNotificationWhenServiceRegistered()
{
    // Arrange: rebuild the engine on a provider that exposes a capturing notification service,
    // so the notification raised on task arrival can be observed.
    var recorder = new NotificationCaptureService();
    _serviceProvider = new ServiceCollection()
        .AddSingleton<Workflow.Application.Notifications.INotificationService>(recorder)
        .BuildServiceProvider();
    _engine = new ProcessEngine(_dbContext, _serviceProvider, new ConditionEvaluator());

    var assigneeId = Guid.NewGuid();
    var node = CreateNode(NodeType.Approval);
    node.Config = $$"""{ "assigneeRule": "user:{{assigneeId}}" }""";

    var (_, instance) = PersistSetup([node], []);
    var token = PersistToken(instance, node);

    // Act
    await _engine.ProcessNodeAsync(instance, token, node);

    // Assert
    recorder.ArrivedTasks.Should().HaveCount(1);
    recorder.ArrivedTasks[0].AssigneeId.Should().Be(assigneeId);
}
|
||
|
||
/// <summary>
/// Completing a task as Approved leaves a single active token on the node reached through
/// the Approved edge.
/// </summary>
[Fact]
public async Task CompleteTask_Approved_PropagatesTokenAlongApprovedEdge()
{
    // Arrange: approval --(Approved)--> follow-up, with a task waiting on the first node.
    var approval = CreateNode(NodeType.Approval);
    var followUp = CreateNode(NodeType.Approval);
    var onApproved = CreateEdge(approval, followUp, EdgeType.Approved);
    approval.Config = """{ "assigneeRule": "role:user" }""";

    var (_, instance) = PersistSetup([approval, followUp], [onApproved]);
    var waitingToken = PersistToken(instance, approval);
    var waitingTask = PersistTask(instance, waitingToken, approval);

    // Act
    await _engine.CompleteTaskAsync(waitingTask, TaskResult.Approved);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeTokens.Should().ContainSingle(t => t.NodeId == followUp.Id);
}
|
||
|
||
/// <summary>
/// Completing a task as Rejected leaves a single active token on the node reached through
/// the Rejected edge.
/// </summary>
[Fact]
public async Task CompleteTask_Rejected_PropagatesTokenAlongRejectedEdge()
{
    // Arrange: approval --(Rejected)--> rejection handler, with a task waiting on the first node.
    var approval = CreateNode(NodeType.Approval);
    var rejectionTarget = CreateNode(NodeType.Approval);
    var onRejected = CreateEdge(approval, rejectionTarget, EdgeType.Rejected);
    approval.Config = """{ "assigneeRule": "role:user" }""";

    var (_, instance) = PersistSetup([approval, rejectionTarget], [onRejected]);
    var waitingToken = PersistToken(instance, approval);
    var waitingTask = PersistTask(instance, waitingToken, approval);

    // Act
    await _engine.CompleteTaskAsync(waitingTask, TaskResult.Rejected);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeTokens.Should().ContainSingle(t => t.NodeId == rejectionTarget.Id);
}
|
||
|
||
/// <summary>
/// Rejecting a task on a node that only has an Approved outgoing edge fails with a
/// <see cref="BusinessException"/> whose message mentions the missing reject edge.
/// </summary>
[Fact]
public async Task CompleteTask_Rejected_WithNoRejectEdge_ThrowsBusinessException()
{
    // Arrange: the only way out of the approval node is the Approved edge.
    var approval = CreateNode(NodeType.Approval);
    var followUp = CreateNode(NodeType.Approval);
    var onApproved = CreateEdge(approval, followUp, EdgeType.Approved);
    approval.Config = """{ "assigneeRule": "role:user" }""";

    var (_, instance) = PersistSetup([approval, followUp], [onApproved]);
    var waitingToken = PersistToken(instance, approval);
    var waitingTask = PersistTask(instance, waitingToken, approval);

    // Act
    var rejecting = () => _engine.CompleteTaskAsync(waitingTask, TaskResult.Rejected);

    // Assert
    await rejecting.Should()
        .ThrowAsync<BusinessException>()
        .WithMessage("*reject*edge*");
}
|
||
|
||
// ============================================================
|
||
// Cc Node
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Processing a Cc node with three recipients creates three pending Cc tasks, one per
/// recipient.
/// </summary>
[Fact]
public async Task ProcessCcNode_CreatesCcTasks_ForAllRecipients()
{
    // Arrange: Cc --> Approval, with three recipients configured on the Cc node.
    var firstRecipient = Guid.NewGuid();
    var secondRecipient = Guid.NewGuid();
    var thirdRecipient = Guid.NewGuid();

    var cc = CreateNode(NodeType.Cc);
    var successor = CreateNode(NodeType.Approval);
    var ccToSuccessor = CreateEdge(cc, successor);
    cc.Config = $$"""{ "recipients": ["{{firstRecipient}}", "{{secondRecipient}}", "{{thirdRecipient}}"] }""";

    var (_, instance) = PersistSetup([cc, successor], [ccToSuccessor]);
    var token = PersistToken(instance, cc);

    // Act
    await _engine.ProcessNodeAsync(instance, token, cc);

    // Assert
    var ccTasks = await _dbContext.WorkflowTasks
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Type == TaskType.Cc)
        .ToListAsync();

    var expectedAssignees = new Guid?[] { firstRecipient, secondRecipient, thirdRecipient };

    ccTasks.Should().HaveCount(3);
    ccTasks.Should().OnlyContain(t => t.Status == TaskStatus.Pending);
    ccTasks.Select(t => t.AssigneeId).Should().BeEquivalentTo(expectedAssignees);
}
|
||
|
||
/// <summary>
/// Processing a Cc node moves a token to the next node right away, while the Cc task it
/// created is still pending.
/// </summary>
[Fact]
public async Task ProcessCcNode_PropagatesImmediatelyWithoutWaiting()
{
    // Arrange: Cc --> Approval, single recipient.
    var recipient = Guid.NewGuid();
    var cc = CreateNode(NodeType.Cc);
    var successor = CreateNode(NodeType.Approval);
    var ccToSuccessor = CreateEdge(cc, successor);
    cc.Config = $$"""{ "recipients": ["{{recipient}}"] }""";

    var (_, instance) = PersistSetup([cc, successor], [ccToSuccessor]);
    var token = PersistToken(instance, cc);

    // Act
    await _engine.ProcessNodeAsync(instance, token, cc);

    // Assert: an active token already sits on the successor node...
    var tokensOnSuccessor = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.NodeId == successor.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    tokensOnSuccessor.Should().HaveCount(1);

    // ...even though the Cc task has not been handled yet.
    var pendingCc = await _dbContext.WorkflowTasks
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Type == TaskType.Cc)
        .ToListAsync();

    pendingCc.Should().HaveCount(1);
    pendingCc[0].Status.Should().Be(TaskStatus.Pending);
}
|
||
|
||
// ============================================================
|
||
// Condition Gateway (Exclusive Gateway)
|
||
// ============================================================
|
||
|
||
/// <summary>
/// With <c>amount = 1500</c>, a condition node whose edges test <c>amount &gt; 1000</c> and
/// <c>amount &lt;= 1000</c> sends its single token down the first of the two.
/// </summary>
[Fact]
public async Task ProcessConditionNode_TakesMatchingBranch()
{
    // Arrange: two mutually exclusive branches.
    var gateway = CreateNode(NodeType.Condition);
    var highAmountBranch = CreateNode(NodeType.Approval);
    var lowAmountBranch = CreateNode(NodeType.Approval);

    var toHigh = CreateEdge(gateway, highAmountBranch, condition: "amount > 1000");
    var toLow = CreateEdge(gateway, lowAmountBranch, condition: "amount <= 1000");

    var (_, instance) = PersistSetup(
        [gateway, highAmountBranch, lowAmountBranch],
        [toHigh, toLow]);
    instance.Variables = """{ "amount": 1500 }""";
    await _dbContext.SaveChangesAsync();

    var token = PersistToken(instance, gateway);

    // Act
    await _engine.ProcessNodeAsync(instance, token, gateway);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeTokens.Should().HaveCount(1);
    activeTokens[0].NodeId.Should().Be(highAmountBranch.Id);
}
|
||
|
||
/// <summary>
/// A condition node whose only edge condition is not satisfied by the instance variables
/// fails with a <see cref="BusinessException"/>.
/// </summary>
[Fact]
public async Task ProcessConditionNode_NoMatchingBranch_ThrowsBusinessException()
{
    // Arrange: the single branch requires amount > 5000, but the instance carries 100.
    var gateway = CreateNode(NodeType.Condition);
    var onlyBranch = CreateNode(NodeType.Approval);
    var toOnlyBranch = CreateEdge(gateway, onlyBranch, condition: "amount > 5000");

    var (_, instance) = PersistSetup([gateway, onlyBranch], [toOnlyBranch]);
    instance.Variables = """{ "amount": 100 }""";
    await _dbContext.SaveChangesAsync();

    var token = PersistToken(instance, gateway);

    // Act
    var processing = () => _engine.ProcessNodeAsync(instance, token, gateway);

    // Assert
    await processing.Should()
        .ThrowAsync<BusinessException>()
        .WithMessage("*condition*match*");
}
|
||
|
||
/// <summary>
/// When several edge conditions hold at once (<c>amount = 1500</c> satisfies both
/// <c>&gt; 1000</c> and <c>&gt; 500</c>), only the edge with the lowest order is followed.
/// </summary>
[Fact]
public async Task ProcessConditionNode_FirstMatchingBranchWins_WhenMultipleMatch()
{
    // Arrange: three ordered branches; the first two match, the third does not.
    var gateway = CreateNode(NodeType.Condition);
    var firstBranch = CreateNode(NodeType.Approval);
    var secondBranch = CreateNode(NodeType.Approval);
    var thirdBranch = CreateNode(NodeType.Approval);

    var toFirst = CreateEdge(gateway, firstBranch, condition: "amount > 1000", order: 1);
    var toSecond = CreateEdge(gateway, secondBranch, condition: "amount > 500", order: 2);
    var toThird = CreateEdge(gateway, thirdBranch, condition: "amount > 2000", order: 3);

    var (_, instance) = PersistSetup(
        [gateway, firstBranch, secondBranch, thirdBranch],
        [toFirst, toSecond, toThird]);
    instance.Variables = """{ "amount": 1500 }""";
    await _dbContext.SaveChangesAsync();

    var token = PersistToken(instance, gateway);

    // Act
    await _engine.ProcessNodeAsync(instance, token, gateway);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeTokens.Should().HaveCount(1);
    activeTokens[0].NodeId.Should().Be(firstBranch.Id);
}
|
||
|
||
// ============================================================
|
||
// Parallel Gateway (Fork)
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Processing a parallel node with two outgoing edges leaves one active token on each
/// target node.
/// </summary>
[Fact]
public async Task ProcessParallelFork_CreatesOneTokenPerOutgoingEdge()
{
    // Arrange: fork --> left, fork --> right
    var fork = CreateNode(NodeType.Parallel);
    var left = CreateNode(NodeType.Approval);
    var right = CreateNode(NodeType.Approval);

    var toLeft = CreateEdge(fork, left);
    var toRight = CreateEdge(fork, right);

    var (_, instance) = PersistSetup([fork, left, right], [toLeft, toRight]);
    var token = PersistToken(instance, fork);

    // Act
    await _engine.ProcessNodeAsync(instance, token, fork);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    var expectedNodeIds = new[] { left.Id, right.Id };

    activeTokens.Should().HaveCount(2);
    activeTokens.Select(t => t.NodeId).Should().BeEquivalentTo(expectedNodeIds);
}
|
||
|
||
/// <summary>
/// Processing a parallel node with three outgoing edges leaves three active tokens, one on
/// each target node.
/// </summary>
[Fact]
public async Task ProcessParallelFork_With3Edges_Creates3Tokens()
{
    // Arrange: one fork fanning out to three approval nodes.
    var fork = CreateNode(NodeType.Parallel);
    var first = CreateNode(NodeType.Approval);
    var second = CreateNode(NodeType.Approval);
    var third = CreateNode(NodeType.Approval);

    var toFirst = CreateEdge(fork, first);
    var toSecond = CreateEdge(fork, second);
    var toThird = CreateEdge(fork, third);

    var (_, instance) = PersistSetup(
        [fork, first, second, third],
        [toFirst, toSecond, toThird]);
    var token = PersistToken(instance, fork);

    // Act
    await _engine.ProcessNodeAsync(instance, token, fork);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    var expectedNodeIds = new[] { first.Id, second.Id, third.Id };

    activeTokens.Should().HaveCount(3);
    activeTokens.Select(t => t.NodeId).Should().BeEquivalentTo(expectedNodeIds);
}
|
||
|
||
// ============================================================
|
||
// Parallel Gateway (Join)
|
||
// ============================================================
|
||
|
||
/// <summary>
/// A parallel join with three incoming edges but only two arrived tokens does not merge:
/// both tokens stay active on the join node.
/// </summary>
[Fact]
public async Task ProcessParallelJoin_WaitsForAllTokens_WhenNotAllArrived()
{
    // Arrange: three branches feed the join.
    var join = CreateNode(NodeType.Parallel);
    var first = CreateNode(NodeType.Approval);
    var second = CreateNode(NodeType.Approval);
    var third = CreateNode(NodeType.Approval);

    var firstToJoin = CreateEdge(first, join);
    var secondToJoin = CreateEdge(second, join);
    var thirdToJoin = CreateEdge(third, join);

    var (_, instance) = PersistSetup(
        [join, first, second, third],
        [firstToJoin, secondToJoin, thirdToJoin]);

    // Only two of the three expected tokens have reached the join.
    _ = PersistToken(instance, join, TokenStatus.Active);
    var latestArrival = PersistToken(instance, join, TokenStatus.Active);

    // Act
    await _engine.ProcessNodeAsync(instance, latestArrival, join);

    // Assert
    var activeTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeTokens.Should().HaveCount(2);
    activeTokens.Should().OnlyContain(t => t.NodeId == join.Id);
}
|
||
|
||
/// <summary>
/// A parallel join with two incoming edges and two arrived tokens merges them: both
/// tokens end up consumed and one active token appears on the node after the join.
/// </summary>
[Fact]
public async Task ProcessParallelJoin_MergesAllTokens_WhenAllArrived()
{
    // Arrange: two branches feed the join, which leads on to a downstream node.
    var join = CreateNode(NodeType.Parallel);
    var downstream = CreateNode(NodeType.Approval);
    var first = CreateNode(NodeType.Approval);
    var second = CreateNode(NodeType.Approval);

    var firstToJoin = CreateEdge(first, join);
    var secondToJoin = CreateEdge(second, join);
    var joinToDownstream = CreateEdge(join, downstream);

    var (_, instance) = PersistSetup(
        [join, downstream, first, second],
        [firstToJoin, secondToJoin, joinToDownstream]);

    // Both expected tokens have reached the join.
    _ = PersistToken(instance, join, TokenStatus.Active);
    var latestArrival = PersistToken(instance, join, TokenStatus.Active);

    // Act
    await _engine.ProcessNodeAsync(instance, latestArrival, join);

    // Assert: the two arrivals were consumed...
    var consumed = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Consumed)
        .ToListAsync();

    consumed.Should().HaveCount(2);

    // ...and replaced by a single token on the downstream node.
    var active = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    active.Should().HaveCount(1);
    active[0].NodeId.Should().Be(downstream.Id);
}
|
||
|
||
// ============================================================
|
||
// End Node
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Processing an end node marks the token that reached it as consumed.
/// </summary>
[Fact]
public async Task ProcessEndNode_ConsumesToken()
{
    // Arrange
    var end = CreateNode(NodeType.End);
    var (_, instance) = PersistSetup([end], []);
    var arrivingToken = PersistToken(instance, end);

    // Act
    await _engine.ProcessNodeAsync(instance, arrivingToken, end);

    // Assert
    arrivingToken.Status.Should().Be(TokenStatus.Consumed);
}
|
||
|
||
/// <summary>
/// When the only token of a running instance reaches an end node, the instance becomes
/// Completed.
/// </summary>
[Fact]
public async Task ProcessEndNode_CompletesInstance_WhenAllTokensConsumed()
{
    // Arrange: a running instance with a single token on the end node.
    var end = CreateNode(NodeType.End);
    var (_, instance) = PersistSetup([end], []);
    instance.Status = InstanceStatus.Running;
    await _dbContext.SaveChangesAsync();

    var lastToken = PersistToken(instance, end);

    // Act
    await _engine.ProcessNodeAsync(instance, lastToken, end);

    // Assert
    instance.Status.Should().Be(InstanceStatus.Completed);
}
|
||
|
||
/// <summary>
/// When a token reaches an end node while another token of the same instance is still
/// active, only the arriving token is consumed and the instance stays Running.
/// </summary>
[Fact]
public async Task ProcessEndNode_DoesNotCompleteInstance_WhenOtherTokensActive()
{
    // Arrange: a running instance with one token on the end node and one elsewhere.
    var end = CreateNode(NodeType.End);
    var approval = CreateNode(NodeType.Approval);

    var (_, instance) = PersistSetup([end, approval], []);
    instance.Status = InstanceStatus.Running;
    await _dbContext.SaveChangesAsync();

    var tokenAtEnd = PersistToken(instance, end, TokenStatus.Active);
    var tokenElsewhere = PersistToken(instance, approval, TokenStatus.Active);

    // Act
    await _engine.ProcessNodeAsync(instance, tokenAtEnd, end);

    // Assert
    tokenAtEnd.Status.Should().Be(TokenStatus.Consumed);
    tokenElsewhere.Status.Should().Be(TokenStatus.Active);
    instance.Status.Should().Be(InstanceStatus.Running);
}
|
||
|
||
// ============================================================
|
||
// SubProcess
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Processing a sub-process node creates a running child instance that points back to the
/// parent instance and uses the configured definition id.
/// </summary>
[Fact]
public async Task ProcessSubProcessNode_CreatesChildInstance()
{
    // Arrange
    var childDefinitionId = Guid.NewGuid();
    var subProcess = CreateNode(NodeType.SubProcess);
    subProcess.Config = $$"""{ "definitionId": "{{childDefinitionId}}" }""";

    var (_, parent) = PersistSetup([subProcess], []);
    var token = PersistToken(parent, subProcess);

    // Act
    await _engine.ProcessNodeAsync(parent, token, subProcess);

    // Assert
    var child = await _dbContext.WorkflowInstances
        .FirstOrDefaultAsync(i => i.ParentInstanceId == parent.Id);

    child.Should().NotBeNull();
    child!.DefinitionId.Should().Be(childDefinitionId);
    child.Status.Should().Be(InstanceStatus.Running);
}
|
||
|
||
/// <summary>
/// Handling the completion of a child instance leaves a single active token in the parent
/// instance, located on the node that follows the sub-process node.
/// </summary>
[Fact]
public async Task SubProcessComplete_PropagatesTokenInParent()
{
    // Arrange: SubProcess --> Approval in the parent, with a token parked on the sub-process node.
    var subProcess = CreateNode(NodeType.SubProcess);
    var successor = CreateNode(NodeType.Approval);
    var subProcessToSuccessor = CreateEdge(subProcess, successor);
    subProcess.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";

    var (_, parent) = PersistSetup([subProcess, successor], [subProcessToSuccessor]);
    var parkedToken = PersistToken(parent, subProcess);

    // A child instance that has already completed and is linked back to the parked token.
    var child = new WorkflowInstance
    {
        Id = Guid.NewGuid(),
        DefinitionId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
        ParentInstanceId = parent.Id,
        ParentTokenId = parkedToken.Id,
        Status = InstanceStatus.Completed,
        Variables = "{}",
    };
    _dbContext.WorkflowInstances.Add(child);
    await _dbContext.SaveChangesAsync();

    // Act
    await _engine.HandleSubProcessCompletionAsync(child);

    // Assert
    var activeParentTokens = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == parent.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    activeParentTokens.Should().HaveCount(1);
    activeParentTokens[0].NodeId.Should().Be(successor.Id);
}
|
||
|
||
// ============================================================
|
||
// Node Actions (Hooks)
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Processing an approval node whose config names an <c>onEnter</c> action runs the keyed
/// action registered under that name.
/// </summary>
[Fact]
public async Task ProcessApprovalNode_ExecutesOnEnterAction()
{
    // Arrange
    var node = CreateNode(NodeType.Approval);
    node.Config = """{ "assigneeRule": "role:user", "onEnter": "send-notification" }""";

    var (_, instance) = PersistSetup([node], []);
    var token = PersistToken(instance, node);

    var hookRan = false;
    var hook = new TestAction(() => hookRan = true);
    RegisterKeyedAction("send-notification", hook);

    // Act
    await _engine.ProcessNodeAsync(instance, token, node);

    // Assert
    hookRan.Should().BeTrue("the onEnter action should be executed when entering an approval node");
}
|
||
|
||
/// <summary>
/// Approving a task on a node whose config names an <c>onApproved</c> action runs the keyed
/// action registered under that name.
/// </summary>
[Fact]
public async Task CompleteTask_Approved_ExecutesOnApprovedAction()
{
    // Arrange: approval --(Approved)--> follow-up, with an onApproved hook configured.
    var approval = CreateNode(NodeType.Approval);
    var followUp = CreateNode(NodeType.Approval);
    var onApproved = CreateEdge(approval, followUp, EdgeType.Approved);
    approval.Config = """{ "assigneeRule": "role:user", "onApproved": "log-approval" }""";

    var (_, instance) = PersistSetup([approval, followUp], [onApproved]);
    var waitingToken = PersistToken(instance, approval);
    var waitingTask = PersistTask(instance, waitingToken, approval);

    var hookRan = false;
    var hook = new TestAction(() => hookRan = true);
    RegisterKeyedAction("log-approval", hook);

    // Act
    await _engine.CompleteTaskAsync(waitingTask, TaskResult.Approved);

    // Assert
    hookRan.Should().BeTrue("the onApproved action should be executed when task is approved");
}
|
||
|
||
/// <summary>
/// Rejecting a task on a node whose config names an <c>onRejected</c> action runs the keyed
/// action registered under that name.
/// </summary>
[Fact]
public async Task CompleteTask_Rejected_ExecutesOnRejectedAction()
{
    // Arrange: approval --(Rejected)--> rejection handler, with an onRejected hook configured.
    var approval = CreateNode(NodeType.Approval);
    var rejectionTarget = CreateNode(NodeType.Approval);
    var onRejected = CreateEdge(approval, rejectionTarget, EdgeType.Rejected);
    approval.Config = """{ "assigneeRule": "role:user", "onRejected": "notify-rejection" }""";

    var (_, instance) = PersistSetup([approval, rejectionTarget], [onRejected]);
    var waitingToken = PersistToken(instance, approval);
    var waitingTask = PersistTask(instance, waitingToken, approval);

    var hookRan = false;
    var hook = new TestAction(() => hookRan = true);
    RegisterKeyedAction("notify-rejection", hook);

    // Act
    await _engine.CompleteTaskAsync(waitingTask, TaskResult.Rejected);

    // Assert
    hookRan.Should().BeTrue("the onRejected action should be executed when task is rejected");
}
|
||
|
||
/// <summary>
/// With a <c>ThrowingAction</c> registered as the <c>onApproved</c> hook, approving the task
/// still completes and leaves one active token on the next node.
/// </summary>
[Fact]
public async Task ActionFailure_DoesNotBlockTokenPropagation()
{
    // Arrange: approval --(Approved)--> follow-up, with the throwing hook wired to onApproved.
    var approval = CreateNode(NodeType.Approval);
    var followUp = CreateNode(NodeType.Approval);
    var onApproved = CreateEdge(approval, followUp, EdgeType.Approved);
    approval.Config = """{ "assigneeRule": "role:user", "onApproved": "failing-action" }""";

    var (_, instance) = PersistSetup([approval, followUp], [onApproved]);
    var waitingToken = PersistToken(instance, approval);
    var waitingTask = PersistTask(instance, waitingToken, approval);

    RegisterKeyedAction("failing-action", new ThrowingAction());

    // Act
    await _engine.CompleteTaskAsync(waitingTask, TaskResult.Approved);

    // Assert
    var tokensOnFollowUp = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .Where(t => t.NodeId == followUp.Id)
        .ToListAsync();

    tokensOnFollowUp.Should().HaveCount(1);
}
|
||
|
||
// ============================================================
|
||
// Test Helpers
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Creates an unsaved <see cref="WorkflowNode"/> of the given type with a fresh id and an
/// empty JSON object as its config.
/// </summary>
/// <param name="type">Node type to assign.</param>
/// <returns>The new node; it is not added to the database context here.</returns>
private static WorkflowNode CreateNode(NodeType type) => new()
{
    Id = Guid.NewGuid(),
    NodeType = type,
    Config = "{}",
};
|
||
|
||
// ============================================================
|
||
// Parallel Fork → Approvals → Join: full pipeline
|
||
// ============================================================
|
||
|
||
/// <summary>
/// End-to-end: a parallel fork splits into two approval branches; after both are approved
/// they converge on the join, which merges and advances downstream only once both tokens
/// have arrived. This is the real-world path where fork/join most often breaks (the other
/// tests exercise fork and join separately and do not cover the whole chain).
/// </summary>
[Fact]
public async Task ParallelFork_BothBranchesApproved_JoinsAndPropagatesOnce()
{
    // Arrange: fork --> {approval A, approval B} --> join --> end
    var fork = CreateNode(NodeType.Parallel);
    var branchA = CreateNode(NodeType.Approval);
    branchA.Config = """{ "assigneeRule": "role:userA" }""";
    var branchB = CreateNode(NodeType.Approval);
    branchB.Config = """{ "assigneeRule": "role:userB" }""";
    var join = CreateNode(NodeType.Parallel);
    var end = CreateNode(NodeType.End);

    var forkToA = CreateEdge(fork, branchA);
    var forkToB = CreateEdge(fork, branchB);
    var aToJoin = CreateEdge(branchA, join);
    var bToJoin = CreateEdge(branchB, join);
    var joinToEnd = CreateEdge(join, end);

    var (_, instance) = PersistSetup(
        [fork, branchA, branchB, join, end],
        [forkToA, forkToB, aToJoin, bToJoin, joinToEnd]);

    var initialToken = PersistToken(instance, fork);

    // Fork: split into two tokens, one per approval node.
    await _engine.ProcessNodeAsync(instance, initialToken, fork);
    var activeAfterFork = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();
    activeAfterFork.Should().HaveCount(2);

    // Approve branch A: its token advances to the join, which must keep waiting.
    var taskOnA = await _dbContext.WorkflowTasks.FirstAsync(t => t.NodeId == branchA.Id);
    await _engine.CompleteTaskAsync(taskOnA, TaskResult.Approved);

    var activeAfterA = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();

    // Branch A's token is parked at the join and branch B is unfinished,
    // so there are still two active tokens (one at the join, one at approval B).
    activeAfterA.Should().HaveCount(2);
    activeAfterA.Should().Contain(t => t.NodeId == branchB.Id);
    instance.Status.Should().NotBe(InstanceStatus.Completed);

    // Approve branch B: the second token reaches the join and the merge condition is met.
    var taskOnB = await _dbContext.WorkflowTasks.FirstAsync(t => t.NodeId == branchB.Id);
    await _engine.CompleteTaskAsync(taskOnB, TaskResult.Approved);

    // The merge yields one token, which enters the End node and is consumed there:
    // no active token remains and the instance is completed.
    // Key invariant: both tokens that entered the join are consumed (merged) and the
    // instance really did complete.
    var activeAfterB = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.Status == TokenStatus.Active)
        .ToListAsync();
    activeAfterB.Should().BeEmpty("End 节点应已消费 join 合并后的 token");

    instance.Status.Should().Be(InstanceStatus.Completed);

    // Both tokens that arrived at the join must have been consumed.
    var tokensAtJoin = await _dbContext.WorkflowTokens
        .Where(t => t.InstanceId == instance.Id)
        .Where(t => t.NodeId == join.Id)
        .ToListAsync();
    tokensAtJoin.Should().OnlyContain(t => t.Status == TokenStatus.Consumed);
    tokensAtJoin.Should().HaveCount(2);
}
|
||
|
||
// ============================================================
|
||
// SubProcess boundaries
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Boundary: a SubProcess node with no <c>definitionId</c> configured must be rejected
/// explicitly.
/// </summary>
[Fact]
public async Task ProcessSubProcessNode_WithoutDefinitionId_ThrowsBusinessException()
{
    // Arrange: the config is an empty object, so there is no definitionId.
    var subProcess = CreateNode(NodeType.SubProcess);
    subProcess.Config = "{}";

    var (_, instance) = PersistSetup([subProcess], []);
    var token = PersistToken(instance, subProcess);

    // Act
    var processing = () => _engine.ProcessNodeAsync(instance, token, subProcess);

    // Assert
    await processing.Should()
        .ThrowAsync<BusinessException>()
        .WithMessage("*definitionId*");
}
|
||
|
||
/// <summary>
/// Boundary: when a sub-process completes but its node has no outgoing edge, the parent
/// token should be consumed (not left behind) rather than silently hanging. Pins down the
/// current behavior of <c>HandleSubProcessCompletionAsync</c>.
/// </summary>
[Fact]
public async Task SubProcessComplete_WithNoOutgoingEdge_ConsumesWaitingToken()
{
    // Arrange: a sub-process node with no way out, and a parent token parked on it.
    var subProcess = CreateNode(NodeType.SubProcess);
    subProcess.Config = """{ "definitionId": "00000000-0000-0000-0000-000000000001" }""";

    var (_, parent) = PersistSetup([subProcess], []);
    var parkedToken = PersistToken(parent, subProcess);

    // A child instance that has already completed and is linked back to the parked token.
    var child = new WorkflowInstance
    {
        Id = Guid.NewGuid(),
        DefinitionId = Guid.Parse("00000000-0000-0000-0000-000000000001"),
        ParentInstanceId = parent.Id,
        ParentTokenId = parkedToken.Id,
        Status = InstanceStatus.Completed,
        Variables = "{}",
    };
    _dbContext.WorkflowInstances.Add(child);
    await _dbContext.SaveChangesAsync();

    // Act
    await _engine.HandleSubProcessCompletionAsync(child);

    // Assert: the parent token must be consumed; it may not remain Active.
    var reloaded = await _dbContext.WorkflowTokens.FindAsync(parkedToken.Id);
    reloaded!.Status.Should().Be(TokenStatus.Consumed);
}
|
||
|
||
// ============================================================
|
||
// Token Duplicate-Propagation Guard (idempotency)
|
||
// ============================================================
|
||
|
||
/// <summary>
/// Boundary case: a task that has already been completed (approved) must not be
/// completable a second time; otherwise the token would be advanced twice, producing
/// an extra downstream token and possibly duplicate tasks. This test pins the
/// rejecting variant: the second call must throw <see cref="BusinessException"/>
/// and leave exactly one active downstream token.
/// </summary>
[Fact]
public async Task CompleteTask_AlreadyCompletedTask_ThrowsOrIsIdempotent()
{
    // Arrange: approval node --Approved--> next approval node, with a pending task on the first.
    var approvalNode = CreateNode(NodeType.Approval);
    approvalNode.Config = """{ "assigneeRule": "role:user" }""";
    var nextNode = CreateNode(NodeType.Approval);
    var approvedEdge = CreateEdge(approvalNode, nextNode, EdgeType.Approved);

    var (_, instance) = PersistSetup([approvalNode, nextNode], [approvedEdge]);
    var sourceToken = PersistToken(instance, approvalNode);
    var pendingTask = PersistTask(instance, sourceToken, approvalNode);

    // Loads the Active tokens of this instance that currently sit on the downstream node.
    Task<List<WorkflowToken>> LoadActiveDownstreamTokensAsync() =>
        _dbContext.WorkflowTokens
            .Where(t => t.InstanceId == instance.Id && t.Status == TokenStatus.Active && t.NodeId == nextNode.Id)
            .ToListAsync();

    // First completion: advances normally to nextNode.
    await _engine.CompleteTaskAsync(pendingTask, TaskResult.Approved);

    var downstreamAfterFirst = await LoadActiveDownstreamTokensAsync();
    downstreamAfterFirst.Should().HaveCount(1);

    // Second completion of the same task: must fail and must never yield a second downstream token.
    var repeatedCompletion = () => _engine.CompleteTaskAsync(pendingTask, TaskResult.Approved);

    await repeatedCompletion.Should().ThrowAsync<BusinessException>();

    // Still exactly one active downstream token, so nothing was advanced twice.
    var downstreamAfterSecond = await LoadActiveDownstreamTokensAsync();
    downstreamAfterSecond.Should().HaveCount(1);
}
|
||
|
||
/// <summary>
/// Boundary case: the token of a given Approval node must not be processed twice by
/// ProcessNodeAsync, otherwise several approval tasks would be created on the same
/// node. This test pins the rejecting variant: the second call must throw
/// <see cref="BusinessException"/> and the task count must stay at one.
/// </summary>
[Fact]
public async Task ProcessApprovalNode_TokenAlreadyProcessed_ThrowsOrIsIdempotent()
{
    // Arrange: a single Approval node assigned to one specific user.
    var assigneeId = Guid.NewGuid();
    var approvalNode = CreateNode(NodeType.Approval);
    approvalNode.Config = $"{{ \"assigneeRule\": \"user:{assigneeId}\" }}";

    var (_, instance) = PersistSetup([approvalNode], []);
    var nodeToken = PersistToken(instance, approvalNode);

    // Loads every task recorded for this instance.
    Task<List<WorkflowTask>> LoadInstanceTasksAsync() =>
        _dbContext.WorkflowTasks
            .Where(t => t.InstanceId == instance.Id)
            .ToListAsync();

    // First pass: exactly one task is created.
    await _engine.ProcessNodeAsync(instance, nodeToken, approvalNode);

    var tasksAfterFirstPass = await LoadInstanceTasksAsync();
    tasksAfterFirstPass.Should().HaveCount(1);

    // Second pass over the same token: must be rejected and must not create another task.
    var repeatedProcessing = () => _engine.ProcessNodeAsync(instance, nodeToken, approvalNode);
    await repeatedProcessing.Should().ThrowAsync<BusinessException>();

    var tasksAfterSecondPass = await LoadInstanceTasksAsync();
    tasksAfterSecondPass.Should().HaveCount(1);
}
|
||
|
||
/// <summary>
/// Builds an in-memory edge (not persisted) that connects <paramref name="source"/>
/// to <paramref name="target"/> and carries a freshly generated id.
/// </summary>
/// <param name="source">Node whose id becomes the edge's source node id.</param>
/// <param name="target">Node whose id becomes the edge's target node id.</param>
/// <param name="edgeType">Edge type; <see cref="EdgeType.Normal"/> is used when null.</param>
/// <param name="condition">Optional condition stored on the edge as-is.</param>
/// <param name="order">Order value stored on the edge.</param>
/// <returns>The new edge instance.</returns>
private static WorkflowEdge CreateEdge(
    WorkflowNode source,
    WorkflowNode target,
    EdgeType? edgeType = null,
    string? condition = null,
    int order = 0) =>
    new()
    {
        Id = Guid.NewGuid(),
        SourceNodeId = source.Id,
        TargetNodeId = target.Id,
        EdgeType = edgeType ?? EdgeType.Normal,
        Condition = condition,
        Order = order,
    };
|
||
|
||
/// <summary>
/// Saves a brand-new workflow definition together with the supplied nodes and edges,
/// plus one pending instance of that definition, in a single SaveChanges call.
/// </summary>
/// <param name="nodes">Nodes to store; each one has its DefinitionId set to the new definition's id.</param>
/// <param name="edges">Edges to store; each one has its DefinitionId set to the new definition's id.</param>
/// <returns>The created definition and the pending instance that references it.</returns>
private (WorkflowDefinition Definition, WorkflowInstance Instance) PersistSetup(
    List<WorkflowNode> nodes,
    List<WorkflowEdge> edges)
{
    WorkflowDefinition workflowDefinition = new() { Id = Guid.NewGuid() };
    var owningDefinitionId = workflowDefinition.Id;

    // Bind every node and edge to the new definition before tracking it.
    nodes.ForEach(n =>
    {
        n.DefinitionId = owningDefinitionId;
        _dbContext.WorkflowNodes.Add(n);
    });

    edges.ForEach(e =>
    {
        e.DefinitionId = owningDefinitionId;
        _dbContext.WorkflowEdges.Add(e);
    });

    _dbContext.WorkflowDefinitions.Add(workflowDefinition);

    // The instance starts out Pending with an empty JSON variable bag.
    WorkflowInstance pendingInstance = new()
    {
        Id = Guid.NewGuid(),
        DefinitionId = owningDefinitionId,
        Status = InstanceStatus.Pending,
        Variables = "{}",
    };
    _dbContext.WorkflowInstances.Add(pendingInstance);

    _dbContext.SaveChanges();

    return (workflowDefinition, pendingInstance);
}
|
||
|
||
/// <summary>
/// Creates a token located on <paramref name="node"/> within <paramref name="instance"/>
/// and saves it to the test database immediately.
/// </summary>
/// <param name="instance">Instance whose id the token references.</param>
/// <param name="node">Node whose id the token references.</param>
/// <param name="status">Initial token status; defaults to <see cref="TokenStatus.Active"/>.</param>
/// <returns>The saved token.</returns>
private WorkflowToken PersistToken(
    WorkflowInstance instance,
    WorkflowNode node,
    TokenStatus status = TokenStatus.Active)
{
    WorkflowToken created = new()
    {
        Id = Guid.NewGuid(),
        InstanceId = instance.Id,
        NodeId = node.Id,
        Status = status,
    };

    _dbContext.WorkflowTokens.Add(created);
    _dbContext.SaveChanges();

    return created;
}
|
||
|
||
/// <summary>
/// Creates a pending approval task bound to the given instance, token and node,
/// and saves it to the test database immediately.
/// </summary>
/// <param name="instance">Instance whose id the task references.</param>
/// <param name="token">Token whose id the task references.</param>
/// <param name="node">Node whose id the task references.</param>
/// <returns>The saved task.</returns>
private WorkflowTask PersistTask(
    WorkflowInstance instance,
    WorkflowToken token,
    WorkflowNode node)
{
    WorkflowTask created = new()
    {
        Id = Guid.NewGuid(),
        InstanceId = instance.Id,
        TokenId = token.Id,
        NodeId = node.Id,
        Type = TaskType.Approval,
        Status = TaskStatus.Pending,
    };

    _dbContext.WorkflowTasks.Add(created);
    _dbContext.SaveChanges();

    return created;
}
|
||
|
||
/// <summary>
/// Test double for <see cref="INodeAction"/>: invokes the supplied callback each time
/// it is executed and then completes synchronously.
/// </summary>
private class TestAction(Action callback) : INodeAction
{
    /// <summary>Runs the callback; the <paramref name="context"/> is not inspected.</summary>
    public Task ExecuteAsync(NodeActionContext context)
    {
        callback();
        return Task.CompletedTask;
    }
}
|
||
|
||
/// <summary>
/// Test double for <see cref="INodeAction"/> that always fails: executing it throws
/// an <see cref="InvalidOperationException"/> synchronously, before any task is returned.
/// </summary>
private class ThrowingAction : INodeAction
{
    /// <summary>Always throws; the <paramref name="context"/> is not inspected.</summary>
    public Task ExecuteAsync(NodeActionContext context) =>
        throw new InvalidOperationException("Simulated action failure");
}
|
||
|
||
/// <summary>
/// Notification capturer for tests: records task-arrived / approved / rejected
/// notifications so a test can assert that the engine raised them. Urge and timeout
/// notifications are accepted but not recorded.
/// </summary>
private class NotificationCaptureService : Workflow.Application.Notifications.INotificationService
{
    /// <summary>Tasks passed to <see cref="NotifyTaskArrivedAsync"/>, in call order.</summary>
    public List<WorkflowTask> ArrivedTasks { get; } = new();

    /// <summary>Tasks passed to <see cref="NotifyTaskApprovedAsync"/>, in call order.</summary>
    public List<WorkflowTask> ApprovedTasks { get; } = new();

    /// <summary>Tasks passed to <see cref="NotifyTaskRejectedAsync"/>, in call order.</summary>
    public List<WorkflowTask> RejectedTasks { get; } = new();

    /// <summary>Records the task as arrived and completes synchronously.</summary>
    public Task NotifyTaskArrivedAsync(WorkflowTask task, CancellationToken ct = default)
    {
        ArrivedTasks.Add(task);
        return Task.CompletedTask;
    }

    /// <summary>Records the task as approved and completes synchronously.</summary>
    public Task NotifyTaskApprovedAsync(WorkflowTask task, CancellationToken ct = default)
    {
        ApprovedTasks.Add(task);
        return Task.CompletedTask;
    }

    /// <summary>Records the task as rejected and completes synchronously.</summary>
    public Task NotifyTaskRejectedAsync(WorkflowTask task, CancellationToken ct = default)
    {
        RejectedTasks.Add(task);
        return Task.CompletedTask;
    }

    /// <summary>No-op: urge notifications are not captured.</summary>
    public Task NotifyTaskUrgedAsync(WorkflowTask task, CancellationToken ct = default)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op: timeout notifications are not captured.</summary>
    public Task NotifyTaskTimeoutAsync(WorkflowTask task, bool autoApproved, CancellationToken ct = default)
    {
        return Task.CompletedTask;
    }
}
|
||
}
|