From a0cf847e51e42829fb7a44ed1a42dae04b8deb68 Mon Sep 17 00:00:00 2001 From: Khwezi Mngoma Date: Wed, 3 Jun 2026 10:40:29 +0200 Subject: [PATCH] Added job interruption handling --- ...PaymentConfirmationReceivedEventHandler.cs | 20 ++++++++----- .../Abstractions/IJobOrchestrator.cs | 12 ++++++++ LiteCharms.Features/Extensions/Quartz.cs | 4 +-- .../Quartz/Abstractions/IJobOrchestrator.cs | 12 -------- LiteCharms.Features/Quartz/JobOrchestrator.cs | 28 ++++++++++++++++--- LiteCharms.Features/Quartz/MediatorJob.cs | 23 +++++++++++---- .../Quartz/RetryJobListener.cs | 3 ++ 7 files changed, 71 insertions(+), 31 deletions(-) create mode 100644 LiteCharms.Features/Abstractions/IJobOrchestrator.cs delete mode 100644 LiteCharms.Features/Quartz/Abstractions/IJobOrchestrator.cs diff --git a/LiteCharms.Features.MidrandBooks/Payments/Events/Handlers/PayfastPaymentConfirmationReceivedEventHandler.cs b/LiteCharms.Features.MidrandBooks/Payments/Events/Handlers/PayfastPaymentConfirmationReceivedEventHandler.cs index dcd943a..cae4c21 100644 --- a/LiteCharms.Features.MidrandBooks/Payments/Events/Handlers/PayfastPaymentConfirmationReceivedEventHandler.cs +++ b/LiteCharms.Features.MidrandBooks/Payments/Events/Handlers/PayfastPaymentConfirmationReceivedEventHandler.cs @@ -1,17 +1,21 @@ using LiteCharms.Features.Hasher; using LiteCharms.Features.Hasher.Configuration; +using LiteCharms.Features.Mediator; using LiteCharms.Features.MidrandBooks.Orders; using LiteCharms.Features.MidrandBooks.Payments.Models; namespace LiteCharms.Features.MidrandBooks.Payments.Events.Handlers; -public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvider services, IOptions hasherOptions, ILogger logger) : +public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvider services, IOptions hasherOptions, ILogger logger) : INotificationHandler { private readonly HasherSettings hasherSettings = hasherOptions.Value; public async ValueTask Handle(PayfastPaymentConfirmationReceivedEvent notification, CancellationToken cancellationToken) { + using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(PayfastPaymentConfirmationReceivedEvent).Name}"); + activity?.SetTag("event.correlation_id", notification.CorrelationId); + await using var scope = services.CreateAsyncScope(); var hashService = scope.ServiceProvider.GetRequiredService(); var orderService = scope.ServiceProvider.GetRequiredService(); @@ -23,7 +27,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi var dict = payload.ToParamDictionary(); var localSignature = PayfastService.GenerateSignature(dict, hasherSettings.PayfastPassphrase); - if(localSignature.IsFailed) + if (localSignature.IsFailed) throw new Exception("Failed to generate local signature for incoming webhook payload."); if (!string.Equals(localSignature.Value, payload.Signature, StringComparison.OrdinalIgnoreCase)) @@ -63,7 +67,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi { var isHostValid = await payfastService.ValidateReferrerIpAsync(notification.RemoteIpAddress!, notification.AllowLoopback, cancellationToken); - if (isHostValid.IsFailed) + if (isHostValid.IsFailed) throw new Exception("Security validation exception: Webhook packet source address failed cluster validation checks."); if (!isHostValid.Value) @@ -71,7 +75,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi var isAmountValid = payfastService.ValidatePaymentAmount(orderResult.Value.Total, payload.AmountGross); - if (!isAmountValid.Value) + if (!isAmountValid.Value) throw new Exception("Security validation exception: Transaction cost variance bounds breached."); var paramList = new List(); @@ -91,8 +95,8 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi var serverConfirmation = await payfastService.ValidateServerConfirmationAsync(rawParamString, isSandbox: true, cancellationToken); - if (serverConfirmation.IsFailed) - throw new Exception("Security validation exception: Payfast central handshake server rejected payload legitimacy."); + if (serverConfirmation.IsFailed) + throw new Exception("Security validation exception: Payfast central handshake server rejected payload legitimacy."); } await payfastService.WriteLedgerEntryAsync(new CreateGatewayLedgerEntry @@ -105,7 +109,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi AmountFee = fee, AmountGross = gross, AmountNet = net, - PaymentStatus = status, + PaymentStatus = status, }, cancellationToken); if (status.Equals("COMPLETE", StringComparison.OrdinalIgnoreCase)) @@ -154,5 +158,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi logger.LogInformation("Webhook validation pipeline passed checks successfully, logged entry to ledger with status: {Status}", status); } + activity?.SetStatus(ActivityStatusCode.Ok); + } } diff --git a/LiteCharms.Features/Abstractions/IJobOrchestrator.cs b/LiteCharms.Features/Abstractions/IJobOrchestrator.cs new file mode 100644 index 0000000..c0afd08 --- /dev/null +++ b/LiteCharms.Features/Abstractions/IJobOrchestrator.cs @@ -0,0 +1,12 @@ +namespace LiteCharms.Features.Abstractions; + +public interface IJobOrchestrator +{ + ValueTask SendAsync(TNotification notification, CancellationToken cancellationToken = default) + where TNotification : IEvent; + + ValueTask ScheduleAsync(TNotification notification, string cronExpression, CancellationToken cancellationToken = default) + where TNotification : IEvent; + + ValueTask InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default); +} diff --git a/LiteCharms.Features/Extensions/Quartz.cs b/LiteCharms.Features/Extensions/Quartz.cs index 315a973..341d8c8 100644 --- a/LiteCharms.Features/Extensions/Quartz.cs +++ b/LiteCharms.Features/Extensions/Quartz.cs @@ -1,5 +1,5 @@ -using LiteCharms.Features.Quartz; -using LiteCharms.Features.Quartz.Abstractions; +using LiteCharms.Features.Abstractions; +using LiteCharms.Features.Quartz; using static LiteCharms.Features.Extensions.Postgres; namespace LiteCharms.Features.Extensions; diff --git a/LiteCharms.Features/Quartz/Abstractions/IJobOrchestrator.cs b/LiteCharms.Features/Quartz/Abstractions/IJobOrchestrator.cs deleted file mode 100644 index 8ce2c33..0000000 --- a/LiteCharms.Features/Quartz/Abstractions/IJobOrchestrator.cs +++ /dev/null @@ -1,12 +0,0 @@ -using LiteCharms.Features.Abstractions; - -namespace LiteCharms.Features.Quartz.Abstractions; - -public interface IJobOrchestrator -{ - Task SendAsync(TNotification notification, CancellationToken cancellationToken = default) - where TNotification : IEvent; - - Task ScheduleAsync(TNotification notification, string cronExpression, CancellationToken cancellationToken = default) - where TNotification : IEvent; -} diff --git a/LiteCharms.Features/Quartz/JobOrchestrator.cs b/LiteCharms.Features/Quartz/JobOrchestrator.cs index 7c79b8a..629365e 100644 --- a/LiteCharms.Features/Quartz/JobOrchestrator.cs +++ b/LiteCharms.Features/Quartz/JobOrchestrator.cs @@ -1,11 +1,10 @@ using LiteCharms.Features.Abstractions; -using LiteCharms.Features.Quartz.Abstractions; namespace LiteCharms.Features.Quartz; public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator { - public async Task SendAsync(TNotification notification, CancellationToken cancellationToken = default) + public async ValueTask SendAsync(TNotification notification, CancellationToken cancellationToken = default) where TNotification : IEvent { var chainedJobGroup = "onetime-jobs"; @@ -23,13 +22,13 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr var trigger = global::Quartz.TriggerBuilder.Create() .WithIdentity(triggerKey) - .StartNow() + .StartNow() .Build(); await scheduler.ScheduleJob(job, new List { trigger }.AsReadOnly(), replace: true, cancellationToken); } - public async Task ScheduleAsync(TNotification notification, string cronExpression, CancellationToken cancellationToken = default) + public async ValueTask ScheduleAsync(TNotification notification, string cronExpression, CancellationToken cancellationToken = default) where TNotification : IEvent { var chainedJobGroup = "scheduled-jobs"; @@ -63,4 +62,25 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr else await scheduler.ScheduleJob(job, new List { trigger }.AsReadOnly(), replace: true, cancellationToken); } + + public async ValueTask InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default) + { + var scheduler = await schedulerFactory.GetScheduler(cancellationToken); + + var jobKeyName = string.Empty; + var jobGroup = string.Empty; + + if (!string.IsNullOrWhiteSpace(correlationId)) + { + jobKeyName = $"{eventName.ToLower(CultureInfo.InvariantCulture)}-{correlationId.ToLower(CultureInfo.InvariantCulture)}"; + jobGroup = "onetime-jobs"; + } + else + { + jobKeyName = eventName.ToLower(CultureInfo.InvariantCulture); + jobGroup = "scheduled-jobs"; + } + + return await scheduler.Interrupt(JobKey.Create(jobKeyName, jobGroup), cancellationToken); + } } diff --git a/LiteCharms.Features/Quartz/MediatorJob.cs b/LiteCharms.Features/Quartz/MediatorJob.cs index 5fc7649..878d988 100644 --- a/LiteCharms.Features/Quartz/MediatorJob.cs +++ b/LiteCharms.Features/Quartz/MediatorJob.cs @@ -21,17 +21,28 @@ public sealed class MediatorJob(IMediator mediator) : IJob where if (notification is null) { - Trace.WriteLine("Notification could not be JSon converted from data string, job ended"); + Trace.WriteLine("Notification could not be Json converted from data string, job ended"); return; } - - using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(TNotification).Name}"); - + + using var activity = MediatorTelemetry.Source.StartActivity(typeof(TNotification).Name); + activity?.SetTag("event.correlation_id", notification.CorrelationId); - await mediator.Publish(notification, context.CancellationToken); + try + { + await mediator.Publish(notification, context.CancellationToken); - Trace.WriteLine("Job published"); + Trace.WriteLine("Job published successfully"); + } + catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested) + { + Trace.WriteLine($"Job '{typeof(TNotification).Name}' was gracefully interrupted by the cluster control plane."); + + activity?.SetStatus(ActivityStatusCode.Ok); + + return; + } } } diff --git a/LiteCharms.Features/Quartz/RetryJobListener.cs b/LiteCharms.Features/Quartz/RetryJobListener.cs index cc12662..1de4161 100644 --- a/LiteCharms.Features/Quartz/RetryJobListener.cs +++ b/LiteCharms.Features/Quartz/RetryJobListener.cs @@ -12,6 +12,9 @@ public sealed class RetryJobListener : IJobListener public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default) { + if (context.CancellationToken.IsCancellationRequested) + return; + if (jobException is not null && context.RefireCount < RetryCount) jobException.RefireImmediately = true; }