diff --git a/plugins/common/adx/adx_commons.go b/plugins/common/adx/adx_commons.go new file mode 100644 index 0000000000000..594d7bb287847 --- /dev/null +++ b/plugins/common/adx/adx_commons.go @@ -0,0 +1,286 @@ +package adx_commons + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/Azure/azure-kusto-go/kusto" + kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" + + "github.com/Azure/azure-kusto-go/kusto/ingest" + "github.com/Azure/azure-kusto-go/kusto/kql" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/serializers/json" +) + +var sampleConfig string + +type AzureDataExplorer struct { + Endpoint string `toml:"endpoint_url"` + Database string `toml:"database"` + Log telegraf.Logger `toml:"-"` + Timeout config.Duration `toml:"timeout"` + MetricsGrouping string `toml:"metrics_grouping_type"` + TableName string `toml:"table_name"` + CreateTables bool `toml:"create_tables"` + IngestionType string `toml:"ingestion_type"` + serializer telegraf.Serializer + kustoClient *kusto.Client + metricIngestors map[string]ingest.Ingestor + AppName string +} + +const ( + tablePerMetric = "tablepermetric" + singleTable = "singletable" + // These control the amount of memory we use when ingesting blobs + bufferSize = 1 << 20 // 1 MiB + maxBuffers = 5 +) + +const managedIngestion = "managed" +const queuedIngestion = "queued" + +func (*AzureDataExplorer) SampleConfig() string { + return sampleConfig +} + +// Initialize the client and the ingestor +func (adx *AzureDataExplorer) Connect() error { + conn := kusto.NewConnectionStringBuilder(adx.Endpoint).WithDefaultAzureCredential() + // Since init is called before connect, we can set the connector details here including the type. This will be used for telemetry and tracing. + conn.SetConnectorDetails("Telegraf", internal.ProductToken(), adx.AppName, "", false, "") + client, err := kusto.New(conn) + if err != nil { + return err + } + adx.kustoClient = client + adx.metricIngestors = make(map[string]ingest.Ingestor) + + return nil +} + +func (adx *AzureDataExplorer) SetSerializer(serializer telegraf.Serializer) { + adx.serializer = serializer +} + +// Clean up and close the ingestor +func (adx *AzureDataExplorer) Close() error { + var errs []error + for _, v := range adx.metricIngestors { + if err := v.Close(); err != nil { + // accumulate errors while closing ingestors + errs = append(errs, err) + } + } + if err := adx.kustoClient.Close(); err != nil { + errs = append(errs, err) + } + + adx.kustoClient = nil + adx.metricIngestors = nil + + if len(errs) == 0 { + adx.Log.Info("Closed ingestors and client") + return nil + } + // Combine errors into a single object and return the combined error + return kustoerrors.GetCombinedError(errs...) +} + +func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error { + if adx.MetricsGrouping == tablePerMetric { + return adx.writeTablePerMetric(metrics) + } + return adx.writeSingleTable(metrics) +} + +func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) error { + tableMetricGroups := make(map[string][]byte) + // Group metrics by name and serialize them + for _, m := range metrics { + tableName := m.Name() + metricInBytes, err := adx.serializer.Serialize(m) + if err != nil { + return err + } + if existingBytes, ok := tableMetricGroups[tableName]; ok { + tableMetricGroups[tableName] = append(existingBytes, metricInBytes...) + } else { + tableMetricGroups[tableName] = metricInBytes + } + } + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout)) + defer cancel() + + // Push the metrics for each table + format := ingest.FileFormat(ingest.JSON) + for tableName, tableMetrics := range tableMetricGroups { + if err := adx.pushMetrics(ctx, format, tableName, tableMetrics); err != nil { + return err + } + } + + return nil +} + +func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error { + // serialise each metric in metrics - store in byte[] + metricsArray := make([]byte, 0) + for _, m := range metrics { + metricsInBytes, err := adx.serializer.Serialize(m) + if err != nil { + return err + } + metricsArray = append(metricsArray, metricsInBytes...) + } + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout)) + defer cancel() + + // push metrics to a single table + format := ingest.FileFormat(ingest.JSON) + err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray) + return err +} + +func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error { + var metricIngestor ingest.Ingestor + var err error + + metricIngestor, err = adx.getMetricIngestor(ctx, tableName) + if err != nil { + return err + } + + length := len(metricsArray) + adx.Log.Debugf("Writing %d metrics to table %q", length, tableName) + reader := bytes.NewReader(metricsArray) + mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON) + if metricIngestor != nil { + if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil { + adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) + } + } + return nil +} + +func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) { + ingestor := adx.metricIngestors[tableName] + + if ingestor == nil { + if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil { + return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err) + } + // create a new ingestor client for the table + tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType) + if err != nil { + return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err) + } + adx.metricIngestors[tableName] = tempIngestor + adx.Log.Debugf("Ingestor for table %s created", tableName) + ingestor = tempIngestor + } + return ingestor, nil +} + +func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error { + if !adx.CreateTables { + adx.Log.Info("skipped table creation") + return nil + } + + if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableCommand(tableName)); err != nil { + return err + } + + if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingCommand(tableName)); err != nil { + return err + } + + return nil +} + +func (adx *AzureDataExplorer) Init() error { + if adx.Endpoint == "" { + return errors.New("endpoint configuration cannot be empty") + } + if adx.Database == "" { + return errors.New("database configuration cannot be empty") + } + + adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping) + if adx.MetricsGrouping == singleTable && adx.TableName == "" { + return errors.New("table name cannot be empty for SingleTable metrics grouping type") + } + + if adx.MetricsGrouping == "" { + adx.MetricsGrouping = tablePerMetric + } + + if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) { + return errors.New("metrics grouping type is not valid") + } + + if adx.Timeout == 0 { + adx.Timeout = config.Duration(20 * time.Second) + } + + if adx.IngestionType == "" { + adx.IngestionType = queuedIngestion + } else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) { + return fmt.Errorf("unknown ingestion type %q", adx.IngestionType) + } + + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } + if err := serializer.Init(); err != nil { + return err + } + adx.serializer = serializer + return nil +} + +// For each table create the ingestor +func createIngestorByTable(client *kusto.Client, database, tableName, ingestionType string) (ingest.Ingestor, error) { + switch strings.ToLower(ingestionType) { + case managedIngestion: + mi, err := ingest.NewManaged(client, database, tableName) + return mi, err + case queuedIngestion: + qi, err := ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) + return qi, err + } + return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion) +} + +func createTableCommand(table string) kusto.Statement { + builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `) + builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`) + + return builder +} + +func createTableMappingCommand(table string) kusto.Statement { + builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `) + builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `) + builder.AddLiteral(`'[{"column":"fields", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `) + builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`) + + return builder +} diff --git a/plugins/common/adx/adx_commons_test.go b/plugins/common/adx/adx_commons_test.go new file mode 100644 index 0000000000000..dc139797e1659 --- /dev/null +++ b/plugins/common/adx/adx_commons_test.go @@ -0,0 +1,489 @@ +package adx_commons + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "io" + "log" + "os" + "strings" + "testing" + "time" + + "github.com/Azure/azure-kusto-go/kusto" + "github.com/Azure/azure-kusto-go/kusto/ingest" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + serializers_json "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/testutil" +) + +func TestWrite(t *testing.T) { + testCases := []struct { + name string + inputMetric []telegraf.Metric + metricsGrouping string + tableName string + expected map[string]interface{} + expectedWriteError string + createTables bool + ingestionType string + }{ + { + name: "Valid metric", + inputMetric: testutil.MockMetrics(), + createTables: true, + tableName: "test1", + metricsGrouping: tablePerMetric, + expected: map[string]interface{}{ + "metricName": "test1", + "fields": map[string]interface{}{ + "value": 1.0, + }, + "tags": map[string]interface{}{ + "tag1": "value1", + }, + "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), + }, + }, + { + name: "Don't create tables'", + inputMetric: testutil.MockMetrics(), + createTables: false, + tableName: "test1", + metricsGrouping: tablePerMetric, + expected: map[string]interface{}{ + "metricName": "test1", + "fields": map[string]interface{}{ + "value": 1.0, + }, + "tags": map[string]interface{}{ + "tag1": "value1", + }, + "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), + }, + }, + { + name: "SingleTable metric grouping type", + inputMetric: testutil.MockMetrics(), + createTables: true, + tableName: "test1", + metricsGrouping: singleTable, + expected: map[string]interface{}{ + "metricName": "test1", + "fields": map[string]interface{}{ + "value": 1.0, + }, + "tags": map[string]interface{}{ + "tag1": "value1", + }, + "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), + }, + }, + { + name: "Valid metric managed ingestion", + inputMetric: testutil.MockMetrics(), + createTables: true, + tableName: "test1", + metricsGrouping: tablePerMetric, + ingestionType: managedIngestion, + expected: map[string]interface{}{ + "metricName": "test1", + "fields": map[string]interface{}{ + "value": 1.0, + }, + "tags": map[string]interface{}{ + "tag1": "value1", + }, + "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), + }, + }, + } + + for _, tC := range testCases { + t.Run(tC.name, func(t *testing.T) { + serializer := &serializers_json.Serializer{} + require.NoError(t, serializer.Init()) + + ingestionType := "queued" + if tC.ingestionType != "" { + ingestionType = tC.ingestionType + } + + localFakeIngestor := &fakeIngestor{} + plugin := AzureDataExplorer{ + Endpoint: "someendpoint", + Database: "databasename", + Log: testutil.Logger{}, + MetricsGrouping: tC.metricsGrouping, + TableName: tC.tableName, + CreateTables: tC.createTables, + kustoClient: kusto.NewMockClient(), + metricIngestors: map[string]ingest.Ingestor{ + tC.tableName: localFakeIngestor, + }, + IngestionType: ingestionType, + } + + plugin.SetSerializer(serializer) + + errorInWrite := plugin.Write(testutil.MockMetrics()) + + if tC.expectedWriteError != "" { + require.EqualError(t, errorInWrite, tC.expectedWriteError) + } else { + require.NoError(t, errorInWrite) + expectedNameOfMetric := tC.expected["metricName"].(string) + createdFakeIngestor := localFakeIngestor + require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"]) + expectedFields := tC.expected["fields"].(map[string]interface{}) + require.Equal(t, expectedFields, createdFakeIngestor.actualOutputMetric["fields"]) + expectedTags := tC.expected["tags"].(map[string]interface{}) + require.Equal(t, expectedTags, createdFakeIngestor.actualOutputMetric["tags"]) + expectedTime := tC.expected["timestamp"].(float64) + require.InDelta(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"], testutil.DefaultDelta) + } + plugin.Close() + }) + } +} + +func TestCreateAzureDataExplorerTable(t *testing.T) { + serializer := &serializers_json.Serializer{} + require.NoError(t, serializer.Init()) + plugin := AzureDataExplorer{ + Endpoint: "someendpoint", + Database: "databasename", + Log: testutil.Logger{}, + MetricsGrouping: tablePerMetric, + TableName: "test1", + CreateTables: false, + kustoClient: kusto.NewMockClient(), + metricIngestors: map[string]ingest.Ingestor{ + "test1": &fakeIngestor{}, + }, + IngestionType: queuedIngestion, + } + plugin.SetSerializer(serializer) + var buf bytes.Buffer + log.SetOutput(&buf) + defer func() { + log.SetOutput(os.Stderr) + }() + + err := plugin.createAzureDataExplorerTable(context.Background(), "test1") + + output := buf.String() + + if err == nil && !strings.Contains(output, "skipped table creation") { + t.Logf("FAILED : TestCreateAzureDataExplorerTable: Should have skipped table creation.") + t.Fail() + } +} + +func TestWriteWithType(t *testing.T) { + metricName := "test1" + fakeClient := kusto.NewMockClient() + expectedResultMap := map[string]string{metricName: `{"fields":{"value":1},"name":"test1","tags":{"tag1":"value1"},"timestamp":1257894000}`} + mockMetrics := testutil.MockMetrics() + // Multi tables + mockMetricsMulti := []telegraf.Metric{ + testutil.TestMetric(1.0, "test2"), + testutil.TestMetric(2.0, "test3"), + } + expectedResultMap2 := map[string]string{ + "test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`, + "test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`, + } + // List of tests + testCases := []struct { + name string + inputMetric []telegraf.Metric + metricsGrouping string + tableNameToExpectedResult map[string]string + expectedWriteError string + createTables bool + ingestionType string + }{ + { + name: "Valid metric", + inputMetric: mockMetrics, + createTables: true, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap, + }, + { + name: "Don't create tables'", + inputMetric: mockMetrics, + createTables: false, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap, + }, + { + name: "SingleTable metric grouping type", + inputMetric: mockMetrics, + createTables: true, + metricsGrouping: singleTable, + tableNameToExpectedResult: expectedResultMap, + }, + { + name: "Valid metric managed ingestion", + inputMetric: mockMetrics, + createTables: true, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap, + ingestionType: managedIngestion, + }, + { + name: "Table per metric type", + inputMetric: mockMetricsMulti, + createTables: true, + metricsGrouping: tablePerMetric, + tableNameToExpectedResult: expectedResultMap2, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + serializer := &serializers_json.Serializer{} + require.NoError(t, serializer.Init()) + for tableName, jsonValue := range testCase.tableNameToExpectedResult { + ingestionType := "queued" + if testCase.ingestionType != "" { + ingestionType = testCase.ingestionType + } + mockIngestor := &mockIngestor{} + plugin := AzureDataExplorer{ + Endpoint: "someendpoint", + Database: "databasename", + Log: testutil.Logger{}, + IngestionType: ingestionType, + MetricsGrouping: testCase.metricsGrouping, + TableName: tableName, + CreateTables: testCase.createTables, + kustoClient: fakeClient, + metricIngestors: map[string]ingest.Ingestor{ + tableName: mockIngestor, + }, + serializer: serializer, + } + err := plugin.Write(testCase.inputMetric) + if testCase.expectedWriteError != "" { + require.EqualError(t, err, testCase.expectedWriteError) + continue + } + require.NoError(t, err) + createdIngestor := plugin.metricIngestors[tableName] + if testCase.metricsGrouping == singleTable { + createdIngestor = plugin.metricIngestors[tableName] + } + records := mockIngestor.records[0] // the first element + require.NotNil(t, createdIngestor) + require.JSONEq(t, jsonValue, records) + } + }) + } +} + +func TestInit(t *testing.T) { + testCases := []struct { + name string + endpoint string + database string + metricsGrouping string + tableName string + timeout config.Duration + ingestionType string + expectedInitError string + }{ + { + name: "Valid configuration", + endpoint: "someendpoint", + database: "databasename", + metricsGrouping: tablePerMetric, + timeout: config.Duration(20 * time.Second), + ingestionType: queuedIngestion, + }, + { + name: "Empty endpoint", + database: "databasename", + metricsGrouping: tablePerMetric, + expectedInitError: "endpoint configuration cannot be empty", + }, + { + name: "Empty database", + endpoint: "someendpoint", + metricsGrouping: tablePerMetric, + expectedInitError: "database configuration cannot be empty", + }, + { + name: "SingleTable without table name", + endpoint: "someendpoint", + database: "databasename", + metricsGrouping: singleTable, + expectedInitError: "table name cannot be empty for SingleTable metrics grouping type", + }, + { + name: "Invalid metrics grouping type", + endpoint: "someendpoint", + database: "databasename", + metricsGrouping: "invalidtype", + expectedInitError: "metrics grouping type is not valid", + }, + { + name: "Unknown ingestion type", + endpoint: "someendpoint", + database: "databasename", + metricsGrouping: tablePerMetric, + ingestionType: "unknown", + expectedInitError: "unknown ingestion type \"unknown\"", + }, + } + + for _, tC := range testCases { + t.Run(tC.name, func(t *testing.T) { + plugin := AzureDataExplorer{ + Endpoint: tC.endpoint, + Database: tC.database, + MetricsGrouping: tC.metricsGrouping, + TableName: tC.tableName, + Timeout: tC.timeout, + IngestionType: tC.ingestionType, + Log: testutil.Logger{}, + } + + errorInit := plugin.Init() + + if tC.expectedInitError != "" { + require.EqualError(t, errorInit, tC.expectedInitError) + } else { + require.NoError(t, errorInit) + } + }) + } +} + +func TestInitBlankEndpointData(t *testing.T) { + plugin := AzureDataExplorer{ + Log: testutil.Logger{}, + kustoClient: kusto.NewMockClient(), + metricIngestors: map[string]ingest.Ingestor{}, + } + + errorInit := plugin.Init() + require.Error(t, errorInit) + require.Equal(t, "endpoint configuration cannot be empty", errorInit.Error()) +} + +func TestQueryConstruction(t *testing.T) { + const tableName = "mytable" + const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);` + const expectedMapping = `` + + `.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", ` + + `"Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` + + `"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'` + require.Equal(t, expectedCreate, createTableCommand(tableName).String()) + require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String()) +} + +type fakeIngestor struct { + actualOutputMetric map[string]interface{} +} + +func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { + scanner := bufio.NewScanner(reader) + scanner.Scan() + firstLine := scanner.Text() + err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric) + if err != nil { + return nil, err + } + return &ingest.Result{}, nil +} + +func (f *fakeIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) { + return &ingest.Result{}, nil +} + +func (f *fakeIngestor) Close() error { + return nil +} + +type mockIngestor struct { + records []string +} + +func (m *mockIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { + bufbytes, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + metricjson := string(bufbytes) + m.SetRecords(strings.Split(metricjson, "\n")) + return &ingest.Result{}, nil +} + +func (m *mockIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) { + return &ingest.Result{}, nil +} + +func (m *mockIngestor) SetRecords(records []string) { + m.records = records +} + +// Name receives a copy of Foo since it doesn't need to modify it. +func (m *mockIngestor) Records() []string { + return m.records +} + +func (m *mockIngestor) Close() error { + return nil +} + +func TestConnect(t *testing.T) { + testCases := []struct { + name string + endpoint string + expectedError string + expectedPanic bool + }{ + { + name: "Valid connection", + endpoint: "https://valid.endpoint", + expectedError: "", + expectedPanic: false, + }, + { + name: "Invalid connection", + endpoint: "", + expectedError: "error: Connection string cannot be empty", + expectedPanic: true, + }, + } + + for _, tC := range testCases { + t.Run(tC.name, func(t *testing.T) { + plugin := AzureDataExplorer{ + Endpoint: tC.endpoint, + Log: testutil.Logger{}, + } + + if tC.expectedPanic { + require.PanicsWithValue(t, tC.expectedError, func() { + err := plugin.Connect() + require.NoError(t, err) + }) + } else { + require.NotPanics(t, func() { + err := plugin.Connect() + require.NoError(t, err) + require.NotNil(t, plugin.kustoClient) + require.NotNil(t, plugin.metricIngestors) + }) + } + }) + } +} diff --git a/plugins/common/eventhub/eh_commons.go b/plugins/common/eventhub/eh_commons.go new file mode 100644 index 0000000000000..d8cd4b5f8f4cf --- /dev/null +++ b/plugins/common/eventhub/eh_commons.go @@ -0,0 +1,136 @@ +package eh_commons + +import ( + "context" + "time" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" +) + +var sampleConfig string + +/* +** Wrapper interface for eventhub.Hub + */ + +type EventHubInterface interface { + GetHub(s string) error + Close(ctx context.Context) error + SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error +} + +type EventHub struct { + hub *eventhub.Hub +} + +func (eh *EventHub) GetHub(s string) error { + hub, err := eventhub.NewHubFromConnectionString(s) + + if err != nil { + return err + } + + eh.hub = hub + + return nil +} + +func (eh *EventHub) Close(ctx context.Context) error { + return eh.hub.Close(ctx) +} + +func (eh *EventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { + return eh.hub.SendBatch(ctx, iterator, opts...) +} + +/* End wrapper interface */ + +type EventHubs struct { + Log telegraf.Logger `toml:"-"` + ConnectionString string `toml:"connection_string"` + Timeout config.Duration `toml:"timeout"` + PartitionKey string `toml:"partition_key"` + MaxMessageSize int `toml:"max_message_size"` + + Hub EventHubInterface + batchOptions []eventhub.BatchOption + serializer telegraf.Serializer +} + +func (*EventHubs) SampleConfig() string { + return sampleConfig +} + +func (e *EventHubs) Init() error { + err := e.Hub.GetHub(e.ConnectionString) + + if err != nil { + return err + } + + if e.MaxMessageSize > 0 { + e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize)) + } + + return nil +} + +func (e *EventHubs) Connect() error { + return nil +} + +func (e *EventHubs) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + err := e.Hub.Close(ctx) + + if err != nil { + return err + } + + return nil +} + +func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) { + e.serializer = serializer +} + +func (e *EventHubs) Write(metrics []telegraf.Metric) error { + events := make([]*eventhub.Event, 0, len(metrics)) + for _, metric := range metrics { + payload, err := e.serializer.Serialize(metric) + + if err != nil { + e.Log.Debugf("Could not serialize metric: %v", err) + continue + } + + event := eventhub.NewEvent(payload) + if e.PartitionKey != "" { + if key, ok := metric.GetTag(e.PartitionKey); ok { + event.PartitionKey = &key + } else if key, ok := metric.GetField(e.PartitionKey); ok { + if strKey, ok := key.(string); ok { + event.PartitionKey = &strKey + } + } + } + + events = append(events, event) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + + err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...) + + if err != nil { + return err + } + + return nil +} diff --git a/plugins/common/eventhub/eh_commons_test.go b/plugins/common/eventhub/eh_commons_test.go new file mode 100644 index 0000000000000..9b4b61f9ecc7f --- /dev/null +++ b/plugins/common/eventhub/eh_commons_test.go @@ -0,0 +1,163 @@ +package eh_commons + +import ( + "context" + "fmt" + "math/rand" + "os" + "testing" + "time" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/testutil" +) + +/* +** Wrapper interface mock for eventhub.Hub + */ + +type mockEventHub struct { + mock.Mock +} + +func (eh *mockEventHub) GetHub(s string) error { + args := eh.Called(s) + return args.Error(0) +} + +func (eh *mockEventHub) Close(ctx context.Context) error { + args := eh.Called(ctx) + return args.Error(0) +} + +func (eh *mockEventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { + args := eh.Called(ctx, iterator, opts) + return args.Error(0) +} + +/* End wrapper interface */ + +func TestInitAndWrite(t *testing.T) { + serializer := &json.Serializer{} + require.NoError(t, serializer.Init()) + + mockHub := &mockEventHub{} + e := &EventHubs{ + Hub: mockHub, + ConnectionString: "mock", + Timeout: config.Duration(time.Second * 5), + MaxMessageSize: 1000000, + } + e.SetSerializer(serializer) + mockHub.On("GetHub", mock.Anything).Return(nil).Once() + require.NoError(t, e.Init()) + mockHub.AssertExpectations(t) + + metrics := testutil.MockMetrics() + + mockHub.On("SendBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + require.NoError(t, e.Write(metrics)) + mockHub.AssertExpectations(t) +} + +/* +** Integration test (requires an Event Hubs instance) + */ + +func TestInitAndWriteIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + if os.Getenv("EVENTHUB_CONNECTION_STRING") == "" { + t.Skip("Missing environment variable EVENTHUB_CONNECTION_STRING") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + // Create a new, empty Event Hub + // NB: for this to work, the connection string needs to grant "Manage" permissions on the root namespace + mHub, err := eventhub.NewHubManagerFromConnectionString(os.Getenv("EVENTHUB_CONNECTION_STRING")) + require.NoError(t, err) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + name := fmt.Sprintf("testmetrics%05d", r.Intn(10000)) + + entity, err := mHub.Put(ctx, name, eventhub.HubWithPartitionCount(1)) + require.NoError(t, err) + + // Delete the test hub + defer func() { + err := mHub.Delete(ctx, entity.Name) + require.NoError(t, err) + }() + + testHubCS := os.Getenv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + entity.Name + + // Configure the plugin to target the newly created hub + serializer := &json.Serializer{} + require.NoError(t, serializer.Init()) + e := &EventHubs{ + Hub: &EventHub{}, + ConnectionString: testHubCS, + Timeout: config.Duration(time.Second * 5), + } + + e.SetSerializer(serializer) + + // Verify that we can connect to Event Hubs + require.NoError(t, e.Init()) + + // Verify that we can successfully write data to Event Hubs + metrics := testutil.MockMetrics() + require.NoError(t, e.Write(metrics)) + e.Close() + + /* + ** Verify we can read data back from the test hub + */ + + exit := make(chan string) + + // Create a hub client for receiving + hub, err := eventhub.NewHubFromConnectionString(testHubCS) + require.NoError(t, err) + + // The handler function will pass received messages via the channel + handler := func(_ context.Context, event *eventhub.Event) error { + exit <- string(event.Data) + return nil + } + + // Set up the receivers + runtimeInfo, err := hub.GetRuntimeInformation(ctx) + require.NoError(t, err) + + for _, partitionID := range runtimeInfo.PartitionIDs { + _, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset("-1")) + require.NoError(t, err) + } + + // Wait to receive the same number of messages sent, with timeout + received := 0 +wait: + for _, metric := range metrics { + select { + case m := <-exit: + t.Logf("Received for %s: %s", metric.Name(), m) + received = received + 1 + case <-time.After(10 * time.Second): + t.Logf("Timeout") + break wait + } + } + + // Make sure received == sent + require.Len(t, metrics, received) +} diff --git a/plugins/outputs/all/microsoft_fabric.go b/plugins/outputs/all/microsoft_fabric.go new file mode 100644 index 0000000000000..d8e1602289739 --- /dev/null +++ b/plugins/outputs/all/microsoft_fabric.go @@ -0,0 +1,5 @@ +//go:build !custom || outputs || outputs.microsoft_fabric + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/microsoft_fabric" // register plugin diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer.go b/plugins/outputs/azure_data_explorer/azure_data_explorer.go index 1c6cf4f1e8417..e84ffd8dce499 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer.go @@ -2,286 +2,49 @@ package azure_data_explorer import ( - "bytes" - "context" _ "embed" - "errors" - "fmt" - "strings" - "time" - "github.com/Azure/azure-kusto-go/kusto" - kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors" - "github.com/Azure/azure-kusto-go/kusto/ingest" - "github.com/Azure/azure-kusto-go/kusto/kql" + adx_commons "github.com/influxdata/telegraf/plugins/common/adx" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf/plugins/serializers/json" ) //go:embed sample.conf var sampleConfig string type AzureDataExplorer struct { - Endpoint string `toml:"endpoint_url"` - Database string `toml:"database"` - Log telegraf.Logger `toml:"-"` - Timeout config.Duration `toml:"timeout"` - MetricsGrouping string `toml:"metrics_grouping_type"` - TableName string `toml:"table_name"` - CreateTables bool `toml:"create_tables"` - IngestionType string `toml:"ingestion_type"` - serializer telegraf.Serializer - kustoClient *kusto.Client - metricIngestors map[string]ingest.Ingestor + adx_commons.AzureDataExplorer } -const ( - tablePerMetric = "tablepermetric" - singleTable = "singletable" - // These control the amount of memory we use when ingesting blobs - bufferSize = 1 << 20 // 1 MiB - maxBuffers = 5 -) - -const managedIngestion = "managed" -const queuedIngestion = "queued" - func (*AzureDataExplorer) SampleConfig() string { return sampleConfig } // Initialize the client and the ingestor func (adx *AzureDataExplorer) Connect() error { - conn := kusto.NewConnectionStringBuilder(adx.Endpoint).WithDefaultAzureCredential() - // Since init is called before connect, we can set the connector details here including the type. This will be used for telemetry and tracing. - conn.SetConnectorDetails("Telegraf", internal.ProductToken(), "", "", false, "") - client, err := kusto.New(conn) - if err != nil { - return err - } - adx.kustoClient = client - adx.metricIngestors = make(map[string]ingest.Ingestor) - - return nil + return adx.AzureDataExplorer.Connect() } // Clean up and close the ingestor func (adx *AzureDataExplorer) Close() error { - var errs []error - for _, v := range adx.metricIngestors { - if err := v.Close(); err != nil { - // accumulate errors while closing ingestors - errs = append(errs, err) - } - } - if err := adx.kustoClient.Close(); err != nil { - errs = append(errs, err) - } - - adx.kustoClient = nil - adx.metricIngestors = nil - - if len(errs) == 0 { - adx.Log.Info("Closed ingestors and client") - return nil - } - // Combine errors into a single object and return the combined error - return kustoerrors.GetCombinedError(errs...) + return adx.AzureDataExplorer.Close() } func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error { - if adx.MetricsGrouping == tablePerMetric { - return adx.writeTablePerMetric(metrics) - } - return adx.writeSingleTable(metrics) -} - -func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) error { - tableMetricGroups := make(map[string][]byte) - // Group metrics by name and serialize them - for _, m := range metrics { - tableName := m.Name() - metricInBytes, err := adx.serializer.Serialize(m) - if err != nil { - return err - } - if existingBytes, ok := tableMetricGroups[tableName]; ok { - tableMetricGroups[tableName] = append(existingBytes, metricInBytes...) - } else { - tableMetricGroups[tableName] = metricInBytes - } - } - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout)) - defer cancel() - - // Push the metrics for each table - format := ingest.FileFormat(ingest.JSON) - for tableName, tableMetrics := range tableMetricGroups { - if err := adx.pushMetrics(ctx, format, tableName, tableMetrics); err != nil { - return err - } - } - - return nil -} - -func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error { - // serialise each metric in metrics - store in byte[] - metricsArray := make([]byte, 0) - for _, m := range metrics { - metricsInBytes, err := adx.serializer.Serialize(m) - if err != nil { - return err - } - metricsArray = append(metricsArray, metricsInBytes...) - } - - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout)) - defer cancel() - - // push metrics to a single table - format := ingest.FileFormat(ingest.JSON) - err := adx.pushMetrics(ctx, format, adx.TableName, metricsArray) - return err -} - -func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error { - var metricIngestor ingest.Ingestor - var err error - - metricIngestor, err = adx.getMetricIngestor(ctx, tableName) - if err != nil { - return err - } - - length := len(metricsArray) - adx.Log.Debugf("Writing %d metrics to table %q", length, tableName) - reader := bytes.NewReader(metricsArray) - mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON) - if metricIngestor != nil { - if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil { - adx.Log.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %v", tableName, err) - } - } - return nil -} - -func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) { - ingestor := adx.metricIngestors[tableName] - - if ingestor == nil { - if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil { - return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err) - } - // create a new ingestor client for the table - tempIngestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType) - if err != nil { - return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err) - } - adx.metricIngestors[tableName] = tempIngestor - adx.Log.Debugf("Ingestor for table %s created", tableName) - ingestor = tempIngestor - } - return ingestor, nil -} - -func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error { - if !adx.CreateTables { - adx.Log.Info("skipped table creation") - return nil - } - - if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableCommand(tableName)); err != nil { - return err - } - - if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, createTableMappingCommand(tableName)); err != nil { - return err - } - - return nil + return adx.AzureDataExplorer.Write(metrics) } func (adx *AzureDataExplorer) Init() error { - if adx.Endpoint == "" { - return errors.New("endpoint configuration cannot be empty") - } - if adx.Database == "" { - return errors.New("database configuration cannot be empty") - } - - adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping) - if adx.MetricsGrouping == singleTable && adx.TableName == "" { - return errors.New("table name cannot be empty for SingleTable metrics grouping type") - } - if adx.MetricsGrouping == "" { - adx.MetricsGrouping = tablePerMetric - } - if !(adx.MetricsGrouping == singleTable || adx.MetricsGrouping == tablePerMetric) { - return errors.New("metrics grouping type is not valid") - } - - if adx.IngestionType == "" { - adx.IngestionType = queuedIngestion - } else if !(choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion})) { - return fmt.Errorf("unknown ingestion type %q", adx.IngestionType) - } - - serializer := &json.Serializer{ - TimestampUnits: config.Duration(time.Nanosecond), - TimestampFormat: time.RFC3339Nano, - } - if err := serializer.Init(); err != nil { - return err - } - adx.serializer = serializer - return nil + return adx.AzureDataExplorer.Init() } - func init() { outputs.Add("azure_data_explorer", func() telegraf.Output { return &AzureDataExplorer{ - Timeout: config.Duration(20 * time.Second), - CreateTables: true, + AzureDataExplorer: adx_commons.AzureDataExplorer{ + CreateTables: true, + AppName: "Kusto.Telegraf", + }, } }) } - -// For each table create the ingestor -func createIngestorByTable(client *kusto.Client, database, tableName, ingestionType string) (ingest.Ingestor, error) { - switch strings.ToLower(ingestionType) { - case managedIngestion: - mi, err := ingest.NewManaged(client, database, tableName) - return mi, err - case queuedIngestion: - qi, err := ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers)) - return qi, err - } - return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion) -} - -func createTableCommand(table string) kusto.Statement { - builder := kql.New(`.create-merge table ['`).AddTable(table).AddLiteral(`'] `) - builder.AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`) - - return builder -} - -func createTableMappingCommand(table string) kusto.Statement { - builder := kql.New(`.create-or-alter table ['`).AddTable(table).AddLiteral(`'] `) - builder.AddLiteral(`ingestion json mapping '`).AddTable(table + "_mapping").AddLiteral(`' `) - builder.AddLiteral(`'[{"column":"fields", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `) - builder.AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`) - - return builder -} diff --git a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go index 9ff90f21299d2..21dead266ec87 100644 --- a/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go +++ b/plugins/outputs/azure_data_explorer/azure_data_explorer_test.go @@ -1,369 +1,93 @@ package azure_data_explorer import ( - "bufio" - "bytes" - "context" - "encoding/json" - "io" - "log" - "os" - "strings" "testing" "time" - "github.com/Azure/azure-kusto-go/kusto" - "github.com/Azure/azure-kusto-go/kusto/ingest" - "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf" - serializers_json "github.com/influxdata/telegraf/plugins/serializers/json" + "github.com/influxdata/telegraf/config" + adx_commons "github.com/influxdata/telegraf/plugins/common/adx" + "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" ) -func TestWrite(t *testing.T) { +func TestConnect(t *testing.T) { testCases := []struct { - name string - inputMetric []telegraf.Metric - metricsGrouping string - tableName string - expected map[string]interface{} - expectedWriteError string - createTables bool - ingestionType string + name string + endpoint string + expectedError string + expectedPanic bool }{ { - name: "Valid metric", - inputMetric: testutil.MockMetrics(), - createTables: true, - tableName: "test1", - metricsGrouping: tablePerMetric, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - { - name: "Don't create tables'", - inputMetric: testutil.MockMetrics(), - createTables: false, - tableName: "test1", - metricsGrouping: tablePerMetric, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - { - name: "SingleTable metric grouping type", - inputMetric: testutil.MockMetrics(), - createTables: true, - tableName: "test1", - metricsGrouping: singleTable, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, - }, - { - name: "Valid metric managed ingestion", - inputMetric: testutil.MockMetrics(), - createTables: true, - tableName: "test1", - metricsGrouping: tablePerMetric, - ingestionType: managedIngestion, - expected: map[string]interface{}{ - "metricName": "test1", - "fields": map[string]interface{}{ - "value": 1.0, - }, - "tags": map[string]interface{}{ - "tag1": "value1", - }, - "timestamp": float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).UnixNano() / int64(time.Second)), - }, + name: "Valid connection", + endpoint: "https://valid.endpoint", + expectedError: "", + expectedPanic: false, }, } for _, tC := range testCases { t.Run(tC.name, func(t *testing.T) { - serializer := &serializers_json.Serializer{} - require.NoError(t, serializer.Init()) - - ingestionType := "queued" - if tC.ingestionType != "" { - ingestionType = tC.ingestionType - } - - localFakeIngestor := &fakeIngestor{} plugin := AzureDataExplorer{ - Endpoint: "someendpoint", - Database: "databasename", - Log: testutil.Logger{}, - MetricsGrouping: tC.metricsGrouping, - TableName: tC.tableName, - CreateTables: tC.createTables, - kustoClient: kusto.NewMockClient(), - metricIngestors: map[string]ingest.Ingestor{ - tC.tableName: localFakeIngestor, + AzureDataExplorer: adx_commons.AzureDataExplorer{ + Endpoint: tC.endpoint, + Log: testutil.Logger{}, }, - serializer: serializer, - IngestionType: ingestionType, } - errorInWrite := plugin.Write(testutil.MockMetrics()) - - if tC.expectedWriteError != "" { - require.EqualError(t, errorInWrite, tC.expectedWriteError) + if tC.expectedPanic { + require.PanicsWithValue(t, tC.expectedError, func() { + err := plugin.Connect() + require.NoError(t, err) + }) } else { - require.NoError(t, errorInWrite) - - expectedNameOfMetric := tC.expected["metricName"].(string) - - createdFakeIngestor := localFakeIngestor - - require.Equal(t, expectedNameOfMetric, createdFakeIngestor.actualOutputMetric["name"]) - - expectedFields := tC.expected["fields"].(map[string]interface{}) - require.Equal(t, expectedFields, createdFakeIngestor.actualOutputMetric["fields"]) - - expectedTags := tC.expected["tags"].(map[string]interface{}) - require.Equal(t, expectedTags, createdFakeIngestor.actualOutputMetric["tags"]) - - expectedTime := tC.expected["timestamp"].(float64) - require.InDelta(t, expectedTime, createdFakeIngestor.actualOutputMetric["timestamp"], testutil.DefaultDelta) + require.NotPanics(t, func() { + err := plugin.Connect() + require.NoError(t, err) + }) } }) } } -func TestCreateAzureDataExplorerTable(t *testing.T) { - serializer := &serializers_json.Serializer{} - require.NoError(t, serializer.Init()) +func TestWrite(t *testing.T) { plugin := AzureDataExplorer{ - Endpoint: "someendpoint", - Database: "databasename", - Log: testutil.Logger{}, - MetricsGrouping: tablePerMetric, - TableName: "test1", - CreateTables: false, - kustoClient: kusto.NewMockClient(), - metricIngestors: map[string]ingest.Ingestor{ - "test1": &fakeIngestor{}, - }, - serializer: serializer, - IngestionType: queuedIngestion, - } - var buf bytes.Buffer - log.SetOutput(&buf) - defer func() { - log.SetOutput(os.Stderr) - }() - - err := plugin.createAzureDataExplorerTable(context.Background(), "test1") - - output := buf.String() - - if err == nil && !strings.Contains(output, "skipped table creation") { - t.Logf("FAILED : TestCreateAzureDataExplorerTable: Should have skipped table creation.") - t.Fail() - } -} - -func TestWriteWithType(t *testing.T) { - metricName := "test1" - fakeClient := kusto.NewMockClient() - expectedResultMap := map[string]string{metricName: `{"fields":{"value":1},"name":"test1","tags":{"tag1":"value1"},"timestamp":1257894000}`} - mockMetrics := testutil.MockMetrics() - // Multi tables - mockMetricsMulti := []telegraf.Metric{ - testutil.TestMetric(1.0, "test2"), - testutil.TestMetric(2.0, "test3"), - } - expectedResultMap2 := map[string]string{ - "test2": `{"fields":{"value":1.0},"name":"test2","tags":{"tag1":"value1"},"timestamp":1257894000}`, - "test3": `{"fields":{"value":2.0},"name":"test3","tags":{"tag1":"value1"},"timestamp":1257894000}`, - } - // List of tests - testCases := []struct { - name string - inputMetric []telegraf.Metric - metricsGrouping string - tableNameToExpectedResult map[string]string - expectedWriteError string - createTables bool - ingestionType string - }{ - { - name: "Valid metric", - inputMetric: mockMetrics, - createTables: true, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap, - }, - { - name: "Don't create tables'", - inputMetric: mockMetrics, - createTables: false, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap, - }, - { - name: "SingleTable metric grouping type", - inputMetric: mockMetrics, - createTables: true, - metricsGrouping: singleTable, - tableNameToExpectedResult: expectedResultMap, - }, - { - name: "Valid metric managed ingestion", - inputMetric: mockMetrics, - createTables: true, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap, - ingestionType: managedIngestion, - }, - { - name: "Table per metric type", - inputMetric: mockMetricsMulti, - createTables: true, - metricsGrouping: tablePerMetric, - tableNameToExpectedResult: expectedResultMap2, + AzureDataExplorer: adx_commons.AzureDataExplorer{ + Endpoint: "https://valid.endpoint", + Database: "database", + Log: testutil.Logger{}, }, } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - serializer := &serializers_json.Serializer{} - require.NoError(t, serializer.Init()) - for tableName, jsonValue := range testCase.tableNameToExpectedResult { - ingestionType := "queued" - if testCase.ingestionType != "" { - ingestionType = testCase.ingestionType - } - mockIngestor := &mockIngestor{} - plugin := AzureDataExplorer{ - Endpoint: "someendpoint", - Database: "databasename", - Log: testutil.Logger{}, - IngestionType: ingestionType, - MetricsGrouping: testCase.metricsGrouping, - TableName: tableName, - CreateTables: testCase.createTables, - kustoClient: fakeClient, - metricIngestors: map[string]ingest.Ingestor{ - tableName: mockIngestor, - }, - serializer: serializer, - } - err := plugin.Write(testCase.inputMetric) - if testCase.expectedWriteError != "" { - require.EqualError(t, err, testCase.expectedWriteError) - continue - } - require.NoError(t, err) - createdIngestor := plugin.metricIngestors[tableName] - if testCase.metricsGrouping == singleTable { - createdIngestor = plugin.metricIngestors[tableName] - } - records := mockIngestor.records[0] // the first element - require.NotNil(t, createdIngestor) - require.JSONEq(t, jsonValue, records) - } - }) + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, } -} - -func TestInitBlankEndpointData(t *testing.T) { - plugin := AzureDataExplorer{ - Log: testutil.Logger{}, - kustoClient: kusto.NewMockClient(), - metricIngestors: map[string]ingest.Ingestor{}, - } - - errorInit := plugin.Init() - require.Error(t, errorInit) - require.Equal(t, "endpoint configuration cannot be empty", errorInit.Error()) -} - -func TestQueryConstruction(t *testing.T) { - const tableName = "mytable" - const expectedCreate = `.create-merge table ['mytable'] (['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);` - const expectedMapping = `` + - `.create-or-alter table ['mytable'] ingestion json mapping 'mytable_mapping' '[{"column":"fields", ` + - `"Properties":{"Path":"$[\'fields\']"}},{"column":"name", "Properties":{"Path":"$[\'name\']"}},{"column":"tags", ` + - `"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}}]'` - require.Equal(t, expectedCreate, createTableCommand(tableName).String()) - require.Equal(t, expectedMapping, createTableMappingCommand(tableName).String()) -} - -type fakeIngestor struct { - actualOutputMetric map[string]interface{} -} - -func (f *fakeIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { - scanner := bufio.NewScanner(reader) - scanner.Scan() - firstLine := scanner.Text() - err := json.Unmarshal([]byte(firstLine), &f.actualOutputMetric) - if err != nil { - return nil, err + plugin.SetSerializer(serializer) + perr := plugin.Init() + require.NoError(t, perr) + err := plugin.Connect() + require.NoError(t, err) + + metrics := []telegraf.Metric{ + testutil.TestMetric(1.0, "test_metric"), } - return &ingest.Result{}, nil -} -func (*fakeIngestor) FromFile(context.Context, string, ...ingest.FileOption) (*ingest.Result, error) { - return &ingest.Result{}, nil + err = plugin.Write(metrics) + require.NoError(t, err) } -func (*fakeIngestor) Close() error { - return nil -} - -type mockIngestor struct { - records []string -} - -func (m *mockIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) { - bufbytes, err := io.ReadAll(reader) - if err != nil { - return nil, err +func TestClose(t *testing.T) { + plugin := AzureDataExplorer{ + AzureDataExplorer: adx_commons.AzureDataExplorer{ + Endpoint: "https://valid.endpoint", + Log: testutil.Logger{}, + }, } - metricjson := string(bufbytes) - m.SetRecords(strings.Split(metricjson, "\n")) - return &ingest.Result{}, nil -} -func (*mockIngestor) FromFile(context.Context, string, ...ingest.FileOption) (*ingest.Result, error) { - return &ingest.Result{}, nil -} - -func (m *mockIngestor) SetRecords(records []string) { - m.records = records -} - -// Name receives a copy of Foo since it doesn't need to modify it. -func (m *mockIngestor) Records() []string { - return m.records -} + err := plugin.Connect() + require.NoError(t, err) -func (*mockIngestor) Close() error { - return nil + err = plugin.Close() + require.NoError(t, err) } diff --git a/plugins/outputs/event_hubs/event_hubs.go b/plugins/outputs/event_hubs/event_hubs.go index 7a0c01e6f717f..a0340083dd8b7 100644 --- a/plugins/outputs/event_hubs/event_hubs.go +++ b/plugins/outputs/event_hubs/event_hubs.go @@ -2,88 +2,34 @@ package event_hubs import ( - "context" _ "embed" "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + eh_commons "github.com/influxdata/telegraf/plugins/common/eventhub" "github.com/influxdata/telegraf/plugins/outputs" ) -//go:embed sample.conf -var sampleConfig string - -/* -** Wrapper interface for eventhub.Hub - */ - -type EventHubInterface interface { - GetHub(s string) error - Close(ctx context.Context) error - SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error -} - -type eventHub struct { - hub *eventhub.Hub -} - -func (eh *eventHub) GetHub(s string) error { - hub, err := eventhub.NewHubFromConnectionString(s) - - if err != nil { - return err - } - - eh.hub = hub - - return nil -} - -func (eh *eventHub) Close(ctx context.Context) error { - return eh.hub.Close(ctx) -} - -func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error { - return eh.hub.SendBatch(ctx, iterator, opts...) -} - /* End wrapper interface */ type EventHubs struct { - Log telegraf.Logger `toml:"-"` - ConnectionString string `toml:"connection_string"` - Timeout config.Duration `toml:"timeout"` - PartitionKey string `toml:"partition_key"` - MaxMessageSize int `toml:"max_message_size"` - - Hub EventHubInterface - batchOptions []eventhub.BatchOption - serializer telegraf.Serializer + eh_commons.EventHubs } const ( defaultRequestTimeout = time.Second * 30 ) -func (*EventHubs) SampleConfig() string { +//go:embed sample.conf +var sampleConfig string + +func (e *EventHubs) SampleConfig() string { return sampleConfig } func (e *EventHubs) Init() error { - err := e.Hub.GetHub(e.ConnectionString) - - if err != nil { - return err - } - - if e.MaxMessageSize > 0 { - e.batchOptions = append(e.batchOptions, eventhub.BatchWithMaxSizeInBytes(e.MaxMessageSize)) - } - - return nil + return e.EventHubs.Init() } func (*EventHubs) Connect() error { @@ -91,63 +37,24 @@ func (*EventHubs) Connect() error { } func (e *EventHubs) Close() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) - defer cancel() - - err := e.Hub.Close(ctx) - - if err != nil { - return err - } - - return nil + return e.EventHubs.Close() } func (e *EventHubs) SetSerializer(serializer telegraf.Serializer) { - e.serializer = serializer + e.EventHubs.SetSerializer(serializer) } func (e *EventHubs) Write(metrics []telegraf.Metric) error { - events := make([]*eventhub.Event, 0, len(metrics)) - for _, metric := range metrics { - payload, err := e.serializer.Serialize(metric) - - if err != nil { - e.Log.Debugf("Could not serialize metric: %v", err) - continue - } - - event := eventhub.NewEvent(payload) - if e.PartitionKey != "" { - if key, ok := metric.GetTag(e.PartitionKey); ok { - event.PartitionKey = &key - } else if key, ok := metric.GetField(e.PartitionKey); ok { - if strKey, ok := key.(string); ok { - event.PartitionKey = &strKey - } - } - } - - events = append(events, event) - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) - defer cancel() - - err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...), e.batchOptions...) - - if err != nil { - return err - } - - return nil + return e.EventHubs.Write(metrics) } func init() { outputs.Add("event_hubs", func() telegraf.Output { return &EventHubs{ - Hub: &eventHub{}, - Timeout: config.Duration(defaultRequestTimeout), + EventHubs: eh_commons.EventHubs{ + Hub: &eh_commons.EventHub{}, + Timeout: config.Duration(defaultRequestTimeout), + }, } }) } diff --git a/plugins/outputs/event_hubs/event_hubs_test.go b/plugins/outputs/event_hubs/event_hubs_test.go index eb65c7ed9642f..5be32ed55cb7d 100644 --- a/plugins/outputs/event_hubs/event_hubs_test.go +++ b/plugins/outputs/event_hubs/event_hubs_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/config" + eh_commons "github.com/influxdata/telegraf/plugins/common/eventhub" "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/testutil" ) @@ -47,14 +48,13 @@ func TestInitAndWrite(t *testing.T) { require.NoError(t, serializer.Init()) mockHub := &mockEventHub{} - e := &EventHubs{ + e := &eh_commons.EventHubs{ Hub: mockHub, ConnectionString: "mock", Timeout: config.Duration(time.Second * 5), MaxMessageSize: 1000000, - serializer: serializer, } - + e.SetSerializer(serializer) mockHub.On("GetHub", mock.Anything).Return(nil).Once() require.NoError(t, e.Init()) mockHub.AssertExpectations(t) @@ -104,13 +104,14 @@ func TestInitAndWriteIntegration(t *testing.T) { // Configure the plugin to target the newly created hub serializer := &json.Serializer{} require.NoError(t, serializer.Init()) - e := &EventHubs{ - Hub: &eventHub{}, + e := &eh_commons.EventHubs{ + Hub: &eh_commons.EventHub{}, ConnectionString: testHubCS, Timeout: config.Duration(time.Second * 5), - serializer: serializer, } + e.SetSerializer(serializer) + // Verify that we can connect to Event Hubs require.NoError(t, e.Init()) diff --git a/plugins/outputs/microsoft_fabric/README.md b/plugins/outputs/microsoft_fabric/README.md new file mode 100644 index 0000000000000..96a90e83e5c15 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/README.md @@ -0,0 +1,165 @@ +# Microsoft Fabric Output Plugin + +This plugin writes metrics to [Real time analytics in Fabric][fabric] services. + +[fabric]: https://learn.microsoft.com/en-us/fabric/real-time-analytics/overview + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml +# Sends metrics to Microsoft Fabric +[[outputs.microsoft_fabric]] + ## The URI property of the resource on Azure + ## ex: connection_string = "https://myadxresource.australiasoutheast.kusto.windows.net" + ## ex: connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;*****;EntityPath=hubName" + connection_string = "" + + + [outputs.microsoft_fabric.eh_conf] + ## The Eventhouse database that the metrics will be ingested into. + ## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion. + ## ex: "exampledatabase" + database = "" + + ## Timeout for Eventhouse operations + # timeout = "20s" + + ## Type of metrics grouping used when pushing to Eventhouse. + ## Default is "TablePerMetric" for one table per different metric. + ## For more information, please check the plugin README. + # metrics_grouping_type = "TablePerMetric" + + ## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable"). + # table_name = "" + + ## Creates tables and relevant mapping if set to true(default). + ## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role. + # create_tables = true + + ## Ingestion method to use. + ## Available options are + ## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below + ## - queued -- queue up metrics data and process sequentially + # ingestion_type = "queued" + + [outputs.microsoft_fabric.es_conf] + ## The full connection string to the Event stream (required) + ## The shared access key must have "Send" permissions on the target Event stream. + + ## Client timeout (defaults to 30s) + # timeout = "30s" + + ## Partition key + ## Metric tag or field name to use for the event partition key. The value of + ## this tag or field is set as the key for events if it exists. If both, tag + ## and field, exist the tag is preferred. + # partition_key = "" + + ## Set the maximum batch message size in bytes + ## The allowable size depends on the Event stream tier + ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## Setting this to 0 means using the default size from the Azure Event streams Client library (1000000 bytes) + # max_message_size = 1000000 + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "json" +``` + +## Description + +The `microsoft_fabric` output plugin sends metrics to Microsoft Fabric, +a scalable data platform for real-time analytics. +This plugin allows you to leverage Microsoft Fabric's +capabilities to store and analyze your Telegraf metrics. +Following are the currently supported datastores: + +### Eventhouse + +Eventhouse is a high-performance, scalable data store designed for + real-time analytics. It allows you to ingest, store, and query large + volumes of data with low latency. For more information, visit the + [Eventhouse documentation]( + https://learn.microsoft.com/fabric/real-time-intelligence/eventhouse + ). + +```toml +[outputs.microsoft_fabric.eh_conf] + ## The Eventhouse database that the metrics will be ingested into. + ## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion. + ## ex: "exampledatabase" + database = "" + + ## Timeout for Eventhouse operations + # timeout = "20s" + + ## Type of metrics grouping used when pushing to Eventhouse. + ## Default is "TablePerMetric" for one table per different metric. + ## For more information, please check the plugin README. + # metrics_grouping_type = "TablePerMetric" + + ## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable"). + # table_name = "" + + ## Creates tables and relevant mapping if set to true(default). + ## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role. + # create_tables = true + + ## Ingestion method to use. + ## Available options are + ## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below + ## - queued -- queue up metrics data and process sequentially + # ingestion_type = "queued" + +``` + +More about the eventhouse configuration properties +can be found [here](../azure_data_explorer/README.md#metrics-grouping) + +### Eventstream + +The eventstreams feature in the Microsoft Fabric Real-Time Intelligence +experience lets you bring real-time events into Fabric, transform them, +and then route them to various destinations without writing any code (no-code). +For more information, visit the [Eventstream documentation][]. + +[Eventstream documentation]: https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/overview?tabs=enhancedcapabilities + +```toml +[outputs.microsoft_fabric.es_conf] + ## The full connection string to the Event stream (required) + ## The shared access key must have "Send" permissions on the target Event stream. + + ## Client timeout (defaults to 30s) + # timeout = "30s" + + ## Partition key + ## Metric tag or field name to use for the event partition key. The value of + ## this tag or field is set as the key for events if it exists. If both, tag + ## and field, exist the tag is preferred. + # partition_key = "" + + ## Set the maximum batch message size in bytes + ## The allowable size depends on the Event stream tier + ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## Setting this to 0 means using the default size from the Azure Event streams Client library (1000000 bytes) + # max_message_size = 1000000 + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "json" + +``` diff --git a/plugins/outputs/microsoft_fabric/microsoft_fabric.go b/plugins/outputs/microsoft_fabric/microsoft_fabric.go new file mode 100644 index 0000000000000..a7fdef6a2754b --- /dev/null +++ b/plugins/outputs/microsoft_fabric/microsoft_fabric.go @@ -0,0 +1,124 @@ +//go:generate ../../../tools/readme_config_includer/generator +package microsoft_fabric + +import ( + _ "embed" // embed is used to embed sample configuration file in the binary + "errors" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + adx_commons "github.com/influxdata/telegraf/plugins/common/adx" + eh_commons "github.com/influxdata/telegraf/plugins/common/eventhub" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers/json" +) + +//go:embed sample.conf +var sampleConfig string + +type MicrosoftFabric struct { + ConnectionString string `toml:"connection_string"` + Log telegraf.Logger `toml:"-"` + EventHouseConf *adx_commons.AzureDataExplorer `toml:"eh_conf"` + EventStreamConf *eh_commons.EventHubs `toml:"es_conf"` + FabricSinkService telegraf.Output +} + +// Close implements telegraf.Output. +func (m *MicrosoftFabric) Close() error { + return m.FabricSinkService.Close() +} + +// Connect implements telegraf.Output. +func (m *MicrosoftFabric) Connect() error { + return m.FabricSinkService.Connect() +} + +// SampleConfig implements telegraf.Output. +func (m *MicrosoftFabric) SampleConfig() string { + return sampleConfig +} + +// Write implements telegraf.Output. +func (m *MicrosoftFabric) Write(metrics []telegraf.Metric) error { + return m.FabricSinkService.Write(metrics) +} + +func (m *MicrosoftFabric) Init() error { + connectionString := m.ConnectionString + + if connectionString == "" { + return errors.New("endpoint must not be empty. For EventHouse refer : " + + "https://learn.microsoft.com/kusto/api/connection-strings/kusto?view=microsoft-fabric " + + "for EventStream refer : https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/add-manage-eventstream-sources" + + "?pivots=enhanced-capabilities") + } + + if strings.HasPrefix(connectionString, "Endpoint=sb") { + m.Log.Info("Detected EventStream endpoint, using EventStream output plugin") + + serializer := &json.Serializer{ + TimestampUnits: config.Duration(time.Nanosecond), + TimestampFormat: time.RFC3339Nano, + } + m.EventStreamConf.ConnectionString = connectionString + m.EventStreamConf.Log = m.Log + m.EventStreamConf.SetSerializer(serializer) + err := m.EventStreamConf.Init() + if err != nil { + return errors.New("error initializing EventStream plugin: " + err.Error()) + } + m.FabricSinkService = m.EventStreamConf + } else if isKustoEndpoint(strings.ToLower(connectionString)) { + m.Log.Info("Detected EventHouse endpoint, using EventHouse output plugin") + // Setting up the AzureDataExplorer plugin initial properties + m.EventHouseConf.Endpoint = connectionString + m.EventHouseConf.Log = m.Log + err := m.EventHouseConf.Init() + if err != nil { + return errors.New("error initializing EventHouse plugin: " + err.Error()) + } + m.FabricSinkService = m.EventHouseConf + } else { + return errors.New("invalid connection string. For EventHouse refer : " + + "https://learn.microsoft.com/kusto/api/connection-strings/kusto?view=microsoft-fabric" + + " for EventStream refer : https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/" + + "add-manage-eventstream-sources?pivots=enhanced-capabilities") + } + return nil +} + +func isKustoEndpoint(endpoint string) bool { + prefixes := []string{ + "data source=", + "addr=", + "address=", + "network address=", + "server=", + } + + for _, prefix := range prefixes { + if strings.HasPrefix(endpoint, prefix) { + return true + } + } + return false +} + +func init() { + outputs.Add("microsoft_fabric", func() telegraf.Output { + return &MicrosoftFabric{ + EventHouseConf: &adx_commons.AzureDataExplorer{ + Timeout: config.Duration(20 * time.Second), + CreateTables: true, + AppName: "Fabric.Telegraf", + }, + EventStreamConf: &eh_commons.EventHubs{ + Hub: &eh_commons.EventHub{}, + Timeout: config.Duration(30 * time.Second), + }, + } + }) +} diff --git a/plugins/outputs/microsoft_fabric/microsoft_fabric_test.go b/plugins/outputs/microsoft_fabric/microsoft_fabric_test.go new file mode 100644 index 0000000000000..3338d192abfcc --- /dev/null +++ b/plugins/outputs/microsoft_fabric/microsoft_fabric_test.go @@ -0,0 +1,183 @@ +package microsoft_fabric + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/config" + adx_commons "github.com/influxdata/telegraf/plugins/common/adx" + eh_commons "github.com/influxdata/telegraf/plugins/common/eventhub" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type MockOutput struct { + mock.Mock +} + +func (m *MockOutput) Connect() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockOutput) Close() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockOutput) Write(metrics []telegraf.Metric) error { + args := m.Called(metrics) + return args.Error(0) +} + +func (m *MockOutput) SampleConfig() string { + args := m.Called() + return args.String(0) +} + +func TestMicrosoftFabric_Connect(t *testing.T) { + mockOutput := new(MockOutput) + mockOutput.On("Connect").Return(nil) + + plugin := MicrosoftFabric{ + FabricSinkService: mockOutput, + } + + err := plugin.Connect() + require.NoError(t, err) + mockOutput.AssertExpectations(t) +} + +func TestMicrosoftFabric_Close(t *testing.T) { + mockOutput := new(MockOutput) + mockOutput.On("Close").Return(nil) + + plugin := MicrosoftFabric{ + FabricSinkService: mockOutput, + } + + err := plugin.Close() + require.NoError(t, err) + mockOutput.AssertExpectations(t) +} + +func TestMicrosoftFabric_Write(t *testing.T) { + mockOutput := new(MockOutput) + mockOutput.On("Write", mock.Anything).Return(nil) + + plugin := MicrosoftFabric{ + FabricSinkService: mockOutput, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(1.0, "test_metric"), + } + + err := plugin.Write(metrics) + require.NoError(t, err) + mockOutput.AssertExpectations(t) +} + +func TestIsKustoEndpoint(t *testing.T) { + testCases := []struct { + name string + endpoint string + expected bool + }{ + { + name: "Valid address prefix", + endpoint: "address=https://example.com", + expected: true, + }, + { + name: "Valid network address prefix", + endpoint: "network address=https://example.com", + expected: true, + }, + { + name: "Valid server prefix", + endpoint: "server=https://example.com", + expected: true, + }, + { + name: "Invalid prefix", + endpoint: "https://example.com", + expected: false, + }, + { + name: "Empty endpoint", + endpoint: "", + expected: false, + }, + } + + for _, tC := range testCases { + t.Run(tC.name, func(t *testing.T) { + result := isKustoEndpoint(tC.endpoint) + require.Equal(t, tC.expected, result) + }) + } +} + +func TestMicrosoftFabric_Init(t *testing.T) { + tests := []struct { + name string + connectionString string + expectedError string + }{ + { + name: "Empty connection string", + connectionString: "", + expectedError: "endpoint must not be empty. For EventHouse refer : " + + "https://learn.microsoft.com/kusto/api/connection-strings/kusto?view=microsoft-fabric" + + " for EventStream refer : " + + "https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/add-manage-eventstream-sources?pivots=enhanced-capabilities", + }, + { + name: "Invalid connection string", + connectionString: "invalid_connection_string", + expectedError: "invalid connection string. For EventHouse refer : " + + "https://learn.microsoft.com/kusto/api/connection-strings/kusto?view=microsoft-fabric" + + " for EventStream refer : " + + "https://learn.microsoft.com/fabric/real-time-intelligence/event-streams/add-manage-eventstream-sources?pivots=enhanced-capabilities", + }, + { + name: "Valid EventHouse connection string", + connectionString: "Endpoint=sb://namespace.servicebus.windows.net/;" + + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234;EntityPath=hubName", + expectedError: "", + }, + { + name: "Valid Kusto connection string", + connectionString: "data source=https://example.kusto.windows.net;Database=e2e", + expectedError: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mf := &MicrosoftFabric{ + ConnectionString: tt.connectionString, + Log: testutil.Logger{}, + EventHouseConf: &adx_commons.AzureDataExplorer{ + Database: "database", + }, + EventStreamConf: &eh_commons.EventHubs{ + Hub: &eh_commons.EventHub{}, + Timeout: config.Duration(30 * time.Second), + }, + } + err := mf.Init() + if tt.expectedError != "" { + require.Error(t, err) + assert.Equal(t, tt.expectedError, err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/plugins/outputs/microsoft_fabric/sample.conf b/plugins/outputs/microsoft_fabric/sample.conf new file mode 100644 index 0000000000000..9c351d8c61e50 --- /dev/null +++ b/plugins/outputs/microsoft_fabric/sample.conf @@ -0,0 +1,60 @@ +# Sends metrics to Microsoft Fabric +[[outputs.microsoft_fabric]] + ## The URI property of the Eventhouse resource on Azure + ## ex: connection_string = "Data Source=https://myadxresource.australiasoutheast.kusto.windows.net" + connection_string = "" + + + [outputs.microsoft_fabric.eh_conf] + ## The Eventhouse database that the metrics will be ingested into. + ## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion. + ## ex: "exampledatabase" + database = "" + + ## Timeout for Eventhouse operations + # timeout = "20s" + + ## Type of metrics grouping used when pushing to Eventhouse. + ## Default is "TablePerMetric" for one table per different metric. + ## For more information, please check the plugin README. + # metrics_grouping_type = "TablePerMetric" + + ## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable"). + # table_name = "" + + ## Creates tables and relevant mapping if set to true(default). + ## Skips table and mapping creation if set to false, this is useful for running Telegraf with the lowest possible permissions i.e. table ingestor role. + # create_tables = true + + ## Ingestion method to use. + ## Available options are + ## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below + ## - queued -- queue up metrics data and process sequentially + # ingestion_type = "queued" + + [outputs.microsoft_fabric.es_conf] + ## The full connection string to the Event stream (required) + ## The shared access key must have "Send" permissions on the target Event stream. + + ## Client timeout (defaults to 30s) + # timeout = "30s" + + ## Partition key + ## Metric tag or field name to use for the event partition key. The value of + ## this tag or field is set as the key for events if it exists. If both, tag + ## and field, exist the tag is preferred. + # partition_key = "" + + ## Set the maximum batch message size in bytes + ## The allowable size depends on the Event stream tier + ## See: https://learn.microsoft.com/azure/event-hubs/event-hubs-quotas#basic-vs-standard-vs-premium-vs-dedicated-tiers + ## Setting this to 0 means using the default size from the Azure Event streams Client library (1000000 bytes) + # max_message_size = 1000000 + + ## Data format to output. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "json" + +