diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index 987f0e9fdd89..577b643097a9 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -16,23 +16,16 @@ use std::collections::BTreeMap; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; use risingwave_pb::ddl_service::create_connection_request; use risingwave_sqlparser::ast::CreateConnectionStatement; use super::RwPgResponse; use crate::binder::Binder; use crate::error::ErrorCode::ProtocolError; -use crate::error::{Result, RwError}; +use crate::error::{ErrorCode, Result, RwError}; use crate::handler::HandlerArgs; pub(crate) const CONNECTION_TYPE_PROP: &str = "type"; -pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider"; -pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name"; -pub(crate) const CONNECTION_TAGS_PROP: &str = "tags"; - -pub(crate) const CLOUD_PROVIDER_MOCK: &str = "mock"; // fake privatelink provider for testing -pub(crate) const CLOUD_PROVIDER_AWS: &str = "aws"; #[inline(always)] fn get_connection_property_required( @@ -48,58 +41,19 @@ fn get_connection_property_required( ))) }) } - -fn resolve_private_link_properties( - with_properties: &BTreeMap, -) -> Result { - let provider = - match get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?.as_str() - { - CLOUD_PROVIDER_MOCK => PrivateLinkProvider::Mock, - CLOUD_PROVIDER_AWS => PrivateLinkProvider::Aws, - provider => { - return Err(RwError::from(ProtocolError(format!( - "Unsupported privatelink provider {}", - provider - )))); - } - }; - match provider { - PrivateLinkProvider::Mock => Ok(create_connection_request::PrivateLink { - provider: provider.into(), - service_name: String::new(), - tags: None, - }), - PrivateLinkProvider::Aws => { - let service_name = - get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?; - Ok(create_connection_request::PrivateLink { - provider: provider.into(), - service_name, - tags: with_properties.get(CONNECTION_TAGS_PROP).cloned(), - }) - } - PrivateLinkProvider::Unspecified => Err(RwError::from(ProtocolError( - "Privatelink provider unspecified".to_string(), - ))), - } -} - fn resolve_create_connection_payload( with_properties: &BTreeMap, ) -> Result { let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?; - let create_connection_payload = match connection_type.as_str() { - PRIVATELINK_CONNECTION => create_connection_request::Payload::PrivateLink( - resolve_private_link_properties(with_properties)?, - ), - _ => { - return Err(RwError::from(ProtocolError(format!( - "Connection type \"{connection_type}\" is not supported" - )))); - } + return match connection_type.as_str() { + PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated( + "CREATE CONNECTION to Private Link".to_string(), + "RisingWave Cloud Portal".to_string(), + ))), + _ => Err(RwError::from(ProtocolError(format!( + "Connection type \"{connection_type}\" is not supported" + )))), }; - Ok(create_connection_payload) } pub async fn handle_create_connection( diff --git a/src/meta/model/src/connection.rs b/src/meta/model/src/connection.rs index a6cfa4aefb58..dce0daa462fc 100644 --- a/src/meta/model/src/connection.rs +++ b/src/meta/model/src/connection.rs @@ -26,6 +26,8 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub connection_id: ConnectionId, pub name: String, + + // todo: Private link service has been deprecated, consider using a new field for the connection info pub info: PrivateLinkService, } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index b556c4ca3452..b70584c437a6 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -35,7 +35,6 @@ use risingwave_meta::rpc::ElectionClientRef; use risingwave_meta::stream::ScaleController; use risingwave_meta::MetaStoreBackend; use risingwave_meta_service::backup_service::BackupServiceImpl; -use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; use risingwave_meta_service::ddl_service::DdlServiceImpl; @@ -55,7 +54,6 @@ use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl; use risingwave_meta_service::user_service::UserServiceImpl; use risingwave_meta_service::AddressInfo; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; -use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer; use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer; use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; @@ -86,7 +84,6 @@ use crate::controller::SqlMetaStore; use crate::hummock::HummockManager; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient}; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, @@ -531,17 +528,8 @@ pub async fn start_service_as_election_leader( compactor_manager.clone(), )); - let mut aws_cli = None; - if let Some(my_vpc_id) = &env.opts.vpc_id - && let Some(security_group_id) = &env.opts.security_group_id - { - let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await; - aws_cli = Some(cli); - } - let ddl_srv = DdlServiceImpl::new( env.clone(), - aws_cli.clone(), metadata_manager.clone(), stream_manager.clone(), source_manager.clone(), @@ -586,7 +574,6 @@ pub async fn start_service_as_election_leader( let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref()); let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); - let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli); let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref()); let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone()); @@ -712,7 +699,6 @@ pub async fn start_service_as_election_leader( .add_service(SessionParamServiceServer::new(session_params_srv)) .add_service(TelemetryInfoServiceServer::new(telemetry_srv)) .add_service(ServingServiceServer::new(serving_srv)) - .add_service(CloudServiceServer::new(cloud_srv)) .add_service(SinkCoordinationServiceServer::new(sink_coordination_srv)) .add_service(EventLogServiceServer::new(event_log_srv)) .add_service(ClusterLimitServiceServer::new(cluster_limit_srv)); diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index e913b91826b6..e69de29bb2d1 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -1,197 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::sync::LazyLock; - -use async_trait::async_trait; -use regex::Regex; -use risingwave_connector::error::ConnectorResult; -use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map; -use risingwave_connector::source::{ - ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, -}; -use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; -use risingwave_meta::manager::MetadataManager; -use risingwave_meta_model::ConnectionId; -use risingwave_pb::catalog::connection::Info::PrivateLinkService; -use risingwave_pb::cloud_service::cloud_service_server::CloudService; -use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType}; -use risingwave_pb::cloud_service::{ - RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType, -}; -use thiserror_ext::AsReport; -use tonic::{Request, Response, Status}; - -use crate::rpc::cloud_provider::AwsEc2Client; - -pub struct CloudServiceImpl { - metadata_manager: MetadataManager, - aws_client: Option, -} - -impl CloudServiceImpl { - pub fn new(metadata_manager: MetadataManager, aws_client: Option) -> Self { - Self { - metadata_manager, - aws_client, - } - } -} - -#[inline(always)] -fn new_rwc_validate_fail_response( - error_type: ErrorType, - error_message: String, -) -> Response { - Response::new(RwCloudValidateSourceResponse { - ok: false, - error: Some(Error { - error_type: error_type.into(), - error_message, - }), - }) -} - -#[async_trait] -impl CloudService for CloudServiceImpl { - async fn rw_cloud_validate_source( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - if req.source_type() != SourceType::Kafka { - return Err(Status::invalid_argument( - "unexpected source type, only kafka source is supported", - )); - } - let mut source_cfg: BTreeMap = req.source_config.into_iter().collect(); - // if connection_id provided, check whether endpoint service is available and resolve - // broker rewrite map currently only support aws privatelink connection - if let Some(connection_id_str) = source_cfg.get("connection.id") { - let connection_id = connection_id_str.parse::().map_err(|e| { - Status::invalid_argument(format!( - "connection.id is not an integer: {}", - e.as_report() - )) - })?; - - let connection = self - .metadata_manager - .catalog_controller - .get_connection_by_id(connection_id) - .await; - - if let Err(e) = connection { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkConnectionNotFound, - e.to_report_string(), - )); - } - if let Some(PrivateLinkService(service)) = connection.unwrap().info { - if self.aws_client.is_none() { - return Ok(new_rwc_validate_fail_response( - ErrorType::AwsClientNotConfigured, - "AWS client is not configured".to_string(), - )); - } - let cli = self.aws_client.as_ref().unwrap(); - let privatelink_status = cli - .is_vpc_endpoint_ready(service.endpoint_id.as_str()) - .await; - match privatelink_status { - Err(e) => { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkUnavailable, - e.to_report_string(), - )); - } - Ok(false) => { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkUnavailable, - format!("Private link endpoint {} is not ready", service.endpoint_id,), - )); - } - _ => (), - }; - if let Err(e) = - insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None) - { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkResolveErr, - e.to_report_string(), - )); - } - } else { - return Ok(new_rwc_validate_fail_response( - ErrorType::PrivatelinkResolveErr, - format!("connection {} has no info available", connection_id), - )); - } - } - - // XXX: We can't use secret in cloud validate source. - let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg); - - // try fetch kafka metadata, return error message on failure - let props = ConnectorProperties::extract(source_cfg, false); - if let Err(e) = props { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaInvalidProperties, - e.to_report_string(), - )); - }; - - async fn new_enumerator( - props: P, - ) -> ConnectorResult { - P::SplitEnumerator::new(props, SourceEnumeratorContext::dummy().into()).await - } - - dispatch_source_prop!(props.unwrap(), props, { - let enumerator = new_enumerator(*props).await; - if let Err(e) = enumerator { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaInvalidProperties, - e.to_report_string(), - )); - } - if let Err(e) = enumerator.unwrap().list_splits().await { - let error_message = e.to_report_string(); - if error_message.contains("BrokerTransportFailure") { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaBrokerUnreachable, - e.to_report_string(), - )); - } - static TOPIC_NOT_FOUND: LazyLock = - LazyLock::new(|| Regex::new(r"topic .* not found").unwrap()); - if TOPIC_NOT_FOUND.is_match(error_message.as_str()) { - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaTopicNotFound, - e.to_report_string(), - )); - } - return Ok(new_rwc_validate_fail_response( - ErrorType::KafkaOther, - e.to_report_string(), - )); - } - }); - Ok(Response::new(RwCloudValidateSourceResponse { - ok: true, - error: None, - })) - } -} diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 1578813e2ead..7a5d0f315c7b 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -22,15 +22,12 @@ use risingwave_common::catalog::ColumnCatalog; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_connector::sink::catalog::SinkId; +use risingwave_meta::error::MetaErrorInner; use risingwave_meta::manager::{EventLogManagerRef, MetadataManager}; use risingwave_meta::rpc::ddl_controller::fill_table_stream_graph_info; use risingwave_meta::rpc::metrics::MetaMetrics; -use risingwave_pb::catalog::connection::private_link_service::{ - PbPrivateLinkProvider, PrivateLinkProvider, -}; -use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Comment, Connection, CreateType, Secret, Table}; +use risingwave_pb::catalog::{connection, Comment, CreateType, Secret, Table}; use risingwave_pb::common::worker_node::State; use risingwave_pb::common::WorkerType; use risingwave_pb::ddl_service::ddl_service_server::DdlService; @@ -44,7 +41,6 @@ use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{MetaSrvEnv, StreamingJob}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::ddl_controller::{ DdlCommand, DdlController, DropMode, ReplaceTableInfo, StreamingJobId, }; @@ -58,7 +54,6 @@ pub struct DdlServiceImpl { metadata_manager: MetadataManager, sink_manager: SinkCoordinatorManager, ddl_controller: DdlController, - aws_client: Arc>, meta_metrics: Arc, } @@ -66,7 +61,6 @@ impl DdlServiceImpl { #[allow(clippy::too_many_arguments)] pub async fn new( env: MetaSrvEnv, - aws_client: Option, metadata_manager: MetadataManager, stream_manager: GlobalStreamManagerRef, source_manager: SourceManagerRef, @@ -74,22 +68,19 @@ impl DdlServiceImpl { sink_manager: SinkCoordinatorManager, meta_metrics: Arc, ) -> Self { - let aws_cli_ref = Arc::new(aws_client); let ddl_controller = DdlController::new( env.clone(), metadata_manager.clone(), stream_manager, source_manager, barrier_manager, - aws_cli_ref.clone(), ) .await; Self { env, metadata_manager, - ddl_controller, - aws_client: aws_cli_ref, sink_manager, + ddl_controller, meta_metrics, } } @@ -747,64 +738,11 @@ impl DdlService for DdlServiceImpl { return Err(Status::invalid_argument("request is empty")); } - match req.payload.unwrap() { - create_connection_request::Payload::PrivateLink(link) => { - // currently we only support AWS - let private_link_svc = match link.get_provider()? { - PbPrivateLinkProvider::Mock => PbPrivateLinkService { - provider: link.provider, - service_name: String::new(), - endpoint_id: String::new(), - endpoint_dns_name: String::new(), - dns_entries: HashMap::new(), - }, - PbPrivateLinkProvider::Aws => { - if let Some(aws_cli) = self.aws_client.as_ref() { - let tags_env = self - .env - .opts - .privatelink_endpoint_default_tags - .as_ref() - .map(|tags| { - tags.iter() - .map(|(key, val)| (key.as_str(), val.as_str())) - .collect() - }); - aws_cli - .create_aws_private_link( - &link.service_name, - link.tags.as_deref(), - tags_env, - ) - .await? - } else { - return Err(Status::from(MetaError::unavailable( - "AWS client is not configured", - ))); - } - } - PbPrivateLinkProvider::Unspecified => { - return Err(Status::invalid_argument("Privatelink provider unspecified")); - } - }; - let connection = Connection { - id: 0, - schema_id: req.schema_id, - database_id: req.database_id, - name: req.name, - owner: req.owner_id, - info: Some(connection::Info::PrivateLinkService(private_link_svc)), - }; - - // save private link info to catalog - let version = self - .ddl_controller - .run_command(DdlCommand::CreateConnection(connection)) - .await?; - - Ok(Response::new(CreateConnectionResponse { version })) - } - } + return match req.payload.unwrap() { + create_connection_request::Payload::PrivateLink(_) => Err(Status::unavailable( + "Private Link is deprecated, please use Cloud Portal", + )), + }; } async fn list_connections( @@ -1095,21 +1033,11 @@ impl DdlServiceImpl { .catalog_controller .get_connection_by_id(connection_id as _) .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &connection.info { - // skip all checks for mock connection - if svc.get_provider()? == PrivateLinkProvider::Mock { - return Ok(()); - } - - // check whether private link is ready - if let Some(aws_cli) = self.aws_client.as_ref() { - if !aws_cli.is_vpc_endpoint_ready(&svc.endpoint_id).await? { - return Err(MetaError::from(anyhow!( - "Private link endpoint {} is not ready", - svc.endpoint_id - ))); - } - } + if let Some(connection::Info::PrivateLinkService(_)) = &connection.info { + return Err(MetaError::from(MetaErrorInner::Deprecated( + "CREATE CONNECTION to Private Link".to_string(), + "RisingWave Cloud Portal".to_string(), + ))); } Ok(()) } diff --git a/src/meta/service/src/lib.rs b/src/meta/service/src/lib.rs index 2e327dc47a59..68a5e4c4a762 100644 --- a/src/meta/service/src/lib.rs +++ b/src/meta/service/src/lib.rs @@ -19,7 +19,6 @@ use risingwave_meta::*; pub mod backup_service; -pub mod cloud_service; pub mod cluster_limit_service; pub mod cluster_service; pub mod ddl_service; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 08824459e916..488538bd8145 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -32,9 +32,9 @@ use risingwave_meta_model::{ actor, connection, database, fragment, function, index, object, object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, - FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, Property, SchemaId, - SecretId, SinkId, SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, - TableId, UserId, ViewId, + FunctionId, I32Array, IndexId, JobStatus, ObjectId, Property, SchemaId, SecretId, SinkId, + SourceId, StreamNode, StreamSourceInfo, StreamingParallelism, SubscriptionId, TableId, UserId, + ViewId, }; use risingwave_pb::catalog::subscription::SubscriptionState; use risingwave_pb::catalog::table::PbTableType; @@ -109,7 +109,8 @@ pub struct ReleaseContext { /// Dropped source list, need to unregister from source manager. pub(crate) source_ids: Vec, /// Dropped connection list, need to delete from vpc endpoints. - pub(crate) connections: Vec, + #[allow(dead_code)] + pub(crate) connections: Vec, /// Dropped fragments that are fetching data from the target source. pub(crate) source_fragments: HashMap>, @@ -367,7 +368,7 @@ impl CatalogController { .all(&txn) .await? .into_iter() - .map(|conn| conn.info) + .map(|conn| conn.connection_id) .collect_vec(); // Find affect users with privileges on the database and the objects in the database. diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8bfe188d4a3f..f1c3bb0ffdd8 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -116,9 +116,6 @@ pub enum MetaErrorInner { SinkError, ), - #[error("AWS SDK error: {0}")] - Aws(#[source] BoxedError), - #[error(transparent)] Internal( #[from] @@ -132,6 +129,9 @@ pub enum MetaErrorInner { #[error("Integrity check failed")] IntegrityCheckFailed, + + #[error("{0} has been deprecated, please use {1} instead.")] + Deprecated(String, String), } impl MetaError { @@ -156,15 +156,6 @@ impl MetaError { } } -impl From> for MetaError -where - E: std::error::Error + Sync + Send + 'static, -{ - fn from(e: aws_sdk_ec2::error::SdkError) -> Self { - MetaErrorInner::Aws(e.into()).into() - } -} - impl From for tonic::Status { fn from(err: MetaError) -> Self { use tonic::Code; diff --git a/src/meta/src/rpc/cloud_provider.rs b/src/meta/src/rpc/cloud_provider.rs deleted file mode 100644 index fce20d5eea09..000000000000 --- a/src/meta/src/rpc/cloud_provider.rs +++ /dev/null @@ -1,337 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use anyhow::anyhow; -use aws_config::retry::RetryConfig; -use aws_sdk_ec2::error::ProvideErrorMetadata; -use aws_sdk_ec2::types::{Filter, ResourceType, State, Tag, TagSpecification, VpcEndpointType}; -use itertools::Itertools; -use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider; -use risingwave_pb::catalog::connection::PrivateLinkService; - -use crate::{MetaError, MetaResult}; - -#[derive(Clone)] -pub struct AwsEc2Client { - client: aws_sdk_ec2::Client, - /// `vpc_id`: The VPC of the running RisingWave instance - vpc_id: String, - security_group_id: String, -} - -impl AwsEc2Client { - pub async fn new(vpc_id: &str, security_group_id: &str) -> Self { - let sdk_config = aws_config::from_env() - .retry_config(RetryConfig::standard().with_max_attempts(4)) - .load() - .await; - let client = aws_sdk_ec2::Client::new(&sdk_config); - - Self { - client, - vpc_id: vpc_id.to_string(), - security_group_id: security_group_id.to_string(), - } - } - - pub async fn delete_vpc_endpoint(&self, vpc_endpoint_id: &str) -> MetaResult<()> { - let output = self - .client - .delete_vpc_endpoints() - .vpc_endpoint_ids(vpc_endpoint_id) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to delete VPC endpoint. endpoint_id {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - if !output.unsuccessful().is_empty() { - return Err(MetaError::from(anyhow!( - "Failed to delete VPC endpoint {}, error: {:?}", - vpc_endpoint_id, - output.unsuccessful() - ))); - } - Ok(()) - } - - /// `service_name`: The name of the endpoint service we want to access - /// `tags_user_str`: The tags specified in with clause of `create connection` - /// `tags_env`: The default tags specified in env var `RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS` - pub async fn create_aws_private_link( - &self, - service_name: &str, - tags_user_str: Option<&str>, - tags_env: Option>, - ) -> MetaResult { - // fetch the AZs of the endpoint service - let service_azs = self.get_endpoint_service_az_names(service_name).await?; - let subnet_and_azs = self.describe_subnets(&self.vpc_id, &service_azs).await?; - - let subnet_ids: Vec = subnet_and_azs.iter().map(|(id, _, _)| id.clone()).collect(); - let az_to_azid_map: HashMap = subnet_and_azs - .into_iter() - .map(|(_, az, az_id)| (az, az_id)) - .collect(); - - let tags_vec = match tags_user_str { - Some(tags_user_str) => { - let mut tags_user = tags_user_str - .split(',') - .map(|s| { - s.split_once('=').ok_or_else(|| { - MetaError::invalid_parameter("Failed to parse `tags` parameter") - }) - }) - .collect::>>()?; - match tags_env { - Some(tags_env) => { - tags_user.extend(tags_env); - Some(tags_user) - } - None => Some(tags_user), - } - } - None => tags_env, - }; - - let (endpoint_id, endpoint_dns_names) = self - .create_vpc_endpoint( - &self.vpc_id, - service_name, - &self.security_group_id, - &subnet_ids, - tags_vec, - ) - .await?; - - // The number of returned DNS names may not equal to the input AZs, - // because some AZs may not have a subnet in the RW VPC - let mut azid_to_dns_map = HashMap::new(); - if endpoint_dns_names.first().is_none() { - return Err(MetaError::from(anyhow!( - "No DNS name returned for the endpoint" - ))); - } - - // The first dns name doesn't has AZ info - let endpoint_dns_name = endpoint_dns_names.first().unwrap().clone(); - for dns_name in &endpoint_dns_names { - for az in az_to_azid_map.keys() { - if dns_name.contains(az) { - azid_to_dns_map - .insert(az_to_azid_map.get(az).unwrap().clone(), dns_name.clone()); - break; - } - } - } - - Ok(PrivateLinkService { - provider: PrivateLinkProvider::Aws.into(), - service_name: service_name.to_string(), - endpoint_id, - dns_entries: azid_to_dns_map, - endpoint_dns_name, - }) - } - - pub async fn is_vpc_endpoint_ready(&self, vpc_endpoint_id: &str) -> MetaResult { - let mut is_ready = false; - let filter = Filter::builder() - .name("vpc-endpoint-id") - .values(vpc_endpoint_id) - .build(); - let output = self - .client - .describe_vpc_endpoints() - .set_filters(Some(vec![filter])) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to check availability of VPC endpoint. endpoint_id: {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - match output.vpc_endpoints { - Some(endpoints) => { - let endpoint = endpoints - .into_iter() - .exactly_one() - .map_err(|_| anyhow!("More than one VPC endpoint found with the same ID"))?; - if let Some(state) = endpoint.state { - match state { - State::Available => { - is_ready = true; - } - // forward-compatible with protocol change - other => { - is_ready = other.as_str().eq_ignore_ascii_case("available"); - } - } - } - } - None => { - return Err(MetaError::from(anyhow!( - "No VPC endpoint found with the ID {}", - vpc_endpoint_id - ))); - } - } - Ok(is_ready) - } - - async fn get_endpoint_service_az_names(&self, service_name: &str) -> MetaResult> { - let mut service_azs = Vec::new(); - let output = self - .client - .describe_vpc_endpoint_services() - .set_service_names(Some(vec![service_name.to_string()])) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to describe VPC endpoint service, error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - match output.service_details { - Some(details) => { - let detail = details.into_iter().exactly_one().map_err(|_| { - anyhow!("More than one VPC endpoint service found with the same name") - })?; - if let Some(azs) = detail.availability_zones { - service_azs.extend(azs.into_iter()); - } - } - None => { - return Err(MetaError::from(anyhow!( - "No VPC endpoint service found with the name {}", - service_name - ))); - } - } - Ok(service_azs) - } - - async fn describe_subnets( - &self, - vpc_id: &str, - az_names: &[String], - ) -> MetaResult> { - let vpc_filter = Filter::builder().name("vpc-id").values(vpc_id).build(); - let az_filter = Filter::builder() - .name("availability-zone") - .set_values(Some(Vec::from(az_names))) - .build(); - let output = self - .client - .describe_subnets() - .set_filters(Some(vec![vpc_filter, az_filter])) - .send() - .await - .map_err(|e| { - anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id")) - })?; - - let subnets = output - .subnets - .unwrap_or_default() - .into_iter() - .unique_by(|s| s.availability_zone().unwrap_or_default().to_string()) - .map(|s| { - ( - s.subnet_id.unwrap_or_default(), - s.availability_zone.unwrap_or_default(), - s.availability_zone_id.unwrap_or_default(), - ) - }) - .collect(); - Ok(subnets) - } - - async fn create_vpc_endpoint( - &self, - vpc_id: &str, - service_name: &str, - security_group_id: &str, - subnet_ids: &[String], - tags_vec: Option>, - ) -> MetaResult<(String, Vec)> { - let tag_spec = match tags_vec { - Some(tags_vec) => { - let tags = tags_vec - .into_iter() - .map(|(tag_key, tag_val)| { - Tag::builder() - .set_key(Some(tag_key.to_string())) - .set_value(Some(tag_val.to_string())) - .build() - }) - .collect(); - Some(vec![TagSpecification::builder() - .set_resource_type(Some(ResourceType::VpcEndpoint)) - .set_tags(Some(tags)) - .build()]) - } - None => None, - }; - - let output = self - .client - .create_vpc_endpoint() - .vpc_endpoint_type(VpcEndpointType::Interface) - .vpc_id(vpc_id) - .security_group_ids(security_group_id) - .service_name(service_name) - .set_subnet_ids(Some(subnet_ids.to_owned())) - .set_tag_specifications(tag_spec) - .send() - .await - .map_err(|e| { - anyhow!( - "Failed to create vpc endpoint: vpc_id {vpc_id}, \ - service_name {service_name}. error: {:?}, aws_request_id: {:?}", - e.message(), - e.meta().extra("aws_request_id") - ) - })?; - - let endpoint = output.vpc_endpoint().unwrap(); - let mut dns_names = Vec::new(); - - endpoint.dns_entries().iter().for_each(|e| { - if let Some(dns_name) = e.dns_name() { - dns_names.push(dns_name.to_string()); - } - }); - - Ok(( - endpoint.vpc_endpoint_id().unwrap_or_default().to_string(), - dns_names, - )) - } -} diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 995643215d31..4d67af3b5ddf 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -43,13 +43,11 @@ use risingwave_meta_model::{ ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, }; -use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider; -use risingwave_pb::catalog::connection::PrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::{ - connection, Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, - Schema, Secret, Sink, Source, Subscription, Table, View, + Comment, Connection, CreateType, Database, Function, PbSink, PbSource, PbTable, Schema, Secret, + Sink, Source, Subscription, Table, View, }; use risingwave_pb::ddl_service::alter_owner_request::Object; use risingwave_pb::ddl_service::{ @@ -67,7 +65,6 @@ use risingwave_pb::stream_plan::{ use thiserror_ext::AsReport; use tokio::sync::Semaphore; use tokio::time::sleep; -use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; @@ -79,7 +76,6 @@ use crate::manager::{ IGNORED_NOTIFICATION_VERSION, }; use crate::model::{FragmentId, StreamContext, TableFragments, TableParallelism}; -use crate::rpc::cloud_provider::AwsEc2Client; use crate::stream::{ create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption, @@ -188,7 +184,6 @@ pub struct DdlController { pub(crate) source_manager: SourceManagerRef, barrier_manager: BarrierManagerRef, - aws_client: Arc>, // The semaphore is used to limit the number of concurrent streaming job creation. pub(crate) creating_streaming_job_permits: Arc, } @@ -258,7 +253,6 @@ impl DdlController { stream_manager: GlobalStreamManagerRef, source_manager: SourceManagerRef, barrier_manager: BarrierManagerRef, - aws_client: Arc>, ) -> Self { let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await); Self { @@ -267,7 +261,6 @@ impl DdlController { stream_manager, source_manager, barrier_manager, - aws_client, creating_streaming_job_permits, } } @@ -551,21 +544,6 @@ impl DdlController { .await } - pub(crate) async fn delete_vpc_endpoint(&self, svc: &PrivateLinkService) -> MetaResult<()> { - // delete AWS vpc endpoint - if svc.get_provider()? == PbPrivateLinkProvider::Aws { - if let Some(aws_cli) = self.aws_client.as_ref() { - aws_cli.delete_vpc_endpoint(&svc.endpoint_id).await?; - } else { - warn!( - "AWS client is not initialized, skip deleting vpc endpoint {}", - svc.endpoint_id - ); - } - } - Ok(()) - } - async fn create_subscription( &self, mut subscription: Subscription, @@ -1161,14 +1139,11 @@ impl DdlController { .await; } ObjectType::Connection => { - let (version, conn) = self + let (version, _conn) = self .metadata_manager .catalog_controller .drop_connection(object_id) .await?; - if let Some(connection::Info::PrivateLinkService(svc)) = &conn.info { - self.delete_vpc_endpoint(svc).await?; - } return Ok(version); } _ => { @@ -1279,22 +1254,12 @@ impl DdlController { streaming_job_ids, state_table_ids, source_ids, - connections, source_fragments, removed_actors, removed_fragments, + .. } = release_ctx; - // delete vpc endpoints. - for conn in connections { - let _ = self - .delete_vpc_endpoint(&conn.to_protobuf()) - .await - .inspect_err(|err| { - tracing::warn!(err = ?err.as_report(), "failed to delete vpc endpoint"); - }); - } - // unregister sources. self.source_manager .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 8b256d1b2145..9f840ded5aa4 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod cloud_provider; pub mod ddl_controller; pub mod election; pub mod intercept;