ثبت نام دوره جدید DDD و EventSourcing ...
0

آموزش Event Sourcing قسمت ۱۳ | پیاده‌سازی الگوی Inbox-Outbox

مطالب آموزشی Event Sourcing:

مقدمه

در مقاله دوازدهم با الگوی Inbox-Outbox و مزایای آن آشنا شدیم. حال در این مقاله به بررسی نحوه پیاده‌سازی عملی این الگو با تمرکز بر دو رویکرد اصلی، یعنی مبتنی بر پایگاه داده (Transactional Outbox) و مبتنی بر صف پیام (Message Queue)، می‌پردازیم.

  1. پیاده‌سازی مبتنی بر پایگاه داده (Transactional Outbox)

این رویکرد از یک جدول در پایگاه داده به عنوان Outbox استفاده می‌کند و با استفاده از تراکنش‌های پایگاه داده، اتمیسیته بین ذخیره رویداد و تغییرات داده‌های اصلی را تضمین می‌کند.

Outbox: یک جدول جداگانه در پایگاه داده برای Outbox هر سرویس ایجاد می‌شود. هنگامی که یک رویداد در سرویس تولید می‌شود، به صورت اتمیک و در قالب یک تراکنش پایگاه داده (همزمان با تغییرات داده‌های اصلی سرویس) در جدول Outbox ذخیره می‌شود. این کار تضمین می‌کند که رویداد و تغییرات داده‌ها به صورت همزمان و بدون از دست رفتن داده‌ها ثبت شوند. یک فرایند جداگانه (مانند یک Worker Process یا یک Scheduled Task) به صورت دوره‌ای جدول Outbox را بررسی کرده و رویدادهای جدید را به سایر سرویس‌ها ارسال می‌کند. پس از ارسال موفقیت‌آمیز، رویدادها از جدول Outbox حذف یا به عنوان “ارسال شده” علامت‌گذاری می‌شوند. استفاده از یک ستون status در این جدول بسیار رایج است.

Inbox: به طور مشابه، یک جدول جداگانه برای Inbox هر سرویس ایجاد می‌شود. رویدادهایی که از سایر سرویس‌ها دریافت می‌شوند، در این جدول ذخیره می‌شوند. سپس سرویس مربوطه این رویدادها را از Inbox خوانده و پردازش می‌کند. پس از پردازش، رویدادها از Inbox حذف یا به عنوان “پردازش شده” علامت‌گذاری می‌شوند.

مراحل پیاده‌سازی:

  1. ایجاد جدول Outbox: جدولی با ستون‌های زیر ایجاد کنید:
    • Id (شناسه یکتا)
    • EventType (نوع رویداد)
    • Payload (داده‌های رویداد به فرمت JSON یا XML)
    • Timestamp (زمان ایجاد رویداد)
    • Processed (وضعیت پردازش رویداد – boolean یا enum)
  2. ذخیره رویداد در تراکنش: هنگام ایجاد یک رویداد، آن را همزمان با تغییرات داده‌های اصلی در یک تراکنش پایگاه داده در جدول Outbox ذخیره کنید. این کار تضمین می‌کند که یا هر دو عملیات انجام می‌شوند یا هیچکدام.
  3. فرایند ارسال رویداد: یک فرایند جداگانه (مانند یک Worker Process یا یک Scheduled Task) به صورت دوره‌ای جدول Outbox را بررسی می‌کند و رویدادهایی با Processed = false را انتخاب می‌کند.
  4. ارسال رویداد: رویدادها به سرویس‌های مقصد ارسال می‌شوند.
  5. علامت‌گذاری رویداد به عنوان پردازش شده: پس از ارسال موفقیت‌آمیز رویداد، وضعیت Processed آن در جدول Outbox به true تغییر می‌کند.
public class Order
{
    public int Id { get; set; }
    public string CustomerId { get; set; }
    // ... سایر مشخصات سفارش
}

public class OutboxMessage
{
    public Guid Id { get; set; }
    public string EventType { get; set; }
    public string Payload { get; set; }
    public DateTime Timestamp { get; set; }
    public bool Processed { get; set; }
}

public class OrderService
{
    private readonly ApplicationDbContext _context;

    public OrderService(ApplicationDbContext context)
    {
        _context = context;
    }

    public async Task CreateOrderAsync(string customerId)
    {
        using (var transaction = _context.Database.BeginTransaction())
        {
            try
            {
                var order = new Order { CustomerId = customerId };
                _context.Orders.Add(order);
                await _context.SaveChangesAsync();

                var outboxMessage = new OutboxMessage
                {
                    Id = Guid.NewGuid(),
                    EventType = "OrderCreated",
                    Payload = JsonConvert.SerializeObject(new { order.Id, order.CustomerId }),
                    Timestamp = DateTime.UtcNow,
                    Processed = false
                };
                _context.OutboxMessages.Add(outboxMessage);
                await _context.SaveChangesAsync();

                transaction.Commit();
            }
            catch (Exception ex)
            {
                transaction.Rollback();
                // مدیریت خطا
                throw;
            }
        }
    }
}

۲٫ پیاده‌سازی مبتنی بر صف پیام (Message Queue-based)

در این رویکرد از یک Message Broker مانند RabbitMQ یا Kafka استفاده می‌شود.

Outbox: رویدادها به یک Topic یا Queue اختصاصی برای Outbox ارسال می‌شوند. یک Consumer یا Listener به این صف متصل شده و رویدادها را از صف خوانده و به سایر سرویس‌ها ارسال می‌کند.

Inbox: رویدادها در Topic یا Queue مربوط به Inbox سرویس دریافت‌کننده قرار می‌گیرند و سرویس مربوطه آن‌ها را از صف خوانده و پردازش می‌کند.

مراحل پیاده‌سازی:

  1. پیکربندی Message Broker: یک Message Broker را نصب و پیکربندی کنید.
  2. ارسال رویداد به صف: هنگام ایجاد یک رویداد، آن را به یک Topic یا Queue در Message Broker ارسال کنید.
  3. دریافت رویداد توسط مصرف‌کننده: سرویس‌های مقصد به Topic یا Queue مربوطه subscribe می‌کنند و رویدادها را دریافت و پردازش می‌کنند.
public interface OrderCreated
{
    int OrderId { get; }
    string CustomerId { get; }
}

public class OrderService
{
    private readonly IEventPublisher _eventPublisher;

    public OrderService(IEventPublisher eventPublisher)
    {
        eventPublisher= _eventPublisher;
    }

    public async Task CreateOrderAsync(string customerId)
    {

        await _eventPublisher.Publish<OrderCreated>(new
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId
        });
    }
}

public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        Console.WriteLine($"Order Created: {context.Message.OrderId}");
    }
}

۳٫ تفاوت‌های کلیدی و انتخاب رویکرد مناسب:

  • تراکنش: در رویکرد استفاده از دیتابیس، اتمیک بودن فرآیند با تراکنش‌های پایگاه داده تضمین می‌شود. در رویکرد Message Broker، باید از الگوهایی مانند Saga یا Outbox Pattern به همراه Message Broker استفاده کرد.
  • پیچیدگی: رویکرد پایگاه داده معمولاً ساده‌تر است، به خصوص برای پروژه‌های کوچک. رویکرد Message Broker پیچیدگی بیشتری دارد، اما مقیاس‌پذیری و decoupling بهتری ارائه می‌دهد.
  • عملکرد: رویکرد Message Broker معمولاً برای حجم بالای رویدادها عملکرد بهتری دارد.
ارسال دیدگاه

نشانی ایمیل شما منتشر نخواهد شد. بخش‌های موردنیاز علامت‌گذاری شده‌اند *