Files
components/LiteCharms.Features/Quartz/JobOrchestrator.cs
T
Khwezi Mngoma 961f03c1c7
continuous-integration/drone/pr Build is passing
Added guardrails around the cluster as well as software level
2026-06-03 11:11:22 +02:00

88 lines
3.8 KiB
C#

using LiteCharms.Features.Abstractions;
namespace LiteCharms.Features.Quartz;
/// <summary>
/// Creates, re-schedules and interrupts Quartz jobs of type <c>MediatorJob&lt;TNotification&gt;</c>.
/// Each job carries the JSON-serialized notification in its job data under the <c>"Payload"</c> key.
/// </summary>
/// <remarks>
/// Job identity is derived from the event name (and, for one-time jobs, the correlation ID),
/// lower-cased with the invariant culture. <see cref="InterruptAsync"/> rebuilds the same keys,
/// so all key construction goes through the private helpers below to keep the methods in sync.
/// </remarks>
public sealed class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestrator
{
    /// <summary>Quartz group holding jobs created by <see cref="SendAsync{TNotification}"/>.</summary>
    private const string OneTimeJobGroup = "onetime-jobs";

    /// <summary>Quartz group holding jobs created by <see cref="ScheduleAsync{TNotification}"/>.</summary>
    private const string ScheduledJobGroup = "scheduled-jobs";

    /// <summary>Job data map key under which the serialized notification is stored.</summary>
    private const string PayloadKey = "Payload";

    /// <summary>
    /// Schedules a job for <paramref name="notification"/> with a trigger that starts immediately.
    /// The job is keyed by event name plus correlation ID and is flagged for recovery.
    /// An existing job/trigger with the same key is replaced.
    /// </summary>
    /// <typeparam name="TNotification">Event type carried by the job.</typeparam>
    /// <param name="notification">Event to serialize into the job payload.</param>
    /// <param name="cancellationToken">Token forwarded to the scheduler calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="notification"/> is <c>null</c>.</exception>
    public async ValueTask SendAsync<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
        where TNotification : IEvent
    {
        ArgumentNullException.ThrowIfNull(notification);

        var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
        var jobKey = OneTimeJobKey(notification.Name, notification.CorrelationId);

        var job = NewJobBuilder(jobKey, notification)
            .RequestRecovery()
            .Build();

        var trigger = global::Quartz.TriggerBuilder.Create()
            .WithIdentity(TriggerKeyFor(jobKey))
            .StartNow()
            .Build();

        await scheduler.ScheduleJob(job, [trigger], replace: true, cancellationToken);
    }

    /// <summary>
    /// Registers (or replaces) a durable job for <paramref name="notification"/> and attaches a cron
    /// trigger to it. The job is keyed by event name only, so there is one scheduled job per event name.
    /// </summary>
    /// <typeparam name="TNotification">Event type carried by the job.</typeparam>
    /// <param name="notification">Event to serialize into the job payload.</param>
    /// <param name="cronExpression">Cron expression passed to Quartz's <c>WithCronSchedule</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the scheduler calls.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="notification"/> or <paramref name="cronExpression"/> is <c>null</c>.
    /// </exception>
    public async ValueTask ScheduleAsync<TNotification>(TNotification notification, string cronExpression, CancellationToken cancellationToken = default)
        where TNotification : IEvent
    {
        ArgumentNullException.ThrowIfNull(notification);
        ArgumentNullException.ThrowIfNull(cronExpression);

        var scheduler = await schedulerFactory.GetScheduler(cancellationToken);
        var jobKey = ScheduledJobKey(notification.Name);
        var triggerKey = TriggerKeyFor(jobKey);

        var job = NewJobBuilder(jobKey, notification)
            .StoreDurably()
            .Build();

        // The trigger is fully built before any scheduler state is touched, so an invalid
        // cron expression fails here without having replaced the stored job.
        var now = DateTime.UtcNow;
        var trigger = global::Quartz.TriggerBuilder.Create()
            .WithIdentity(triggerKey)
            .WithDescription($"Scheduled via Main Job at {now:g}")
            .WithCronSchedule(cronExpression, cron => cron.WithMisfireHandlingInstructionIgnoreMisfires())
            .StartAt((DateTimeOffset)now)
            .Build();

        // Store the (durable) job first so the payload is refreshed even when only the trigger is swapped.
        await scheduler.AddJob(job, replace: true, cancellationToken);

        if (await scheduler.CheckExists(triggerKey, cancellationToken))
        {
            await scheduler.RescheduleJob(triggerKey, trigger, cancellationToken);
        }
        else
        {
            await scheduler.ScheduleJob(job, [trigger], replace: true, cancellationToken);
        }
    }

    /// <summary>
    /// Asks the scheduler to interrupt the job identified by <paramref name="eventName"/> and,
    /// optionally, <paramref name="correlationId"/>.
    /// </summary>
    /// <param name="eventName">Event name the job was created with.</param>
    /// <param name="correlationId">
    /// When non-blank, targets the one-time job for that correlation ID; when <c>null</c> or
    /// whitespace, targets the scheduled job for the event name.
    /// </param>
    /// <param name="cancellationToken">Token forwarded to the scheduler calls.</param>
    /// <returns>The value returned by the scheduler's <c>Interrupt</c> call.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="eventName"/> is <c>null</c>.</exception>
    public async ValueTask<bool> InterruptAsync(string eventName, string? correlationId = null, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(eventName);

        var scheduler = await schedulerFactory.GetScheduler(cancellationToken);

        var jobKey = string.IsNullOrWhiteSpace(correlationId)
            ? ScheduledJobKey(eventName)
            : OneTimeJobKey(eventName, correlationId);

        return await scheduler.Interrupt(jobKey, cancellationToken);
    }

    /// <summary>Builds the key of a one-time job: <c>{event}-{correlation}</c>, lower-cased, in the one-time group.</summary>
    private static JobKey OneTimeJobKey(string eventName, string correlationId) =>
        new($"{eventName.ToLower(CultureInfo.InvariantCulture)}-{correlationId.ToLower(CultureInfo.InvariantCulture)}", OneTimeJobGroup);

    /// <summary>Builds the key of a cron-scheduled job: the lower-cased event name in the scheduled group.</summary>
    private static JobKey ScheduledJobKey(string eventName) =>
        new(eventName.ToLower(CultureInfo.InvariantCulture), ScheduledJobGroup);

    /// <summary>Builds the trigger key paired with <paramref name="jobKey"/>: <c>{job name}-trigger</c> in the job's group.</summary>
    private static TriggerKey TriggerKeyFor(JobKey jobKey) =>
        new($"{jobKey.Name}-trigger", jobKey.Group);

    /// <summary>
    /// Starts a job builder with the parts shared by one-time and scheduled jobs: identity,
    /// description, serialized payload and the disallow-concurrent-execution flag.
    /// </summary>
    private static JobBuilder NewJobBuilder<TNotification>(JobKey jobKey, TNotification notification)
        where TNotification : IEvent =>
        JobBuilder.Create<MediatorJob<TNotification>>()
            .WithIdentity(jobKey)
            .WithDescription($"Correlation ID: {notification.CorrelationId}")
            .UsingJobData(new JobDataMap { [PayloadKey] = JsonSerializer.Serialize(notification) })
            .DisallowConcurrentExecution();
}