OpenLineage
The OpenLineage integration consumes OpenLineage metadata from various systems.
Grai offers detailed instructions for a number of common OpenLineage producers including Airflow.
Web App
Fields
Field | Value | Example |
---|---|---|
source | The name of the source, see sources | my-source |
Name | Name for connection | OpenLineage |
Namespace | Namespace for the connection, see namespaces | default |
namespaces | Optional |
Webhook
In order to authenticate your webhook, you will need to create an API key in the Grai web app.
Grai will open an endpoint at /api/v1/openlineage/<connection_id>/
which can be used to receive OpenLineage events. You should point your OpenLineage clients to this endpoint.
See the OpenLineage docs (opens in a new tab) for how to configure your client to send events to a webhook.
An example config:
transport:
type: "http"
url: "https://api.grai.io"
endpoint: "api/v1/openlineage/<connection_id>/"
auth:
type: "api_key"
api_key: "<your_grai_api_key>"
Python Library
The OpenLineage integration can be run as a standalone python library to convert OpenLineage events into Grai objects.
The library is available via pip
pip install grai_source_openlineage
More information about the API is available here.
Example
The library is split into a few distinct functions but if you only wish to extract nodes/edges you can do so as follows:
from grai_source_openlineage import OpenLineageIntegration
from grai_schemas.v1.source import SourceV1
source = SourceV1(name="my-source", type="my-type")
openlineage_params = {
"namespaces": {}
}
integration = OpenLineageIntegration(source=source, namespace="openlineage", **openlineage_params)
nodes, edges = integration.get_nodes_and_edges()
In this case, we are putting all nodes and edges produced by OpenLineage in a single namespace.
In practice you usually don't want to do this because it will result in overlapping id's.
For example, an OpenLineage connection copying data from a source table my_table
to a destination table my_table
will
result in two nodes with the same id.
To avoid this, you can pass a namespaces
parameter to the OpenLineageIntegration
constructor which will map OpenLineage
namespaces to source and destination Grai namespaces.
namespaces = {<openlineage_namespace>: <grai_namepsace>}
integration = OpenLineageIntegration(source=source, namespaces=namespaces)