verify_edge_ids
def verify_edge_ids(
nodes: List[SourcedNode], edges: List[SourcedEdge]
) -> Tuple[List[SourcedEdge], List[QuarantinedEdge]]
Validates that all edges have a source and destination node in the graph
Arguments:
nodes
- A list of sourced nodesedges
- A list of sourced edges
Returns:
A tuple of lists of sourced edges. The first list contains all edges that have a source and destination node in the graph. The second list contains all edges that do not have a source or destination node in the graph.
ValidatedResult Objects
class ValidatedResult(BaseModel)
Class definition of ValidatedResult
Attributes:
nodes
- A list of sourced nodesedges
- A list of sourced edgesevents
- A list of events
GraiIntegrationImplementation Objects
class GraiIntegrationImplementation(ABC)
Base class for Grai integrations
Attributes:
source
- The Grai data source to associate with output from the integration.version
- The Grai data version to associate with output from the integration.
__init__
def __init__(source: SourceV1, version: Optional[str] = None)
Initializes the Grai integration.
Arguments:
source
- The Grai data source to associate with output from the integration.version
- The Grai data version to associate with output from the integration.
nodes
@abstractmethod
def nodes() -> List[SourcedNode]
Returns a list of sourced nodes
edges
@abstractmethod
def edges() -> List[SourcedEdge]
Returns a list of sourced edges
get_nodes_and_edges
@abstractmethod
def get_nodes_and_edges() -> Tuple[List[SourcedNode], List[SourcedEdge]]
Returns a tuple of lists of sourced nodes and sourced edges
ready
@abstractmethod
def ready() -> bool
Returns True if the integration is ready to run
events
def events(*args, **kwargs) -> List[Event]
Returns a list of events
QuarantineAccessor Objects
class QuarantineAccessor()
Class definition of QuarantineAccessor
Attributes:
nodes
- A list of quarantined nodesedges
- A list of quarantined edgesevents
- A list of quarantined eventshas_quarantined
- True if there are any quarantined items.
__init__
def __init__(integration_instance: GraiIntegrationImplementation)
Initializes the QuarantineAccessor
nodes
@property
def nodes() -> List[QuarantinedNode]
Returns a list of quarantined nodes
nodes
@nodes.setter
def nodes(value: List[QuarantinedNode])
Sets the list of quarantined nodes
edges
@property
def edges() -> List[QuarantinedEdge]
Returns a list of quarantined edges
edges
@edges.setter
def edges(value: List[QuarantinedEdge])
Sets the list of quarantined edges
events
@property
def events() -> List[QuarantinedEvent]
Returns a list of quarantined events
events
@events.setter
def events(value: List[QuarantinedEvent])
Sets the list of quarantined events
has_quarantined
@property
def has_quarantined() -> bool
Returns True if there are any quarantined items
ValidatedIntegration Objects
class ValidatedIntegration(GraiIntegrationImplementation)
A type of Integration which quarantines invalid integration output
Attributes:
integration
- The integration to validate
__init__
def __init__(integration: GraiIntegrationImplementation, *args, **kwargs)
Initializes the ValidatedIntegration
Arguments:
integration
- The integration to validate
quarantine
@property
def quarantine() -> QuarantineAccessor
Returns the QuarantineAccessor for the integration
nodes
@cache
def nodes() -> List[SourcedNode]
Returns a list of validated sourced nodes
edges
@cache
def edges() -> List[SourcedEdge]
Returns a list of validates sourced edges
get_nodes_and_edges
@cache
def get_nodes_and_edges() -> Tuple[List[SourcedNode], List[SourcedEdge]]
Returns a tuple of lists of validated sourced nodes and sourced edges
events
@cache
def events(*args, **kwargs) -> List[Event]
Returns a list of validated events
ready
def ready() -> bool
Returns True if the integration is ready to run