Source code for pycorese.api

import pandas as pd
from io import StringIO
import os
import re
from collections import namedtuple

def _isFile(input: str):
    """
    Tell whether ``input`` is the path of an existing file.

    The string is first checked against a file-path shaped pattern:
    an optional root (``C:\\``, ``./``, ``../`` or ``/``), any number of
    directory segments, and a final ``name.ext`` component. Names may only
    contain word characters, hyphens and whitespace.

    Parameters
    ----------
    input : str
        Either a file path or arbitrary text (e.g. RDF content).

    Returns
    -------
    bool
        True if ``input`` looks like a path and the file exists,
        False if ``input`` does not look like a file path.

    Raises
    ------
    FileNotFoundError
        If ``input`` looks like a file path but no such file exists.
    """
    # Every directory segment must end with a separator, so there is only
    # one way to split the string into segments. The previous form
    # ``(?:name+ sep?)+ name+`` could split a run of characters in
    # exponentially many ways and hang on long non-path input; it also
    # rejected bare one-character file names such as ``a.ttl``.
    file_path_pattern = r'^(?:[a-zA-Z]:\\|\.{1,2}[\\/]|/)?(?:[\w\-\s]+[\\/])*[\w\-\s]+\.\w+$'

    if re.match(file_path_pattern, input):
        if os.path.isfile(input):
            return True
        raise FileNotFoundError(f"The file {input} does not exist.")

    return False

def _is_rdf_xml(content):
    """
    Heuristically detect RDF/XML content.

    The content is considered RDF/XML when, after optional leading
    whitespace, it either starts with the ``<rdf:RDF`` element or starts
    with an XML declaration, comment or DOCTYPE that is later followed by
    ``<rdf:RDF``.

    Parameters
    ----------
    content : str
        Text to inspect.

    Returns
    -------
    bool
        True if the text looks like RDF/XML.
    """
    # The XML declaration is optional in XML documents, so it must not be
    # required here; otherwise such documents fall through to the Turtle
    # heuristic and are parsed with the wrong format.
    rdf_xml_pattern = r'^\s*(?:<(?:\?xml|!--|!DOCTYPE).*?)?<rdf:RDF'
    return re.search(rdf_xml_pattern, content, re.DOTALL) is not None

def _is_turtle(content):
    """
    Heuristically detect Turtle content.

    The text is reported as Turtle when it contains an ``@prefix`` or
    ``@base`` directive, three consecutive ``<...>`` terms, or two
    ``<...>`` terms followed by a double-quoted literal.

    Parameters
    ----------
    content : str
        Text to inspect.

    Returns
    -------
    bool
        True if any of the Turtle markers is found anywhere in the text.
    """
    turtle_pattern = r'(@prefix|@base|<[^>]+>\s*<[^>]+>\s*<[^>]+>|<[^>]+>\s*<[^>]+>\s*"[^"]*")'
    found = re.compile(turtle_pattern).search(content)
    return bool(found)


[docs] class CoreseAPI: """ Simplified API to leverage functionality of Corese Java library (``corese-core``). Parameters ---------- java_bridge : str, optional Package name to use for Java integration. Options: 'py4j', 'jpype'. Default is 'py4j'. corese_path : str, optional Path to the corese-python library. If not specified (default), the jar file that was installed with the package is used. """ def __init__(self, java_bridge: str = 'py4j', corese_path: str|None = None): if java_bridge.lower() not in ['py4j', 'jpype']: raise ValueError('Invalid java bridge. Only "py4j" and "jpype" are supported.') self.corese_path = corese_path self.java_bridge = java_bridge.lower() self.java_gateway = None self._bridge = None # This is a minimum set of Corese classes required for the API to work self.Graph = None # Corese ``fr.inria.corese.core.Graph`` object self.QueryProcess = None # Corese ``fr.inria.corese.core.query.QueryProcess`` object self.ResultFormat = None # Corese ``fr.inria.corese.core.print.ResultFormat`` object self.Load = None # Corese ``fr.inria.corese.core.load.Load`` object self.RuleEngine = None # Corese ``fr.inria.corese.core.rule.RuleEngine`` object self.Transformer = None # Corese ``fr.inria.corese.core.transform.Transformer`` object self.Shacl = None # Corese ``fr.inria.corese.core.shacl.Shacl`` object self.DataManager = None # Corese ``fr.inria.corese.core.storage.api.dataManager.DataManager`` object self.CoreseGraphDataManager = None # Corese ``fr.inria.corese.core.storage.CoreseGraphDataManager`` object self.CoreseGraphDataManagerBuilder = None # Corese ``fr.inria.corese.core.storage.CoreseGraphDataManagerBuilder`` object
[docs] def coreseVersion(self)-> str|None: """ Get the version of the corese-core library. Notes ----- Corese library must be loaded first. Returns ------- str The version of the ``corese-core`` library used. If the library is not loaded, returns None. """ #TODO: implement this to call the coreseVersion() from # the corese engine (at the moment this method is static and # may return bad result) if self._bridge is None: print("Corese engine is not loaded yet") return None return self._bridge.coreseVersion()
[docs] def unloadCorese(self): """ Explicitly unload Corese library. It's not necessary to call this method, as the library is automatically unloaded when the Python interpreter exits. Warning ------- After unloading Corese bridged by ``JPype`` it is not possible to restart it. """ self._bridge.unloadCorese() self.java_gateway = None self.Graph = None self.QueryProcess = None self.ResultFormat = None self.Load = None
[docs] def loadCorese(self) -> None: """Load Corese library into JVM and expose the Corese classes. """ #TODO: refactor if self.java_bridge == 'py4j': from .py4J_bridge import Py4JBridge self._bridge = Py4JBridge(corese_path = self.corese_path) self.java_gateway = self._bridge.loadCorese() else: from .jpype_bridge import JPypeBridge self._bridge = JPypeBridge(corese_path = self.corese_path) self.java_gateway =self._bridge.loadCorese() # This is a minimum set of classes required for the API to work # if we need more classes we should think about how to expose # them without listing every single one of them here self.Graph = self._bridge.Graph self.Load = self._bridge.Load self.QueryProcess = self._bridge.QueryProcess self.ResultFormat = self._bridge.ResultFormat self.RuleEngine = self._bridge.RuleEngine self.Transformer = self._bridge.Transformer # Classes to manage Graph(s) with different storage options self.DataManager = self._bridge.DataManager self.CoreseGraphDataManager = self._bridge.CoreseGraphDataManager self.CoreseGraphDataManagerBuilder = self._bridge.CoreseGraphDataManagerBuilder # Classes to manage SHACL validation self.Shacl = self._bridge.Shacl # Define the known namespaces Namespace = namedtuple('Namespace', ['RDF', 'RDFS', 'SHACL']) self.Namespaces = Namespace( self._bridge.RDF.RDF, self._bridge.RDFS.RDFS, 'http://www.w3.org/ns/shacl#' ) self.SHACL_REPORT_QUERY='''SELECT ?o ?p ?s WHERE { ?o a sh:ValidationResult. ?o ?p ?s. }'''
#TODO: Add support for the other RDF formats
[docs] def loadRDF(self, rdf: str, graph=None)-> object: """ Load RDF file/string into Corese graph. Supported formats are RDF/XML and Turtle. Parameters ---------- rdf : str Path to the RDF file or a string with RDF content. graph : object, optional Corese object of either ``fr.inria.corese.core.Graph`` or ``fr.inria.core.storage.CoreseGraphDataManager`` type. If an object is not provided (default), new Graph and GraphManager will be created. Returns ------- object Corese ``fr.inria.core.storage.CoreseGraphDataManager`` object. """ if not self.java_gateway: self.loadCorese() assert self.Graph, 'Corese classes are not loaded properly.' assert self.Load, 'Corese classes are not loaded properly.' assert self.CoreseGraphDataManagerBuilder, 'Corese classes are not loaded properly.' if not graph: graph = self.Graph() graph_mgr = self.CoreseGraphDataManagerBuilder().build() ld = self.Load().create(graph, graph_mgr) if _isFile(rdf): ld.parse(rdf) else: if _is_rdf_xml(rdf): ld.loadString(rdf, self.Load.RDFXML_FORMAT) elif _is_turtle(rdf): ld.loadString(rdf, self.Load.TURTLE_FORMAT) else: raise ValueError('Unsupported RDF format. Only RDF/XML and Turtle are supported by this version') return graph_mgr
[docs] def loadRuleEngine(self, graph: object, profile: object, replace:bool = False)-> object: """ Load the rule engine for a given graph. Parameters ---------- graph : object Corese Graph or DataManager object profile : object Profile object for the rule engine. Accepted values: ``RuleEngine.Profile.RDFS``, ``RuleEngine.Profile.OWLRL``, ``RuleEngine.Profile.OWLRL_LITE``, ``RuleEngine.Profile.OWLRL_EXT`` replace : bool, optional Replace the existing rule engine. Default is False. Returns ------- object Corese ``fr.inria.core.rule.RuleEngine`` object. """ assert self.RuleEngine, 'Corese classes are not loaded properly.' assert graph, 'Graph object is required.' assert profile, 'Profile object is required.' #TODO: assert profile is valid if replace: self.resetRuleEngine(graph) rule_engine = self.RuleEngine.create(graph) rule_engine.setProfile(profile) rule_engine.process() return rule_engine
[docs] def resetRuleEngine(self, graph: object)-> None: """ Reset the rule engine for a given graph. Parameters ---------- graph : object Corese Graph or DataManager object """ assert self.RuleEngine, 'Corese classes are not loaded properly.' assert graph, 'Graph object is required.' rule_engine = self.RuleEngine.create(graph.getGraph()) rule_engine.remove()
[docs] def sparqlSelect(self, graph: object, prefixes: str|list|None = None, query: str ='SELECT * WHERE {?s ?p ?o} LIMIT 5', return_dataframe: bool =True)-> object|pd.DataFrame: """ Execute SPARQL SELECT or ASK query on Corese graph. Optionally return the result as DataFrame. Parameters ---------- graph : object Corese Graph or DataManager object prefixes : str or list, optional namespace prefixes. Default is None. query : str, optional SPARQL query. By default five first triples of the graph are returned. return_dataframe : bool, optional Return the result as a DataFrame. Default is True. Returns ------- object or pd.DataFrame Result of the SPARQL query in CSV-formatted ``fr.inria.core.print.ResultFormat`` object or a DataFrame. """ assert self.QueryProcess, 'Corese classes are not loaded properly.' assert self.ResultFormat, 'Corese classes are not loaded properly.' if not graph: raise ValueError('Graph object is required.') #TODO: extract method to create a prefix string if not prefixes: prefixes = '' if isinstance(prefixes, list): prefixes = '\n'.join(prefixes) exec = self.QueryProcess.create(graph) map = exec.query('\n'.join([prefixes, query]) ) # to keep it simple for now return the result in CSV format result = self.ResultFormat.create(map, self.ResultFormat.SPARQL_RESULTS_CSV) if return_dataframe: return self.toDataFrame(result) return result
[docs] def toDataFrame(self, queryResult: object, dtypes: list|dict|None = None)-> pd.DataFrame: """ Convert Corese ResultFormat object to ``pandas.DataFrame``. Parameters ---------- queryResult : object CSV-formatted ``fr.inria.core.print.ResultFormat`` object. dtypes : list or dict, optional Optional column data types for the columns in the format as in ``panads.read_csv`` method. https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html Returns ------- pd.DataFrame Corese object converted to a DataFrame. """ assert self.ResultFormat, 'Corese classes are not loaded properly.' df = pd.read_csv(StringIO(str(queryResult)), skipinitialspace=True, dtype=dtypes) # Assign n/a to empty strings string_dtypes = df.convert_dtypes().select_dtypes("string") df[string_dtypes.columns] = string_dtypes.replace(r'^\s*$', None, regex=True) return df
#TODO: add timeout
[docs] def sparqlConstruct(self, graph: object, prefixes: str|list|None = None, query: str ='', merge: bool=False)-> object: """ Execute SPARQL CONSTRUCT query on Corese graph. Optionally the new triples can be merged with the existing graph. Parameters ---------- graph : object Corese Graph or DataManager object prefixes : str or list, optional namespace prefixes. Default is None. query : str, optional SPARQL query. Default is empty string resulting in empty graph. merge : bool, optional Option to merge the result with the existing graph. Default is False. Returns ------- object Result of the SPARQL CONSTRUCT query in RDF/XML format. """ assert self.QueryProcess, 'Corese classes are not loaded properly.' assert self.ResultFormat, 'Corese classes are not loaded properly.' if not graph: raise ValueError('Graph object is required.') #todo: extract method to create a prefix string if not prefixes: prefixes = '' if isinstance(prefixes, list): prefixes = '\n'.join(prefixes) exec = self.QueryProcess.create(graph) map = exec.query('\n'.join([prefixes, query]) ) if merge: graph.getGraph().merge(map.getGraph()) result = self.ResultFormat.create(map, self.ResultFormat.DEFAULT_CONSTRUCT_FORMAT) return result
[docs] def toTurtle(self, rdf:object)-> str: """ Convert RDF/XML to Turtle format. Parameters ---------- rdf : object Corese RDF object Returns ------- str RDF in Turtle format. """ assert self.Transformer, 'Corese classes are not loaded properly.' # TODO: ASk Remi about getGraph, the Graph and the right way to do the transformation ttl = self.Transformer.create(rdf.getMappings().getGraph(), self.Transformer.TURTLE) return ttl.toString()
#TODO: ASk Remi what are the acceptable shacl formats
[docs] def shaclValidate(self, graph: object, prefixes: str|list|None = None, shacl_shape_ttl: str ='', return_dataframe = False)-> object: """ Validate RDF graph against SHACL shape. This version supports only Turtle format to define a SHACL shape. Parameters ---------- graph : object Corese Graph or DataManager object prefixes : str or list, optional namespace prefixes. Default is None. shacl_shape_ttl : str, optional SHACL shape in Turtle format. If not provided, the validation will be skipped. return_dataframe : bool, optional Return the validation report as a DataFrame. Default is False. Returns ------- object SHACL validation report in Turtle format. """ assert self.Shacl, 'Corese classes are not loaded properly.' prefix_shacl = f'@prefix sh: <{self.Namespaces.SHACL}> .' if not prefixes: prefixes = '' if isinstance(prefixes, list): prefixes = '\n'.join(prefixes) prefixes = '\n'.join([prefixes, prefix_shacl]) shapeGraph = self.Graph() ld = self.Load.create(shapeGraph) if _isFile(shacl_shape_ttl): # Load shape graph from file ld.parse(shacl_shape_ttl) else: # Load shape graph from string ld.loadString('\n'.join([prefixes, shacl_shape_ttl]), self.Load.TURTLE_FORMAT) # Evaluation shacl = self.Shacl(graph.getGraph(), shapeGraph) result = shacl.eval() trans = self.Transformer.create(result, self.Transformer.TURTLE) if return_dataframe: return self.shaclReportToDataFrame(str(trans.toString())) return str(trans.toString())
# Parse validation report
[docs] def shaclReportToDataFrame(self, validation_report: str)-> pd.DataFrame: """ Convert SHACL validation report to ``pandas.DataFrame``. Parameters ---------- validation_report : str SHACL validation report in Turtle format. Returns ------- pd.DataFrame Validation report as a DataFrame. """ prefix_shacl = f'@prefix sh: <{self.Namespaces.SHACL}> .' validation_report_graph = self.loadRDF(validation_report) report = self.sparqlSelect(validation_report_graph, prefix_shacl, self.SHACL_REPORT_QUERY) report = report.pivot(index='o', columns='p', values='s') report.columns = [uri.split('#')[-1] for uri in report.columns] #TODO cleanup the report return report
if __name__ == "__main__":
    # Demo of the API on the example data shipped in ``examples/data``.

    # Initialize the CoreseAPI
    cr = CoreseAPI(java_bridge='py4j')
    cr.loadCorese()

    # Load RDF file
    gr = cr.loadRDF(os.path.abspath(os.path.join('.', 'examples', 'data', 'beatles.rdf')))
    print("Graph size: ", gr.graphSize())

    # Load Rule Engine OwlRL
    ren = cr.loadRuleEngine(gr, profile=cr.RuleEngine.Profile.OWLRL)
    print("Graph size: ", gr.graphSize())

    # Load another Rule Engine e.g. RDFS to replace the existing one
    ren = cr.loadRuleEngine(gr, profile=cr.RuleEngine.Profile.RDFS, replace=True)
    print("Graph size: ", gr.graphSize())

    # Reset Rule Engine
    cr.resetRuleEngine(gr)
    print("Graph size: ", gr.graphSize())

    # Execute SPARQL SELECT query. The raw result object is requested here
    # because the next step demonstrates the explicit conversion; with the
    # default ``return_dataframe=True`` the result is already a DataFrame.
    res = cr.sparqlSelect(gr,
                          query='select * where {?s ?p ?o} limit 5',
                          return_dataframe=False)

    # Convert the result to DataFrame
    print(cr.toDataFrame(res))

    # Execute SPARQL CONSTRUCT query
    prefixes = ['@prefix ex: <http://example.com/>']
    construct = '''CONSTRUCT {?Beatle a ex:BandMember }
                   WHERE { ex:The_Beatles ex:member ?Beatle}'''
    results = cr.sparqlConstruct(gr, prefixes=prefixes, query=construct)
    print(results)

    # Convert the result to Turtle
    print(cr.toTurtle(results))

    # Execute SHACL validation. The path is built with ``os.path.join`` so
    # that the separators are right on every platform.
    shacl_shape_file = os.path.join('.', 'examples', 'data', 'beatles-validator.ttl')
    report = cr.shaclValidate(gr, shacl_shape_ttl=shacl_shape_file, prefixes=prefixes)
    print(report)

    # Convert SHACL validation report to DataFrame
    shr = cr.shaclReportToDataFrame(report)
    print(shr)

    # Shutdown the JVM
    cr.unloadCorese()

    print("Done!")