
# Directory (Files)


## General description and requirements

This collector reads files from a directory. The files must be present on the same machine where the agent is installed.

To allow the agent to read the files, map the host directory into the StreamSets container as a volume. Example:

```yaml
services:
  dc:
    image: anodot/streamsets:latest
    restart: on-failure
    volumes:
      - sdc-data:/data
      - /path/to/dir/on/the/host:/home/dir-name
```

File names must sort in ascending chronological order: an older file might be named some-name-2022-01-01.csv and a newer one some-name-2022-01-02.csv, so the pipeline can tell in which order the files must be processed. A single file must contain data for a single time bucket only. After a file is processed, its bucket is flushed and the data becomes available for processing and visible in the UI. Note that bucket flushing is a scheduled process, so it may take some time before the data appears in the UI.

All files must arrive in the directory strictly in chronological order.
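Zero-padded date stamps like these sort lexicographically in the same order as chronologically, which is what the ordering requirement relies on. A quick sanity check in Python:

```python
# Lexicographic sort of zero-padded date-stamped names matches chronological order
files = ["some-name-2022-01-02.csv", "some-name-2022-01-01.csv"]
print(sorted(files))
# ['some-name-2022-01-01.csv', 'some-name-2022-01-02.csv']
```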

When you create a pipeline, a schema is created, and all metrics you push are associated with that schema. If you edit the pipeline, the schema changes as well: the old schema and all metrics associated with it are deleted, and a new schema is created.

## Source file config

| Property | Type | Description |
| --- | --- | --- |
| type | String | Specify the source type. Value: directory |
| name | String | Unique source name; also the config file name |
| config | Object | Source configuration |

All properties are required

config object properties:

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| conf.spoolDir | String | Yes | Path to the directory inside the container (in the example above it's /home/dir-name) |
| conf.filePattern | String | Yes | The agent reads files whose names match this pattern (GLOB) |
| conf.dataFormat | String | No | Allowed values: JSON, DELIMITED, AVRO, LOG. Default: JSON |
| conf.dataFormatConfig.csvFileFormat | String | No | Allowed values: CSV, CUSTOM. Default: CSV |
| conf.dataFormatConfig.csvCustomDelimiter | String | No | Custom delimiter (used when csvFileFormat is CUSTOM) |
| csv_mapping | Object | No | Names of columns for delimited data |
| conf.dataFormatConfig.avroSchemaSource | String | No | Allowed values: SOURCE (schema is present in the data itself), INLINE (specify the schema in the conf.dataFormatConfig.avroSchema parameter), REGISTRY (Confluent schema registry) |
| conf.dataFormatConfig.avroSchema | Object | No | Avro schema (JSON object) |
| conf.dataFormatConfig.schemaRegistryUrls | Array | No | Schema registry URLs |
| conf.dataFormatConfig.schemaLookupMode | String | No | How to look up a schema in the registry. Allowed values: SUBJECT, ID, AUTO |
| conf.dataFormatConfig.subject | String | No | Schema subject (specify if schemaLookupMode is SUBJECT) |
| conf.dataFormatConfig.schemaId | String | No | Schema ID (specify if schemaLookupMode is ID) |
| grok_definition_file | String | No | File with grok patterns |
| conf.dataFormatConfig.grokPattern | String | No | Grok pattern to parse the message |
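For example, a hypothetical source config that reads Avro files and looks the schema up in a Confluent schema registry by subject (the registry URL and subject are placeholders) could look like:

```json
[
  {
    "type": "directory",
    "name": "test_dir_avro",
    "config": {
      "conf.spoolDir": "/home/dir-name",
      "conf.filePattern": "*.avro",
      "conf.dataFormat": "AVRO",
      "conf.dataFormatConfig.avroSchemaSource": "REGISTRY",
      "conf.dataFormatConfig.schemaRegistryUrls": ["http://schema-registry:8081"],
      "conf.dataFormatConfig.schemaLookupMode": "SUBJECT",
      "conf.dataFormatConfig.subject": "my-subject"
    }
  }
]
```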

File config example:

```json
[
  {
    "type": "directory",
    "name": "test_dir_json",
    "config": {
      "conf.spoolDir": "/home/test-directory-collector",
      "conf.filePattern": "*.csv",
      "conf.dataFormat": "DELIMITED",
      "conf.dataFormatConfig.csvHeader": "NO_HEADER",
      "csv_mapping": {"0": "first_column_name", "1": "second_column_name"},
      "conf.dataFormatConfig.csvFileFormat": "CUSTOM",
      "conf.dataFormatConfig.csvCustomDelimiter": "|",
      "conf.postProcessing": "ARCHIVE",
      "conf.archiveDir": "/data/processed",
      "conf.retentionTimeMins": "7200"
    }
  }
]
```
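Similarly, a hypothetical config for LOG data parsed with a grok pattern (the directory, file pattern, and pattern itself are placeholders):

```json
[
  {
    "type": "directory",
    "name": "test_dir_log",
    "config": {
      "conf.spoolDir": "/home/dir-name",
      "conf.filePattern": "*.log",
      "conf.dataFormat": "LOG",
      "conf.dataFormatConfig.grokPattern": "%{COMMONAPACHELOG}"
    }
  }
]
```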

## Pipeline file config

Properties list

| Property | Required | Property name in config file | Value type in config file | Description |
| --- | --- | --- | --- | --- |
| Source | Yes | source | String | Source config name |
| Pipeline ID | Yes | pipeline_id | String | Unique pipeline identifier (use a human-readable name so you can easily refer to it later) |
| Query file path | Yes | query_file | String | Path to the file with a search query |
| Static what | No | static_what | Bool | Whether the measurement name (what) is static |
| Count records? | No | count_records | Bool | Whether to include the number of records as a separate metric. Default: false |
| Measurement name | No | count_records_measurement_name | String | The what property for the record-count metric |
| Value columns with target types | Yes | values | Object | Key-value pairs (value:target_type). A target type represents how samples of the same metric are aggregated in Anodot. Valid values: gauge (average aggregation), counter (sum aggregation). If the what property is not static, specify, instead of an actual target type, the name of the property where the target type is stored |
| Values units | No | units | Object | Key-value pairs (value:unit). The value must be from the values column; units can be any |
| Measurement properties names | Yes | measurement_names | Object | Key-value pairs (property:name). If the what property is not static, specify, instead of an actual what value, the name of the property where it is stored |
| Dimensions | Yes | dimensions | Object | Dimensions object |
| Timestamp | Yes | timestamp | Object | Timestamp object |
| Static dimensions | No | properties | Object | Key-value pairs; dimensions with static values to pass to Anodot |
| Flush bucket size | Yes | flush_bucket_size | String | Valid values: 1m, 5m, 1h, 1d, 1w. The bucket to flush after processing a single file |
| Timezone | No | timezone | String | Timezone of the timestamp field if its type is string, e.g. Europe/London. Default: UTC |
| Transform script | No | transform_script | Object | A Python script that can be used to apply any kind of transformation to the original data |
| Tag configurations | No | tag_configurations | Object | Configure tags with dynamic values. Each value in these configurations is a field. To learn about fields, visit the fields wiki page |
| Notifications | No | notifications | Object | See the notifications page |
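To make the static vs. non-static what distinction concrete, here is a sketch (the column names are hypothetical). With a static what, values and measurement_names hold the target type and measurement name literally:

```json
{"values": {"amount": "gauge"}, "measurement_names": {"amount": "amount_spent"}}
```

With a non-static what, the same keys name the record properties that store the target type and the what value:

```json
{"static_what": false, "values": {"amount": "target_type_column"}, "measurement_names": {"amount": "what_column"}}
```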

Value object properties:

| Property | Type | Description |
| --- | --- | --- |
| type | String | property or constant |
| value | String | Property name or constant value |

All properties are required
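For example, based on the table above, a value object either references a record property or supplies a constant:

```json
{"type": "property", "value": "Clicks"}
{"type": "constant", "value": "1"}
```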

Timestamp object properties:

| Property | Type | Description |
| --- | --- | --- |
| type | String | string, unix or unix_ms |
| name | String | Property name |
| format | String | Specify the format if the timestamp type is string |

Required properties are type and name
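A few example timestamp objects (the first two match the examples on this page; the unix_ms field name is hypothetical and assumes millisecond timestamps):

```json
{"type": "string", "name": "timestamp_string", "format": "M/d/yyyy HH:mm:ss"}
{"type": "unix", "name": "timestamp_unix"}
{"type": "unix_ms", "name": "timestamp_unix_ms"}
```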

Dimensions object properties:

| Property | Type | Description |
| --- | --- | --- |
| required | List of strings | These properties are always present in a record |
| optional | List of strings | These properties may be missing in a record |

All properties are required

transform_script object properties:

| Property | Type | Description |
| --- | --- | --- |
| file | String | Path to the Python script |

All properties are required

### Example

```json
[{
    "source": "test_directory",
    "pipeline_id": "test",
    "count_records_measurement_name": "clicks",
    "count_records": true,
    "values": {"amount": "gauge"},
    "units": {"amount": "USD"},
    "measurement_names": {"amount": "name"},
    "dimensions": {
      "required": ["ver", "Country"],
      "optional": ["Exchange", "optional_dim"]
    },
    "timestamp": {
      "type": "string",
      "name": "timestamp_string",
      "format": "M/d/yyyy HH:mm:ss"
    },
    "properties": {"key1": "value1", "key2": "value2", "key3": "value3"},
    "tags": {"key1": "value1", "key2": "value2", "key3": "value3"},
    "flush_bucket_size": "1d",
    "transform_script": {
      "file": "/path/to/script.py"
    },
    "tag_configurations": {
      "Tag_name1": {
        "value_path": "property1"
      },
      "Tag_name2": {
        "value_path": "property2"
      }
    }
}]
```

## Transformation script

### Example

Input data:

```json
[
{"timestamp_unix" : 1512867600, "AdType" : "Display", "metric": "clicks.display", "Clicks" : 6784},
{"timestamp_unix" : 1512871200, "AdType" : "Display", "metric": "clicks.display", "Clicks" : 6839},
{"timestamp_unix" : 1512874800, "AdType" : "Search", "metric": "clicks.search", "Clicks" : 4500},
{"timestamp_unix" : 1512878400, "AdType" : "Display", "metric": "clicks.display", "Clicks" : 3984},
{"timestamp_unix" : 1512882000, "AdType" : "Search", "metric": "clicks.search", "Clicks" : 7235},
{"timestamp_unix" : 1512885600, "AdType" : "Display", "metric": "clicks.display", "Clicks" : 8744},
{"timestamp_unix" : 1512889200, "AdType" : "Display", "metric": "clicks.display", "Clicks" : 7427}
]
```

Suppose we want to strip the `clicks.` prefix from the `metric` field. We can do that with a regex:

```python
import re

# Remove the leading 'clicks.' prefix from the metric field of every record
for record in sdc.records:
    record.value['metric'] = re.sub(r'^clicks\.', '', record.value['metric'])
```
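After the script runs, the `metric` field holds only the ad type; for instance, the first input record becomes:

```json
{"timestamp_unix": 1512867600, "AdType": "Display", "metric": "display", "Clicks": 6784}
```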
A pipeline config that uses the transformed `metric` field as a dimension:

```json
[{
    "source": "test_directory",
    "pipeline_id": "test",
    "values": {"Clicks": "gauge"},
    "units": {"Clicks": "click_unit"},
    "measurement_names": {"Clicks": "clicks"},
    "dimensions": {
      "required": ["AdType", "metric"]
    },
    "timestamp": {
      "type": "unix",
      "name": "timestamp_unix"
    },
    "properties": {"ver": 1},
    "flush_bucket_size": "1d",
    "transform_script": {
      "file": "/path/to/script.py"
    }
}]
```