Compare commits

...

3 Commits

Author SHA1 Message Date
khwezi 0eac9d533f Merge pull request 'payments' (#65) from payments into master
Reviewed-on: #65
2026-06-03 11:12:10 +02:00
Khwezi Mngoma 961f03c1c7 Added guardrails around the cluster as well as software level
continuous-integration/drone/pr Build is passing
2026-06-03 11:11:22 +02:00
Khwezi Mngoma a0cf847e51 Added job interruption handling 2026-06-03 10:40:29 +02:00
7 changed files with 75 additions and 31 deletions
@@ -1,5 +1,6 @@
using LiteCharms.Features.Hasher; using LiteCharms.Features.Hasher;
using LiteCharms.Features.Hasher.Configuration; using LiteCharms.Features.Hasher.Configuration;
using LiteCharms.Features.Mediator;
using LiteCharms.Features.MidrandBooks.Orders; using LiteCharms.Features.MidrandBooks.Orders;
using LiteCharms.Features.MidrandBooks.Payments.Models; using LiteCharms.Features.MidrandBooks.Payments.Models;
@@ -12,6 +13,9 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
public async ValueTask Handle(PayfastPaymentConfirmationReceivedEvent notification, CancellationToken cancellationToken) public async ValueTask Handle(PayfastPaymentConfirmationReceivedEvent notification, CancellationToken cancellationToken)
{ {
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(PayfastPaymentConfirmationReceivedEvent).Name}");
activity?.SetTag("event.correlation_id", notification.CorrelationId);
await using var scope = services.CreateAsyncScope(); await using var scope = services.CreateAsyncScope();
var hashService = scope.ServiceProvider.GetRequiredService<HashService>(); var hashService = scope.ServiceProvider.GetRequiredService<HashService>();
var orderService = scope.ServiceProvider.GetRequiredService<OrderService>(); var orderService = scope.ServiceProvider.GetRequiredService<OrderService>();
@@ -23,7 +27,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
var dict = payload.ToParamDictionary(); var dict = payload.ToParamDictionary();
var localSignature = PayfastService.GenerateSignature(dict, hasherSettings.PayfastPassphrase); var localSignature = PayfastService.GenerateSignature(dict, hasherSettings.PayfastPassphrase);
if(localSignature.IsFailed) if (localSignature.IsFailed)
throw new Exception("Failed to generate local signature for incoming webhook payload."); throw new Exception("Failed to generate local signature for incoming webhook payload.");
if (!string.Equals(localSignature.Value, payload.Signature, StringComparison.OrdinalIgnoreCase)) if (!string.Equals(localSignature.Value, payload.Signature, StringComparison.OrdinalIgnoreCase))
@@ -154,5 +158,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
logger.LogInformation("Webhook validation pipeline passed checks successfully, logged entry to ledger with status: {Status}", status); logger.LogInformation("Webhook validation pipeline passed checks successfully, logged entry to ledger with status: {Status}", status);
} }
activity?.SetStatus(ActivityStatusCode.Ok);
} }
} }
@@ -0,0 +1,12 @@
namespace LiteCharms.Features.Abstractions;
public interface IJobOrchestrator
{
ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
where TNotification : IEvent;
ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
where TNotification : IEvent;
ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default);
}
+2 -2
View File
@@ -1,5 +1,5 @@
using LiteCharms.Features.Quartz; using LiteCharms.Features.Abstractions;
using LiteCharms.Features.Quartz.Abstractions; using LiteCharms.Features.Quartz;
using static LiteCharms.Features.Extensions.Postgres; using static LiteCharms.Features.Extensions.Postgres;
namespace LiteCharms.Features.Extensions; namespace LiteCharms.Features.Extensions;
@@ -1,12 +0,0 @@
using LiteCharms.Features.Abstractions;
namespace LiteCharms.Features.Quartz.Abstractions;
public interface IJobOrchestrator
{
Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
where TNotification : IEvent;
Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
where TNotification : IEvent;
}
+24 -3
View File
@@ -1,11 +1,10 @@
using LiteCharms.Features.Abstractions; using LiteCharms.Features.Abstractions;
using LiteCharms.Features.Quartz.Abstractions;
namespace LiteCharms.Features.Quartz; namespace LiteCharms.Features.Quartz;
public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator
{ {
public async Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default) public async ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
where TNotification : IEvent where TNotification : IEvent
{ {
var chainedJobGroup = "onetime-jobs"; var chainedJobGroup = "onetime-jobs";
@@ -19,6 +18,7 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
.WithDescription($"Correlation ID: {notification.CorrelationId}") .WithDescription($"Correlation ID: {notification.CorrelationId}")
.UsingJobData(new JobDataMap { ["Payload"] = JsonSerializer.Serialize(notification) }) .UsingJobData(new JobDataMap { ["Payload"] = JsonSerializer.Serialize(notification) })
.DisallowConcurrentExecution() .DisallowConcurrentExecution()
.RequestRecovery()
.Build(); .Build();
var trigger = global::Quartz.TriggerBuilder.Create() var trigger = global::Quartz.TriggerBuilder.Create()
@@ -29,7 +29,7 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken); await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
} }
public async Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default) public async ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
where TNotification : IEvent where TNotification : IEvent
{ {
var chainedJobGroup = "scheduled-jobs"; var chainedJobGroup = "scheduled-jobs";
@@ -63,4 +63,25 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
else else
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken); await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
} }
public async ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default)
{
var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
var jobKeyName = string.Empty;
var jobGroup = string.Empty;
if (!string.IsNullOrWhiteSpace(correlationId))
{
jobKeyName = $"{eventName.ToLower(CultureInfo.InvariantCulture)}-{correlationId.ToLower(CultureInfo.InvariantCulture)}";
jobGroup = "onetime-jobs";
}
else
{
jobKeyName = eventName.ToLower(CultureInfo.InvariantCulture);
jobGroup = "scheduled-jobs";
}
return await scheduler.Interrupt(JobKey.Create(jobKeyName, jobGroup), cancellationToken);
}
} }
+18 -4
View File
@@ -8,6 +8,9 @@ public sealed class MediatorJob<TNotification>(IMediator mediator) : IJob where
{ {
public async Task Execute(IJobExecutionContext context) public async Task Execute(IJobExecutionContext context)
{ {
if (context.Recovering)
Trace.WriteLine($"CRITICAL RECOVERY: Resurrecting job '{typeof(TNotification).Name}' after a previous cluster node crashed mid-execution.");
var data = context.MergedJobDataMap["Payload"] as string; var data = context.MergedJobDataMap["Payload"] as string;
if (string.IsNullOrWhiteSpace(data)) if (string.IsNullOrWhiteSpace(data))
@@ -21,17 +24,28 @@ public sealed class MediatorJob<TNotification>(IMediator mediator) : IJob where
if (notification is null) if (notification is null)
{ {
Trace.WriteLine("Notification could not be JSon converted from data string, job ended"); Trace.WriteLine("Notification could not be Json converted from data string, job ended");
return; return;
} }
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(TNotification).Name}"); using var activity = MediatorTelemetry.Source.StartActivity(typeof(TNotification).Name);
activity?.SetTag("event.correlation_id", notification.CorrelationId); activity?.SetTag("event.correlation_id", notification.CorrelationId);
await mediator.Publish(notification, context.CancellationToken); try
{
await mediator.Publish(notification, context.CancellationToken);
Trace.WriteLine("Job published"); Trace.WriteLine("Job published successfully");
}
catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
{
Trace.WriteLine($"Job '{typeof(TNotification).Name}' was gracefully interrupted by the cluster control plane.");
activity?.SetStatus(ActivityStatusCode.Ok);
return;
}
} }
} }
@@ -12,6 +12,9 @@ public sealed class RetryJobListener : IJobListener
public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default) public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
{ {
if (context.CancellationToken.IsCancellationRequested)
return;
if (jobException is not null && context.RefireCount < RetryCount) if (jobException is not null && context.RefireCount < RetryCount)
jobException.RefireImmediately = true; jobException.RefireImmediately = true;
} }