Added abstractions
Internal service bus support enabled Added quartz support Reconfigured extensions
This commit is contained in:
@@ -0,0 +1,23 @@
|
||||
namespace LiteCharms.Infrastructure.HealthChecks;
|
||||
|
||||
public class QuartzHealthCheck(ISchedulerFactory schedulerFactory) : IHealthCheck
|
||||
{
|
||||
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
|
||||
|
||||
if (!scheduler.IsStarted)
|
||||
return HealthCheckResult.Unhealthy("Quartz scheduler is not running");
|
||||
|
||||
await scheduler.CheckExists(new JobKey(Guid.NewGuid().ToString()), cancellationToken);
|
||||
|
||||
return HealthCheckResult.Healthy("Quartz scheduler is ready");
|
||||
}
|
||||
catch (SchedulerException)
|
||||
{
|
||||
return HealthCheckResult.Unhealthy("Quartz scheduler cannot connect to the store");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,19 @@
|
||||
<None Include="..\icon.png" Pack="true" PackagePath="\" />
|
||||
</ItemGroup>
|
||||
|
||||
<!-- Quartz Scheduler-->
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Quartz" Version="3.18.1" />
|
||||
<PackageReference Include="Quartz.Plugins" Version="3.18.1" />
|
||||
<PackageReference Include="Quartz.Plugins.TimeZoneConverter" Version="3.18.1" />
|
||||
<PackageReference Include="Quartz.Serialization.SystemTextJson" Version="3.18.1" />
|
||||
|
||||
<!-- Global Usings -->
|
||||
<Using Include="Quartz" />
|
||||
<Using Include="Mediator" />
|
||||
<Using Include="FluentResults" />
|
||||
</ItemGroup>
|
||||
|
||||
<!-- Configuration -->
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="10.0.7" />
|
||||
@@ -70,10 +83,18 @@
|
||||
</ItemGroup>
|
||||
|
||||
<!-- Project References -->
|
||||
<ItemGroup>
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\LiteCharms.Abstractions\LiteCharms.Abstractions.csproj" />
|
||||
<ProjectReference Include="..\LiteCharms.Entities\LiteCharms.Entities.csproj" />
|
||||
<ProjectReference Include="..\LiteCharms.Models\LiteCharms.Models.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<!-- Global Usings -->
|
||||
<ItemGroup>
|
||||
<Using Include="System.Text.Json" />
|
||||
<Using Include="Microsoft.Extensions.Hosting" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<None Update="appsettings.json">
|
||||
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
using LiteCharms.Abstractions;
|
||||
using static LiteCharms.Abstractions.Timezones;
|
||||
|
||||
namespace LiteCharms.Infrastructure.Quartz;
|
||||
|
||||
public class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator
|
||||
{
|
||||
public async Task SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent
|
||||
{
|
||||
var chainedJobGroup = "onetime-jobs";
|
||||
|
||||
var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
|
||||
var jobKey = new JobKey($"{notification.Name.ToLower()}-{notification.CorrelationId.ToLower()}", chainedJobGroup);
|
||||
var triggerKey = new TriggerKey($"{jobKey.Name}-trigger", chainedJobGroup);
|
||||
|
||||
var job = JobBuilder.Create<MediatorJob<TNotification>>()
|
||||
.WithIdentity(jobKey)
|
||||
.WithDescription($"Correlation ID: {notification.CorrelationId}")
|
||||
.UsingJobData(new JobDataMap { ["Payload"] = JsonSerializer.Serialize(notification) })
|
||||
.DisallowConcurrentExecution()
|
||||
.Build();
|
||||
|
||||
var trigger = global::Quartz.TriggerBuilder.Create()
|
||||
.WithIdentity(triggerKey)
|
||||
.StartNow()
|
||||
.Build();
|
||||
|
||||
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
|
||||
where TNotification : IEvent
|
||||
{
|
||||
var chainedJobGroup = "scheduled-jobs";
|
||||
|
||||
var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
|
||||
var jobKey = new JobKey($"{notification.Name.ToLower()}-{notification.CorrelationId.ToLower()}", chainedJobGroup);
|
||||
var triggerKey = new TriggerKey($"{jobKey.Name}-trigger", chainedJobGroup);
|
||||
|
||||
var job = JobBuilder.Create<MediatorJob<TNotification>>()
|
||||
.WithIdentity(jobKey)
|
||||
.WithDescription($"Correlation ID: {notification.CorrelationId}")
|
||||
.UsingJobData(new JobDataMap { ["Payload"] = JsonSerializer.Serialize(notification) })
|
||||
.DisallowConcurrentExecution()
|
||||
.StoreDurably()
|
||||
.Build();
|
||||
|
||||
var now = SouthAfricanTimeZone.UtcNow();
|
||||
|
||||
var trigger = global::Quartz.TriggerBuilder.Create()
|
||||
.WithIdentity(triggerKey)
|
||||
.WithDescription($"Scheduled via Main Job at {now:g}")
|
||||
.WithCronSchedule(cronExpression, cron => cron.InTimeZone(SouthAfricanTimeZone)
|
||||
.WithMisfireHandlingInstructionFireAndProceed())
|
||||
.StartAt(now)
|
||||
.Build();
|
||||
|
||||
await scheduler.AddJob(job, replace: true, cancellationToken);
|
||||
|
||||
if (await scheduler.CheckExists(triggerKey, cancellationToken))
|
||||
await scheduler.RescheduleJob(triggerKey, trigger, cancellationToken);
|
||||
else
|
||||
await scheduler.ScheduleJob(job, new List<ITrigger> { trigger }.AsReadOnly(), replace: true, cancellationToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using LiteCharms.Abstractions;
|
||||
|
||||
namespace LiteCharms.Infrastructure.Quartz;
|
||||
|
||||
[DisallowConcurrentExecution]
|
||||
public class MediatorJob<TNotification>(IMediator mediator) : IJob where TNotification : IEvent
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
var data = context.MergedJobDataMap["Payload"] as string;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(data)) return;
|
||||
|
||||
var notification = JsonSerializer.Deserialize<TNotification>(data);
|
||||
|
||||
if(notification is null) return;
|
||||
|
||||
if(notification is TNotification)
|
||||
await mediator.Publish(notification, context.CancellationToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
namespace LiteCharms.Infrastructure.Quartz;
|
||||
|
||||
public class RetryJobListener : IJobListener
|
||||
{
|
||||
public string Name => "RetryJobListener";
|
||||
|
||||
public int RetryCount { get; set; } = 3;
|
||||
|
||||
public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
|
||||
public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
|
||||
public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (jobException is not null && context.RefireCount < RetryCount)
|
||||
jobException.RefireImmediately = true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
using LiteCharms.Abstractions;
|
||||
using LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus;
|
||||
|
||||
public class EmailServiceBus(EmailQueue messages) : IEventBus
|
||||
{
|
||||
public async Task<Result> PublishAsync<TEvent>(TEvent notification, CancellationToken cancellationToken = default)
|
||||
where TEvent : class, IEvent
|
||||
{
|
||||
try
|
||||
{
|
||||
await messages.Outgoing.WriteAsync(notification, cancellationToken);
|
||||
|
||||
return Result.Ok();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return Result.Fail(new Error(ex.Message).CausedBy(ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
using LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus.Exchanges;
|
||||
|
||||
public class EmailExchange(EmailQueue messages) : BackgroundService
|
||||
{
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if(messages.Incoming.CanCount)
|
||||
await Task.Delay(1000, stoppingToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
using LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus.Exchanges;
|
||||
|
||||
public class GeneralExchange(GeneralQueue messages) : BackgroundService
|
||||
{
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if (messages.Incoming.CanCount)
|
||||
await Task.Delay(1000, stoppingToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
using LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus.Exchanges;
|
||||
|
||||
public class SalesExchange(SalesQueue messages) : BackgroundService
|
||||
{
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if (messages.Incoming.CanCount)
|
||||
await Task.Delay(1000, stoppingToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
using LiteCharms.Abstractions;
|
||||
using LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus;
|
||||
|
||||
public class GeneralServiceBus(GeneralQueue messages) : IEventBus
|
||||
{
|
||||
public async Task<Result> PublishAsync<TEvent>(TEvent notification, CancellationToken cancellationToken = default)
|
||||
where TEvent : class, IEvent
|
||||
{
|
||||
try
|
||||
{
|
||||
await messages.Outgoing.WriteAsync(notification, cancellationToken);
|
||||
|
||||
return Result.Ok();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return Result.Fail(new Error(ex.Message).CausedBy(ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
using LiteCharms.Abstractions;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
public class EmailQueue : EventBusQueueBase, IEventBusQueue;
|
||||
@@ -0,0 +1,5 @@
|
||||
using LiteCharms.Abstractions;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
public class GeneralQueue : EventBusQueueBase, IEventBusQueue;
|
||||
@@ -0,0 +1,5 @@
|
||||
using LiteCharms.Abstractions;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
public class SalesQueue : EventBusQueueBase, IEventBusQueue;
|
||||
@@ -0,0 +1,22 @@
|
||||
using LiteCharms.Abstractions;
|
||||
using LiteCharms.Infrastructure.ServiceBus.Queues;
|
||||
|
||||
namespace LiteCharms.Infrastructure.ServiceBus;
|
||||
|
||||
public class SalesServiceBus(SalesQueue messages) : IEventBus
|
||||
{
|
||||
public async Task<Result> PublishAsync<TEvent>(TEvent notification, CancellationToken cancellationToken = default)
|
||||
where TEvent : class, IEvent
|
||||
{
|
||||
try
|
||||
{
|
||||
await messages.Outgoing.WriteAsync(notification, cancellationToken);
|
||||
|
||||
return Result.Ok();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return Result.Fail(new Error(ex.Message).CausedBy(ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user