OpenLineage

The OpenLineage integration consumes OpenLineage metadata from various systems.

ℹ️ Grai offers detailed instructions for a number of common OpenLineage producers, including Airflow.

Web App


Fields

Field        Value                                          Example
source       The name of the source, see sources            my-source
Name         Name for connection                            OpenLineage
Namespace    Namespace for the connection, see namespaces   default
namespaces   Optional

Webhook

ℹ️ To authenticate your webhook, you will need to create an API key in the Grai web app.

Grai will open an endpoint at /api/v1/openlineage/<connection_id>/ which can be used to receive OpenLineage events. You should point your OpenLineage clients to this endpoint.

See the OpenLineage docs for how to configure your client to send events to a webhook.

An example config:

transport:
  type: "http"
  url: "https://api.grai.io"
  endpoint: "api/v1/openlineage/<connection_id>/"
  auth:
    type: "api_key"
    api_key: "<your_grai_api_key>"
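
To make the config above concrete, here is a minimal sketch of the HTTP request it amounts to, built with the Python standard library. It rests on assumptions not stated on this page: that url and endpoint are simply joined with a slash, and that the api_key auth type is sent as a Bearer token in the Authorization header. The helper build_event_request and the event payload are hypothetical and only show a subset of the fields a real producer emits.

```python
import json
from urllib.request import Request

GRAI_URL = "https://api.grai.io"  # base URL taken from the example config


def build_event_request(connection_id: str, api_key: str, event: dict) -> Request:
    """Build (but do not send) a POST request carrying one OpenLineage event."""
    endpoint = f"api/v1/openlineage/{connection_id}/"
    return Request(
        url=f"{GRAI_URL}/{endpoint}",
        data=json.dumps(event).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumption: the API key travels as a Bearer token.
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


# Hypothetical, trimmed-down run event for illustration only.
event = {
    "eventType": "COMPLETE",
    "eventTime": "2024-01-01T00:00:00Z",
    "run": {"runId": "3f5e83fa-3480-44ff-99c5-ff943904e5e8"},
    "job": {"namespace": "my-namespace", "name": "my-job"},
    "inputs": [],
    "outputs": [],
}

request = build_event_request("<connection_id>", "<your_grai_api_key>", event)
# Passing `request` to urllib.request.urlopen would actually send the event.
```

In practice the OpenLineage client builds and sends this request for you; the sketch is mainly useful for checking the endpoint and API key by hand.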

Python Library

The OpenLineage integration can be run as a standalone Python library to convert OpenLineage events into Grai objects.

The library is available via pip:

pip install grai_source_openlineage

More information about the API is available here.

Example

The library is split into a few distinct functions, but if you only wish to extract nodes and edges you can do so as follows:

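from grai_source_openlineage import OpenLineageIntegration
from grai_schemas.v1.source import SourceV1

# The Grai source that the extracted nodes and edges belong to
source = SourceV1(name="my-source", type="my-type")

# No OpenLineage-to-Grai namespace mapping is configured here
openlineage_params = {
    "namespaces": {}
}

# Every node and edge is placed in the single Grai namespace "openlineage"
integration = OpenLineageIntegration(source=source, namespace="openlineage", **openlineage_params)

nodes, edges = integration.get_nodes_and_edges()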

In this case, all nodes and edges produced by OpenLineage are placed in a single namespace. In practice you usually don't want this because it results in overlapping IDs. For example, an OpenLineage connection copying data from a source table my_table to a destination table my_table will produce two nodes with the same ID.

To avoid this, you can pass a namespaces parameter to the OpenLineageIntegration constructor, which maps OpenLineage namespaces to source and destination Grai namespaces.

# Replace the placeholders with your own namespace names
namespaces = {"<openlineage_namespace>": "<grai_namespace>"}
integration = OpenLineageIntegration(source=source, namespaces=namespaces)
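
The ID collision described above can be illustrated without the library. The following is a rough sketch, not the integration's actual implementation: it assumes a node is identified by its (namespace, name) pair, and the dataset namespaces, the Grai namespace names, and the grai_ids helper are all hypothetical.

```python
# Two hypothetical datasets from one OpenLineage job that copies my_table
# from a source system to a destination system.
datasets = [
    {"namespace": "postgres://prod-db", "name": "my_table"},     # source
    {"namespace": "snowflake://warehouse", "name": "my_table"},  # destination
]


def grai_ids(datasets, default_namespace, namespaces=None):
    """Return the distinct (grai_namespace, name) pairs for the datasets.

    Datasets whose OpenLineage namespace is not in the mapping fall back
    to the default namespace.
    """
    namespaces = namespaces or {}
    return {
        (namespaces.get(d["namespace"], default_namespace), d["name"])
        for d in datasets
    }


# Without a mapping, both tables collapse into one id.
single = grai_ids(datasets, "openlineage")

# With a mapping, each table keeps its own id.
mapped = grai_ids(
    datasets,
    "openlineage",
    {"postgres://prod-db": "prod", "snowflake://warehouse": "warehouse"},
)

print(len(single))  # 1
print(len(mapped))  # 2
```

The same reasoning applies to the real integration: the more OpenLineage namespaces share one Grai namespace, the more likely it is that identically named tables collide.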