Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.azure_data_explorer): Added MS_Fabric with refactoring #16372

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
286 changes: 286 additions & 0 deletions plugins/common/adx/adx_commons.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package adx_commons
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please name the package adx as the directory.


import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"

"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/kql"
Comment on lines +11 to +15
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/Azure/azure-kusto-go/kusto"
kustoerrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/azure-kusto-go/kusto/kql"


"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

var sampleConfig string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not something that should belong in common, this is plugin specific!


type AzureDataExplorer struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we do have a config here and a "factory" function to create a client instance from it. So I would call this Config and have a function to create aClient from it with no exported fields...

Endpoint string `toml:"endpoint_url"`
Database string `toml:"database"`
Log telegraf.Logger `toml:"-"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move the logger to the plugin(s)!

Timeout config.Duration `toml:"timeout"`
MetricsGrouping string `toml:"metrics_grouping_type"`
TableName string `toml:"table_name"`
CreateTables bool `toml:"create_tables"`
IngestionType string `toml:"ingestion_type"`
serializer telegraf.Serializer
kustoClient *kusto.Client
metricIngestors map[string]ingest.Ingestor
AppName string
}

// Supported values for metrics_grouping_type (compared after lower-casing).
const (
	tablePerMetric = "tablepermetric"
	singleTable    = "singletable"
	// These control the amount of memory we use when ingesting blobs
	bufferSize = 1 << 20 // 1 MiB
	maxBuffers = 5
)

// Supported values for ingestion_type.
const managedIngestion = "managed"
const queuedIngestion = "queued"

// SampleConfig returns the sample TOML configuration for this output.
func (*AzureDataExplorer) SampleConfig() string {
	return sampleConfig
}

Comment on lines +52 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plugin specific.

// Connect builds the Kusto client for the configured endpoint using the
// default Azure credential chain and resets the per-table ingestor cache.
func (adx *AzureDataExplorer) Connect() error {
	builder := kusto.NewConnectionStringBuilder(adx.Endpoint).WithDefaultAzureCredential()
	// Init runs before Connect, so AppName is final here; the connector
	// details are used by the service for telemetry and tracing.
	builder.SetConnectorDetails("Telegraf", internal.ProductToken(), adx.AppName, "", false, "")

	client, err := kusto.New(builder)
	if err != nil {
		return err
	}

	adx.kustoClient = client
	adx.metricIngestors = make(map[string]ingest.Ingestor)
	return nil
}

// SetSerializer installs the serializer used to turn metrics into the
// ingestion payload.
func (adx *AzureDataExplorer) SetSerializer(s telegraf.Serializer) {
	adx.serializer = s
}

Comment on lines +71 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove here.

// Close releases every table ingestor and the underlying Kusto client,
// collecting all failures into one combined error. It is safe to call even
// if Connect was never invoked or failed (nil client / nil ingestor map).
func (adx *AzureDataExplorer) Close() error {
	var errs []error
	for _, ingestor := range adx.metricIngestors {
		if err := ingestor.Close(); err != nil {
			// Keep closing the remaining ingestors; accumulate failures.
			errs = append(errs, err)
		}
	}
	// Guard against Close being called without a successful Connect;
	// the original code dereferenced a nil kustoClient in that case.
	if adx.kustoClient != nil {
		if err := adx.kustoClient.Close(); err != nil {
			errs = append(errs, err)
		}
	}

	adx.kustoClient = nil
	adx.metricIngestors = nil

	if len(errs) == 0 {
		adx.Log.Info("Closed ingestors and client")
		return nil
	}
	// Combine errors into a single object and return the combined error
	return kustoerrors.GetCombinedError(errs...)
}

// Write dispatches the batch either to one table per metric name or to the
// single configured table, depending on metrics_grouping_type.
func (adx *AzureDataExplorer) Write(metrics []telegraf.Metric) error {
	if adx.MetricsGrouping != tablePerMetric {
		return adx.writeSingleTable(metrics)
	}
	return adx.writeTablePerMetric(metrics)
}

func (adx *AzureDataExplorer) writeTablePerMetric(metrics []telegraf.Metric) error {
tableMetricGroups := make(map[string][]byte)
// Group metrics by name and serialize them
for _, m := range metrics {
tableName := m.Name()
metricInBytes, err := adx.serializer.Serialize(m)
if err != nil {
return err
}
if existingBytes, ok := tableMetricGroups[tableName]; ok {
tableMetricGroups[tableName] = append(existingBytes, metricInBytes...)
} else {
tableMetricGroups[tableName] = metricInBytes
}
}
Comment on lines +107 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serialization and grouping should happen in the plugin(s) if different data-formats should be supported!

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(adx.Timeout))
defer cancel()

// Push the metrics for each table
format := ingest.FileFormat(ingest.JSON)
for tableName, tableMetrics := range tableMetricGroups {
if err := adx.pushMetrics(ctx, format, tableName, tableMetrics); err != nil {
return err
}
}

return nil
}

// writeSingleTable serializes every metric into one concatenated payload and
// ingests it into the single configured table.
func (adx *AzureDataExplorer) writeSingleTable(metrics []telegraf.Metric) error {
	// Concatenate the serialized form of each metric.
	var payload []byte
	for _, metric := range metrics {
		serialized, err := adx.serializer.Serialize(metric)
		if err != nil {
			return err
		}
		payload = append(payload, serialized...)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(adx.Timeout))
	defer cancel()

	// All metrics go into the one configured table.
	return adx.pushMetrics(ctx, ingest.FileFormat(ingest.JSON), adx.TableName, payload)
}

// pushMetrics sends the serialized payload to the given table using its
// (cached or newly created) ingestor with the table's JSON mapping reference.
// Ingestion failures are returned to the caller so Telegraf can retry the
// batch; the original code logged the error and returned nil, silently
// dropping the metrics.
func (adx *AzureDataExplorer) pushMetrics(ctx context.Context, format ingest.FileOption, tableName string, metricsArray []byte) error {
	metricIngestor, err := adx.getMetricIngestor(ctx, tableName)
	if err != nil {
		return err
	}

	// len(metricsArray) is the payload size in bytes, not a metric count.
	adx.Log.Debugf("Writing %d bytes to table %q", len(metricsArray), tableName)
	reader := bytes.NewReader(metricsArray)
	mapping := ingest.IngestionMappingRef(tableName+"_mapping", ingest.JSON)
	if metricIngestor != nil {
		if _, err := metricIngestor.FromReader(ctx, reader, format, mapping); err != nil {
			return fmt.Errorf("sending ingestion request to Azure Data Explorer for table %q failed: %w", tableName, err)
		}
	}
	return nil
}

// getMetricIngestor returns the cached ingestor for tableName, creating the
// table (when configured) and a fresh ingestor on first access.
func (adx *AzureDataExplorer) getMetricIngestor(ctx context.Context, tableName string) (ingest.Ingestor, error) {
	if cached, ok := adx.metricIngestors[tableName]; ok && cached != nil {
		return cached, nil
	}

	if err := adx.createAzureDataExplorerTable(ctx, tableName); err != nil {
		return nil, fmt.Errorf("creating table for %q failed: %w", tableName, err)
	}
	// Build a new ingestor for this table and cache it for later writes.
	ingestor, err := createIngestorByTable(adx.kustoClient, adx.Database, tableName, adx.IngestionType)
	if err != nil {
		return nil, fmt.Errorf("creating ingestor for %q failed: %w", tableName, err)
	}
	adx.metricIngestors[tableName] = ingestor
	adx.Log.Debugf("Ingestor for table %s created", tableName)
	return ingestor, nil
}

// createAzureDataExplorerTable issues the table-creation and ingestion-mapping
// management commands, unless table creation is disabled in the configuration.
func (adx *AzureDataExplorer) createAzureDataExplorerTable(ctx context.Context, tableName string) error {
	if !adx.CreateTables {
		adx.Log.Info("skipped table creation")
		return nil
	}

	// The table must exist before its ingestion JSON mapping can be created.
	commands := []kusto.Statement{
		createTableCommand(tableName),
		createTableMappingCommand(tableName),
	}
	for _, cmd := range commands {
		if _, err := adx.kustoClient.Mgmt(ctx, adx.Database, cmd); err != nil {
			return err
		}
	}
	return nil
}

// Init validates the configuration, fills in defaults (grouping mode,
// timeout, ingestion type) and prepares the JSON serializer used for the
// ingestion payloads.
func (adx *AzureDataExplorer) Init() error {
	if adx.Endpoint == "" {
		return errors.New("endpoint configuration cannot be empty")
	}
	if adx.Database == "" {
		return errors.New("database configuration cannot be empty")
	}

	// Grouping mode is matched case-insensitively.
	adx.MetricsGrouping = strings.ToLower(adx.MetricsGrouping)
	if adx.MetricsGrouping == singleTable && adx.TableName == "" {
		return errors.New("table name cannot be empty for SingleTable metrics grouping type")
	}
	if adx.MetricsGrouping == "" {
		adx.MetricsGrouping = tablePerMetric
	}
	switch adx.MetricsGrouping {
	case singleTable, tablePerMetric:
		// valid
	default:
		return errors.New("metrics grouping type is not valid")
	}

	if adx.Timeout == 0 {
		adx.Timeout = config.Duration(20 * time.Second)
	}

	switch {
	case adx.IngestionType == "":
		adx.IngestionType = queuedIngestion
	case !choice.Contains(adx.IngestionType, []string{managedIngestion, queuedIngestion}):
		return fmt.Errorf("unknown ingestion type %q", adx.IngestionType)
	}

	// Metrics are ingested as JSON with nanosecond-precision timestamps.
	serializer := &json.Serializer{
		TimestampUnits:  config.Duration(time.Nanosecond),
		TimestampFormat: time.RFC3339Nano,
	}
	if err := serializer.Init(); err != nil {
		return err
	}
	adx.serializer = serializer
	return nil
}

// createIngestorByTable builds the ingestor matching the configured
// ingestion type for a single destination table.
func createIngestorByTable(client *kusto.Client, database, tableName, ingestionType string) (ingest.Ingestor, error) {
	switch strings.ToLower(ingestionType) {
	case managedIngestion:
		// Managed streaming ingestion (falls back to queued internally).
		return ingest.NewManaged(client, database, tableName)
	case queuedIngestion:
		// Queued ingestion with a bounded in-memory staging buffer.
		return ingest.New(client, database, tableName, ingest.WithStaticBuffer(bufferSize, maxBuffers))
	default:
		return nil, fmt.Errorf(`ingestion_type has to be one of %q or %q`, managedIngestion, queuedIngestion)
	}
}

// createTableCommand builds the .create-merge statement that ensures the
// destination table exists with the fixed telegraf schema
// (fields, name, tags, timestamp).
func createTableCommand(table string) kusto.Statement {
	return kql.New(`.create-merge table ['`).
		AddTable(table).
		AddLiteral(`'] `).
		AddLiteral(`(['fields']:dynamic, ['name']:string, ['tags']:dynamic, ['timestamp']:datetime);`)
}

// createTableMappingCommand builds the .create-or-alter statement that
// defines the table's ingestion JSON mapping ("<table>_mapping"), mapping
// each JSON path onto the corresponding column of the telegraf schema.
func createTableMappingCommand(table string) kusto.Statement {
	return kql.New(`.create-or-alter table ['`).
		AddTable(table).
		AddLiteral(`'] `).
		AddLiteral(`ingestion json mapping '`).
		AddTable(table + "_mapping").
		AddLiteral(`' `).
		AddLiteral(`'[{"column":"fields", `).
		AddLiteral(`"Properties":{"Path":"$[\'fields\']"}},{"column":"name", `).
		AddLiteral(`"Properties":{"Path":"$[\'name\']"}},{"column":"tags", `).
		AddLiteral(`"Properties":{"Path":"$[\'tags\']"}},{"column":"timestamp", `).
		AddLiteral(`"Properties":{"Path":"$[\'timestamp\']"}}]'`)
}
Loading
Loading