From e662e47418f5b20c9fcbd86939132194ec77e76e Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 3 Dec 2023 20:30:25 +0800 Subject: [PATCH] [Improve] FE job dependency component improvements (#3373) * [Improve] FE job dependency component improvements * [Improve] MavenTool support mirror|proxy * [Improve] FE dependency component get latest value bug fixed * [Improve] resolve maven artifacts bug fixed. --------- Co-authored-by: benjobs --- .mvn/wrapper/MavenWrapperHelper.java | 2 +- .../console/core/entity/Application.java | 6 ++ .../service/impl/AppBuildPipeServiceImpl.java | 14 +++- .../service/impl/ApplicationServiceImpl.java | 2 +- .../src/views/flink/app/Add.vue | 7 +- .../src/views/flink/app/EditFlink.vue | 6 +- .../src/views/flink/app/EditStreamPark.vue | 6 +- .../views/flink/app/components/Dependency.vue | 10 +-- .../flink/app/hooks/useCreateAndEditSchema.ts | 23 +------ .../views/flink/app/hooks/useCreateSchema.ts | 9 +++ .../flink/app/hooks/useEditStreamPark.ts | 14 +++- .../views/flink/app/hooks/useFlinkRender.tsx | 11 ++- .../src/views/flink/app/styles/Add.less | 2 - .../flink/packer/maven/MavenTool.scala | 69 ++++++++++++++++--- 14 files changed, 126 insertions(+), 55 deletions(-) diff --git a/.mvn/wrapper/MavenWrapperHelper.java b/.mvn/wrapper/MavenWrapperHelper.java index 6be3497895..428fa81ffd 100644 --- a/.mvn/wrapper/MavenWrapperHelper.java +++ b/.mvn/wrapper/MavenWrapperHelper.java @@ -98,7 +98,7 @@ protected PasswordAuthentication getPasswordAuthentication() { log(" - Downloader complete"); } - public static String getFileMd5(String path) throws Exception { + private static String getFileMd5(String path) throws Exception { MessageDigest md5 = MessageDigest.getInstance("MD5"); try (FileInputStream inputStream = new FileInputStream(path)) { byte[] buffer = new byte[1024]; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 1bbee13828..e375d521ae 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -462,6 +462,12 @@ public boolean isCustomCodeJob() { return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType()); } + @JsonIgnore + public boolean isApacheFlinkCustomCodeJob() { + return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType()) + && getApplicationType() == ApplicationType.APACHE_FLINK; + } + @JsonIgnore public boolean isUploadJob() { return isCustomCodeJob() && ResourceFrom.UPLOAD.getValue().equals(this.getResourceFrom()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 59c0e108f2..f17995c144 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -378,7 +378,7 @@ private void prepareJars(Application app) throws IOException { FsOperator localFS = FsOperator.lfs(); // 1. copy jar to local upload dir - if (app.isFlinkSqlJob() || app.isUploadJob()) { + if (app.isFlinkSqlJob() || app.isApacheFlinkCustomCodeJob()) { if (!app.getMavenDependency().getJar().isEmpty()) { for (String jar : app.getMavenDependency().getJar()) { File localJar = new File(WebUtils.getAppTempDir(), jar); @@ -393,7 +393,7 @@ private void prepareJars(Application app) throws IOException { } } - if (app.isCustomCodeJob()) { + if (app.isApacheFlinkCustomCodeJob()) { // customCode upload jar to appHome... FsOperator fsOperator = app.getFsOperator(); ResourceFrom resourceFrom = ResourceFrom.of(app.getResourceFrom()); @@ -437,7 +437,15 @@ private void prepareJars(Application app) throws IOException { if (!app.getMavenDependency().getPom().isEmpty()) { Set artifacts = app.getMavenDependency().getPom().stream() - .filter(x -> !new File(localUploadDIR, x.artifactName()).exists()) + .filter( + dep -> { + File file = new File(localUploadDIR, dep.artifactName()); + if (file.exists()) { + dependencyJars.add(file); + return false; + } + return true; + }) .map( pom -> new Artifact( diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 0d1d45785f..8b8b5b53b4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -852,7 +852,7 @@ public boolean update(Application appParam) { application.setRelease(ReleaseState.NEED_RELEASE.get()); - if (application.isUploadJob()) { + if (application.isApacheFlinkCustomCodeJob()) { MavenDependency thisDependency = MavenDependency.of(appParam.getDependency()); MavenDependency targetDependency = MavenDependency.of(application.getDependency()); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue index 071231e0ca..f23aa06e38 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue @@ -201,8 +201,9 @@ } else { params['jar'] = values.jar || null; params['mainClass'] = values.mainClass || null; + params['dependency'] = await getDependency(); } - handleCreateApp(params); + await handleCreateApp(params); } else { // from upload Object.assign(params, { @@ -212,13 +213,13 @@ mainClass: values.mainClass, dependency: await getDependency(), }); - handleCreateApp(params); + await handleCreateApp(params); } } } async function getDependency() { // Trigger a pom confirmation operation. - unref(dependencyRef)?.handleApplyPom(); + await unref(dependencyRef)?.handleApplyPom(); // common params... const dependency: { pom?: string; jar?: string } = {}; const dependencyRecords = unref(dependencyRef)?.dependencyRecords; diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue index e75e0d544f..31a6e32082 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue @@ -144,9 +144,9 @@ } /* Handling update parameters */ - function handleAppUpdate(values: Recordable) { + async function handleAppUpdate(values: Recordable) { // Trigger a pom confirmation operation. - unref(dependencyRef)?.handleApplyPom(); + await unref(dependencyRef)?.handleApplyPom(); // common params... const dependency: { pom?: string; jar?: string } = {}; const dependencyRecords = unref(dependencyRef)?.dependencyRecords; @@ -173,7 +173,7 @@ : JSON.stringify(dependency), }; handleSubmitParams(params, values, k8sTemplate); - handleUpdateApp(params); + await handleUpdateApp(params); } catch (error) { submitLoading.value = false; } diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue index 9e307ea244..c6f9197b02 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue @@ -176,10 +176,10 @@ } } - function handleSubmitSQL(values: Recordable) { + async function handleSubmitSQL(values: Recordable) { try { // Trigger a pom confirmation operation. - unref(dependencyRef)?.handleApplyPom(); + await unref(dependencyRef)?.handleApplyPom(); // common params... const dependency: { pom?: string; jar?: string } = {}; const dependencyRecords = unref(dependencyRef)?.dependencyRecords; @@ -212,7 +212,7 @@ : JSON.stringify(dependency), }; handleSubmitParams(params, values, k8sTemplate); - handleUpdateApp(params); + await handleUpdateApp(params); } catch (error) { createMessage.error('edit error'); submitLoading.value = false; diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue index 5c9686a456..f3c1b4c372 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue @@ -75,7 +75,7 @@ const { t } = useI18n(); const defaultValue = ''; const { Swal } = useMessage(); - const { onChange, setContent } = useMonaco(pomBox, { + const { onChange, setContent, getContent } = useMonaco(pomBox, { language: 'xml', code: props.value || defaultValue, options: { @@ -106,7 +106,8 @@ const classifierExp = /([\s\S]*?)<\/classifier>/; const exclusionsExp = /([\s\S]*?)<\/exclusions>/; const invalidArtifact: Array = []; - props.value + const propsValue = await getContent(); + propsValue .split('') .filter((x) => x.replace(/\\s+/, '') !== '') .forEach((dep) => { @@ -217,8 +218,7 @@ /* load history config records */ async function handleReloadHistoryUploads() { selectedHistoryUploadJars.value = []; - const res = await fetchUploadJars(); - historyUploadJars.value = res; + historyUploadJars.value = await fetchUploadJars(); } const filteredHistoryUploadJarsOptions = computed(() => { @@ -291,7 +291,7 @@
-
+
{{ t('common.apply') }} diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts index 2a1a3b9150..8a6942398e 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts @@ -49,13 +49,7 @@ import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv'; import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type'; import { AlertSetting } from '/@/api/flink/setting/types/alert.type'; import { FlinkCluster } from '/@/api/flink/setting/types/flinkCluster.type'; -import { - AppTypeEnum, - ClusterStateEnum, - ExecModeEnum, - JobTypeEnum, - ResourceFromEnum, -} from '/@/enums/flinkEnum'; +import { AppTypeEnum, ClusterStateEnum, ExecModeEnum, JobTypeEnum } from '/@/enums/flinkEnum'; import { isK8sExecMode } from '../utils'; import { useI18n } from '/@/hooks/web/useI18n'; import { fetchCheckHadoop } from '/@/api/flink/setting'; @@ -125,21 +119,6 @@ export const useCreateAndEditSchema = ( }, rules: [{ required: true, message: t('flink.app.addAppTips.flinkSqlIsRequiredMessage') }], }, - { - field: 'dependency', - label: t('flink.app.dependency'), - component: 'Input', - slot: 'dependency', - ifShow: ({ values }) => { - if (edit?.appId) { - return values.jobType == JobTypeEnum.SQL - ? true - : values.resourceFrom == ResourceFromEnum.UPLOAD; - } else { - return values?.jobType == 'sql' ? true : values?.resourceFrom != 'cvs'; - } - }, - }, { field: 'configOverride', label: '', component: 'Input', show: false }, { field: 'isSetConfig', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts index 4ef2557b35..f123923038 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts @@ -242,6 +242,15 @@ export const useCreateSchema = (dependencyRef: Ref) => { values.appType == String(AppTypeEnum.APACHE_FLINK), rules: [{ required: true, message: t('flink.app.addAppTips.mainClassIsRequiredMessage') }], }, + { + field: 'dependency', + label: t('flink.app.dependency'), + component: 'Input', + slot: 'dependency', + ifShow: ({ values }) => { + return values?.jobType == 'sql' ? true : values?.appType == AppTypeEnum.APACHE_FLINK; + }, + }, { field: 'config', label: t('flink.app.appConf'), diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditStreamPark.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditStreamPark.ts index a4bb6c15fe..3ad8db2cd1 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditStreamPark.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditStreamPark.ts @@ -16,7 +16,7 @@ */ import { FormSchema } from '/@/components/Table'; import { computed, h, Ref, ref, unref } from 'vue'; -import { ExecModeEnum, JobTypeEnum, UseStrategyEnum } from '/@/enums/flinkEnum'; +import { AppTypeEnum, ExecModeEnum, JobTypeEnum, UseStrategyEnum } from '/@/enums/flinkEnum'; import { useCreateAndEditSchema } from './useCreateAndEditSchema'; import { renderSqlHistory } from './useFlinkRender'; import { Alert } from 'ant-design-vue'; @@ -141,7 +141,6 @@ export const useEditStreamParkSchema = ( ifShow: ({ model, values }) => values.jobType != JobTypeEnum.SQL && model.projectName, }, { field: 'project', label: 'ProjectId', component: 'Input', show: false }, - { field: 'module', label: 'Application', @@ -149,6 +148,17 @@ export const useEditStreamParkSchema = ( render: ({ model }) => h(Alert, { message: model.module, type: 'info' }), ifShow: ({ model, values }) => values.jobType != JobTypeEnum.SQL && model.module, }, + { + field: 'dependency', + label: t('flink.app.dependency'), + component: 'Input', + slot: 'dependency', + ifShow: ({ values }) => { + return values.jobType == JobTypeEnum.SQL + ? true + : values?.appType == AppTypeEnum.APACHE_FLINK; + }, + }, { field: 'configId', label: 'configId', component: 'Input', show: false }, { field: 'config', label: '', component: 'Input', show: false }, { field: 'strategy', label: '', component: 'Input', show: false }, diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx index ba00e667a8..8517782274 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx @@ -39,7 +39,7 @@ import { handleConfTemplate } from '/@/api/flink/config'; import { decodeByBase64 } from '/@/utils/cipher'; import { useMessage } from '/@/hooks/web/useMessage'; import { SelectValue } from 'ant-design-vue/lib/select'; -import { CandidateTypeEnum, FailoverStrategyEnum } from '/@/enums/flinkEnum'; +import { AppTypeEnum, CandidateTypeEnum, FailoverStrategyEnum } from '/@/enums/flinkEnum'; import { useI18n } from '/@/hooks/web/useI18n'; import { fetchYarnQueueList } from '/@/api/flink/setting/yarnQueue'; import { ApiSelect } from '/@/components/Form'; @@ -507,10 +507,17 @@ export const renderCompareSelectTag = (ver: any) => { ); }; +function handleResourceFrom(model: Recordable, value: string) { + model.resourceFrom = value; + if (value == 'upload') { + model.appType = String(AppTypeEnum.APACHE_FLINK); + } +} + export const renderResourceFrom = (model: Recordable) => { return (