Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9285cedfa9 | |||
| 29574f4df0 | |||
| 343874551a | |||
| b4a48c9cbf | |||
| 0eac9d533f | |||
| 961f03c1c7 | |||
| a0cf847e51 |
+7
-1
@@ -1,5 +1,6 @@
|
|||||||
using LiteCharms.Features.Hasher;
|
using LiteCharms.Features.Hasher;
|
||||||
using LiteCharms.Features.Hasher.Configuration;
|
using LiteCharms.Features.Hasher.Configuration;
|
||||||
|
using LiteCharms.Features.Mediator;
|
||||||
using LiteCharms.Features.MidrandBooks.Orders;
|
using LiteCharms.Features.MidrandBooks.Orders;
|
||||||
using LiteCharms.Features.MidrandBooks.Payments.Models;
|
using LiteCharms.Features.MidrandBooks.Payments.Models;
|
||||||
|
|
||||||
@@ -12,6 +13,9 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
|||||||
|
|
||||||
public async ValueTask Handle(PayfastPaymentConfirmationReceivedEvent notification, CancellationToken cancellationToken)
|
public async ValueTask Handle(PayfastPaymentConfirmationReceivedEvent notification, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(PayfastPaymentConfirmationReceivedEvent).Name}");
|
||||||
|
activity?.SetTag("event.correlation_id", notification.CorrelationId);
|
||||||
|
|
||||||
await using var scope = services.CreateAsyncScope();
|
await using var scope = services.CreateAsyncScope();
|
||||||
var hashService = scope.ServiceProvider.GetRequiredService<HashService>();
|
var hashService = scope.ServiceProvider.GetRequiredService<HashService>();
|
||||||
var orderService = scope.ServiceProvider.GetRequiredService<OrderService>();
|
var orderService = scope.ServiceProvider.GetRequiredService<OrderService>();
|
||||||
@@ -23,7 +27,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
|||||||
var dict = payload.ToParamDictionary();
|
var dict = payload.ToParamDictionary();
|
||||||
var localSignature = PayfastService.GenerateSignature(dict, hasherSettings.PayfastPassphrase);
|
var localSignature = PayfastService.GenerateSignature(dict, hasherSettings.PayfastPassphrase);
|
||||||
|
|
||||||
if(localSignature.IsFailed)
|
if (localSignature.IsFailed)
|
||||||
throw new Exception("Failed to generate local signature for incoming webhook payload.");
|
throw new Exception("Failed to generate local signature for incoming webhook payload.");
|
||||||
|
|
||||||
if (!string.Equals(localSignature.Value, payload.Signature, StringComparison.OrdinalIgnoreCase))
|
if (!string.Equals(localSignature.Value, payload.Signature, StringComparison.OrdinalIgnoreCase))
|
||||||
@@ -154,5 +158,7 @@ public sealed class PayfastPaymentConfirmationReceivedEventHandler(IServiceProvi
|
|||||||
|
|
||||||
logger.LogInformation("Webhook validation pipeline passed checks successfully, logged entry to ledger with status: {Status}", status);
|
logger.LogInformation("Webhook validation pipeline passed checks successfully, logged entry to ledger with status: {Status}", status);
|
||||||
}
|
}
|
||||||
|
activity?.SetStatus(ActivityStatusCode.Ok);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
namespace LiteCharms.Features.Abstractions;
|
||||||
|
|
||||||
|
public interface IJobOrchestrator
|
||||||
|
{
|
||||||
|
ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||||
|
where TNotification : IEvent;
|
||||||
|
|
||||||
|
ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||||
|
where TNotification : IEvent;
|
||||||
|
|
||||||
|
ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default);
|
||||||
|
}
|
||||||
@@ -8,7 +8,7 @@ public sealed class OpenApiBearerSecuritySchemeTransformer : IOpenApiDocumentTra
|
|||||||
{
|
{
|
||||||
Type = SecuritySchemeType.Http,
|
Type = SecuritySchemeType.Http,
|
||||||
Scheme = "bearer",
|
Scheme = "bearer",
|
||||||
Description = "JWT Authorization header using the Bearer scheme. Example: \"Bearer {token}\"",
|
Description = "JWT Authorization header using the Bearer scheme",
|
||||||
};
|
};
|
||||||
|
|
||||||
document.AddComponent("Bearer", bearerScheme);
|
document.AddComponent("Bearer", bearerScheme);
|
||||||
|
|||||||
@@ -70,7 +70,9 @@ public static class Api
|
|||||||
if (!string.IsNullOrWhiteSpace(urls))
|
if (!string.IsNullOrWhiteSpace(urls))
|
||||||
{
|
{
|
||||||
string firstUrl = urls.Split(';').FirstOrDefault(s => s.Contains("http://"))!
|
string firstUrl = urls.Split(';').FirstOrDefault(s => s.Contains("http://"))!
|
||||||
.Replace("*", "localhost").Replace("+", "localhost");
|
.Replace("0.0.0.0", "localhost")
|
||||||
|
.Replace("*", "localhost")
|
||||||
|
.Replace("+", "localhost");
|
||||||
|
|
||||||
healthUrl = $"{firstUrl.TrimEnd('/')}/health";
|
healthUrl = $"{firstUrl.TrimEnd('/')}/health";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
using LiteCharms.Features.Quartz;
|
using LiteCharms.Features.Abstractions;
|
||||||
using LiteCharms.Features.Quartz.Abstractions;
|
using LiteCharms.Features.Quartz;
|
||||||
using static LiteCharms.Features.Extensions.Postgres;
|
using static LiteCharms.Features.Extensions.Postgres;
|
||||||
|
|
||||||
namespace LiteCharms.Features.Extensions;
|
namespace LiteCharms.Features.Extensions;
|
||||||
|
|||||||
@@ -1,12 +0,0 @@
|
|||||||
using LiteCharms.Features.Abstractions;
|
|
||||||
|
|
||||||
namespace LiteCharms.Features.Quartz.Abstractions;
|
|
||||||
|
|
||||||
public interface IJobOrchestrator
|
|
||||||
{
|
|
||||||
Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
|
||||||
where TNotification : IEvent;
|
|
||||||
|
|
||||||
Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
|
||||||
where TNotification : IEvent;
|
|
||||||
}
|
|
||||||
@@ -1,11 +1,10 @@
|
|||||||
using LiteCharms.Features.Abstractions;
|
using LiteCharms.Features.Abstractions;
|
||||||
using LiteCharms.Features.Quartz.Abstractions;
|
|
||||||
|
|
||||||
namespace LiteCharms.Features.Quartz;
|
namespace LiteCharms.Features.Quartz;
|
||||||
|
|
||||||
public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator
|
public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator
|
||||||
{
|
{
|
||||||
public async Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
public async ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||||
where TNotification : IEvent
|
where TNotification : IEvent
|
||||||
{
|
{
|
||||||
var chainedJobGroup = "onetime-jobs";
|
var chainedJobGroup = "onetime-jobs";
|
||||||
@@ -19,6 +18,7 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
|
|||||||
.WithDescription($"Correlation ID: {notification.CorrelationId}")
|
.WithDescription($"Correlation ID: {notification.CorrelationId}")
|
||||||
.UsingJobData(new JobDataMap { ["Payload"] = JsonSerializer.Serialize(notification) })
|
.UsingJobData(new JobDataMap { ["Payload"] = JsonSerializer.Serialize(notification) })
|
||||||
.DisallowConcurrentExecution()
|
.DisallowConcurrentExecution()
|
||||||
|
.RequestRecovery()
|
||||||
.Build();
|
.Build();
|
||||||
|
|
||||||
var trigger = global::Quartz.TriggerBuilder.Create()
|
var trigger = global::Quartz.TriggerBuilder.Create()
|
||||||
@@ -29,7 +29,7 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
|
|||||||
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
public async ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||||
where TNotification : IEvent
|
where TNotification : IEvent
|
||||||
{
|
{
|
||||||
var chainedJobGroup = "scheduled-jobs";
|
var chainedJobGroup = "scheduled-jobs";
|
||||||
@@ -63,4 +63,25 @@ public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOr
|
|||||||
else
|
else
|
||||||
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
|
||||||
|
|
||||||
|
var jobKeyName = string.Empty;
|
||||||
|
var jobGroup = string.Empty;
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(correlationId))
|
||||||
|
{
|
||||||
|
jobKeyName = $"{eventName.ToLower(CultureInfo.InvariantCulture)}-{correlationId.ToLower(CultureInfo.InvariantCulture)}";
|
||||||
|
jobGroup = "onetime-jobs";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
jobKeyName = eventName.ToLower(CultureInfo.InvariantCulture);
|
||||||
|
jobGroup = "scheduled-jobs";
|
||||||
|
}
|
||||||
|
|
||||||
|
return await scheduler.Interrupt(JobKey.Create(jobKeyName, jobGroup), cancellationToken);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,9 @@ public sealed class MediatorJob<TNotification>(IMediator mediator) : IJob where
|
|||||||
{
|
{
|
||||||
public async Task Execute(IJobExecutionContext context)
|
public async Task Execute(IJobExecutionContext context)
|
||||||
{
|
{
|
||||||
|
if (context.Recovering)
|
||||||
|
Trace.WriteLine($"CRITICAL RECOVERY: Resurrecting job '{typeof(TNotification).Name}' after a previous cluster node crashed mid-execution.");
|
||||||
|
|
||||||
var data = context.MergedJobDataMap["Payload"] as string;
|
var data = context.MergedJobDataMap["Payload"] as string;
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(data))
|
if (string.IsNullOrWhiteSpace(data))
|
||||||
@@ -21,17 +24,28 @@ public sealed class MediatorJob<TNotification>(IMediator mediator) : IJob where
|
|||||||
|
|
||||||
if (notification is null)
|
if (notification is null)
|
||||||
{
|
{
|
||||||
Trace.WriteLine("Notification could not be JSon converted from data string, job ended");
|
Trace.WriteLine("Notification could not be Json converted from data string, job ended");
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(TNotification).Name}");
|
using var activity = MediatorTelemetry.Source.StartActivity(typeof(TNotification).Name);
|
||||||
|
|
||||||
activity?.SetTag("event.correlation_id", notification.CorrelationId);
|
activity?.SetTag("event.correlation_id", notification.CorrelationId);
|
||||||
|
|
||||||
await mediator.Publish(notification, context.CancellationToken);
|
try
|
||||||
|
{
|
||||||
|
await mediator.Publish(notification, context.CancellationToken);
|
||||||
|
|
||||||
Trace.WriteLine("Job published");
|
Trace.WriteLine("Job published successfully");
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
Trace.WriteLine($"Job '{typeof(TNotification).Name}' was gracefully interrupted by the cluster control plane.");
|
||||||
|
|
||||||
|
activity?.SetStatus(ActivityStatusCode.Ok);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,9 @@ public sealed class RetryJobListener : IJobListener
|
|||||||
|
|
||||||
public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
|
public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
|
if (context.CancellationToken.IsCancellationRequested)
|
||||||
|
return;
|
||||||
|
|
||||||
if (jobException is not null && context.RefireCount < RetryCount)
|
if (jobException is not null && context.RefireCount < RetryCount)
|
||||||
jobException.RefireImmediately = true;
|
jobException.RefireImmediately = true;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user