Compare commits

..

1 Commits

Author SHA1 Message Date
khwezi 61172c6139 Merge commit '0f91f102e576fc1f7a76cca21d3c02f3225baec1' 2026-05-15 07:52:03 +00:00
7 changed files with 13 additions and 49 deletions
@@ -1,5 +1,4 @@
using LiteCharms.Features.Shop.Notifications; using LiteCharms.Features.Shop.Notifications;
using LiteCharms.Features.Shop.Notifications.Events;
namespace LiteCharms.Features.Tests; namespace LiteCharms.Features.Tests;
@@ -33,14 +32,4 @@ public class NotificationsFeatureTests(CommonFixture fixture, ITestOutputHelper
foreach (var error in createResult.Errors) output.WriteLine(error.Message); foreach (var error in createResult.Errors) output.WriteLine(error.Message);
} }
[Fact]
public async Task ProcessEmailNotificationsEvent_ShouldSucceed()
{
var notification = ProcessEmailNotificationsEvent.Create();
await fixture.Mediator.Publish(notification);
Assert.True(true);
}
} }
+2 -3
View File
@@ -35,9 +35,8 @@ public class EmailService(IOptions<SmtpSettings> options) : IDisposable
var bodyBuilder = new BodyBuilder(); var bodyBuilder = new BodyBuilder();
if (message.Body!.Properties.HasAttachments) foreach (var attachment in message.Body?.Attachments!)
foreach (var attachment in message.Body?.Attachments!) bodyBuilder.Attachments.Add(attachment.Name!, attachment.FileStream!, cancellationToken);
bodyBuilder.Attachments.Add(attachment.Name!, attachment.FileStream!, cancellationToken);
if (!message.Body.Properties.IsHtml) bodyBuilder.TextBody = message.Body.Message; if (!message.Body.Properties.IsHtml) bodyBuilder.TextBody = message.Body.Message;
if (message.Body.Properties.IsHtml) bodyBuilder.HtmlBody = message.Body.Message; if (message.Body.Properties.IsHtml) bodyBuilder.HtmlBody = message.Body.Message;
+2 -4
View File
@@ -34,7 +34,7 @@ public static class Quartz
storage.UseClustering(cluster => storage.UseClustering(cluster =>
{ {
cluster.CheckinInterval = TimeSpan.FromSeconds(30); cluster.CheckinInterval = TimeSpan.FromSeconds(30);
cluster.CheckinMisfireThreshold = TimeSpan.FromSeconds(90); cluster.CheckinMisfireThreshold = TimeSpan.FromSeconds(20);
}); });
}); });
}); });
@@ -62,8 +62,6 @@ public static class Quartz
config.UseDefaultThreadPool(options => options.MaxConcurrency = 1); config.UseDefaultThreadPool(options => options.MaxConcurrency = 1);
config.UseTimeZoneConverter(); config.UseTimeZoneConverter();
config.SetProperty("quartz.jobStore.misfireThreshold", TimeSpan.FromMinutes(2).TotalMilliseconds.ToString());
config.UsePersistentStore(storage => config.UsePersistentStore(storage =>
{ {
storage.PerformSchemaValidation = false; storage.PerformSchemaValidation = false;
@@ -76,7 +74,7 @@ public static class Quartz
storage.UseClustering(cluster => storage.UseClustering(cluster =>
{ {
cluster.CheckinInterval = TimeSpan.FromSeconds(30); cluster.CheckinInterval = TimeSpan.FromSeconds(30);
cluster.CheckinMisfireThreshold = TimeSpan.FromSeconds(90); cluster.CheckinMisfireThreshold = TimeSpan.FromSeconds(20);
}); });
}); });
}); });
@@ -50,9 +50,9 @@ public class JobOrchestrator(ISchedulerFactory schedulerFactory) : IJobOrchestra
var trigger = global::Quartz.TriggerBuilder.Create() var trigger = global::Quartz.TriggerBuilder.Create()
.WithIdentity(triggerKey) .WithIdentity(triggerKey)
.WithDescription($"Scheduled via Main Job at {now:g}") .WithDescription($"Scheduled via Main Job at {now:g} UTC")
.WithCronSchedule(cronExpression, cron => cron .WithCronSchedule(cronExpression, cron => cron
.WithMisfireHandlingInstructionIgnoreMisfires()) .WithMisfireHandlingInstructionFireAndProceed())
.StartAt(now) .StartAt(now)
.Build(); .Build();
+2 -14
View File
@@ -10,28 +10,16 @@ public class MediatorJob<TNotification>(IMediator mediator) : IJob where TNotifi
{ {
var data = context.MergedJobDataMap["Payload"] as string; var data = context.MergedJobDataMap["Payload"] as string;
if (string.IsNullOrWhiteSpace(data)) if (string.IsNullOrWhiteSpace(data)) return;
{
Trace.WriteLine("Job Payload missing, job ended");
return;
}
var notification = JsonSerializer.Deserialize<TNotification>(data); var notification = JsonSerializer.Deserialize<TNotification>(data);
if (notification is null) if (notification is null) return;
{
Trace.WriteLine("Notification could not be JSon converted from data string, job ended");
return;
}
using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(TNotification).Name}"); using var activity = MediatorTelemetry.Source.StartActivity($"Quartz: {typeof(TNotification).Name}");
activity?.SetTag("event.correlation_id", notification.CorrelationId); activity?.SetTag("event.correlation_id", notification.CorrelationId);
await mediator.Publish(notification, context.CancellationToken); await mediator.Publish(notification, context.CancellationToken);
Trace.WriteLine("Job published");
} }
} }
@@ -1,5 +1,4 @@
using k8s.KubeConfigModels; using LiteCharms.Features.Email;
using LiteCharms.Features.Email;
using LiteCharms.Features.Shop.Notifications.Models; using LiteCharms.Features.Shop.Notifications.Models;
using LiteCharms.Features.Shop.Postgres; using LiteCharms.Features.Shop.Postgres;
@@ -16,20 +15,17 @@ public class ProcessEmailNotificationsEventHandler(IDbContextFactory<ShopDbConte
{ {
using var context = await contextFactory.CreateDbContextAsync(cancellationToken); using var context = await contextFactory.CreateDbContextAsync(cancellationToken);
if (emailService.Status != EmailStatuses.Connected)
await emailService.ConnectAsync(cancellationToken);
var notifications = await context.Notifications var notifications = await context.Notifications
.OrderByDescending(o => o.CreatedAt) .OrderByDescending(o => o.CreatedAt)
.ThenBy(o => o.Priority) .ThenBy(o => o.Priority)
.Where(n => n.Platform == NotificationPlatforms.Email && .Where(n => n.CorrelationIdType == CorrelationIdTypes.Email)
n.Direction == NotificationDirection.Outgoing && n.Processed == false) .Where(n => n.Direction == NotificationDirection.Outgoing)
.Take(message.MaxRecords) .Take(message.MaxRecords)
.ToListAsync(cancellationToken); .ToListAsync(cancellationToken);
foreach (var notification in notifications) foreach (var notification in notifications)
{ {
if (dropBatch) break; if (dropBatch || cancellationToken.IsCancellationRequested) break;
var sendResult = await SendEmailAsync(notification,emailService, cancellationToken); var sendResult = await SendEmailAsync(notification,emailService, cancellationToken);
@@ -50,16 +46,12 @@ public class ProcessEmailNotificationsEventHandler(IDbContextFactory<ShopDbConte
notification.UpdatedAt = DateTime.UtcNow; notification.UpdatedAt = DateTime.UtcNow;
} }
await context.SaveChangesAsync(cancellationToken); await context.SaveChangesAsync(cancellationToken);
} }
catch (Exception ex) catch (Exception ex)
{ {
logger.LogError(ex, ex.Message); logger.LogError(ex, ex.Message);
} }
finally
{
await emailService.DisconnectAsync(cancellationToken);
}
} }
private async Task<Result> SendEmailAsync(Notification notification, EmailService service, CancellationToken cancellationToken = default) private async Task<Result> SendEmailAsync(Notification notification, EmailService service, CancellationToken cancellationToken = default)
@@ -8,8 +8,6 @@ public class ProcessEmailNotificationsEvent : EventBase, IEvent
public int MaxRecords { get; set; } public int MaxRecords { get; set; }
public ProcessEmailNotificationsEvent() { MaxRecords = 1000; }
private ProcessEmailNotificationsEvent(int maxRecords = 1000) => MaxRecords = maxRecords; private ProcessEmailNotificationsEvent(int maxRecords = 1000) => MaxRecords = maxRecords;
public static ProcessEmailNotificationsEvent Create(int maxRecords = 1000) => new(maxRecords); public static ProcessEmailNotificationsEvent Create(int maxRecords = 1000) => new(maxRecords);