Skip to content

Commit

Permalink
[Java] Write message header before mark file header in the `archive-m…
Browse files Browse the repository at this point in the history
…ark.dat` file to be able to use SBE features based on the `actingBlockLength` and `actingVersion`.
  • Loading branch information
vyazelenko authored and DarrylGamroth committed Jan 22, 2025
1 parent b741756 commit 95697b6
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 47 deletions.
177 changes: 138 additions & 39 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveMarkFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.codecs.mark.MarkFileHeaderDecoder;
import io.aeron.archive.codecs.mark.MarkFileHeaderEncoder;
import io.aeron.archive.codecs.mark.MessageHeaderDecoder;
import io.aeron.archive.codecs.mark.MessageHeaderEncoder;
import io.aeron.archive.codecs.mark.VarAsciiEncodingEncoder;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import org.agrona.MarkFile;
import org.agrona.SemanticVersion;
import org.agrona.SystemUtil;
Expand All @@ -30,6 +33,7 @@

import java.io.File;
import java.io.PrintStream;
import java.nio.MappedByteBuffer;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.function.Consumer;
Expand Down Expand Up @@ -79,6 +83,8 @@ public class ArchiveMarkFile implements AutoCloseable
*/
public static final String LINK_FILENAME = "archive-mark.lnk";

private final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
private final MarkFileHeaderDecoder headerDecoder = new MarkFileHeaderDecoder();
private final MarkFileHeaderEncoder headerEncoder = new MarkFileHeaderEncoder();
private final MarkFile markFile;
Expand All @@ -104,35 +110,38 @@ public class ArchiveMarkFile implements AutoCloseable
final EpochClock epochClock,
final long timeoutMs)
{
final boolean markFileExists = file.exists();

markFile = new MarkFile(
file,
markFileExists,
MarkFileHeaderDecoder.versionEncodingOffset(),
MarkFileHeaderDecoder.activityTimestampEncodingOffset(),
totalFileLength,
timeoutMs,
epochClock,
(version) -> validateVersion(file, version),
null);

buffer = markFile.buffer();
if (file.exists())
{
final int headerOffset = headerOffset(file);

errorBuffer = errorBufferLength > 0 ?
new UnsafeBuffer(buffer, HEADER_LENGTH, errorBufferLength) : new UnsafeBuffer(buffer, 0, 0);
final MarkFile markFile = new MarkFile(
file,
true,
headerOffset + MarkFileHeaderDecoder.versionEncodingOffset(),
headerOffset + MarkFileHeaderDecoder.activityTimestampEncodingOffset(),
totalFileLength,
timeoutMs,
epochClock,
(version) -> validateVersion(file, version),
null);

headerEncoder.wrap(buffer, 0);
headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION);
final UnsafeBuffer buffer = markFile.buffer();

if (markFileExists)
{
if (buffer.capacity() != totalFileLength)
{
throw new ArchiveException(
"ArchiveMarkFile capacity=" + buffer.capacity() + " < expectedCapacity=" + totalFileLength);
}

if (0 != headerOffset)
{
headerDecoder.wrapAndApplyHeader(buffer, 0, messageHeaderDecoder);
}
else
{
headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION);
}

final int existingErrorBufferLength = headerDecoder.errorBufferLength();
if (existingErrorBufferLength > 0)
{
Expand All @@ -142,9 +151,51 @@ public class ArchiveMarkFile implements AutoCloseable
saveExistingErrors(file, existingErrorBuffer, CommonContext.fallbackLogger());
existingErrorBuffer.setMemory(0, existingErrorBufferLength, (byte)0);
}

if (0 != headerOffset)
{
this.markFile = markFile;
this.buffer = buffer;
}
else
{
CloseHelper.close(markFile);
this.markFile = new MarkFile(
file,
false,
MessageHeaderDecoder.ENCODED_LENGTH + MarkFileHeaderDecoder.versionEncodingOffset(),
MessageHeaderDecoder.ENCODED_LENGTH + MarkFileHeaderDecoder.activityTimestampEncodingOffset(),
totalFileLength,
timeoutMs,
epochClock,
null,
null);
this.buffer = markFile.buffer();
this.buffer.setMemory(0, this.buffer.capacity(), (byte)0);
}
}
else
{
markFile = new MarkFile(
file,
false,
MessageHeaderDecoder.ENCODED_LENGTH + MarkFileHeaderDecoder.versionEncodingOffset(),
MessageHeaderDecoder.ENCODED_LENGTH + MarkFileHeaderDecoder.activityTimestampEncodingOffset(),
totalFileLength,
timeoutMs,
epochClock,
null,
null);
buffer = markFile.buffer();
}

headerEncoder.pid(SystemUtil.getPid());
headerEncoder
.wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
.pid(SystemUtil.getPid());

headerDecoder.wrapAndApplyHeader(buffer, 0, messageHeaderDecoder);

errorBuffer = new UnsafeBuffer(buffer, HEADER_LENGTH, errorBufferLength);
}

/**
Expand Down Expand Up @@ -190,31 +241,32 @@ public ArchiveMarkFile(
final IntConsumer versionCheck,
final Consumer<String> logger)
{
this(new MarkFile(
directory,
filename,
MarkFileHeaderDecoder.versionEncodingOffset(),
MarkFileHeaderDecoder.activityTimestampEncodingOffset(),
timeoutMs,
epochClock,
versionCheck,
logger));
this(openExistingMarkFile(directory, filename, epochClock, timeoutMs, versionCheck, logger));
}

ArchiveMarkFile(final MarkFile markFile)
{
this.markFile = markFile;

buffer = markFile.buffer();
headerEncoder.wrap(buffer, 0);
headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION);

final int schemaVersion = 0 != headerDecoder.codecSchemaVersion() ? headerDecoder.codecSchemaVersion() :
(headerDecoder.headerLength() > 0 ? 1 : 0);
final int blockLength = 0 != headerDecoder.codecBlockLength() ? headerDecoder.codecBlockLength() : 128;
headerDecoder.wrap(buffer, 0, blockLength, schemaVersion);
if (0 != headerOffset(buffer))
{
headerEncoder.wrap(buffer, MessageHeaderDecoder.ENCODED_LENGTH);
headerDecoder.wrapAndApplyHeader(buffer, 0, messageHeaderDecoder);
}
else
{
headerDecoder.wrap(buffer, 0, MarkFileHeaderDecoder.BLOCK_LENGTH, MarkFileHeaderDecoder.SCHEMA_VERSION);

errorBuffer = schemaVersion >= MarkFileHeaderDecoder.headerLengthSinceVersion() ?
// determine the actual sbe schema version used
final int actingBlockLength = 128;
final int actingVersion = headerDecoder.headerLength() > 0 ? 1 : 0;
headerDecoder.wrap(buffer, 0, actingBlockLength, actingVersion);
headerEncoder.wrap(buffer, 0);
}

errorBuffer = headerDecoder.headerLength() > 0 ?
new UnsafeBuffer(buffer, headerDecoder.headerLength(), headerDecoder.errorBufferLength()) :
new UnsafeBuffer(buffer, 0, 0);
}
Expand Down Expand Up @@ -351,6 +403,7 @@ public void force()
private static int alignedTotalFileLength(final Archive.Context ctx)
{
final int headerLength =
MessageHeaderDecoder.ENCODED_LENGTH +
MarkFileHeaderEncoder.BLOCK_LENGTH +
(4 * VarAsciiEncodingEncoder.lengthEncodingLength()) +
(null != ctx.controlChannel() ? ctx.controlChannel().length() : 0) +
Expand All @@ -376,11 +429,14 @@ String aeronDirectory()
return headerDecoder.aeronDirectory();
}

UnsafeBuffer buffer()
{
return buffer;
}

private void encode(final Archive.Context ctx)
{
headerEncoder
.codecSchemaVersion(headerEncoder.sbeSchemaVersion())
.codecBlockLength(headerEncoder.sbeBlockLength())
.startTimestamp(ctx.epochClock().time())
.controlStreamId(ctx.controlStreamId())
.localControlStreamId(ctx.localControlStreamId())
Expand All @@ -404,6 +460,49 @@ private static void validateVersion(final File markFile, final int version)
}
}

private static int headerOffset(final File file)
{
final MappedByteBuffer mappedByteBuffer = IoUtil.mapExistingFile(file, FILENAME);
try
{
final UnsafeBuffer unsafeBuffer =
new UnsafeBuffer(mappedByteBuffer, 0, MessageHeaderDecoder.ENCODED_LENGTH);
return headerOffset(unsafeBuffer);
}
finally
{
IoUtil.unmap(mappedByteBuffer);
}
}

private static MarkFile openExistingMarkFile(
final File directory,
final String filename,
final EpochClock epochClock,
final long timeoutMs,
final IntConsumer versionCheck,
final Consumer<String> logger)
{
final int headerOffset = headerOffset(new File(directory, filename));
return new MarkFile(
directory,
filename,
headerOffset + MarkFileHeaderDecoder.versionEncodingOffset(),
headerOffset + MarkFileHeaderDecoder.activityTimestampEncodingOffset(),
timeoutMs,
epochClock,
versionCheck,
logger);
}

private static int headerOffset(final UnsafeBuffer headerBuffer)
{
final MessageHeaderDecoder decoder = new MessageHeaderDecoder();
decoder.wrap(headerBuffer, 0);
return MarkFileHeaderDecoder.TEMPLATE_ID == decoder.templateId() &&
MarkFileHeaderDecoder.SCHEMA_ID == decoder.schemaId() ? MessageHeaderDecoder.ENCODED_LENGTH : 0;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
blockLength="128"
description="Mark file header.">
<field name="version" id="1" type="int32"/>
<field name="codecSchemaVersion" id="14" type="uint16" sinceVersion="2"/>
<field name="codecBlockLength" id="15" type="uint16" sinceVersion="2"/>
<field name="activityTimestamp" id="2" type="time_t" offset="8"/>
<field name="startTimestamp" id="3" type="time_t"/>
<field name="pid" id="4" type="int64"/>
Expand All @@ -41,7 +39,7 @@
<field name="eventsStreamId" id="7" type="int32"/>
<field name="headerLength" id="8" type="length_t" sinceVersion="1"/>
<field name="errorBufferLength" id="9" type="length_t" sinceVersion="1"/>
<field name="archiveId" id="16" type="archiveId_t" offset="56" sinceVersion="2"/>
<field name="archiveId" id="14" type="archiveId_t" offset="56" sinceVersion="2"/>
<data name="controlChannel" id="10" type="varAsciiEncoding"/>
<data name="localControlChannel" id="11" type="varAsciiEncoding"/>
<data name="eventsChannel" id="12" type="varAsciiEncoding"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.mark.MarkFileHeaderDecoder;
import io.aeron.archive.codecs.mark.MarkFileHeaderEncoder;
import io.aeron.archive.codecs.mark.MessageHeaderDecoder;
import io.aeron.driver.MediaDriver;
import io.aeron.exceptions.DriverTimeoutException;
import io.aeron.test.TestContexts;
Expand Down Expand Up @@ -109,14 +110,15 @@ void anErrorOnStartupShouldNotLeaveAnUninitilisedMarkFile(final @TempDir File te

final File aeronDir = new File(tempDir, "aeron");
final File archiveDir = new File(tempDir, "archive_dir");
final File archiveMarkFile = new File(archiveDir, ArchiveMarkFile.FILENAME);
final File markFileDir = new File(tempDir, "mark/file/dir");
final File archiveMarkFile = new File(markFileDir, ArchiveMarkFile.FILENAME);

final MediaDriver.Context driverContext = new MediaDriver.Context()
.aeronDirectoryName(aeronDir.getAbsolutePath());
final Archive.Context archiveContext = TestContexts.localhostArchive()
.aeronDirectoryName(driverContext.aeronDirectoryName())
.archiveDir(archiveDir)
.markFileDir(archiveMarkFile.getParentFile())
.markFileDir(markFileDir)
.epochClock(SystemEpochClock.INSTANCE);

// Force an error on startup by attempting to start an archive without a media driver.
Expand Down Expand Up @@ -309,7 +311,7 @@ void shouldBeAbleToReadOldMarkFileV1(final @TempDir File tempDir) throws IOExcep
}

final Archive.Context context = new Archive.Context()
.archiveDir(new File(tempDir, "archive"))
.archiveDir(new File(tempDir, "archive/a/long/path/to"))
.markFileDir(file.getParent().toFile())
.aeronDirectoryName(aeronDirectory)
.errorBufferLength(errorBufferLength);
Expand All @@ -334,9 +336,14 @@ private static void shouldEncodeMarkFileFromArchiveContext(final Archive.Context
{
archiveMarkFile.signalReady();

final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
messageHeaderDecoder.wrap(archiveMarkFile.buffer(), 0);
assertEquals(MarkFileHeaderDecoder.BLOCK_LENGTH, messageHeaderDecoder.blockLength());
assertEquals(MarkFileHeaderDecoder.SCHEMA_ID, messageHeaderDecoder.schemaId());
assertEquals(MarkFileHeaderDecoder.TEMPLATE_ID, messageHeaderDecoder.templateId());
assertEquals(MarkFileHeaderDecoder.SCHEMA_VERSION, messageHeaderDecoder.version());

assertEquals(ArchiveMarkFile.SEMANTIC_VERSION, archiveMarkFile.decoder().version());
assertEquals(archiveMarkFile.encoder().sbeSchemaVersion(), archiveMarkFile.decoder().codecSchemaVersion());
assertEquals(archiveMarkFile.encoder().sbeBlockLength(), archiveMarkFile.decoder().codecBlockLength());
assertEquals(epochClock.time(), archiveMarkFile.decoder().startTimestamp());
assertEquals(SystemUtil.getPid(), archiveMarkFile.decoder().pid());
assertEquals(ctx.controlStreamId(), archiveMarkFile.decoder().controlStreamId());
Expand Down

0 comments on commit 95697b6

Please sign in to comment.