Skip to content

Commit

Permalink
#4682 PlanB State Store
Browse files Browse the repository at this point in the history
  • Loading branch information
stroomdev66 committed Jan 21, 2025
1 parent 3d44393 commit 6cc7cbd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,34 +1,46 @@
package stroom.planb.impl.data;

import stroom.planb.impl.db.StatePaths;
import stroom.security.api.SecurityContext;
import stroom.util.io.FileUtil;
import stroom.util.io.StreamUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.shared.PermissionException;
import stroom.util.string.StringIdUtil;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

@Singleton
public class FileTransferServiceImpl implements FileTransferService {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(FileTransferServiceImpl.class);

private final SequentialFileStore fileStore;
private final SecurityContext securityContext;
private final ShardManager shardManager;

private final Path receiveDir;
private final AtomicLong receiveId = new AtomicLong();

@Inject
public FileTransferServiceImpl(final SequentialFileStore fileStore,
final SecurityContext securityContext,
final ShardManager shardManager) {
final ShardManager shardManager,
final StatePaths statePaths) {
this.fileStore = fileStore;
this.securityContext = securityContext;
this.shardManager = shardManager;

// Create the receive directory.
receiveDir = statePaths.getReceiveDir();
FileUtil.ensureDirExists(receiveDir);
if (!FileUtil.deleteContents(receiveDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(receiveDir));
}
}

/**
Expand Down Expand Up @@ -80,21 +92,10 @@ public void receivePart(final long createTime,
}

final FileDescriptor fileDescriptor = new FileDescriptor(createTime, metaId, fileHash);
SequentialFile tempFile = null;
try {
tempFile = fileStore.createTemp();
StreamUtil.streamToFile(inputStream, tempFile.getZip());
fileStore.add(fileDescriptor, tempFile.getZip());

} finally {
// Cleanup.
if (tempFile != null) {
try {
tempFile.delete();
} catch (final IOException e) {
LOGGER.error(e::getMessage, e);
}
}
}
final String receiveFileName = StringIdUtil.idToString(receiveId.incrementAndGet()) +
SequentialFile.ZIP_EXTENSION;
final Path receiveFile = receiveDir.resolve(receiveFileName);
StreamUtil.streamToFile(inputStream, receiveFile);
fileStore.add(fileDescriptor, receiveFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import stroom.planb.impl.db.StatePaths;
import stroom.util.concurrent.UncheckedInterruptedException;
import stroom.util.io.FileUtil;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

Expand All @@ -19,7 +18,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand All @@ -32,10 +30,7 @@ public class SequentialFileStore {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(SequentialFileStore.class);

private final Path receiveDir;
private final Path stagingDir;

private final AtomicLong tempId = new AtomicLong();
private final AtomicLong storeId = new AtomicLong();

private final Lock lock = new ReentrantLock();
Expand All @@ -48,14 +43,6 @@ public SequentialFileStore(final StatePaths statePaths) {
// Create the root directory
ensureDirExists(statePaths.getRootDir());

// Create the receive directory.
receiveDir = statePaths.getReceiveDir();
if (ensureDirExists(receiveDir)) {
if (!FileUtil.deleteContents(receiveDir)) {
throw new RuntimeException("Unable to delete contents of: " + FileUtil.getCanonicalPath(receiveDir));
}
}

// Create the store directory and initialise the store id.
stagingDir = statePaths.getStagingDir();
if (ensureDirExists(stagingDir)) {
Expand All @@ -74,11 +61,6 @@ public void add(final FileDescriptor fileDescriptor,
add(path);
}

public SequentialFile createTemp() {
final long currentStoreId = tempId.incrementAndGet();
return getTempFileSet(currentStoreId);
}

public SequentialFile awaitNext(final long storeId) {
try {
lock.lockInterruptibly();
Expand Down Expand Up @@ -121,10 +103,6 @@ public SequentialFile awaitNext(final long storeId) {
// return Optional.of(getStoreFileSet(storeId));
// }

private SequentialFile getTempFileSet(final long storeId) {
return SequentialFile.get(receiveDir, storeId, true);
}

private SequentialFile getStoreFileSet(final long storeId) {
return SequentialFile.get(stagingDir, storeId, true);
}
Expand Down

0 comments on commit 6cc7cbd

Please sign in to comment.