forked from atx/kvak
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.cpp
355 lines (297 loc) · 8.33 KB
/
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
#include <argp.h>
#include <cerrno>
#include <complex>
#include <cstdio>
#include <exception>
#include <experimental/filesystem>
#include <fcntl.h>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#ifdef WITH_OPENMP
#include <omp.h>
#endif
#include "agc.hpp"
#include "demodulator.hpp"
#include "log.hpp"
#include "main.hpp"
#include "server.hpp"
#include "utils.hpp"
namespace filesystem = std::experimental::filesystem;
// Implementation of channel -- this should probably get moved to another file TODO
namespace kvak {
// Closes the output stream if flush() ever managed to open one.
channel::~channel()
{
	if (file == nullptr) {
		// Nothing was opened, nothing to release
		return;
	}
	std::fclose(file);
}
float channel::get_power()
{
return this->agc_all.get_channel_power(this->id);
}
void channel::mute(bool m)
{
this->muted= m;
}
bool channel::is_muted()
{
return this->muted;
}
// Feeds one complex sample to the demodulator. Whenever the demodulator
// produces a value, two entries are appended to the output buffer:
// utils::bit(value, 1) followed by utils::bit(value, 0).
void channel::push_sample(std::complex<float> sample)
{
	const std::optional<std::uint8_t> symbol = this->demod.push_sample(sample);
	if (symbol.has_value()) {
		const std::uint8_t val = *symbol;
		// TODO: Do we want not-unpacking back?
		this->out_data[this->out_counter] = kvak::utils::bit(val, 1);
		this->out_counter += 1;
		this->out_data[this->out_counter] = kvak::utils::bit(val, 0);
		this->out_counter += 1;
	}
}
// Writes everything buffered by push_sample() to the output file/FIFO and
// resets the buffer.
//
// The output is opened lazily and in non-blocking mode. If it cannot be opened
// (for a FIFO: nobody is reading yet), the buffered data is dropped and the
// open is retried on the next call.
void channel::flush()
{
	const unsigned int to_write = this->out_counter;
	this->out_counter = 0; // Whatever happens, we have to drop the buffer

	// This is done to prevent blocking on channels which don't have any other
	// side reading from the FIFO - useful for debugging/tetra-rx crashing etc.
	if (this->file == nullptr) {
		// Note that O_CREAT should only apply for non-FIFO mode, as FIFOs
		// are already created
		const int fd = open(this->output_path.c_str(),
		                    O_WRONLY | O_NONBLOCK | O_CREAT | O_TRUNC, 0660);
		if (fd < 0) {
			// ENXIO means there is no one at the reading end of the FIFO;
			// that is expected, so stay quiet and try again next time
			if (errno != ENXIO) {
				kvak::log::info << "Failed to open " << this->output_path
				                << kvak::log::perror;
			}
			return;
		}

		std::FILE *stream = fdopen(fd, "wb");
		if (stream == nullptr) {
			// Log first: close() may overwrite errno
			kvak::log::info << "Failed to open " << this->output_path
			                << kvak::log::perror;
			// Without a stream nobody owns the descriptor, so release it here
			close(fd);
			return;
		}

		kvak::log::debug << "Opened file " << this->output_path << " for output";
		this->file = stream;
	}

	// If the other side hung up, we are going to get EPIPE here
	std::fwrite(
		this->out_data.data(),
		sizeof(this->out_data[0]),
		to_write,
		this->file
	);
}
}
// argp stuff
// Version string; the argp library looks this global up by name to implement --version.
const char *argp_program_version = "kvak 0.1";
// Bug-report contact; the argp library looks this global up by name for its --help output.
const char *argp_program_bug_address = "Josef Gajdusek <atx@atx.name>";
// Program description, handed to argp through argp_parser.doc.
static char argp_doc[] = "pi/4-DQPSK multi-channel demodulator";
// Command-line configuration. parse_opt() overwrites the fields for which an
// option was given; everything else keeps the default spelled out below.
struct arguments {
	// Keys for the options that only have a long form (no short letter)
	enum arg_ids {
		LOOP = 1001,
		SINGLE_FILE = 1002,
	};

	// -n: number of channels
	unsigned int nchannels = 1;
	// -i: input file path (mandatory, checked in parse_opt)
	filesystem::path input_path{""};
	// -o: output path, "%d" is replaced by the channel number
	filesystem::path output_path{""};
	// -f: create the outputs as FIFOs
	bool fifo_mode = false;
	// -c: chunk size
	std::size_t chunk_size = 1024;
	// -b: ADDR:PORT for the server; main() does not start it for "false"
	std::string bind{"127.0.0.1:6677"};
	// --loop: rewind the input at EOF instead of quitting
	bool loop = false;
	// --force-single-file: allow multiple channels without "%d" in the output path
	bool force_single_file = false;
	// -v: keep debug logging enabled
	bool verbose = false;
	// -m: start all channels muted
	bool muted = false;
};
// Option table handed to argp through argp_parser.options.
// Each entry is { long name, key, argument placeholder, flags, help text, group };
// the key is the value parse_opt() receives. Entries with a nullptr placeholder
// take no argument.
static struct argp_option argp_options[] = {
{ "nchannels", 'n', "NCHANNELS", 0, "Number of channels", 0 },
{ "input", 'i', "INPUT", 0, "Input file path", 0 },
{ "output", 'o', "OUTPUT", 0,
"Output file paths (use %d for channel number substitution)", 0 },
{ "fifo", 'f', nullptr, 0,
"Explicitly create output FIFOs instead of files", 0 },
{ "chunk-size", 'c', "CHUNK", 0, "Chunk size", 0 },
// main() does not start the server when ADDR is the literal string "false"
{ "bind", 'b', "ADDR", 0, "Bind to ADDR:PORT", 0 },
// The next two options have no short form; their keys come from arguments::arg_ids
{ "loop", arguments::arg_ids::LOOP,
nullptr, 0, "Loop the input file", 0 },
{ "force-single-file", arguments::arg_ids::SINGLE_FILE,
nullptr, 0, "Force output to a single file for multiple channels", 0 },
{ "muted", 'm', nullptr, 0, "Start all channels muted", 0 },
{ "verbose", 'v', nullptr, 0, "Enable verbose debugging", 0 },
// All-null entry terminates the table
{ nullptr, 0, nullptr, 0, nullptr, 0 },
};
// Parses `text` as a plain decimal number that fits into T.
//
// Only digits are accepted: std::stoull on its own would silently wrap
// negative numbers and ignore trailing garbage. Returns false (leaving `out`
// untouched) when the text is empty, malformed or out of range for T.
template <typename T>
static bool parse_unsigned_arg(const std::string &text, T &out)
{
	if (text.empty() || text.find_first_not_of("0123456789") != std::string::npos) {
		return false;
	}
	try {
		const unsigned long long value = std::stoull(text);
		if (value > std::numeric_limits<T>::max()) {
			return false;
		}
		out = static_cast<T>(value);
		return true;
	} catch (const std::exception &) {
		// All-digit input can only fail by being too large for unsigned long long
		return false;
	}
}

// argp callback: stores the option identified by `key` into the `arguments`
// object passed as state->input, and validates the complete set once argp
// signals ARGP_KEY_END.
//
// key   - option key from argp_options, or one of the ARGP_KEY_* events
// arg_  - option argument, nullptr for options that take none
// state - argp parser state; state->input points to the `arguments` to fill
//
// Returns 0 for handled keys and ARGP_ERR_UNKNOWN for everything else, so that
// argp itself rejects stray positional arguments. Invalid values are reported
// through argp_failure(), which terminates the program with EXIT_FAILURE
// unless argp was asked not to exit.
static error_t parse_opt(int key, char *arg_, struct argp_state *state)
{
	arguments *args = static_cast<arguments *>(state->input);
	std::string arg(arg_ != nullptr ? arg_ : "");
#define FAIL(with, ...) argp_failure(state, EXIT_FAILURE, 0, with, ##__VA_ARGS__)
	switch (key) {
	case 'n': {
		// Parse into a temporary so a rejected value never reaches args.
		// Zero channels would leave main() with nothing to process.
		unsigned int nchannels = 0;
		if (!parse_unsigned_arg(arg, nchannels) || nchannels == 0) {
			FAIL("Invalid number of channels specified ('%s')", arg.c_str());
			break;
		}
		args->nchannels = nchannels;
		break;
	}
	case 'i':
		args->input_path = arg;
		break;
	case 'o':
		args->output_path = arg;
		break;
	case 'f':
		args->fifo_mode = true;
		break;
	case 'c': {
		std::size_t chunk_size = 0;
		if (!parse_unsigned_arg(arg, chunk_size)) {
			FAIL("Invalid chunk size specified ('%s')", arg.c_str());
			break;
		}
		if (chunk_size == 0) {
			FAIL("Invalid chunk size specified (0)");
			break;
		}
		args->chunk_size = chunk_size;
		break;
	}
	case 'b':
		args->bind = arg;
		// TODO: Check the argument maybe?
		break;
	case arguments::arg_ids::LOOP:
		args->loop = true;
		break;
	case arguments::arg_ids::SINGLE_FILE:
		args->force_single_file = true;
		break;
	case 'v':
		args->verbose = true;
		break;
	case 'm':
		args->muted = true;
		break;
	case ARGP_KEY_END:
		// All options have been seen, check that the combination makes sense
		if (!args->input_path.has_filename()) {
			FAIL("No input path specified");
		}
		if (!args->output_path.has_filename()) {
			FAIL("No output path specified");
		}
		// Without the placeholder all channels would end up in the same file
		if (args->nchannels > 1 && !args->force_single_file &&
				args->output_path.string().find("%d") == std::string::npos) {
			FAIL("No channel number placeholder (%%d) specified in the output path string '%s'",
				 args->output_path.c_str());
		}
		break;
	default:
		// Not ours: let argp apply its default handling for this key
#undef FAIL
		return ARGP_ERR_UNKNOWN;
	}
#undef FAIL
	return 0;
}
// Parser description passed to argp_parse() in main(): ties together the
// option table, the parse_opt() callback and the program description.
// The remaining fields are not used by this program and are left null.
static argp argp_parser = {
.options = argp_options,
.parser = parse_opt,
.args_doc = nullptr,
.doc = argp_doc,
.children = nullptr,
.help_filter = nullptr,
.argp_domain = nullptr,
};
int main(int argc, char *argv[])
{
struct arguments args;
argp_parse(&argp_parser, argc, argv, 0, nullptr, &args);
if (!args.verbose) {
kvak::log::debug.mute();
}
// Setup buffers
std::vector<std::complex<float>> input_buffer(args.chunk_size * args.nchannels);
// Open input file - note that we can block for a considerable amount
// of time if this is a FIFO
std::FILE *input_file = std::fopen(args.input_path.c_str(), "r");
if (input_file == nullptr) {
kvak::log::error << "Failed to open the input file " << args.input_path
<< kvak::log::perror;
return EXIT_FAILURE;
}
kvak::agc agc(args.nchannels);
std::vector<kvak::channel> channels;
// Open output files (mkfifo if necesarry, again, we can block here in
// that case until the other side connects)
for (unsigned int n = 0; n < args.nchannels; n++) {
std::string name = kvak::utils::replace_first(
args.output_path, "%d", std::to_string(n));
if (args.fifo_mode && !filesystem::is_fifo(name)) {
int ret = mkfifo(name.c_str(), S_IRUSR | S_IWUSR);
if (ret) {
kvak::log::error << "Failed to create FIFO " << name
<< kvak::log::perror;
return EXIT_FAILURE;
}
}
channels.push_back(kvak::channel(n, agc, name, args.chunk_size * 2, args.muted));
}
kvak::log::info << "Opened files " << args.output_path << " [0-"
<< (args.nchannels - 1) << "]";
// Start up the server
std::mutex server_mtx; // We have one global mutex for all channels
std::optional<std::thread> server_thread;
if (args.bind != "false") {
server_thread = std::thread([&] () {
kvak::server::server(args.bind, channels, server_mtx);
});
}
#ifdef WITH_OPENMP
kvak::log::info << "Running with " << omp_get_max_threads() << " OpenMP threads";
#endif
while (true) {
// Note that the input floats are read in _machine byte order_
// This is intentional
std::size_t len = std::fread(
input_buffer.data(),
sizeof(input_buffer[0]),
input_buffer.size(),
input_file
);
if (len == 0) {
if (std::feof(input_file)) {
if (args.loop) {
std::fseek(input_file, 0, SEEK_SET);
kvak::log::debug << "Looping";
continue;
}
kvak::log::info << "EOF reached, quitting";
break;
}
kvak::log::error << "Read error, quitting...";
break;
}
agc.push_samples(input_buffer.data(), len / args.nchannels);
std::lock_guard<std::mutex> lock(server_mtx);
#pragma omp parallel for
for (unsigned int n = 0; n < args.nchannels; n++) {
auto iter = input_buffer.cbegin() + n;
kvak::channel &ch = channels[n];
if (ch.is_muted()) {
// TODO: As the running set is not updated very often, it's
// probably for the best to make a separate "running" list
// instead of skipping like this
continue;
}
for (unsigned int i = 0; i < len / args.nchannels; i++) {
ch.push_sample(*iter);
iter += args.nchannels;
}
}
for (auto &ch : channels) {
ch.flush();
}
if (len % args.nchannels != 0) {
// At this point, we either quit after the next read or loop if --loop
// got passed
kvak::log::error << "Dropping short read of " << (len % args.nchannels);
}
}
if (server_thread) {
// TODO: Kill the thread somehow
}
}