import pandas as pd
from io import StringIO
import os
import re
from collections import namedtuple
from typing import Optional
def _isFile(input: str):
    """
    Tell whether *input* is the path of an existing file.

    The string is first matched against a path-shaped pattern: an optional
    drive (``C:\\``), ``./``, ``../`` or ``/`` prefix, any number of directory
    segments, and a final ``name.ext`` component. Segments may only contain
    word characters, hyphens and whitespace.

    Parameters
    ----------
    input : str
        Candidate file path (or arbitrary text such as inline RDF content).

    Returns
    -------
    bool
        True if the string looks like a path and the file exists,
        False if the string does not look like a file path at all.

    Raises
    ------
    FileNotFoundError
        If the string looks like a file path but no such file exists.
    """
    # Every directory segment must end with a separator, so there is exactly
    # one way to split the string into segments. The former pattern made the
    # separator optional inside a repeated group, which let the regex engine
    # backtrack exponentially on long non-path text and also rejected
    # single-character file stems such as ``a.ttl``.
    file_path_pattern = (
        r'^(?:[a-zA-Z]:\\|\.{1,2}[\\/]|/)?'   # optional drive / relative / root prefix
        r'(?:[\w\-\s]+[\\/])*'                # zero or more directory segments
        r'[\w\-\s]+\.\w+$'                    # file name with an extension
    )
    if re.match(file_path_pattern, input) is None:
        return False
    if not os.path.isfile(input):
        raise FileNotFoundError(f"The file {input} does not exist.")
    return True
def _is_url(input: str):
    """
    Tell whether *input* looks like an HTTP(S) URL.

    The string must start with ``http://`` or ``https://`` and end with a dot
    followed by word characters (i.e. something resembling a file extension).
    """
    matcher = re.compile(r'^https?://.*\.[\w]+$')
    found = matcher.match(input)
    if found is None:
        return False
    return True
def _is_rdf_xml(content):
    """
    Tell whether *content* looks like an RDF/XML document.

    The text must begin (after optional whitespace) with an XML declaration
    that is followed, anywhere later in the text, by an ``<rdf:RDF`` tag.
    """
    detector = re.compile(r'^\s*<\?xml.*\?>.*<rdf:RDF', re.DOTALL)
    return bool(detector.search(content))
def _is_turtle(content):
    """
    Tell whether *content* looks like Turtle.

    The text qualifies when it contains an ``@prefix`` or ``@base`` directive,
    or a triple written as three ``<...>`` terms, or two ``<...>`` terms
    followed by a double-quoted literal.
    """
    detector = re.compile(r'(@prefix|@base|<[^>]+>\s*<[^>]+>\s*<[^>]+>|<[^>]+>\s*<[^>]+>\s*"[^"]*")')
    hit = detector.search(content)
    return False if hit is None else True
# NOTE: a stray "[docs]" Sphinx source-link marker (HTML extraction residue) was removed here.
class CoreseAPI:
"""
Simplified API to leverage functionality of Corese Java library ``corese-core``.
Parameters
----------
java_bridge : str, optional
Package name to use for Java integration. Options: ``py4j``, ``jpyp``. Default is ``py4j``.
corese_path : str, optional
Path to the corese-python library. If not specified (default), the jar
file that was installed with the package is used.
"""
def __init__(self,
java_bridge: str = 'py4j',
corese_path: Optional[str] = None):
if java_bridge.lower() not in ['py4j', 'jpype']:
raise ValueError('Invalid java bridge. Only "py4j" and "jpype" are supported.')
self.corese_path = corese_path
self.java_bridge = java_bridge.lower()
self.java_gateway = None
self._bridge = None
# This is a minimum set of Corese classes required for the API to work
self.Graph = None # Corese ``fr.inria.corese.core.Graph`` object
self.QueryProcess = None # Corese ``fr.inria.corese.core.query.QueryProcess`` object
self.ResultFormat = None # Corese ``fr.inria.corese.core.print.ResultFormat`` object
self.Load = None # Corese ``fr.inria.corese.core.load.Load`` object
self.RuleEngine = None # Corese ``fr.inria.corese.core.rule.RuleEngine`` object
self.Transformer = None # Corese ``fr.inria.corese.core.transform.Transformer`` object
self.Shacl = None # Corese ``fr.inria.corese.core.shacl.Shacl`` object
self._DataManager = None # Corese ``fr.inria.corese.core.storage.api.dataManager.DataManager`` object
self._CoreseGraphDataManager = None # Corese ``fr.inria.corese.core.storage.CoreseGraphDataManager`` object
self._CoreseGraphDataManagerBuilder = None # Corese ``fr.inria.corese.core.storage.CoreseGraphDataManagerBuilder`` object
[docs]
def coreseVersion(self)-> str|None:
"""
Get the version of the corese-core library.
Notes
-----
Corese library must be loaded first.
Returns
-------
str
The version of the ``corese-core`` library used. If the library is not loaded, returns None.
"""
#TODO: implement this to call the coreseVersion() from
# the corese engine (at the moment this method is static and
# may return bad result)
if self._bridge is None:
print("Corese engine is not loaded yet")
return None
return self._bridge.coreseVersion()
[docs]
def loadCorese(self) -> Optional[object]:
"""
Load Corese library into JVM and expose the Corese classes.
Returns
-------
object
Java Gateway object if the library is loaded successfully. Otherwise, returns None.
"""
#TODO: refactor
if self.java_bridge == 'py4j':
from .py4J_bridge import Py4JBridge
self._bridge = Py4JBridge(corese_path = self.corese_path)
self.java_gateway = self._bridge.loadCorese()
else:
from .jpype_bridge import JPypeBridge
self._bridge = JPypeBridge(corese_path = self.corese_path)
self.java_gateway =self._bridge.loadCorese()
# This is a minimum set of classes required for the API to work
# if we need more classes we should think about how to expose
# them without listing every single one of them here
self.Graph = self._bridge.Graph
self.Load = self._bridge.Load
self.Loader = self._bridge.Loader
self.QueryProcess = self._bridge.QueryProcess
self.ResultFormat = self._bridge.ResultFormat
self.RuleEngine = self._bridge.RuleEngine
self.Transformer = self._bridge.Transformer
# Classes to manage Graph(s) with different storage options
self._DataManager = self._bridge.DataManager
self._CoreseGraphDataManager = self._bridge.CoreseGraphDataManager
self._CoreseGraphDataManagerBuilder = self._bridge.CoreseGraphDataManagerBuilder
# Classes to manage SHACL validation
self.Shacl = self._bridge.Shacl
# Define the known namespaces
Namespace = namedtuple('Namespace', ['RDF', 'RDFS', 'SHACL'])
self.Namespaces = Namespace(
self._bridge.RDF.RDF,
self._bridge.RDFS.RDFS,
'http://www.w3.org/ns/shacl#'
)
self.SHACL_REPORT_QUERY=f'''@prefix sh: <{self.Namespaces.SHACL}> .
SELECT ?o ?p ?s
WHERE {{ ?o a sh:ValidationResult.
?o ?p ?s.}}'''
return self.java_gateway
[docs]
def unloadCorese(self):
"""
Explicitly unload Corese library.
It's not necessary to call this method, as the library is automatically
unloaded when the Python interpreter exits.
Warning
-------
After unloading Corese bridged by ``JPype`` it is not possible to restart it.
"""
self._bridge.unloadCorese()
self.java_gateway = None
self.Graph = None
self.QueryProcess = None
self.ResultFormat = None
self.Load = None
#TODO: Add support for the other RDF formats
[docs]
def loadRDF(self, rdf: str, graph: Optional[object] = None)-> object:
"""
Load RDF file/string into Corese graph. Supported formats are RDF/XML and Turtle.
Parameters
----------
rdf : str
Path or URL of an RDF file or a string with RDF content.
graph : object, optional
Corese ``fr.inria.corese.core.Graph`` object.
If an object is not provided (default), new Graph and GraphManager will be created.
Returns
-------
object
Corese ``fr.inria.core.Graph`` object.
"""
if not self.java_gateway:
self.loadCorese()
assert self.Graph, 'Corese classes are not loaded properly.'
assert self.Load, 'Corese classes are not loaded properly.'
#TODO: add support for DataManager(s) for different storage options
# the option has to come as a parameter
# if not graph:
# graph = self.Graph()
# graph_mgr = self.CoreseGraphDataManagerBuilder().build()
# else:
# graph_mgr = self.CoreseGraphDataManagerBuilder().graph(graph).build()
#
# ld = self.Load.create(graph, graph_mgr)
# ...
# return graph_mgr.getGraph()
graph = graph or self.Graph()
ld = self.Load.create(graph)
#TODO: add support for a URL
if _isFile(rdf):
ld.parse(rdf)
elif _is_url(rdf):
ld.parse(rdf)
else:
if _is_rdf_xml(rdf):
ld.loadString(rdf, self.Loader.format.RDFXML_FORMAT)
elif _is_turtle(rdf):
ld.loadString(rdf, self.Loader.format.TURTLE_FORMAT)
else:
raise ValueError('Unsupported RDF format. Only RDF/XML and Turtle are supported by this version')
return graph.getGraph()
[docs]
def loadRuleEngine(self, graph: object,
profile: str,
replace:bool = False)-> object:
"""
Load the rule engine for a given graph.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object
profile : str
Profile the rule engine. Accepted values: Accepted values: *rdfs*, *owlrl*, *owlrl_lite*, *owlrl_ext*
replace : bool, optional
Replace the existing rule engine. Default is False.
Returns
-------
object
Corese ``fr.inria.core.rule.RuleEngine`` object.
"""
assert self.RuleEngine, 'Corese classes are not loaded properly.'
assert graph, 'Graph object is required.'
assert profile, 'Profile object is required.'
if profile == 'rdfs':
profile = self.RuleEngine.Profile.RDFS
elif profile == 'owlrl_lite':
profile = self.RuleEngine.Profile.OWLRL_LITE
elif profile == 'owlrl_ext':
profile = self.RuleEngine.Profile.OWLRL_EXT
elif profile == 'owlrl':
profile = self.RuleEngine.Profile.OWLRL
else:
raise ValueError('Invalid profile. Accepted values are: "rdfs", "owlrl_lite", "owlrl_ext", "owlrl"')
if replace:
self.resetRuleEngine(graph)
rule_engine = self.RuleEngine.create(graph)
rule_engine.setProfile(profile)
rule_engine.process()
return rule_engine
[docs]
def resetRuleEngine(self, graph: object)-> None:
"""
Reset the rule engine for a given graph.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object
"""
assert self.RuleEngine, 'Corese classes are not loaded properly.'
assert graph, 'Graph object is required.'
rule_engine = self.RuleEngine.create(graph.getGraph())
rule_engine.remove()
[docs]
def parsePrefixes(self, query: str)-> dict:
"""
Parse a query string to extract a dictionary of (prefix, namespace) pairs
Parameters
----------
query : str
Query string that may contain PREFIX declarations
Returns
-------
dict
Dictionary of (prefix, namespace) pairs or an empty dictionary if no prefixes are found.
"""
pattern = re.compile(r'PREFIX\s+(\w+):\s+<\s*(.*?)\s*>')
matches = pattern.findall(query)
prefixes = {prefix + ':' : url for prefix, url in matches}
return prefixes
def _applyPrefixes(self, query_result: object|pd.DataFrame, ns: dict|None)-> object|pd.DataFrame:
"""
Substitute long namespace names in the URIs with prefixes. This method can be applied either
to a Corese query result map or a DataFrame.
applying prefixes to a DataFrame is faster since it is done in python. Applying prefixes to a map
is slower since it is done in the Java process for each value.
Parameters
----------
query_result : object or pd.DataFrame
Query result in Corese format or a DataFrame.
ns : dict, optional
Dictionary of (prefix, namespace) pairs. Default is None.
Returns
-------
object or pd.DataFrame
Query result with prefixes applied.
"""
if not isinstance(ns, dict):
return query_result
if isinstance(query_result, pd.DataFrame):
# prefix in the DataFrame -fast
return query_result.fillna('')\
.replace(ns.values(), ns.keys(), regex=True)\
.replace('',pd.NA,regex = True)
else:
# assume it's a map output of the query -slow
for i, row in enumerate(query_result.getMappingList()):
for j, var in enumerate(row.getList()):
if not var.isEmpty() and var.size() > 1 and var[1].isURI():
for prefix, namespace in ns.items():
if var[1].contains(namespace):
new_uri = var[1].stringValue().replace(namespace, prefix)
var[1].setValue(new_uri)
return query_result
[docs]
def sparqlSelect(self, graph: object,
query: str ='SELECT * WHERE {?s ?p ?o} LIMIT 5',
return_dataframe: bool =True,
post_apply_prefixes: bool = True)-> object|pd.DataFrame:
"""
Execute SPARQL SELECT or ASK query on Corese graph. Optionally return the result as DataFrame.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object
query : str, optional
SPARQL query. By default five first triples of the graph are returned.
return_dataframe : bool, optional
Return the result as a DataFrame. Default is True.
post_apply_prefixes : bool, optional
Substitute long namespaces with prefixes defined in the query . Default is True.
Returns
-------
object or pd.DataFrame
Result of the SPARQL query in CSV-formatted ``fr.inria.core.print.ResultFormat``
object or a ``pandas.DataFrame``.
"""
assert self.QueryProcess, 'Corese classes are not loaded properly.'
assert self.ResultFormat, 'Corese classes are not loaded properly.'
if not graph:
raise ValueError('Graph object is required.')
# create a dictionary of (prefix, namespace) pairs
ns = self.parsePrefixes(query)
exec = self.QueryProcess.create(graph)
map = exec.query(query)
# to keep it simple for now return the result in CSV format
result = self.ResultFormat.create(map, self.ResultFormat.SPARQL_RESULTS_CSV)
# or return a DataFrame
if return_dataframe:
if post_apply_prefixes:
return self._applyPrefixes(self.toDataFrame(result), ns)
else:
return self.toDataFrame(result)
return result
[docs]
def toDataFrame(self, queryResult: object,
dtypes: list|dict|None = None)-> pd.DataFrame:
"""
Convert Corese ResultFormat object to ``pandas.DataFrame``.
Parameters
----------
queryResult : object
CSV-formatted ``fr.inria.core.print.ResultFormat`` object.
dtypes : list or dict, optional
Optional column data types for the columns in the format as in ``panads.read_csv`` method.
https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
Returns
-------
pd.DataFrame
Corese object converted to a DataFrame.
"""
assert self.ResultFormat, 'Corese classes are not loaded properly.'
df = pd.read_csv(StringIO(str(queryResult)),
skipinitialspace=True,
dtype=dtypes)
# Assign n/a to empty strings
string_dtypes = df.convert_dtypes().select_dtypes("string")
df[string_dtypes.columns] = string_dtypes.replace(r'^\s*$', None, regex=True)
return df
#TODO: add timeout parameter
[docs]
def sparqlConstruct(self, graph: Optional[object] = None,
query: str ='',
merge: bool=False)-> object:
"""
Execute SPARQL CONSTRUCT query on Corese graph.
Optionally the new triples can be merged with the existing graph.
Parameters
----------
graph : object, optional
Corese ``fr.inria.corese.core.Graph`` object. If not provided (default), a new graph is created.
query : str, optional
SPARQL query. Defaults to an empty string, resulting in an empty graph.
merge : bool, optional
Option to merge the result with the existing graph passed in the parameters. Default is False.
Returns
-------
object
Result of the SPARQL CONSTRUCT query in RDF/XML format.
"""
assert self.QueryProcess, 'Corese classes are not loaded properly.'
assert self.ResultFormat, 'Corese classes are not loaded properly.'
if not graph:
graph = self.Graph()
exec = self.QueryProcess.create(graph)
map = exec.query(query)
if merge:
graph.getGraph().merge(map.getGraph())
result = self.ResultFormat.create(map, self.ResultFormat.DEFAULT_CONSTRUCT_FORMAT)
return result
[docs]
def toTurtle(self, rdf:object)-> str:
"""
Convert RDF/XML to Turtle format.
Parameters
----------
rdf : object
Corese RDF object
Returns
-------
str
RDF in Turtle format.
"""
assert self.Transformer, 'Corese classes are not loaded properly.'
# TODO: ASk Remi about getGraph, the Graph and the right way to do the transformation
ttl = self.Transformer.create(rdf.getMappings().getGraph(), self.Transformer.TURTLE)
return ttl.toString()
#TODO: ASk Remi what are the acceptable shacl formats
[docs]
def shaclValidate(self, graph: object,
shacl_shape_ttl: str ='',
return_dataframe = False)-> str:
"""
Validate RDF graph against SHACL shape.
This version supports only Turtle format to define a SHACL shape.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object
shacl_shape_ttl : str, optional
SHACL shape in Turtle format. If not provided, the validation will be skipped.
return_dataframe : bool, optional
Return the validation report as a DataFrame. Default is False.
Returns
-------
str
SHACL validation report in Turtle format.
"""
assert self.Shacl, 'Corese classes are not loaded properly.'
shapeGraph = self.Graph()
ld = self.Load.create(shapeGraph)
if _isFile(shacl_shape_ttl):
# Load shape graph from file
ld.parse(shacl_shape_ttl)
else:
# Load shape graph from string
ld.loadString(shacl_shape_ttl,
self.Loader.format.TURTLE_FORMAT)
# Evaluation
shacl = self.Shacl(graph.getGraph(), shapeGraph)
result = shacl.eval()
trans = self.Transformer.create(result, self.Transformer.TURTLE)
if return_dataframe:
return self.shaclReportToDataFrame(str(trans.toString()))
return str(trans.toString())
# Parse validation report
[docs]
def shaclReportToDataFrame(self, validation_report: str)-> pd.DataFrame:
"""
Convert SHACL validation report to ``pandas.DataFrame``.
Parameters
----------
validation_report : str
SHACL validation report in Turtle format.
Returns
-------
pd.DataFrame
Validation report as a DataFrame.
"""
validation_report_graph = self.loadRDF(validation_report)
report = self.sparqlSelect(validation_report_graph, self.SHACL_REPORT_QUERY)
report = report.pivot(index='o', columns='p', values='s')
report.columns = [uri.split('#')[-1] for uri in report.columns]
report.reset_index(drop=True, inplace=True)
return report
#TODO: add a named graph parameter
[docs]
def addTriple(self, graph: object, subject: str, predicate: str, obj: str)-> object:
"""
Add a triple to the default Corese graph.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object
subject : str
Subject of the triple. Must be a URI.
predicate : str
Predicate of the triple. Must be a URI.
obj : str
Object of the triple. Must be a URI or a literal.
Returns
-------
object
Corese ``fr.inria.corese.core.Graph`` object with the new triple.
"""
subject = graph.addResource(subject)
predicate = graph.addProperty(predicate)
if obj.startswith('http'):
obj = graph.addResource(obj)
else:
obj = graph.addLiteral(obj)
graph.addEdge(subject, predicate, obj)
return graph
[docs]
def removeTriple(self, graph: object, subject: str, predicate: str, obj: str)-> object:
"""
Remove a triple from the default Corese graph.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object
subject : str
Subject of the triple. Must be a URI.
predicate : str
Predicate of the triple. Must be a URI.
obj : str
Object of the triple. Must be a URI or a literal.
Returns
-------
object
Corese ``fr.inria.corese.core.Graph`` object without the triple.
"""
subject = graph.getResource(subject)
predicate = graph.getResource(predicate)
if obj.startswith('http'):
obj = graph.getResource(obj)
else:
obj = graph.getLiteral(obj)
graph.delete(subject, predicate, obj)
return graph
[docs]
def getTripleObject(self, graph, subject:str, predicate:str)->Optional[str]:
"""
Get the object of a triple. It can be a URI or a literal.
Parameters
----------
Return
------
str
String representation of the object of the triple or None if the triple does not exist.
"""
subject = graph.getResource(subject)
predicate = graph.getResource(predicate)
obj = graph.getEdgesRDF4J(subject, predicate, None, None)
if obj.iterator().hasNext():
return obj.iterator().next().getObjectNode().getLabel()
return None
[docs]
def exportRDF(self, graph:object, path:str, format:str ='turtle', overwrite:bool=False)-> None:
"""
Export Corese graph to an RDF file. Only RDF/XML and Turtle are supported by this version.
Parameters
----------
graph : object
Corese ``fr.inria.corese.core.Graph`` object.
path : str
Path to the output RDF file.
format : str, optional
RDF format. Default is Turtle. Accepted values are
*turtle*, *ttl*, *xml*, and *rdfxml*.
overwrite : bool, optional
Overwrite the file if it exists. Default is False.
"""
format = format.lower()
if 'xml' in format or 'rdf' in format:
format = self.Transformer.RDFXML
elif 'ttl' in format or 'turtle' in format:
format = self.Transformer.TURTLE
else:
raise ValueError('Unsupported RDF format. Only RDF/XML and Turtle are supported by this version')
if os.path.exists(path) and not overwrite:
raise FileExistsError(f'{path} already exists. Set overwrite=True to overwrite the file.')
transformer = self.Transformer.create(graph, format)
transformer.write(path)
#TODO: apply prefixes to the output file
if __name__ == "__main__":
    # Demo of the API. It expects to be run from a directory that contains
    # ``examples/data/beatles.rdf`` and ``examples/data/beatles-validator.ttl``.
    data_dir = os.path.join('.', 'examples', 'data')

    # Initialize the CoreseAPI
    cr = CoreseAPI(java_bridge='py4j')
    cr.loadCorese()

    # Load RDF file
    gr = cr.loadRDF(os.path.abspath(os.path.join(data_dir, 'beatles.rdf')))
    print("Graph size: ", gr.graphSize())

    # Load Rule Engine OwlRL (profiles are selected by name, not by Java constant)
    ren = cr.loadRuleEngine(gr, profile='owlrl')
    print("Graph size: ", gr.graphSize())

    # Load another Rule Engine e.g. RDFS to replace the existing one
    ren = cr.loadRuleEngine(gr, profile='rdfs', replace=True)
    print("Graph size: ", gr.graphSize())

    # Reset Rule Engine
    cr.resetRuleEngine(gr)
    print("Graph size: ", gr.graphSize())

    # Execute SPARQL SELECT query; the result is already a DataFrame
    # because ``return_dataframe`` defaults to True
    res = cr.sparqlSelect(gr, query='select * where {?s ?p ?o} limit 5')
    print(res)

    # Execute SPARQL CONSTRUCT query; the prefix is declared in the query
    # itself since sparqlConstruct() takes no separate prefixes argument
    construct = '''PREFIX ex: <http://example.com/>
CONSTRUCT {?Beatle a ex:BandMember }
WHERE { ex:The_Beatles ex:member ?Beatle}'''
    results = cr.sparqlConstruct(gr, query=construct)
    print(results)

    # Convert the result to Turtle
    print(cr.toTurtle(results))

    # Execute SHACL validation (path built portably instead of hard-coding
    # Windows separators)
    shacl_shape_file = os.path.join(data_dir, 'beatles-validator.ttl')
    report = cr.shaclValidate(gr, shacl_shape_ttl=shacl_shape_file)
    print(report)

    # Convert SHACL validation report to DataFrame
    shr = cr.shaclReportToDataFrame(report)
    print(shr)

    # Shutdown the JVM
    cr.unloadCorese()

    print("Done!")