A sample .NET Core distributed application based on eShopOnDapr, powered by Masa.BuildingBlocks, Masa.Contrib, Masa.Utils, and Dapr.
Masa.EShop
├── dapr
│ ├── components Local Dapr component definitions
│ │ ├── pubsub.yaml Pub/sub configuration file
│ │ └── statestore.yaml State store configuration file
├── src Source code
│ ├── Api
│ │ ├── Masa.EShop.Api.Caller Caller wrappers for service invocation
│ │ └── Masa.EShop.Api.Open BFF layer, exposes APIs to Web.Client
│ ├── Contracts Shared elements, such as the event classes used for inter-service communication
│ │ ├── Masa.EShop.Contracts.Basket
│ │ ├── Masa.EShop.Contracts.Catalog
│ │ ├── Masa.EShop.Contracts.Ordering
│ │ └── Masa.EShop.Contracts.Payment
│ ├── Services Individual services
│ │ ├── Masa.EShop.Services.Basket
│ │ ├── Masa.EShop.Services.Catalog
│ │ ├── Masa.EShop.Services.Ordering
│ │ └── Masa.EShop.Services.Payment
│ ├── Web
│ │ ├── Masa.EShop.Web.Admin
│ │ └── Masa.EShop.Web.Client
├── test
│ └── Masa.EShop.Services.Catalog.Tests
├── docker-compose docker-compose service configuration
│ ├── Masa.EShop.Web.Admin
│ └── Masa.EShop.Web.Client
├── .gitignore Git ignore file
├── LICENSE Project license
├── .dockerignore Docker build ignore file
└── README.md Project readme
Prerequisites
- Docker
- VS 2022
- .NET 6.0
- Dapr
Start the project
Result after startup
Basket Service: http://localhost:8081/swagger/index.html
Catalog Service: http://localhost:8082/swagger/index.html
Ordering Service: http://localhost:8083/swagger/index.html
Payment Service: http://localhost:8084/swagger/index.html
Admin Web: empty
Client Web: http://localhost:8090/catalog
The services in this project use the Minimal API style introduced in .NET 6.0 in place of the traditional Web API implementation.
For more on Minimal APIs, see mvc-to-minimal-apis-aspnet-6
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.MapGet("/api/v1/helloworld", ()=>"Hello World");
app.Run();
Masa.Contrib.Service.MinimalAPIs wraps Minimal APIs one step further. With it, the code becomes:
var builder = WebApplication.CreateBuilder(args);
var app = builder.Services.AddServices(builder);
app.Run();
public class HelloService : ServiceBase
{
public HelloService(IServiceCollection services): base(services) =>
App.MapGet("/api/v1/helloworld", ()=>"Hello World");
}
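A ServiceBase class (the counterpart of ControllerBase) is added. To use it, define your own Service class (the counterpart of a Controller) and register its routes in the constructor.
The AddServices(builder) method finds all service classes and registers them. Classes that inherit from ServiceBase are singletons, so constructor injection can only receive singleton dependencies; anything else, such as a Repository, should be injected per method with FromServices.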
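The singleton constraint is why per-request dependencies are taken as handler parameters rather than constructor arguments. Below is a minimal sketch of method injection using only plain ASP.NET Core Minimal APIs (not the Masa ServiceBase wrapper), assuming a .NET 6 web project with implicit usings enabled. IGreetingRepository and GreetingRepository are hypothetical names introduced for illustration.

```csharp
using Microsoft.AspNetCore.Mvc; // for [FromServices]

var builder = WebApplication.CreateBuilder(args);

// Scoped: a new instance is created for every HTTP request,
// so it must not be captured by a singleton's constructor.
builder.Services.AddScoped<IGreetingRepository, GreetingRepository>();

var app = builder.Build();

// The repository is resolved from the request's service scope each time the route is hit.
app.MapGet("/api/v1/helloworld",
    ([FromServices] IGreetingRepository repository) => repository.GetGreeting());

app.Run();

// Hypothetical repository, standing in for a data-access class.
public interface IGreetingRepository
{
    string GetGreeting();
}

public class GreetingRepository : IGreetingRepository
{
    public string GetGreeting() => "Hello World";
}
```

Inside a ServiceBase subclass the same idea should apply: the handler passed to App.MapGet takes the dependency as a [FromServices] parameter instead of receiving it through the constructor.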
The following shows how to use the official Dapr SDK. For the Dapr implementation wrapped by Masa.Contrib, see the Event sections.
For more on Dapr, see: https://docs.microsoft.com/zh-cn/dotnet/architecture/dapr-for-net-developers/
- Add Dapr
builder.Services.AddDaprClient();
...
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
});
- Publish an event
var @event = new OrderStatusChangedToValidatedIntegrationEvent();
await _daprClient.PublishEventAsync
(
"pubsub",
nameof(OrderStatusChangedToValidatedIntegrationEvent),
@event
);
- Subscribe to an event
[Topic("pubsub", nameof(OrderStatusChangedToValidatedIntegrationEvent))]
public async Task OrderStatusChangedToValidatedAsync(
OrderStatusChangedToValidatedIntegrationEvent integrationEvent,
[FromServices] ILogger<IntegrationEventService> logger)
{
logger.LogInformation("----- integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", integrationEvent.Id, Program.AppName, integrationEvent);
}
The first argument of Topic, pubsub, is the name defined in the pubsub.yaml configuration file.
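For reference, a Dapr pub/sub component file typically looks like the sketch below. The Redis backend and its connection values are assumptions taken from Dapr's default local setup, not necessarily what this project's pubsub.yaml contains. The part that matters here is metadata.name, which is the value passed as the first argument of Topic and PublishEventAsync.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub            # referenced as "pubsub" in [Topic(...)] and PublishEventAsync
spec:
  type: pubsub.redis      # assumed backend; other pub/sub components can be used instead
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379 # assumed address
  - name: redisPassword
    value: ""
```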
- Add Actor support to the project
app.UseEndpoints(endpoint =>
{
...
endpoint.MapActorsHandlers(); // Actor support
});
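- Define the Actor interface, inheriting from IActor.
public interface IOrderingProcessActor : IActor
{
//todo
}
- Implement IOrderingProcessActor and inherit from the Actor class. The sample project also implements the IRemindable interface; once it is implemented, reminders are registered through the RegisterReminderAsync method.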
public class OrderingProcessActor : Actor, IOrderingProcessActor, IRemindable
{
//todo
}
- Register the Actor
builder.Services.AddActors(options =>
{
options.Actors.RegisterActor<OrderingProcessActor>();
});
- Invoke the Actor
var actorId = new ActorId(order.Id.ToString());
var actor = ActorProxy.Create<IOrderingProcessActor>(actorId, nameof(OrderingProcessActor));
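A sketch of how the IRemindable part of such an actor might look, assuming the Dapr.Actors.AspNetCore package is referenced. The SubmitAsync method, the reminder name, and the timings are hypothetical and are not taken from the sample project.

```csharp
using System;
using System.Threading.Tasks;
using Dapr.Actors;
using Dapr.Actors.Runtime;

public interface IOrderingProcessActor : IActor
{
    Task SubmitAsync(); // hypothetical method, for illustration only
}

public class OrderingProcessActor : Actor, IOrderingProcessActor, IRemindable
{
    private const string ReminderName = "CheckOrder"; // hypothetical reminder name

    public OrderingProcessActor(ActorHost host) : base(host)
    {
    }

    public async Task SubmitAsync()
    {
        // Ask the Dapr runtime to call ReceiveReminderAsync after 10 seconds,
        // then every 30 seconds until the reminder is unregistered.
        await RegisterReminderAsync(
            ReminderName,
            null,                       // no state payload
            TimeSpan.FromSeconds(10),   // due time
            TimeSpan.FromSeconds(30));  // period
    }

    // Invoked by the Dapr runtime when a registered reminder fires.
    public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period)
    {
        if (reminderName == ReminderName)
        {
            //todo: advance the order process, then stop the reminder
            await UnregisterReminderAsync(ReminderName);
        }
    }
}
```

Unlike actor timers, reminders are persisted by the Dapr runtime and keep firing after the actor has been deactivated, which is why they suit long-running order workflows.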
Supports in-process events only.
- Add EventBus
builder.Services.AddEventBus();
- Define a custom Event
public class DemoEvent : Event
{
//todo: custom properties carrying the event parameters
}
- Publish the Event
IEventBus eventBus; // obtained through dependency injection
await eventBus.PublishAsync(new DemoEvent());
- Handle the event
[EventHandler]
public async Task DemoHandleAsync(DemoEvent @event)
{
//todo
}
Sends cross-process events; when EventBus is also added, in-process events are supported as well.
- Add IntegrationEventBus
builder.Services
.AddDaprEventBus<IntegrationEventLogService>();
// .AddDaprEventBus<IntegrationEventLogService>(options=>{
// //todo
// options.UseEventBus(); // add EventBus
// });
- Define a custom Event
public class DemoIntegrationEvent : IntegrationEvent
{
public override string Topic { get; set; } = nameof(DemoIntegrationEvent);
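//todo: custom properties carrying the event parameters
}
The value of the Topic property is the value passed as the second argument of TopicAttribute, the Dapr pub/sub attribute.
- Publish the Event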
public class DemoService
{
private readonly IIntegrationEventBus _eventBus;
public DemoService(IIntegrationEventBus eventBus)
{
_eventBus = eventBus;
}
//todo
public async Task DemoPublish()
{
//todo
await _eventBus.PublishAsync(new DemoIntegrationEvent());
}
}
- Handle the event
[Topic("pubsub", nameof(DemoIntegrationEvent))]
public async Task DemoIntegrationEventHandleAsync(DemoIntegrationEvent @event)
{
//todo
}
For more documentation on CQRS, see: https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs
- Define a Query
public class CatalogItemQuery : Query<List<CatalogItem>>
{
public string Name { get; set; } = default!;
public override List<CatalogItem> Result { get; set; } = default!;
}
- Add a QueryHandler, for example:
public class CatalogQueryHandler
{
private readonly ICatalogItemRepository _catalogItemRepository;
public CatalogQueryHandler(ICatalogItemRepository catalogItemRepository) => _catalogItemRepository = catalogItemRepository;
[EventHandler]
public async Task ItemsWithNameAsync(CatalogItemQuery query)
{
query.Result = await _catalogItemRepository.GetListAsync(query.Name);
}
}
- Send the Query
IEventBus eventBus; // obtained through dependency injection
await eventBus.PublishAsync(new CatalogItemQuery(){
Name = "Rolex"
}); // in-process, so IEventBus is used
- Define a Command
public class CreateCatalogItemCommand : Command
{
public string Name { get; set; } = default!;
//todo
}
- Add a CommandHandler, for example:
public class CatalogCommandHandler
{
private readonly ICatalogItemRepository _catalogItemRepository;
public CatalogCommandHandler(ICatalogItemRepository catalogItemRepository) => _catalogItemRepository = catalogItemRepository;
[EventHandler]
public async Task CreateCatalogItemAsync(CreateCatalogItemCommand command)
{
//todo
}
}
- Send the Command
IEventBus eventBus; // obtained through dependency injection
await eventBus.PublishAsync(new CreateCatalogItemCommand()); // in-process, so IEventBus is used
Can send both in-process and cross-process events.
- Add DomainEventBus
builder.Services
.AddDomainEventBus(options =>
{
options.UseEventBus() // use in-process events
.UseUow<PaymentDbContext>(dbOptions => dbOptions.UseSqlServer("server=masa.eshop.services.eshop.database;uid=sa;pwd=P@ssw0rd;database=payment")) // use the unit of work
.UseDaprEventBus<IntegrationEventLogService>() // use cross-process events
.UseEventLog<PaymentDbContext>()
.UseRepository<PaymentDbContext>(); // use the EF implementation of Repository
})
- Define a DomainCommand (in-process)
// Command that validates the payment. It must inherit from DomainCommand; a query must inherit from DomainQuery<> instead.
public class OrderStatusChangedToValidatedCommand : DomainCommand
{
public Guid OrderId { get; set; }
}
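- Send the DomainCommand
IDomainEventBus domainEventBus; // obtained through dependency injection
await domainEventBus.PublishAsync(new OrderStatusChangedToValidatedCommand()
{
OrderId = Guid.NewGuid() // placeholder: OrderId is a Guid, so pass the real order ID here
}); // send the DomainCommand
- Add a Handler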
[EventHandler]
public async Task ValidatedHandleAsync(OrderStatusChangedToValidatedCommand command)
{
//todo
}
- Define a DomainEvent (cross-process)
public class OrderPaymentSucceededDomainEvent : IntegrationDomainEvent
{
public Guid OrderId { get; init; }
public override string Topic { get; set; } = nameof(OrderPaymentSucceededIntegrationEvent);
private OrderPaymentSucceededDomainEvent()
{
}
public OrderPaymentSucceededDomainEvent(Guid orderId) => OrderId = orderId;
}
public class OrderPaymentFailedDomainEvent : IntegrationDomainEvent
{
public Guid OrderId { get; init; }
public override string Topic { get; set; } = nameof(OrderPaymentFailedIntegrationEvent);
private OrderPaymentFailedDomainEvent()
{
}
public OrderPaymentFailedDomainEvent(Guid orderId) => OrderId = orderId;
}
- Define a domain service and publish an IntegrationDomainEvent (cross-process)
public class PaymentDomainService : DomainService
{
private readonly ILogger<PaymentDomainService> _logger;
public PaymentDomainService(IDomainEventBus eventBus, ILogger<PaymentDomainService> logger) : base(eventBus)
=> _logger = logger;
public async Task StatusChangedAsync(Aggregate.Payment payment)
{
IIntegrationDomainEvent orderPaymentDomainEvent;
if (payment.Succeeded)
{
orderPaymentDomainEvent = new OrderPaymentSucceededDomainEvent(payment.OrderId);
}
else
{
orderPaymentDomainEvent = new OrderPaymentFailedDomainEvent(payment.OrderId);
}
_logger.LogInformation("----- Publishing integration event: {IntegrationEventId} from {AppName} - ({@IntegrationEvent})", orderPaymentDomainEvent.Id, Program.AppName, orderPaymentDomainEvent);
await EventBus.PublishAsync(orderPaymentDomainEvent); // publishes the DomainEvent
}
}
- Add MinimalAPI
- Add and use Dapr
builder.Services
.AddDaprEventBus<IntegrationEventLogService>(options =>
{
options.UseEventBus() // use in-process events
.UseUow<CatalogDbContext>(dbOptions => dbOptions.UseSqlServer("server=masa.eshop.services.eshop.database;uid=sa;pwd=P@ssw0rd;database=catalog")) // use the unit of work
.UseEventLog<CatalogDbContext>(); // hand CatalogDbContext to the event log; CatalogDbContext must inherit from IntegrationEventLogContext
})
- Use CQRS
builder.Services
.AddMasaDbContext<OrderingContext>(dbOptions => dbOptions.UseSqlServer("Data Source=masa.eshop.services.eshop.database;uid=sa;pwd=P@ssw0rd;database=order"))
.AddDaprEventBus<IntegrationEventLogService>(options =>
{
options.UseEventBus().UseEventLog<OrderingContext>();
})
Add the dapr service to docker-compose.yml:
dapr-placement:
image: "daprio/dapr:1.4.0"
Add the concrete command and port mapping to docker-compose.override.yml:
dapr-placement:
command: ["./placement", "-port", "50000", "-log-level", "debug"]
ports:
- "50000:50000"
Add the following arguments to the command of the corresponding ordering.dapr service:
"-placement-host-address", "dapr-placement:50000"
builder.Services
.AddDomainEventBus(options =>
{
options.UseEventBus() // use in-process events
.UseUow<PaymentDbContext>(dbOptions => dbOptions.UseSqlServer("server=masa.eshop.services.eshop.database;uid=sa;pwd=P@ssw0rd;database=payment"))
.UseDaprEventBus<IntegrationEventLogService>() // use cross-process events
.UseEventLog<PaymentDbContext>()
.UseRepository<PaymentDbContext>(); // use the EF implementation of Repository
})
To be added
Install-Package Masa.Contrib.Service.MinimalAPIs // Minimal API support
Install-Package Masa.Contrib.Dispatcher.Events // sends in-process messages
Install-Package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr // sends cross-process messages
Install-Package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF // logs cross-process messages
Install-Package Masa.Contrib.Data.UoW.EF // unit of work, keeps transactions consistent
Install-Package Masa.Contrib.ReadWriteSpliting.Cqrs // CQRS implementation
Install-Package Masa.BuildingBlocks.Ddd.Domain // DDD building blocks
Install-Package Masa.Contrib.Ddd.Domain.Repository.EF // Repository implementation