UseSqsWorkers

Routes specific trains to an Amazon SQS queue for execution. Trains not included in the routing configuration continue to execute locally. Messages are consumed by an AWS Lambda function (or any SQS consumer) that runs JobRunnerTrain.

Package

```shell
dotnet add package Trax.Scheduler.Sqs
```

Signature

```csharp
public static SchedulerConfigurationBuilder UseSqsWorkers(
    this SchedulerConfigurationBuilder builder,
    Action<SqsWorkerOptions> configure,
    Action<SubmitterRouting>? routing = null
)
```

Defined in `Trax.Scheduler.Sqs.Extensions.SqsSchedulerExtensions`.

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| `configure` | `Action<SqsWorkerOptions>` | Yes | Callback to set the SQS queue URL and client options |
| `routing` | `Action<SubmitterRouting>?` | No | Callback to specify which trains are dispatched to this SQS queue. When omitted, only `[TraxRemote]`-attributed trains are routed. |
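When the routing callback is omitted, routing falls back to the `[TraxRemote]` attribute mentioned above. A minimal sketch of the attribute-based form (the `ITrain<TInput>` base interface is an assumption for illustration, not taken from the library):

```csharp
// With no routing callback, only trains carrying [TraxRemote] go to SQS.
// ITrain<TInput> is a hypothetical base interface used here for illustration.
[TraxRemote]
public interface IBatchProcessTrain : ITrain<BatchInput>
{
}

// Roughly equivalent to the explicit form, when this is the only
// attributed train:
// .UseSqsWorkers(sqs => sqs.QueueUrl = "...",
//                routing => routing.ForTrain<IBatchProcessTrain>())
```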

Returns

SchedulerConfigurationBuilder — for continued fluent chaining.

SqsWorkerOptions

| Property | Type | Default | Description |
|---|---|---|---|
| `QueueUrl` | `string` | (required) | The SQS queue URL (e.g., `https://sqs.us-east-1.amazonaws.com/123456789/trax-jobs`) |
| `ConfigureSqsClient` | `Action<AmazonSQSConfig>?` | `null` | Optional callback to configure the SQS client: set region, endpoint override (LocalStack), etc. |
| `MessageGroupId` | `string?` | `null` | For FIFO queues: a fixed message group ID. When `null`, each message gets a unique group ID (no ordering). Ignored for standard queues. |

SubmitterRouting

| Method | Description |
|---|---|
| `ForTrain<TTrain>()` | Routes the specified train type to this SQS queue. Returns the routing instance for chaining. |

Examples

Basic Usage

```csharp
using Trax.Scheduler.Sqs.Extensions;

services.AddTrax(trax => trax
    .AddEffects(effects => effects
        .UsePostgres(connectionString)
    )
    .AddMediator(assemblies)
    .AddScheduler(scheduler => scheduler
        .UseSqsWorkers(
            sqs => sqs.QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/trax-jobs",
            routing => routing.ForTrain<IBatchProcessTrain>())
        .Schedule<IMyTrain, MyInput>("my-job", new MyInput(), Every.Minutes(5))
        .Schedule<IBatchProcessTrain, BatchInput>("batch", new BatchInput(), Every.Hours(1))
    )
);
```

In this example, IBatchProcessTrain is dispatched to SQS. IMyTrain executes locally.

With Custom Region

```csharp
.UseSqsWorkers(
    sqs =>
    {
        sqs.QueueUrl = "https://sqs.eu-west-1.amazonaws.com/123456789/trax-jobs";
        sqs.ConfigureSqsClient = config =>
            config.RegionEndpoint = Amazon.RegionEndpoint.EUWest1;
    },
    routing => routing.ForTrain<IBatchProcessTrain>())
```

With LocalStack (Development)

```csharp
.UseSqsWorkers(
    sqs =>
    {
        sqs.QueueUrl = "http://localhost:4566/000000000000/trax-jobs";
        sqs.ConfigureSqsClient = config =>
        {
            config.ServiceURL = "http://localhost:4566";
            config.AuthenticationRegion = "us-east-1";
        };
    },
    routing => routing.ForTrain<IBatchProcessTrain>())
```

FIFO Queue with Ordering

```csharp
.UseSqsWorkers(
    sqs =>
    {
        sqs.QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789/trax-jobs.fifo";
        sqs.MessageGroupId = "trax-jobs";
    },
    routing => routing.ForTrain<IOrderedTrain>())
```

Mixed with Remote Workers

You can use both SQS and HTTP remote workers — each for different trains:

```csharp
.AddScheduler(scheduler => scheduler
    .UseRemoteWorkers(
        remote => remote.BaseUrl = "https://gpu-workers/trax/execute",
        routing => routing.ForTrain<IAiInferenceTrain>())
    .UseSqsWorkers(
        sqs => sqs.QueueUrl = "https://sqs.../trax-jobs",
        routing => routing.ForTrain<IBatchProcessTrain>())
)
```

Each train can only be routed to one submitter. Routing the same train to both throws InvalidOperationException at build time.

Lambda Consumer

On the consumer side, use SqsJobRunnerHandler in an AWS Lambda function:

```csharp
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Microsoft.Extensions.DependencyInjection;
using Trax.Scheduler.Sqs.Lambda;

public class Function
{
    private static readonly IServiceProvider Services = BuildServiceProvider();
    private readonly SqsJobRunnerHandler _handler = new(Services);

    public async Task FunctionHandler(SQSEvent sqsEvent, ILambdaContext context)
    {
        // ILambdaContext exposes no CancellationToken; pass None, or derive
        // one from context.RemainingTime if you want a safety margin.
        await _handler.HandleAsync(sqsEvent, CancellationToken.None);
    }

    private static IServiceProvider BuildServiceProvider()
    {
        var connectionString =
            Environment.GetEnvironmentVariable("DB_CONNECTION_STRING")!;
        var services = new ServiceCollection();
        services.AddTrax(trax => trax
            .AddEffects(effects => effects.UsePostgres(connectionString))
            .AddMediator(typeof(MyTrain).Assembly)
        );
        services.AddTraxJobRunner();
        return services.BuildServiceProvider();
    }
}
```

The handler:

  1. Deserializes each SQS record as a RemoteJobRequest
  2. Delegates to ITraxRequestHandler.ExecuteJobAsync in a scoped DI container
  3. Re-throws exceptions so SQS retry and dead-letter queue policies apply
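The three steps can be sketched roughly as follows. This is illustrative only: the `RemoteJobRequest` and `ITraxRequestHandler.ExecuteJobAsync` shapes are taken from this page, and the loop body is an assumption about the handler's internals, not library source.

```csharp
// Sketch of what SqsJobRunnerHandler.HandleAsync does (assumed internals).
public async Task HandleAsync(SQSEvent sqsEvent, CancellationToken ct)
{
    foreach (var record in sqsEvent.Records)
    {
        // 1. Deserialize the record body as a RemoteJobRequest
        var request = JsonSerializer.Deserialize<RemoteJobRequest>(record.Body)
            ?? throw new InvalidOperationException("Empty SQS message body");

        // 2. Execute in a fresh DI scope so scoped services are per-job
        using var scope = _services.CreateScope();
        var handler = scope.ServiceProvider.GetRequiredService<ITraxRequestHandler>();

        // 3. Exceptions propagate, so SQS redelivery / DLQ policies apply
        await handler.ExecuteJobAsync(request, ct);
    }
}
```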

Registered Services

UseSqsWorkers() registers:

| Service | Lifetime | Description |
|---|---|---|
| `SqsWorkerOptions` | Singleton | Configuration options |
| `IAmazonSQS` | Singleton | AWS SQS client |
| `SqsJobSubmitter` | Scoped (concrete type) | Dispatches jobs as SQS messages; resolved per train via routing |

> Note: UseSqsWorkers() does not replace the default IJobSubmitter. Local workers continue to run for trains not routed to this queue.

How It Works

When the JobDispatcher processes a work queue entry, it checks the JobSubmitterRoutingConfiguration for the entry's train name. If a route exists to SqsJobSubmitter, the SqsJobSubmitter:

  1. Serializes a RemoteJobRequest containing the metadata ID and optional input
  2. Sends the JSON as an SQS message to QueueUrl
  3. For FIFO queues, sets MessageGroupId and MessageDeduplicationId
  4. Returns a job ID in the format "sqs-{messageId}"
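A sketch of that submit path, using the AWS SDK's `SendMessageRequest`. The method name, option fields, and `RemoteJobRequest` shape are assumptions based on this page; the real `SqsJobSubmitter` may differ in detail:

```csharp
// Illustrative sketch of SqsJobSubmitter's send logic (assumed internals).
public async Task<string> SubmitAsync(RemoteJobRequest request, CancellationToken ct)
{
    var message = new SendMessageRequest
    {
        QueueUrl = _options.QueueUrl,                      // step 2: target queue
        MessageBody = JsonSerializer.Serialize(request),   // step 1: serialized payload
    };

    if (_options.QueueUrl.EndsWith(".fifo"))
    {
        // Step 3, FIFO-only fields: a fixed group ID preserves ordering,
        // a unique group ID per message disables it. The deduplication ID
        // guards against double-sends within SQS's 5-minute dedup window.
        message.MessageGroupId = _options.MessageGroupId ?? Guid.NewGuid().ToString();
        message.MessageDeduplicationId = Guid.NewGuid().ToString();
    }

    var response = await _sqs.SendMessageAsync(message, ct);
    return $"sqs-{response.MessageId}";                    // step 4: job ID format
}
```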

SQS Queue Configuration

Standard Queue

Standard queues provide nearly unlimited throughput with at-least-once delivery. This is the best fit for most Trax workloads.

FIFO Queue

FIFO queues guarantee ordering within a message group (300 messages/sec, or 3,000 with batching). Use this only when job execution order matters.

Dead Letter Queue

Configure a DLQ on your SQS queue for jobs that fail repeatedly:

```json
{
  "RedrivePolicy": {
    "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789:trax-jobs-dlq",
    "maxReceiveCount": 3
  }
}
```

IAM Permissions

The API process needs sqs:SendMessage. The Lambda function needs sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes.
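For example, the two roles could carry policy statements along these lines (account ID and queue name are placeholders; attach the first statement to the API role and the second to the Lambda execution role):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "TraxApiSend",
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789:trax-jobs"
    },
    {
      "Sid": "TraxLambdaConsume",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:us-east-1:123456789:trax-jobs"
    }
  ]
}
```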

Limitations

  • Message size limit: SQS messages are limited to 256 KB. If your serialized train input exceeds this, the send will fail. For large inputs, store the data externally and pass a reference.
  • No synchronous return: SQS is fire-and-forget. For mutations that need a return value, continue using UseRemoteRun() alongside UseSqsWorkers().
  • Cancellation is process-local: Same limitation as other remote execution models — dashboard "Cancel" only affects trains on the same process.
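For the 256 KB limit, one common workaround is the claim-check pattern: upload the payload to object storage and schedule the train with only a reference. A sketch using S3 (the bucket name, key scheme, and `BatchRef` record are illustrative, not part of Trax):

```csharp
// Claim-check sketch: store a large payload in S3, pass only a reference.
// Bucket/key names and the BatchRef record are hypothetical.
public sealed record BatchRef(string Bucket, string Key);

public static async Task<BatchRef> StoreLargeInputAsync(
    IAmazonS3 s3, string json, CancellationToken ct)
{
    var key = $"trax-inputs/{Guid.NewGuid()}.json";
    await s3.PutObjectAsync(new PutObjectRequest
    {
        BucketName = "my-trax-inputs",   // assumed bucket
        Key = key,
        ContentBody = json,              // payload too large for a 256 KB SQS message
    }, ct);
    return new BatchRef("my-trax-inputs", key);
}

// The train then takes BatchRef as its input and downloads the payload itself:
// .Schedule<IBatchProcessTrain, BatchRef>("batch", storedRef, Every.Hours(1))
```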

See Also