Skip to content

Commit

Permalink
feat: remove orphan objects if copy fails at any point
Browse files Browse the repository at this point in the history
Given that there are at least 3 objects uploaded per segment, and it could fail at any stage, the plugin should guard that the chances for orphan objects is minimal.
  • Loading branch information
jeqo committed Apr 5, 2024
1 parent cd4434c commit cdffc7f
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
maybeEncryptionKey,
customMetadataBuilder);
} catch (final Exception e) {
try {
// best effort on removing orphan files
tryDeleteSegmentObjects(remoteLogSegmentMetadata);
} catch (final Exception ignored) {
// ignore all exceptions
}
throw new RemoteStorageException(e);
}

Expand Down Expand Up @@ -342,7 +348,7 @@ private TransformChunkEnumeration transformation(
return transformEnum;
}

private SegmentIndexesV1 uploadIndexes(
SegmentIndexesV1 uploadIndexes(
final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final LogSegmentData segmentData,
final DataKeyAndAAD maybeEncryptionKey,
Expand Down Expand Up @@ -661,10 +667,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
final long startedMs = time.milliseconds();

try {
final Set<ObjectKey> keys = Arrays.stream(ObjectKeyFactory.Suffix.values())
.map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s))
.collect(Collectors.toSet());
deleter.delete(keys);
tryDeleteSegmentObjects(remoteLogSegmentMetadata);
} catch (final Exception e) {
metrics.recordSegmentDeleteError(remoteLogSegmentMetadata.remoteLogSegmentId()
.topicIdPartition().topicPartition());
Expand All @@ -678,6 +681,15 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata);
}

private void tryDeleteSegmentObjects(
final RemoteLogSegmentMetadata remoteLogSegmentMetadata
) throws StorageBackendException {
final Set<ObjectKey> keys = Arrays.stream(ObjectKeyFactory.Suffix.values())
.map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s))
.collect(Collectors.toSet());
deleter.delete(keys);
}

@Override
public void close() {
metrics.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
Expand All @@ -36,6 +40,7 @@
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -45,8 +50,12 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

class RemoteStorageManagerTest {
Expand Down Expand Up @@ -221,6 +230,85 @@ void fetchSegmentNonInterruptionExceptionWhenGettingSegment(
.hasRootCauseInstanceOf(exceptionClass);
}

@Test
void deleteObjectsWhenUploadFails(
@TempDir final Path partitionDir
) throws IOException, StorageBackendException, RemoteStorageException {
// given a sample local segment to be uploaded
final var segmentPath = Files.createFile(partitionDir.resolve("0000.log"));
final var segmentContent = "test";
Files.writeString(segmentPath, segmentContent);
final var indexPath = Files.createFile(partitionDir.resolve("0000.index"));
final var timeIndexPath = Files.createFile(partitionDir.resolve("0000.timeindex"));
final var producerSnapshotPath = Files.createFile(partitionDir.resolve("0000.snapshot"));
final var logSegmentData = new LogSegmentData(
segmentPath,
indexPath,
timeIndexPath,
Optional.empty(),
producerSnapshotPath,
ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))
);

final var remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(
REMOTE_SEGMENT_ID, 0, 1L,
0, 0, 0, segmentContent.length(), Map.of(0, 0L));

final var remotePartitionPath = targetDir.resolve(TOPIC_ID_PARTITION.topic() + "-" + TOPIC_ID)
.resolve(String.valueOf(TOPIC_ID_PARTITION.partition()));

final var config = Map.of(
"chunk.size", "1",
"storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage",
"storage.root", targetDir.toString()
);
rsm.configure(config);
rsm = spy(rsm);

// when first upload fails
doThrow(IOException.class).when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any());

assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData))
.isInstanceOf(RemoteStorageException.class)
.hasRootCauseInstanceOf(IOException.class);

// then no files stored in remote
assertThat(remotePartitionPath).doesNotExist();

// fallback to real method
doCallRealMethod().when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any());

// when second upload fails
doThrow(IOException.class).when(rsm).uploadIndexes(any(), any(), any(), any());

assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData))
.isInstanceOf(RemoteStorageException.class)
.hasRootCauseInstanceOf(IOException.class);

// then no files stored in remote
assertThat(remotePartitionPath).doesNotExist();

// fallback to real method
doCallRealMethod().when(rsm).uploadIndexes(any(), any(), any(), any());

// when third upload fails
doThrow(IOException.class).when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any());

assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData))
.isInstanceOf(RemoteStorageException.class)
.hasRootCauseInstanceOf(IOException.class);

// then no files stored in remote
assertThat(remotePartitionPath).doesNotExist();

// fallback to real method
doCallRealMethod().when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any());

// when all good
rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
assertThat(Files.list(remotePartitionPath)).hasSize(3);
}

static Stream<Arguments> provideNonInterruptionExceptions() {
return Stream.of(
arguments(null, Exception.class),
Expand Down

0 comments on commit cdffc7f

Please sign in to comment.