
Drunken Data Quality 5.0.0

Frank Rosner edited this page Feb 28, 2020 · 1 revision

Drunken Data Quality (DDQ)

The core concepts in DDQ are checks, constraints, reporters and the runner.

A check contains a set of constraints for one data frame. Each constraint checks a specific property related to data quality on this data frame. By passing a set of checks to a runner, it executes them and hands them over to a set of reporters.


Checks

To create a check, it is sufficient to specify the data frame you would like to work with.

Check(dataFrame: DataFrame)

There are more options available for advanced users:

  • You can specify a display name, which will be used in reports instead of the default string representation.
  • You can also select a cache method (default is MEMORY_ONLY). Caching the data frame makes sense if you execute many different checks on it and it is not cached already. If you don't want to cache, pass None.
  • You can specify a set of constraints upfront if you don't want to add them through the fluent interface.
  • You can also pick an ID instead of the randomly generated one. The ID is used by sophisticated reporters to identify a check. Don't change it unless you need to.
Check(
  dataFrame: DataFrame,
  displayName: Option[String] = Option.empty,
  cacheMethod: Option[StorageLevel] = Check.defaultCacheMethod,
  constraints: Seq[Constraint] = Seq.empty,
  id: String = UUID.randomUUID.toString
)
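For example, a check with a display name and a non-default cache method could be constructed like this (a sketch; `customers` is an illustrative data frame and the display name is arbitrary):

```scala
import org.apache.spark.storage.StorageLevel

// Construct a check with a human-readable name and MEMORY_AND_DISK caching
val check = Check(
  customers,
  displayName = Some("Customer master data"),
  cacheMethod = Some(StorageLevel.MEMORY_AND_DISK)
)
```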

Constraints

A check offers multiple constraints to verify. You can add them using a fluent interface.

Column Constraints

Generic

Check whether the given constraint is satisfied. You may provide a constraint as a SQL string or a Column instance.

Signature
def satisfies(constraint: String): Check
def satisfies(constraint: Column): Check
Example
Check(customers).satisfies("age > 0")
Check(customers).satisfies(customers("age") > 0)

Generic Conditional

Check whether the given conditional constraint is satisfied. Be aware that it might cause problems with null values: A -> B is translated to !A || B, and comparing null to anything yields null, which is treated as false when the constraint is evaluated.

Signature
def satisfies(conditional: (Column, Column)): Check
Example
Check(customers).satisfies((customers("age") > 50) -> (customers("seniority") === "high"))
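If null values are a concern, one way to sidestep the pitfall is to make the antecedent explicitly null-safe (a sketch, assuming the same `customers` data frame):

```scala
// Rows with a null age make the antecedent false,
// so the implication trivially holds for them
Check(customers).satisfies(
  (customers("age").isNotNull && customers("age") > 50) ->
    (customers("seniority") === "high")
)
```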

Null

Check whether the column with the given name contains only null values.

Signature
def isAlwaysNull(columnName: String): Check
Example
Check(customers).isAlwaysNull("complaint")

Non-Null

Check whether the column with the given name contains no null values.

Signature
def isNeverNull(columnName: String): Check
Example
Check(customers).isNeverNull("age")

Regex

Check whether the column with the given name always matches the specified regular expression.

Signature
def isMatchingRegex(columnName: String, regex: String): Check
Example
Check(customers).isMatchingRegex("email", "^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,6}$")

Value Set

Check whether the column with the given name only contains values from the specified set.

Signature
def isAnyOf(columnName: String, allowed: Set[Any]): Check
Example
Check(customers).isAnyOf("gender", Set("m", "f"))

Date Format

Check whether the column with the given name can be converted to a date using the specified date format string. Parsing is not lenient, so the format you specify is strictly enforced.

Signature
def isFormattedAsDate(columnName: String, dateFormatString: String): Check
Example
Check(contracts).isFormattedAsDate("signatureDate", "yyyy-MM-dd")

Type Conversion

Check whether the column with the given name can be converted to the given type.

Signature
def isConvertibleTo(columnName: String, targetType: DataType): Check
Example
Check(transactions).isConvertibleTo("amount", DoubleType)

Table Constraints

Number of Rows

Check whether the table has exactly the given number of rows. Due to technical reasons the Python API is less flexible and only supports three predefined checks.

Signature

Scala

def hasNumRows(expected: (Column) => Column): Check

Python

def hasNumRowsEqualTo(expected)
def hasNumRowsGreaterThan(expected)
def hasNumRowsLessThan(expected)
Example
Check(clicks).hasNumRows(_ > 10)

Unique Key

Check whether the given columns are a unique key for this table.

Signature
def hasUniqueKey(columnName: String, columnNames: String*): Check
Example
Check(connections).hasUniqueKey("time", "thread")

Equality

Check whether the given data set is exactly equal to the checked one. The equality check is performed by first computing a distinct count on both data frames and then computing a pairwise set difference. If both differences yield an empty set, the data frames are equal.

Signature
def isEqualTo(other: DataFrame): Check
Example
Check(oldTable).isEqualTo(newTable)
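The set-difference step described above can be sketched with plain Spark operations (this mirrors the idea, not DDQ's actual implementation):

```scala
// Two data frames are equal if neither contains rows the other lacks
val leftMinusRight = oldTable.except(newTable)
val rightMinusLeft = newTable.except(oldTable)
val equal = leftMinusRight.count == 0 && rightMinusLeft.count == 0
```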

Relationship Constraints

Foreign Key

Check whether the columns with the given names define a foreign key to the specified reference table. Note that a foreign key needs to be a unique key in the reference table, which will also be checked.

Signature
def hasForeignKey(referenceTable: DataFrame, keyMap: (String, String), keyMaps: (String, String)*): Check
Example
Check(contracts).hasForeignKey(customers, "customerId" -> "id")

Joinability

Check whether a join between this table and the given reference table returns any results. It will also output a percentage of matching keys between the base and the reference table.

Signature
def isJoinableWith(referenceTable: DataFrame, keyMap: (String, String), keyMaps: (String, String)*): Check
Example
Check(contracts).isJoinableWith(customers, "customerId" -> "id")

Functional Dependency

Check whether the columns in the dependent set have a functional dependency on the determinant set. This can be used to check a "foreign key" relationship in a denormalized table.

Signature
def hasFunctionalDependency(determinantSet: Seq[String], dependentSet: Seq[String]): Check
Example
Check(records).hasFunctionalDependency(Seq("artist.id"), Seq("artist.name", "artist.country"))
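For intuition, the dependency holds when every determinant value always co-occurs with the same dependent values. A minimal sketch of this idea on plain Scala collections (illustrative data, not DDQ's implementation):

```scala
// artist.id -> artist.name holds here: each id maps to exactly one name
val rows = Seq((1, "Miles Davis"), (1, "Miles Davis"), (2, "Nina Simone"))
val holds = rows.groupBy(_._1).values.forall(_.map(_._2).distinct.size == 1)
// A violating data set would pair the same id with two different names
```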

Custom Constraints

Custom DataFrame Constraints

Allows you to perform custom, arbitrary checks on your data frame. You need to provide a name for your constraint and a function that transforms your data frame into either a success message or a failure message. If your code throws an exception, the constraint will be marked as errored.

Note that you should not cache the dataframe here but rather enable caching on the check level so other constraints on the same table also benefit from it.

Custom constraints are only available in Scala.

Signature
// type de.frosner.ddq.constraints.CustomConstraint.SuccessMsg = String
// type de.frosner.ddq.constraints.CustomConstraint.FailureMsg = String

def custom(name: String, fun: DataFrame => Either[FailureMsg, SuccessMsg]): Check
Example
Check(df).custom(
  "number of columns = 100",
  { df => 
    val numColumns = df.columns.length
    val message = s"There are $numColumns columns."
    if (numColumns == 100) Right(message) else Left(message) 
  }
)

Running and Reporting

In order to run a set of constraints, just execute the run method on the check. You can pass a list of reporters as well. If no reporter is passed, it will report to the console output stream using a console reporter.

def run(reporters: Reporter*): CheckResult
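For example, the following runs two constraints on a hypothetical `customers` data frame, reporting to the console since no reporter is given:

```scala
// Fluent interface: chain constraints, then execute them
val result = Check(customers)
  .isNeverNull("id")
  .hasUniqueKey("id")
  .run()
```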

In order to report one or multiple check results to one or multiple reporters, use the Runner object. The runner will then execute all checks, report the results to all reporters, and return all results in a programmatic way so you can use it for other purposes (e.g. unit testing).

val check1: Check = ???
val check2: Check = ???
val reporter1: Reporter = ???
val reporter2: Reporter = ???
val result = Runner.run(Seq(check1, check2), Seq(reporter1, reporter2))

Console Reporter

The console reporter is a simple reporter meant for interactive usage (e.g. on the Spark shell). It prints checks and constraint results to the specified print stream, coloured by ANSI terminal markup.

ConsoleReporter(
  stream: PrintStream = Console.out
)

Markdown Reporter

The markdown reporter is another simple reporter, suitable for both interactive and non-interactive usage. It prints the constraint results to the specified print stream in markdown layout. If you want to store the markdown file, you can wrap a FileOutputStream in a PrintStream.

MarkdownReporter(stream: PrintStream)
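Writing the report to a file can be sketched as follows (file name and data frame are illustrative):

```scala
import java.io.{FileOutputStream, PrintStream}

// Wrap a FileOutputStream in a PrintStream to persist the markdown report
val stream = new PrintStream(new FileOutputStream("ddq-report.md"))
val reporter = MarkdownReporter(stream)
Check(customers).isNeverNull("id").run(reporter)
stream.close()
```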

Zeppelin Reporter

The Zeppelin reporter can be used to show the results in a Zeppelin notebook note. Make sure to use exactly one ZeppelinReporter instance per note.


If you are using %pyspark, you need to pass the ZeppelinContext (z) to the ZeppelinReporter. Example:

%pyspark
from pyddq.core import Check
from pyddq.reporters import ZeppelinReporter

df = spark.createDataFrame([(1, "a"), (1, None), (3, "c")])
check = Check(df)
reporter = ZeppelinReporter(z)
check.hasUniqueKey("_1", "_2").isNeverNull("_1").run([reporter])

Log4j Reporter

The log4j reporter is more sophisticated. It serializes all available information about the check, the constraints and the results into a JSON string and logs it to the specified logger using the specified level.

Log4jReporter(
  logger: Logger = org.apache.log4j.Logger.getLogger("DDQ"), 
  logLevel: Level = org.apache.log4j.Level.INFO
)
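A reporter logging to a dedicated logger at WARN level might look like this (the logger name is illustrative):

```scala
import org.apache.log4j.{Level, Logger}

// Route the JSON check results to a dedicated logger at WARN level
val reporter = Log4jReporter(
  logger = Logger.getLogger("data-quality"),
  logLevel = Level.WARN
)
```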

If you want to collect, parse, evaluate and visualize the results, the ELK stack might be a good fit. Please refer to the reporting showcase for an example setup.

Email Reporter

In case you want email notifications about your check results but do not want to use a complex pipeline involving Kibana + Watcher or Grafana, the email reporter is a convenient way to do that. It requires a bit more configuration than the other reporters.

  • First, you have to configure the SMTP server to use and the recipient list. You are free to add multiple recipients (either directly or as carbon copies).
  • Additionally you can customize the subject line with a prefix and the sender.
  • If your SMTP server requires authentication please specify username and password. Keep in mind that if your server does not support STARTTLS the credentials will be transmitted unencrypted.
  • You can also configure whether you want to receive reports only if something went wrong.
  • It can also accumulate reports before sending an email, which is useful when you have multiple tables to check and don't want to get spammed. For this, construct the reporter with the accumulatedReport flag set to true and use it like any other reporter. When you are done running all your checks, execute the sendAccumulatedReport(accumulatedCheckName: Option[String]) method. You can specify a name for your "check suite", which will be used in the email subject.
EmailReporter(
  smtpServer: String,
  to: Set[String],
  cc: Set[String] = Set.empty,
  subjectPrefix: String = "Data Quality Report: ",
  smtpPort: Int = 25,
  from: String = "mail@ddq.io",
  usernameAndPassword: Option[(String, String)] = None,
  reportOnlyOnFailure: Boolean = false,
  accumulatedReport: Boolean = false
)

Example

val reporter = EmailReporter(
  smtpServer = "localhost",
  to = Set("CDO@yourcompany.com"),
  smtpPort = 23456,
  accumulatedReport = true
)
Runner.run(Seq(check1, check2), Seq(reporter))
reporter.sendAccumulatedReport(Some("Data Warehouse"))