diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py index 18c3441dd..00132c1db 100644 --- a/integration_tests/backend/test_reads.py +++ b/integration_tests/backend/test_reads.py @@ -5,6 +5,7 @@ import pytest import requests +from aqueduct.constants.enums import ArtifactType import utils from aqueduct.constants.enums import RuntimeType from aqueduct.models.response_models import ( @@ -16,6 +17,7 @@ GetNodeResultContentResponse, GetOperatorResultResponse, GetOperatorWithArtifactNodeResponse, + GetOperatorWithArtifactNodeResultResponse, ) from aqueduct_executor.operators.utils.enums import JobType from exec_state import assert_exec_state @@ -52,11 +54,13 @@ class TestBackend: GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE = ( "/api/v2/workflow/%s/dag/%s/node/metric/%s/result/%s/content" ) + GET_NODE_METRIC_RESULTS_TEMPLATE = "/api/v2/workflow/%s/dag/%s/node/metric/%s/results" GET_NODE_CHECK_TEMPLATE = "/api/v2/workflow/%s/dag/%s/node/check/%s" GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE = ( "/api/v2/workflow/%s/dag/%s/node/check/%s/result/%s/content" ) + GET_NODE_CHECK_RESULTS_TEMPLATE = "/api/v2/workflow/%s/dag/%s/node/check/%s/results" # V1 LIST_WORKFLOW_SAVED_OBJECTS_TEMPLATE = "/api/workflow/%s/objects" @@ -142,7 +146,6 @@ def test_endpoint_list_workflow_tables(self): ] ) - print(data) assert ( set( [ @@ -680,6 +683,28 @@ def test_endpoint_node_metric_result_content_get(self): # One of these should be successful (direct descendent of operator) assert not resp_obj.is_downsampled assert len(resp_obj.content) > 0 + + def test_endpoint_node_metric_results_get(self): + flow_id = self.flows["flow_with_metrics_and_checks"][0] + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id + + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + metric_artifact_id = None + for artifact_id, artifact in dag_result_resp.artifacts.items(): + if artifact.type == ArtifactType.NUMERIC: + metric_artifact_id = artifact_id + break + resp = self.get_response( + self.GET_NODE_METRIC_RESULTS_TEMPLATE % (flow_id, dag_id, metric_artifact_id) + ).json() + for result in resp: + result = GetOperatorWithArtifactNodeResultResponse(**result) def test_endpoint_node_check_get(self): flow_id, _ = self.flows["flow_with_metrics_and_checks"] @@ -749,3 +774,25 @@ def test_endpoint_node_check_result_content_get(self): # One of these should be successful (direct descendent of operator) assert not resp_obj.is_downsampled assert len(resp_obj.content) > 0 + + def test_endpoint_node_check_results_get(self): + flow_id = self.flows["flow_with_metrics_and_checks"][0] + flow = self.client.flow(flow_id) + workflow_resp = flow._get_workflow_resp() + dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id + dag_result_id = workflow_resp.workflow_dag_results[0].id + + dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result( + flow_id, + dag_result_id, + ) + check_artifact_id = None + for artifact_id, artifact in dag_result_resp.artifacts.items(): + if artifact.type == ArtifactType.BOOL: + check_artifact_id = artifact_id + break + resp = self.get_response( + self.GET_NODE_CHECK_RESULTS_TEMPLATE % (flow_id, dag_id, check_artifact_id) + ).json() + for result in resp: + result = GetOperatorWithArtifactNodeResultResponse(**result) \ No newline at end of file diff --git 
a/sdk/aqueduct/backend/api_client.py b/sdk/aqueduct/backend/api_client.py
index e2b8ad442..641bfb51b 100644
--- a/sdk/aqueduct/backend/api_client.py
+++ b/sdk/aqueduct/backend/api_client.py
@@ -1,4 +1,4 @@
-import datetime
+from dateutil import parser
 import io
 import json
 import uuid
@@ -610,11 +610,8 @@ def get_workflow(self, flow_id: str) -> GetWorkflowV1Response:
             WorkflowDagResultResponse(
                 id=dag_result.id,
                 created_at=int(
-                    datetime.datetime.strptime(
-                        resp_dags[str(dag_result.dag_id)].created_at[:-4],
-                        "%Y-%m-%dT%H:%M:%S.%f"
-                        if resp_dags[str(dag_result.dag_id)].created_at[-1] == "Z"
-                        else "%Y-%m-%dT%H:%M:%S.%f%z",
+                    parser.parse(
+                        resp_dags[str(dag_result.dag_id)].created_at
                     ).timestamp()
                 ),
                 status=dag_result.exec_state.status,
diff --git a/sdk/aqueduct/models/response_models.py b/sdk/aqueduct/models/response_models.py
index 7aafebf1e..6e28c8698 100644
--- a/sdk/aqueduct/models/response_models.py
+++ b/sdk/aqueduct/models/response_models.py
@@ -227,6 +227,42 @@ class GetOperatorWithArtifactNodeResponse(BaseModel):
     outputs: List[uuid.UUID]
 
 
+class GetOperatorWithArtifactNodeResultResponse(BaseModel):
+    """Represents a single merged node (metric or check) result in a workflow run.
+
+    Attributes:
+        id:
+            The id of the operator result node.
+        artifact_result_id:
+            The id of the artifact result node.
+        operator_id:
+            The id of the operator node.
+        artifact_id:
+            The id of the artifact node.
+        operator_result_exec_state:
+            The execution state of the operator result for this run.
+        artifact_result_exec_state:
+            The execution state of the artifact result for this run.
+        serialization_type:
+            How the artifact result's content is serialized.
+        content_path:
+            The storage path where the content can be fetched.
+        content_serialized:
+            The content itself if it is small enough to inline; None otherwise.
+
+    """
+
+    id: uuid.UUID
+    artifact_result_id: uuid.UUID
+    operator_id: uuid.UUID
+    artifact_id: uuid.UUID
+    operator_result_exec_state: ExecutionState
+    artifact_result_exec_state: ExecutionState
+    serialization_type: SerializationType
+    content_path: str
+    content_serialized: Optional[str]
+
+
 # V1 Responses
 class PreviewResponse(BaseModel):
     """This is the response object returned by api_client.preview(). 
diff --git a/src/golang/cmd/server/handler/v2/node_check_results_get.go b/src/golang/cmd/server/handler/v2/node_check_results_get.go
new file mode 100644
index 000000000..129789ced
--- /dev/null
+++ b/src/golang/cmd/server/handler/v2/node_check_results_get.go
@@ -0,0 +1,140 @@
+package v2
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
+	"github.com/aqueducthq/aqueduct/lib/database"
+	"github.com/aqueducthq/aqueduct/lib/models"
+	"github.com/aqueducthq/aqueduct/lib/models/shared"
+	"github.com/aqueducthq/aqueduct/lib/models/views"
+	"github.com/aqueducthq/aqueduct/lib/repos"
+	"github.com/aqueducthq/aqueduct/lib/response"
+	"github.com/aqueducthq/aqueduct/lib/storage"
+	"github.com/dropbox/godropbox/errors"
+	"github.com/google/uuid"
+)
+
+// This file should map directly to
+// src/ui/common/src/handlers/v2/NodeCheckResultsGet.ts
+//
+// Returns all results of the given check node across the workflow's runs.
+// Route: /api/v2/workflow/{workflowID}/dag/{dagID}/node/check/{nodeID}/results
+// Method: GET
+// Params:
+//	`workflowID`: ID for `workflow` object
+//	`dagID`: ID for `workflow_dag` object
+//	`nodeID`: ID for the check node's `artifact` object
+// Request:
+//	Headers:
+//		`api-key`: user's API Key
+// Response:
+//	Body:
+//		`[]response.OperatorWithArtifactResultNode`
+
+type NodeCheckResultsGetHandler struct {
+	nodeGetHandler
+	handler.GetHandler
+
+	Database database.Database
+
+	WorkflowRepo       repos.Workflow
+	DAGRepo            repos.DAG
+	OperatorRepo       repos.Operator
+	OperatorResultRepo repos.OperatorResult
+}
+
+func (*NodeCheckResultsGetHandler) Name() string {
+	return "NodeCheckResultsGet"
+}
+
+func (h *NodeCheckResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error) {
+	return h.nodeGetHandler.Prepare(r)
+}
+
+func (h *NodeCheckResultsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*nodeGetArgs)
+
+	artfID := args.nodeID
+	wfID := args.workflowID
+
+	emptyResponse := []response.OperatorWithArtifactResultNode{}
+
+	dbOperatorWithArtifactNodes, err := h.OperatorRepo.GetOperatorWithArtifactByArtifactIdNodeBatch(ctx, []uuid.UUID{artfID}, h.Database)
+	if err != nil {
+		return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading check node.")
+	}
+	if len(dbOperatorWithArtifactNodes) == 0 {
+		return emptyResponse, http.StatusOK, nil
+	}
+	dbOperatorWithArtifactNode := dbOperatorWithArtifactNodes[0]
+
+	results, err := h.OperatorResultRepo.GetOperatorWithArtifactResultNodesByOperatorNameAndWorkflow(ctx, dbOperatorWithArtifactNode.Name, wfID, h.Database)
+	if err != nil {
+		return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve check results.")
+	}
+
+	if len(results) == 0 {
+		return emptyResponse, http.StatusOK, nil
+	}
+
+	resultArtifactIds := make([]uuid.UUID, 0, len(results))
+	for _, result := range results {
+		resultArtifactIds = append(resultArtifactIds, result.ArtifactResultID)
+	}
+
+	artfResultToDAG, err := h.DAGRepo.GetByArtifactResultBatch(ctx, resultArtifactIds, h.Database)
+	if err != nil {
+		return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve workflow dags.")
+	}
+
+	// maps from db dag IDs
+	dbDagByDagId := make(map[uuid.UUID]models.DAG, len(artfResultToDAG))
+	nodeResultByDagId := make(map[uuid.UUID][]views.OperatorWithArtifactResultNode, len(artfResultToDAG))
+	for _, nodeResult := range results {
+		if dbDag, ok := artfResultToDAG[nodeResult.ArtifactResultID]; ok {
+			if _, okDagsMap := dbDagByDagId[dbDag.ID]; !okDagsMap {
+				dbDagByDagId[dbDag.ID] = dbDag
+			}
+
+			nodeResultByDagId[dbDag.ID] = append(nodeResultByDagId[dbDag.ID], nodeResult)
+		} else {
+			return emptyResponse, http.StatusInternalServerError, errors.Newf("Error retrieving dag associated with artifact result %s", nodeResult.ArtifactResultID)
+		}
+	}
+
+	responses := make([]response.OperatorWithArtifactResultNode, 0, len(results))
+	for dbDagId, nodeResults := range nodeResultByDagId {
+		if dag, ok := dbDagByDagId[dbDagId]; ok {
+			storageObj := storage.NewStorage(&dag.StorageConfig)
+
+			for _, nodeResult := range nodeResults {
+				var contentPtr *string = nil
+				if !nodeResult.ArtifactResultExecState.IsNull &&
+					(nodeResult.ArtifactResultExecState.ExecutionState.Status == shared.FailedExecutionStatus ||
+						nodeResult.ArtifactResultExecState.ExecutionState.Status == shared.SucceededExecutionStatus) {
+					exists := storageObj.Exists(ctx, nodeResult.ContentPath)
+					if exists {
+						contentBytes, err := storageObj.Get(ctx, nodeResult.ContentPath)
+						if err != nil {
+							return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, fmt.Sprintf("Error retrieving artifact content for result %s", nodeResult.ArtifactResultID))
+						}
+
+						contentStr := string(contentBytes)
+						contentPtr = &contentStr
+					}
+				}
+
+				responses = append(responses, *response.NewOperatorWithArtifactResultNodeFromDBObject(
+					&nodeResult, contentPtr,
+				))
+			}
+		} else {
+			return emptyResponse, http.StatusInternalServerError, errors.Newf("Error retrieving dag %s", dbDagId)
+		}
+	}
+
+	return responses, http.StatusOK, nil
+}
diff --git a/src/golang/cmd/server/handler/v2/node_metric_results_get.go b/src/golang/cmd/server/handler/v2/node_metric_results_get.go
new file mode 100644
index 000000000..4c6daaa10
--- /dev/null
+++ b/src/golang/cmd/server/handler/v2/node_metric_results_get.go
@@ -0,0 +1,140 @@
+package v2
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
+	"github.com/aqueducthq/aqueduct/lib/database"
+	"github.com/aqueducthq/aqueduct/lib/models"
+	"github.com/aqueducthq/aqueduct/lib/models/shared"
+	"github.com/aqueducthq/aqueduct/lib/models/views"
+	"github.com/aqueducthq/aqueduct/lib/repos"
+	"github.com/aqueducthq/aqueduct/lib/response"
+	"github.com/aqueducthq/aqueduct/lib/storage"
+	"github.com/dropbox/godropbox/errors"
+	"github.com/google/uuid"
+)
+
+// This file should map directly to
+// src/ui/common/src/handlers/v2/NodeMetricResultsGet.ts
+//
+// Returns all results of the given metric node across the workflow's runs.
+// Route: /api/v2/workflow/{workflowID}/dag/{dagID}/node/metric/{nodeID}/results
+// Method: GET
+// Params:
+//	`workflowID`: ID for `workflow` object
+//	`dagID`: ID for `workflow_dag` object
+//	`nodeID`: ID for the metric node's `artifact` object
+// Request:
+//	Headers:
+//		`api-key`: user's API Key
+// Response:
+//	Body:
+//		`[]response.OperatorWithArtifactResultNode`
+
+type NodeMetricResultsGetHandler struct {
+	nodeGetHandler
+	handler.GetHandler
+
+	Database database.Database
+
+	WorkflowRepo       repos.Workflow
+	DAGRepo            repos.DAG
+	OperatorRepo       repos.Operator
+	OperatorResultRepo repos.OperatorResult
+}
+
+func (*NodeMetricResultsGetHandler) Name() string {
+	return "NodeMetricResultsGet"
+}
+
+func (h *NodeMetricResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error) {
+	return h.nodeGetHandler.Prepare(r)
+}
+
+func (h *NodeMetricResultsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*nodeGetArgs)
+
+	artfID := args.nodeID
+	wfID := args.workflowID
+
+	emptyResponse := []response.OperatorWithArtifactResultNode{}
+
+	dbOperatorWithArtifactNodes, err := h.OperatorRepo.GetOperatorWithArtifactByArtifactIdNodeBatch(ctx, []uuid.UUID{artfID}, h.Database)
+	if err != nil {
+		return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading metric node.")
+	}
+	if len(dbOperatorWithArtifactNodes) == 0 {
+		return emptyResponse, http.StatusOK, nil
+	}
+	dbOperatorWithArtifactNode := dbOperatorWithArtifactNodes[0]
+
+	results, err := h.OperatorResultRepo.GetOperatorWithArtifactResultNodesByOperatorNameAndWorkflow(ctx, dbOperatorWithArtifactNode.Name, wfID, h.Database)
+	if err != nil {
+		return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve metric results.")
+	}
+
+	if len(results) == 0 {
+		return emptyResponse, http.StatusOK, nil
+	}
+
+	resultArtifactIds := make([]uuid.UUID, 0, len(results))
+	for _, result := range results {
+		resultArtifactIds = append(resultArtifactIds, result.ArtifactResultID)
+	}
+
+	artfResultToDAG, err := h.DAGRepo.GetByArtifactResultBatch(ctx, resultArtifactIds, h.Database)
+	if err != nil {
+		return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, "Unable to retrieve workflow dags.")
+	}
+
+	// maps from db dag IDs
+	dbDagByDagId := make(map[uuid.UUID]models.DAG, len(artfResultToDAG))
+	nodeResultByDagId := make(map[uuid.UUID][]views.OperatorWithArtifactResultNode, len(artfResultToDAG))
+	for _, nodeResult := range results {
+		if dbDag, ok := artfResultToDAG[nodeResult.ArtifactResultID]; ok {
+			if _, okDagsMap := dbDagByDagId[dbDag.ID]; !okDagsMap {
+				dbDagByDagId[dbDag.ID] = dbDag
+			}
+
+			nodeResultByDagId[dbDag.ID] = append(nodeResultByDagId[dbDag.ID], nodeResult)
+		} else {
+			return emptyResponse, http.StatusInternalServerError, errors.Newf("Error retrieving dag associated with artifact result %s", nodeResult.ArtifactResultID)
+		}
+	}
+
+	responses := make([]response.OperatorWithArtifactResultNode, 0, len(results))
+	for dbDagId, nodeResults := range nodeResultByDagId {
+		if dag, ok := dbDagByDagId[dbDagId]; ok {
+			storageObj := storage.NewStorage(&dag.StorageConfig)
+
+			for _, nodeResult := range nodeResults {
+				var contentPtr *string = nil
+				if !nodeResult.ArtifactResultExecState.IsNull &&
+					(nodeResult.ArtifactResultExecState.ExecutionState.Status == shared.FailedExecutionStatus ||
+						nodeResult.ArtifactResultExecState.ExecutionState.Status == shared.SucceededExecutionStatus) {
+					exists := storageObj.Exists(ctx, nodeResult.ContentPath)
+					if exists {
+						contentBytes, err := storageObj.Get(ctx, nodeResult.ContentPath)
+						if err != nil {
+							return emptyResponse, http.StatusInternalServerError, errors.Wrap(err, fmt.Sprintf("Error retrieving artifact content for result %s", nodeResult.ArtifactResultID))
+						}
+
+						contentStr := string(contentBytes)
+						contentPtr = &contentStr
+					}
+				}
+
+				responses = append(responses, *response.NewOperatorWithArtifactResultNodeFromDBObject(
+					&nodeResult, contentPtr,
+				))
+			}
+		} else
{ + return emptyResponse, http.StatusInternalServerError, errors.Newf("Error retrieving dag %s", dbDagId) + } + } + + return responses, http.StatusOK, nil +} diff --git a/src/golang/cmd/server/routes/routes.go b/src/golang/cmd/server/routes/routes.go index e358bfd60..8aaa13199 100644 --- a/src/golang/cmd/server/routes/routes.go +++ b/src/golang/cmd/server/routes/routes.go @@ -20,8 +20,10 @@ const ( NodeArtifactResultsRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/artifact/{nodeID}/results" NodeMetricRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/metric/{nodeID}" NodeMetricResultContentRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/metric/{nodeID}/result/{nodeResultID}/content" + NodeMetricResultsRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/metric/{nodeID}/results" NodeCheckRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/check/{nodeID}" NodeCheckResultContentRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/check/{nodeID}/result/{nodeResultID}/content" + NodeCheckResultsRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/check/{nodeID}/results" NodeOperatorRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/operator/{nodeID}" NodeDagOperatorsRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/operators" NodeOperatorContentRoute = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/operator/{nodeID}/content" diff --git a/src/golang/cmd/server/server/handlers.go b/src/golang/cmd/server/server/handlers.go index 28be1b361..748013415 100644 --- a/src/golang/cmd/server/server/handlers.go +++ b/src/golang/cmd/server/server/handlers.go @@ -84,6 +84,13 @@ func (s *AqServer) Handlers() map[string]handler.Handler { ArtifactRepo: s.ArtifactRepo, ArtifactResultRepo: s.ArtifactResultRepo, }, + routes.NodeMetricResultsRoute: &v2.NodeMetricResultsGetHandler{ + Database: s.Database, + WorkflowRepo: s.WorkflowRepo, + DAGRepo: s.DAGRepo, + OperatorRepo: s.OperatorRepo, + OperatorResultRepo: s.OperatorResultRepo, + }, routes.NodeCheckRoute: &v2.NodeCheckGetHandler{ Database: s.Database, WorkflowRepo: s.WorkflowRepo, @@ -103,6 +110,13 @@ func (s *AqServer) Handlers() map[string]handler.Handler { DAGRepo: s.DAGRepo, OperatorRepo: s.OperatorRepo, }, + routes.NodeCheckResultsRoute: &v2.NodeCheckResultsGetHandler{ + Database: s.Database, + WorkflowRepo: s.WorkflowRepo, + DAGRepo: s.DAGRepo, + OperatorRepo: s.OperatorRepo, + OperatorResultRepo: s.OperatorResultRepo, + }, routes.NodeOperatorRoute: &v2.NodeOperatorGetHandler{ Database: s.Database, WorkflowRepo: s.WorkflowRepo, diff --git a/src/golang/lib/models/views/merged_node_result.go b/src/golang/lib/models/views/merged_node_result.go deleted file mode 100644 index b3e8f71de..000000000 --- a/src/golang/lib/models/views/merged_node_result.go +++ /dev/null @@ -1,58 +0,0 @@ -package views - -import ( - "fmt" - "strings" - - "github.com/aqueducthq/aqueduct/lib/models/shared" - "github.com/google/uuid" -) - -const ( - OperatorWithArtifactNodeResultTable = "merged_node_result" - - // OperatorWithArtifactNodeResult table column names - OperatorWithArtifactNodeResultID = "id" - OperatorWithArtifactNodeResultOperatorExecState = "operator_exec_state" - OperatorWithArtifactNodeResultArtifactID = "artifact_id" - OperatorWithArtifactNodeResultMetadata = "metadata" - OperatorWithArtifactNodeResultContentPath = "content_path" - OperatorWithArtifactNodeResultArtifactExecState = "artifact_exec_state" -) - -// An OperatorWithArtifactNodeResult maps to the merged_node_result table. 
-type OperatorWithArtifactNodeResult struct { - ID uuid.UUID `db:"id" json:"id"` - OperatorExecState shared.NullExecutionState `db:"operator_exec_state" json:"operator_exec_state"` - ArtifactID uuid.UUID `db:"artifact_id" json:"artifact_id"` - Metadata shared.NullArtifactResultMetadata `db:"metadata" json:"metadata"` - ContentPath string `db:"content_path" json:"content_path"` - ArtifactExecState shared.NullExecutionState `db:"artifact_exec_state" json:"artifact_exec_state"` -} - -// OperatorWithArtifactNodeResultCols returns a comma-separated string of all OperatorWithArtifactNodeResult columns. -func OperatorWithArtifactNodeResultCols() string { - return strings.Join(allOperatorWithArtifactNodeResultCols(), ",") -} - -// OperatorWithArtifactNodeResultColsWithPrefix returns a comma-separated string of all -// OperatorWithArtifactNodeResult columns prefixed by the table name. -func OperatorWithArtifactNodeResultColsWithPrefix() string { - cols := allOperatorWithArtifactNodeResultCols() - for i, col := range cols { - cols[i] = fmt.Sprintf("%s.%s", OperatorWithArtifactNodeResultTable, col) - } - - return strings.Join(cols, ",") -} - -func allOperatorWithArtifactNodeResultCols() []string { - return []string{ - OperatorWithArtifactNodeResultID, - OperatorWithArtifactNodeResultOperatorExecState, - OperatorWithArtifactNodeResultArtifactID, - OperatorWithArtifactNodeResultMetadata, - OperatorWithArtifactNodeResultContentPath, - OperatorWithArtifactNodeResultArtifactExecState, - } -} diff --git a/src/golang/lib/models/views/merged_node.go b/src/golang/lib/models/views/operator_with_artifact_node.go similarity index 96% rename from src/golang/lib/models/views/merged_node.go rename to src/golang/lib/models/views/operator_with_artifact_node.go index 5cf2687b6..c8d77c1f4 100644 --- a/src/golang/lib/models/views/merged_node.go +++ b/src/golang/lib/models/views/operator_with_artifact_node.go @@ -10,7 +10,7 @@ import ( ) const ( - OperatorWithArtifactNodeView = "merged_node" + OperatorWithArtifactNodeView = "operator_with_artifact_node" OperatorWithArtifactNodeID = "id" OperatorWithArtifactNodeDagID = "dag_id" OperatorWithArtifactNodeArtifactID = "artifact_id" diff --git a/src/golang/lib/models/views/operator_with_artifact_node_result.go b/src/golang/lib/models/views/operator_with_artifact_node_result.go new file mode 100644 index 000000000..4d2739e79 --- /dev/null +++ b/src/golang/lib/models/views/operator_with_artifact_node_result.go @@ -0,0 +1,64 @@ +package views + +import ( + "fmt" + "strings" + + "github.com/aqueducthq/aqueduct/lib/models/shared" + "github.com/google/uuid" +) + +const ( + OperatorWithArtifactResultNodeTable = "operator_with_artifact_node_result" + + // OperatorWithArtifactResultNode table column names + OperatorWithArtifactResultNodeID = "id" // operator result ID + OperatorWithArtifactResultNodeArtifactResultID = "artifact_result_id" + OperatorWithArtifactResultNodeOperatorID = "operator_id" + OperatorWithArtifactResultNodeArtifactID = "artifact_id" + OperatorWithArtifactResultNodeOperatorResultExecState = "operator_result_exec_state" + OperatorWithArtifactResultNodeMetadata = "metadata" + OperatorWithArtifactResultNodeContentPath = "content_path" + OperatorWithArtifactResultNodeArtifactResultExecState = "artifact_result_exec_state" +) + +// An OperatorWithArtifactResultNode maps to the merged_node_result table. 
+type OperatorWithArtifactResultNode struct { + ID uuid.UUID `db:"id" json:"id"` + OperatorID uuid.UUID `db:"operator_id" json:"operator_id"` + OperatorResultExecState shared.NullExecutionState `db:"operator_result_exec_state" json:"operator_result_exec_state"` + ArtifactID uuid.UUID `db:"artifact_id" json:"artifact_id"` + ArtifactResultID uuid.UUID `db:"artifact_result_id" json:"artifact_result_id"` + Metadata shared.NullArtifactResultMetadata `db:"metadata" json:"metadata"` + ContentPath string `db:"content_path" json:"content_path"` + ArtifactResultExecState shared.NullExecutionState `db:"artifact_result_exec_state" json:"artifact_result_exec_state"` +} + +// OperatorWithArtifactResultNodeCols returns a comma-separated string of all OperatorWithArtifactResultNode columns. +func OperatorWithArtifactResultNodeCols() string { + return strings.Join(allOperatorWithArtifactResultNodeCols(), ",") +} + +// OperatorWithArtifactResultNodeColsWithPrefix returns a comma-separated string of all +// OperatorWithArtifactResultNode columns prefixed by the table name. +func OperatorWithArtifactResultNodeColsWithPrefix() string { + cols := allOperatorWithArtifactResultNodeCols() + for i, col := range cols { + cols[i] = fmt.Sprintf("%s.%s", OperatorWithArtifactResultNodeTable, col) + } + + return strings.Join(cols, ",") +} + +func allOperatorWithArtifactResultNodeCols() []string { + return []string{ + OperatorWithArtifactResultNodeID, + OperatorWithArtifactResultNodeOperatorResultExecState, + OperatorWithArtifactResultNodeOperatorID, + OperatorWithArtifactResultNodeArtifactID, + OperatorWithArtifactResultNodeArtifactResultID, + OperatorWithArtifactResultNodeMetadata, + OperatorWithArtifactResultNodeContentPath, + OperatorWithArtifactResultNodeArtifactResultExecState, + } +} diff --git a/src/golang/lib/repos/operator.go b/src/golang/lib/repos/operator.go index 1d2891649..546f5a60a 100644 --- a/src/golang/lib/repos/operator.go +++ b/src/golang/lib/repos/operator.go @@ -36,6 +36,12 @@ type operatorReader interface { // GetOperatorWithArtifactNodeBatch returns the OperatorWithArtifactNode views given the operator IDs. GetOperatorWithArtifactNodeBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]views.OperatorWithArtifactNode, error) + // GetOperatorWithArtifactByArtifactIdNode returns the OperatorWithArtifactNode view given the artifact ID. + GetOperatorWithArtifactByArtifactIdNode(ctx context.Context, artifactID uuid.UUID, DB database.Database) (*views.OperatorWithArtifactNode, error) + + // GetOperatorWithArtifactByArtifactIdNodeBatch returns the OperatorWithArtifactNode views given the artifact IDs. + GetOperatorWithArtifactByArtifactIdNodeBatch(ctx context.Context, artifactIDs []uuid.UUID, DB database.Database) ([]views.OperatorWithArtifactNode, error) + // GetBatch returns the Operators with IDs. GetBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]models.Operator, error) diff --git a/src/golang/lib/repos/operator_result.go b/src/golang/lib/repos/operator_result.go index 1b06a2598..31d0a3b7a 100644 --- a/src/golang/lib/repos/operator_result.go +++ b/src/golang/lib/repos/operator_result.go @@ -30,6 +30,9 @@ type operatorResultReader interface { // GetByDAGResultBatch returns all OperatorResults for the DAGResults specified. 
GetByDAGResultBatch(ctx context.Context, dagResultIDs []uuid.UUID, DB database.Database) ([]models.OperatorResult, error)
 
+	// GetOperatorWithArtifactResultNodesByOperatorNameAndWorkflow returns the OperatorWithArtifactResultNodes for the Workflow and Operator specified.
+	GetOperatorWithArtifactResultNodesByOperatorNameAndWorkflow(ctx context.Context, operatorName string, workflowID uuid.UUID, DB database.Database) ([]views.OperatorWithArtifactResultNode, error)
+
 	// GetCheckStatusByArtifactBatch returns an OperatorResultStatus for all OperatorResults
 	// associated with a Check Operator where the Operator has incoming DAGEdge
 	// from an Artifact in artifactIDs.
diff --git a/src/golang/lib/repos/sqlite/operator.go b/src/golang/lib/repos/sqlite/operator.go
index ba102aebc..76e065fcd 100644
--- a/src/golang/lib/repos/sqlite/operator.go
+++ b/src/golang/lib/repos/sqlite/operator.go
@@ -93,7 +93,7 @@ const operatorNodeViewSubQuery = `
 	WHERE op_with_outputs.outputs IS NULL
 `
 
-var mergedNodeViewSubQuery = fmt.Sprintf(`
+var operatorWithArtifactNodeViewSubQuery = fmt.Sprintf(`
 	WITH
 	operator_node AS (%s),
 	artifact_node AS (%s)
@@ -190,7 +190,7 @@ func (*operatorReader) GetOperatorWithArtifactNodeBatch(ctx context.Context, IDs
 	query := fmt.Sprintf(
 		"WITH %s AS (%s) SELECT %s FROM %s WHERE %s IN (%s)",
 		views.OperatorWithArtifactNodeView,
-		mergedNodeViewSubQuery,
+		operatorWithArtifactNodeViewSubQuery,
 		views.OperatorWithArtifactNodeCols(),
 		views.OperatorWithArtifactNodeView,
 		views.OperatorWithArtifactNodeID,
@@ -200,6 +200,35 @@ func (*operatorReader) GetOperatorWithArtifactNodeBatch(ctx context.Context, IDs
 	return getOperatorWithArtifactNodes(ctx, DB, query, args...)
 }
 
+func (r *operatorReader) GetOperatorWithArtifactByArtifactIdNode(ctx context.Context, artifactID uuid.UUID, DB database.Database) (*views.OperatorWithArtifactNode, error) {
+	nodes, err := r.GetOperatorWithArtifactByArtifactIdNodeBatch(ctx, []uuid.UUID{artifactID}, DB)
+	if err != nil {
+		return nil, err
+	}
+	if len(nodes) == 0 {
+		return nil, errors.Newf("Unable to find an operator with artifact node for artifact %v.", artifactID)
+	}
+	return &nodes[0], nil
+}
+
+func (*operatorReader) GetOperatorWithArtifactByArtifactIdNodeBatch(ctx context.Context, artifactIDs []uuid.UUID, DB database.Database) ([]views.OperatorWithArtifactNode, error) {
+	if len(artifactIDs) == 0 {
+		return nil, errors.New("Provided empty artifact IDs list.")
+	}
+
+	query := fmt.Sprintf(
+		"WITH %s AS (%s) SELECT %s FROM %s WHERE %s IN (%s)",
+		views.OperatorWithArtifactNodeView,
+		operatorWithArtifactNodeViewSubQuery,
+		views.OperatorWithArtifactNodeCols(),
+		views.OperatorWithArtifactNodeView,
+		views.OperatorWithArtifactNodeArtifactID,
+		stmt_preparers.GenerateArgsList(len(artifactIDs), 1),
+	)
+	args := stmt_preparers.CastIdsListToInterfaceList(artifactIDs)
+	return getOperatorWithArtifactNodes(ctx, DB, query, args...)
+} + func (*operatorReader) GetBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]models.Operator, error) { if len(IDs) == 0 { return nil, errors.New("Provided empty IDs list.") diff --git a/src/golang/lib/repos/sqlite/operator_result.go b/src/golang/lib/repos/sqlite/operator_result.go index 38a2544a6..87efba0d4 100644 --- a/src/golang/lib/repos/sqlite/operator_result.go +++ b/src/golang/lib/repos/sqlite/operator_result.go @@ -184,6 +184,45 @@ func (*operatorResultReader) GetStatusByDAGResultAndArtifactBatch( return statuses, err } +func (*operatorResultReader) GetOperatorWithArtifactResultNodesByOperatorNameAndWorkflow( + ctx context.Context, + operatorName string, + workflowID uuid.UUID, + DB database.Database, +) ([]views.OperatorWithArtifactResultNode, error) { + // For all workflow dags that belong to the workflow (identified by ID), + // get the workflow dag edges of the workflow dag. + // Get all operators with the operator name, get the operator ids and + // find the operator results of each operator (by id). + // Get all the artifact results by finding all workflow_dag_edges + // from operator by id to artifact result by artifact id. + query := `SELECT + operator_result.id, + operator.id AS operator_id, + operator_result.execution_state AS operator_result_exec_state, + artifact_result.artifact_id, + artifact_result.id AS artifact_result_id, + artifact_result.metadata, + artifact_result.content_path, + artifact_result.execution_state AS artifact_result_exec_state + FROM operator, operator_result, artifact_result, workflow_dag, workflow_dag_edge, workflow_dag_result + WHERE + workflow_dag.workflow_id = $1 + AND workflow_dag_edge.workflow_dag_id = workflow_dag.id + AND workflow_dag_result.workflow_dag_id = workflow_dag.id + AND operator.name = $2 + AND operator_result.operator_id = operator.id + AND workflow_dag_edge.from_id = operator.id + AND workflow_dag_edge.to_id = artifact_result.artifact_id + AND artifact_result.workflow_dag_result_id = workflow_dag_result.id;` + + args := []interface{}{workflowID, operatorName} + + var operatorWithArtifactResultNodes []views.OperatorWithArtifactResultNode + err := DB.Query(ctx, &operatorWithArtifactResultNodes, query, args...) + return operatorWithArtifactResultNodes, err +} + func (*operatorResultWriter) Create( ctx context.Context, dagResultID uuid.UUID, diff --git a/src/golang/lib/response/node.go b/src/golang/lib/response/node.go index 26289c6d3..35b6325a2 100644 --- a/src/golang/lib/response/node.go +++ b/src/golang/lib/response/node.go @@ -43,13 +43,15 @@ func NewOperatorWithArtifactNodeFromDBObject(dbOperatorWithArtifactNode *views.O } } -type OperatorWithArtifactNodeResult struct { - // Operator ID - ID uuid.UUID `json:"id"` - OperatorExecState *shared.ExecutionState `json:"operator_exec_state"` - - ArtifactID uuid.UUID `json:"artifact_id"` - SerializationType shared.ArtifactSerializationType `json:"serialization_type"` +type OperatorWithArtifactResultNode struct { + // Operator Result ID + ID uuid.UUID `json:"id"` + ArtifactResultID uuid.UUID `json:"artifact_result_id"` + OperatorID uuid.UUID `json:"operator_id"` + ArtifactID uuid.UUID `json:"artifact_id"` + OperatorResultExecState *shared.ExecutionState `json:"operator_result_exec_state"` + ArtifactResultExecState *shared.ExecutionState `json:"artifact_result_exec_state"` + SerializationType shared.ArtifactSerializationType `json:"serialization_type"` // If `ContentSerialized` is set, the content is small and we directly send // it as a part of response. 
It's consistent with the object stored in `ContentPath`. @@ -59,32 +61,32 @@ type OperatorWithArtifactNodeResult struct { // one should send an additional request to fetch the content. ContentPath string `json:"content_path"` ContentSerialized *string `json:"content_serialized"` - - ArtifactExecState *shared.ExecutionState `json:"artifact_exec_state"` } -func NewOperatorWithArtifactNodeResultFromDBObject( - dbOperatorWithArtifactNodeResult *views.OperatorWithArtifactNodeResult, +func NewOperatorWithArtifactResultNodeFromDBObject( + dbOperatorWithArtifactResultNode *views.OperatorWithArtifactResultNode, content *string, -) *OperatorWithArtifactNodeResult { - result := &OperatorWithArtifactNodeResult{ - ID: dbOperatorWithArtifactNodeResult.ID, - ArtifactID: dbOperatorWithArtifactNodeResult.ArtifactID, - SerializationType: dbOperatorWithArtifactNodeResult.Metadata.SerializationType, - ContentPath: dbOperatorWithArtifactNodeResult.ContentPath, +) *OperatorWithArtifactResultNode { + result := &OperatorWithArtifactResultNode{ + ID: dbOperatorWithArtifactResultNode.ID, + ArtifactResultID: dbOperatorWithArtifactResultNode.ArtifactResultID, + OperatorID: dbOperatorWithArtifactResultNode.OperatorID, + ArtifactID: dbOperatorWithArtifactResultNode.ArtifactID, + SerializationType: dbOperatorWithArtifactResultNode.Metadata.SerializationType, + ContentPath: dbOperatorWithArtifactResultNode.ContentPath, ContentSerialized: content, } - if !dbOperatorWithArtifactNodeResult.OperatorExecState.IsNull { + if !dbOperatorWithArtifactResultNode.OperatorResultExecState.IsNull { // make a copy of execState's value - execStateVal := dbOperatorWithArtifactNodeResult.OperatorExecState.ExecutionState - result.OperatorExecState = &execStateVal + execStateVal := dbOperatorWithArtifactResultNode.OperatorResultExecState.ExecutionState + result.OperatorResultExecState = &execStateVal } - if !dbOperatorWithArtifactNodeResult.ArtifactExecState.IsNull { + if !dbOperatorWithArtifactResultNode.ArtifactResultExecState.IsNull { // make a copy of execState's value - execStateVal := dbOperatorWithArtifactNodeResult.ArtifactExecState.ExecutionState - result.ArtifactExecState = &execStateVal + execStateVal := dbOperatorWithArtifactResultNode.ArtifactResultExecState.ExecutionState + result.ArtifactResultExecState = &execStateVal } return result @@ -251,8 +253,8 @@ type NodeResults struct { Operators []OperatorResult `json:"operators"` Artifacts []ArtifactResult `json:"artifacts"` // TODO: ENG-2987 Create separate sections for Metrics/Checks - // Metrics []OperatorWithArtifactNodeResult `json:"metrics"` - // Checks []OperatorWithArtifactNodeResult `json:"checks"` + // Metrics []OperatorWithArtifactResultNode `json:"metrics"` + // Checks []OperatorWithArtifactResultNode `json:"checks"` } func NewNodeResultsFromDBObjects( diff --git a/src/ui/common/src/handlers/AqueductApi.ts b/src/ui/common/src/handlers/AqueductApi.ts index f06b876c9..72d87b2dc 100644 --- a/src/ui/common/src/handlers/AqueductApi.ts +++ b/src/ui/common/src/handlers/AqueductApi.ts @@ -213,6 +213,20 @@ export const aqueductApi = createApi({ query: (req) => nodeArtifactResultsGetQuery(req), transformErrorResponse, }), + nodeMetricResultsGet: builder.query< + NodeMetricResultsGetResponse, + NodeMetricResultsGetRequest + >({ + query: (req) => nodeMetricResultsGetQuery(req), + transformErrorResponse, + }), + nodeCheckResultsGet: builder.query< + NodeCheckResultsGetResponse, + NodeCheckResultsGetRequest + >({ + query: (req) => nodeCheckResultsGetQuery(req), + 
transformErrorResponse,
+    }),
     nodeOperatorGet: builder.query<
       NodeOperatorGetResponse,
       NodeOperatorGetRequest
     >({
@@ -320,6 +334,8 @@ export const {
   useNodeArtifactGetQuery,
   useNodeArtifactResultContentGetQuery,
   useNodeArtifactResultsGetQuery,
+  useNodeMetricResultsGetQuery,
+  useNodeCheckResultsGetQuery,
   useNodeOperatorGetQuery,
   useNodeOperatorContentGetQuery,
   useNodeMetricGetQuery,
diff --git a/src/ui/common/src/handlers/v2/NodeCheckResultsGet.ts b/src/ui/common/src/handlers/v2/NodeCheckResultsGet.ts
new file mode 100644
index 000000000..0b05bf4d2
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/NodeCheckResultsGet.ts
@@ -0,0 +1,23 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/node_check_results_get.go
+
+import { APIKeyParameter } from '../parameters/Header';
+import {
+  DagIdParameter,
+  NodeIdParameter,
+  WorkflowIdParameter,
+} from '../parameters/Path';
+import { OperatorWithArtifactNodeResultResponse } from '../responses/node';
+
+export type NodeCheckResultsGetRequest = APIKeyParameter &
+  DagIdParameter &
+  NodeIdParameter &
+  WorkflowIdParameter;
+
+export type NodeCheckResultsGetResponse =
+  OperatorWithArtifactNodeResultResponse[];
+
+export const nodeCheckResultsGetQuery = (req: NodeCheckResultsGetRequest) => ({
+  url: `workflow/${req.workflowId}/dag/${req.dagId}/node/check/${req.nodeId}/results`,
+  headers: { 'api-key': req.apiKey },
+});
diff --git a/src/ui/common/src/handlers/v2/NodeMetricResultsGet.ts b/src/ui/common/src/handlers/v2/NodeMetricResultsGet.ts
new file mode 100644
index 000000000..2f96e4b98
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/NodeMetricResultsGet.ts
@@ -0,0 +1,25 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/node_metric_results_get.go
+
+import { APIKeyParameter } from '../parameters/Header';
+import {
+  DagIdParameter,
+  NodeIdParameter,
+  WorkflowIdParameter,
+} from '../parameters/Path';
+import { OperatorWithArtifactNodeResultResponse } from '../responses/node';
+
+export type NodeMetricResultsGetRequest = APIKeyParameter &
+  DagIdParameter &
+  NodeIdParameter &
+  WorkflowIdParameter;
+
+export type NodeMetricResultsGetResponse =
+  OperatorWithArtifactNodeResultResponse[];
+
+export const nodeMetricResultsGetQuery = (
+  req: NodeMetricResultsGetRequest
+) => ({
+  url: `workflow/${req.workflowId}/dag/${req.dagId}/node/metric/${req.nodeId}/results`,
+  headers: { 'api-key': req.apiKey },
+});
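For reference, a minimal sketch of how a UI component might consume the new hook, assuming `aqueductApi` is already wired into the Redux store as configured in AqueductApi.ts. The component name, the relative import path, and the response fields `id`, `content_serialized`, and `content_path` (mirroring the JSON tags on the Go `response.OperatorWithArtifactResultNode`) are illustrative assumptions, not part of this patch:

// Illustrative sketch only; assumes the RTK Query setup from AqueductApi.ts and
// that OperatorWithArtifactNodeResultResponse exposes `id`, `content_serialized`,
// and `content_path` (matching the Go response's JSON tags).
import React from 'react';

import { useNodeMetricResultsGetQuery } from '../handlers/AqueductApi';

type MetricHistoryProps = {
  apiKey: string;
  workflowId: string;
  dagId: string;
  nodeId: string; // the metric node's artifact ID
};

export const MetricHistoryList: React.FC<MetricHistoryProps> = ({
  apiKey,
  workflowId,
  dagId,
  nodeId,
}) => {
  // One entry per historical run of the metric node.
  const { data, isLoading, error } = useNodeMetricResultsGetQuery({
    apiKey,
    workflowId,
    dagId,
    nodeId,
  });

  if (isLoading || error || !data) {
    return null;
  }

  return (
    <ul>
      {data.map((result) => (
        <li key={result.id}>
          {result.content_serialized ?? `stored at ${result.content_path}`}
        </li>
      ))}
    </ul>
  );
};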