Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions packages/durabletask-js/src/worker/orchestration-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,25 +386,11 @@ export class OrchestrationExecutor {
}

private async handleSubOrchestrationCompleted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
const subOrchestrationInstanceCompletedEvent = event.getSuborchestrationinstancecompleted();
const taskId = subOrchestrationInstanceCompletedEvent
? subOrchestrationInstanceCompletedEvent.getTaskscheduledid()
: undefined;

let subOrchTask;

if (taskId !== undefined) {
subOrchTask = ctx._pendingTasks[taskId];
delete ctx._pendingTasks[taskId];
}

const result = parseJsonField(subOrchestrationInstanceCompletedEvent?.getResult());

if (subOrchTask) {
subOrchTask.complete(result);
}

await ctx.resume();
const completedEvent = event.getSuborchestrationinstancecompleted();
const taskId = completedEvent ? completedEvent.getTaskscheduledid() : undefined;
const result = completedEvent?.getResult();
const normalizedResult = isEmpty(result) ? undefined : result;
await this.handleCompletedTask(ctx, taskId, normalizedResult, "subOrchestrationInstanceCompleted");
}

private async handleSubOrchestrationFailed(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
Expand Down
49 changes: 48 additions & 1 deletion packages/durabletask-js/test/orchestration_executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import { OrchestrationExecutor, OrchestrationExecutionResult } from "../src/work
import * as pb from "../src/proto/orchestrator_service_pb";
import { Registry } from "../src/worker/registry";
import { TOrchestrator } from "../src/types/orchestrator.type";
import { NoOpLogger } from "../src/types/logger.type";
import { NoOpLogger, StructuredLogger, LogEvent } from "../src/types/logger.type";
import { EVENT_ORCHESTRATION_UNEXPECTED_EVENT } from "../src/worker/logs";
import { ActivityContext } from "../src/task/context/activity-context";
import { CompletableTask } from "../src/task/completable-task";
import { Task } from "../src/task/task";
Expand Down Expand Up @@ -570,6 +571,52 @@ describe("Orchestration Executor", () => {
// assert user_code_statement in complete_action.failureDetails.stackTrace.value
});

it("should produce no actions and log a warning when sub-orchestration completion has unmatched taskId", async () => {
const subOrchestrator = async (_: OrchestrationContext) => {
// do nothing
};
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
const res = yield ctx.callSubOrchestrator(subOrchestrator);
return res;
};
const registry = new Registry();
const subOrchestratorName = registry.addOrchestrator(subOrchestrator);
const orchestratorName = registry.addOrchestrator(orchestrator);
const oldEvents = [
newOrchestratorStartedEvent(),
newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID, undefined),
newSubOrchestrationCreatedEvent(1, subOrchestratorName, "sub-orch-123"),
];
// Send a completion event with a taskId (999) that does not match any pending task.
const newEvents = [newSubOrchestrationCompletedEvent(999, JSON.stringify("unexpected"))];

// Use a spy logger to verify the warning log is emitted via handleCompletedTask's guard clause
const loggedEvents: LogEvent[] = [];
const spyLogger: StructuredLogger = {
error: () => {},
warn: () => {},
info: () => {},
debug: () => {},
logEvent: (_level, event, _message) => {
loggedEvents.push(event);
},
};

const executor = new OrchestrationExecutor(registry, spyLogger);
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);

// The orchestration should still be waiting for the real sub-orchestration to complete.
expect(result.actions.length).toEqual(0);

// Verify the unexpected event warning was logged (proves the guard clause was hit)
const unexpectedEvents = loggedEvents.filter(
(e) => e.eventId === EVENT_ORCHESTRATION_UNEXPECTED_EVENT,
);
expect(unexpectedEvents.length).toEqual(1);
expect(unexpectedEvents[0].properties?.eventType).toEqual("subOrchestrationInstanceCompleted");
expect(unexpectedEvents[0].properties?.eventId).toEqual(999);
});

it("should test that an orchestration can wait for and process an external event sent by a client", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
const res = yield ctx.waitForExternalEvent("my_event");
Expand Down
Loading