مطالب آموزشی Event Sourcing:
- بخش اول مقدمهای بر Event Sourcing
- بخش دوم آشنایی مقدماتی با ساختار داخلی Event Store
- مقایسه رویکردهای State-Oriented و State-Transition
- مزیتهای Event Sourcing
- سلام به دنیا به روش Event Sourcing
- سلام به دنیا به روش Event Sourcing-بخش دوم
- بخش هفتم Projection
- ویرایش event ها در EventSourcing
- Message، Command یا Event، کدوم رو انتخاب کنم؟
- بخش دهم: Internal Event vs External Event
- بخش یازدهم: Push-Based-vs-Pull-Based
- بخش دوازدهم: مقدمهای بر الگوی Inox-Outbox
مقدمه
در مقاله دوازدهم با الگوی Inbox-Outbox و مزایای آن آشنا شدیم. حال در این مقاله به بررسی نحوه پیادهسازی عملی این الگو با تمرکز بر دو رویکرد اصلی، یعنی مبتنی بر پایگاه داده (Transactional Outbox) و مبتنی بر صف پیام (Message Queue)، میپردازیم.
- پیادهسازی مبتنی بر پایگاه داده (Transactional Outbox)
این رویکرد از یک جدول در پایگاه داده به عنوان Outbox استفاده میکند و با استفاده از تراکنشهای پایگاه داده، اتمیسیته بین ذخیره رویداد و تغییرات دادههای اصلی را تضمین میکند.
Outbox: یک جدول جداگانه در پایگاه داده برای Outbox هر سرویس ایجاد میشود. هنگامی که یک رویداد در سرویس تولید میشود، به صورت اتمیک و در قالب یک تراکنش پایگاه داده (همزمان با تغییرات دادههای اصلی سرویس) در جدول Outbox ذخیره میشود. این کار تضمین میکند که رویداد و تغییرات دادهها به صورت همزمان و بدون از دست رفتن دادهها ثبت شوند. یک فرایند جداگانه (مانند یک Worker Process یا یک Scheduled Task) به صورت دورهای جدول Outbox را بررسی کرده و رویدادهای جدید را به سایر سرویسها ارسال میکند. پس از ارسال موفقیتآمیز، رویدادها از جدول Outbox حذف یا به عنوان “ارسال شده” علامتگذاری میشوند. استفاده از یک ستون status در این جدول بسیار رایج است.
Inbox: به طور مشابه، یک جدول جداگانه برای Inbox هر سرویس ایجاد میشود. رویدادهایی که از سایر سرویسها دریافت میشوند، در این جدول ذخیره میشوند. سپس سرویس مربوطه این رویدادها را از Inbox خوانده و پردازش میکند. پس از پردازش، رویدادها از Inbox حذف یا به عنوان “پردازش شده” علامتگذاری میشوند.
مراحل پیادهسازی:
- ایجاد جدول Outbox: جدولی با ستونهای زیر ایجاد کنید:
Id
(شناسه یکتا)EventType
(نوع رویداد)Payload
(دادههای رویداد به فرمت JSON یا XML)Timestamp
(زمان ایجاد رویداد)Processed
(وضعیت پردازش رویداد – boolean یا enum)
- ذخیره رویداد در تراکنش: هنگام ایجاد یک رویداد، آن را همزمان با تغییرات دادههای اصلی در یک تراکنش پایگاه داده در جدول Outbox ذخیره کنید. این کار تضمین میکند که یا هر دو عملیات انجام میشوند یا هیچکدام.
- فرایند ارسال رویداد: یک فرایند جداگانه (مانند یک Worker Process یا یک Scheduled Task) به صورت دورهای جدول Outbox را بررسی میکند و رویدادهایی با
Processed = false
را انتخاب میکند. - ارسال رویداد: رویدادها به سرویسهای مقصد ارسال میشوند.
- علامتگذاری رویداد به عنوان پردازش شده: پس از ارسال موفقیتآمیز رویداد، وضعیت
Processed
آن در جدول Outbox بهtrue
تغییر میکند.
public class Order
{
public int Id { get; set; }
public string CustomerId { get; set; }
// ... سایر مشخصات سفارش
}
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; }
public string Payload { get; set; }
public DateTime Timestamp { get; set; }
public bool Processed { get; set; }
}
public class OrderService
{
private readonly ApplicationDbContext _context;
public OrderService(ApplicationDbContext context)
{
_context = context;
}
public async Task CreateOrderAsync(string customerId)
{
using (var transaction = _context.Database.BeginTransaction())
{
try
{
var order = new Order { CustomerId = customerId };
_context.Orders.Add(order);
await _context.SaveChangesAsync();
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = "OrderCreated",
Payload = JsonConvert.SerializeObject(new { order.Id, order.CustomerId }),
Timestamp = DateTime.UtcNow,
Processed = false
};
_context.OutboxMessages.Add(outboxMessage);
await _context.SaveChangesAsync();
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
// مدیریت خطا
throw;
}
}
}
}
۲٫ پیادهسازی مبتنی بر صف پیام (Message Queue-based)
در این رویکرد از یک Message Broker مانند RabbitMQ یا Kafka استفاده میشود.
Outbox: رویدادها به یک Topic یا Queue اختصاصی برای Outbox ارسال میشوند. یک Consumer یا Listener به این صف متصل شده و رویدادها را از صف خوانده و به سایر سرویسها ارسال میکند.
Inbox: رویدادها در Topic یا Queue مربوط به Inbox سرویس دریافتکننده قرار میگیرند و سرویس مربوطه آنها را از صف خوانده و پردازش میکند.
مراحل پیادهسازی:
- پیکربندی Message Broker: یک Message Broker را نصب و پیکربندی کنید.
- ارسال رویداد به صف: هنگام ایجاد یک رویداد، آن را به یک Topic یا Queue در Message Broker ارسال کنید.
- دریافت رویداد توسط مصرفکننده: سرویسهای مقصد به Topic یا Queue مربوطه subscribe میکنند و رویدادها را دریافت و پردازش میکنند.
public interface OrderCreated
{
int OrderId { get; }
string CustomerId { get; }
}
public class OrderService
{
private readonly IEventPublisher _eventPublisher;
public OrderService(IEventPublisher eventPublisher)
{
eventPublisher= _eventPublisher;
}
public async Task CreateOrderAsync(string customerId)
{
await _eventPublisher.Publish<OrderCreated>(new
{
OrderId = order.Id,
CustomerId = order.CustomerId
});
}
}
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
public async Task Consume(ConsumeContext<OrderCreated> context)
{
Console.WriteLine($"Order Created: {context.Message.OrderId}");
}
}
۳٫ تفاوتهای کلیدی و انتخاب رویکرد مناسب:
- تراکنش: در رویکرد استفاده از دیتابیس، اتمیک بودن فرآیند با تراکنشهای پایگاه داده تضمین میشود. در رویکرد Message Broker، باید از الگوهایی مانند Saga یا Outbox Pattern به همراه Message Broker استفاده کرد.
- پیچیدگی: رویکرد پایگاه داده معمولاً سادهتر است، به خصوص برای پروژههای کوچک. رویکرد Message Broker پیچیدگی بیشتری دارد، اما مقیاسپذیری و decoupling بهتری ارائه میدهد.
- عملکرد: رویکرد Message Broker معمولاً برای حجم بالای رویدادها عملکرد بهتری دارد.