feat: form versioning, notification center, scheduler and webhooks

- Add FormDefinitionVersion with compare/versions endpoints and schema differ
- Add Notification entity, endpoints and application features
- Add Scheduler (timeout) and WebhookDispatcher services
- Add FormDataValidator/FieldPermissionEvaluator/ReactionEvaluator
- Add workflow task mark-read, CC support and SystemUserContext
- Add EF migrations for form versions and notifications
- Add unit tests for form schema, notifications, scheduler and serialization
This commit is contained in:
向宁 2026-06-14 15:03:11 +08:00
parent 7bad5f93d9
commit 9f878286e7
155 changed files with 9482 additions and 334 deletions

View File

@ -4,6 +4,12 @@ using Workflow.Domain.Expressions.Comparators;
namespace Workflow.Api.Configuration;
/// <summary>
/// 条件求值策略链的 DI 注册。注册顺序决定责任链优先级——
/// Numeric → DateTime → Boolean → Collection → String → Range
/// 当多个对比器声明同一操作符(如 ==)时,靠前的优先被尝试。
/// Registry 注册为 Singleton无状态对比器本身为 Transient。
/// </summary>
public static class ComparatorConfiguration
{
public static IServiceCollection AddValueComparators(this IServiceCollection services)

View File

@ -1,28 +1,30 @@
using System.Security.Claims;
using FastEndpoints.Security;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace Workflow.Api.Configuration;
/// <summary>
/// JWT 认证配置:切换为标准 OIDCJWKS 自动从 SSO 拉取RS256 验签)。
/// MapInboundClaims=false 保留短名 claimsub与其它服务统一读取方式。
/// </summary>
public static class JwtAuthConfiguration
{
public static void AddJwtAuthentication(this IServiceCollection services, IConfiguration configuration)
public static IServiceCollection AddJwtAuthentication(this IServiceCollection services, IConfiguration config)
{
var signingKey = configuration.GetSection("Jwt")["SigningKey"]
?? throw new InvalidOperationException("Jwt:SigningKey is not configured.");
var authority = config["Jwt:Authority"];
var issuer = configuration.GetSection("Jwt")["Issuer"];
var audience = configuration.GetSection("Jwt")["Audience"];
services.AddAuthenticationJwtBearer(
s => s.SigningKey = signingKey,
o =>
{
if (issuer is not null)
o.TokenValidationParameters.ValidIssuer = issuer;
if (audience is not null)
o.TokenValidationParameters.ValidAudience = audience;
o.TokenValidationParameters.NameClaimType = System.Security.Claims.ClaimTypes.NameIdentifier;
});
services.AddAuthentication("Bearer").AddJwtBearer(options =>
{
options.Authority = authority;
options.MapInboundClaims = false;
options.RequireHttpsMetadata = config.GetValue<bool>("Jwt:RequireHttps", false);
options.TokenValidationParameters.ValidateAudience = false;
});
services.AddAuthorization();
return services;
}
}

View File

@ -0,0 +1,39 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Workflow.Application.Notifications;
using Workflow.Application.Scheduler;
using Workflow.Api.Services;
namespace Workflow.Api.Configuration;
/// <summary>
/// 通知系统 DI 注册扩展。统一注册:
/// - NotificationOptionsIOptions 强类型绑定,对应 appsettings 的 Notification 段)
/// - INotificationService → NotificationServicescoped随 DbContext 生命周期)
/// - SystemUserContext后台调度器用单独注册避免覆盖 HTTP 上下文的 CurrentUserContext
/// - HttpClient 工厂Webhook 投递用)
/// - 后台调度器 HostedService超时扫描 + Webhook 投递)
/// </summary>
public static class NotificationConfiguration
{
public static IServiceCollection AddNotifications(this IServiceCollection services, IConfiguration configuration)
{
// 强类型配置绑定
services.Configure<NotificationOptions>(configuration.GetSection("Notification"));
// 通知服务scoped依赖 scoped DbContext
services.AddScoped<INotificationService, NotificationService>();
// 超时任务处理器scoped依赖 scoped DbContext + ProcessEngine
services.AddScoped<OverdueTaskProcessor>();
// 系统用户上下文:供后台调度器使用(不注册为 ICurrentUserContext 默认实现,
// 避免覆盖基于 HttpContext 的 CurrentUserContext由调度器自行解析
services.AddSingleton<SystemUserContext>();
// Webhook 投递用的 HttpClient
services.AddHttpClient("Webhook");
return services;
}
}

View File

@ -0,0 +1,36 @@
using FastEndpoints;
using MediatR;
using Workflow.Application.Form.DTOs;
using Workflow.Application.Form.FormDefinition.Queries;
namespace Workflow.Api.Endpoints.Form;
public class CompareFormVersionsEndpoint : Endpoint<CompareFormVersionsRequest, FormVersionCompareDto>
{
private readonly IMediator _mediator;
public CompareFormVersionsEndpoint(IMediator mediator) => _mediator = mediator;
public override void Configure()
{
Get("/forms/{Id}/versions/compare");
Summary(s =>
{
s.Summary = "对比表单的两个版本";
});
}
public override async Task HandleAsync(CompareFormVersionsRequest req, CancellationToken ct)
{
var query = new CompareFormVersionsQuery(req.Id, req.OldVersionId, req.NewVersionId);
var result = await _mediator.Send(query, ct);
await Send.OkAsync(result, ct);
}
}
public class CompareFormVersionsRequest
{
public Guid Id { get; set; }
public Guid? OldVersionId { get; set; }
public Guid? NewVersionId { get; set; }
}

View File

@ -14,7 +14,6 @@ public class CreateFormComponentEndpoint : Endpoint<CreateFormComponentRequest,
public override void Configure()
{
Post("/form-components");
AllowAnonymous();
Summary(s =>
{
s.Summary = "创建组件注册";

View File

@ -10,7 +10,6 @@ public class CreateFormDefinitionEndpoint(IMediator mediator) : Endpoint<CreateF
public override void Configure()
{
Post("/forms");
AllowAnonymous();
Summary(s =>
{
s.Summary = "创建表单定义(传入 Formily JSON Schema";

View File

@ -13,7 +13,6 @@ public class DeleteFormComponentEndpoint : Endpoint<DeleteFormComponentRequest>
public override void Configure()
{
Delete("/form-components/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "删除组件注册(软删除)";

View File

@ -13,7 +13,6 @@ public class DeleteFormDefinitionEndpoint : Endpoint<DeleteFormDefinitionRequest
public override void Configure()
{
Delete("/forms/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Delete a form definition (soft delete)";

View File

@ -12,7 +12,6 @@ public class DisableFormDefinitionEndpoint(IMediator mediator) : Endpoint<Disabl
public override void Configure()
{
Post("/forms/{Id}/disable");
AllowAnonymous();
Summary(s =>
{
s.Summary = "禁用表单定义";
@ -23,7 +22,9 @@ public class DisableFormDefinitionEndpoint(IMediator mediator) : Endpoint<Disabl
{
var command = new DisableFormDefinitionCommand(req.Id);
await mediator.Send(command, ct);
await Send.OkAsync(ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
await HttpContext.Response.WriteAsync("{}", ct);
}
}

View File

@ -14,7 +14,6 @@ public class GetFormComponentByIdEndpoint : Endpoint<GetFormComponentByIdRequest
public override void Configure()
{
Get("/form-components/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "根据ID获取组件注册详情";

View File

@ -14,7 +14,6 @@ public class GetFormComponentsEndpoint : EndpointWithoutRequest<List<FormCompone
public override void Configure()
{
Get("/form-components");
AllowAnonymous();
Summary(s =>
{
s.Summary = "获取组件注册列表";

View File

@ -14,7 +14,6 @@ public class GetFormDataByInstanceEndpoint : Endpoint<GetFormDataByInstanceReque
public override void Configure()
{
Get("/forms/data/{InstanceId}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get form data by workflow instance id";

View File

@ -14,7 +14,6 @@ public class GetFormDefinitionByIdEndpoint : Endpoint<GetFormDefinitionByIdReque
public override void Configure()
{
Get("/forms/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get form definition detail by id (includes fields)";

View File

@ -15,7 +15,6 @@ public class GetFormDefinitionListEndpoint : Endpoint<GetFormDefinitionListReque
public override void Configure()
{
Get("/forms");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get paginated list of form definitions";

View File

@ -0,0 +1,29 @@
using FastEndpoints;
using MediatR;
using Workflow.Application.Form.DTOs;
using Workflow.Application.Form.FormDefinition.Queries;
namespace Workflow.Api.Endpoints.Form;
public class GetFormVersionsEndpoint : EndpointWithoutRequest<List<FormVersionDto>>
{
private readonly IMediator _mediator;
public GetFormVersionsEndpoint(IMediator mediator) => _mediator = mediator;
public override void Configure()
{
Get("/forms/{Id}/versions");
Summary(s =>
{
s.Summary = "获取表单的历史版本列表";
});
}
public override async Task HandleAsync(CancellationToken ct)
{
var id = Route<Guid>("Id");
var result = await _mediator.Send(new GetFormVersionsQuery(id), ct);
await Send.OkAsync(result, ct);
}
}

View File

@ -9,7 +9,6 @@ public class PublishFormDefinitionEndpoint(IMediator mediator) : EndpointWithout
public override void Configure()
{
Post("/forms/{Id}/publish");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Publish a form definition";

View File

@ -13,7 +13,6 @@ public class SubmitFormDataEndpoint : Endpoint<SubmitFormDataRequest, Guid>
public override void Configure()
{
Post("/forms/data");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Submit form data for a workflow instance";

View File

@ -14,7 +14,6 @@ public class UpdateFormComponentEndpoint : Endpoint<UpdateFormComponentRequest,
public override void Configure()
{
Put("/form-components/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "更新组件注册";

View File

@ -17,7 +17,6 @@ public class UpdateFormDefinitionEndpoint : Endpoint<UpdateFormDefinitionRequest
public override void Configure()
{
Put("/forms/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "更新表单定义(传入新的 Formily JSON Schema";

View File

@ -0,0 +1,104 @@
using System.Security.Claims;
using FastEndpoints;
using MediatR;
using Workflow.Application.Common;
using Workflow.Application.Features.Notifications;
using Workflow.Domain.Common;
namespace Workflow.Api.Endpoints.Notification;
/// <summary>从 JWT claims 提取当前用户的角色列表(兼容 ClaimTypes.Role 与 "role" 两种 claim 名)。</summary>
internal static class NotificationClaimsExtensions
{
public static IReadOnlyList<string> GetUserRoles(this ClaimsPrincipal user)
{
// 兼容 MapInboundClaims=false 下 role 保持短名,以及标准 ClaimTypes.Role
return user.FindAll("role")
.Concat(user.FindAll(ClaimTypes.Role))
.Select(c => c.Value)
.Where(v => !string.IsNullOrWhiteSpace(v))
.Distinct()
.ToList();
}
}
public class GetNotificationsEndpoint(IMediator mediator, ICurrentUserContext userContext)
: Endpoint<GetNotificationsRequest, PagedResult<NotificationItemDto>>
{
public override void Configure()
{
Get("/notifications");
Summary(s => s.Summary = "获取当前用户的通知列表(含按角色匹配的通知)");
}
public override async Task HandleAsync(GetNotificationsRequest req, CancellationToken ct)
{
var userId = userContext.GetUserId();
var roles = User.GetUserRoles();
var result = await mediator.Send(
new GetNotificationsQuery(userId, roles, req.UnreadOnly, req.PageIndex, req.PageSize), ct);
await Send.OkAsync(result, ct);
}
}
public class GetNotificationsRequest
{
public bool UnreadOnly { get; set; }
public int PageIndex { get; set; } = 1;
public int PageSize { get; set; } = 20;
}
public class GetUnreadNotificationCountEndpoint(IMediator mediator, ICurrentUserContext userContext) : EndpointWithoutRequest<int>
{
public override void Configure()
{
Get("/notifications/unread-count");
Summary(s => s.Summary = "获取当前用户的未读通知数(前端角标用)");
}
public override async Task HandleAsync(CancellationToken ct)
{
var userId = userContext.GetUserId();
var roles = User.GetUserRoles();
var count = await mediator.Send(new GetUnreadNotificationCountQuery(userId, roles), ct);
await Send.OkAsync(count, ct);
}
}
public class MarkNotificationReadEndpoint(IMediator mediator, ICurrentUserContext userContext) : EndpointWithoutRequest
{
public override void Configure()
{
Post("/notifications/{Id}/read");
Summary(s => s.Summary = "标记单条通知为已读");
}
public override async Task HandleAsync(CancellationToken ct)
{
var id = Route<Guid>("Id");
var userId = userContext.GetUserId();
await mediator.Send(new MarkNotificationReadCommand(id, userId), ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
await HttpContext.Response.WriteAsync("{}", ct);
}
}
public class MarkAllNotificationsReadEndpoint(IMediator mediator, ICurrentUserContext userContext) : EndpointWithoutRequest
{
public override void Configure()
{
Post("/notifications/read-all");
Summary(s => s.Summary = "标记当前用户所有通知为已读");
}
public override async Task HandleAsync(CancellationToken ct)
{
var userId = userContext.GetUserId();
var roles = User.GetUserRoles();
await mediator.Send(new MarkAllNotificationsReadCommand(userId, roles), ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
await HttpContext.Response.WriteAsync("{}", ct);
}
}

View File

@ -15,7 +15,6 @@ public class CreateEdgeEndpoint : Endpoint<CreateEdgeRequest, WorkflowEdgeDto>
public override void Configure()
{
Post("/workflow-definitions/{DefinitionId}/edges");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Create a new edge in a workflow definition";

View File

@ -15,7 +15,6 @@ public class CreateNodeEndpoint : Endpoint<CreateNodeRequest, WorkflowNodeDto>
public override void Configure()
{
Post("/workflow-definitions/{DefinitionId}/nodes");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Create a new node in a workflow definition";
@ -24,7 +23,14 @@ public class CreateNodeEndpoint : Endpoint<CreateNodeRequest, WorkflowNodeDto>
public override async Task HandleAsync(CreateNodeRequest req, CancellationToken ct)
{
var command = new CreateNodeCommand(req.DefinitionId, (NodeType)req.NodeType, req.Name, req.Config, req.PositionX, req.PositionY);
var command = new CreateNodeCommand(
req.DefinitionId,
(NodeType)req.NodeType,
req.Name,
req.Config,
req.PositionX,
req.PositionY,
req.FormDefinitionId);
var result = await _mediator.Send(command, ct);
await Send.ResponseAsync(result, 201, ct);
}
@ -38,4 +44,5 @@ public class CreateNodeRequest
public string? Config { get; set; }
public int PositionX { get; set; }
public int PositionY { get; set; }
public Guid? FormDefinitionId { get; set; }
}

View File

@ -14,7 +14,6 @@ public class CreateWorkflowDefinitionEndpoint : Endpoint<CreateWorkflowDefinitio
public override void Configure()
{
Post("/workflow-definitions");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Create a new workflow definition";

View File

@ -13,7 +13,6 @@ public class DeleteEdgeEndpoint : Endpoint<DeleteEdgeRequest>
public override void Configure()
{
Delete("/workflow-definitions/{DefinitionId}/edges/{EdgeId}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Delete an edge from a workflow definition";
@ -24,7 +23,9 @@ public class DeleteEdgeEndpoint : Endpoint<DeleteEdgeRequest>
{
var command = new DeleteEdgeCommand(req.EdgeId);
await _mediator.Send(command, ct);
await Send.OkAsync(ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
await HttpContext.Response.WriteAsync("{}", ct);
}
}

View File

@ -13,7 +13,6 @@ public class DeleteNodeEndpoint : Endpoint<DeleteNodeRequest>
public override void Configure()
{
Delete("/workflow-definitions/{DefinitionId}/nodes/{NodeId}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Delete a node from a workflow definition";
@ -24,7 +23,9 @@ public class DeleteNodeEndpoint : Endpoint<DeleteNodeRequest>
{
var command = new DeleteNodeCommand(req.NodeId);
await _mediator.Send(command, ct);
await Send.OkAsync(ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
await HttpContext.Response.WriteAsync("{}", ct);
}
}

View File

@ -13,7 +13,6 @@ public class DeleteWorkflowDefinitionEndpoint : Endpoint<DeleteWorkflowDefinitio
public override void Configure()
{
Delete("/workflow-definitions/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Delete a workflow definition (soft delete)";

View File

@ -9,7 +9,6 @@ public class DisableWorkflowDefinitionEndpoint(IMediator mediator) : EndpointWit
public override void Configure()
{
Post("/workflow-definitions/{Id}/disable");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Disable a workflow definition";

View File

@ -14,7 +14,6 @@ public class GetWorkflowDefinitionByIdEndpoint : Endpoint<GetWorkflowDefinitionB
public override void Configure()
{
Get("/workflow-definitions/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get workflow definition detail by id (includes nodes and edges)";

View File

@ -16,7 +16,6 @@ public class GetWorkflowDefinitionListEndpoint : Endpoint<GetWorkflowDefinitionL
public override void Configure()
{
Get("/workflow-definitions");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get paginated list of workflow definitions";

View File

@ -9,7 +9,6 @@ public class PublishWorkflowDefinitionEndpoint(IMediator mediator) : EndpointWit
public override void Configure()
{
Post("/workflow-definitions/{Id}/publish");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Publish a workflow definition";

View File

@ -15,7 +15,6 @@ public class UpdateEdgeEndpoint : Endpoint<UpdateEdgeRequest, WorkflowEdgeDto>
public override void Configure()
{
Put("/workflow-definitions/{DefinitionId}/edges/{EdgeId}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Update an edge in a workflow definition";

View File

@ -14,7 +14,6 @@ public class UpdateNodeEndpoint : Endpoint<UpdateNodeRequest, WorkflowNodeDto>
public override void Configure()
{
Put("/workflow-definitions/{DefinitionId}/nodes/{NodeId}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Update a node in a workflow definition";
@ -23,7 +22,13 @@ public class UpdateNodeEndpoint : Endpoint<UpdateNodeRequest, WorkflowNodeDto>
public override async Task HandleAsync(UpdateNodeRequest req, CancellationToken ct)
{
var command = new UpdateNodeCommand(req.NodeId, req.Name, req.Config, req.PositionX, req.PositionY);
var command = new UpdateNodeCommand(
req.NodeId,
req.Name,
req.Config,
req.PositionX,
req.PositionY,
req.FormDefinitionId);
var result = await _mediator.Send(command, ct);
await Send.OkAsync(result, ct);
}
@ -37,4 +42,5 @@ public class UpdateNodeRequest
public string? Config { get; set; }
public int PositionX { get; set; }
public int PositionY { get; set; }
public Guid? FormDefinitionId { get; set; }
}

View File

@ -14,7 +14,6 @@ public class UpdateWorkflowDefinitionEndpoint : Endpoint<UpdateWorkflowDefinitio
public override void Configure()
{
Put("/workflow-definitions/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Update a workflow definition";

View File

@ -14,7 +14,6 @@ public class GetWorkflowInstanceByIdEndpoint : Endpoint<GetWorkflowInstanceByIdR
public override void Configure()
{
Get("/workflow-instances/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get workflow instance detail by id (includes tokens and tasks)";

View File

@ -16,7 +16,6 @@ public class GetWorkflowInstanceListEndpoint : Endpoint<GetWorkflowInstanceListR
public override void Configure()
{
Get("/workflow-instances");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get paginated list of workflow instances";

View File

@ -14,7 +14,6 @@ public class MonitorWorkflowInstancesEndpoint : EndpointWithoutRequest<WorkflowM
public override void Configure()
{
Get("/workflow-instances/monitor");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get workflow monitoring statistics";

View File

@ -9,7 +9,6 @@ public class ResumeWorkflowInstanceEndpoint(IMediator mediator) : EndpointWithou
public override void Configure()
{
Post("/workflow-instances/{Id}/resume");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Resume a suspended workflow instance";

View File

@ -13,7 +13,6 @@ public class StartWorkflowInstanceEndpoint : Endpoint<StartWorkflowInstanceReque
public override void Configure()
{
Post("/workflow-instances");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Start a new workflow instance";

View File

@ -9,7 +9,6 @@ public class SuspendWorkflowInstanceEndpoint(IMediator mediator) : EndpointWitho
public override void Configure()
{
Post("/workflow-instances/{Id}/suspend");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Suspend a running workflow instance";

View File

@ -10,7 +10,6 @@ public class WithdrawWorkflowInstanceEndpoint(IMediator mediator, ICurrentUserCo
public override void Configure()
{
Post("/workflow-instances/{Id}/withdraw");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Withdraw a workflow instance (initiator only)";

View File

@ -13,7 +13,6 @@ public class ApproveTaskEndpoint : Endpoint<ApproveTaskRequest>
public override void Configure()
{
Post("/workflow-tasks/{Id}/approve");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Approve a pending task";
@ -22,7 +21,7 @@ public class ApproveTaskEndpoint : Endpoint<ApproveTaskRequest>
public override async Task HandleAsync(ApproveTaskRequest req, CancellationToken ct)
{
var command = new ApproveTaskCommand(req.Id, req.UserId, req.Comment);
var command = new ApproveTaskCommand(req.Id, req.UserId, req.Comment, req.FormDataJson);
await _mediator.Send(command, ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
@ -35,4 +34,5 @@ public class ApproveTaskRequest
public Guid Id { get; set; }
public Guid UserId { get; set; }
public string? Comment { get; set; }
public string? FormDataJson { get; set; }
}

View File

@ -13,7 +13,6 @@ public class DelegateTaskEndpoint : Endpoint<DelegateTaskRequest>
public override void Configure()
{
Post("/workflow-tasks/{Id}/delegate");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Delegate a task to another user";

View File

@ -15,7 +15,6 @@ public class GetCcTasksEndpoint : Endpoint<GetCcTasksRequest, PagedResult<Workfl
public override void Configure()
{
Get("/workflow-tasks/cc");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get CC (carbon copy) tasks for a user";

View File

@ -15,7 +15,6 @@ public class GetHistoryTasksEndpoint : Endpoint<GetHistoryTasksRequest, PagedRes
public override void Configure()
{
Get("/workflow-tasks/history");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get completed (approved/rejected) tasks for a user";

View File

@ -15,7 +15,6 @@ public class GetOverdueTasksEndpoint : Endpoint<GetOverdueTasksRequest, PagedRes
public override void Configure()
{
Get("/workflow-tasks/overdue");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get overdue tasks (past due date, still pending)";

View File

@ -15,7 +15,6 @@ public class GetPendingTasksEndpoint : Endpoint<GetPendingTasksRequest, PagedRes
public override void Configure()
{
Get("/workflow-tasks/pending");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get pending tasks for a user";

View File

@ -5,7 +5,7 @@ using Workflow.Application.Features.WorkflowTasks.Queries;
namespace Workflow.Api.Endpoints.WorkflowTask;
public class GetTaskByIdEndpoint : Endpoint<GetTaskByIdRequest, WorkflowTaskListItemDto>
public class GetTaskByIdEndpoint : Endpoint<GetTaskByIdRequest, WorkflowTaskDetailDto>
{
private readonly IMediator _mediator;
@ -14,7 +14,6 @@ public class GetTaskByIdEndpoint : Endpoint<GetTaskByIdRequest, WorkflowTaskList
public override void Configure()
{
Get("/workflow-tasks/{Id}");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Get task detail by id";

View File

@ -0,0 +1,36 @@
using FastEndpoints;
using MediatR;
using Workflow.Application.Features.WorkflowTasks.Commands;
namespace Workflow.Api.Endpoints.WorkflowTask;
public class MarkCcTaskReadEndpoint : Endpoint<MarkCcTaskReadRequest>
{
private readonly IMediator _mediator;
public MarkCcTaskReadEndpoint(IMediator mediator) => _mediator = mediator;
public override void Configure()
{
Post("/workflow-tasks/{Id}/mark-read");
Summary(s =>
{
s.Summary = "Mark a CC task as read";
});
}
public override async Task HandleAsync(MarkCcTaskReadRequest req, CancellationToken ct)
{
var command = new MarkCcTaskReadCommand(req.Id, req.UserId);
await _mediator.Send(command, ct);
HttpContext.Response.StatusCode = 200;
HttpContext.Response.ContentType = "application/json";
await HttpContext.Response.WriteAsync("{}", ct);
}
}
public class MarkCcTaskReadRequest
{
public Guid Id { get; set; }
public Guid UserId { get; set; }
}

View File

@ -13,7 +13,6 @@ public class RejectTaskEndpoint : Endpoint<RejectTaskRequest>
public override void Configure()
{
Post("/workflow-tasks/{Id}/reject");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Reject a pending task";

View File

@ -13,7 +13,6 @@ public class TransferTaskEndpoint : Endpoint<TransferTaskRequest>
public override void Configure()
{
Post("/workflow-tasks/{Id}/transfer");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Transfer a task to another user";

View File

@ -13,7 +13,6 @@ public class UrgeTaskEndpoint : Endpoint<UrgeTaskRequest>
public override void Configure()
{
Post("/workflow-tasks/{Id}/urge");
AllowAnonymous();
Summary(s =>
{
s.Summary = "Urge a pending task (send notification to assignee)";

View File

@ -1,9 +1,22 @@
using System.Text.Json;
using Workflow.Api.Serialization;
namespace Workflow.Api.Middleware;
/// <summary>
/// 统一响应包装中间件。拦截下游返回的成功响应2xx JSON自动包成标准信封 { code: 0, data, message: "ok" }。
/// 跳过场景错误响应≥400已由 GlobalExceptionMiddleware 处理)、非 JSON 响应、Swagger 文档页面。
/// 通过临时 MemoryStream 缓存原始响应体实现读取后再写回,异常时恢复原始流并向上抛出。
/// </summary>
public class ApiResponseMiddleware
{
// 复用与 FastEndpoints 一致的序列化选项:驼峰 + 时间毫秒时间戳,确保信封包裹时时间格式不回退
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
Converters = { new TimestampDateTimeConverter(), new TimestampDateTimeOffsetConverter() }
};
private readonly RequestDelegate _next;
public ApiResponseMiddleware(RequestDelegate next)
@ -13,6 +26,7 @@ public class ApiResponseMiddleware
public async Task InvokeAsync(HttpContext context)
{
// 备份原始响应流,用 MemoryStream 替换以便下游写入后可被本中间件读取
var originalBodyStream = context.Response.Body;
using var responseBody = new MemoryStream();
@ -24,6 +38,7 @@ public class ApiResponseMiddleware
}
catch
{
// 下游抛异常时必须先恢复原始流,否则 GlobalExceptionMiddleware 写入会落到已释放的 MemoryStream
context.Response.Body = originalBodyStream;
throw;
}
@ -33,7 +48,7 @@ public class ApiResponseMiddleware
responseBody.Seek(0, SeekOrigin.Begin);
var responseText = await new StreamReader(responseBody).ReadToEndAsync();
// Skip wrapping for non-JSON responses, error responses, or Swagger
// 跳过包装的三类场景:错误响应、非 JSON、Swagger 文档
if (context.Response.StatusCode >= 400 ||
context.Response.ContentType?.Contains("application/json") != true ||
context.Request.Path.StartsWithSegments("/swagger"))
@ -43,7 +58,7 @@ public class ApiResponseMiddleware
return;
}
// Parse the original response and wrap it in the standard envelope
// 解析原始响应体;解析失败则原样作为字符串塞入 data避免吞数据
object? data;
if (string.IsNullOrWhiteSpace(responseText))
{
@ -68,11 +83,9 @@ public class ApiResponseMiddleware
message = "ok"
};
var json = JsonSerializer.Serialize(wrappedResponse, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
var json = JsonSerializer.Serialize(wrappedResponse, JsonOptions);
// 包装后统一返回 200真实业务状态由 code 字段表达
context.Response.StatusCode = 200;
context.Response.ContentLength = System.Text.Encoding.UTF8.GetByteCount(json);
await context.Response.WriteAsync(json);

View File

@ -4,6 +4,12 @@ using Workflow.Domain.Exceptions;
namespace Workflow.Api.Middleware;
/// <summary>
/// 全局异常处理中间件。捕获下游管道抛出的所有异常,按异常类型映射为 HTTP 状态码,
/// 并统一输出错误响应信封 { code, message, data: null }。
/// 业务异常4xx记 Warning未处理异常5xx记 Error 以便监控告警。
/// 故意不向前端透传 5xx 的内部异常详情,避免泄露堆栈等敏感信息。
/// </summary>
public class GlobalExceptionMiddleware
{
private readonly RequestDelegate _next;
@ -29,6 +35,7 @@ public class GlobalExceptionMiddleware
private async Task HandleExceptionAsync(HttpContext context, Exception exception)
{
// 异常类型 → (HTTP 状态码, 对外消息) 映射。5xx 不透传原始消息,统一返回通用提示
var (statusCode, message) = exception switch
{
BusinessException ex => ((int)HttpStatusCode.BadRequest, ex.Message),

View File

@ -1,12 +1,13 @@
using System.Text.Json;
using FastEndpoints;
using FastEndpoints.Swagger;
using Grpc.Net.Client;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Api.Configuration;
using Workflow.Api.Middleware;
using Workflow.Api.Serialization;
using Workflow.Api.Services;
using Workflow.Application.Engine;
using Workflow.Application.Form.FormDefinition.Commands;
using Workflow.Domain.Common;
using Workflow.Domain.Expressions;
@ -28,16 +29,17 @@ builder.Services.AddDbContext<WorkflowDbContext>((sp, options) =>
options.UseSnakeCaseNamingConvention();
});
// gRPC Auth Client
var authServerUrl = builder.Configuration["Grpc:AuthServerUrl"] ?? "http://localhost:50051";
builder.Services.AddSingleton(_ => GrpcChannel.ForAddress(authServerUrl));
builder.Services.AddSingleton<IAuthGrpcClient, AuthGrpcClient>();
// MediatR
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(CreateFormDefinitionCommand).Assembly));
// Value Comparators
builder.Services.AddValueComparators();
builder.Services.AddScoped<ProcessEngine>();
// Notifications & Schedulers
builder.Services.AddNotifications(builder.Configuration);
builder.Services.AddHostedService<TimeoutSchedulerService>();
builder.Services.AddHostedService<WebhookDispatcherService>();
// FastEndpoints
builder.Services.AddFastEndpoints();
@ -70,7 +72,8 @@ if (args.Contains("--seed"))
return;
}
// Middleware pipeline (order matters)
// 中间件管道顺序敏感CORS → ApiResponse 包装 → 全局异常 → 认证 → 授权 → FastEndpoints
// ApiResponse 必须在 GlobalException 之前:它需要捕获下游异常并恢复响应流,再交由 GlobalException 输出错误信封
app.UseCors();
app.UseMiddleware<ApiResponseMiddleware>();
app.UseMiddleware<GlobalExceptionMiddleware>();
@ -80,6 +83,9 @@ app.UseFastEndpoints(config =>
{
config.Endpoints.RoutePrefix = "api";
config.Serializer.Options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
// 统一数据规范:时间字段输出 UTC 毫秒时间戳
config.Serializer.Options.Converters.Add(new TimestampDateTimeConverter());
config.Serializer.Options.Converters.Add(new TimestampDateTimeOffsetConverter());
});
app.UseSwaggerGen();

View File

@ -1,35 +0,0 @@
syntax = "proto3";
package auth;
option csharp_namespace = "Workflow.Api.Grpc";
service AuthService {
rpc ValidateToken (ValidateTokenRequest) returns (ValidateTokenResponse);
rpc CheckPermission (CheckPermissionRequest) returns (CheckPermissionResponse);
}
message ValidateTokenRequest {
string token = 1;
}
message ValidateTokenResponse {
bool valid = 1;
string user_id = 2;
string username = 3;
string email = 4;
repeated string roles = 5;
repeated string permissions = 6;
int64 expires_at = 7;
}
message CheckPermissionRequest {
string token = 1;
string permission = 2;
}
message CheckPermissionResponse {
bool allowed = 1;
string user_id = 2;
repeated string roles = 3;
}

View File

@ -0,0 +1,63 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace Workflow.Api.Serialization;
/// <summary>
/// 统一数据规范:所有时间字段以 UTC 毫秒时间戳long形式在 JSON 中传输。
/// 后端只负责按标准输出毫秒时间戳,时区/格式化统一由前端处理。
/// 读JSON 数字(毫秒)→ DateTime(Utc)写入DateTime → 毫秒 long。
/// </summary>
public sealed class TimestampDateTimeConverter : JsonConverter<DateTime>
{
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
// 兼容数字毫秒时间戳与字符串ISO 8601向后兼容旧客户端
if (reader.TokenType == JsonTokenType.Number)
{
return DateTimeOffset.FromUnixTimeMilliseconds(reader.GetInt64()).UtcDateTime;
}
if (reader.TokenType == JsonTokenType.String)
{
return DateTime.Parse(reader.GetString()!, null, System.Globalization.DateTimeStyles.RoundtripKind);
}
throw new JsonException("Expected a number (ms timestamp) or string (ISO date) for DateTime field.");
}
public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
{
// 统一转 UTC 毫秒时间戳Unspecified 按当前时区推算(极少见,审计时间均为 Utc
var utc = value.Kind == DateTimeKind.Unspecified
? DateTime.SpecifyKind(value, DateTimeKind.Utc)
: value.ToUniversalTime();
writer.WriteNumberValue(new DateTimeOffset(utc, TimeSpan.Zero).ToUnixTimeMilliseconds());
}
}
/// <summary>
/// DateTimeOffset 版本:同样输出 UTC 毫秒时间戳。
/// </summary>
public sealed class TimestampDateTimeOffsetConverter : JsonConverter<DateTimeOffset>
{
public override DateTimeOffset Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.Number)
{
return DateTimeOffset.FromUnixTimeMilliseconds(reader.GetInt64());
}
if (reader.TokenType == JsonTokenType.String)
{
return DateTimeOffset.Parse(reader.GetString()!);
}
throw new JsonException("Expected a number (ms timestamp) or string (ISO date) for DateTimeOffset field.");
}
public override void Write(Utf8JsonWriter writer, DateTimeOffset value, JsonSerializerOptions options)
{
writer.WriteNumberValue(value.ToUnixTimeMilliseconds());
}
}

View File

@ -1,47 +0,0 @@
using Grpc.Core;
using Grpc.Net.Client;
using Workflow.Api.Grpc;
namespace Workflow.Api.Services;
public interface IAuthGrpcClient
{
Task<(bool Valid, string UserId, List<string> Roles, List<string> Permissions)> ValidateTokenAsync(string token);
Task<(bool Allowed, string UserId, List<string> Roles)> CheckPermissionAsync(string token, string permission);
}
public class AuthGrpcClient : IAuthGrpcClient
{
private readonly AuthService.AuthServiceClient _client;
public AuthGrpcClient(GrpcChannel channel)
{
_client = new AuthService.AuthServiceClient(channel);
}
public async Task<(bool Valid, string UserId, List<string> Roles, List<string> Permissions)> ValidateTokenAsync(string token)
{
try
{
var response = await _client.ValidateTokenAsync(new ValidateTokenRequest { Token = token });
return (response.Valid, response.UserId, response.Roles.ToList(), response.Permissions.ToList());
}
catch (RpcException)
{
return (false, string.Empty, [], []);
}
}
public async Task<(bool Allowed, string UserId, List<string> Roles)> CheckPermissionAsync(string token, string permission)
{
try
{
var response = await _client.CheckPermissionAsync(new CheckPermissionRequest { Token = token, Permission = permission });
return (response.Allowed, response.UserId, response.Roles.ToList());
}
catch (RpcException)
{
return (false, string.Empty, []);
}
}
}

View File

@ -1,34 +1,33 @@
using System.Security.Claims;
using Microsoft.AspNetCore.Http;
using Microsoft.IdentityModel.JsonWebTokens;
using Workflow.Domain.Common;
namespace Workflow.Api.Services;
public class CurrentUserContext : ICurrentUserContext
/// <summary>
/// 基于 HttpContext 的当前用户上下文实现。从已认证的 JWT claims 中读取用户 ID
/// 供 AuditInterceptor 填充审计字段。注意JWT 配置中 MapInboundClaims=false
/// 因此 sub claim 保持短名,必须用 JwtRegisteredClaimNames.Sub 读取,而非 ClaimTypes.NameIdentifier。
/// 未认证或 claim 缺失时返回 Guid.Empty由调用方自行处理不抛异常以免阻断后台任务等无 HTTP 上下文场景)。
/// </summary>
public class CurrentUserContext(IHttpContextAccessor httpContextAccessor) : ICurrentUserContext
{
private readonly IHttpContextAccessor _httpContextAccessor;
public CurrentUserContext(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
public Guid GetUserId()
{
var httpContext = _httpContextAccessor.HttpContext;
var httpContext = httpContextAccessor.HttpContext;
if (httpContext?.User?.Identity?.IsAuthenticated == true)
{
var userIdClaim = httpContext.User.FindFirstValue(ClaimTypes.NameIdentifier)
?? httpContext.User.FindFirstValue("user_id")
?? "system";
return Guid.TryParse(userIdClaim, out var userId) ? userId : Guid.Empty;
// MapInboundClaims=falsesub 保持短名,统一读取
var userIdClaim = httpContext.User.FindFirstValue(JwtRegisteredClaimNames.Sub);
if (Guid.TryParse(userIdClaim, out var userId))
return userId;
}
return Guid.Empty;
}
public string? GetIPAddress()
{
var httpContext = _httpContextAccessor.HttpContext;
return httpContext?.Connection?.RemoteIpAddress?.ToString();
return httpContextAccessor.HttpContext?.Connection?.RemoteIpAddress?.ToString();
}
}

View File

@ -0,0 +1,18 @@
using Workflow.Domain.Common;
namespace Workflow.Api.Services;
/// <summary>
/// 系统身份上下文供后台调度器TimeoutSchedulerService / WebhookDispatcherService
/// 无 HTTP 请求上下文的场景使用,由 AuditInterceptor 填充审计字段时标识为系统操作。
/// SystemUserId 为固定 Guid便于审计查询中区分人工操作与系统自动处理。
/// </summary>
public class SystemUserContext : ICurrentUserContext
{
/// <summary>系统用户固定标识,全库一致,便于审计追溯。</summary>
public static readonly Guid SystemUserId = Guid.Parse("00000000-0000-0000-0000-000000000001");
public Guid GetUserId() => SystemUserId;
public string? GetIPAddress() => null;
}

View File

@ -0,0 +1,65 @@
using Microsoft.Extensions.Options;
using Workflow.Application.Notifications;
using Workflow.Application.Scheduler;
namespace Workflow.Api.Services;
/// <summary>
/// 超时任务调度器:后台 HostedService 壳,周期性调用 OverdueTaskProcessor 扫描逾期任务。
/// 核心逻辑在 Workflow.Application.Scheduler.OverdueTaskProcessor可单元测试
///
/// BackgroundService 是单例,通过 IServiceScopeFactory 创建独立 scope 解析 scoped 依赖DbContext 等)。
/// </summary>
public class TimeoutSchedulerService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<TimeoutSchedulerService> _logger;
private readonly NotificationOptions _options;
public TimeoutSchedulerService(
IServiceScopeFactory scopeFactory,
ILogger<TimeoutSchedulerService> logger,
IOptions<NotificationOptions> options)
{
_scopeFactory = scopeFactory;
_logger = logger;
_options = options.Value;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("TimeoutScheduler 启动,轮询间隔 {Interval}s", _options.Scheduler.PollIntervalSeconds);
// 启动时先等待一轮,避免与应用启动并发争抢资源
await Task.Delay(_options.Scheduler.StartupDelaySeconds * 1000, stoppingToken);
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(_options.Scheduler.PollIntervalSeconds));
do
{
try
{
await RunScanAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "TimeoutScheduler 单轮处理异常");
}
}
while (await timer.WaitForNextTickAsync(stoppingToken));
_logger.LogInformation("TimeoutScheduler 停止");
}
/// <summary>执行一轮扫描:创建 scope解析 OverdueTaskProcessor调用其 ExecuteAsync。</summary>
public async Task RunScanAsync(CancellationToken ct = default)
{
using var scope = _scopeFactory.CreateScope();
var processor = scope.ServiceProvider.GetRequiredService<OverdueTaskProcessor>();
await processor.ExecuteAsync(ct);
}
}

View File

@ -0,0 +1,220 @@
using System.Net;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Workflow.Application.Notifications;
using Workflow.Domain.Entities;
using Workflow.Infrastructure.Persistence;
namespace Workflow.Api.Services;
/// <summary>
/// Webhook 后台投递服务:轮询 wf_webhook_deliveries 中 status=pending 且到下次重试时间的记录,
/// 用 HttpClient POST 投递,记录响应、错误与重试计划。
///
/// 设计要点:
/// 1. SSRF 防护:投递前再次校验 Url 主机在 AllowedHosts 白名单内(落库时已校验,此处兜底),
/// 拒绝 localhost/私有 IP防止内网探测。
/// 2. 重试失败按指数退避2^attempts 分钟)安排下次重试,达 MaxAttempts 标 failed。
/// 3. BackgroundService 单例,通过 IServiceScopeFactory 创建独立 scope 访问 scoped DbContext。
/// </summary>
public class WebhookDispatcherService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<WebhookDispatcherService> _logger;
private readonly NotificationOptions _options;
public WebhookDispatcherService(
IServiceScopeFactory scopeFactory,
IHttpClientFactory httpClientFactory,
IOptions<NotificationOptions> options,
ILogger<WebhookDispatcherService> logger)
{
_scopeFactory = scopeFactory;
_httpClientFactory = httpClientFactory;
_logger = logger;
_options = options.Value;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 未配置任何 AllowedHosts 时Webhook 投递被完全禁用,服务空转退出(避免无谓轮询)
if (_options.Webhook.AllowedHosts is null || _options.Webhook.AllowedHosts.Length == 0)
{
_logger.LogInformation("WebhookDispatcherAllowedHosts 为空Webhook 投递已禁用,服务退出");
return;
}
_logger.LogInformation("WebhookDispatcher 启动,轮询间隔 {Interval}s", _options.Webhook.PollIntervalSeconds);
await Task.Delay(_options.Scheduler.StartupDelaySeconds * 1000, stoppingToken);
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(_options.Webhook.PollIntervalSeconds));
do
{
try
{
await DispatchPendingAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "WebhookDispatcher 单轮投递异常");
}
}
while (await timer.WaitForNextTickAsync(stoppingToken));
_logger.LogInformation("WebhookDispatcher 停止");
}
/// <summary>暴露给单元测试:直接执行一轮投递。</summary>
public async Task DispatchPendingAsync(CancellationToken ct = default)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<WorkflowDbContext>();
var now = DateTime.UtcNow;
// 取出到下次重试时间的 pending 记录,限制每轮批量避免单轮过长
var pending = await db.WebhookDeliveries
.Where(d => d.Status == "pending"
&& d.Attempts < d.MaxAttempts
&& (d.NextRetryAt == null || d.NextRetryAt <= now))
.OrderBy(d => d.CreatedAt)
.Take(50)
.ToListAsync(ct);
if (pending.Count == 0)
return;
var client = _httpClientFactory.CreateClient("Webhook");
client.Timeout = TimeSpan.FromSeconds(_options.Webhook.TimeoutSeconds);
foreach (var delivery in pending)
{
await DeliverOneAsync(db, client, delivery, ct);
}
}
private async Task DeliverOneAsync(
WorkflowDbContext db, HttpClient client, WebhookDelivery delivery, CancellationToken ct)
{
// SSRF 兜底校验(落库时已校验,此处防御配置变更或脏数据)
if (!IsHostAllowed(delivery.Url))
{
delivery.Status = "failed";
delivery.LastError = "目标主机不在 AllowedHosts 白名单SSRF 防护拒绝)";
delivery.Attempts = delivery.MaxAttempts; // 直接终止
delivery.UpdatedAt = DateTime.UtcNow;
await db.SaveChangesAsync(ct);
_logger.LogWarning("Webhook {Id} 投递被 SSRF 防护拦截:{Url}", delivery.Id, delivery.Url);
return;
}
delivery.Attempts++;
delivery.UpdatedAt = DateTime.UtcNow;
try
{
using var content = new StringContent(delivery.Payload, System.Text.Encoding.UTF8, "application/json");
using var request = new HttpRequestMessage(new HttpMethod(delivery.HttpMethod), delivery.Url) { Content = content };
using var response = await client.SendAsync(request, ct);
delivery.StatusCode = (int)response.StatusCode;
// 2xx 视为成功
if (response.IsSuccessStatusCode)
{
delivery.Status = "delivered";
delivery.ResponseBody = await TruncateAsync(response.Content, ct);
delivery.NextRetryAt = null;
delivery.LastError = null;
_logger.LogInformation("Webhook {Id} 投递成功({Status}", delivery.Id, delivery.StatusCode);
}
else
{
// 非 2xx记录响应按重试策略安排下次重试
delivery.ResponseBody = await TruncateAsync(response.Content, ct);
ScheduleRetryOrFail(delivery, $"HTTP {delivery.StatusCode}");
}
}
catch (Exception ex)
{
delivery.StatusCode = null;
ScheduleRetryOrFail(delivery, ex.Message);
_logger.LogWarning(ex, "Webhook {Id} 投递异常", delivery.Id);
}
await db.SaveChangesAsync(ct);
}
/// <summary>
/// 按指数退避2^attempts 分钟)安排下次重试;达 MaxAttempts 则标记 failed。
/// </summary>
private void ScheduleRetryOrFail(WebhookDelivery delivery, string error)
{
delivery.LastError = error;
if (delivery.Attempts >= delivery.MaxAttempts)
{
delivery.Status = "failed";
delivery.NextRetryAt = null;
_logger.LogWarning("Webhook {Id} 达最大尝试次数 {Max},标记 failed", delivery.Id, delivery.MaxAttempts);
}
else
{
// 仍在 pending安排下次重试指数退避1次=2分钟2次=4分钟3次=8分钟…
var delayMinutes = Math.Pow(2, delivery.Attempts);
delivery.NextRetryAt = DateTime.UtcNow.AddMinutes(delayMinutes);
}
}
private static async Task<string?> TruncateAsync(HttpContent content, CancellationToken ct)
{
try
{
var body = await content.ReadAsStringAsync(ct);
return body.Length > 4000 ? body[..4000] : body;
}
catch
{
return null;
}
}
/// <summary>SSRF 防护:校验 URL 主机在白名单内,且拒绝 localhost 与私有/内网 IP。</summary>
internal bool IsHostAllowed(string url)
{
if (_options.Webhook.AllowedHosts is null || _options.Webhook.AllowedHosts.Length == 0)
return false;
if (!Uri.TryCreate(url, UriKind.Absolute, out var uri))
return false;
var host = uri.Host;
// 拒绝 localhost
if (string.Equals(host, "localhost", StringComparison.OrdinalIgnoreCase))
return false;
// 拒绝私有/内网 IP防止白名单被绕过指向内网
if (IPAddress.TryParse(host, out var ip))
{
if (IPAddress.IsLoopback(ip))
return false;
// 私有地址段10.x / 172.16-31.x / 192.168.x
if (ip.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)
{
var bytes = ip.GetAddressBytes();
if (bytes[0] == 10) return false;
if (bytes[0] == 172 && bytes[1] >= 16 && bytes[1] <= 31) return false;
if (bytes[0] == 192 && bytes[1] == 168) return false;
}
}
return _options.Webhook.AllowedHosts
.Any(allowed => string.Equals(allowed, host, StringComparison.OrdinalIgnoreCase));
}
}

View File

@ -7,11 +7,7 @@
<PackageReference Include="FastEndpoints" Version="8.1.0" />
<PackageReference Include="FastEndpoints.Security" Version="8.1.0" />
<PackageReference Include="FastEndpoints.Swagger" Version="8.1.0" />
<PackageReference Include="Grpc.AspNetCore" Version="2.71.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.71.0" />
</ItemGroup>
<ItemGroup>
<Protobuf Include="Protos\auth.proto" GrpcServices="Client" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.7" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Workflow.Application\Workflow.Application.csproj" />

View File

@ -4,12 +4,21 @@
"Default": "Host=localhost;Port=5432;Database=workflow;Username=rag;Password=rag123"
},
"Jwt": {
"Issuer": "rag-api",
"Audience": "rag-client",
"SigningKey": "RagJwtSecretKey2026MustBeAtLeast32CharsLong!"
"Authority": "http://localhost:5215",
"RequireHttps": false
},
"Grpc": {
"AuthServerUrl": "http://localhost:50051"
"Notification": {
"Scheduler": {
"PollIntervalSeconds": 60,
"StartupDelaySeconds": 15
},
"Webhook": {
"PollIntervalSeconds": 30,
"TimeoutSeconds": 10,
"MaxAttempts": 3,
"AllowedHosts": [],
"DefaultUrl": ""
}
},
"Logging": {
"LogLevel": {

View File

@ -0,0 +1,59 @@
using System.Text.Json;
namespace Workflow.Application.Engine;
/// <summary>
/// 节点 ConfigJSON解析工具。提取自 ProcessEngine 的私有 ParseConfig/GetString/GetInt/GetBool
/// 供 TimeoutSchedulerService 等应用层组件复用,避免重复实现 JSON 解析逻辑。
/// </summary>
public static class NodeConfigParser
{
public static Dictionary<string, JsonElement> Parse(string? config)
{
if (string.IsNullOrWhiteSpace(config))
return new();
try
{
return JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(config) ?? new();
}
catch (JsonException)
{
return new();
}
}
public static string? GetString(Dictionary<string, JsonElement> config, string key)
{
if (config.TryGetValue(key, out var element) && element.ValueKind == JsonValueKind.String)
return element.GetString();
return null;
}
public static int? GetInt(Dictionary<string, JsonElement> config, string key)
{
if (!config.TryGetValue(key, out var element))
return null;
return element.ValueKind switch
{
JsonValueKind.Number when element.TryGetInt32(out var v) => v,
JsonValueKind.String when int.TryParse(element.GetString(), out var v) => v,
_ => null
};
}
public static bool? GetBool(Dictionary<string, JsonElement> config, string key)
{
if (!config.TryGetValue(key, out var element))
return null;
return element.ValueKind switch
{
JsonValueKind.True => true,
JsonValueKind.False => false,
JsonValueKind.String when bool.TryParse(element.GetString(), out var v) => v,
_ => null
};
}
}

View File

@ -1,6 +1,7 @@
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Workflow.Application.Notifications;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
@ -12,8 +13,15 @@ using TaskStatus = Workflow.Domain.Enums.TaskStatus;
namespace Workflow.Application.Engine;
/// <summary>
/// Core workflow engine that handles token propagation through the workflow graph.
/// Processes nodes by type: Start, End, Approval, Cc, Condition, Parallel, SubProcess.
/// 工作流核心引擎,负责在流程图中传播 Token执行控制权
/// Token 沿边在节点间接力:旧 token 被 Consumed新 token 在下游节点 Active形成推进链路。
/// 引擎按节点类型分发处理Start / End / Approval / Cc / Condition / Parallel / SubProcess
/// 七种节点类型各自定义 Token 的消费与再生产规则。
/// 关键约束:
/// - Approval / SubProcess 节点会让 token 暂停等待人工或子流程回调;
/// - Cc / Condition / Parallel(fork) 节点立即消费当前 token 并向下传播;
/// - Parallel(join) 节点会等待所有入边 token 到齐后才合并并继续;
/// - 所有节点处理都在同一 DbContext 事务内完成,保证 token 与任务状态一致。
/// </summary>
public class ProcessEngine
{
@ -29,7 +37,8 @@ public class ProcessEngine
}
/// <summary>
/// Starts a workflow instance by finding the Start node and propagating tokens.
/// 启动一个流程实例:定位 Start 节点 → 实例置为 Running → 在 Start 节点创建首个 Active token →
/// 立即处理 Start 节点把 token 推向第一个下游节点。
/// </summary>
public async Task StartAsync(WorkflowInstance instance)
{
@ -45,7 +54,7 @@ public class ProcessEngine
// Set instance status to Running
instance.Status = InstanceStatus.Running;
// Create a token at the start node
// 在 Start 节点放置首个 Active token作为后续传播的种子
var token = new WorkflowToken
{
Id = Guid.NewGuid(),
@ -57,12 +66,13 @@ public class ProcessEngine
await _dbContext.SaveChangesAsync();
// Immediately process the start node
// 立即驱动 Start 节点,把 token 推进到第一个业务节点
await ProcessNodeAsync(instance, token, startNode);
}
/// <summary>
/// Processes a node based on its type, propagating tokens accordingly.
/// 按节点类型分发处理。每个节点类型决定如何消费当前 token、是否产生新 token、
/// 以及是否需要暂停等待(如 Approval 等待人工、SubProcess 等待子实例完成)。
/// </summary>
public async Task ProcessNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
@ -95,13 +105,21 @@ public class ProcessEngine
}
/// <summary>
/// Completes a workflow task, executing actions and propagating tokens.
/// 完成一个审批任务:更新任务状态 → 触发对应 hook → 按审批结果选择边Approved 优先于 NormalRejected 独立)
/// → 消费任务所属 token → 在目标节点创建新 token → 递归处理目标节点。
/// 此方法是人工驱动流程推进的唯一入口。
/// </summary>
public async Task CompleteTaskAsync(WorkflowTask task, TaskResult result)
{
// 防重复推进:已完成的任务不可再次完成,否则会重复创建下游 token 并可能重复创建任务。
if (task.Status != TaskStatus.Pending)
{
throw new BusinessException($"任务 {task.Id} 已处理(当前状态:{task.Status}),不可重复完成");
}
// Update task status
task.Status = result == TaskResult.Approved ? TaskStatus.Approved : TaskStatus.Rejected;
task.Result = result.ToString().ToLowerInvariant();
task.Result = JsonSerializer.Serialize(result.ToString().ToLowerInvariant());
task.CompletedAt = DateTime.UtcNow;
// Find the node for this task
@ -113,10 +131,15 @@ public class ProcessEngine
var hook = result == TaskResult.Approved ? "onApproved" : "onRejected";
await ExecuteActionsSafely(node, hook, task.InstanceId, task.Result);
// 直接调用通知服务(不依赖吞异常的 hook保证必达
// 通知落库随主 DbContext 一起提交INotificationService 未注册时静默跳过(零破坏)。
await NotifyTaskResultAsync(task, result);
// Find the token for this task
var token = await _dbContext.WorkflowTokens.FindAsync(task.TokenId);
// Find outgoing edges filtered by result
// 边选择策略:通过时优先 Approved 边,缺省回退 Normal驳回时必须命中 Rejected 边
var edges = await GetOutgoingEdgesAsync(node.Id);
var targetEdge = result == TaskResult.Approved
? edges.FirstOrDefault(e => e.EdgeType == EdgeType.Approved)
@ -140,7 +163,7 @@ public class ProcessEngine
token.CompletedAt = DateTime.UtcNow;
}
// Create new token at target node
// 在目标节点投放新 token把执行控制权移交下游
var newToken = new WorkflowToken
{
Id = Guid.NewGuid(),
@ -161,10 +184,12 @@ public class ProcessEngine
}
/// <summary>
/// Handles the completion of a sub-process instance by propagating the parent token.
/// 处理子流程实例完成后的回调:找到父实例中等待该子流程的 token消费它并在父流程继续向下传播。
/// 父子实例通过 ParentInstanceId / ParentTokenId 关联。
/// </summary>
public async Task HandleSubProcessCompletionAsync(WorkflowInstance childInstance)
{
// 非子流程实例(无父实例)直接忽略
if (childInstance.ParentInstanceId is null)
return;
@ -174,7 +199,7 @@ public class ProcessEngine
if (parentInstance is null)
return;
// Find the token that was waiting for this sub-process
// 找到当初停在 SubProcess 节点、等待本子流程完成的那个父 token
var waitingToken = childInstance.ParentTokenId.HasValue
? await _dbContext.WorkflowTokens.FindAsync(childInstance.ParentTokenId.Value)
: null;
@ -182,16 +207,16 @@ public class ProcessEngine
if (waitingToken is null)
return;
// Find the sub-process node
// 找到 SubProcess 节点本身,以取其下游边
var subProcessNode = await FindNodeAsync(waitingToken.NodeId);
if (subProcessNode is null)
return;
// Consume the waiting token
// 等待 token 生命周期结束
waitingToken.Status = TokenStatus.Consumed;
waitingToken.CompletedAt = DateTime.UtcNow;
// Find outgoing edge and propagate
// 在父流程中继续向下投放新 token
var edges = await GetOutgoingEdgesAsync(subProcessNode.Id);
var edge = edges.FirstOrDefault();
@ -220,6 +245,7 @@ public class ProcessEngine
// Node Processing Methods
// ============================================================
/// <summary>Start 节点:消费 token沿第一条出边投放新 token推进到首个业务节点。</summary>
private async Task ProcessStartNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
var edges = await GetOutgoingEdgesAsync(node.Id);
@ -230,7 +256,7 @@ public class ProcessEngine
token.Status = TokenStatus.Consumed;
token.CompletedAt = DateTime.UtcNow;
// Create new token at the target of the first edge
// Start 节点约定只走第一条出边,多出边应使用 Parallel 节点表达
var targetEdge = edges.First();
var newToken = new WorkflowToken
{
@ -251,6 +277,7 @@ public class ProcessEngine
}
}
/// <summary>End 节点:消费 token若实例已无任何活跃 token则标记实例完成。</summary>
private async Task ProcessEndNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
// Consume the token
@ -259,7 +286,7 @@ public class ProcessEngine
await _dbContext.SaveChangesAsync();
// Check if all tokens for this instance are consumed
// 收敛判定:所有并行分支必须全部到达各自的 End 才算整体完成,否则保持 Running
var allTokens = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id)
.ToListAsync();
@ -274,11 +301,24 @@ public class ProcessEngine
}
}
/// <summary>
/// Approval 节点创建待办审批任务token 保持 Active 直到 <see cref="CompleteTaskAsync"/> 被调用。
/// 受理人规则支持 "user:{guid}"(指定用户)和 "role:{name}"(指定角色)两种前缀语法。
/// </summary>
private async Task ProcessApprovalNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
// 防重复处理:若该 token 已存在待处理任务,说明节点已被处理过,拒绝重复创建任务。
// token 被审批完成时会置为 Consumed此处仅拦截异常路径下同一 Active token 被二次处理。)
var alreadyHasPendingTask = await _dbContext.WorkflowTasks
.AnyAsync(t => t.TokenId == token.Id && t.Status == TaskStatus.Pending);
if (alreadyHasPendingTask)
{
throw new BusinessException($"节点 {node.Name}{node.Id})的 token 已创建待处理任务,不可重复处理");
}
var config = ParseConfig(node.Config);
// Parse assignee
// 受理人规则解析:支持 "user:" 与 "role:" 两种前缀
var assigneeRule = GetString(config, "assigneeRule");
// Create workflow task
@ -288,6 +328,7 @@ public class ProcessEngine
InstanceId = instance.Id,
TokenId = token.Id,
NodeId = node.Id,
Title = $"{instance.Title} - {node.Name}",
Type = TaskType.Approval,
Status = TaskStatus.Pending,
};
@ -296,15 +337,25 @@ public class ProcessEngine
{
if (assigneeRule.StartsWith("user:"))
{
// 指定具体用户:解析 user: 后的 GUID 作为受理人
var userIdStr = assigneeRule["user:".Length..];
task.AssigneeId = Guid.TryParse(userIdStr, out var userId) ? userId : null;
}
else if (assigneeRule.StartsWith("role:"))
{
// 指定角色:按角色名匹配,由前端/上层按角色解析实际受理人
task.AssigneeRole = assigneeRule["role:".Length..];
}
}
// 节点超时配置config.timeoutMinutes分钟设置任务截止时间 DueAt
// 供 TimeoutSchedulerService 扫描逾期任务触发 TimeoutAutoProcess。
var timeoutMinutes = GetInt(config, "timeoutMinutes");
if (timeoutMinutes is { } minutes && minutes > 0)
{
task.DueAt = DateTime.UtcNow.AddMinutes(minutes);
}
_dbContext.WorkflowTasks.Add(task);
// Execute onEnter actions
@ -312,17 +363,25 @@ public class ProcessEngine
await _dbContext.SaveChangesAsync();
// Token stays active - waiting for approval
// 任务到达通知(审批任务创建后通知受理人)。未注册 INotificationService 时静默跳过。
await NotifyTaskArrivedSafelyAsync(task);
// Token 保持 Active 状态:流程在此暂停,等待人工通过 CompleteTaskAsync 推进
}
/// <summary>
/// Cc抄送节点为每个收件人创建知会任务随后立即消费 token 向下传播。
/// 抄送是 fire-and-forget 语义——不等待收件人查阅即继续主流程。
/// </summary>
private async Task ProcessCcNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
var config = ParseConfig(node.Config);
// Parse recipients
// 收件人列表(用户 GUID 字符串数组)
var recipients = GetArray(config, "recipients");
// Create CC tasks for each recipient
// 为每个收件人生成独立的知会任务;不阻塞主流程
var createdCcTasks = new List<WorkflowTask>();
foreach (var recipient in recipients)
{
var ccTask = new WorkflowTask
@ -331,17 +390,19 @@ public class ProcessEngine
InstanceId = instance.Id,
TokenId = token.Id,
NodeId = node.Id,
Title = $"{instance.Title} - {node.Name}",
AssigneeId = Guid.TryParse(recipient, out var rcpId) ? rcpId : null,
Type = TaskType.Cc,
Status = TaskStatus.Pending,
};
_dbContext.WorkflowTasks.Add(ccTask);
createdCcTasks.Add(ccTask);
}
// Execute onEnter actions
await ExecuteActionsSafely(node, "onEnter", instance.Id);
// Consume current token
// 抄送任务创建后立即消费当前 token沿第一条出边继续主流程
token.Status = TokenStatus.Consumed;
token.CompletedAt = DateTime.UtcNow;
@ -373,25 +434,34 @@ public class ProcessEngine
{
await _dbContext.SaveChangesAsync();
}
// 抄送到达通知:通知所有收件人。未注册 INotificationService 时静默跳过。
foreach (var ccTask in createdCcTasks)
{
await NotifyTaskArrivedSafelyAsync(ccTask);
}
}
/// <summary>
/// Condition条件节点按 Order 排序逐条评估出边条件首个匹配分支胜出first-match-wins
/// 命中分支后消费当前 token向目标节点投放新 token。无任何分支命中则抛异常流程定义错误
/// </summary>
private async Task ProcessConditionNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
var edges = await GetOutgoingEdgesAsync(node.Id);
var variables = ParseVariables(instance.Variables);
// Sort edges by order, then evaluate conditions
// 按 Order 升序确保分支评估顺序稳定可控
var orderedEdges = edges.OrderBy(e => e.Order).ToList();
foreach (var edge in orderedEdges)
{
if (_conditionEvaluator.Evaluate(edge.Condition, variables))
{
// Found matching branch - consume current token
// 命中分支:消费当前 token向分支目标投放新 token
token.Status = TokenStatus.Consumed;
token.CompletedAt = DateTime.UtcNow;
// Create new token at target
var newToken = new WorkflowToken
{
Id = Guid.NewGuid(),
@ -403,7 +473,6 @@ public class ProcessEngine
await _dbContext.SaveChangesAsync();
// Process the target node
var targetNode = await FindNodeAsync(edge.TargetNodeId);
if (targetNode is not null)
{
@ -414,10 +483,16 @@ public class ProcessEngine
}
}
// No matching branch found
// 所有分支均不匹配:流程定义本身存在缺陷,直接报错而不是静默挂起
throw new BusinessException("No condition branch matched");
}
/// <summary>
/// Parallel并行节点根据出入边数量自动判定 Fork / Join / 透传三种行为。
/// - Fork多出边消费当前 token为每条出边生成一个独立 token实现并发分支
/// - Join多入边等待所有入边的 token 到齐,全部消费后合并为一个 token 继续向下;
/// - 透传(单入单出):直接接力。
/// </summary>
private async Task ProcessParallelNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
var outgoingEdges = await GetOutgoingEdgesAsync(node.Id);
@ -425,7 +500,7 @@ public class ProcessEngine
if (outgoingEdges.Count > 1)
{
// Fork behavior: create one token per outgoing edge
// Fork1 个 token 进 → N 个 token 出,开启并行分支
token.Status = TokenStatus.Consumed;
token.CompletedAt = DateTime.UtcNow;
@ -446,6 +521,7 @@ public class ProcessEngine
var targetNode = await FindNodeAsync(edge.TargetNodeId);
if (targetNode is not null)
{
// Fork 时为每个分支分别 SaveChanges确保每条分支的 token 都已落库后再独立推进
await _dbContext.SaveChangesAsync();
await ProcessNodeAsync(instance, newToken, targetNode);
}
@ -453,7 +529,8 @@ public class ProcessEngine
}
else if (incomingEdges.Count > 1)
{
// Join behavior: check if all tokens have arrived
// JoinN 个 token 进 → 1 个 token 出,合并并行分支
// 统计当前节点上的活跃 token 数,与入边数量比较以判定是否全部到齐
var tokensAtNode = await _dbContext.WorkflowTokens
.Where(t => t.InstanceId == instance.Id
&& t.NodeId == node.Id
@ -462,7 +539,7 @@ public class ProcessEngine
if (tokensAtNode.Count >= incomingEdges.Count)
{
// All tokens arrived - consume all and create one new token
// 全部到齐:一次性消费所有入边 token合并为单个出边 token
foreach (var t in tokensAtNode)
{
t.Status = TokenStatus.Consumed;
@ -497,13 +574,13 @@ public class ProcessEngine
}
else
{
// Not all tokens arrived yet - wait
// 分支尚未全部到达:本次仅持久化已到达的 token等待剩余分支汇入token 暂不向下传播
await _dbContext.SaveChangesAsync();
}
}
else
{
// Simple pass-through (single in, single out)
// 单入单出:等同于普通节点,直接接力消费并产生新 token
token.Status = TokenStatus.Consumed;
token.CompletedAt = DateTime.UtcNow;
@ -534,6 +611,11 @@ public class ProcessEngine
}
}
/// <summary>
/// SubProcess子流程节点依据 config.definitionId 创建子实例并启动,父 token 停在本节点等待。
/// 父子实例通过 ParentInstanceId / ParentTokenId 关联;子实例完成后由
/// <see cref="HandleSubProcessCompletionAsync"/> 唤醒父流程继续推进。
/// </summary>
private async Task ProcessSubProcessNodeAsync(WorkflowInstance instance, WorkflowToken token, WorkflowNode node)
{
var config = ParseConfig(node.Config);
@ -545,7 +627,7 @@ public class ProcessEngine
if (!Guid.TryParse(definitionIdStr, out var definitionId))
throw new BusinessException("SubProcess definitionId must be a valid GUID");
// Create child instance
// 创建子实例,记录父实例与父 token 以便完成后回调
var childInstance = new WorkflowInstance
{
Id = Guid.NewGuid(),
@ -559,7 +641,7 @@ public class ProcessEngine
await _dbContext.SaveChangesAsync();
// Token stays active at the sub-process node, waiting for child completion
// 父 token 保持 Active 状态停在本节点,等待子流程完成回调后由 HandleSubProcessCompletionAsync 处理
}
// ============================================================
@ -597,6 +679,7 @@ public class ProcessEngine
return new();
var result = new Dictionary<string, object>();
// 把 JsonElement 还原为强类型对象,供条件求值时按类型分派对比器
foreach (var kvp in dict)
{
result[kvp.Key] = kvp.Value.ValueKind switch
@ -612,31 +695,16 @@ public class ProcessEngine
}
catch (JsonException)
{
// 变量 JSON 解析失败不阻断流程,按空变量处理(条件求值会因字段缺失返回 false
return new();
}
}
private static Dictionary<string, JsonElement> ParseConfig(string? config)
{
if (string.IsNullOrWhiteSpace(config))
return new();
try
{
return JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(config) ?? new();
}
catch (JsonException)
{
return new();
}
}
=> NodeConfigParser.Parse(config);
private static string? GetString(Dictionary<string, JsonElement> config, string key)
{
if (config.TryGetValue(key, out var element) && element.ValueKind == JsonValueKind.String)
return element.GetString();
return null;
}
=> NodeConfigParser.GetString(config, key);
private static List<string> GetArray(Dictionary<string, JsonElement> config, string key)
{
@ -649,6 +717,58 @@ public class ProcessEngine
.ToList();
}
private static int? GetInt(Dictionary<string, JsonElement> config, string key)
=> NodeConfigParser.GetInt(config, key);
private static bool? GetBool(Dictionary<string, JsonElement> config, string key)
=> NodeConfigParser.GetBool(config, key);
/// <summary>
/// 安全发送任务到达通知:从 DI 解析 INotificationService可能未注册如单元测试场景
/// 未注册或发送异常均不阻断主流程。通知落库随主 DbContext 一起提交。
/// </summary>
private async Task NotifyTaskArrivedSafelyAsync(WorkflowTask task)
{
try
{
var notifier = _serviceProvider.GetService<INotificationService>();
if (notifier is null)
return;
await notifier.NotifyTaskArrivedAsync(task);
}
catch
{
// 通知是旁路副作用,失败不得阻断 token 推进
}
}
/// <summary>
/// 安全发送任务审批结果通知(通过/驳回)。与 NotifyTaskArrivedSafelyAsync 同样静默容错。
/// </summary>
private async Task NotifyTaskResultAsync(WorkflowTask task, TaskResult result)
{
try
{
var notifier = _serviceProvider.GetService<INotificationService>();
if (notifier is null)
return;
if (result == TaskResult.Approved)
await notifier.NotifyTaskApprovedAsync(task);
else
await notifier.NotifyTaskRejectedAsync(task);
}
catch
{
// 通知是旁路副作用,失败不得阻断 token 推进
}
}
/// <summary>
/// 安全执行节点动作钩子onEnter / onApproved / onRejected
/// 从 DI 按 key 解析 INodeAction未注册或执行异常均被吞掉
/// 确保副作用类动作(如发通知)永不阻断主流程的 token 传播。
/// </summary>
private async Task ExecuteActionsSafely(WorkflowNode node, string hook, Guid instanceId, string? result = null)
{
var config = ParseConfig(node.Config);
@ -685,7 +805,7 @@ public class ProcessEngine
}
catch
{
// Action failure does not block token propagation
// 故意吞掉异常:节点动作属于旁路副作用(通知/日志等),其失败不得阻断主流程推进
}
}
}

View File

@ -0,0 +1,150 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Application.Common;
using Workflow.Domain.Entities;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.Notifications;
public record NotificationItemDto(
Guid Id,
Guid? RecipientUserId,
string? RecipientRole,
string Title,
string Content,
string Category,
Guid? RelatedInstanceId,
Guid? RelatedTaskId,
bool IsRead,
DateTime? ReadAt,
DateTime CreatedAt);
/// <summary>查询当前用户的通知列表(含按角色匹配的通知)。</summary>
public record GetNotificationsQuery(
Guid UserId,
IReadOnlyList<string> UserRoles,
bool UnreadOnly,
int PageIndex,
int PageSize
) : IRequest<PagedResult<NotificationItemDto>>;
public class GetNotificationsQueryHandler(WorkflowDbContext db)
: IRequestHandler<GetNotificationsQuery, PagedResult<NotificationItemDto>>
{
public async Task<PagedResult<NotificationItemDto>> Handle(
GetNotificationsQuery request, CancellationToken cancellationToken)
{
var query = db.Notifications.AsNoTracking();
// 收件人匹配:直接发给该用户 OR 发给该用户的某个角色
query = query.Where(n =>
(n.RecipientUserId == request.UserId) ||
(n.RecipientRole != null && request.UserRoles.Contains(n.RecipientRole)));
if (request.UnreadOnly)
{
query = query.Where(n => !n.IsRead);
}
var total = await query.CountAsync(cancellationToken);
var items = await query
.OrderByDescending(n => n.CreatedAt)
.Skip((request.PageIndex - 1) * request.PageSize)
.Take(request.PageSize)
.Select(n => new NotificationItemDto(
n.Id, n.RecipientUserId, n.RecipientRole,
n.Title, n.Content, n.Category,
n.RelatedInstanceId, n.RelatedTaskId,
n.IsRead, n.ReadAt, n.CreatedAt))
.ToListAsync(cancellationToken);
return new PagedResult<NotificationItemDto>
{
Items = items,
Total = total,
PageIndex = request.PageIndex,
PageSize = request.PageSize
};
}
}
/// <summary>查询当前用户的未读通知数(前端角标用)。</summary>
public record GetUnreadNotificationCountQuery(
Guid UserId,
IReadOnlyList<string> UserRoles
) : IRequest<int>;
public class GetUnreadNotificationCountHandler(WorkflowDbContext db)
: IRequestHandler<GetUnreadNotificationCountQuery, int>
{
public async Task<int> Handle(GetUnreadNotificationCountQuery request, CancellationToken cancellationToken)
{
return await db.Notifications
.Where(n => !n.IsRead &&
((n.RecipientUserId == request.UserId) ||
(n.RecipientRole != null && request.UserRoles.Contains(n.RecipientRole))))
.CountAsync(cancellationToken);
}
}
/// <summary>标记单条通知已读。</summary>
public record MarkNotificationReadCommand(Guid NotificationId, Guid UserId) : IRequest<Unit>;
public class MarkNotificationReadCommandHandler(WorkflowDbContext db)
: IRequestHandler<MarkNotificationReadCommand, Unit>
{
public async Task<Unit> Handle(MarkNotificationReadCommand request, CancellationToken cancellationToken)
{
var notification = await db.Notifications.FirstOrDefaultAsync(
n => n.Id == request.NotificationId, cancellationToken)
?? throw new NotFoundException($"通知 '{request.NotificationId}' 不存在");
// 权限校验:只能标记发给自己的通知
if (notification.RecipientUserId != request.UserId)
{
throw new BusinessException("只能标记自己的通知为已读");
}
if (!notification.IsRead)
{
notification.IsRead = true;
notification.ReadAt = DateTime.UtcNow;
notification.UpdatedAt = DateTime.UtcNow;
await db.SaveChangesAsync(cancellationToken);
}
return Unit.Value;
}
}
/// <summary>标记当前用户所有通知为已读。</summary>
public record MarkAllNotificationsReadCommand(Guid UserId, IReadOnlyList<string> UserRoles) : IRequest<Unit>;
public class MarkAllNotificationsReadCommandHandler(WorkflowDbContext db)
: IRequestHandler<MarkAllNotificationsReadCommand, Unit>
{
public async Task<Unit> Handle(MarkAllNotificationsReadCommand request, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
var unread = await db.Notifications
.Where(n => !n.IsRead &&
((n.RecipientUserId == request.UserId) ||
(n.RecipientRole != null && request.UserRoles.Contains(n.RecipientRole))))
.ToListAsync(cancellationToken);
foreach (var n in unread)
{
n.IsRead = true;
n.ReadAt = now;
n.UpdatedAt = now;
}
if (unread.Count > 0)
{
await db.SaveChangesAsync(cancellationToken);
}
return Unit.Value;
}
}

View File

@ -22,12 +22,20 @@ public class CreateEdgeCommandHandler(WorkflowDbContext db)
{
public async Task<WorkflowEdgeDto> Handle(CreateEdgeCommand request, CancellationToken cancellationToken)
{
_ = await db.WorkflowDefinitions.FindAsync([request.DefinitionId], cancellationToken)
?? throw new NotFoundException($"Workflow definition '{request.DefinitionId}' not found.");
var sourceNode = await db.WorkflowNodes.FindAsync([request.SourceNodeId], cancellationToken)
?? throw new NotFoundException($"Source node '{request.SourceNodeId}' not found.");
var targetNode = await db.WorkflowNodes.FindAsync([request.TargetNodeId], cancellationToken)
?? throw new NotFoundException($"Target node '{request.TargetNodeId}' not found.");
if (sourceNode.DefinitionId != request.DefinitionId || targetNode.DefinitionId != request.DefinitionId)
{
throw new BusinessException("Source and target nodes must belong to the same workflow definition as the edge.");
}
var entity = new WorkflowEdge
{
Id = Guid.NewGuid(),

View File

@ -14,7 +14,8 @@ public record CreateNodeCommand(
string Name,
string? Config,
int PositionX,
int PositionY
int PositionY,
Guid? FormDefinitionId
) : IRequest<WorkflowNodeDto>;
public class CreateNodeCommandHandler(WorkflowDbContext db)
@ -25,6 +26,24 @@ public class CreateNodeCommandHandler(WorkflowDbContext db)
var definition = await db.WorkflowDefinitions.FindAsync([request.DefinitionId], cancellationToken)
?? throw new NotFoundException($"Workflow definition '{request.DefinitionId}' not found.");
// 不变量:仅 Approval/Cc 节点可绑定表单。其它节点携带 FormDefinitionId 直接拒绝,
// 与 UI 契约NodePropertyDrawer.vue:199一致避免脏数据被持久化后永不渲染。
if (request.FormDefinitionId.HasValue
&& request.NodeType is not (NodeType.Approval or NodeType.Cc))
{
throw new BusinessException($"节点类型 {request.NodeType} 不支持绑定表单仅审批Approval与抄送Cc节点可绑定表单");
}
string? formName = null;
if (request.FormDefinitionId.HasValue)
{
formName = await db.FormDefinitions
.Where(f => f.Id == request.FormDefinitionId.Value)
.Select(f => f.Name)
.FirstOrDefaultAsync(cancellationToken)
?? throw new NotFoundException($"Form definition '{request.FormDefinitionId.Value}' not found.");
}
var entity = new WorkflowNode
{
Id = Guid.NewGuid(),
@ -33,7 +52,8 @@ public class CreateNodeCommandHandler(WorkflowDbContext db)
Name = request.Name,
Config = request.Config,
PositionX = request.PositionX,
PositionY = request.PositionY
PositionY = request.PositionY,
FormDefinitionId = request.FormDefinitionId
};
db.WorkflowNodes.Add(entity);
@ -45,7 +65,9 @@ public class CreateNodeCommandHandler(WorkflowDbContext db)
entity.Name,
entity.Config,
entity.PositionX,
entity.PositionY
entity.PositionY,
entity.FormDefinitionId,
formName
);
}
}

View File

@ -1,4 +1,5 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
@ -14,6 +15,11 @@ public class DeleteNodeCommandHandler(WorkflowDbContext db)
var entity = await db.WorkflowNodes.FindAsync([request.NodeId], cancellationToken)
?? throw new NotFoundException($"Workflow node '{request.NodeId}' not found.");
var connectedEdges = await db.WorkflowEdges
.Where(e => e.SourceNodeId == request.NodeId || e.TargetNodeId == request.NodeId)
.ToListAsync(cancellationToken);
db.WorkflowEdges.RemoveRange(connectedEdges);
db.WorkflowNodes.Remove(entity);
await db.SaveChangesAsync(cancellationToken);

View File

@ -5,8 +5,13 @@ using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.WorkflowDefinitions.Commands;
/// <summary>发布流程定义命令:仅 Draft 状态可发布,发布后内容不可再改且可被实例化。</summary>
public record PublishWorkflowDefinitionCommand(Guid Id) : IRequest<Unit>;
/// <summary>
/// 发布处理器:将状态从 Draft 切换为 Published 并自增版本号。
/// 版本自增保证每次发布都产生新的不可变快照,已运行的旧实例仍绑定其启动时的版本。
/// </summary>
public class PublishWorkflowDefinitionCommandHandler(WorkflowDbContext db)
: IRequestHandler<PublishWorkflowDefinitionCommand, Unit>
{
@ -15,6 +20,7 @@ public class PublishWorkflowDefinitionCommandHandler(WorkflowDbContext db)
var entity = await db.WorkflowDefinitions.FindAsync([request.Id], cancellationToken)
?? throw new NotFoundException($"Workflow definition '{request.Id}' not found.");
// 仅草稿可发布:防止已发布/已停用的定义被重复发布,保证版本语义清晰
if (entity.Status != DefinitionStatus.Draft)
{
throw new BusinessException("Only draft workflow definitions can be published.");

View File

@ -1,5 +1,7 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Application.Features.WorkflowDefinitions.DTOs;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
@ -10,7 +12,8 @@ public record UpdateNodeCommand(
string Name,
string? Config,
int PositionX,
int PositionY
int PositionY,
Guid? FormDefinitionId
) : IRequest<WorkflowNodeDto>;
public class UpdateNodeCommandHandler(WorkflowDbContext db)
@ -21,10 +24,29 @@ public class UpdateNodeCommandHandler(WorkflowDbContext db)
var entity = await db.WorkflowNodes.FindAsync([request.NodeId], cancellationToken)
?? throw new NotFoundException($"Workflow node '{request.NodeId}' not found.");
// 不变量:仅 Approval/Cc 节点可绑定表单。UpdateNodeCommand 不含 NodeType
// 故校验基于实体当前的 NodeType。与 UI 契约NodePropertyDrawer.vue:199一致。
if (request.FormDefinitionId.HasValue
&& entity.NodeType is not (NodeType.Approval or NodeType.Cc))
{
throw new BusinessException($"节点类型 {entity.NodeType} 不支持绑定表单仅审批Approval与抄送Cc节点可绑定表单");
}
string? formName = null;
if (request.FormDefinitionId.HasValue)
{
formName = await db.FormDefinitions
.Where(f => f.Id == request.FormDefinitionId.Value)
.Select(f => f.Name)
.FirstOrDefaultAsync(cancellationToken)
?? throw new NotFoundException($"Form definition '{request.FormDefinitionId.Value}' not found.");
}
entity.Name = request.Name;
entity.Config = request.Config;
entity.PositionX = request.PositionX;
entity.PositionY = request.PositionY;
entity.FormDefinitionId = request.FormDefinitionId;
await db.SaveChangesAsync(cancellationToken);
@ -34,7 +56,9 @@ public class UpdateNodeCommandHandler(WorkflowDbContext db)
entity.Name,
entity.Config,
entity.PositionX,
entity.PositionY
entity.PositionY,
entity.FormDefinitionId,
formName
);
}
}

View File

@ -36,7 +36,9 @@ public record WorkflowNodeDto(
string Name,
string? Config,
int PositionX,
int PositionY
int PositionY,
Guid? FormDefinitionId,
string? FormName
);
public record WorkflowEdgeDto(

View File

@ -28,6 +28,18 @@ public class GetWorkflowDefinitionByIdQueryHandler(WorkflowDbContext db)
.FirstOrDefaultAsync(cancellationToken);
}
var nodeFormIds = entity.Nodes
.Where(n => n.FormDefinitionId.HasValue)
.Select(n => n.FormDefinitionId!.Value)
.Distinct()
.ToList();
var nodeFormNames = nodeFormIds.Count == 0
? new Dictionary<Guid, string>()
: await db.FormDefinitions
.Where(f => nodeFormIds.Contains(f.Id))
.ToDictionaryAsync(f => f.Id, f => f.Name, cancellationToken);
return new WorkflowDefinitionDetailDto(
entity.Id,
entity.Name,
@ -45,7 +57,11 @@ public class GetWorkflowDefinitionByIdQueryHandler(WorkflowDbContext db)
n.Name,
n.Config,
n.PositionX,
n.PositionY
n.PositionY,
n.FormDefinitionId,
n.FormDefinitionId.HasValue && nodeFormNames.TryGetValue(n.FormDefinitionId.Value, out var nodeFormName)
? nodeFormName
: null
)).ToList(),
entity.Edges.Select(e => new WorkflowEdgeDto(
e.Id,

View File

@ -1,6 +1,9 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
using Workflow.Application.Engine;
using Workflow.Application.Form.Schema;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
@ -8,6 +11,7 @@ using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.WorkflowInstances.Commands;
/// <summary>启动流程实例命令:按 DefinitionCode 解析定义 → 校验表单数据 → 创建实例 → 落库初始表单数据 → 触发引擎从 Start 节点传播 token。</summary>
public record StartWorkflowInstanceCommand(
string DefinitionCode,
string Title,
@ -15,6 +19,10 @@ public record StartWorkflowInstanceCommand(
string? FormDataJson
) : IRequest<Guid>;
/// <summary>
/// 启动流程实例处理器。流程变量来源优先级:显式 Variables 覆盖 FormDataJson 字段
/// (通过 BuildVariablesJson 合并),保证表单字段既能作为条件求值变量,又可被显式变量覆写。
/// </summary>
public class StartWorkflowInstanceCommandHandler(WorkflowDbContext db, ProcessEngine processEngine)
: IRequestHandler<StartWorkflowInstanceCommand, Guid>
{
@ -29,13 +37,45 @@ public class StartWorkflowInstanceCommandHandler(WorkflowDbContext db, ProcessEn
throw new BusinessException($"流程定义 '{request.DefinitionCode}' 已禁用");
}
FormDefinition? startFormDefinition = null;
if (!string.IsNullOrEmpty(request.FormDataJson) && definition.FormDefinitionId.HasValue)
{
// 绕过软删除过滤器加载表单定义,以区分「已被删除」与「确实不存在」两种情况,
// 避免抛出误导性的「不存在」错误(表单实际存在,只是被删除)。
startFormDefinition = await db.FormDefinitions
.IgnoreQueryFilters()
.FirstOrDefaultAsync(f => f.Id == definition.FormDefinitionId.Value, cancellationToken);
if (startFormDefinition is null)
{
throw new BusinessException($"表单定义 {definition.FormDefinitionId.Value} 不存在");
}
if (startFormDefinition.IsDeleted)
{
throw new BusinessException($"表单定义 {startFormDefinition.Name}{definition.FormDefinitionId.Value})已被删除,无法启动流程");
}
// 产品决策FormStatus.Disabled 严格阻断——停用的表单不允许启动新流程。
if (startFormDefinition.Status == FormStatus.Disabled)
{
throw new BusinessException($"表单定义 {startFormDefinition.Name}{definition.FormDefinitionId.Value})已停用,无法启动流程");
}
var validation = FormDataValidator.Validate(startFormDefinition.SchemaJson ?? "{}", request.FormDataJson);
if (!validation.IsValid)
{
throw new BusinessException($"表单数据校验失败: {string.Join("; ", validation.Errors)}");
}
}
var instance = new WorkflowInstance
{
Id = Guid.NewGuid(),
DefinitionId = definition.Id,
Title = request.Title,
Status = InstanceStatus.Running,
Variables = request.Variables,
Variables = BuildVariablesJson(request.FormDataJson, request.Variables),
InitiatorId = Guid.Empty
};
@ -43,12 +83,12 @@ public class StartWorkflowInstanceCommandHandler(WorkflowDbContext db, ProcessEn
await db.SaveChangesAsync(cancellationToken);
// Save form data if provided and definition has an associated form
if (!string.IsNullOrEmpty(request.FormDataJson) && definition.FormDefinitionId.HasValue)
if (!string.IsNullOrEmpty(request.FormDataJson) && startFormDefinition is not null)
{
db.FormData.Add(new Domain.Entities.FormData
{
Id = Guid.NewGuid(),
FormDefinitionId = definition.FormDefinitionId.Value,
FormDefinitionId = startFormDefinition.Id,
InstanceId = instance.Id,
DataJson = request.FormDataJson,
});
@ -60,4 +100,54 @@ public class StartWorkflowInstanceCommandHandler(WorkflowDbContext db, ProcessEn
return instance.Id;
}
/// <summary>
/// 合并流程变量:先合并表单数据,再合并显式变量(后者覆盖前者同名字段)。
/// 合并语义:表单字段既可作为条件变量,又允许调用方显式覆写;任一来源为非法 JSON 时静默跳过。
/// 全部为空时返回 null表示实例无初始变量。
/// </summary>
private static string? BuildVariablesJson(string? formDataJson, string? variablesJson)
{
var merged = new JsonObject();
var hasMergedValues = false;
hasMergedValues |= TryMergeObject(merged, formDataJson);
if (string.IsNullOrWhiteSpace(variablesJson))
{
return hasMergedValues ? merged.ToJsonString() : null;
}
var variablesMerged = TryMergeObject(merged, variablesJson);
if (!variablesMerged && !hasMergedValues)
{
return variablesJson;
}
return merged.ToJsonString();
}
private static bool TryMergeObject(JsonObject target, string? json)
{
if (string.IsNullOrWhiteSpace(json))
return false;
try
{
var node = JsonNode.Parse(json);
if (node is not JsonObject obj)
return false;
foreach (var kvp in obj)
{
target[kvp.Key] = kvp.Value?.DeepClone();
}
return obj.Count > 0;
}
catch (JsonException)
{
return false;
}
}
}

View File

@ -7,11 +7,16 @@ using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.WorkflowInstances.Commands;
/// <summary>撤回流程实例命令。仅发起人可撤回,且需流程尚未处理任何节点(无已结束任务)。</summary>
public record WithdrawWorkflowInstanceCommand(
Guid InstanceId,
Guid UserId
) : IRequest<Unit>;
/// <summary>
/// 撤回处理器:通过 InstanceStateMachine 校验转换合法性后,将实例置为 Terminated
/// 并同步终止所有活跃 token防止引擎继续推进已撤回的流程。
/// </summary>
public class WithdrawWorkflowInstanceCommandHandler(WorkflowDbContext db)
: IRequestHandler<WithdrawWorkflowInstanceCommand, Unit>
{
@ -24,6 +29,7 @@ public class WithdrawWorkflowInstanceCommandHandler(WorkflowDbContext db)
.Where(t => t.InstanceId == request.InstanceId)
.ToListAsync(cancellationToken);
// 已处理节点判定:存在任何非 Pending 任务即视为流程已推进,不可撤回
var hasProcessedTasks = tasks.Any(t => t.Status != Domain.Enums.TaskStatus.Pending);
var context = new InstanceTransitionContext
@ -34,6 +40,7 @@ public class WithdrawWorkflowInstanceCommandHandler(WorkflowDbContext db)
var stateMachine = new InstanceStateMachine();
instance.Status = stateMachine.Transition(instance.Status, InstanceOperation.Withdraw, context);
// 同步终止所有 token切断流程图的执行控制权避免遗留 Active token 被引擎继续推进
var tokens = await db.WorkflowTokens
.Where(t => t.InstanceId == request.InstanceId)
.ToListAsync(cancellationToken);

View File

@ -8,6 +8,7 @@ using TaskStatus = Workflow.Domain.Enums.TaskStatus;
namespace Workflow.Application.Features.WorkflowInstances.Queries;
/// <summary>流程监控统计查询:聚合实例各状态数量与待办/逾期任务数,供监控大屏展示。</summary>
public record MonitorWorkflowInstancesQuery : IRequest<WorkflowMonitorDto>;
public class MonitorWorkflowInstancesQueryHandler(WorkflowDbContext db)
@ -22,6 +23,7 @@ public class MonitorWorkflowInstancesQueryHandler(WorkflowDbContext db)
var terminatedInstances = await db.WorkflowInstances.CountAsync(i => i.Status == InstanceStatus.Terminated, cancellationToken);
var pendingTasks = await db.WorkflowTasks.CountAsync(t => t.Status == TaskStatus.Pending, cancellationToken);
// 逾期判定:仅统计 Pending 且 DueAt 早于当前 UTC 时间的任务。DueAt 与比较基准均使用 UTC避免时区偏差
var overdueTasks = await db.WorkflowTasks
.CountAsync(t => t.Status == TaskStatus.Pending && t.DueAt != null && t.DueAt < DateTime.UtcNow, cancellationToken);

View File

@ -1,17 +1,28 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
using Workflow.Application.Engine;
using Workflow.Application.Form.Schema;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.WorkflowTasks.Commands;
/// <summary>审批通过任务命令。仅任务受理人AssigneeId可执行可选提交节点表单数据。</summary>
public record ApproveTaskCommand(
Guid TaskId,
Guid UserId,
string? Comment
string? Comment,
string? FormDataJson = null
) : IRequest<Unit>;
/// <summary>
/// 审批通过处理器:校验受理人身份 → 持久化表单数据(若提供)→ 合并表单字段到实例变量 →
/// 调用引擎 CompleteTaskAsync 推进流程。表单字段合并进变量后即可被下游条件节点用于求值。
/// </summary>
public class ApproveTaskCommandHandler(WorkflowDbContext db, ProcessEngine processEngine)
: IRequestHandler<ApproveTaskCommand, Unit>
{
@ -20,6 +31,7 @@ public class ApproveTaskCommandHandler(WorkflowDbContext db, ProcessEngine proce
var task = await db.WorkflowTasks.FindAsync([request.TaskId], cancellationToken)
?? throw new NotFoundException($"Task '{request.TaskId}' not found.");
// 受理人鉴权:仅任务指定的 AssigneeId 可审批,防止越权操作他人任务
if (task.AssigneeId != request.UserId)
{
throw new UnauthorizedException("Only the assignee can approve this task.");
@ -27,8 +39,108 @@ public class ApproveTaskCommandHandler(WorkflowDbContext db, ProcessEngine proce
task.Comment = request.Comment;
if (!string.IsNullOrWhiteSpace(request.FormDataJson))
{
await SaveNodeFormDataAsync(task, request.FormDataJson, cancellationToken);
}
await processEngine.CompleteTaskAsync(task, TaskResult.Approved);
return Unit.Value;
}
private async Task SaveNodeFormDataAsync(
WorkflowTask task,
string formDataJson,
CancellationToken cancellationToken)
{
var node = await db.WorkflowNodes
.AsNoTracking()
.FirstOrDefaultAsync(n => n.Id == task.NodeId, cancellationToken)
?? throw new BusinessException("Task node not found");
if (!node.FormDefinitionId.HasValue)
{
throw new BusinessException("当前任务节点未绑定表单,无法提交表单数据");
}
// 绕过软删除过滤器加载表单定义,以区分「已被删除」与「确实不存在」两种情况,
// 避免抛出误导性的「不存在」错误(表单实际存在,只是被删除)。
var formDefinition = await db.FormDefinitions
.IgnoreQueryFilters()
.AsNoTracking()
.FirstOrDefaultAsync(f => f.Id == node.FormDefinitionId.Value, cancellationToken);
if (formDefinition is null)
{
throw new BusinessException($"节点绑定的表单定义 {node.FormDefinitionId.Value} 不存在");
}
if (formDefinition.IsDeleted)
{
throw new BusinessException($"节点绑定的表单定义 {formDefinition.Name}{node.FormDefinitionId.Value})已被删除,无法提交表单数据");
}
// 产品决策FormStatus.Disabled 严格阻断——停用的表单不接受审批提交。
if (formDefinition.Status == FormStatus.Disabled)
{
throw new BusinessException($"节点绑定的表单定义 {formDefinition.Name}{node.FormDefinitionId.Value})已停用,无法提交表单数据");
}
var validation = FormDataValidator.Validate(
formDefinition.SchemaJson ?? "{}",
formDataJson,
currentNodeKey: node.Name);
if (!validation.IsValid)
{
throw new BusinessException($"表单数据校验失败: {string.Join("; ", validation.Errors)}");
}
var instance = await db.WorkflowInstances
.FirstOrDefaultAsync(i => i.Id == task.InstanceId, cancellationToken)
?? throw new BusinessException("Instance not found");
db.FormData.Add(new FormData
{
Id = Guid.NewGuid(),
FormDefinitionId = formDefinition.Id,
InstanceId = task.InstanceId,
DataJson = formDataJson,
});
instance.Variables = MergeVariablesJson(instance.Variables, formDataJson);
}
/// <summary>
/// 将节点表单字段合并进实例变量:表单字段覆盖同名已有变量,
/// 使审批中填写的表单数据可被下游 Condition 节点作为求值依据。
/// </summary>
private static string MergeVariablesJson(string? variablesJson, string formDataJson)
{
var merged = new JsonObject();
TryMergeObject(merged, variablesJson);
TryMergeObject(merged, formDataJson);
return merged.ToJsonString();
}
private static void TryMergeObject(JsonObject target, string? json)
{
if (string.IsNullOrWhiteSpace(json))
return;
try
{
if (JsonNode.Parse(json) is not JsonObject obj)
return;
foreach (var kvp in obj)
{
target[kvp.Key] = kvp.Value?.DeepClone();
}
}
catch (JsonException)
{
// Invalid existing variables should not prevent a valid task form from being stored.
}
}
}

View File

@ -22,6 +22,24 @@ public class DelegateTaskCommandHandler(WorkflowDbContext db)
var task = await db.WorkflowTasks.FindAsync([request.TaskId], cancellationToken)
?? throw new NotFoundException($"Task '{request.TaskId}' not found.");
// 状态校验:仅 Pending 任务可委派,避免重复推进或委派已完结任务。
if (task.Status != TaskStatus.Pending)
{
throw new BusinessException($"任务 '{request.TaskId}' 当前状态为 {task.Status},不可委派(仅待处理任务可委派)");
}
// 授权校验:仅当前 assignee 可委派,与 Approve/Reject/Transfer 一致。
if (task.AssigneeId != request.FromUserId)
{
throw new UnauthorizedException("只有当前办理人可以委派此任务。");
}
// 禁止委派给自己(无意义且会造成脏数据)。
if (request.FromUserId == request.ToUserId)
{
throw new BusinessException("不能将任务委派给自己。");
}
// Create a new delegated task for the target user
var newTask = new WorkflowTask
{

View File

@ -0,0 +1,53 @@
using MediatR;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
using TaskStatus = Workflow.Domain.Enums.TaskStatus;
namespace Workflow.Application.Features.WorkflowTasks.Commands;
/// <summary>
/// 标记 Cc抄送任务为已读。仅适用于 Cc 任务Cc 为知会性质,不参与 token 路由,
/// 故此处仅更新任务状态,不调用 ProcessEngine。
/// 解决 Cc 任务永久处于 Pending、无法清理的功能缺口。
/// </summary>
public record MarkCcTaskReadCommand(
Guid TaskId,
Guid UserId
) : IRequest<Unit>;
public class MarkCcTaskReadCommandHandler(WorkflowDbContext db)
: IRequestHandler<MarkCcTaskReadCommand, Unit>
{
public async Task<Unit> Handle(MarkCcTaskReadCommand request, CancellationToken cancellationToken)
{
var task = await db.WorkflowTasks.FindAsync([request.TaskId], cancellationToken)
?? throw new NotFoundException($"Task '{request.TaskId}' not found.");
// 仅 Cc 任务可标记已读:审批/转办等任务有各自的生命周期,不应通过此路径变更状态。
if (task.Type != TaskType.Cc)
{
throw new BusinessException($"任务 '{request.TaskId}' 不是抄送Cc任务不可标记已读");
}
// 授权校验:仅 assignee 可标记自己的抄送任务已读。
if (task.AssigneeId != request.UserId)
{
throw new UnauthorizedException("只有任务办理人可以标记此抄送任务为已读。");
}
// 幂等/防重复:已读的 Cc 任务不可重复标记。
if (task.Status != TaskStatus.Pending)
{
throw new BusinessException($"抄送任务 '{request.TaskId}' 已是 {task.Status} 状态,不可重复标记已读");
}
task.Status = TaskStatus.Read;
task.CompletedAt = DateTime.UtcNow;
await db.SaveChangesAsync(cancellationToken);
return Unit.Value;
}
}

View File

@ -6,12 +6,17 @@ using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.WorkflowTasks.Commands;
/// <summary>驳回任务命令。仅任务受理人AssigneeId可执行引擎将沿 Rejected 边向下传播。</summary>
public record RejectTaskCommand(
Guid TaskId,
Guid UserId,
string? Comment
) : IRequest<Unit>;
/// <summary>
/// 驳回处理器:校验受理人身份后调用引擎 CompleteTaskAsync(TaskResult.Rejected)
/// 引擎会寻找 Rejected 类型的出边继续推进;若无 Rejected 边则抛 BusinessException。
/// </summary>
public class RejectTaskCommandHandler(WorkflowDbContext db, ProcessEngine processEngine)
: IRequestHandler<RejectTaskCommand, Unit>
{
@ -20,6 +25,7 @@ public class RejectTaskCommandHandler(WorkflowDbContext db, ProcessEngine proces
var task = await db.WorkflowTasks.FindAsync([request.TaskId], cancellationToken)
?? throw new NotFoundException($"Task '{request.TaskId}' not found.");
// 受理人鉴权:仅 AssigneeId 可驳回
if (task.AssigneeId != request.UserId)
{
throw new UnauthorizedException("Only the assignee can reject this task.");

View File

@ -22,6 +22,24 @@ public class TransferTaskCommandHandler(WorkflowDbContext db)
var task = await db.WorkflowTasks.FindAsync([request.TaskId], cancellationToken)
?? throw new NotFoundException($"Task '{request.TaskId}' not found.");
// 状态校验:仅 Pending 任务可转办,避免重复推进或转办已完结任务。
if (task.Status != TaskStatus.Pending)
{
throw new BusinessException($"任务 '{request.TaskId}' 当前状态为 {task.Status},不可转办(仅待处理任务可转办)");
}
// 授权校验:仅当前 assignee 可转办,与 Approve/Reject 一致。
if (task.AssigneeId != request.FromUserId)
{
throw new UnauthorizedException("只有当前办理人可以转办此任务。");
}
// 禁止转办给自己(无意义且会造成脏数据)。
if (request.FromUserId == request.ToUserId)
{
throw new BusinessException("不能将任务转办给自己。");
}
// Create a new task for the target user
var newTask = new WorkflowTask
{

View File

@ -1,4 +1,5 @@
using MediatR;
using Workflow.Application.Notifications;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
@ -12,7 +13,13 @@ public record UrgeTaskCommand(
Guid UserId
) : IRequest<Unit>;
public class UrgeTaskCommandHandler(WorkflowDbContext db)
/// <summary>
/// 催办命令:校验任务为 Pending 后,通过 INotificationService 向受理人发送催办通知。
/// 通知落库随主事务提交必达。INotificationService 未注册时静默跳过(单元测试场景)。
/// </summary>
public class UrgeTaskCommandHandler(
WorkflowDbContext db,
INotificationService? notifier = null)
: IRequestHandler<UrgeTaskCommand, Unit>
{
public async Task<Unit> Handle(UrgeTaskCommand request, CancellationToken cancellationToken)
@ -25,9 +32,11 @@ public class UrgeTaskCommandHandler(WorkflowDbContext db)
throw new BusinessException("Only pending tasks can be urged.");
}
// Urge logic: in a real system this would send a notification
// For now, we validate the task is pending and return success
// Notification/email sending would be handled by a domain event or outbox pattern
// 催办通知:通知受理人尽快处理。通知落库随下方 SaveChangesAsync 一起提交。
if (notifier is not null)
{
await notifier.NotifyTaskUrgedAsync(task, cancellationToken);
}
await db.SaveChangesAsync(cancellationToken);

View File

@ -11,3 +11,20 @@ public record WorkflowTaskListItemDto(
string? Title,
DateTime? CompletedAt
);
public record WorkflowTaskDetailDto(
Guid Id,
Guid InstanceId,
Guid TokenId,
Guid NodeId,
string? NodeName,
Guid? AssigneeId,
TaskStatus Status,
string? Title,
DateTime? CompletedAt,
Guid? FormDefinitionId,
string? FormName,
string? FormSchemaJson,
string? NodeFormDataJson,
string? InstanceFormDataJson
);

View File

@ -9,6 +9,7 @@ using TaskStatus = Workflow.Domain.Enums.TaskStatus;
namespace Workflow.Application.Features.WorkflowTasks.Queries;
/// <summary>逾期任务查询Pending 且 DueAt 已早于当前 UTC 时间。可按 UserId 过滤为"我的逾期任务"。</summary>
public record GetOverdueTasksQuery(
Guid? UserId,
int PageIndex,
@ -20,9 +21,11 @@ public class GetOverdueTasksQueryHandler(WorkflowDbContext db)
{
public async Task<PagedResult<WorkflowTaskListItemDto>> Handle(GetOverdueTasksQuery request, CancellationToken cancellationToken)
{
// 逾期判定基准统一为 UTC按 DueAt 升序,最紧迫的逾期任务排在前
var query = db.WorkflowTasks
.Where(t => t.Status == TaskStatus.Pending && t.DueAt != null && t.DueAt < DateTime.UtcNow);
// UserId 过滤:传入则只看该用户的逾期任务,未传则返回全员逾期视图(管理端用)
if (request.UserId.HasValue)
{
query = query.Where(t => t.AssigneeId == request.UserId.Value);

View File

@ -9,6 +9,7 @@ using TaskStatus = Workflow.Domain.Enums.TaskStatus;
namespace Workflow.Application.Features.WorkflowTasks.Queries;
/// <summary>我的待办任务查询:按 AssigneeId 严格过滤,仅返回该用户的 Pending 任务。</summary>
public record GetPendingTasksQuery(
Guid UserId,
int PageIndex,

View File

@ -6,24 +6,79 @@ using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Features.WorkflowTasks.Queries;
public record GetTaskByIdQuery(Guid Id) : IRequest<WorkflowTaskListItemDto>;
public record GetTaskByIdQuery(Guid Id) : IRequest<WorkflowTaskDetailDto>;
public class GetTaskByIdQueryHandler(WorkflowDbContext db)
: IRequestHandler<GetTaskByIdQuery, WorkflowTaskListItemDto>
: IRequestHandler<GetTaskByIdQuery, WorkflowTaskDetailDto>
{
public async Task<WorkflowTaskListItemDto> Handle(GetTaskByIdQuery request, CancellationToken cancellationToken)
public async Task<WorkflowTaskDetailDto> Handle(GetTaskByIdQuery request, CancellationToken cancellationToken)
{
var task = await db.WorkflowTasks.FindAsync([request.Id], cancellationToken)
var task = await db.WorkflowTasks
.AsNoTracking()
.FirstOrDefaultAsync(t => t.Id == request.Id, cancellationToken)
?? throw new NotFoundException($"Task '{request.Id}' not found.");
return new WorkflowTaskListItemDto(
var node = await db.WorkflowNodes
.AsNoTracking()
.FirstOrDefaultAsync(n => n.Id == task.NodeId, cancellationToken);
var formDefinition = node?.FormDefinitionId is null
? null
: await db.FormDefinitions
.AsNoTracking()
.FirstOrDefaultAsync(f => f.Id == node.FormDefinitionId.Value, cancellationToken);
var nodeFormDataJson = node?.FormDefinitionId is null
? null
: await db.FormData
.AsNoTracking()
.Where(f => f.InstanceId == task.InstanceId && f.FormDefinitionId == node.FormDefinitionId.Value)
.OrderByDescending(f => f.CreatedAt)
.Select(f => f.DataJson)
.FirstOrDefaultAsync(cancellationToken);
var instanceFormDataJson = await GetInstanceFormDataJsonAsync(task.InstanceId, cancellationToken);
return new WorkflowTaskDetailDto(
task.Id,
task.InstanceId,
task.TokenId,
task.NodeId,
node?.Name,
task.AssigneeId,
task.Status,
task.Title,
task.CompletedAt
task.CompletedAt,
node?.FormDefinitionId,
formDefinition?.Name,
formDefinition?.SchemaJson,
nodeFormDataJson,
instanceFormDataJson
);
}
private async Task<string?> GetInstanceFormDataJsonAsync(Guid instanceId, CancellationToken cancellationToken)
{
var definitionFormId = await db.WorkflowInstances
.AsNoTracking()
.Where(i => i.Id == instanceId)
.Join(
db.WorkflowDefinitions.AsNoTracking(),
instance => instance.DefinitionId,
definition => definition.Id,
(_, definition) => definition.FormDefinitionId)
.FirstOrDefaultAsync(cancellationToken);
if (definitionFormId is null)
{
return null;
}
return await db.FormData
.AsNoTracking()
.Where(f => f.InstanceId == instanceId && f.FormDefinitionId == definitionFormId.Value)
.OrderByDescending(f => f.CreatedAt)
.Select(f => f.DataJson)
.FirstOrDefaultAsync(cancellationToken);
}
}

View File

@ -0,0 +1,22 @@
using Workflow.Application.Form.Schema;
namespace Workflow.Application.Form.DTOs;
/// <summary>表单版本历史条目</summary>
public class FormVersionDto
{
public Guid Id { get; set; }
public int Version { get; set; }
public string? SchemaJson { get; set; }
public string Source { get; set; } = "Update";
public string? ChangeSummary { get; set; }
public DateTime CreatedAt { get; set; }
}
/// <summary>两个版本的对比结果</summary>
public class FormVersionCompareDto
{
public FormVersionDto? OldVersion { get; set; }
public FormVersionDto? NewVersion { get; set; }
public SchemaDiff Diff { get; set; } = new([], [], []);
}

View File

@ -3,7 +3,7 @@ namespace Workflow.Application.Form.DTOs;
public class PagedResult<T>
{
public List<T> Items { get; set; } = new();
public int TotalCount { get; set; }
public int Total { get; set; }
public int PageIndex { get; set; }
public int PageSize { get; set; }
}

View File

@ -1,7 +1,7 @@
using System.Text.Json;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Application.Form.Schema;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
@ -22,46 +22,32 @@ public class SubmitFormDataCommandHandler(WorkflowDbContext db)
{
public async Task<Guid> Handle(SubmitFormDataCommand request, CancellationToken cancellationToken)
{
// 绕过软删除过滤器加载表单定义,以区分「已被删除」与「确实不存在」两种情况,
// 避免抛出误导性的「不存在」错误(表单实际存在,只是被删除)。
var formDefinition = await db.FormDefinitions
.FirstOrDefaultAsync(f => f.Id == request.FormDefinitionId, cancellationToken)
?? throw new BusinessException($"表单定义 {request.FormDefinitionId} 不存在");
.IgnoreQueryFilters()
.FirstOrDefaultAsync(f => f.Id == request.FormDefinitionId, cancellationToken);
var schemaValidation = SchemaValidator.Validate(formDefinition.SchemaJson ?? "{}");
var fieldSummaries = schemaValidation.Fields;
var data = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(
request.DataJson,
new JsonSerializerOptions { PropertyNameCaseInsensitive = true })
?? new Dictionary<string, JsonElement>();
foreach (var field in fieldSummaries.Where(f => f.Required))
if (formDefinition is null)
{
if (!data.ContainsKey(field.Path) || IsNullOrEmpty(data[field.Path]))
{
var label = !string.IsNullOrEmpty(field.Title) ? field.Title : field.Path;
throw new BusinessException($"必填字段 {label} 缺失");
}
throw new BusinessException($"表单定义 {request.FormDefinitionId} 不存在");
}
foreach (var field in fieldSummaries)
if (formDefinition.IsDeleted)
{
if (!data.TryGetValue(field.Path, out var value)) continue;
throw new BusinessException($"表单定义 {formDefinition.Name}{request.FormDefinitionId})已被删除,无法提交表单数据");
}
if (field.JsonType is "number" or "integer")
{
if (value.ValueKind == JsonValueKind.String)
{
var str = value.GetString();
if (!double.TryParse(str, out _))
{
throw new BusinessException($"字段 {field.Path} 类型不匹配,期望数字");
}
}
else if (value.ValueKind != JsonValueKind.Number)
{
throw new BusinessException($"字段 {field.Path} 类型不匹配,期望数字");
}
}
// 产品决策FormStatus.Disabled 严格阻断——停用的表单不接受新提交。
if (formDefinition.Status == FormStatus.Disabled)
{
throw new BusinessException($"表单定义 {formDefinition.Name}{request.FormDefinitionId})已停用,无法提交表单数据");
}
var validation = FormDataValidator.Validate(formDefinition.SchemaJson ?? "{}", request.DataJson);
if (!validation.IsValid)
{
throw new BusinessException($"表单数据校验失败: {string.Join("; ", validation.Errors)}");
}
var formData = new Domain.Entities.FormData
@ -77,10 +63,4 @@ public class SubmitFormDataCommandHandler(WorkflowDbContext db)
return formData.Id;
}
private static bool IsNullOrEmpty(JsonElement element)
{
return element.ValueKind == JsonValueKind.Null ||
(element.ValueKind == JsonValueKind.String && string.IsNullOrEmpty(element.GetString()));
}
}

View File

@ -34,7 +34,7 @@ public class CreateFormDefinitionCommandHandler(WorkflowDbContext db)
.Where(c => c.IsActive)
.Select(c => c.Name)
.ToListAsync(cancellationToken);
var allowedSet = components.ToHashSet();
var allowedSet = components.Count == 0 ? null : components.ToHashSet();
var validation = SchemaValidator.Validate(request.SchemaJson, allowedSet);
if (!validation.IsValid)

View File

@ -16,6 +16,22 @@ public class DeleteFormDefinitionCommandHandler(WorkflowDbContext db)
.FirstOrDefaultAsync(f => f.Id == request.Id, cancellationToken)
?? throw new NotFoundException($"表单 {request.Id} 不存在");
// 删除前检查:表单若仍被活跃流程引用(流程定义的 FormDefinitionId 或流程节点的
// FormDefinitionId阻断删除。WorkflowDefinition/WorkflowNode 均实现 ISoftDelete
// 全局查询过滤器会自动排除已软删除的引用,因此仅被已删除流程引用时不会被锁死。
var referencedByDefinition = await db.WorkflowDefinitions
.AsNoTracking()
.AnyAsync(d => d.FormDefinitionId == request.Id, cancellationToken);
var referencedByNode = await db.WorkflowNodes
.AsNoTracking()
.AnyAsync(n => n.FormDefinitionId == request.Id, cancellationToken);
if (referencedByDefinition || referencedByNode)
{
throw new BusinessException("该表单正被流程引用,无法删除");
}
entity.IsDeleted = true;
await db.SaveChangesAsync(cancellationToken);
}

View File

@ -1,5 +1,6 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Domain.Entities;
using Workflow.Domain.Enums;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
@ -23,6 +24,16 @@ public class PublishFormDefinitionCommandHandler(WorkflowDbContext db)
}
entity.Status = FormStatus.Published;
// 发布时也记录一条版本快照,便于追溯发布时刻的 schema
db.FormDefinitionVersions.Add(new FormDefinitionVersion
{
FormDefinitionId = entity.Id,
Version = entity.Version,
SchemaJson = entity.SchemaJson,
Source = "Publish",
});
await db.SaveChangesAsync(cancellationToken);
}
}

View File

@ -2,6 +2,7 @@ using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Application.Form.DTOs;
using Workflow.Application.Form.Schema;
using Workflow.Domain.Entities;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
@ -9,6 +10,7 @@ namespace Workflow.Application.Form.FormDefinition.Commands;
/// <summary>
/// 更新表单定义:接收新的 Formily JSON Schema校验后替换 SchemaJson 并递增 Version。
/// 同时写入一条历史版本快照,用于版本对比。
/// </summary>
public record UpdateFormDefinitionCommand(
Guid Id,
@ -30,7 +32,7 @@ public class UpdateFormDefinitionCommandHandler(WorkflowDbContext db)
.Where(c => c.IsActive)
.Select(c => c.Name)
.ToListAsync(cancellationToken);
var allowedSet = components.ToHashSet();
var allowedSet = components.Count == 0 ? null : components.ToHashSet();
var validation = SchemaValidator.Validate(request.SchemaJson, allowedSet);
if (!validation.IsValid)
@ -43,6 +45,15 @@ public class UpdateFormDefinitionCommandHandler(WorkflowDbContext db)
entity.SchemaJson = request.SchemaJson;
entity.Version++;
// 写入历史版本快照
db.FormDefinitionVersions.Add(new FormDefinitionVersion
{
FormDefinitionId = entity.Id,
Version = entity.Version,
SchemaJson = request.SchemaJson,
Source = "Update",
});
await db.SaveChangesAsync(cancellationToken);
return new FormDefinitionDto

View File

@ -0,0 +1,79 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Application.Form.DTOs;
using Workflow.Application.Form.Schema;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Form.FormDefinition.Queries;
/// <summary>对比表单的两个历史版本(或当前版本与某历史版本)</summary>
public record CompareFormVersionsQuery(Guid FormDefinitionId, Guid? OldVersionId, Guid? NewVersionId)
: IRequest<FormVersionCompareDto>;
public class CompareFormVersionsQueryHandler(WorkflowDbContext db)
: IRequestHandler<CompareFormVersionsQuery, FormVersionCompareDto>
{
public async Task<FormVersionCompareDto> Handle(CompareFormVersionsQuery request, CancellationToken cancellationToken)
{
var form = await db.FormDefinitions
.FirstOrDefaultAsync(f => f.Id == request.FormDefinitionId, cancellationToken)
?? throw new NotFoundException($"表单 {request.FormDefinitionId} 不存在");
// 新版本:指定版本快照,否则用当前表单
FormVersionDto? newVersion = null;
if (request.NewVersionId is { } newId)
{
var nv = await db.FormDefinitionVersions.FirstOrDefaultAsync(v => v.Id == newId, cancellationToken);
if (nv is not null)
{
newVersion = ToDto(nv);
}
}
newVersion ??= new FormVersionDto
{
Id = Guid.Empty,
Version = form.Version,
SchemaJson = form.SchemaJson,
Source = "Current",
CreatedAt = form.UpdatedAt,
};
// 旧版本:指定版本快照,否则取当前版本的前一个快照
FormVersionDto? oldVersion = null;
if (request.OldVersionId is { } oldId)
{
var ov = await db.FormDefinitionVersions.FirstOrDefaultAsync(v => v.Id == oldId, cancellationToken);
if (ov is not null) oldVersion = ToDto(ov);
}
if (oldVersion is null)
{
// 自动取比 newVersion 早一个的快照
var prev = await db.FormDefinitionVersions
.Where(v => v.FormDefinitionId == request.FormDefinitionId && v.Version < newVersion.Version)
.OrderByDescending(v => v.Version)
.ThenByDescending(v => v.CreatedAt)
.FirstOrDefaultAsync(cancellationToken);
if (prev is not null) oldVersion = ToDto(prev);
}
var diff = SchemaDiffer.Diff(oldVersion?.SchemaJson, newVersion.SchemaJson);
return new FormVersionCompareDto
{
OldVersion = oldVersion,
NewVersion = newVersion,
Diff = diff,
};
}
private static FormVersionDto ToDto(Workflow.Domain.Entities.FormDefinitionVersion v) => new()
{
Id = v.Id,
Version = v.Version,
SchemaJson = v.SchemaJson,
Source = v.Source,
ChangeSummary = v.ChangeSummary,
CreatedAt = v.CreatedAt,
};
}

View File

@ -44,7 +44,7 @@ public class GetFormDefinitionListQueryHandler(WorkflowDbContext db)
return new PagedResult<FormDefinitionDto>
{
Items = items,
TotalCount = totalCount,
Total = totalCount,
PageIndex = request.PageIndex,
PageSize = request.PageSize,
};

View File

@ -0,0 +1,35 @@
using MediatR;
using Microsoft.EntityFrameworkCore;
using Workflow.Application.Form.DTOs;
using Workflow.Domain.Exceptions;
using Workflow.Infrastructure.Persistence;
namespace Workflow.Application.Form.FormDefinition.Queries;
/// <summary>获取表单的历史版本列表(按版本号降序)</summary>
public record GetFormVersionsQuery(Guid FormDefinitionId) : IRequest<List<FormVersionDto>>;
public class GetFormVersionsQueryHandler(WorkflowDbContext db)
: IRequestHandler<GetFormVersionsQuery, List<FormVersionDto>>
{
public async Task<List<FormVersionDto>> Handle(GetFormVersionsQuery request, CancellationToken cancellationToken)
{
var exists = await db.FormDefinitions.AnyAsync(f => f.Id == request.FormDefinitionId, cancellationToken);
if (!exists) throw new NotFoundException($"表单 {request.FormDefinitionId} 不存在");
return await db.FormDefinitionVersions
.Where(v => v.FormDefinitionId == request.FormDefinitionId)
.OrderByDescending(v => v.Version)
.ThenByDescending(v => v.CreatedAt)
.Select(v => new FormVersionDto
{
Id = v.Id,
Version = v.Version,
SchemaJson = v.SchemaJson,
Source = v.Source,
ChangeSummary = v.ChangeSummary,
CreatedAt = v.CreatedAt,
})
.ToListAsync(cancellationToken);
}
}

View File

@ -0,0 +1,67 @@
namespace Workflow.Application.Form.Schema;
/// <summary>
/// 字段级数据权限求值器根据当前审批节点名计算应被隐藏hidden的字段路径集合。
///
/// 设计与 ReactionEvaluator 对齐:两者都产出"应跳过的字段路径集合",在 FormDataValidator
/// 中与联动隐藏集合 union 后统一跳过必填校验。隐藏字段不参与校验,因为用户在前端看不到、也无法填写。
///
/// 权限解析优先级:精确节点名命中 > __default__ 兜底 > 默认 visible可见
/// </summary>
public static class FieldPermissionEvaluator
{
/// <summary>
/// 计算在指定节点下应被隐藏hidden的字段路径集合。
/// currentNodeKey 为 null 时返回空集(无节点上下文 = 不做权限过滤,如普通表单数据录入)。
/// </summary>
public static HashSet<string> GetHiddenFields(
IReadOnlyList<FieldSummary> fields,
string? currentNodeKey)
{
var hidden = new HashSet<string>();
if (string.IsNullOrWhiteSpace(currentNodeKey))
{
return hidden;
}
foreach (var field in fields)
{
var action = ResolveAction(field.FieldPermission, currentNodeKey);
if (action == "hidden")
{
hidden.Add(field.Path);
}
}
return hidden;
}
/// <summary>
/// 解析某字段在指定节点的权限动作。优先级:
/// 1. 精确节点名命中
/// 2. __default__ 兜底
/// 3. 默认 visible
/// </summary>
internal static string ResolveAction(
IReadOnlyDictionary<string, string>? permission,
string currentNodeKey)
{
if (permission is null || permission.Count == 0)
{
return "visible";
}
if (permission.TryGetValue(currentNodeKey, out var exact))
{
return exact;
}
if (permission.TryGetValue("__default__", out var fallback))
{
return fallback;
}
return "visible";
}
}

View File

@ -1,11 +1,59 @@
namespace Workflow.Application.Form.Schema;
using System.Text.Json;
/// <summary>
/// 从 Formily Schema 中提取的数据字段摘要,用于 FormData 提交时的后端校验。
/// </summary>
public record FieldSummary(
string Path,
string JsonType,
string? Component,
bool Required,
string? Title
string? Title,
IReadOnlyList<FormOptionSummary> Options,
IReadOnlyList<FormValidatorSummary> Validators,
IReadOnlyList<ReactionSummary> Reactions,
/// <summary>
/// 字段级数据权限key=节点名(或特殊键 __default__/__initiator__
/// value=权限动作visible 可见可编辑 / readonly 只读 / hidden 隐藏)。
/// null 表示该字段未配置任何节点权限(所有节点均默认可见可编辑)。
/// </summary>
IReadOnlyDictionary<string, string>? FieldPermission
);
public record FormOptionSummary(
string Label,
JsonElement Value,
bool Disabled
);
public record FormValidatorSummary(
string Type,
JsonElement? Value,
IReadOnlyList<JsonElement> Values,
string? Message
);
/// <summary>
/// 字段上的联动规则摘要。type=condition 时 When/Action 有值type=data 时 Expression 有值。
/// </summary>
public record ReactionSummary(
string Type,
string? Target,
ReactionWhenSummary? When,
string? Action,
ReactionExpressionSummary? Expression
);
public record ReactionWhenSummary(
string Source,
string Operator,
JsonElement? Value
);
public record ReactionExpressionSummary(
string Left,
string Operator,
string Right
);

View File

@ -0,0 +1,8 @@
namespace Workflow.Application.Form.Schema;
public record FormDataValidationResult(bool IsValid, IReadOnlyList<string> Errors)
{
public static FormDataValidationResult Valid() => new(true, []);
public static FormDataValidationResult Invalid(IReadOnlyList<string> errors) => new(false, errors);
}

View File

@ -0,0 +1,234 @@
using System.Text.Json;
using System.Text.RegularExpressions;
namespace Workflow.Application.Form.Schema;
/// <summary>
/// 表单数据校验器。结合 SchemaValidator 解析出的字段元数据,对提交的表单 JSON 做必填/类型/选项/规则四类校验。
/// 关键设计:联动隐藏的字段跳过校验(与前端 useReactions 行为对齐);
/// 字段级数据权限 hidden 的字段同样跳过(与前端 applyFieldPermissions 行为对齐);
/// 嵌套字段支持点分路径取值(兼容容器组件嵌套结构与旧扁平提交)。
/// </summary>
public static class FormDataValidator
{
/// <summary>
/// 校验表单数据是否符合 schema 定义。返回错误列表,空表示通过。
/// </summary>
/// <param name="schemaJson">表单结构定义 JSON</param>
/// <param name="dataJson">提交的表单数据 JSON</param>
/// <param name="allowedComponents">允许的组件白名单;为 null 表示不限制(用于隔离租户/场景自定义组件)</param>
/// <param name="currentNodeKey">
/// 当前审批节点名用于字段级数据权限hidden 字段跳过必填校验)。
/// 为 null 表示无节点上下文(如普通表单数据录入),不做权限过滤。
/// </param>
public static FormDataValidationResult Validate(
string schemaJson,
string dataJson,
HashSet<string>? allowedComponents = null,
string? currentNodeKey = null)
{
var schemaValidation = SchemaValidator.Validate(schemaJson, allowedComponents);
if (!schemaValidation.IsValid)
{
return FormDataValidationResult.Invalid(schemaValidation.Errors);
}
JsonElement dataRoot;
try
{
dataRoot = JsonSerializer.Deserialize<JsonElement>(
dataJson,
new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
}
catch (JsonException ex)
{
return FormDataValidationResult.Invalid([$"表单数据 JSON 格式无效: {ex.Message}"]);
}
// 兼容:旧逻辑把 dataJson 反序列化为扁平字典。ReactionEvaluator 仍依赖扁平字典,
// 这里按原有契约构建一份(顶层 key → JsonElement不包含嵌套字段。
var flatData = dataRoot.ValueKind == JsonValueKind.Object
? dataRoot.EnumerateObject().ToDictionary(p => p.Name, p => p.Value.Clone())
: new Dictionary<string, JsonElement>();
var errors = new List<string>();
// 求值联动规则:被联动隐藏的字段不参与校验(跳过必填/类型/选项/规则检查)
var hiddenFields = ReactionEvaluator.GetHiddenFields(schemaValidation.Fields, flatData);
// 字段级数据权限当前节点下应隐藏hidden的字段也跳过校验。
// 与联动隐藏集合 union复用同一条 if (hiddenFields.Contains(path)) continue 跳过逻辑。
if (!string.IsNullOrWhiteSpace(currentNodeKey))
{
hiddenFields.UnionWith(
FieldPermissionEvaluator.GetHiddenFields(schemaValidation.Fields, currentNodeKey));
}
foreach (var field in schemaValidation.Fields)
{
// 联动隐藏或权限隐藏的字段直接跳过(用户在前端看不到,提交的值无意义)
if (hiddenFields.Contains(field.Path)) continue;
var label = string.IsNullOrWhiteSpace(field.Title) ? field.Path : field.Title;
// 按点分路径在嵌套 JSON 中取值(与前端 useValidation.getValue 行为对齐):
// FormGrid/FormLayout 等容器下的字段 Path 形如 "dateRange.startDate"
// 而提交数据是嵌套对象 {dateRange:{startDate:...}},必须逐层下钻。
var exists = TryGetByDottedPath(dataRoot, field.Path, out var value);
if (field.Required && (!exists || IsNullOrEmpty(value)))
{
errors.Add($"必填字段 {label} 缺失");
continue;
}
if (!exists || IsNullOrEmpty(value)) continue;
ValidateJsonType(field, value, errors);
ValidateOptions(field, value, errors);
ValidateRules(field, value, label, errors);
}
return errors.Count == 0
? FormDataValidationResult.Valid()
: FormDataValidationResult.Invalid(errors);
}
private static bool IsNullOrEmpty(JsonElement element)
{
return element.ValueKind == JsonValueKind.Null ||
(element.ValueKind == JsonValueKind.String && string.IsNullOrEmpty(element.GetString()));
}
private static void ValidateJsonType(FieldSummary field, JsonElement value, List<string> errors)
{
if (field.JsonType is "number" or "integer")
{
if (value.ValueKind == JsonValueKind.String)
{
if (!double.TryParse(value.GetString(), out _))
{
errors.Add($"字段 {field.Path} 类型不匹配,期望数字");
}
}
else if (value.ValueKind != JsonValueKind.Number)
{
errors.Add($"字段 {field.Path} 类型不匹配,期望数字");
}
}
if (field.JsonType == "boolean" && value.ValueKind is not JsonValueKind.True and not JsonValueKind.False)
{
errors.Add($"字段 {field.Path} 类型不匹配,期望布尔值");
}
if (field.JsonType == "array" && value.ValueKind != JsonValueKind.Array)
{
errors.Add($"字段 {field.Path} 类型不匹配,期望数组");
}
}
private static void ValidateOptions(FieldSummary field, JsonElement value, List<string> errors)
{
if (field.Options.Count == 0) return;
var current = value.ToString();
var allowed = field.Options
.Where(o => !o.Disabled)
.Select(o => o.Value.ToString())
.ToHashSet();
if (!allowed.Contains(current))
{
errors.Add($"字段 {field.Path} 的值不在允许范围内");
}
}
private static void ValidateRules(FieldSummary field, JsonElement value, string label, List<string> errors)
{
foreach (var rule in field.Validators)
{
switch (rule.Type)
{
case "minLength":
if (value.ValueKind == JsonValueKind.String &&
rule.Value.HasValue &&
value.GetString()!.Length < rule.Value.Value.GetInt32())
{
errors.Add(rule.Message ?? $"{label} 长度过短");
}
break;
case "maxLength":
if (value.ValueKind == JsonValueKind.String &&
rule.Value.HasValue &&
value.GetString()!.Length > rule.Value.Value.GetInt32())
{
errors.Add(rule.Message ?? $"{label} 长度过长");
}
break;
case "min":
if (TryGetNumber(value, out var minNumber) &&
rule.Value.HasValue &&
minNumber < rule.Value.Value.GetDouble())
{
errors.Add(rule.Message ?? $"{label} 不能小于 {rule.Value.Value}");
}
break;
case "max":
if (TryGetNumber(value, out var maxNumber) &&
rule.Value.HasValue &&
maxNumber > rule.Value.Value.GetDouble())
{
errors.Add(rule.Message ?? $"{label} 不能大于 {rule.Value.Value}");
}
break;
case "pattern":
if (value.ValueKind == JsonValueKind.String &&
rule.Value.HasValue &&
!Regex.IsMatch(value.GetString() ?? "", rule.Value.Value.GetString() ?? ""))
{
errors.Add(rule.Message ?? $"{label} 格式不正确");
}
break;
}
}
}
private static bool TryGetNumber(JsonElement value, out double number)
{
if (value.ValueKind == JsonValueKind.Number) return value.TryGetDouble(out number);
if (value.ValueKind == JsonValueKind.String) return double.TryParse(value.GetString(), out number);
number = 0;
return false;
}
/// <summary>
/// 按点分路径(如 "dateRange.startDate")在嵌套 JSON 中逐层取值。
/// 支持两种数据形态:
/// 1. 嵌套对象:{dateRange:{startDate:"2026-06-15"}}
/// 2. 扁平 key{"dateRange.startDate":"2026-06-15"}(兼容旧提交)
/// 与前端 useValidation.getValue 的路径解析行为保持一致。
/// </summary>
private static bool TryGetByDottedPath(JsonElement root, string path, out JsonElement value)
{
// 优先尝试扁平 key直接命中避免破坏旧契约
if (root.ValueKind == JsonValueKind.Object &&
root.TryGetProperty(path, out value))
{
return true;
}
var current = root;
foreach (var segment in path.Split('.'))
{
if (current.ValueKind != JsonValueKind.Object ||
!current.TryGetProperty(segment, out current))
{
value = default;
return false;
}
}
value = current;
return true;
}
}

Some files were not shown because too many files have changed in this diff Show More