grai_source_postgres
loader

get_from_env

def get_from_env(label: str,
                 default: Optional[Any] = None,
                 validator: Callable = None)

Arguments:

label (str):

  • default Optional[Any], optional - (Default value = None)
  • validator Callable, optional - (Default value = None)

Returns:

PostgresConnector Objects

class PostgresConnector()

connection_string

@property
def connection_string() -> str

Arguments:

Returns:

connect

def connect()

connection

@property
def connection()

close

def close() -> None

Arguments:

Returns:

query_runner

def query_runner(query: str, param_dict: Dict = {}) -> List[Dict]

Arguments:

query (str):

  • param_dict Dict, optional - (Default value = {})

Returns:

tables

@cached_property
def tables() -> List[Table]

Create and return a list of dictionaries with the schemas and names of tables in the database connected to by the connection argument.

Arguments:

Returns:

columns

@cached_property
def columns() -> List[Column]

Creates and returns a list of dictionaries for the specified schema.table in the database connected to.

Arguments:

Returns:

get_table_columns

def get_table_columns(table: Table) -> List[Column]

Arguments:

table (Table):

Returns:

column_map

@cached_property
def column_map() -> Dict[Tuple[str, str], List[Column]]

Arguments:

Returns:

foreign_keys

@cached_property
def foreign_keys() -> List[Edge]

This needs to be tested / evaluated

Arguments:

Returns:

get_nodes

def get_nodes() -> List[PostgresNode]

Arguments:

Returns:

get_edges

def get_edges() -> List[Edge]

Arguments:

Returns:

get_nodes_and_edges

def get_nodes_and_edges() -> Tuple[List[PostgresNode], List[Edge]]

Arguments:

Returns: