diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index 789d3e1..3d17d76 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -386,25 +386,11 @@ export class OrchestrationExecutor { } private async handleSubOrchestrationCompleted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { - const subOrchestrationInstanceCompletedEvent = event.getSuborchestrationinstancecompleted(); - const taskId = subOrchestrationInstanceCompletedEvent - ? subOrchestrationInstanceCompletedEvent.getTaskscheduledid() - : undefined; - - let subOrchTask; - - if (taskId !== undefined) { - subOrchTask = ctx._pendingTasks[taskId]; - delete ctx._pendingTasks[taskId]; - } - - const result = parseJsonField(subOrchestrationInstanceCompletedEvent?.getResult()); - - if (subOrchTask) { - subOrchTask.complete(result); - } - - await ctx.resume(); + const completedEvent = event.getSuborchestrationinstancecompleted(); + const taskId = completedEvent ? completedEvent.getTaskscheduledid() : undefined; + const result = completedEvent?.getResult(); + const normalizedResult = isEmpty(result) ? undefined : result; + await this.handleCompletedTask(ctx, taskId, normalizedResult, "subOrchestrationInstanceCompleted"); } private async handleSubOrchestrationFailed(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index 2783750..f1f0dba 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -23,7 +23,8 @@ import { OrchestrationExecutor, OrchestrationExecutionResult } from "../src/work import * as pb from "../src/proto/orchestrator_service_pb"; import { Registry } from "../src/worker/registry"; import { TOrchestrator } from "../src/types/orchestrator.type"; -import { NoOpLogger } from "../src/types/logger.type"; +import { NoOpLogger, StructuredLogger, LogEvent } from "../src/types/logger.type"; +import { EVENT_ORCHESTRATION_UNEXPECTED_EVENT } from "../src/worker/logs"; import { ActivityContext } from "../src/task/context/activity-context"; import { CompletableTask } from "../src/task/completable-task"; import { Task } from "../src/task/task"; @@ -570,6 +571,52 @@ describe("Orchestration Executor", () => { // assert user_code_statement in complete_action.failureDetails.stackTrace.value }); + it("should produce no actions and log a warning when sub-orchestration completion has unmatched taskId", async () => { + const subOrchestrator = async (_: OrchestrationContext) => { + // do nothing + }; + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + const res = yield ctx.callSubOrchestrator(subOrchestrator); + return res; + }; + const registry = new Registry(); + const subOrchestratorName = registry.addOrchestrator(subOrchestrator); + const orchestratorName = registry.addOrchestrator(orchestrator); + const oldEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID, undefined), + newSubOrchestrationCreatedEvent(1, subOrchestratorName, "sub-orch-123"), + ]; + // Send a completion event with a taskId (999) that does not match any pending task. + const newEvents = [newSubOrchestrationCompletedEvent(999, JSON.stringify("unexpected"))]; + + // Use a spy logger to verify the warning log is emitted via handleCompletedTask's guard clause + const loggedEvents: LogEvent[] = []; + const spyLogger: StructuredLogger = { + error: () => {}, + warn: () => {}, + info: () => {}, + debug: () => {}, + logEvent: (_level, event, _message) => { + loggedEvents.push(event); + }, + }; + + const executor = new OrchestrationExecutor(registry, spyLogger); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + + // The orchestration should still be waiting for the real sub-orchestration to complete. + expect(result.actions.length).toEqual(0); + + // Verify the unexpected event warning was logged (proves the guard clause was hit) + const unexpectedEvents = loggedEvents.filter( + (e) => e.eventId === EVENT_ORCHESTRATION_UNEXPECTED_EVENT, + ); + expect(unexpectedEvents.length).toEqual(1); + expect(unexpectedEvents[0].properties?.eventType).toEqual("subOrchestrationInstanceCompleted"); + expect(unexpectedEvents[0].properties?.eventId).toEqual(999); + }); + it("should test that an orchestration can wait for and process an external event sent by a client", async () => { const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { const res = yield ctx.waitForExternalEvent("my_event");