Skip to content

Commit

Permalink
feat(services/webdfs): Add user.name support for webhdfs (#5567)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Jan 22, 2025
1 parent cca3a87 commit b8a3b7a
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 4 deletions.
37 changes: 37 additions & 0 deletions .github/services/webhdfs/webhdfs_with_user_name/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: webhdfs
description: "Behavior test for webhdfs with user name specified"

runs:
using: "composite"
steps:
- name: Setup webhdfs
shell: bash
working-directory: fixtures/webhdfs
run: |
docker compose -f docker-compose-webhdfs.yml up -d --wait
- name: Setup
shell: bash
run: |
cat << EOF >> $GITHUB_ENV
OPENDAL_WEBHDFS_ROOT=/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_ATOMIC_WRITE_DIR=.opendal_tmp/
OPENDAL_WEBHDFS_USER_NAME=root
EOF
50 changes: 46 additions & 4 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ impl WebhdfsBuilder {
self
}

/// Set the username of this backend,
/// used for authentication
///
pub fn user_name(mut self, user_name: &str) -> Self {
if !user_name.is_empty() {
self.config.user_name = Some(user_name.to_string());
}
self
}

/// Set the delegation token of this backend,
/// used for authentication
///
Expand Down Expand Up @@ -179,6 +189,7 @@ impl Builder for WebhdfsBuilder {
let backend = WebhdfsBackend {
root,
endpoint,
user_name: self.config.user_name,
auth,
client,
root_checker: OnceCell::new(),
Expand All @@ -195,6 +206,7 @@ impl Builder for WebhdfsBuilder {
pub struct WebhdfsBackend {
root: String,
endpoint: String,
user_name: Option<String>,
auth: Option<String>,
root_checker: OnceCell<()>,

Expand All @@ -212,6 +224,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -220,6 +235,7 @@ impl WebhdfsBackend {

req.body(Buffer::new()).map_err(new_request_build_error)
}

/// create object
pub async fn webhdfs_create_object_request(
&self,
Expand All @@ -235,6 +251,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand Down Expand Up @@ -277,6 +296,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand Down Expand Up @@ -311,7 +333,9 @@ impl WebhdfsBackend {
percent_encode_path(&from),
percent_encode_path(&to)
);

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand All @@ -330,7 +354,9 @@ impl WebhdfsBackend {
body: Buffer,
) -> Result<Request<Buffer>> {
let mut url = location.to_string();

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand Down Expand Up @@ -362,7 +388,9 @@ impl WebhdfsBackend {
percent_encode_path(&p),
percent_encode_path(&sources),
);

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand All @@ -379,6 +407,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += &format!("&{auth}");
}
Expand All @@ -404,6 +435,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -429,6 +463,9 @@ impl WebhdfsBackend {
if !start_after.is_empty() {
url += format!("&startAfter={}", start_after).as_str();
}
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -455,7 +492,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);

if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand All @@ -474,6 +513,9 @@ impl WebhdfsBackend {
self.endpoint,
percent_encode_path(&p),
);
if let Some(user) = &self.user_name {
url += format!("&user.name={user}").as_str();
}
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/services/webhdfs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub struct WebhdfsConfig {
pub root: Option<String>,
/// Endpoint for webhdfs.
pub endpoint: Option<String>,
/// Name of the user for webhdfs.
pub user_name: Option<String>,
/// Delegation token for webhdfs.
pub delegation: Option<String>,
/// Disable batch listing
Expand All @@ -43,6 +45,7 @@ impl Debug for WebhdfsConfig {
f.debug_struct("WebhdfsConfig")
.field("root", &self.root)
.field("endpoint", &self.endpoint)
.field("user_name", &self.user_name)
.field("atomic_write_dir", &self.atomic_write_dir)
.finish_non_exhaustive()
}
Expand Down

0 comments on commit b8a3b7a

Please sign in to comment.