"""Tests for the workflow engine subsystem. Covers: - Step registry & auto-discovery - Base classes (StepBase, StepContext, StepResult) - Expression engine - All 10 built-in step types - Workflow definition loading & validation - Workflow engine execution & state persistence - Workflow catalog & registry """ from __future__ import annotations import json import os import shutil import tempfile from pathlib import Path import pytest import yaml # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def temp_dir(): """Create a temporary directory for tests.""" tmpdir = tempfile.mkdtemp() yield Path(tmpdir) shutil.rmtree(tmpdir) @pytest.fixture def project_dir(temp_dir): """Create a mock spec-kit project with .specify/ directory.""" specify_dir = temp_dir / ".specify" specify_dir.mkdir() (specify_dir / "workflows").mkdir() return temp_dir @pytest.fixture def sample_workflow_yaml(): """Return a valid minimal workflow YAML string.""" return """ schema_version: "1.0" workflow: id: "test-workflow" name: "Test Workflow" version: "1.0.0" description: "A test workflow" inputs: spec: type: string required: true scope: type: string default: "full" steps: - id: step-one command: speckit.specify input: args: "{{ inputs.spec }}" - id: step-two command: speckit.plan input: args: "{{ steps.step-one.output.command }}" """ @pytest.fixture def sample_workflow_file(project_dir, sample_workflow_yaml): """Write a sample workflow YAML to a file and return its path.""" wf_dir = project_dir / ".specify" / "workflows" / "test-workflow" wf_dir.mkdir(parents=True, exist_ok=True) wf_path = wf_dir / "workflow.yml" wf_path.write_text(sample_workflow_yaml, encoding="utf-8") return wf_path # ===== Step Registry Tests ===== class TestStepRegistry: """Test STEP_REGISTRY and auto-discovery.""" def test_registry_populated(self): from specify_cli.workflows import STEP_REGISTRY assert len(STEP_REGISTRY) >= 10 def test_all_step_types_registered(self): from specify_cli.workflows import STEP_REGISTRY expected = { "command", "shell", "prompt", "gate", "if", "switch", "while", "do-while", "fan-out", "fan-in", } assert expected.issubset(set(STEP_REGISTRY.keys())) def test_get_step_type(self): from specify_cli.workflows import get_step_type step = get_step_type("command") assert step is not None assert step.type_key == "command" def test_get_step_type_missing(self): from specify_cli.workflows import get_step_type assert get_step_type("nonexistent") is None def test_register_step_duplicate_raises(self): from specify_cli.workflows import _register_step from specify_cli.workflows.steps.command import CommandStep with pytest.raises(KeyError, match="already registered"): _register_step(CommandStep()) def test_register_step_empty_key_raises(self): from specify_cli.workflows import _register_step from specify_cli.workflows.base import StepBase, StepResult class EmptyStep(StepBase): type_key = "" def execute(self, config, context): return StepResult() with pytest.raises(ValueError, match="empty type_key"): _register_step(EmptyStep()) # ===== Base Classes Tests ===== class TestBaseClasses: """Test StepBase, StepContext, StepResult.""" def test_step_context_defaults(self): from specify_cli.workflows.base import StepContext ctx = StepContext() assert ctx.inputs == {} assert ctx.steps == {} assert ctx.item is None assert ctx.fan_in == {} assert ctx.default_integration is None def test_step_context_with_data(self): from specify_cli.workflows.base import StepContext ctx = StepContext( inputs={"name": "test"}, default_integration="claude", default_model="sonnet-4", ) assert ctx.inputs == {"name": "test"} assert ctx.default_integration == "claude" assert ctx.default_model == "sonnet-4" def test_step_result_defaults(self): from specify_cli.workflows.base import StepResult, StepStatus result = StepResult() assert result.status == StepStatus.COMPLETED assert result.output == {} assert result.next_steps == [] assert result.error is None def test_step_status_values(self): from specify_cli.workflows.base import StepStatus assert StepStatus.PENDING == "pending" assert StepStatus.RUNNING == "running" assert StepStatus.COMPLETED == "completed" assert StepStatus.FAILED == "failed" assert StepStatus.SKIPPED == "skipped" assert StepStatus.PAUSED == "paused" def test_run_status_values(self): from specify_cli.workflows.base import RunStatus assert RunStatus.CREATED == "created" assert RunStatus.RUNNING == "running" assert RunStatus.PAUSED == "paused" assert RunStatus.COMPLETED == "completed" assert RunStatus.FAILED == "failed" assert RunStatus.ABORTED == "aborted" # ===== Expression Engine Tests ===== class TestExpressions: """Test sandboxed expression evaluator.""" def test_simple_variable(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"name": "login"}) assert evaluate_expression("{{ inputs.name }}", ctx) == "login" def test_step_output_reference(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext( steps={"specify": {"output": {"file": "spec.md"}}} ) assert evaluate_expression("{{ steps.specify.output.file }}", ctx) == "spec.md" def test_string_interpolation(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"name": "login"}) result = evaluate_expression("Feature: {{ inputs.name }} done", ctx) assert result == "Feature: login done" def test_comparison_equals(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"scope": "full"}) assert evaluate_expression("{{ inputs.scope == 'full' }}", ctx) is True assert evaluate_expression("{{ inputs.scope == 'partial' }}", ctx) is False def test_comparison_not_equals(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext( steps={"run-tests": {"output": {"exit_code": 1}}} ) result = evaluate_expression("{{ steps.run-tests.output.exit_code != 0 }}", ctx) assert result is True def test_numeric_comparison(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext( steps={"plan": {"output": {"task_count": 7}}} ) assert evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True assert evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False def test_boolean_and(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"a": True, "b": True}) assert evaluate_expression("{{ inputs.a and inputs.b }}", ctx) is True def test_boolean_or(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"a": False, "b": True}) assert evaluate_expression("{{ inputs.a or inputs.b }}", ctx) is True def test_filter_default(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext() assert evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) == "fallback" def test_filter_join(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"tags": ["a", "b", "c"]}) assert evaluate_expression("{{ inputs.tags | join(', ') }}", ctx) == "a, b, c" def test_filter_contains(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"text": "hello world"}) assert evaluate_expression("{{ inputs.text | contains('world') }}", ctx) is True def test_condition_evaluation(self): from specify_cli.workflows.expressions import evaluate_condition from specify_cli.workflows.base import StepContext ctx = StepContext(inputs={"ready": True}) assert evaluate_condition("{{ inputs.ready }}", ctx) is True assert evaluate_condition("{{ inputs.missing }}", ctx) is False def test_non_string_passthrough(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext() assert evaluate_expression(42, ctx) == 42 assert evaluate_expression(None, ctx) is None def test_string_literal(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext() assert evaluate_expression("{{ 'hello' }}", ctx) == "hello" def test_numeric_literal(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext() assert evaluate_expression("{{ 42 }}", ctx) == 42 def test_boolean_literal(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext() assert evaluate_expression("{{ true }}", ctx) is True assert evaluate_expression("{{ false }}", ctx) is False def test_list_indexing(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext( steps={"tasks": {"output": {"task_list": [{"file": "a.md"}, {"file": "b.md"}]}}} ) result = evaluate_expression("{{ steps.tasks.output.task_list[0].file }}", ctx) assert result == "a.md" def test_context_run_id_resolves(self): """``{{ context.run_id }}`` resolves to ``StepContext.run_id``. Locks the contract from issue #2590: workflow templates can reference the engine-assigned run id for telemetry, artifact metadata, or per-run scratch isolation. """ from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(run_id="a1b2c3d4") assert evaluate_expression("{{ context.run_id }}", ctx) == "a1b2c3d4" def test_context_run_id_defaults_to_empty_when_unset(self): """``{{ context.run_id }}`` resolves to ``""`` when no run is active (dry-run, validation, ad-hoc evaluator usage) rather than raising — workflows referencing the variable never error outside a run context. """ from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext # No run_id set on the context. ctx = StepContext() assert evaluate_expression("{{ context.run_id }}", ctx) == "" def test_context_run_id_string_interpolation(self): """Run id interpolates inside a larger template string — the common pattern for stamping shell commands and artifact paths with the run id. """ from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext ctx = StepContext(run_id="deadbeef") result = evaluate_expression("RUN_ID={{ context.run_id }}", ctx) assert result == "RUN_ID=deadbeef" # ===== Integration Dispatch Tests ===== class TestBuildExecArgs: """Test build_exec_args for CLI-based integrations.""" def test_claude_exec_args(self): from specify_cli.integrations.claude import ClaudeIntegration impl = ClaudeIntegration() args = impl.build_exec_args("do stuff", model="sonnet-4") assert args[0] == "claude" assert args[1] == "-p" assert args[2] == "do stuff" assert "--model" in args assert "sonnet-4" in args assert "--output-format" in args def test_gemini_exec_args(self): from specify_cli.integrations.gemini import GeminiIntegration impl = GeminiIntegration() args = impl.build_exec_args("do stuff", model="gemini-2.5-pro") assert args[0] == "gemini" assert args[1] == "-p" assert "-m" in args assert "gemini-2.5-pro" in args def test_codex_exec_args(self): from specify_cli.integrations.codex import CodexIntegration impl = CodexIntegration() args = impl.build_exec_args("do stuff") assert args[0] == "codex" assert args[1] == "exec" assert args[2] == "do stuff" assert "--json" in args def test_copilot_exec_args(self, monkeypatch): monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False) monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False) from specify_cli.integrations.copilot import CopilotIntegration impl = CopilotIntegration() args = impl.build_exec_args("do stuff", model="claude-sonnet-4-20250514") expected_exec = "copilot.cmd" if os.name == "nt" else "copilot" assert args[0] == expected_exec assert "-p" in args assert "--yolo" in args assert "--model" in args def test_copilot_new_env_var_disables_yolo(self, monkeypatch): monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "0") monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False) from specify_cli.integrations.copilot import CopilotIntegration impl = CopilotIntegration() args = impl.build_exec_args("do stuff") assert "--yolo" not in args def test_copilot_deprecated_env_var_still_honoured(self, monkeypatch): monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False) monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0") import warnings from specify_cli.integrations.copilot import CopilotIntegration impl = CopilotIntegration() with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") args = impl.build_exec_args("do stuff") assert "--yolo" not in args assert any( "SPECKIT_ALLOW_ALL_TOOLS is deprecated" in str(x.message) and issubclass(x.category, UserWarning) for x in w ) def test_copilot_new_env_var_takes_precedence(self, monkeypatch): monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "1") monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0") from specify_cli.integrations.copilot import CopilotIntegration impl = CopilotIntegration() args = impl.build_exec_args("do stuff") assert "--yolo" in args def test_ide_only_returns_none(self): from specify_cli.integrations.windsurf import WindsurfIntegration impl = WindsurfIntegration() assert impl.build_exec_args("test") is None def test_no_model_omits_flag(self): from specify_cli.integrations.claude import ClaudeIntegration impl = ClaudeIntegration() args = impl.build_exec_args("do stuff", model=None) assert "--model" not in args def test_no_json_omits_flag(self): from specify_cli.integrations.claude import ClaudeIntegration impl = ClaudeIntegration() args = impl.build_exec_args("do stuff", output_json=False) assert "--output-format" not in args # ===== Step Type Tests ===== class TestCommandStep: """Test the command step type.""" def test_execute_basic(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={"name": "login"}, default_integration="claude", ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["command"] == "speckit.specify" assert result.output["integration"] == "claude" assert result.output["input"]["args"] == "login" def test_validate_missing_command(self): from specify_cli.workflows.steps.command import CommandStep step = CommandStep() errors = step.validate({"id": "test"}) assert any("missing 'command'" in e for e in errors) def test_step_override_integration(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext step = CommandStep() ctx = StepContext(default_integration="claude") config = { "id": "test", "command": "speckit.plan", "integration": "gemini", "input": {}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["integration"] == "gemini" def test_step_override_model(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext step = CommandStep() ctx = StepContext(default_model="sonnet-4") config = { "id": "test", "command": "speckit.implement", "model": "opus-4", "input": {}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["model"] == "opus-4" def test_options_merge(self): from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext step = CommandStep() ctx = StepContext(default_options={"max-tokens": 8000}) config = { "id": "test", "command": "speckit.plan", "options": {"thinking-budget": 32768}, "input": {}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["options"]["max-tokens"] == 8000 assert result.output["options"]["thinking-budget"] == 32768 def test_dispatch_not_attempted_without_cli(self): """When the CLI tool is not installed, step should fail.""" from unittest.mock import patch from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={"name": "login"}, default_integration="claude", project_root="/tmp", ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["dispatched"] is False assert result.error is not None def test_dispatch_with_mock_cli(self, tmp_path, monkeypatch): """When the CLI is installed, dispatch invokes the command by name.""" from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={"name": "login"}, default_integration="claude", project_root=str(tmp_path), ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = '{"result": "done"}' mock_result.stderr = "" with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \ patch("subprocess.run", return_value=mock_result) as mock_run: result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["dispatched"] is True assert result.output["exit_code"] == 0 # Verify the CLI was called with -p and the skill invocation call_args = mock_run.call_args assert call_args[0][0][0] == "claude" assert call_args[0][0][1] == "-p" # Claude is a SkillsIntegration so uses /speckit-specify assert "/speckit-specify login" in call_args[0][0][2] def test_dispatch_failure_returns_failed_status(self, tmp_path): """When the CLI exits non-zero, the step should fail.""" from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.command import CommandStep from specify_cli.workflows.base import StepContext, StepStatus step = CommandStep() ctx = StepContext( inputs={}, default_integration="claude", project_root=str(tmp_path), ) config = { "id": "test", "command": "speckit.specify", "input": {"args": "test"}, } mock_result = MagicMock() mock_result.returncode = 1 mock_result.stdout = "" mock_result.stderr = "API error" with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \ patch("subprocess.run", return_value=mock_result): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["dispatched"] is True assert result.output["exit_code"] == 1 class TestPromptStep: """Test the prompt step type.""" def test_execute_basic(self): from unittest.mock import patch from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext, StepStatus step = PromptStep() ctx = StepContext( inputs={"file": "auth.py"}, default_integration="claude", ) config = { "id": "review", "type": "prompt", "prompt": "Review {{ inputs.file }} for security issues", } with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["prompt"] == "Review auth.py for security issues" assert result.output["integration"] == "claude" assert result.output["dispatched"] is False def test_execute_with_step_integration(self): from unittest.mock import patch from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext step = PromptStep() ctx = StepContext(default_integration="claude") config = { "id": "review", "type": "prompt", "prompt": "Summarize the codebase", "integration": "gemini", } with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["integration"] == "gemini" def test_execute_with_model(self): from unittest.mock import patch from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext step = PromptStep() ctx = StepContext(default_integration="claude", default_model="sonnet-4") config = { "id": "review", "type": "prompt", "prompt": "hello", "model": "opus-4", } with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): result = step.execute(config, ctx) assert result.output["model"] == "opus-4" def test_dispatch_with_mock_cli(self, tmp_path): from unittest.mock import patch, MagicMock from specify_cli.workflows.steps.prompt import PromptStep from specify_cli.workflows.base import StepContext, StepStatus step = PromptStep() ctx = StepContext( default_integration="claude", project_root=str(tmp_path), ) config = { "id": "ask", "type": "prompt", "prompt": "Explain this code", } mock_result = MagicMock() mock_result.returncode = 0 mock_result.stdout = "Here is the explanation" mock_result.stderr = "" with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value="/usr/local/bin/claude"), \ patch("subprocess.run", return_value=mock_result): result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["dispatched"] is True assert result.output["exit_code"] == 0 def test_validate_missing_prompt(self): from specify_cli.workflows.steps.prompt import PromptStep step = PromptStep() errors = step.validate({"id": "test"}) assert any("missing 'prompt'" in e for e in errors) def test_validate_valid(self): from specify_cli.workflows.steps.prompt import PromptStep step = PromptStep() errors = step.validate({"id": "test", "prompt": "do something"}) assert errors == [] class TestShellStep: """Test the shell step type.""" def test_execute_echo(self): from specify_cli.workflows.steps.shell import ShellStep from specify_cli.workflows.base import StepContext, StepStatus step = ShellStep() ctx = StepContext() config = {"id": "test", "run": "echo hello"} result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED assert result.output["exit_code"] == 0 assert "hello" in result.output["stdout"] def test_execute_failure(self): from specify_cli.workflows.steps.shell import ShellStep from specify_cli.workflows.base import StepContext, StepStatus step = ShellStep() ctx = StepContext() config = {"id": "test", "run": "exit 1"} result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["exit_code"] == 1 assert result.error is not None def test_validate_missing_run(self): from specify_cli.workflows.steps.shell import ShellStep step = ShellStep() errors = step.validate({"id": "test"}) assert any("missing 'run'" in e for e in errors) class TestGateStep: """Test the gate step type.""" def test_execute_returns_paused(self): from specify_cli.workflows.steps.gate import GateStep from specify_cli.workflows.base import StepContext, StepStatus step = GateStep() ctx = StepContext() config = { "id": "review", "message": "Review the spec.", "options": ["approve", "reject"], "on_reject": "abort", } result = step.execute(config, ctx) assert result.status == StepStatus.PAUSED assert result.output["message"] == "Review the spec." assert result.output["options"] == ["approve", "reject"] def test_validate_missing_message(self): from specify_cli.workflows.steps.gate import GateStep step = GateStep() errors = step.validate({"id": "test", "options": ["approve"]}) assert any("missing 'message'" in e for e in errors) def test_validate_invalid_on_reject(self): from specify_cli.workflows.steps.gate import GateStep step = GateStep() errors = step.validate({ "id": "test", "message": "Review", "on_reject": "invalid", }) assert any("on_reject" in e for e in errors) class TestIfThenStep: """Test the if/then/else step type.""" def test_execute_then_branch(self): from specify_cli.workflows.steps.if_then import IfThenStep from specify_cli.workflows.base import StepContext step = IfThenStep() ctx = StepContext(inputs={"scope": "full"}) config = { "id": "check", "condition": "{{ inputs.scope == 'full' }}", "then": [{"id": "a", "command": "speckit.tasks"}], "else": [{"id": "b", "command": "speckit.plan"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is True assert len(result.next_steps) == 1 assert result.next_steps[0]["id"] == "a" def test_execute_else_branch(self): from specify_cli.workflows.steps.if_then import IfThenStep from specify_cli.workflows.base import StepContext step = IfThenStep() ctx = StepContext(inputs={"scope": "backend"}) config = { "id": "check", "condition": "{{ inputs.scope == 'full' }}", "then": [{"id": "a", "command": "speckit.tasks"}], "else": [{"id": "b", "command": "speckit.plan"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is False assert result.next_steps[0]["id"] == "b" def test_validate_missing_condition(self): from specify_cli.workflows.steps.if_then import IfThenStep step = IfThenStep() errors = step.validate({"id": "test", "then": []}) assert any("missing 'condition'" in e for e in errors) class TestSwitchStep: """Test the switch step type.""" def test_execute_matches_case(self): from specify_cli.workflows.steps.switch import SwitchStep from specify_cli.workflows.base import StepContext step = SwitchStep() ctx = StepContext( steps={"review": {"output": {"choice": "approve"}}} ) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", "cases": { "approve": [{"id": "plan", "command": "speckit.plan"}], "reject": [{"id": "log", "type": "shell", "run": "echo rejected"}], }, "default": [{"id": "abort", "type": "gate", "message": "Unknown"}], } result = step.execute(config, ctx) assert result.output["matched_case"] == "approve" assert result.next_steps[0]["id"] == "plan" def test_execute_falls_to_default(self): from specify_cli.workflows.steps.switch import SwitchStep from specify_cli.workflows.base import StepContext step = SwitchStep() ctx = StepContext( steps={"review": {"output": {"choice": "unknown"}}} ) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", "cases": { "approve": [{"id": "plan", "command": "speckit.plan"}], }, "default": [{"id": "fallback", "type": "gate", "message": "Fallback"}], } result = step.execute(config, ctx) assert result.output["matched_case"] == "__default__" assert result.next_steps[0]["id"] == "fallback" def test_execute_no_default_no_match(self): from specify_cli.workflows.steps.switch import SwitchStep from specify_cli.workflows.base import StepContext step = SwitchStep() ctx = StepContext( steps={"review": {"output": {"choice": "other"}}} ) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", "cases": { "approve": [{"id": "plan", "command": "speckit.plan"}], }, } result = step.execute(config, ctx) assert result.output["matched_case"] == "__default__" assert result.next_steps == [] def test_validate_missing_expression(self): from specify_cli.workflows.steps.switch import SwitchStep step = SwitchStep() errors = step.validate({"id": "test", "cases": {}}) assert any("missing 'expression'" in e for e in errors) def test_validate_invalid_cases_and_default(self): from specify_cli.workflows.steps.switch import SwitchStep step = SwitchStep() errors = step.validate({ "id": "test", "expression": "{{ x }}", "cases": {"a": "not-a-list"}, "default": "also-bad", }) assert any("case 'a' must be a list" in e for e in errors) assert any("'default' must be a list" in e for e in errors) class TestWhileStep: """Test the while loop step type.""" def test_execute_condition_true(self): from specify_cli.workflows.steps.while_loop import WhileStep from specify_cli.workflows.base import StepContext step = WhileStep() ctx = StepContext( steps={"run-tests": {"output": {"exit_code": 1}}} ) config = { "id": "retry", "condition": "{{ steps.run-tests.output.exit_code != 0 }}", "max_iterations": 5, "steps": [{"id": "fix", "command": "speckit.implement"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is True assert len(result.next_steps) == 1 def test_execute_condition_false(self): from specify_cli.workflows.steps.while_loop import WhileStep from specify_cli.workflows.base import StepContext step = WhileStep() ctx = StepContext( steps={"run-tests": {"output": {"exit_code": 0}}} ) config = { "id": "retry", "condition": "{{ steps.run-tests.output.exit_code != 0 }}", "max_iterations": 5, "steps": [{"id": "fix", "command": "speckit.implement"}], } result = step.execute(config, ctx) assert result.output["condition_result"] is False assert result.next_steps == [] def test_validate_missing_fields(self): from specify_cli.workflows.steps.while_loop import WhileStep step = WhileStep() errors = step.validate({"id": "test", "steps": []}) assert any("missing 'condition'" in e for e in errors) # max_iterations is optional (defaults to 10) def test_validate_invalid_max_iterations(self): from specify_cli.workflows.steps.while_loop import WhileStep step = WhileStep() errors = step.validate({"id": "test", "condition": "{{ true }}", "max_iterations": 0, "steps": []}) assert any("must be an integer >= 1" in e for e in errors) class TestDoWhileStep: """Test the do-while loop step type.""" def test_execute_always_runs_once(self): from specify_cli.workflows.steps.do_while import DoWhileStep from specify_cli.workflows.base import StepContext step = DoWhileStep() ctx = StepContext() config = { "id": "cycle", "condition": "{{ false }}", "max_iterations": 3, "steps": [{"id": "refine", "command": "speckit.specify"}], } result = step.execute(config, ctx) assert len(result.next_steps) == 1 assert result.output["loop_type"] == "do-while" assert result.output["condition"] == "{{ false }}" def test_execute_with_true_condition(self): from specify_cli.workflows.steps.do_while import DoWhileStep from specify_cli.workflows.base import StepContext step = DoWhileStep() ctx = StepContext() config = { "id": "cycle", "condition": "{{ true }}", "max_iterations": 5, "steps": [{"id": "work", "command": "speckit.plan"}], } result = step.execute(config, ctx) # Body always executes on first call regardless of condition assert len(result.next_steps) == 1 assert result.output["max_iterations"] == 5 def test_execute_empty_steps(self): from specify_cli.workflows.steps.do_while import DoWhileStep from specify_cli.workflows.base import StepContext step = DoWhileStep() ctx = StepContext() config = { "id": "empty", "condition": "{{ false }}", "max_iterations": 1, "steps": [], } result = step.execute(config, ctx) assert result.next_steps == [] assert result.status.value == "completed" def test_validate_missing_fields(self): from specify_cli.workflows.steps.do_while import DoWhileStep step = DoWhileStep() errors = step.validate({"id": "test", "steps": []}) assert any("missing 'condition'" in e for e in errors) # max_iterations is optional (defaults to 10) def test_validate_steps_not_list(self): from specify_cli.workflows.steps.do_while import DoWhileStep step = DoWhileStep() errors = step.validate({ "id": "test", "condition": "{{ true }}", "max_iterations": 3, "steps": "not-a-list", }) assert any("'steps' must be a list" in e for e in errors) class TestFanOutStep: """Test the fan-out step type.""" def test_execute_with_items(self): from specify_cli.workflows.steps.fan_out import FanOutStep from specify_cli.workflows.base import StepContext step = FanOutStep() ctx = StepContext( steps={"tasks": {"output": {"task_list": [ {"file": "a.md"}, {"file": "b.md"}, ]}}} ) config = { "id": "parallel", "items": "{{ steps.tasks.output.task_list }}", "max_concurrency": 3, "step": {"id": "impl", "command": "speckit.implement"}, } result = step.execute(config, ctx) assert result.output["item_count"] == 2 assert result.output["max_concurrency"] == 3 def test_execute_non_list_items_resolves_empty(self): from specify_cli.workflows.steps.fan_out import FanOutStep from specify_cli.workflows.base import StepContext step = FanOutStep() ctx = StepContext() config = { "id": "parallel", "items": "{{ undefined_var }}", "step": {"id": "impl", "command": "speckit.implement"}, } result = step.execute(config, ctx) assert result.output["item_count"] == 0 assert result.output["items"] == [] def test_validate_missing_fields(self): from specify_cli.workflows.steps.fan_out import FanOutStep step = FanOutStep() errors = step.validate({"id": "test"}) assert any("missing 'items'" in e for e in errors) assert any("missing 'step'" in e for e in errors) def test_validate_step_not_mapping(self): from specify_cli.workflows.steps.fan_out import FanOutStep step = FanOutStep() errors = step.validate({ "id": "test", "items": "{{ x }}", "step": "not-a-dict", }) assert any("'step' must be a mapping" in e for e in errors) class TestFanInStep: """Test the fan-in step type.""" def test_execute_collects_results(self): from specify_cli.workflows.steps.fan_in import FanInStep from specify_cli.workflows.base import StepContext step = FanInStep() ctx = StepContext( steps={ "parallel": {"output": {"item_count": 2, "status": "done"}} } ) config = { "id": "collect", "wait_for": ["parallel"], "output": {}, } result = step.execute(config, ctx) assert len(result.output["results"]) == 1 assert result.output["results"][0]["item_count"] == 2 def test_execute_multiple_wait_for(self): from specify_cli.workflows.steps.fan_in import FanInStep from specify_cli.workflows.base import StepContext step = FanInStep() ctx = StepContext( steps={ "task-a": {"output": {"file": "a.md"}}, "task-b": {"output": {"file": "b.md"}}, } ) config = { "id": "collect", "wait_for": ["task-a", "task-b"], "output": {}, } result = step.execute(config, ctx) assert len(result.output["results"]) == 2 assert result.output["results"][0]["file"] == "a.md" assert result.output["results"][1]["file"] == "b.md" def test_execute_missing_wait_for_step(self): from specify_cli.workflows.steps.fan_in import FanInStep from specify_cli.workflows.base import StepContext step = FanInStep() ctx = StepContext(steps={}) config = { "id": "collect", "wait_for": ["nonexistent"], "output": {}, } result = step.execute(config, ctx) assert result.output["results"] == [{}] def test_validate_empty_wait_for(self): from specify_cli.workflows.steps.fan_in import FanInStep step = FanInStep() errors = step.validate({"id": "test", "wait_for": []}) assert any("non-empty list" in e for e in errors) def test_validate_wait_for_not_list(self): from specify_cli.workflows.steps.fan_in import FanInStep step = FanInStep() errors = step.validate({"id": "test", "wait_for": "not-a-list"}) assert any("non-empty list" in e for e in errors) # ===== Workflow Definition Tests ===== class TestWorkflowDefinition: """Test WorkflowDefinition loading and parsing.""" def test_from_yaml(self, sample_workflow_file): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_yaml(sample_workflow_file) assert definition.id == "test-workflow" assert definition.name == "Test Workflow" assert definition.version == "1.0.0" assert len(definition.steps) == 2 def test_from_string(self, sample_workflow_yaml): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_string(sample_workflow_yaml) assert definition.id == "test-workflow" assert len(definition.inputs) == 2 def test_from_string_invalid(self): from specify_cli.workflows.engine import WorkflowDefinition with pytest.raises(ValueError, match="must be a mapping"): WorkflowDefinition.from_string("- just a list") def test_inputs_parsed(self, sample_workflow_yaml): from specify_cli.workflows.engine import WorkflowDefinition definition = WorkflowDefinition.from_string(sample_workflow_yaml) assert "spec" in definition.inputs assert definition.inputs["spec"]["required"] is True assert definition.inputs["scope"]["default"] == "full" # ===== Workflow Validation Tests ===== class TestWorkflowValidation: """Test workflow validation.""" def test_valid_workflow(self, sample_workflow_yaml): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(sample_workflow_yaml) errors = validate_workflow(definition) assert errors == [] def test_missing_id(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: name: "Test" version: "1.0.0" steps: - id: step-one command: speckit.specify """) errors = validate_workflow(definition) assert any("workflow.id" in e for e in errors) def test_invalid_id_format(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "Invalid ID!" name: "Test" version: "1.0.0" steps: - id: step-one command: speckit.specify """) errors = validate_workflow(definition) assert any("lowercase alphanumeric" in e for e in errors) def test_no_steps(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: [] """) errors = validate_workflow(definition) assert any("no steps" in e.lower() for e in errors) def test_duplicate_step_ids(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: - id: same-id command: speckit.specify - id: same-id command: speckit.plan """) errors = validate_workflow(definition) assert any("Duplicate" in e for e in errors) def test_invalid_step_type(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: - id: bad type: nonexistent """) errors = validate_workflow(definition) assert any("invalid type" in e.lower() for e in errors) def test_nested_step_validation(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" steps: - id: branch type: if condition: "{{ true }}" then: - id: nested-a command: speckit.specify else: - id: nested-b command: speckit.plan """) errors = validate_workflow(definition) assert errors == [] def test_invalid_input_type(self): from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" workflow: id: "test" name: "Test" version: "1.0.0" inputs: bad: type: array steps: - id: step-one command: speckit.specify """) errors = validate_workflow(definition) assert any("invalid type" in e.lower() for e in errors) # ===== Workflow Engine Tests ===== class TestWorkflowEngine: """Test WorkflowEngine execution.""" def test_load_from_file(self, sample_workflow_file, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) definition = engine.load_workflow(str(sample_workflow_file)) assert definition.id == "test-workflow" def test_load_from_installed_id(self, sample_workflow_file, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) definition = engine.load_workflow("test-workflow") assert definition.id == "test-workflow" def test_load_not_found(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) with pytest.raises(FileNotFoundError): engine.load_workflow("nonexistent") def test_execute_simple_workflow(self, project_dir): from unittest.mock import patch from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "simple" name: "Simple" version: "1.0.0" integration: claude inputs: name: type: string default: "test" steps: - id: step-one command: speckit.specify input: args: "{{ inputs.name }}" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): state = engine.execute(definition, {"name": "login"}) assert state.status == RunStatus.FAILED assert "step-one" in state.step_results assert state.step_results["step-one"]["output"]["command"] == "speckit.specify" assert state.step_results["step-one"]["output"]["input"]["args"] == "login" def test_execute_with_gate_pauses(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "gated" name: "Gated" version: "1.0.0" steps: - id: step-one type: shell run: "echo test" - id: gate type: gate message: "Review?" options: [approve, reject] on_reject: abort - id: step-two type: shell run: "echo done" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.PAUSED assert "gate" in state.step_results assert state.step_results["gate"]["status"] == "paused" def test_execute_with_shell_step(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "shell-test" name: "Shell Test" version: "1.0.0" steps: - id: echo type: shell run: "echo workflow-output" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "workflow-output" in state.step_results["echo"]["output"]["stdout"] def test_execute_with_if_then(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "branching" name: "Branching" version: "1.0.0" inputs: scope: type: string default: "full" steps: - id: check type: if condition: "{{ inputs.scope == 'full' }}" then: - id: full-tasks type: shell run: "echo full" else: - id: partial-tasks type: shell run: "echo partial" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition, {"scope": "full"}) assert state.status == RunStatus.COMPLETED assert "full-tasks" in state.step_results assert "partial-tasks" not in state.step_results def test_execute_missing_required_input(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition yaml_str = """ schema_version: "1.0" workflow: id: "needs-input" name: "Needs Input" version: "1.0.0" inputs: name: type: string required: true steps: - id: step-one command: speckit.specify input: args: "{{ inputs.name }}" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) with pytest.raises(ValueError, match="Required input"): engine.execute(definition, {}) def test_integration_auto_default_uses_project_integration(self, project_dir): """`integration: auto` should resolve to .specify/integration.json's integration.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps({"integration": "opencode", "version": "0.7.4"}), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-default" name: "Auto Default" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "opencode" def test_integration_auto_default_falls_back_when_no_integration_json(self, project_dir): """`integration: auto` should keep the literal "auto" when project state is missing. The engine itself must not invent an integration when ``.specify/integration.json`` is absent; any later validation or command resolution will handle an unresolved ``"auto"`` value. """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-fallback" name: "Auto Fallback" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_integration_explicit_input_overrides_auto(self, project_dir): """An explicit --input integration=X must win over `auto` even when integration.json exists.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps({"integration": "opencode"}), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "explicit-wins" name: "Explicit Wins" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {"integration": "claude"}) assert resolved["integration"] == "claude" def test_integration_explicit_auto_resolves_like_default(self, project_dir): """Passing ``integration=auto`` explicitly must resolve the sentinel, not pass it through as a literal — the workflow prompt advertises ``auto`` as a valid value, so the dispatch path must never see it. """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps({"integration": "opencode"}), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "explicit-auto" name: "Explicit Auto" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {"integration": "auto"}) assert resolved["integration"] == "opencode" def test_integration_auto_ignores_malformed_integration_json(self, project_dir): """A malformed integration.json must not crash — fall back to the literal default.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text("{not json", encoding="utf-8") definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-malformed" name: "Auto Malformed" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_integration_auto_ignores_non_utf8_integration_json(self, project_dir): """A non-UTF8 integration.json must not crash — fall back to the literal default.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) # 0xFF is invalid as the leading byte of a UTF-8 sequence, so # ``Path.read_text(encoding="utf-8")`` raises UnicodeDecodeError. (specify_dir / "integration.json").write_bytes(b"\xff\xfe\x00\x00") definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-non-utf8" name: "Auto Non UTF-8" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_integration_auto_resolves_modern_normalized_state(self, project_dir): """`integration: auto` must resolve modern state files that record ``default_integration`` / ``installed_integrations`` and omit the legacy ``integration`` field.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps( { "version": "0.8.3", "integration_state_schema": 1, "default_integration": "claude", "installed_integrations": ["claude", "copilot"], "integration_settings": {}, } ), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-modern" name: "Auto Modern" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "claude" def test_integration_auto_rejects_future_state_schema(self, project_dir): """`integration: auto` must not silently use a state file written by a newer CLI (``integration_state_schema`` greater than the current supported value); the resolver falls back to the literal default rather than guessing.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.integration_state import INTEGRATION_STATE_SCHEMA specify_dir = project_dir / ".specify" specify_dir.mkdir(parents=True, exist_ok=True) (specify_dir / "integration.json").write_text( json.dumps( { "version": "99.0.0", "integration_state_schema": INTEGRATION_STATE_SCHEMA + 1, "default_integration": "claude", "installed_integrations": ["claude"], "integration_settings": {}, } ), encoding="utf-8", ) definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-future-schema" name: "Auto Future Schema" version: "1.0.0" inputs: integration: type: string default: "auto" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["integration"] == "auto" def test_default_value_is_validated_against_enum(self, project_dir): """Defaults must run through the same coercion/enum check as provided inputs.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "default-enum" name: "Default Enum" version: "1.0.0" inputs: scope: type: string default: "not-in-enum" enum: ["full", "backend-only", "frontend-only"] """) engine = WorkflowEngine(project_dir) with pytest.raises(ValueError, match="not in allowed values"): engine._resolve_inputs(definition, {}) def test_default_value_is_coerced_to_declared_type(self, project_dir): """A numeric default declared as a string should still be coerced like a provided input.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "default-coerce" name: "Default Coerce" version: "1.0.0" inputs: retries: type: number default: "3" """) engine = WorkflowEngine(project_dir) resolved = engine._resolve_inputs(definition, {}) assert resolved["retries"] == 3 assert isinstance(resolved["retries"], int) def test_validate_workflow_rejects_invalid_default(self): """Authoring-time validation should reject defaults that violate enum.""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "bad-default" name: "Bad Default" version: "1.0.0" inputs: scope: type: string default: "not-in-enum" enum: ["full", "backend-only", "frontend-only"] steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_validate_workflow_exempts_integration_auto_sentinel(self): """``integration: auto`` is a runtime-resolved sentinel and must not fail validation.""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-ok" name: "Auto OK" version: "1.0.0" inputs: integration: type: string default: "auto" enum: ["copilot", "claude", "gemini"] steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert not any("invalid default" in e for e in errors), errors def test_validate_workflow_still_checks_type_for_auto_sentinel(self): """The ``auto`` exemption only skips enum-membership; declared type is still enforced.""" from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "auto-bad-type" name: "Auto Bad Type" version: "1.0.0" inputs: integration: type: number default: "auto" steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_validate_workflow_rejects_bool_default_for_number_type(self): """``type: number`` paired with a bool default must fail — bool is a subclass of int so ``float(True)`` would otherwise silently coerce ``true`` to ``1``. """ from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "bool-as-number" name: "Bool As Number" version: "1.0.0" inputs: count: type: number default: true steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_validate_workflow_rejects_non_string_default_for_string_type(self): """``type: string`` must require an actual string — a numeric YAML default like ``5`` would otherwise slip through unvalidated. """ from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "number-as-string" name: "Number As String" version: "1.0.0" inputs: label: type: string default: 5 steps: - id: noop type: gate message: "noop" options: [approve] """) errors = validate_workflow(definition) assert any("invalid default" in e for e in errors), errors def test_while_loop_condition_reads_latest_iteration(self, project_dir): """Regression: while-loop condition must see updated step output from the most recent iteration, not stale iteration-0 data. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus # Shell step echoes a counter via a file. # Condition: exit_code != 0 means "keep looping" — but a non-zero # exit code would mark the step FAILED and abort the run, so we # use stdout-based comparison instead. # # Iteration 0: counter=1, echoes "1" → not "done" → loop continues # Iteration 1: counter=2, echoes "done" → condition false → stop # Without the fix, condition always reads iteration-0 stdout, # so the loop runs all max_iterations. import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('done' if n >= 2 else str(n), end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "while-condition-update" name: "While Condition Update" version: "1.0.0" steps: - id: retry-loop type: while condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}" max_iterations: 5 steps: - id: attempt type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # The unprefixed key should reflect the latest iteration's result. assert state.step_results["attempt"]["output"]["stdout"] == "done" # Namespaced iteration-1 result should also exist. assert "retry-loop:attempt:1" in state.step_results # Counter should be 2 (iteration 0 + iteration 1), not 5. assert counter_file.read_text(encoding="utf-8").strip() == "2" def test_do_while_loop_condition_reads_latest_iteration(self, project_dir): """Regression: do-while loop condition must also see updated output. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('done' if n >= 2 else str(n), end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "do-while-condition-update" name: "Do While Condition Update" version: "1.0.0" steps: - id: retry-loop type: do-while condition: "{{{{ 'done' not in steps.attempt.output.stdout }}}}" max_iterations: 5 steps: - id: attempt type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert state.step_results["attempt"]["output"]["stdout"] == "done" assert counter_file.read_text(encoding="utf-8").strip() == "2" def test_while_loop_runs_to_max_when_condition_stays_true(self, project_dir): """While loop must still run to max_iterations when the condition never becomes false — copy-back must not break this path. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('pending', end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "while-max-iterations" name: "While Max Iterations" version: "1.0.0" steps: - id: retry-loop type: while condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}" max_iterations: 3 steps: - id: tick type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # All 3 iterations ran (iteration 0 + 2 loop iterations). assert counter_file.read_text(encoding="utf-8").strip() == "3" # Unprefixed key holds the last iteration's result. assert state.step_results["tick"]["output"]["stdout"] == "pending" # Namespaced keys for loop iterations exist. assert "retry-loop:tick:1" in state.step_results assert "retry-loop:tick:2" in state.step_results def test_do_while_loop_runs_to_max_when_condition_stays_true(self, project_dir): """Do-while loop must still run to max_iterations when the condition never becomes false. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable script_file = project_dir / "_tick.py" script_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print('pending', end='')\n", encoding="utf-8", ) yaml_str = f""" schema_version: "1.0" workflow: id: "do-while-max-iterations" name: "Do While Max Iterations" version: "1.0.0" steps: - id: retry-loop type: do-while condition: "{{{{ 'done' not in steps.tick.output.stdout }}}}" max_iterations: 3 steps: - id: tick type: shell run: '"{py}" "{script_file}"' """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert counter_file.read_text(encoding="utf-8").strip() == "3" assert state.step_results["tick"]["output"]["stdout"] == "pending" def test_while_loop_multi_step_body_inter_step_refs(self, project_dir): """Multi-step loop body: step B must see step A's output from the current iteration, not a stale previous one. See https://github.com/github/spec-kit/issues/2592 """ from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus import sys counter_file = project_dir / ".counter" counter_file.write_text("0", encoding="utf-8") py = sys.executable # Step A: increments counter file, echoes the value. step_a_file = project_dir / "_step_a.py" step_a_file.write_text( f"import pathlib; p = pathlib.Path(r'{counter_file}')\n" "n = int(p.read_text()) + 1; p.write_text(str(n))\n" "print(str(n), end='')\n", encoding="utf-8", ) # Step B uses {{ steps.step-a.output.stdout }} expression # substitution in its run command so the engine resolves the # aliased unprefixed key — this is the real inter-step test. yaml_str = f""" schema_version: "1.0" workflow: id: "while-multi-step" name: "While Multi Step" version: "1.0.0" steps: - id: retry-loop type: while condition: "{{{{ 'done' not in steps.step-a.output.stdout }}}}" max_iterations: 3 steps: - id: step-a type: shell run: '"{py}" "{step_a_file}"' - id: step-b type: shell run: "echo b-saw-{{{{ steps.step-a.output.stdout }}}}" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED # Both unprefixed keys reflect the latest iteration's results. assert state.step_results["step-a"]["output"]["stdout"] == "3" # Step B saw step A's output via expression substitution. assert "b-saw-3" in state.step_results["step-b"]["output"]["stdout"] # Namespaced keys exist for loop iterations. assert "retry-loop:step-a:1" in state.step_results assert "retry-loop:step-b:1" in state.step_results assert "retry-loop:step-a:2" in state.step_results assert "retry-loop:step-b:2" in state.step_results # ===== context.run_id Tests ===== # # End-to-end coverage for the `{{ context.run_id }}` template # variable introduced in issue #2590. Locks resolution inside the # three step types the acceptance criteria called out — shell `run:`, # command `input.args:`, and switch `expression:` — plus the # "workflow doesn't reference it" backward-compat path. class TestContextRunId: """End-to-end tests for `{{ context.run_id }}` in workflow YAML.""" def test_shell_run_resolves_run_id(self, project_dir): """`run: "echo {{ context.run_id }}"` substitutes the engine-assigned run id into the spawned shell, and the same value appears on `state.run_id`. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "stamp-run-id" name: "Stamp Run Id" version: "1.0.0" steps: - id: stamp type: shell run: "echo RUN_ID={{ context.run_id }}" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition, run_id="abc12345") assert state.run_id == "abc12345" stdout = state.step_results["stamp"]["output"]["stdout"] assert stdout.strip() == "RUN_ID=abc12345" def test_command_input_args_resolves_run_id(self, project_dir): """`input.args: "{{ context.run_id }}"` is resolved by `CommandStep` and recorded in step output, even when CLI dispatch is unavailable (no integration installed). Covers the artifact-metadata use case from the issue. """ from unittest.mock import patch from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "command-stamp" name: "Command Stamp" version: "1.0.0" integration: claude steps: - id: tag-artifact command: speckit.specify input: args: "{{ context.run_id }}" """) engine = WorkflowEngine(project_dir) with patch( "specify_cli.workflows.steps.command.shutil.which", return_value=None, ): state = engine.execute(definition, run_id="cafef00d") # Even when dispatch fails (no CLI), the resolved input is # recorded so downstream observers see the run id in artifact # metadata. assert state.step_results["tag-artifact"]["output"]["input"]["args"] == "cafef00d" def test_switch_expression_matches_on_run_id(self, project_dir): """`switch` over `{{ context.run_id }}` matches against case keys, and the nested branch can ALSO reference `{{ context.run_id }}`. Demonstrates the run id is a first-class value in the expression engine (not just a string-interpolation token) AND that it propagates into nested step execution via the recursive `_execute_steps` traversal. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "switch-on-run-id" name: "Switch On Run Id" version: "1.0.0" steps: - id: route type: switch expression: "{{ context.run_id }}" cases: target-run: - id: matched-branch type: shell run: "echo nested-run-id={{ context.run_id }}" default: - id: default-branch type: shell run: "echo defaulted" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition, run_id="target-run") assert state.status == RunStatus.COMPLETED assert state.step_results["route"]["output"]["matched_case"] == "target-run" assert "matched-branch" in state.step_results assert "default-branch" not in state.step_results # The nested branch sees the same run id — propagation through # recursive `_execute_steps` is intact. nested_stdout = state.step_results["matched-branch"]["output"]["stdout"] assert nested_stdout.strip() == "nested-run-id=target-run" def test_workflow_without_context_reference_unchanged(self, project_dir): """Workflows that do not reference `{{ context.run_id }}` continue to run exactly as before. Locks the byte-equivalent default required by the issue's acceptance criteria. """ from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine from specify_cli.workflows.base import RunStatus definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "no-context-ref" name: "No Context Ref" version: "1.0.0" steps: - id: only-step type: shell run: "echo hello" """) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert state.step_results["only-step"]["output"]["stdout"].strip() == "hello" def test_run_id_uses_speckit_workflow_run_id_env_override(self, project_dir, monkeypatch): """When no run_id argument is provided, SPECKIT_WORKFLOW_RUN_ID overrides the auto-generated run ID.""" from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine monkeypatch.setenv("SPECKIT_WORKFLOW_RUN_ID", "env-run-123") definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "env-run-id" name: "Env Run Id" version: "1.0.0" steps: - id: stamp type: shell run: "echo {{ context.run_id }}" """) state = WorkflowEngine(project_dir).execute(definition) assert state.run_id == "env-run-123" assert state.step_results["stamp"]["output"]["stdout"].strip() == "env-run-123" def test_run_id_arg_takes_precedence_over_env_override(self, project_dir, monkeypatch): """Explicit run_id keeps existing precedence over SPECKIT_WORKFLOW_RUN_ID.""" from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine monkeypatch.setenv("SPECKIT_WORKFLOW_RUN_ID", "env-run-123") definition = WorkflowDefinition.from_string(""" schema_version: "1.0" workflow: id: "explicit-run-id" name: "Explicit Run Id" version: "1.0.0" steps: - id: stamp type: shell run: "echo {{ context.run_id }}" """) state = WorkflowEngine(project_dir).execute(definition, run_id="explicit-456") assert state.run_id == "explicit-456" assert state.step_results["stamp"]["output"]["stdout"].strip() == "explicit-456" # ===== State Persistence Tests ===== class TestRunState: """Test RunState persistence and loading.""" def test_save_and_load(self, project_dir): from specify_cli.workflows.engine import RunState from specify_cli.workflows.base import RunStatus state = RunState( run_id="test-run", workflow_id="test-workflow", project_root=project_dir, ) state.status = RunStatus.RUNNING state.inputs = {"name": "login"} state.step_results = { "step-one": { "output": {"file": "spec.md"}, "status": "completed", } } state.save() loaded = RunState.load("test-run", project_dir) assert loaded.run_id == "test-run" assert loaded.workflow_id == "test-workflow" assert loaded.status == RunStatus.RUNNING assert loaded.inputs == {"name": "login"} assert "step-one" in loaded.step_results def test_load_not_found(self, project_dir): from specify_cli.workflows.engine import RunState with pytest.raises(FileNotFoundError): RunState.load("nonexistent", project_dir) def test_append_log(self, project_dir): from specify_cli.workflows.engine import RunState state = RunState( run_id="log-test", workflow_id="test", project_root=project_dir, ) state.append_log({"event": "test_event", "data": "hello"}) log_file = state.runs_dir / "log.jsonl" assert log_file.exists() lines = log_file.read_text().strip().split("\n") entry = json.loads(lines[0]) assert entry["event"] == "test_event" assert "timestamp" in entry class TestListRuns: """Test listing workflow runs.""" def test_list_empty(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine engine = WorkflowEngine(project_dir) assert engine.list_runs() == [] def test_list_after_execution(self, project_dir): from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition yaml_str = """ schema_version: "1.0" workflow: id: "list-test" name: "List Test" version: "1.0.0" steps: - id: step-one type: shell run: "echo test" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) engine.execute(definition) runs = engine.list_runs() assert len(runs) == 1 assert runs[0]["workflow_id"] == "list-test" # ===== Workflow Registry Tests ===== class TestWorkflowRegistry: """Test WorkflowRegistry operations.""" def test_add_and_get(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) entry = registry.get("test-wf") assert entry is not None assert entry["name"] == "Test" assert "installed_at" in entry def test_remove(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) registry.add("test-wf", {"name": "Test"}) assert registry.is_installed("test-wf") registry.remove("test-wf") assert not registry.is_installed("test-wf") def test_list(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) registry.add("wf-a", {"name": "A"}) registry.add("wf-b", {"name": "B"}) installed = registry.list() assert "wf-a" in installed assert "wf-b" in installed def test_is_installed(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry = WorkflowRegistry(project_dir) assert not registry.is_installed("missing") registry.add("exists", {"name": "Exists"}) assert registry.is_installed("exists") def test_persistence(self, project_dir): from specify_cli.workflows.catalog import WorkflowRegistry registry1 = WorkflowRegistry(project_dir) registry1.add("test-wf", {"name": "Test"}) # Load fresh registry2 = WorkflowRegistry(project_dir) assert registry2.is_installed("test-wf") # ===== Workflow Catalog Tests ===== class TestWorkflowCatalog: """Test WorkflowCatalog catalog resolution.""" def test_default_catalogs(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 2 assert entries[0].name == "default" assert entries[1].name == "community" def test_env_var_override(self, project_dir, monkeypatch): from specify_cli.workflows.catalog import WorkflowCatalog monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json") catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 1 assert entries[0].name == "env-override" assert entries[0].url == "https://example.com/catalog.json" def test_project_level_config(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog config_path = project_dir / ".specify" / "workflow-catalogs.yml" config_path.write_text(yaml.dump({ "catalogs": [{ "name": "custom", "url": "https://example.com/wf-catalog.json", "priority": 1, "install_allowed": True, }] })) catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 1 assert entries[0].name == "custom" def test_validate_url_http_rejected(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError catalog = WorkflowCatalog(project_dir) with pytest.raises(WorkflowValidationError, match="HTTPS"): catalog._validate_catalog_url("http://evil.com/catalog.json") def test_validate_url_localhost_http_allowed(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) # Should not raise catalog._validate_catalog_url("http://localhost:8080/catalog.json") def test_add_catalog(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/new-catalog.json", "my-catalog") config_path = project_dir / ".specify" / "workflow-catalogs.yml" assert config_path.exists() data = yaml.safe_load(config_path.read_text()) assert len(data["catalogs"]) == 1 assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json" def test_add_catalog_duplicate_rejected(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/catalog.json") with pytest.raises(WorkflowValidationError, match="already configured"): catalog.add_catalog("https://example.com/catalog.json") def test_remove_catalog(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/c1.json", "first") catalog.add_catalog("https://example.com/c2.json", "second") removed = catalog.remove_catalog(0) assert removed == "first" config_path = project_dir / ".specify" / "workflow-catalogs.yml" data = yaml.safe_load(config_path.read_text()) assert len(data["catalogs"]) == 1 def test_remove_catalog_invalid_index(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/c1.json") with pytest.raises(WorkflowValidationError, match="out of range"): catalog.remove_catalog(5) def test_get_catalog_configs(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog catalog = WorkflowCatalog(project_dir) configs = catalog.get_catalog_configs() assert len(configs) == 2 assert configs[0]["name"] == "default" assert isinstance(configs[0]["install_allowed"], bool) # ===== Integration Test ===== class TestWorkflowIntegration: """End-to-end workflow execution tests.""" def test_full_sequential_workflow(self, project_dir): """Execute a multi-step sequential workflow end to end.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "e2e-test" name: "E2E Test" version: "1.0.0" integration: claude inputs: feature: type: string default: "login" steps: - id: specify type: shell run: "echo speckit.specify {{ inputs.feature }}" - id: check-scope type: if condition: "{{ inputs.feature == 'login' }}" then: - id: echo-full type: shell run: "echo full scope" else: - id: echo-partial type: shell run: "echo partial scope" - id: plan type: shell run: "echo speckit.plan" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "specify" in state.step_results assert "check-scope" in state.step_results assert "echo-full" in state.step_results assert "echo-partial" not in state.step_results assert "plan" in state.step_results def test_switch_workflow(self, project_dir): """Test switch step type in a workflow.""" from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition from specify_cli.workflows.base import RunStatus yaml_str = """ schema_version: "1.0" workflow: id: "switch-test" name: "Switch Test" version: "1.0.0" inputs: action: type: string default: "plan" steps: - id: route type: switch expression: "{{ inputs.action }}" cases: specify: - id: do-specify type: shell run: "echo specify" plan: - id: do-plan type: shell run: "echo plan" default: - id: do-default type: shell run: "echo default" """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) state = engine.execute(definition) assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results