Added job interruption handling
This commit is contained in:
+13
-7
@@ -1,17 +1,21 @@
|
||||
using LiteCharms.Features.Hasher;
|
||||
using LiteCharms.Features.Hasher.Configuration;
|
||||
using LiteCharms.Features.Mediator;
|
||||
using LiteCharms.Features.MidrandBooks.Orders;
|
||||
using LiteCharms.Features.MidrandBooks.Payments.Models;
|
||||
|
||||
namespace LiteCharms.Features.MidrandBooks.Payments.Events.Handlers;
|
||||
|
||||
public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvider services, IOptions<HasherSettings> hasherOptions, ILogger<PayfastPaymentConfirmationReceivedEvent> logger) :
|
||||
public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvider services, IOptions<HasherSettings> hasherOptions, ILogger<PayfastPaymentConfirmationReceivedEvent> logger) :
|
||||
INotificationHandler<PayfastPaymentConfirmationReceivedEvent>
|
||||
{
|
||||
private readonly HasherSettings hasherSettings = hasherOptions.Value;
|
||||
|
||||
public async ValueTask Handle(PayfastPaymentConfirmationReceivedEvent notification, CancellationToken cancellationToken)
|
||||
{
|
||||
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(PayfastPaymentConfirmationReceivedEvent).Name}");
|
||||
activity?.SetTag("event.correlation_id", notification.CorrelationId);
|
||||
|
||||
await using var scope = services.CreateAsyncScope();
|
||||
var hashService = scope.ServiceProvider.GetRequiredService<HashService>();
|
||||
var orderService = scope.ServiceProvider.GetRequiredService<OrderService>();
|
||||
@@ -23,7 +27,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
||||
var dict = payload.ToParamDictionary();
|
||||
var localSignature = PayfastService.GenerateSignature(dict, hasherSettings.PayfastPassphrase);
|
||||
|
||||
if(localSignature.IsFailed)
|
||||
if (localSignature.IsFailed)
|
||||
throw new Exception("Failed to generate local signature for incoming webhook payload.");
|
||||
|
||||
if (!string.Equals(localSignature.Value, payload.Signature, StringComparison.OrdinalIgnoreCase))
|
||||
@@ -63,7 +67,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
||||
{
|
||||
var isHostValid = await payfastService.ValidateReferrerIpAsync(notification.RemoteIpAddress!, notification.AllowLoopback, cancellationToken);
|
||||
|
||||
if (isHostValid.IsFailed)
|
||||
if (isHostValid.IsFailed)
|
||||
throw new Exception("Security validation exception: Webhook packet source address failed cluster validation checks.");
|
||||
|
||||
if (!isHostValid.Value)
|
||||
@@ -71,7 +75,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
||||
|
||||
var isAmountValid = payfastService.ValidatePaymentAmount(orderResult.Value.Total, payload.AmountGross);
|
||||
|
||||
if (!isAmountValid.Value)
|
||||
if (!isAmountValid.Value)
|
||||
throw new Exception("Security validation exception: Transaction cost variance bounds breached.");
|
||||
|
||||
var paramList = new List<string>();
|
||||
@@ -91,8 +95,8 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
||||
|
||||
var serverConfirmation = await payfastService.ValidateServerConfirmationAsync(rawParamString, isSandbox: true, cancellationToken);
|
||||
|
||||
if (serverConfirmation.IsFailed)
|
||||
throw new Exception("Security validation exception: Payfast central handshake server rejected payload legitimacy.");
|
||||
if (serverConfirmation.IsFailed)
|
||||
throw new Exception("Security validation exception: Payfast central handshake server rejected payload legitimacy.");
|
||||
}
|
||||
|
||||
await payfastService.WriteLedgerEntryAsync(new CreateGatewayLedgerEntry
|
||||
@@ -105,7 +109,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
||||
AmountFee = fee,
|
||||
AmountGross = gross,
|
||||
AmountNet = net,
|
||||
PaymentStatus = status,
|
||||
PaymentStatus = status,
|
||||
}, cancellationToken);
|
||||
|
||||
if (status.Equals("COMPLETE", StringComparison.OrdinalIgnoreCase))
|
||||
@@ -154,5 +158,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
||||
|
||||
logger.LogInformation("Webhook validation pipeline passed checks successfully, logged entry to ledger with status: {Status}", status);
|
||||
}
|
||||
activity?.SetStatus(ActivityStatusCode.Ok);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
namespace LiteCharms.Features.Abstractions;
|
||||
|
||||
public interface IJobOrchestrator
|
||||
{
|
||||
ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent;
|
||||
|
||||
ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent;
|
||||
|
||||
ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
using LiteCharms.Features.Quartz;
|
||||
using LiteCharms.Features.Quartz.Abstractions;
|
||||
using LiteCharms.Features.Abstractions;
|
||||
using LiteCharms.Features.Quartz;
|
||||
using static LiteCharms.Features.Extensions.Postgres;
|
||||
|
||||
namespace LiteCharms.Features.Extensions;
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
using LiteCharms.Features.Abstractions;
|
||||
|
||||
namespace LiteCharms.Features.Quartz.Abstractions;
|
||||
|
||||
public interface IJobOrchestrator
|
||||
{
|
||||
Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent;
|
||||
|
||||
Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent;
|
||||
}
|
||||
@@ -1,11 +1,10 @@
|
||||
using LiteCharms.Features.Abstractions;
|
||||
using LiteCharms.Features.Quartz.Abstractions;
|
||||
|
||||
namespace LiteCharms.Features.Quartz;
|
||||
|
||||
public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator
|
||||
{
|
||||
public async Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||
public async ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent
|
||||
{
|
||||
var chainedJobGroup = "onetime-jobs";
|
||||
@@ -23,13 +22,13 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
|
||||
|
||||
var trigger = global::Quartz.TriggerBuilder.Create()
|
||||
.WithIdentity(triggerKey)
|
||||
.StartNow()
|
||||
.StartNow()
|
||||
.Build();
|
||||
|
||||
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||
public async ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent
|
||||
{
|
||||
var chainedJobGroup = "scheduled-jobs";
|
||||
@@ -63,4 +62,25 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
|
||||
else
|
||||
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
||||
}
|
||||
|
||||
public async ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
|
||||
|
||||
var jobKeyName = string.Empty;
|
||||
var jobGroup = string.Empty;
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(correlationId))
|
||||
{
|
||||
jobKeyName = $"{eventName.ToLower(CultureInfo.InvariantCulture)}-{correlationId.ToLower(CultureInfo.InvariantCulture)}";
|
||||
jobGroup = "onetime-jobs";
|
||||
}
|
||||
else
|
||||
{
|
||||
jobKeyName = eventName.ToLower(CultureInfo.InvariantCulture);
|
||||
jobGroup = "scheduled-jobs";
|
||||
}
|
||||
|
||||
return await scheduler.Interrupt(JobKey.Create(jobKeyName, jobGroup), cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,17 +21,28 @@ public sealed class MediatorJob<TNotification>(IMediator mediator) : IJob where
|
||||
|
||||
if (notification is null)
|
||||
{
|
||||
Trace.WriteLine("Notification could not be JSon converted from data string, job ended");
|
||||
Trace.WriteLine("Notification could not be Json converted from data string, job ended");
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(TNotification).Name}");
|
||||
|
||||
|
||||
using var activity = MediatorTelemetry.Source.StartActivity(typeof(TNotification).Name);
|
||||
|
||||
activity?.SetTag("event.correlation_id", notification.CorrelationId);
|
||||
|
||||
await mediator.Publish(notification, context.CancellationToken);
|
||||
try
|
||||
{
|
||||
await mediator.Publish(notification, context.CancellationToken);
|
||||
|
||||
Trace.WriteLine("Job published");
|
||||
Trace.WriteLine("Job published successfully");
|
||||
}
|
||||
catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Trace.WriteLine($"Job '{typeof(TNotification).Name}' was gracefully interrupted by the cluster control plane.");
|
||||
|
||||
activity?.SetStatus(ActivityStatusCode.Ok);
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,9 @@ public sealed class RetryJobListener : IJobListener
|
||||
|
||||
public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (context.CancellationToken.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
if (jobException is not null && context.RefireCount < RetryCount)
|
||||
jobException.RefireImmediately = true;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user