UseBroadcaster

Enables cross-process lifecycle event broadcasting. When trains execute on a remote worker process, their lifecycle events (started, completed, failed, cancelled) are published to a message bus and delivered to hub processes — where GraphQL subscriptions forward them to connected clients.

Without UseBroadcaster(), subscriptions only fire for trains that execute in the same process as the GraphQL API. With it, subscriptions work regardless of which process executes the train.

Signature

public static TBuilder UseBroadcaster<TBuilder>(
    this TBuilder builder,
    Action<BroadcasterBuilder> configure
)
    where TBuilder : TraxEffectBuilder

The generic type parameter TBuilder is inferred by the compiler — callers just write .UseBroadcaster(...). This preserves the concrete builder type through chaining (e.g., TraxEffectBuilderWithData stays as TraxEffectBuilderWithData).

ParameterTypeRequiredDescription
builderTBuilderYesThe effect configuration builder
configureAction<BroadcasterBuilder>YesCallback to select a transport (e.g., UseRabbitMq())

What It Registers

ComponentDescription
BroadcastLifecycleHookLifecycle hook that publishes events to ITrainEventBroadcaster
TrainEventReceiverServiceBackgroundService that consumes events from ITrainEventReceiver and dispatches to ITrainEventHandler instances

The transport-specific ITrainEventBroadcaster and ITrainEventReceiver are registered by the callback (e.g., UseRabbitMq()).

Connection Resilience

The TrainEventReceiverService automatically retries if the transport connection fails (e.g., RabbitMQ is unavailable at startup). It uses exponential backoff starting at 5 seconds, capping at 2 minutes. The service will not crash the host — it logs a warning and keeps retrying until the transport becomes available or the host shuts down.

De-duplication

When a train runs locally on the hub (via a run mutation), the GraphQLSubscriptionHook fires directly and notifies subscribers. The same event is also published to the message bus by BroadcastLifecycleHook. The TrainEventReceiverService detects this by comparing the event's Executor field against the local process name and skips events that originated locally. This prevents double-notification.

The Executor field is always stamped by the broadcasting process (via Assembly.GetEntryAssembly()), not copied from metadata.Executor. This is important because metadata may be pre-created by a different process (e.g., the API pre-creates metadata for queued jobs that execute on a worker). If the hook used metadata.Executor, the hub would incorrectly discard worker events as "local."

Abstractions

The broadcaster system is built on four interfaces that allow alternative transport implementations:

// Publishes lifecycle events to a message bus
public interface ITrainEventBroadcaster
{
    Task PublishAsync(TrainLifecycleEventMessage message, CancellationToken ct);
}
 
// Receives lifecycle events from a message bus
public interface ITrainEventReceiver : IAsyncDisposable
{
    Task StartAsync(
        Func<TrainLifecycleEventMessage, CancellationToken, Task> handler,
        CancellationToken ct
    );
    Task StopAsync(CancellationToken ct);
}
 
// Handles received events (e.g., forwarding to GraphQL subscriptions)
public interface ITrainEventHandler
{
    Task HandleAsync(TrainLifecycleEventMessage message, CancellationToken ct);
}

The TrainLifecycleEventMessage is a serializable record containing:

FieldTypeDescription
MetadataIdlongDatabase metadata row ID
ExternalIdstringExternal identifier for the execution
TrainNamestringFully-qualified train class name
TrainStatestringCurrent state (serialized as string for transport)
TimestampDateTimeWhen the event occurred
FailureJunctionstring?Junction that failed (if applicable)
FailureReasonstring?Failure message (if applicable)
EventTypestringOne of: Started, Completed, Failed, Cancelled
Executorstring?Assembly name of the process that broadcast the event

Transports

RabbitMQ

effects.UseBroadcaster(b => b.UseRabbitMq("amqp://guest:guest@localhost:5672"))
ParameterTypeRequiredDefaultDescription
connectionStringstringYesAMQP connection URI
configureAction<RabbitMqBroadcasterOptions>?NonullOptional callback to customize options

Options:

PropertyTypeDefaultDescription
ConnectionStringstringAMQP connection URI
ExchangeNamestring"trax.lifecycle"Fanout exchange name

The RabbitMQ transport uses a fanout exchange so all connected hub instances receive every event. Each receiver creates its own exclusive, auto-delete queue.

effects.UseBroadcaster(b =>
    b.UseRabbitMq("amqp://localhost", opts =>
        opts.ExchangeName = "my-app.lifecycle"
    )
)

Example: Distributed Workers

Both the hub (API + scheduler) and worker processes call UseBroadcaster() with the same RabbitMQ connection:

Hub (Program.cs):

builder.Services.AddTrax(trax =>
    trax.AddEffects(effects =>
            effects
                .UsePostgres(connectionString)
                .AddJson()
                .UseBroadcaster(b => b.UseRabbitMq(rabbitMqConnectionString))
        )
        .AddMediator(typeof(Program).Assembly)
        .AddScheduler(scheduler => scheduler /* ... */)
);
 
// AddTraxGraphQL() auto-detects the broadcaster and registers
// GraphQLTrainEventHandler to forward remote events to subscriptions
builder.Services.AddTraxGraphQL();

Worker (Program.cs):

builder.Services.AddTrax(trax =>
    trax.AddEffects(effects =>
            effects
                .UsePostgres(connectionString)
                .AddJson()
                .UseBroadcaster(b => b.UseRabbitMq(rabbitMqConnectionString))
        )
        .AddMediator(typeof(Program).Assembly)
);
 
builder.Services.AddTraxWorker(opts => { opts.WorkerCount = 4; });

GraphQL Integration

When AddTraxGraphQL() detects that ITrainEventReceiver is registered (via UseBroadcaster()), it automatically registers GraphQLTrainEventHandler as an ITrainEventHandler. This handler maps received TrainLifecycleEventMessage records to TrainLifecycleEvent DTOs and sends them to HotChocolate's ITopicEventSender, making them available to all connected WebSocket subscribers.

No additional configuration is needed — just call UseBroadcaster() in your effects and AddTraxGraphQL() as usual.

Architecture

Worker Process                          Hub Process
─────────────                          ────────────
Train.Run()                            GraphQL Subscription Clients
  → LifecycleHookRunner                  ↑
    → BroadcastLifecycleHook             TrainEventReceiverService
      → ITrainEventBroadcaster             → GraphQLTrainEventHandler
        → RabbitMQ Exchange ─────────→       → ITopicEventSender
                                               → WebSocket delivery

The database remains the single source of truth for all train data. The broadcaster only carries lightweight lifecycle event notifications — all metadata, logs, manifests, and train state are always persisted to and queried from PostgreSQL.

Implementing a Custom Transport

To implement a transport other than RabbitMQ:

  1. Implement ITrainEventBroadcaster and ITrainEventReceiver
  2. Create an extension method on BroadcasterBuilder that registers both:
public static BroadcasterBuilder UseMyTransport(
    this BroadcasterBuilder builder,
    string connectionString
)
{
    builder.ServiceCollection.AddSingleton<ITrainEventBroadcaster>(
        new MyTransportBroadcaster(connectionString)
    );
    builder.ServiceCollection.AddSingleton<ITrainEventReceiver>(
        new MyTransportReceiver(connectionString)
    );
    return builder;
}

Packages

dotnet add package Trax.Effect                        # Abstractions
dotnet add package Trax.Effect.Broadcaster.RabbitMQ    # RabbitMQ transport