diff --git a/src/prefect/deployments/runner.py b/src/prefect/deployments/runner.py index 3147492aa5e3..56a5bea521d9 100644 --- a/src/prefect/deployments/runner.py +++ b/src/prefect/deployments/runner.py @@ -580,6 +580,7 @@ def from_entrypoint( cls, entrypoint: str, name: str, + flow_name: Optional[str] = None, interval: Optional[ Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] ] = None, @@ -606,6 +607,7 @@ def from_entrypoint( entrypoint: The path to a file containing a flow and the name of the flow function in the format `./path/to/file.py:flow_func_name`. name: A name for the deployment + flow_name: The name of the flow to deploy interval: An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds. cron: A cron schedule of when to execute runs of this flow. @@ -649,7 +651,7 @@ def from_entrypoint( deployment = cls( name=Path(name).stem, - flow_name=flow.name, + flow_name=flow_name or flow.name, schedule=schedule, schedules=constructed_schedules, paused=paused, @@ -678,6 +680,7 @@ async def from_storage( storage: RunnerStorage, entrypoint: str, name: str, + flow_name: Optional[str] = None, interval: Optional[ Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] ] = None, @@ -705,6 +708,7 @@ async def from_storage( entrypoint: The path to a file containing a flow and the name of the flow function in the format `./path/to/file.py:flow_func_name`. name: A name for the deployment + flow_name: The name of the flow to deploy storage: A storage object to use for retrieving flow code. If not provided, a URL must be provided. interval: An interval on which to execute the current flow. Accepts either a number @@ -755,7 +759,7 @@ async def from_storage( deployment = cls( name=Path(name).stem, - flow_name=flow.name, + flow_name=flow_name or flow.name, schedule=schedule, schedules=constructed_schedules, paused=paused, diff --git a/src/prefect/flows.py b/src/prefect/flows.py index ec19bd5a56e6..805f610fb9e7 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -679,6 +679,7 @@ def my_other_flow(name): storage=self._storage, entrypoint=self._entrypoint, name=name, + flow_name=self.name, interval=interval, cron=cron, rrule=rrule, @@ -698,7 +699,7 @@ def my_other_flow(name): ) else: return RunnerDeployment.from_flow( - self, + flow=self, name=name, interval=interval, cron=cron, diff --git a/tests/test_flows.py b/tests/test_flows.py index 2fd78739e150..09b2f2b2f6f3 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -19,6 +19,7 @@ from prefect._internal.pydantic import HAS_PYDANTIC_V2 from prefect.blocks.core import Block +from prefect.runner.storage import RunnerStorage if HAS_PYDANTIC_V2: import pydantic.v1 as pydantic @@ -3531,6 +3532,57 @@ def test_to_deployment_raises_on_multiple_schedule_parameters(self, kwargs): with pytest.raises(ValueError, match=expected_message): test_flow.to_deployment(__file__, **kwargs) + async def test_to_deployment_respects_with_options_name_from_flow(self): + """regression test for https://github.com/PrefectHQ/prefect/issues/15380""" + + @flow(name="original-name") + def test_flow(): + pass + + flow_with_new_name = test_flow.with_options(name="new-name") + deployment = await flow_with_new_name.to_deployment(name="test-deployment") + + assert isinstance(deployment, RunnerDeployment) + assert deployment.name == "test-deployment" + assert deployment.flow_name == "new-name" + + async def test_to_deployment_respects_with_options_name_from_storage( + self, monkeypatch + ): + """regression test for https://github.com/PrefectHQ/prefect/issues/15380""" + + @flow(name="original-name") + def test_flow(): + pass + + flow_with_new_name = test_flow.with_options(name="new-name") + + mock_storage = MagicMock(spec=RunnerStorage) + mock_entrypoint = "fake_module:test_flow" + mock_from_storage = AsyncMock( + return_value=RunnerDeployment( + name="test-deployment", flow_name="new-name", entrypoint=mock_entrypoint + ) + ) + monkeypatch.setattr(RunnerDeployment, "from_storage", mock_from_storage) + + flow_with_new_name._storage = mock_storage + flow_with_new_name._entrypoint = mock_entrypoint + + deployment = await flow_with_new_name.to_deployment(name="test-deployment") + + assert isinstance(deployment, RunnerDeployment) + assert deployment.name == "test-deployment" + assert deployment.flow_name == "new-name" + assert deployment.entrypoint == mock_entrypoint + + mock_from_storage.assert_awaited_once() + call_args = mock_from_storage.call_args[1] + assert call_args["storage"] == mock_storage + assert call_args["entrypoint"] == mock_entrypoint + assert call_args["name"] == "test-deployment" + assert call_args["flow_name"] == "new-name" + class TestFlowServe: @pytest.fixture(autouse=True)