Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eng 3068 create node result get routes for metrics and checks #1403

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion integration_tests/backend/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
import requests
from aqueduct.constants.enums import ArtifactType
import utils
from aqueduct.constants.enums import RuntimeType
from aqueduct.models.response_models import (
Expand All @@ -16,6 +17,7 @@
GetNodeResultContentResponse,
GetOperatorResultResponse,
GetOperatorWithArtifactNodeResponse,
GetOperatorWithArtifactNodeResultResponse,
)
from aqueduct_executor.operators.utils.enums import JobType
from exec_state import assert_exec_state
Expand Down Expand Up @@ -52,11 +54,13 @@ class TestBackend:
GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE = (
"/api/v2/workflow/%s/dag/%s/node/metric/%s/result/%s/content"
)
GET_NODE_METRIC_RESULTS_TEMPLATE = "/api/v2/workflow/%s/dag/%s/node/metric/%s/results"

GET_NODE_CHECK_TEMPLATE = "/api/v2/workflow/%s/dag/%s/node/check/%s"
GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE = (
"/api/v2/workflow/%s/dag/%s/node/check/%s/result/%s/content"
)
GET_NODE_CHECK_RESULTS_TEMPLATE = "/api/v2/workflow/%s/dag/%s/node/check/%s/results"

# V1
LIST_WORKFLOW_SAVED_OBJECTS_TEMPLATE = "/api/workflow/%s/objects"
Expand Down Expand Up @@ -142,7 +146,6 @@ def test_endpoint_list_workflow_tables(self):
]
)

print(data)
assert (
set(
[
Expand Down Expand Up @@ -680,6 +683,28 @@ def test_endpoint_node_metric_result_content_get(self):
# One of these should be successful (direct descendent of operator)
assert not resp_obj.is_downsampled
assert len(resp_obj.content) > 0

def test_endpoint_node_metric_results_get(self):
    """Exercises GET /api/v2/workflow/{id}/dag/{id}/node/metric/{id}/results.

    Finds a metric artifact (metrics produce NUMERIC artifacts) in the first
    DAG result of the `flow_with_metrics_and_checks` fixture, fetches all of
    its results, and verifies every entry deserializes into
    GetOperatorWithArtifactNodeResultResponse.
    """
    flow_id = self.flows["flow_with_metrics_and_checks"][0]
    flow = self.client.flow(flow_id)
    workflow_resp = flow._get_workflow_resp()
    dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
    dag_result_id = workflow_resp.workflow_dag_results[0].id

    dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
        flow_id,
        dag_result_id,
    )

    # Pick the first metric artifact; fail loudly instead of formatting
    # `None` into the route if the fixture has no metric.
    metric_artifact_id = next(
        (
            artifact_id
            for artifact_id, artifact in dag_result_resp.artifacts.items()
            if artifact.type == ArtifactType.NUMERIC
        ),
        None,
    )
    assert metric_artifact_id is not None, "Expected at least one metric artifact."

    resp = self.get_response(
        self.GET_NODE_METRIC_RESULTS_TEMPLATE % (flow_id, dag_id, metric_artifact_id)
    ).json()

    # Keep the parsed objects and assert we validated every returned result
    # (the original loop discarded each parsed value).
    results = [GetOperatorWithArtifactNodeResultResponse(**result) for result in resp]
    assert len(results) == len(resp)

def test_endpoint_node_check_get(self):
flow_id, _ = self.flows["flow_with_metrics_and_checks"]
Expand Down Expand Up @@ -749,3 +774,25 @@ def test_endpoint_node_check_result_content_get(self):
# One of these should be successful (direct descendent of operator)
assert not resp_obj.is_downsampled
assert len(resp_obj.content) > 0

def test_endpoint_node_check_results_get(self):
    """Exercises GET /api/v2/workflow/{id}/dag/{id}/node/check/{id}/results.

    Finds a check artifact (checks produce BOOL artifacts) in the first DAG
    result of the `flow_with_metrics_and_checks` fixture, fetches all of its
    results, and verifies every entry deserializes into
    GetOperatorWithArtifactNodeResultResponse.
    """
    flow_id = self.flows["flow_with_metrics_and_checks"][0]
    flow = self.client.flow(flow_id)
    workflow_resp = flow._get_workflow_resp()
    dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
    dag_result_id = workflow_resp.workflow_dag_results[0].id

    dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
        flow_id,
        dag_result_id,
    )

    # Pick the first check artifact; fail loudly instead of formatting
    # `None` into the route if the fixture has no check.
    check_artifact_id = next(
        (
            artifact_id
            for artifact_id, artifact in dag_result_resp.artifacts.items()
            if artifact.type == ArtifactType.BOOL
        ),
        None,
    )
    assert check_artifact_id is not None, "Expected at least one check artifact."

    resp = self.get_response(
        self.GET_NODE_CHECK_RESULTS_TEMPLATE % (flow_id, dag_id, check_artifact_id)
    ).json()

    # Keep the parsed objects and assert we validated every returned result
    # (the original loop discarded each parsed value).
    results = [GetOperatorWithArtifactNodeResultResponse(**result) for result in resp]
    assert len(results) == len(resp)
9 changes: 3 additions & 6 deletions sdk/aqueduct/backend/api_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime
from dateutil import parser
import io
import json
import uuid
Expand Down Expand Up @@ -610,11 +610,8 @@ def get_workflow(self, flow_id: str) -> GetWorkflowV1Response:
WorkflowDagResultResponse(
id=dag_result.id,
created_at=int(
datetime.datetime.strptime(
resp_dags[str(dag_result.dag_id)].created_at[:-4],
"%Y-%m-%dT%H:%M:%S.%f"
if resp_dags[str(dag_result.dag_id)].created_at[-1] == "Z"
else "%Y-%m-%dT%H:%M:%S.%f%z",
parser.parse(
resp_dags[str(dag_result.dag_id)].created_at
).timestamp()
),
status=dag_result.exec_state.status,
Expand Down
51 changes: 51 additions & 0 deletions sdk/aqueduct/models/response_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,57 @@ class GetOperatorWithArtifactNodeResponse(BaseModel):
outputs: List[uuid.UUID]


class GetOperatorWithArtifactNodeResultResponse(BaseModel):
    """Represents a single merged node (metric or check) result in a workflow run.

    Attributes:
        id:
            The id of the operator result node.
        artifact_result_id:
            The id of the artifact result node.
        operator_id:
            The id of the operator node.
        artifact_id:
            The id of the artifact node.
        operator_result_exec_state:
            The execution state of the run operator result.
        artifact_result_exec_state:
            The execution state of the run artifact result.
        serialization_type:
            What is being serialized.
        content_path:
            Path to get content.
        content_serialized:
            If the content is too big, None. Otherwise, the content.
    """

    id: uuid.UUID
    artifact_result_id: uuid.UUID
    operator_id: uuid.UUID
    artifact_id: uuid.UUID
    operator_result_exec_state: ExecutionState
    artifact_result_exec_state: ExecutionState
    serialization_type: SerializationType
    content_path: str
    content_serialized: Optional[str]


# V1 Responses
class PreviewResponse(BaseModel):
"""This is the response object returned by api_client.preview().
Expand Down
147 changes: 147 additions & 0 deletions src/golang/cmd/server/handler/v2/node_check_results_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package v2

import (
"context"
"fmt"
"net/http"

"github.com/aqueducthq/aqueduct/cmd/server/handler"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/models/views"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/response"
"github.com/aqueducthq/aqueduct/lib/storage"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)

// This file should map directly to
// src/ui/common/src/handlers/v2/NodeCheckResultsGet.tsx
//
// Returns all results for the given check node across the workflow's runs.
// Route: /api/v2/workflow/{workflowID}/dag/{dagID}/node/check/{nodeID}/results
// Method: GET
// Params:
// `workflowID`: ID for `workflow` object
// `dagID`: ID for `workflow_dag` object
// `nodeID`: ID for the check's artifact object
// Request:
// Headers:
// `api-key`: user's API Key
// Response:
// Body:
// `[]response.OperatorWithArtifactResultNode`

// NodeCheckResultsGetHandler serves GET requests for all results of a
// single check node within a workflow.
type NodeCheckResultsGetHandler struct {
// Embeds shared route-arg parsing for node routes and the generic GET plumbing.
nodeGetHandler
handler.GetHandler

Database database.Database

// Repositories used to resolve the check node, its operator/artifact
// results, and the DAGs those results ran under.
WorkflowRepo repos.Workflow
DAGRepo repos.DAG
OperatorRepo repos.Operator
OperatorResultRepo repos.OperatorResult
ArtifactRepo repos.Artifact
ArtifactResultRepo repos.ArtifactResult
}

// Name returns the handler's unique route name.
func (*NodeCheckResultsGetHandler) Name() string {
return "NodeCheckResultsGet"
}

// Prepare delegates to the shared node-route parser, which extracts
// workflowID, dagID, and nodeID from the request.
func (h *NodeCheckResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error) {
return h.nodeGetHandler.Prepare(r)
}

func (h *NodeCheckResultsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
args := interfaceArgs.(*nodeGetArgs)

artfID := args.nodeID
wfID := args.workflowID

emptyResponse := []response.OperatorWithArtifactResultNode{}

dbOperatorWithArtifactNodes, err := h.OperatorRepo.GetOperatorWithArtifactByArtifactIdNodeBatch(ctx, []uuid.UUID{artfID}, h.Database)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading check node.")
}
dbOperatorWithArtifactNode := views.OperatorWithArtifactNode{}
if len(dbOperatorWithArtifactNodes) == 0 {
return emptyResponse, http.StatusOK, nil
} else {
dbOperatorWithArtifactNode = dbOperatorWithArtifactNodes[0]
}

results, err := h.OperatorResultRepo.GetOperatorWithArtifactResultNodesByOperatorNameAndWorkflow(ctx, dbOperatorWithArtifactNode.Name, wfID, h.Database)
if err != nil {
return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve check results.")
}

if len(results) == 0 {
return emptyResponse, http.StatusOK, nil
}

resultArtifactIds := make([]uuid.UUID, 0, len(results))
for _, result := range results {
resultArtifactIds = append(resultArtifactIds, result.ArtifactResultID)
eunice-chan marked this conversation as resolved.
Show resolved Hide resolved
}

artfResultToDAG, err := h.DAGRepo.GetByArtifactResultBatch(ctx, resultArtifactIds, h.Database)
if err != nil {
return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve workflow dags.")
}

// maps from db dag Ids
dbDagByDagId := make(map[uuid.UUID]models.DAG, len(artfResultToDAG))
nodeResultByDagId := make(map[uuid.UUID][]views.OperatorWithArtifactResultNode, len(artfResultToDAG))
for _, nodeResult := range results {
if dbDag, ok := artfResultToDAG[nodeResult.ArtifactResultID]; ok {
if _, okDagsMap := dbDagByDagId[dbDag.ID]; !okDagsMap {
dbDagByDagId[dbDag.ID] = dbDag
}

nodeResultByDagId[dbDag.ID] = append(nodeResultByDagId[dbDag.ID], nodeResult)
} else {
return emptyResponse, http.StatusInternalServerError, errors.Newf("Error retrieving dag associated with artifact result %s", nodeResult.ArtifactResultID)
}
}

responses := make([]response.OperatorWithArtifactResultNode, 0, len(results))
for dbDagId, nodeResults := range nodeResultByDagId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to consolidate these repeating parts. We can file a task if the solution is not immediately clear

if dag, ok := dbDagByDagId[dbDagId]; ok {
storageObj := storage.NewStorage(&dag.StorageConfig)
if err != nil {
return emptyResponse, http.StatusInternalServerError, errors.New("Error retrieving artifact contents.")
}

for _, nodeResult := range nodeResults {
var contentPtr *string = nil
if !nodeResult.ArtifactResultExecState.IsNull &&
(nodeResult.ArtifactResultExecState.ExecutionState.Status == shared.FailedExecutionStatus ||
nodeResult.ArtifactResultExecState.ExecutionState.Status == shared.SucceededExecutionStatus) {
exists := storageObj.Exists(ctx, nodeResult.ContentPath)
if exists {
contentBytes, err := storageObj.Get(ctx, nodeResult.ContentPath)
if err != nil {
return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, fmt.Sprintf("Error retrieving artifact content for result %s", nodeResult.ArtifactID))
}

contentStr := string(contentBytes)
contentPtr = &contentStr
}
}

responses = append(responses, *response.NewOperatorWithArtifactResultNodeFromDBObject(
&nodeResult, contentPtr,
))
}
} else {
return emptyResponse, http.StatusInternalServerError, errors.Newf("Error retrieving dag %s", dbDagId)
}
}

return responses, http.StatusOK, nil
}
Loading