Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apache MINA integration #844

Open
SergiyBojko opened this issue Oct 16, 2024 · 9 comments
Open

Apache MINA integration #844

SergiyBojko opened this issue Oct 16, 2024 · 9 comments
Labels
bug Something isn't working help wanted Extra attention is needed needs triage The issue needs to be triaged, before work can commence

Comments

@SergiyBojko
Copy link

SergiyBojko commented Oct 16, 2024

Bug Description

I'm trying to expose Minio bucket through Apache MINA SFTP server (2.14.0) using example in the docs but it errors with:

 Not a directory: s3://Q3AM3UQ867SPQQA43P2F@play.min.io/foo/
	at org.apache.sshd.common.util.io.IoUtils.ensureDirectory(IoUtils.java:483) 

using following file system factory config:

		FileSystem fileSystem = FileSystems.newFileSystem(URI.create("s3://play.min.io/"),
				env, 
				Thread.currentThread()
						.getContextClassLoader());
		Path bucketPath = fileSystem.getPath("/foo/"); // "foo", "foo/" and "/foo" does't work either
		return new VirtualFileSystemFactory(bucketPath);

Data from the file actually can be read by:

Files.readAllLines(fileSystem.getPath("/foo/bar")) 

I managed to bypass this by implementing custom FileSystemFactory, but after that MINA still fails because S3FileSystemProvider does not have newFileChannel method implemented.

This really doesn't seem right, am I doing something wrong?

Proposed Solution

Known Workarounds

Useful Links

Task Relationships

This bug:

@SergiyBojko SergiyBojko added bug Something isn't working needs triage The issue needs to be triaged, before work can commence labels Oct 16, 2024
@carlspring
Copy link
Owner

Hi @SergiyBojko ,

Would it be possible for you to provide a docker-compose script (or a test with TestContainers) with MinIO and/or a JUnit test case that we can try and reproduce this with?

@SergiyBojko
Copy link
Author

Hi @carlspring
Here is example project
example.zip

You can run it with

./gradlew run

After server has started you should be able to run:

scp -P 2222 user@localhost:/bar abc.txt
passsword: p

this will try to copy file https://play.min.io/foo/bar and result in error:

14:17:50.617 [sshd-SftpSubsystem-64455-thread-1] DEBUG o.a.sshd.sftp.server.SftpSubsystem -- doSendStatus[ServerSessionImpl[user@/[0:0:0:0:0:0:0:1]:53170]][id=4,cmd=3] exception
java.lang.UnsupportedOperationException: null
        at java.base/java.nio.file.spi.FileSystemProvider.newFileChannel(FileSystemProvider.java:535)
        at org.apache.sshd.common.file.root.RootedFileSystemProvider.newFileChannel(RootedFileSystemProvider.java:193)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:298)
        at org.apache.sshd.sftp.server.SftpFileSystemAccessor.openFile(SftpFileSystemAccessor.java:215)
        at org.apache.sshd.sftp.server.FileHandle.<init>(FileHandle.java:92)
        at org.apache.sshd.sftp.server.SftpSubsystem.doOpen(SftpSubsystem.java:967)
        at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doOpen(AbstractSftpSubsystemHelper.java:496)
        at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doProcess(AbstractSftpSubsystemHelper.java:353)
        at org.apache.sshd.sftp.server.SftpSubsystem.doProcess(SftpSubsystem.java:354)
        at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.process(AbstractSftpSubsystemHelper.java:344)
        at org.apache.sshd.sftp.server.SftpSubsystem.run(SftpSubsystem.java:330)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

@steve-todorov
Copy link
Collaborator

@SergiyBojko We haven't had a chance to look into this yet. However, can you try the code you have with the 1.0.6-SNAPSHOT version which is published in Maven Central Snapshots? There was a bug related to the properties not being passed from the Files.newFileSystem down to the S3FileSystem which could have been related to the problem you are seeing.

@zchtodd
Copy link

zchtodd commented Dec 10, 2024

I'm using the 1.0.6-SNAPSHOT and seeing the same problem when integrating with Apache MINA. My case seems to be the same as the original, except I'm connecting to Digital Ocean Spaces.

My dependencies:

  <dependencies>
    <dependency>
        <groupId>org.apache.sshd</groupId>
        <artifactId>sshd-core</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.sshd</groupId>
        <artifactId>sshd-sftp</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.20.18</version>
    </dependency>
    <dependency>
        <groupId>org.carlspring.cloud.aws</groupId>
        <artifactId>s3fs-nio</artifactId>
        <version>1.0.6</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.11</version>
    </dependency>
  </dependencies>

The traceback, which I think is the same as above:

java.lang.UnsupportedOperationException: null
	at java.base/java.nio.file.spi.FileSystemProvider.newFileChannel(FileSystemProvider.java:533)
	at org.apache.sshd.common.file.root.RootedFileSystemProvider.newFileChannel(RootedFileSystemProvider.java:193)
	at java.base/java.nio.channels.FileChannel.open(FileChannel.java:309)
	at org.apache.sshd.sftp.server.SftpFileSystemAccessor.openFile(SftpFileSystemAccessor.java:215)
	at org.apache.sshd.sftp.server.FileHandle.<init>(FileHandle.java:85)
	at org.apache.sshd.sftp.server.SftpSubsystem.doOpen(SftpSubsystem.java:928)
	at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doOpen(AbstractSftpSubsystemHelper.java:496)
	at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.doProcess(AbstractSftpSubsystemHelper.java:353)
	at org.apache.sshd.sftp.server.SftpSubsystem.doProcess(SftpSubsystem.java:327)
	at org.apache.sshd.sftp.server.AbstractSftpSubsystemHelper.process(AbstractSftpSubsystemHelper.java:344)
	at org.apache.sshd.sftp.server.SftpSubsystem.run(SftpSubsystem.java:303)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

I'm not sure if it's related, but it seems like the MINA code is expecting newFileChannel to exist, but s3fs-nio implements newAsynchronousFileChannel instead. I'll keep looking at this, but thought it was worth mentioning that this does seem to be an issue.

@zchtodd
Copy link

zchtodd commented Dec 14, 2024

I've made some progress on the Apache MINA integration. The main problem is that the SFTP subsystem expects newFileChannel to exist and, by extension, to function synchronously.

To get it working, I wrapped S3FileChannel in a new S3FileChannelSyncWrapper class.

package org.carlspring.cloud.storage.s3fs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class S3FileChannelSyncWrapper extends FileChannel {

    private static final Logger LOGGER = LoggerFactory.getLogger(S3FileChannelSyncWrapper.class);
    private final S3FileChannel asyncFileChannel;
    private long position = 0;

    public S3FileChannelSyncWrapper(S3FileChannel asyncFileChannel) {
        this.asyncFileChannel = asyncFileChannel;
    }

    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
        LOGGER.warn("unsupported: read(ByteBuffer[], int, int)");
        throw new UnsupportedOperationException("read(ByteBuffer[], int, int) is not supported.");
    }

    @Override
    public int read(ByteBuffer dst, long position) throws IOException {
        try {
            Future<Integer> future = asyncFileChannel.read(dst, position);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error while reading", e);
        }
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
        int bytesRead = read(dst, position);
        if (bytesRead > 0) {
            position += bytesRead;
        }
        return bytesRead;
    }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        throw new UnsupportedOperationException("write(ByteBuffer[], int, int) is not supported.");
    }

    @Override
    public int write(ByteBuffer src, long position) throws IOException {
        try {   
            Future<Integer> future = asyncFileChannel.write(src, position);
            int bytesWritten = future.get();
            LOGGER.info("Bytes written: {}", bytesWritten);
            return bytesWritten;
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error while writing", e);
        }   
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        int bytesWritten = write(src, position);
        if (bytesWritten > 0) {
            position += bytesWritten;
        }
        return bytesWritten;
    }

    @Override
    public long position() {
        return position;
    }

    @Override
    public FileChannel position(long newPosition) {
        this.position = newPosition;
        return this;
    }

    @Override
    public long size() throws IOException {
        return asyncFileChannel.size();
    }

    @Override
    public FileChannel truncate(long size) throws IOException {
        asyncFileChannel.truncate(size);
        return this;
    }

    protected void sync() throws IOException {
        asyncFileChannel.sync();
    }

    @Override
    public void force(boolean metaData) throws IOException {
        asyncFileChannel.force(metaData);
    }

    @Override
    public FileLock lock(long position, long size, boolean shared) throws IOException {
        try {
            Future<FileLock> future = asyncFileChannel.lock(position, size, shared);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Error while locking", e);
        }
    }

    @Override
    public FileLock tryLock(long position, long size, boolean shared) throws IOException {
        return asyncFileChannel.tryLock(position, size, shared);
    }

    @Override
    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
        throw new UnsupportedOperationException("Memory mapping is not supported.");
    }

    @Override
    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
        LOGGER.warn("unsupported: transferFrom(ReadableByteChannel, long, long)");
        throw new UnsupportedOperationException("transferFrom is not supported.");
    }

    @Override
    public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
        LOGGER.warn("unsupported: transferTo(long, long, WritableByteChannel)");
        throw new UnsupportedOperationException("transferTo is not supported.");
    }

    @Override
    protected void implCloseChannel() throws IOException {
        asyncFileChannel.close();
    }
}

To put this class into use, add the newFileChannel method to the S3FileSystemProvider class.

    @Override
    public FileChannel newFileChannel(Path path,
                                      Set<? extends OpenOption> options,
                                      FileAttribute<?>... attrs)
                               throws IOException {
        S3Path s3Path = toS3Path(path);

        ExecutorService defaultExecutor = Executors.newFixedThreadPool(4);

        S3FileChannel s3FileChannel = new S3FileChannel(s3Path, options, defaultExecutor, true);
        return new S3FileChannelSyncWrapper(s3FileChannel);
    }

Posting this in case it's helpful to someone, but it is very experimental.

@steve-todorov
Copy link
Collaborator

This is an interesting finding!

It looks like this was introduced back in 2020 as part of #99. The newFileChannel method was removed back then and only the async one was left. To me it would make more sense to rename the existing S3FileChannel to S3AsyncFileChannel and add the S3FileChannel, but I guess the original S3FileChannel class name was preserved to avoid backwards compatibility issues. I would need to dig into the code as well, but unfortunately my time is currently limited. Might be able to do this early next year.

Would you be interested into writing a test reproducing your problem with Apache MINA, the solution and submit it as an editable PR?

@carlspring
Copy link
Owner

Guys, we could really use the help with MINA. We haven't really worked with it much and we have very limited time. We are keen on keeping this project evolving.

It seems we're getting more and more issues reported with MINA. We're open to contibutors joining and helping us in this regard.

If you can provide a fix, that would be great! If not, if you could provide a test, with TestContainers, or Docker, that would be awesome!

@zchtodd
Copy link

zchtodd commented Dec 15, 2024

I'm continuing to test the MINA integration. I'm having some trouble with reads returning stale data, but that problem disappears if I disable the cache. Obviously the problem there is that it becomes very slow, but seems to work pretty reliably. I'm going to try and find out what's happening there.

It may take a while, but I'll try to open a PR. Hopefully I don't end up just making it work for my use case while breaking others.

@carlspring
Copy link
Owner

Either way, please report your findings even if you are not able to prepare a complete fix.
Thank you! :)

@carlspring carlspring added the help wanted Extra attention is needed label Dec 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working help wanted Extra attention is needed needs triage The issue needs to be triaged, before work can commence
Projects
None yet
Development

No branches or pull requests

4 participants