Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove private link related connection #18975

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 9 additions & 55 deletions src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,16 @@ use std::collections::BTreeMap;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_connector::source::kafka::PRIVATELINK_CONNECTION;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_sqlparser::ast::CreateConnectionStatement;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::error::ErrorCode::ProtocolError;
use crate::error::{Result, RwError};
use crate::error::{ErrorCode, Result, RwError};
use crate::handler::HandlerArgs;

pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
pub(crate) const CONNECTION_PROVIDER_PROP: &str = "provider";
pub(crate) const CONNECTION_SERVICE_NAME_PROP: &str = "service.name";
pub(crate) const CONNECTION_TAGS_PROP: &str = "tags";

pub(crate) const CLOUD_PROVIDER_MOCK: &str = "mock"; // fake privatelink provider for testing
pub(crate) const CLOUD_PROVIDER_AWS: &str = "aws";

#[inline(always)]
fn get_connection_property_required(
Expand All @@ -48,58 +41,19 @@ fn get_connection_property_required(
)))
})
}

fn resolve_private_link_properties(
with_properties: &BTreeMap<String, String>,
) -> Result<create_connection_request::PrivateLink> {
let provider =
match get_connection_property_required(with_properties, CONNECTION_PROVIDER_PROP)?.as_str()
{
CLOUD_PROVIDER_MOCK => PrivateLinkProvider::Mock,
CLOUD_PROVIDER_AWS => PrivateLinkProvider::Aws,
provider => {
return Err(RwError::from(ProtocolError(format!(
"Unsupported privatelink provider {}",
provider
))));
}
};
match provider {
PrivateLinkProvider::Mock => Ok(create_connection_request::PrivateLink {
provider: provider.into(),
service_name: String::new(),
tags: None,
}),
PrivateLinkProvider::Aws => {
let service_name =
get_connection_property_required(with_properties, CONNECTION_SERVICE_NAME_PROP)?;
Ok(create_connection_request::PrivateLink {
provider: provider.into(),
service_name,
tags: with_properties.get(CONNECTION_TAGS_PROP).cloned(),
})
}
PrivateLinkProvider::Unspecified => Err(RwError::from(ProtocolError(
"Privatelink provider unspecified".to_string(),
))),
}
}

fn resolve_create_connection_payload(
with_properties: &BTreeMap<String, String>,
) -> Result<create_connection_request::Payload> {
let connection_type = get_connection_property_required(with_properties, CONNECTION_TYPE_PROP)?;
let create_connection_payload = match connection_type.as_str() {
PRIVATELINK_CONNECTION => create_connection_request::Payload::PrivateLink(
resolve_private_link_properties(with_properties)?,
),
_ => {
return Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
))));
}
return match connection_type.as_str() {
PRIVATELINK_CONNECTION => Err(RwError::from(ErrorCode::Deprecated(
"CREATE CONNECTION to Private Link".to_string(),
"RisingWave Cloud Portal".to_string(),
))),
_ => Err(RwError::from(ProtocolError(format!(
"Connection type \"{connection_type}\" is not supported"
)))),
};
Ok(create_connection_payload)
}

pub async fn handle_create_connection(
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub connection_id: ConnectionId,
pub name: String,

// todo: Private link service has been deprecated, consider using a new field for the connection info
pub info: PrivateLinkService,
}

Expand Down
14 changes: 0 additions & 14 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::stream::ScaleController;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_limit_service::ClusterLimitServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
Expand All @@ -55,7 +54,6 @@ use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
Expand Down Expand Up @@ -86,7 +84,6 @@ use crate::controller::SqlMetaStore;
use crate::hummock::HummockManager;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{IdleManager, MetaOpts, MetaSrvEnv};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{
start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS,
Expand Down Expand Up @@ -531,17 +528,8 @@ pub async fn start_service_as_election_leader(
compactor_manager.clone(),
));

let mut aws_cli = None;
if let Some(my_vpc_id) = &env.opts.vpc_id
&& let Some(security_group_id) = &env.opts.security_group_id
{
let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await;
aws_cli = Some(cli);
}

let ddl_srv = DdlServiceImpl::new(
env.clone(),
aws_cli.clone(),
metadata_manager.clone(),
stream_manager.clone(),
source_manager.clone(),
Expand Down Expand Up @@ -586,7 +574,6 @@ pub async fn start_service_as_election_leader(
let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref());
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli);
let event_log_srv = EventLogServiceImpl::new(env.event_log_manager_ref());
let cluster_limit_srv = ClusterLimitServiceImpl::new(env.clone(), metadata_manager.clone());

Expand Down Expand Up @@ -712,7 +699,6 @@ pub async fn start_service_as_election_leader(
.add_service(SessionParamServiceServer::new(session_params_srv))
.add_service(TelemetryInfoServiceServer::new(telemetry_srv))
.add_service(ServingServiceServer::new(serving_srv))
.add_service(CloudServiceServer::new(cloud_srv))
.add_service(SinkCoordinationServiceServer::new(sink_coordination_srv))
.add_service(EventLogServiceServer::new(event_log_srv))
.add_service(ClusterLimitServiceServer::new(cluster_limit_srv));
Expand Down
197 changes: 0 additions & 197 deletions src/meta/service/src/cloud_service.rs
Original file line number Diff line number Diff line change
@@ -1,197 +0,0 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::LazyLock;

use async_trait::async_trait;
use regex::Regex;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator,
};
use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved};
use risingwave_meta::manager::MetadataManager;
use risingwave_meta_model::ConnectionId;
use risingwave_pb::catalog::connection::Info::PrivateLinkService;
use risingwave_pb::cloud_service::cloud_service_server::CloudService;
use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
use risingwave_pb::cloud_service::{
RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType,
};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::rpc::cloud_provider::AwsEc2Client;

pub struct CloudServiceImpl {
metadata_manager: MetadataManager,
aws_client: Option<AwsEc2Client>,
}

impl CloudServiceImpl {
pub fn new(metadata_manager: MetadataManager, aws_client: Option<AwsEc2Client>) -> Self {
Self {
metadata_manager,
aws_client,
}
}
}

#[inline(always)]
fn new_rwc_validate_fail_response(
error_type: ErrorType,
error_message: String,
) -> Response<RwCloudValidateSourceResponse> {
Response::new(RwCloudValidateSourceResponse {
ok: false,
error: Some(Error {
error_type: error_type.into(),
error_message,
}),
})
}

#[async_trait]
impl CloudService for CloudServiceImpl {
async fn rw_cloud_validate_source(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rpc interface is provided to the cloud portal previously, I am not sure whether it is still valid. Please confirm with @cloudcarver @neverchanje
#10139

&self,
request: Request<RwCloudValidateSourceRequest>,
) -> Result<Response<RwCloudValidateSourceResponse>, Status> {
let req = request.into_inner();
if req.source_type() != SourceType::Kafka {
return Err(Status::invalid_argument(
"unexpected source type, only kafka source is supported",
));
}
let mut source_cfg: BTreeMap<String, String> = req.source_config.into_iter().collect();
// if connection_id provided, check whether endpoint service is available and resolve
// broker rewrite map currently only support aws privatelink connection
if let Some(connection_id_str) = source_cfg.get("connection.id") {
let connection_id = connection_id_str.parse::<ConnectionId>().map_err(|e| {
Status::invalid_argument(format!(
"connection.id is not an integer: {}",
e.as_report()
))
})?;

let connection = self
.metadata_manager
.catalog_controller
.get_connection_by_id(connection_id)
.await;

if let Err(e) = connection {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkConnectionNotFound,
e.to_report_string(),
));
}
if let Some(PrivateLinkService(service)) = connection.unwrap().info {
if self.aws_client.is_none() {
return Ok(new_rwc_validate_fail_response(
ErrorType::AwsClientNotConfigured,
"AWS client is not configured".to_string(),
));
}
let cli = self.aws_client.as_ref().unwrap();
let privatelink_status = cli
.is_vpc_endpoint_ready(service.endpoint_id.as_str())
.await;
match privatelink_status {
Err(e) => {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkUnavailable,
e.to_report_string(),
));
}
Ok(false) => {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkUnavailable,
format!("Private link endpoint {} is not ready", service.endpoint_id,),
));
}
_ => (),
};
if let Err(e) =
insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None)
{
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkResolveErr,
e.to_report_string(),
));
}
} else {
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkResolveErr,
format!("connection {} has no info available", connection_id),
));
}
}

// XXX: We can't use secret in cloud validate source.
let source_cfg = WithOptionsSecResolved::without_secrets(source_cfg);

// try fetch kafka metadata, return error message on failure
let props = ConnectorProperties::extract(source_cfg, false);
if let Err(e) = props {
return Ok(new_rwc_validate_fail_response(
ErrorType::KafkaInvalidProperties,
e.to_report_string(),
));
};

async fn new_enumerator<P: SourceProperties>(
props: P,
) -> ConnectorResult<P::SplitEnumerator> {
P::SplitEnumerator::new(props, SourceEnumeratorContext::dummy().into()).await
}

dispatch_source_prop!(props.unwrap(), props, {
let enumerator = new_enumerator(*props).await;
if let Err(e) = enumerator {
return Ok(new_rwc_validate_fail_response(
ErrorType::KafkaInvalidProperties,
e.to_report_string(),
));
}
if let Err(e) = enumerator.unwrap().list_splits().await {
let error_message = e.to_report_string();
if error_message.contains("BrokerTransportFailure") {
return Ok(new_rwc_validate_fail_response(
ErrorType::KafkaBrokerUnreachable,
e.to_report_string(),
));
}
static TOPIC_NOT_FOUND: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"topic .* not found").unwrap());
if TOPIC_NOT_FOUND.is_match(error_message.as_str()) {
return Ok(new_rwc_validate_fail_response(
ErrorType::KafkaTopicNotFound,
e.to_report_string(),
));
}
return Ok(new_rwc_validate_fail_response(
ErrorType::KafkaOther,
e.to_report_string(),
));
}
});
Ok(Response::new(RwCloudValidateSourceResponse {
ok: true,
error: None,
}))
}
}
Loading
Loading