Aeron Archive - potential bug extend recordings on same channel different stream id #1719
Comments
@matteo-gsr If you record a stream with multiple publishers (multiple images on the subscriber side), then the Archive will create a recording for each publisher, because recordings are session-based. |
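To make the session-based rule concrete, here is a minimal sketch that counts how many recordings one would expect from a set of images, assuming one recording per distinct (stream id, session id) pair. The `image_key_t` type and `expected_recording_count` function are hypothetical names made up for illustration; they are not part of the Aeron API and do not reflect how the Archive tracks sessions internally.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for an image seen by a recording subscription. */
typedef struct
{
    int32_t stream_id;
    int32_t session_id;
}
image_key_t;

/* Counts distinct (stream_id, session_id) pairs, assuming one recording per pair. */
size_t expected_recording_count(const image_key_t *images, size_t length)
{
    assert(NULL != images || 0 == length);

    size_t count = 0;
    for (size_t i = 0; i < length; i++)
    {
        bool seen = false;
        for (size_t j = 0; j < i; j++)
        {
            if (images[j].stream_id == images[i].stream_id &&
                images[j].session_id == images[i].session_id)
            {
                seen = true;
                break;
            }
        }

        if (!seen)
        {
            count++;
        }
    }

    return count;
}
```

Under this model, a single publisher per stream id yields one recording per stream, so a second recording for the same stream id implies a second session was observed.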
@vyazelenko - I have a single publisher for each unique combination of channel/stream_id. The archive is the only consumer, with "spies simulate connection" turned on. |
Can you post a channel uri for the publisher and subscriber and the code snippet how you start the recording? |
publisher1: channel=aeron:udp?control-mode=dynamic|control=127.0.0.1:4002, streamId=11004 on first start:
however, archive tool shows three recordings, which is wrong:
|
and the archive logs too, three recordings, which is wrong. It doesn't happen if I set a different channel for the 2nd publisher.
|
final AvailableImageHandler handler = (image) -> taskQueue.addLast(() -> startRecordingSession(
    controlSession, correlationId, strippedChannel, originalChannel, image, autoStop));
Could it be that the image handler is calling that twice because it's the same channel? |
The issue here is that there are two sessions for one of the streams:
Here |
But why two recordings for 11004 and only one for 11005? It doesn't make much sense. If you look at the archive logs, I am creating two recordings, one for 11004 and one for 11005, but the archive is creating three. |
there's even a duplicate correlation id - doesn't make sense. And both correlation ids have a duplicate subscription id.
|
final AvailableImageHandler handler = (image) ->
{
    System.err.println("imageStreamId="
        + image.subscription().streamId()
        + "|recordingStreamId=" + streamId);
    System.err.flush();
    taskQueue.addLast(() -> startRecordingSession(
        controlSession, correlationId, strippedChannel, originalChannel, image, autoStop));
};
I have added this. You can clearly see from the logs:
here we start the recording for channel aeron:udp?control-mode=dynamic|control=127.0.0.1:4002, stream id 11004
perform some query just to see if there isn't an existing recording, none is found, as expected.
here we start the recording for aeron:udp?control-mode=dynamic|control=127.0.0.1:4002, stream id 11005
archive prints that log - which is related to the first recording. archive is processing 2nd recording here:
archive prints 11004 and 11005 again. It should only print 11005.
|
One more thing, if I switch to AERON_ARCHIVE_SOURCE_LOCATION_REMOTE, it doesn't happen, it works as expected. So it seems to be related when one uses spy and |
And one more thing - it's only happening with the C media driver. The Java media driver doesn't show the issue. Just tested. So the behaviour is not aligned between the two. So to cut a long story short, the issue is showing when:
even though you asked to record two publications, three recordings are erroneously created. |
You can reproduce it with this simple example:
#if defined(__linux__)
#define _BSD_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <stdbool.h>
#include <inttypes.h>
#include <string.h>
#if !defined(_MSC_VER)
#include <sched.h>
#include <unistd.h>
#endif
#include "aeronc.h"
#include "concurrent/aeron_atomic.h"
#include "util/aeron_strutil.h"
#include "util/aeron_parse_util.h"
#include "aeron_agent.h"
#include "aeron_alloc.h"
#include "aeron_client.h"
#include "aeron_archive.h"
#include "aeron_archive_client.h"
#include "aeron_archive_context.h"
int main(int argc, char **argv)
{
    int status = EXIT_FAILURE;
    const char *aeron_dir = "/tmp/aeron";
    /* Both publications share the same channel and differ only by stream id. */
    const char *channel_1 = "aeron:udp?control-mode=dynamic|control=127.0.0.1:4002";
    int32_t stream_id_1 = 11004;
    const char *channel_2 = "aeron:udp?control-mode=dynamic|control=127.0.0.1:4002";
    int32_t stream_id_2 = 11005;
    /* Everything released in cleanup is declared here and set to NULL, */
    /* so that every `goto cleanup` path sees initialised pointers. */
    aeron_context_t *context = NULL;
    aeron_t *aeron = NULL;
    aeron_archive_context_t *archive_ctx = NULL;
    aeron_archive_async_connect_t *archive_async = NULL;
    aeron_archive_t *archive = NULL;
    aeron_async_add_exclusive_publication_t *async_1 = NULL;
    aeron_exclusive_publication_t *publication_1 = NULL;
    aeron_async_add_exclusive_publication_t *async_2 = NULL;
    aeron_exclusive_publication_t *publication_2 = NULL;
    int64_t subscription_id_1;
    int64_t subscription_id_2;
    if (aeron_context_init(&context) < 0)
    {
        fprintf(stderr, "aeron_context_init: %s\n", aeron_errmsg());
        goto cleanup;
    }
    if (NULL != aeron_dir)
    {
        if (aeron_context_set_dir(context, aeron_dir) < 0)
        {
            fprintf(stderr, "aeron_context_set_dir: %s\n", aeron_errmsg());
            goto cleanup;
        }
    }
    if (aeron_init(&aeron, context) < 0)
    {
        fprintf(stderr, "aeron_init: %s\n", aeron_errmsg());
        goto cleanup;
    }
    if (aeron_start(aeron) < 0)
    {
        fprintf(stderr, "aeron_start: %s\n", aeron_errmsg());
        goto cleanup;
    }
    if (aeron_archive_context_init(&archive_ctx) < 0)
    {
        fprintf(stderr, "aeron_archive_context_init: %s\n", aeron_errmsg());
        goto cleanup;
    }
    aeron_archive_context_set_aeron(archive_ctx, aeron);
    aeron_archive_context_set_control_request_channel(archive_ctx, "aeron:udp?endpoint=127.0.0.1:7001");
    aeron_archive_context_set_control_response_channel(archive_ctx, "aeron:udp?endpoint=127.0.0.1:0");
    fprintf(stderr, "connecting to archive\n");
    if (aeron_archive_async_connect(&archive_async, archive_ctx) < 0)
    {
        fprintf(stderr, "aeron_archive_async_connect: %s\n", aeron_errmsg());
        goto cleanup;
    }
    while (NULL == archive)
    {
        if (aeron_archive_async_connect_poll(&archive, archive_async) < 0)
        {
            fprintf(stderr, "aeron_archive_async_connect_poll: %s\n", aeron_errmsg());
            goto cleanup;
        }
        sched_yield();
    }
    fprintf(stderr, "connected to archive\n");
    // start the recording on the first publication
    if (aeron_archive_start_recording(&subscription_id_1, archive, channel_1, stream_id_1, AERON_ARCHIVE_SOURCE_LOCATION_LOCAL, true) < 0)
    {
        fprintf(stderr, "aeron_archive_start_recording: %s\n", aeron_errmsg());
    }
    else
    {
        fprintf(stdout, "aeron_archive_start_recording: started\n");
    }
    // start the recording on the second publication
    if (aeron_archive_start_recording(&subscription_id_2, archive, channel_2, stream_id_2, AERON_ARCHIVE_SOURCE_LOCATION_LOCAL, true) < 0)
    {
        fprintf(stderr, "aeron_archive_start_recording: %s\n", aeron_errmsg());
    }
    else
    {
        fprintf(stdout, "aeron_archive_start_recording: started\n");
    }
    // add the first exclusive publication now
    if (aeron_async_add_exclusive_publication(&async_1, aeron, channel_1, stream_id_1) < 0)
    {
        fprintf(stderr, "aeron_async_add_exclusive_publication: %s\n", aeron_errmsg());
        goto cleanup;
    }
    while (NULL == publication_1)
    {
        if (aeron_async_add_exclusive_publication_poll(&publication_1, async_1) < 0)
        {
            fprintf(stderr, "aeron_async_add_exclusive_publication_poll: %s\n", aeron_errmsg());
            goto cleanup;
        }
        sched_yield();
    }
    // add the second exclusive publication now
    if (aeron_async_add_exclusive_publication(&async_2, aeron, channel_2, stream_id_2) < 0)
    {
        fprintf(stderr, "aeron_async_add_exclusive_publication: %s\n", aeron_errmsg());
        goto cleanup;
    }
    while (NULL == publication_2)
    {
        if (aeron_async_add_exclusive_publication_poll(&publication_2, async_2) < 0)
        {
            fprintf(stderr, "aeron_async_add_exclusive_publication_poll: %s\n", aeron_errmsg());
            goto cleanup;
        }
        sched_yield();
    }
    // keep the publications alive until the process is killed
    while (true)
    {
        sleep(1);
    }
    status = EXIT_SUCCESS;
cleanup:
    // only close what was actually created
    if (NULL != publication_1) { aeron_exclusive_publication_close(publication_1, NULL, NULL); }
    if (NULL != publication_2) { aeron_exclusive_publication_close(publication_2, NULL, NULL); }
    if (NULL != archive) { aeron_archive_close(archive); }
    if (NULL != archive_ctx) { aeron_archive_context_close(archive_ctx); }
    if (NULL != aeron) { aeron_close(aeron); }
    if (NULL != context) { aeron_context_close(context); }
    return status;
}
Run the C media driver as follows:
and then the archive as follows:
Run the archive tool. You would expect two recordings to be created. You'll be surprised!
|
If you add the publications before starting the recordings, the issue doesn't happen. Eager to hear what you think the problem might be. |
@matteo-gsr There seems to be a bug in the C media driver around spy matching logic. I was able to reproduce the issue without an Archive. |
I believe I've found the issue and have opened a PR: #1722 |
Fixed with #1722. |
There seems to be a potential bug in the archive if you try extending multiple exclusive publications on the same channel but different streams. The archive seems to create a duplicate recording out of nowhere, and the extend fails with "cannot extend active recording". As we are trying to use extend, we first query the archive for existing recordings, but we unexpectedly get two recordings for a channel/stream, which should never be the case. I see in the logs that it starts a recording for the same subscription id twice, but there are two different recordings in there. Something is up.
Note I am using the Archive C client.
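The query-then-extend decision described above can be sketched as a small guard that refuses to extend when more than one existing recording matches the channel/stream pair. This is only an illustration under stated assumptions: `recording_descriptor_t`, `recording_action_t` and `choose_recording_action` are hypothetical names, the descriptor is a simplified stand-in for whatever a recording query returns, and channels are compared as plain strings.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, simplified view of one recording returned by a query. */
typedef struct
{
    int64_t recording_id;
    int32_t stream_id;
    const char *channel;
}
recording_descriptor_t;

typedef enum
{
    RECORDING_ACTION_START_NEW,  /* no existing recording matches */
    RECORDING_ACTION_EXTEND,     /* exactly one match: safe to extend it */
    RECORDING_ACTION_AMBIGUOUS   /* several matches: the situation reported here */
}
recording_action_t;

/* Decides what to do for a channel/stream pair given the queried descriptors. */
/* On EXTEND or AMBIGUOUS, *recording_id_out holds the first matching id. */
recording_action_t choose_recording_action(
    const recording_descriptor_t *descriptors,
    size_t length,
    const char *channel,
    int32_t stream_id,
    int64_t *recording_id_out)
{
    assert(NULL != channel && NULL != recording_id_out);

    size_t matches = 0;
    for (size_t i = 0; i < length; i++)
    {
        if (descriptors[i].stream_id == stream_id &&
            0 == strcmp(descriptors[i].channel, channel))
        {
            if (0 == matches)
            {
                *recording_id_out = descriptors[i].recording_id;
            }
            matches++;
        }
    }

    if (0 == matches)
    {
        return RECORDING_ACTION_START_NEW;
    }

    return 1 == matches ? RECORDING_ACTION_EXTEND : RECORDING_ACTION_AMBIGUOUS;
}
```

Treating the multi-match case as a distinct outcome makes the unexpected duplicate visible to the caller instead of surfacing later as a failed extend.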