Skip to content

Commit

Permalink
Improve timestamp infos and DS init
Browse files Browse the repository at this point in the history
  • Loading branch information
ddelemeny committed May 1, 2024
1 parent 766aac5 commit d89c3fa
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 202 deletions.
1 change: 1 addition & 0 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type DatasourceInfo struct {
Database string
ConfiguredFields ConfiguredFields
MaxConcurrentShardRequests int64
IsReady bool
}

type ConfiguredFields struct {
Expand Down
62 changes: 50 additions & 12 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
return nil, err
}

timeField, toOk := jsonData["timeField"].(string)
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)

logLevelField, ok := jsonData["logLevelField"].(string)
if !ok {
logLevelField = ""
Expand All @@ -74,6 +71,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
if !ok {
index = ""
}
// XXX : Legacy check, should not happen ?
if index == "" {
index = settings.Database
}
Expand All @@ -92,18 +90,11 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
maxConcurrentShardRequests = 256
}

if !toOk || !tofOk {
timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
}

configuredFields := es.ConfiguredFields{
TimeField: timeField,
TimeOutputFormat: timeOutputFormat,
LogLevelField: logLevelField,
LogMessageField: logMessageField,
TimeField: "",
TimeOutputFormat: "",
}

model := es.DatasourceInfo{
Expand All @@ -113,10 +104,40 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
Database: index,
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
ConfiguredFields: configuredFields,
IsReady: false,
}
return &QuickwitDatasource{dsInfo: model}, nil
}

// Network dependent datasource initialization.
// This is not done in the "constructor" function to allow saving the ds
// even if the server is not responsive.
func (ds *QuickwitDatasource) initDatasource(force bool) error {
if ds.dsInfo.IsReady && !force {
return nil
}

indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
if err != nil {
return fmt.Errorf("failed to get index metadata : %w", err)
}

if len(indexMetadataList) == 0 {
return fmt.Errorf("no index found for %s", ds.dsInfo.Database)
}

timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
if nil != err {
return err
}

ds.dsInfo.ConfiguredFields.TimeField = timeField
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat

ds.dsInfo.IsReady = true
return nil
}

// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
// created. As soon as datasource settings change detected by SDK old datasource instance will
// be disposed and a new one will be created using NewSampleDatasource factory function.
Expand All @@ -132,12 +153,29 @@ func (ds *QuickwitDatasource) Dispose() {
func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
res := &backend.CheckHealthResult{}

if err := ds.initDatasource(true); err != nil {
res.Status = backend.HealthStatusError
res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error()
return res, nil
}

if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" {
res.Status = backend.HealthStatusError
res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database)
return res, nil
}

res.Status = backend.HealthStatusOk
res.Message = "plugin is running"
return res, nil
}

func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
// Ensure ds is initialized, we need timestamp infos
if err := ds.initDatasource(false); err != nil {
return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource")
}

return queryData(ctx, req.Queries, &ds.dsInfo)
}

Expand Down
126 changes: 38 additions & 88 deletions pkg/quickwit/timestamp_infos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"io"
"net/http"
"strings"
)

type QuickwitIndexMetadata struct {
IndexConfig struct {
IndexID string `json:"index_id"`
DocMapping struct {
TimestampField string `json:"timestamp_field"`
FieldMappings []FieldMappings `json:"field_mappings"`
Expand All @@ -23,6 +23,7 @@ type QuickwitCreationErrorPayload struct {
StatusCode int `json:"status"`
}

// TODO: Revamp error handling
func NewErrorCreationPayload(statusCode int, message string) error {
var payload QuickwitCreationErrorPayload
payload.Message = message
Expand All @@ -35,124 +36,73 @@ func NewErrorCreationPayload(statusCode int, message string) error {
return errors.New(string(json))
}

// TODO: refactor either by using a timestamp alias suppprted by quickwit
// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint
// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1
func GetTimestampFieldInfos(index string, qwickwitUrl string, cli *http.Client) (string, string, error) {
if strings.Contains(index, "*") || strings.Contains(index, ",") {
return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli)
func FilterErrorResponses(r *http.Response) (*http.Response, error) {
if r.StatusCode < 200 || r.StatusCode >= 400 {
body, err := io.ReadAll(r.Body)
if err != nil {
return nil, NewErrorCreationPayload(r.StatusCode, fmt.Errorf("failed to read error body: err = %w", err).Error())
}
return nil, NewErrorCreationPayload(r.StatusCode, fmt.Sprintf("error = %s", (body)))
}
return GetTimestampFieldFromIndex(index, qwickwitUrl, cli)
return r, nil
}

func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, string, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes/" + index
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", err
func GetTimestampFieldInfos(indexMetadataList []QuickwitIndexMetadata) (string, string, error) {
if len(indexMetadataList) == 0 {
return "", "", fmt.Errorf("index metadata list is empty")
}
defer r.Body.Close()

statusCode := r.StatusCode

if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
refTimestampFieldName, refTimestampOutputFormat := FindTimestampFieldInfos(indexMetadataList[0])
if refTimestampFieldName == "" || refTimestampOutputFormat == "" {
return "", "", fmt.Errorf("Invalid timestamp field infos for %s: %s, %s", indexMetadataList[0].IndexConfig.IndexID, refTimestampFieldName, refTimestampOutputFormat)
}

body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
for _, indexMetadata := range indexMetadataList[1:] {
timestampFieldName, timestampOutputFormat := FindTimestampFieldInfos(indexMetadata)

if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat {
return "", "", fmt.Errorf("Indexes matching pattern have incompatible timestamp fields, found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat)
}
}

return DecodeTimestampFieldFromIndexConfig(body)
return refTimestampFieldName, refTimestampOutputFormat, nil
}

func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, string, error) {
func GetIndexesMetadata(indexPattern string, qwickwitUrl string, cli *http.Client) ([]QuickwitIndexMetadata, error) {
mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_patterns=" + indexPattern
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
r, err := cli.Get(mappingEndpointUrl)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", err
return nil, fmt.Errorf("Error when calling url = %s: %w", mappingEndpointUrl, err)
}
defer r.Body.Close()

statusCode := r.StatusCode

if statusCode < 200 || statusCode >= 400 {
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
r, err = FilterErrorResponses(r)
if err != nil {
return nil, fmt.Errorf("API returned invalid response: %w", err)
}

body, err := io.ReadAll(r.Body)
if err != nil {
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(statusCode, errMsg)
return nil, fmt.Errorf("failed to read response body: %w", err)
}

return DecodeTimestampFieldFromIndexConfigs(body)
}

func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, string, error) {
var payload []QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
err = json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(500, errMsg)
}

var refTimestampFieldName string = ""
var refTimestampOutputFormat string = ""
var timestampFieldName string = ""
var timestampOutputFormat string = ""

for _, indexMetadata := range payload {
timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField
timestampOutputFormat, _ = FindTimeStampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings)

if refTimestampFieldName == "" {
refTimestampFieldName = timestampFieldName
refTimestampOutputFormat = timestampOutputFormat
continue
}

if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat {
errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat)
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(400, errMsg)
}
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
}

qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s, timestamptOutputFormat = %s", timestampFieldName, timestampOutputFormat))
return timestampFieldName, timestampOutputFormat, nil
return payload, nil
}

func DecodeTimestampFieldFromIndexConfig(body []byte) (string, string, error) {
var payload QuickwitIndexMetadata
err := json.Unmarshal(body, &payload)
if err != nil {
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
qwlog.Error(errMsg)
return "", "", NewErrorCreationPayload(500, errMsg)
}
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
timestampFieldFormat, _ := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings)
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
return timestampFieldName, timestampFieldFormat, nil
func FindTimestampFieldInfos(indexMetadata QuickwitIndexMetadata) (string, string) {
timestampFieldName := indexMetadata.IndexConfig.DocMapping.TimestampField
timestampOutputFormat, _ := FindTimestampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings)
return timestampFieldName, timestampOutputFormat
}

func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) {
func FindTimestampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) {
if nil == fieldMappings {
return "", false
}
Expand All @@ -166,7 +116,7 @@ func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMap
if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat {
return *field.OutputFormat, true
} else if field.Type == "object" && nil != field.FieldMappings {
return FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings)
return FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings)
}
}

Expand Down
Loading

0 comments on commit d89c3fa

Please sign in to comment.