Skip to content

Commit

Permalink
[2.x] preserve flow.name with RunnerDeployment.from_storage (#15394)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Sep 17, 2024
1 parent e0267b4 commit e37e641
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
8 changes: 6 additions & 2 deletions src/prefect/deployments/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ def from_entrypoint(
cls,
entrypoint: str,
name: str,
flow_name: Optional[str] = None,
interval: Optional[
Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
] = None,
Expand All @@ -606,6 +607,7 @@ def from_entrypoint(
entrypoint: The path to a file containing a flow and the name of the flow function in
the format `./path/to/file.py:flow_func_name`.
name: A name for the deployment
flow_name: The name of the flow to deploy
interval: An interval on which to execute the current flow. Accepts either a number
or a timedelta object. If a number is given, it will be interpreted as seconds.
cron: A cron schedule of when to execute runs of this flow.
Expand Down Expand Up @@ -649,7 +651,7 @@ def from_entrypoint(

deployment = cls(
name=Path(name).stem,
flow_name=flow.name,
flow_name=flow_name or flow.name,
schedule=schedule,
schedules=constructed_schedules,
paused=paused,
Expand Down Expand Up @@ -678,6 +680,7 @@ async def from_storage(
storage: RunnerStorage,
entrypoint: str,
name: str,
flow_name: Optional[str] = None,
interval: Optional[
Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
] = None,
Expand Down Expand Up @@ -705,6 +708,7 @@ async def from_storage(
entrypoint: The path to a file containing a flow and the name of the flow function in
the format `./path/to/file.py:flow_func_name`.
name: A name for the deployment
flow_name: The name of the flow to deploy
storage: A storage object to use for retrieving flow code. If not provided, a
URL must be provided.
interval: An interval on which to execute the current flow. Accepts either a number
Expand Down Expand Up @@ -755,7 +759,7 @@ async def from_storage(

deployment = cls(
name=Path(name).stem,
flow_name=flow.name,
flow_name=flow_name or flow.name,
schedule=schedule,
schedules=constructed_schedules,
paused=paused,
Expand Down
3 changes: 2 additions & 1 deletion src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ def my_other_flow(name):
storage=self._storage,
entrypoint=self._entrypoint,
name=name,
flow_name=self.name,
interval=interval,
cron=cron,
rrule=rrule,
Expand All @@ -698,7 +699,7 @@ def my_other_flow(name):
)
else:
return RunnerDeployment.from_flow(
self,
flow=self,
name=name,
interval=interval,
cron=cron,
Expand Down
52 changes: 52 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from prefect._internal.pydantic import HAS_PYDANTIC_V2
from prefect.blocks.core import Block
from prefect.runner.storage import RunnerStorage

if HAS_PYDANTIC_V2:
import pydantic.v1 as pydantic
Expand Down Expand Up @@ -3531,6 +3532,57 @@ def test_to_deployment_raises_on_multiple_schedule_parameters(self, kwargs):
with pytest.raises(ValueError, match=expected_message):
test_flow.to_deployment(__file__, **kwargs)

async def test_to_deployment_respects_with_options_name_from_flow(self):
"""regression test for https://github.com/PrefectHQ/prefect/issues/15380"""

@flow(name="original-name")
def test_flow():
pass

flow_with_new_name = test_flow.with_options(name="new-name")
deployment = await flow_with_new_name.to_deployment(name="test-deployment")

assert isinstance(deployment, RunnerDeployment)
assert deployment.name == "test-deployment"
assert deployment.flow_name == "new-name"

async def test_to_deployment_respects_with_options_name_from_storage(
self, monkeypatch
):
"""regression test for https://github.com/PrefectHQ/prefect/issues/15380"""

@flow(name="original-name")
def test_flow():
pass

flow_with_new_name = test_flow.with_options(name="new-name")

mock_storage = MagicMock(spec=RunnerStorage)
mock_entrypoint = "fake_module:test_flow"
mock_from_storage = AsyncMock(
return_value=RunnerDeployment(
name="test-deployment", flow_name="new-name", entrypoint=mock_entrypoint
)
)
monkeypatch.setattr(RunnerDeployment, "from_storage", mock_from_storage)

flow_with_new_name._storage = mock_storage
flow_with_new_name._entrypoint = mock_entrypoint

deployment = await flow_with_new_name.to_deployment(name="test-deployment")

assert isinstance(deployment, RunnerDeployment)
assert deployment.name == "test-deployment"
assert deployment.flow_name == "new-name"
assert deployment.entrypoint == mock_entrypoint

mock_from_storage.assert_awaited_once()
call_args = mock_from_storage.call_args[1]
assert call_args["storage"] == mock_storage
assert call_args["entrypoint"] == mock_entrypoint
assert call_args["name"] == "test-deployment"
assert call_args["flow_name"] == "new-name"


class TestFlowServe:
@pytest.fixture(autouse=True)
Expand Down

0 comments on commit e37e641

Please sign in to comment.