FalkorDB
FalkorDB
Start FalkorDB locally
- Start FalkorDB locally in a Docker container (with autoremove: `--rm` deletes the container when it stops)
docker run -p 6379:6379 -p 3000:3000 -it --rm falkordb/falkordb:latest
Persistent storage (in the container)
- Start FalkorDB locally and name the container `falkor` to make it easy to restart.
docker run -p 6379:6379 -p 3000:3000 -it --name falkor falkordb/falkordb:latest
Ctrl-C will stop it
Restart it as a daemon:
docker restart falkor
To stop:
docker stop falkor
Persistent storage (outside the container)
TBD
docker run -p 6379:6379 -p 3000:3000 --rm --name falkor -v falkordb_data:/var/lib/falkordb/data falkordb/falkordb:latest
OpenCypher
- OpenCypher query language
GraphDB systems using OpenCypher
Competing languages
- GQL (Graph Query Language), an ISO standard (ISO/IEC 39075, published in 2024).
- Gremlin (Apache TinkerPop)
- SPARQL (RDF graphs)
OpenCypher Basics
Node
(p:Person {name: "Alice"})
Where
`p` = variable, `Person` = label, `{}` = properties
Relationship
(direction matters)
(p)-[:KNOWS]->(q)
- Both `p` and `q` represent nodes.
READ: Match nodes
MATCH (p:Person)
RETURN p
READ: Filter nodes
MATCH (p:Person)
WHERE p.name = "Alice"
RETURN p
READ: Match relationship
MATCH (p:Person)-[:KNOWS]->(friend)
RETURN p, friend
READ: Pattern matching
MATCH (p:Person)-[:KNOWS]->(f:Person)-[:KNOWS]->(fof)
RETURN fof
WRITE: Create nodes
CREATE (p:Person {name: "Alice", age: 30})
WRITE: Create relationships
MATCH (a:Person {name: "Alice"}), (b:Person {name: "Bob"})
CREATE (a)-[:KNOWS]->(b)
MERGE
MERGE (p:Person {name: "Alice"})
- Creates if not exists
- Matches if exists
(a bit like UPSERT)
Updating data:
MATCH (p:Person {name: "Alice"})
SET p.age = 31
Remove property.
REMOVE p.age
DELETE node
MATCH (p:Person {name: "Alice"})
DELETE p
DETACH DELETE: delete a node together with all of its relationships
MATCH (p:Person {name: "Alice"})
DETACH DELETE p
Aggregation
MATCH (p:Person)
RETURN count(p)
MATCH (p:Person)
RETURN p.country, count(*)
Variable-length paths
MATCH (p:Person)-[:KNOWS*1..3]->(other)
RETURN other
SQL to Cypher mental switch
- JOIN -> Pattern matching
- Tables -> Graph
- Rows -> Paths
- Foreign Key -> Relationships
Python
$ mkdir demo
$ cd demo
$ uv init
$ uv add falkordb
Remove the main.py file.
MotoGP in Python
This example is based on the one on the front page of the FalkorDB documentation
Run the following to show the “usage”:
$ cd examples/python
$ uv run motogp.py
The main function sets up the connection to the database and selects the graph called MotoGP.
$ uv run motogp.py delete
- Delete the existing graph (in case we already had it in the database).
$ uv run motogp.py load
- CREATE a few nodes and relationships.
$ uv run motogp.py all_riders
- List the names of the riders.
$ uv run motogp.py all_pairs
- Get back all the Rider-rides-Team relations and print out the names.
$ uv run motogp.py all_names
- The same, but now we only return the names.
$ uv run motogp.py yamaha
- Which riders represent Yamaha?
$ uv run motogp.py ducati
- How many riders represent team Ducati? (An aggregation.)
import sys
from falkordb import FalkorDB
def load(graph):
    """Create three Rider -[:rides]-> Team pairs in the given graph."""
    create_query = """CREATE
(:Rider {name:'Valentino Rossi'})-[:rides]->(:Team {name:'Yamaha'}),
(:Rider {name:'Dani Pedrosa'})-[:rides]->(:Team {name:'Honda'}),
(:Rider {name:'Andrea Dovizioso'})-[:rides]->(:Team {name:'Ducati'})"""
    graph.query(create_query)
def all_riders(graph):
    """Print the ``name`` property of every Rider node."""
    result = graph.query("""MATCH (r:Rider)
RETURN r""")
    # `result_set` is a list of rows; each row here holds exactly one Node,
    # so unpack it with a one-element list pattern.
    for [rider_node] in result.result_set:
        print(rider_node.properties["name"])
def all_pairs(graph):
    """Print every rider next to the team they ride for, using whole Node objects."""
    result = graph.query("""MATCH (r:Rider)-[:rides]->(t:Team)
RETURN r, t""")
    for rider_node, team_node in result.result_set:
        rider_name = rider_node.properties['name']
        team_name = team_node.properties['name']
        print(f"{rider_name:18} - {team_name}")
    # Node attributes: alias, id, labels, properties, to_string
def all_names(graph):
    """Print rider and team names, projected to plain strings by the query itself."""
    names_query = """MATCH (r:Rider)-[:rides]->(t:Team)
RETURN r.name, t.name"""
    for rider_name, team_name in graph.query(names_query).result_set:
        print(f"{rider_name:18} - {team_name}")
def which_rider_represents_yamaha(graph):
    """Print the names of the riders who ride for Yamaha (parameterized query)."""
    params = {"value": "Yamaha"}  # try "Honda" to ask about another team
    result = graph.query(
        """MATCH (r:Rider)-[:rides]->(t:Team)
WHERE t.name = $value
RETURN r.name""",
        params,
    )
    for rider_row in result.result_set:
        print(rider_row[0])  # e.g. "Valentino Rossi"
def how_many_riders_represent_ducati(graph):
    """Print how many riders ride for Ducati (a ``count`` aggregation)."""
    # Equivalent forms with the team name inlined as a literal:
    #   MATCH (r:Rider)-[:rides]->(t:Team {name:'Ducati'}) RETURN count(r)
    #   MATCH (r:Rider)-[:rides]->(t:Team) WHERE t.name = 'Ducati' RETURN count(r)
    result = graph.query(
        """MATCH (r:Rider)-[:rides]->(t:Team) WHERE t.name = $name RETURN count(r)""",
        {"name": "Ducati"},
    )
    rider_count = result.result_set[0][0]
    print(rider_count)  # 1 with the data created by `load`
def delete(graph):
    """Drop the graph; any error (e.g. the graph does not exist yet) is ignored."""
    try:
        graph.delete()
    except Exception:
        # Best effort: a missing graph is fine for this example.
        return
def main() -> None:
    """Run the sub-command named on the command line against the MotoGP graph."""
    args = sys.argv
    # `usage()` exits, so reaching the next line means the command is valid.
    if len(args) != 2 or args[1] not in DISPATCH:
        usage()
    handler = DISPATCH[args[1]]
    graph = FalkorDB(host="localhost", port=6379).select_graph("MotoGP")
    handler(graph)
def usage():
    """Print the available sub-commands to stderr and exit with status 1."""
    cmds = ", ".join(sorted(DISPATCH))
    print(f"Usage: {sys.argv[0]} {cmds}", file=sys.stderr)
    # The bare `exit` builtin is injected by the `site` module for interactive
    # use and is missing under `python -S` or in frozen apps; `sys.exit` is not.
    sys.exit(1)
# Sub-command name -> function receiving the selected graph.
DISPATCH = dict(
    load=load,
    delete=delete,
    yamaha=which_rider_represents_yamaha,
    ducati=how_many_riders_represent_ducati,
    all_names=all_names,
    all_riders=all_riders,
    all_pairs=all_pairs,
)

if __name__ == "__main__":
    main()
Demo
import sys
from falkordb import FalkorDB
def run(graph):
    """Step through a scripted tour of Cypher commands against *graph*.

    Each command is shown via ``input()`` (press Enter to execute it). After it
    runs, the raw ``result_set`` and a per-item breakdown are printed. The demo
    then waits for Enter again before moving on.

    Fixes compared to the earlier version:
    - Two missing commas had silently glued adjacent queries into one malformed
      string (Python concatenates adjacent string literals).
    - ``MATCH (p:Person, {...})`` was invalid syntax.
    - The Article is matched by title instead of a guessed internal ID.
    - Scalar results no longer crash the printer.
    """
    commands = [
        # Create a `Node` with the `Person` label (type) and the `name` attribute (property)
        """CREATE (:Person {name: "Alice"})""",
        # Create another `Node` of type `Person`, set `p` as an alias to the created node and return it.
        # Later we'll be able to reuse this alias either inside the command or in the client language.
        """CREATE (p:Person {name: "Bob", email: "bob@example.org"}) RETURN p""",
        # Create a `Node` of type `Article`.
        """CREATE (a:Article {title: "Introduction for FalkorDB"}) RETURN a""",
        # If we (mistakenly) leave out the colon `:` we create a `Node` without a label.
        # In this case `Person` is just an alias (variable) for the new Node, not its label.
        """CREATE (Person {name: "Clark"})""",
        # Create a link (edge, relationship) from Alice to Bob called `KNOWS`. (Alice knows Bob)
        """MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) MERGE (a)-[:KNOWS]->(b)""",
        # Create another type of link between `Alice` and that `Article`.
        # Match the article by its title: internal IDs are assigned by the
        # database, so `WHERE ID(a) = 0` is not a reliable way to find it.
        """MATCH (p:Person {name: 'Alice'}), (a:Article {title: "Introduction for FalkorDB"}) MERGE (p)-[:published]->(a)""",
        # Return all the Nodes.
        """MATCH (p) RETURN p""",
        # Return all the Nodes with `Person` label
        """MATCH (p:Person) RETURN p""",
        # Instead of changing the existing Node,
        # this will create another `Node` of type `Person` with name `Alice`.
        # It is probably not a good idea.
        """CREATE (p:Person {name: "Alice", email: "alice@example.org"}) RETURN p""",
        # List all the Alice-es
        """MATCH (p:Person {name: "Alice"}) RETURN p""",
        # Update all the Person Nodes where the name is Alice, adding a new attribute.
        # As we have 2 nodes matching, this will update both.
        """MATCH (p:Person {name: 'Alice'}) SET p.age = 42 RETURN p""",
        # Delete all the nodes (and thus all the relationships)
        # """MATCH (n) DETACH DELETE n""",
        # TODO
        """CREATE (:Person {name: 'Jane'})-[:knows]->(:Person {name: 'Joe'})""",
        """CREATE (:Person {name: 'Alice'})-[:knows]->(:Person {name: 'Cecile'})""",
        """MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) MERGE (a)-[:KNOWS]->(b)""",
        # """MATCH (p:Person)-[:knows]->(q:Person) RETURN p.name, q.name""",
    ]
    # Nodes together with their edges (up to 100 rows):
    # MATCH (n) OPTIONAL MATCH (n)-[e]-(m) RETURN * LIMIT 100
    for cmd in commands:
        input(cmd)
        res = graph.query(cmd)
        print(res.result_set)
        for row in res.result_set:
            for item in row:
                print(type(item).__name__)
                if not hasattr(item, "properties"):
                    # Plain values (numbers, strings, ...) have no graph attributes.
                    print(f"value: {item!r}")
                    continue
                # Nodes expose id/alias/labels; edges expose id/relation instead.
                for attr in ("id", "alias", "labels", "relation", "properties"):
                    if hasattr(item, attr):
                        print(f"{attr}: {getattr(item, attr)}")
        input()
def delete(graph):
    """Remove the Demo graph, silently ignoring any error (e.g. it does not exist)."""
    try:
        graph.delete()
    except Exception:
        # Nothing to delete yet - that's fine.
        return None
def main() -> None:
    """Run the sub-command named on the command line against the Demo graph."""
    if len(sys.argv) != 2 or sys.argv[1] not in DISPATCH:
        usage()  # exits the process
    action = DISPATCH[sys.argv[1]]
    db = FalkorDB(host="localhost", port=6379)
    action(db.select_graph("Demo"))
def usage():
    """Print the available sub-commands to stderr and exit with status 1."""
    cmds = ", ".join(sorted(DISPATCH))
    print(f"Usage: {sys.argv[0]} {cmds}", file=sys.stderr)
    # `sys.exit` rather than the site-injected `exit`, which is not always defined.
    sys.exit(1)
# Sub-command name -> function receiving the selected graph.
DISPATCH = dict(
    run=run,
    delete=delete,
)

if __name__ == "__main__":
    main()
Interactive shell
import argparse
import atexit
from pathlib import Path
import readline
from falkordb import FalkorDB
# Readline history is kept across sessions in the user's home directory.
HISTORY_FILE = Path.home() / ".falkordb_shell_history"
PROMPT = "falkordb> "
# Dot-commands handled by the shell itself instead of being sent to FalkorDB.
EXIT_COMMANDS = {".exit", ".quit"}
HELP_COMMANDS = {".help"}
HELP = """
.help        - Show this help page.
.exit, .quit - Quit the REPL.
.intro       - Introduction to the OpenCypher commands.
"""
INTRO = """
# Create a `Node` with the `Person` label (type) and the `name` attribute (property)
CREATE (:Person {name: "Alice"})
"""
def setup_history() -> None:
    """Enable tab completion, load saved history and save it again at exit."""
    readline.parse_and_bind("tab: complete")
    readline.set_history_length(1000)
    history_exists = HISTORY_FILE.exists()
    if history_exists:
        readline.read_history_file(HISTORY_FILE)
    # Written back on normal interpreter exit, however the REPL loop ended.
    atexit.register(readline.write_history_file, HISTORY_FILE)
def print_result(result) -> None:
    """Print the rows returned by a query, or "OK" when there are none.

    Graph entities (anything with a ``properties`` attribute, i.e. nodes and
    edges) are printed attribute by attribute, showing whichever of
    id/alias/labels/relation/properties they have. Plain values, such as the
    result of ``RETURN count(n)`` or ``RETURN n.name``, are printed with
    ``repr``. Previously these raised AttributeError on ``item.id``, and since
    the REPL only guards ``graph.query``, that crashed the whole shell.
    """
    if not result.result_set:
        print("OK")
        return
    for row in result.result_set:
        for item in row:
            print(type(item).__name__)
            if not hasattr(item, "properties"):
                print(f"value: {item!r}")
                continue
            for attr in ("id", "alias", "labels", "relation", "properties"):
                if hasattr(item, attr):
                    print(f"{attr}: {getattr(item, attr)}")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="localhost")
parser.add_argument("--port", type=int, default=6379)
parser.add_argument("--graph", default="Shell")
return parser.parse_args()
def run_shell(graph) -> None:
    """Read commands until EOF (Ctrl-D) or an exit command.

    Dot-commands (.help, .intro, .exit, .quit) are handled locally. Anything
    else is sent to FalkorDB as a query, and query errors are printed instead
    of ending the loop. Ctrl-C at the prompt discards the current line.
    """
    while True:
        try:
            line = input(PROMPT)
        except EOFError:
            print()
            break
        except KeyboardInterrupt:
            print()
            continue
        command = line.strip()
        if not command:
            continue
        if command in EXIT_COMMANDS:
            break
        if command in HELP_COMMANDS:
            print(HELP)
        elif command == ".intro":
            print(INTRO)
        else:
            try:
                result = graph.query(command)
            except Exception as error:
                print(f"ERROR: {error}")
            else:
                print_result(result)
def main() -> None:
    """Connect using the command-line options, then start the REPL."""
    options = parse_args()
    setup_history()
    graph = FalkorDB(host=options.host, port=options.port).select_graph(options.graph)
    for banner_line in (
        "Welcome to the interactive FalkorDB shell.",
        "Type .help to see the help.",
    ):
        print(banner_line)
    run_shell(graph)
# Start the shell only when this file is executed as a script.
if __name__ == "__main__":
    main()
People and cities
import sys
from falkordb import FalkorDB
def main():
    """Dispatch the sub-command given on the command line to the People graph."""
    argv = sys.argv
    if len(argv) != 2:
        usage()
    command = argv[1]
    if command not in DISPATCH:
        usage()
    people = FalkorDB(host="localhost", port=6379).select_graph("People")
    DISPATCH[command](people)
def delete(graph):
    """Drop the People graph; errors (e.g. it does not exist yet) are ignored."""
    try:
        graph.delete()
    except Exception:
        return  # nothing to delete is not a problem here
def load(graph):
    """Recreate the People graph: five people, two cities, KNOWS and LIVES_IN edges.

    Prints how many nodes and relationships the CREATE statement produced.
    """
    delete(graph)
    # An extra edge to experiment with cycles: (bob)-[:KNOWS]->(alice),
    create_query = """CREATE
(alice:Person {name: "Alice", age: 30}),
(bob:Person {name: "Bob", age: 25}),
(carol:Person {name: "Carol", age: 27}),
(dave:Person {name: "Dave", age: 35}),
(eve:Person {name: "Eve", age: 29}),
(london:City {name: "London"}),
(paris:City {name: "Paris"}),
(alice)-[:KNOWS]->(bob),
(bob)-[:KNOWS]->(carol),
(carol)-[:KNOWS]->(dave),
(alice)-[:KNOWS]->(eve),
(alice)-[:KNOWS]->(carol),
(alice)-[:LIVES_IN]->(london),
(bob)-[:LIVES_IN]->(london),
(carol)-[:LIVES_IN]->(paris),
(dave)-[:LIVES_IN]->(paris),
(eve)-[:LIVES_IN]->(london);
"""
    stats = graph.query(create_query)
    print(f"Nodes created: {stats.nodes_created}")
    print(f"Relationships created: {stats.relationships_created}")
    print()
print()
def list_all_the_people(graph):
    """Print every Person node, followed by its ``name`` property."""
    result = graph.query("""MATCH (p:Person)
RETURN p""")
    for row in result.result_set:
        person = row[0]
        print(person)
        # A Node exposes `labels` and `properties`.
        print(person.properties["name"])
def get_the_names_of_the_people(graph):
    """Print each person's name; the query returns the string, not the Node."""
    names_query = """MATCH (p:Person)
RETURN p.name"""
    for name_row in graph.query(names_query).result_set:
        # The cell is already a plain value, so there is no `.properties` to read.
        print(name_row[0])
def find_younger_people(graph):
    """Print every Person node whose ``age`` is below 30."""
    query = """MATCH (p:Person)
WHERE p.age < 30
RETURN p"""
    for found in graph.query(query).result_set:
        print(found[0])
def find_who_knows_whom(graph):
    """Print one line per KNOWS relationship: '<who> knows <whom>'."""
    result = graph.query("""MATCH (who:Person)-[:KNOWS]->(whom)
RETURN who, whom""")
    for source, target in result.result_set:
        source_name = source.properties['name']
        target_name = target.properties['name']
        print(f"{source_name:5} knows {target_name}")
def find_who_alice_knows(graph):
    """Print every node Alice has an outgoing KNOWS relationship to."""
    query = """MATCH (p:Person)-[:KNOWS]->(other)
WHERE p.name = 'Alice'
RETURN other"""
    for acquaintance_row in graph.query(query).result_set:
        print(acquaintance_row[0])
def find_person_by_name(graph):
    """Look up the Person named 'Alice'; print each match and its name."""
    result = graph.query("""MATCH (p:Person)
WHERE p.name = 'Alice'
RETURN p""")
    for row in result.result_set:
        match = row[0]
        print(match)
        print(match.properties["name"])
def who_lives_where(graph):
    """Print '<person> -> <city>' for every LIVES_IN relationship."""
    query = """
MATCH (p:Person)-[:LIVES_IN]->(city:City)
RETURN p, city
"""
    for resident, home in graph.query(query).result_set:
        print(f"{resident.properties['name']:5} -> {home.properties['name']}")
def find_people_who_live_in_london(graph):
    """Print each Person living in London, followed by their name."""
    query = """MATCH (p:Person)-[:LIVES_IN]->(city:City)
WHERE city.name = 'London'
RETURN p"""
    for row in graph.query(query).result_set:
        londoner = row[0]
        print(londoner)
        print(londoner.properties["name"])
def friends_of_friends_of_alice(graph):
    """Print the distinct friends-of-friends of Alice.

    Questions that came up, and how the query answers them:
    - A -> B -> A cycles: Alice herself is excluded by name.
    - A -> B -> C together with A -> C: people Alice already knows directly
      are filtered out with a negated pattern predicate.
    - Someone reachable through several friends would otherwise appear once per
      path; ``RETURN DISTINCT`` lists them once.
    """
    res = graph.query("""MATCH (p:Person)-[:KNOWS]->(f:Person)-[:KNOWS]->(ff:Person)
WHERE p.name = 'Alice'
AND ff.name <> 'Alice'
AND NOT (p)-[:KNOWS]->(ff)
RETURN DISTINCT ff""")
    for row in res.result_set:
        print(row[0].properties["name"])
def how_many_people_live_in_each_city(graph):
    """Print '<city> - <population>' for every city that has residents.

    Counts the people (``count(p)``) rather than the repeated city node. The
    rows are ordered by city name so the output is deterministic.
    """
    res = graph.query("""MATCH (p:Person)-[:LIVES_IN]->(c:City)
RETURN c.name AS city, count(p) AS population
ORDER BY city""")
    for name, count in res.result_set:
        print(f"{name} - {count}")
def usage():
    """Print the available sub-commands to stderr and exit with status 1."""
    cmds = ", ".join(sorted(DISPATCH))
    print(f"Usage: {sys.argv[0]} {cmds}", file=sys.stderr)
    # `sys.exit` rather than the site-injected `exit`, which is not always defined.
    sys.exit(1)
# Sub-command name -> function receiving the People graph.
DISPATCH = dict(
    load=load,
    people=list_all_the_people,
    names=get_the_names_of_the_people,
    younger=find_younger_people,
    who=find_who_knows_whom,
    alice=find_who_alice_knows,
    by_name=find_person_by_name,
    london=find_people_who_live_in_london,
    where=who_lives_where,
    ff=friends_of_friends_of_alice,
    population=how_many_people_live_in_each_city,
    delete=delete,
)

if __name__ == "__main__":
    main()
$ uv run people.py
Family tree in Python
- Take the family CSV file (its URL is stored in `CSV_URL` in the code below).
- Write a python program to load it into the database.
import sys
import csv
from io import StringIO
from urllib.request import urlopen
from falkordb import FalkorDB
CSV_URL = "https://raw.githubusercontent.com/szabgab/exercises.code-maven.com/refs/heads/main/examples/data/family.csv"
def download_csv_to_memory(url: str) -> list[dict[str, str]]:
    """Download a CSV file and return its data rows as dicts keyed by header.

    Header names are stripped of surrounding whitespace (the file uses
    "Name, Father"-style headers); values are returned as-is. The text is parsed
    once: reading ``fieldnames`` consumes the header row, and assigning the
    stripped names back means iteration yields only data rows. An empty
    download yields ``[]`` instead of raising. A UTF-8 BOM, if present, is
    dropped so it does not stick to the first header name.
    """
    with urlopen(url, timeout=30) as response:
        csv_text = response.read().decode("utf-8-sig")
    reader = csv.DictReader(StringIO(csv_text))
    reader.fieldnames = [name.strip() for name in reader.fieldnames or []]
    return list(reader)
def delete(graph):
    """Drop the Family graph, ignoring any error such as a missing graph."""
    try:
        graph.delete()
    except Exception:
        # The graph may not exist yet; that's fine.
        return
def load(graph) -> None:
    """Rebuild the Family graph from the CSV at ``CSV_URL``.

    Every row becomes a Person node, and a non-empty ``Father`` value becomes a
    ``(child)-[:FATHER]->(father)`` relationship. Nodes are created in a first
    pass and relationships in a second one. Creating them in a single pass
    silently skipped any child listed before their father, because the
    father's node did not exist yet when the MATCH ran.
    """
    rows = [
        # DictReader uses None for missing trailing values and as the key of
        # surplus values; normalize both instead of crashing on `.strip()`.
        {key: (value or "").strip() for key, value in raw.items() if key is not None}
        for raw in download_csv_to_memory(CSV_URL)
    ]
    delete(graph)
    for row in rows:
        print(row)
        graph.query("CREATE (person:Person {name: $name})", {"name": row["Name"]})
    for row in rows:
        if row.get("Father"):
            graph.query(
                """
                MATCH (c:Person {name: $child}), (f:Person {name: $father})
                MERGE (c)-[:FATHER]->(f)
                """,
                {"child": row["Name"], "father": row["Father"]},
            )
def main() -> None:
    """Run the sub-command named on the command line against the Family graph."""
    if len(sys.argv) != 2 or sys.argv[1] not in DISPATCH:
        usage()  # never returns
    chosen = sys.argv[1]
    family = FalkorDB(host="localhost", port=6379).select_graph("Family")
    DISPATCH[chosen](family)
def usage():
    """Print the available sub-commands to stderr and exit with status 1."""
    cmds = ", ".join(sorted(DISPATCH))
    print(f"Usage: {sys.argv[0]} {cmds}", file=sys.stderr)
    # `sys.exit` rather than the site-injected `exit`, which is not always defined.
    sys.exit(1)
# Sub-command name -> function receiving the Family graph.
DISPATCH = dict(
    load=load,
    delete=delete,
)

if __name__ == "__main__":
    main()
$ uv run load_family_csv.py
Examples
-
What if we add the same node twice? (there will be two nodes with the same attributes)
-
Set up multiple relationships between people: Joe knows Jane, Joe likes Jane, Joe knows Mary.
import sys
from falkordb import Edge, FalkorDB, Node, Operation, Path
def add_node(graph):
    """Create a 'Foo Bar' Person; running it twice creates two identical nodes."""
    statement = """CREATE (:Person {name:'Foo Bar'})"""
    graph.query(statement)
def list_nodes(graph):
    """Print how many Person nodes exist, then every row returning one of them."""
    total = graph.query("""MATCH (p:Person) RETURN count(p)""").result_set[0][0]
    print(f"count: {total}")
    people = graph.query("""MATCH (p:Person) RETURN p""")
    for entry in people.result_set:
        print(f"row: {entry} - {entry[0]}")
def add_people(graph):
    """Create one Person node per name.

    The name is passed as a query parameter, so a quote such as in "q'ote"
    does not break the statement.
    """
    for person_name in ("Joe", "Jane", "Mary", "q'ote"):
        graph.query("""CREATE (:Person {name: $name})""", {"name": person_name})
    # TODO: add the relationships from the exercise:
    #   Joe knows Jane, Joe likes Jane, Joe knows Mary
    # e.g. CREATE (Joe)-[:KNOWS]->(Jane)  /  (alice)-[:KNOWS]->(bob),
def objects(graph):
    """Build Node objects in Python and CREATE them via their Cypher text form.

    Prints the person and the country nodes that the query returns.
    """
    person_node = Node(
        alias="p",
        labels="person",
        properties={
            "name": "John Doe",
            "age": 33,
            "gender": "male",
            "status": "single",
        },
    )
    country_node = Node(alias="c", labels="country", properties={"name": "Japan"})
    result = graph.query(f"CREATE {person_node}, {country_node} RETURN c, p")
    first_row = result.result_set[0]
    country, person = first_row[0], first_row[1]  # RETURN c, p -> (country, person)
    print(person)
    print(country)
def delete(graph):
    """Drop the Examples graph; a missing graph (or any other error) is ignored."""
    try:
        graph.delete()
    except Exception:
        return None  # best effort
def main() -> None:
    """Run the sub-command named on the command line against the Examples graph.

    The server address comes from the ``FALKORDB`` environment variable
    (``host`` or ``host:port``), like in the Rust examples. The default
    ``falkordb`` is the service name used in the docker compose setup, which
    was previously hard-coded here. Set ``FALKORDB=localhost`` to use a local
    server.
    """
    import os

    if len(sys.argv) != 2:
        usage()
    cmd = sys.argv[1]
    if cmd not in DISPATCH:
        usage()
    host, _, port = os.environ.get("FALKORDB", "falkordb").partition(":")
    db = FalkorDB(host=host, port=int(port) if port else 6379)
    graph = db.select_graph("Examples")
    DISPATCH[cmd](graph)
def usage():
    """Print the available sub-commands to stderr and exit with status 1."""
    cmds = ", ".join(sorted(DISPATCH))
    print(f"Usage: {sys.argv[0]} {cmds}", file=sys.stderr)
    # `sys.exit` rather than the site-injected `exit`, which is not always defined.
    sys.exit(1)
# Sub-command name -> function receiving the Examples graph.
DISPATCH = dict(
    delete=delete,
    add=add_node,
    list=list_nodes,
    people=add_people,
    objects=objects,
)

if __name__ == "__main__":
    main()
Python dependencies
import argparse
import json
import re
import sys
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from falkordb import FalkorDB
# FalkorDB graph that stores the package dependency graph.
GRAPH_NAME: str = "PyPI"
# PyPI JSON API endpoint; `{package}` is replaced with the project name.
PYPI_URL: str = "https://pypi.org/pypi/{package}/json"
# Captures the leading distribution name of a requirement string, e.g. "requests" in "requests>=2".
REQUIREMENT_NAME: re.Pattern[str] = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)")
def fetch_package_metadata(package_name: str) -> dict:
    """Download and decode the PyPI JSON metadata for *package_name*.

    urllib's HTTPError/URLError propagate to the caller unchanged.
    """
    with urlopen(PYPI_URL.format(package=package_name), timeout=30) as response:
        metadata = json.load(response)
    return metadata
def extract_dependency_names(requires_dist: list[str] | None) -> list[str]:
    """Return the unconditional dependency names from a ``requires_dist`` list.

    Requirements whose environment marker mentions ``extra`` (optional
    features) are skipped, in both the ``extra == "x"`` and the space-less
    ``extra=="x"`` spellings; the latter was previously let through.
    Entries whose name cannot be parsed are skipped too. Names are
    de-duplicated with ``normalize_package_name``, the same key the graph
    uses, and the first spelling seen is kept.
    """
    dependencies: list[str] = []
    seen: set[str] = set()
    for requirement in requires_dist or []:
        marker = requirement.partition(";")[2]
        if re.search(r"\bextra\s*==", marker):
            continue
        match = REQUIREMENT_NAME.match(requirement)
        if not match:
            continue
        name = match.group(1)
        normalized = normalize_package_name(name)
        if normalized in seen:
            continue
        seen.add(normalized)
        dependencies.append(name)
    return dependencies
def normalize_package_name(name: str) -> str:
    """Return the PEP 503 normalized form of a distribution name.

    Runs of "-", "_" and "." collapse to a single "-" and the result is
    lower-cased. "Zope.Interface", "zope_interface" and "zope-interface" all
    map to "zope-interface". The previous version only replaced "_", so dotted
    names produced duplicate Package nodes.
    """
    return re.sub(r"[-_.]+", "-", name).lower()
def save_package(graph, package_name: str, leaf: bool) -> None:
    """Upsert a Package node keyed by its normalized name.

    The display ``name`` and the ``leaf`` flag (True when the package has no
    dependencies) are overwritten on every call. The former ``ON CREATE SET
    package.name`` was dead code: the unconditional ``SET`` that follows always
    writes the same value.
    """
    graph.query(
        """
        MERGE (package:Package {normalized_name: $normalized_name})
        SET package.name = $package_name,
            package.leaf = $leaf
        """,
        {
            "normalized_name": normalize_package_name(package_name),
            "package_name": package_name,
            "leaf": leaf,
        },
    )
def clear_dependencies(graph, package_name: str) -> None:
    """Delete every outgoing DEPENDS_ON relationship of the given package."""
    params = {"normalized_name": normalize_package_name(package_name)}
    query = """
MATCH (package:Package {normalized_name: $normalized_name})
OPTIONAL MATCH (package)-[rel:DEPENDS_ON]->()
DELETE rel
"""
    graph.query(query, params)
def save_dependencies(graph, package_name: str, dependencies: list[str]) -> None:
    """Link *package_name* to each dependency with a DEPENDS_ON relationship.

    Dependency nodes are upserted by normalized name and get their display
    ``name`` refreshed. The relationship itself is MERGEd, so re-importing does
    not duplicate it. Does nothing when *dependencies* is empty. The redundant
    ``ON CREATE SET dep.name`` (always overwritten by the following ``SET``)
    was removed.
    """
    if not dependencies:
        return
    graph.query(
        """
        MATCH (package:Package {normalized_name: $package_normalized_name})
        UNWIND $dependencies AS dependency
        MERGE (dep:Package {normalized_name: dependency.normalized_name})
        SET dep.name = dependency.name
        MERGE (package)-[:DEPENDS_ON]->(dep)
        """,
        {
            "package_normalized_name": normalize_package_name(package_name),
            "dependencies": [
                {
                    "name": dependency,
                    "normalized_name": normalize_package_name(dependency),
                }
                for dependency in dependencies
            ],
        },
    )
def delete_graph(graph) -> None:
    """Drop the whole dependency graph; server errors propagate to the caller."""
    remove = graph.delete
    remove()
def top_packages_by_cumulative_dependencies(graph) -> list[tuple[str, int]]:
    """Return up to 10 (name, count) pairs ranked by transitive dependency count.

    The count covers every package reachable over one or more DEPENDS_ON hops.
    Packages without dependencies still appear (count 0) thanks to OPTIONAL
    MATCH. Ties are broken alphabetically.
    """
    query = """
MATCH (package:Package)
OPTIONAL MATCH (package)-[:DEPENDS_ON*1..]->(dependency:Package)
RETURN package.name, count(DISTINCT dependency) AS dependency_count
ORDER BY dependency_count DESC, package.name ASC
LIMIT 10
"""
    rows = graph.query(query).result_set
    return [(package_name, total) for package_name, total in rows]
def top_packages_by_usage(graph) -> list[tuple[str, int]]:
    """Return up to 10 (name, count) pairs ranked by how many packages depend on each.

    Both direct and transitive dependents are counted. Ties are broken
    alphabetically.
    """
    query = """
MATCH (dependency:Package)
OPTIONAL MATCH (package:Package)-[:DEPENDS_ON*1..]->(dependency)
RETURN dependency.name, count(DISTINCT package) AS usage_count
ORDER BY usage_count DESC, dependency.name ASC
LIMIT 10
"""
    rows = graph.query(query).result_set
    return [(dependency_name, users) for dependency_name, users in rows]
def print_report_section(title: str, rows: list[tuple[str, int]]) -> None:
    """Print *title*, one numbered '<name>: <count>' line per row, then a blank line.

    An empty *rows* prints "No packages found." instead of the numbered lines.
    """
    lines = [title]
    lines.extend(
        f"{position:2}. {name}: {count}"
        for position, (name, count) in enumerate(rows, start=1)
    )
    if not rows:
        lines.append("No packages found.")
    print("\n".join(lines))
    print()
def report_graph(graph) -> None:
    """Print the two top-10 reports: largest dependency trees and most-used packages."""
    sections = (
        ("Top 10 packages by cumulative dependency count", top_packages_by_cumulative_dependencies),
        ("Top 10 most used packages", top_packages_by_usage),
    )
    for title, ranking in sections:
        print_report_section(title, ranking(graph))
def fetch_package_metadata_or_exit(package_name: str) -> dict:
    """Fetch PyPI metadata, turning the expected failures into clean exits.

    Exits with status 2 when PyPI answers 404 (unknown package) and with
    status 3 when PyPI cannot be reached. Any other HTTP error propagates
    unchanged.
    """
    try:
        return fetch_package_metadata(package_name)
    except HTTPError as error:
        if error.code != 404:
            raise
        print(f"Package not found on PyPI: {package_name}", file=sys.stderr)
        raise SystemExit(2) from error
    except URLError as error:
        print(f"Failed to reach PyPI: {error.reason}", file=sys.stderr)
        raise SystemExit(3) from error
def import_packages(graph, package_names: list[str]) -> None:
    """Import *package_names* and, transitively, every dependency they pull in.

    Work is taken from an explicit stack, so dependencies are explored
    depth-first, and each package is handled once. "Once" is keyed by
    normalized name, both as requested and as PyPI spells it. For every
    package:
    - its node is saved;
    - its old DEPENDS_ON edges are cleared and the current ones written;
    - the package and its edges are printed.
    """
    stack = [*reversed(package_names)]
    done: set[str] = set()
    while stack:
        wanted = stack.pop()
        wanted_key = normalize_package_name(wanted)
        if wanted_key in done:
            continue
        info = fetch_package_metadata_or_exit(wanted)["info"]
        canonical = info["name"]
        canonical_key = normalize_package_name(canonical)
        if canonical_key in done:
            continue
        deps = extract_dependency_names(info.get("requires_dist"))
        save_package(graph, canonical, leaf=not deps)
        clear_dependencies(graph, canonical)
        save_dependencies(graph, canonical, deps)
        done.update((wanted_key, canonical_key))
        print(canonical)
        for dep in deps:
            print(f"{canonical} -> {dep}")
            if normalize_package_name(dep) not in done:
                stack.append(dep)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("package_names", nargs="*")
parser.add_argument("--delete", action="store_true", dest="delete_graph")
parser.add_argument("--report", action="store_true")
args = parser.parse_args()
selected_modes = sum(
[
bool(args.package_names),
args.delete_graph,
args.report,
]
)
if selected_modes != 1:
parser.error("provide PACKAGE_NAME [PACKAGE_NAME ...], --delete, or --report")
return args
def main() -> None:
    """Entry point: delete the graph, print the report, or import packages."""
    args = parse_args()
    graph = FalkorDB(host="localhost", port=6379).select_graph(GRAPH_NAME)
    if args.delete_graph:
        delete_graph(graph)
        print(f"Deleted graph {GRAPH_NAME}")
    elif args.report:
        report_graph(graph)
    else:
        import_packages(graph, args.package_names)
# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()
Python Docker compose
# Development setup for the Python examples: a FalkorDB server plus an
# interactive client container (`docker compose exec client bash`).
services:
  falkordb:
    image: falkordb/falkordb:latest
    container_name: falkor
    ports:
      - "6379:6379"  # client connections (the Python examples connect to port 6379)
      - "3000:3000"  # FalkorDB web UI (http://localhost:3000/)
    volumes:
      # Bind-mount ./data so the database files live on the host.
      - ./data:/var/lib/falkordb/data
    environment:
      # Extra Redis server arguments: `--appendonly yes` enables append-only-file persistence.
      REDIS_ARGS: "--appendonly yes"
    restart: unless-stopped
  client:
    #image: redis:latest
    image: szabgab/python:latest
    container_name: falkor-client
    depends_on:
      - falkordb
    # Inside this container the server is reachable by its service name, `falkordb`.
    stdin_open: true
    user: ubuntu
    tty: true
    working_dir: /opt
    volumes:
      # The project directory, used as the working directory.
      - .:/opt
      # Presumably host configuration for the Copilot and Gemini CLIs.
      - $HOME/.copilot/:/home/ubuntu/.copilot
      - $HOME/.gemini/:/home/ubuntu/.gemini
Rust
The falkordb crate is the client library.
# Development setup for the Rust examples: a FalkorDB server plus a Rust
# toolchain container. Inside the client, run with FALKORDB=falkordb.
services:
  falkordb:
    image: falkordb/falkordb:latest
    container_name: falkordb-for-rust
    ports:
      - "6379:6379"  # client connections
      - "3000:3000"  # FalkorDB web UI (http://localhost:3000/)
    volumes:
      # Bind-mount ./data so the database files live on the host.
      - ./data:/var/lib/falkordb/data
    # NOTE(review): unlike the Python setup, no REDIS_ARGS "--appendonly yes" here — intentional?
    restart: unless-stopped
  client:
    image: szabgab/rust:latest
    container_name: falkor-client-for-rust
    depends_on:
      - falkordb
    stdin_open: true
    user: ubuntu
    tty: true
    working_dir: /opt
    volumes:
      # The project directory, used as the working directory.
      - .:/opt
      # Presumably host configuration for the Copilot and Gemini CLIs.
      - $HOME/.copilot/:/home/ubuntu/.copilot
      - $HOME/.gemini/:/home/ubuntu/.gemini
cd examples/rust
docker compose up
We can now visit the FalkorDB web UI via http://localhost:3000/
Start a client session from which we can access the server called falkordb.
docker compose exec client bash
MotoGP in Rust
[package]
name = "motogp"
version = "0.1.0"
edition = "2024"

[dependencies]
# FalkorDB client; the `tokio` feature is requested for the async client
# (`FalkorClientBuilder::new_async`).
falkordb = { version = "0.2.1", features = ["tokio"] }
# Async runtime behind `#[tokio::main]`.
tokio = { version = "1.52.3", features = ["full"] }
use falkordb::{AsyncGraph, FalkorAsyncClient, FalkorClientBuilder, FalkorConnectionInfo};
/// Build an async FalkorDB client.
///
/// The server address comes from the `FALKORDB` environment variable (`host`
/// or `host:port`). It defaults to `127.0.0.1`, and `:6379` is appended when
/// no port is given. An address that does not form a valid connection string
/// is now returned as an error through `?` instead of panicking, since this
/// function already returns a `Result`.
async fn get_client() -> Result<FalkorAsyncClient, Box<dyn std::error::Error>> {
    let mut hostname = std::env::var("FALKORDB").unwrap_or_else(|_| "127.0.0.1".to_string());
    if !hostname.contains(':') {
        hostname.push_str(":6379");
    }
    let connection_info: FalkorConnectionInfo = format!("falkor://{hostname}").try_into()?;
    let client = FalkorClientBuilder::new_async()
        .with_connection_info(connection_info)
        .build()
        .await?;
    Ok(client)
}
/// Delete the whole graph on the server, converting the client error into a boxed error.
async fn delete_graph(graph: &mut AsyncGraph) -> Result<(), Box<dyn std::error::Error>> {
    match graph.delete().await {
        Ok(_) => Ok(()),
        Err(error) => Err(error.into()),
    }
}
async fn create(graph: &mut AsyncGraph) -> Result<(), Box<dyn std::error::Error>> {
// let _ = graph
// .query(
// r#"CREATE
// (:Rider {name:'Valentino Rossi'})-[:rides]->(:Team {name:'Yamaha'}),
// (:Rider {name:'Dani Pedrosa'})-[:rides]->(:Team {name:'Honda'}),
// (:Rider {name:'Andrea Dovizioso'})-[:rides]->(:Team {name:'Ducati'})"#,
// )
// .execute()
// .await?;
let pairs = vec![
("Valentino Rossi", "Yamaha"),
("Dani Pedrosa", "Honda"),
("Andrea Dovizioso", "Ducati"),
// ("d'Aartagnan", "Horse"),
// Error: RedisError("Invalid input 'A': expected ';', ':', a statement option, a query hint, call clause, a clause or a schema command line: 1, column: 1, offset: 0 errCtx: Aartagnan' MERGE (r:Rider {name: $rider}) errCtxOffset: 0")
];
for (rider_name, team_name) in pairs {
let mut params = std::collections::HashMap::new();
params.insert(String::from("rider"), format!("'{rider_name}'"));
params.insert(String::from("team"), format!("'{team_name}'"));
let _ = graph
.query(
r#"MERGE (r:Rider {name: $rider})
MERGE (t:Team {name: $team})
MERGE (r)-[:rides]->(t)"#,
)
.with_params(¶ms)
.execute()
.await?;
}
Ok(())
}
async fn get_yamaha(graph: &mut AsyncGraph) -> Result<(), Box<dyn std::error::Error>> {
// Query which riders represent Yamaha?
let team_name = String::from("Yamaha");
let mut params = std::collections::HashMap::new();
params.insert(String::from("team"), format!("'{team_name}'"));
let mut nodes = graph
.query(
r#"MATCH (r:Rider)-[:rides]->(t:Team)
WHERE t.name = $team
RETURN r.name"#,
)
.with_params(¶ms)
.execute()
.await?;
for node in nodes.data.by_ref() {
println!("{:?}", node);
}
Ok(())
}
/// Count the riders who ride for Ducati and print the raw result row(s).
async fn riders(graph: &mut AsyncGraph) -> Result<(), Box<dyn std::error::Error>> {
    const QUERY: &str = r#"MATCH (r:Rider)-[:rides]->(t:Team {name:'Ducati'}) RETURN count(r)"#;
    let mut result = graph.query(QUERY).execute().await?;
    for row in result.data.by_ref() {
        println!("{row:?}");
    }
    Ok(())
}
/// Print "<rider> rides <team>" for every `rides` relationship.
///
/// Panics if a returned value is not a node with a string `name` property.
async fn list_all(graph: &mut AsyncGraph) -> Result<(), Box<dyn std::error::Error>> {
    let mut result = graph
        .query(r#"MATCH (r:Rider)-[:rides]->(t:Team) RETURN r, t"#)
        .execute()
        .await?;
    for row in result.data.by_ref() {
        // Column 0 is the rider node, column 1 the team node.
        let names: Vec<_> = row
            .iter()
            .take(2)
            .map(|value| {
                value
                    .as_node()
                    .unwrap()
                    .properties
                    .get("name")
                    .unwrap()
                    .as_string()
                    .unwrap()
            })
            .collect();
        println!("{:20} rides {}", names[0], names[1]);
    }
    Ok(())
}
/// Connect to FalkorDB, then run the single sub-command given on the command line.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = get_client().await?;
    // Every command works on the 'MotoGP' graph.
    let mut graph = client.select_graph("MotoGP");

    let args: Vec<String> = std::env::args().collect();
    if args.len() != 2 {
        eprintln!("Usage: {} <delete|create|yamaha|riders|list>", args[0]);
        std::process::exit(1);
    }
    match args[1].as_str() {
        "delete" => delete_graph(&mut graph).await?,
        "create" => create(&mut graph).await?,
        "yamaha" => get_yamaha(&mut graph).await?,
        "riders" => riders(&mut graph).await?,
        "list" => list_all(&mut graph).await?,
        unknown => {
            eprintln!("Unknown command: {}", unknown);
            std::process::exit(1);
        }
    }
    Ok(())
}
When using docker compose we need to set the name of the FalkorDB server:
FALKORDB=falkordb cargo run
- Those extra single-quotes in the parameters are needed, otherwise we get an error:
RedisError("to parse query parameter 'team' value").
Manual in Rust
[package]
name = "manual"
version = "0.1.0"
edition = "2024"

[dependencies]
# Command-line parsing via `#[derive(Parser)]`.
clap = { version = "4.5.38", features = ["derive"] }
# FalkorDB client; the `tokio` feature is requested for the async client
# (`FalkorClientBuilder::new_async`).
falkordb = { version = "0.2.1", features = ["tokio"] }
# Async runtime behind `#[tokio::main]`.
tokio = { version = "1.52.3", features = ["full"] }
use std::collections::HashMap;
use clap::Parser;
use falkordb::{FalkorClientBuilder, FalkorConnectionInfo, FalkorValue};
// Command-line options for the "Manual" graph. The `///` field docs below are
// the `--help` text, so they are left untouched.
#[derive(Parser, Debug)]
#[command(version, about = "Manage the Manual FalkorDB graph")]
struct Cli {
    /// Delete the graph before applying other actions.
    #[arg(long)]
    delete: bool,
    /// Add a Person node with the given name. Can be passed multiple times.
    // Each value is checked by `validate_name` while the arguments are parsed.
    #[arg(long = "node", value_name = "NAME", value_parser = validate_name)]
    node_names: Vec<String>,
    /// List all nodes in the graph.
    #[arg(long)]
    nodes: bool,
}
/// Clap value parser for `--node`: accept only non-empty ASCII-alphanumeric names.
///
/// `main` wraps each name in single quotes before sending it as a query
/// parameter. Restricting the alphabet means a name can never contain a quote
/// that would break the query.
fn validate_name(name: &str) -> Result<String, String> {
    match name {
        "" => Err("name cannot be empty".to_string()),
        _ if name.chars().all(|ch| ch.is_ascii_alphanumeric()) => Ok(name.to_string()),
        _ => Err("name must contain only a-z, A-Z, or 0-9 characters".to_string()),
    }
}
/// Apply the requested actions to the "Manual" graph, in this order:
/// optional delete, then node creation, then node listing.
///
/// As in the other examples, the server address comes from the `FALKORDB`
/// environment variable (`host` or `host:port`, default `127.0.0.1:6379`).
/// That makes `FALKORDB=falkordb cargo run` work from the docker compose
/// client; the address was previously hard-coded. Also fixes the mis-encoded
/// `¶ms` argument, which must be `&params`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cli = Cli::parse();

    // Connect to FalkorDB
    let mut hostname = std::env::var("FALKORDB").unwrap_or_else(|_| "127.0.0.1".to_string());
    if !hostname.contains(':') {
        hostname.push_str(":6379");
    }
    let connection_info: FalkorConnectionInfo = format!("falkor://{hostname}").try_into()?;
    let client = FalkorClientBuilder::new_async()
        .with_connection_info(connection_info)
        .build()
        .await?;
    let mut graph = client.select_graph("Manual");

    if cli.delete {
        graph.delete().await?;
        println!("Deleted graph {}", graph.graph_name());
    }

    for name in cli.node_names {
        let mut params = HashMap::new();
        // Parameter values are sent as Cypher literal text, hence the quotes;
        // `validate_name` guarantees the name itself contains no quote.
        params.insert("name".to_string(), format!("'{name}'"));
        graph
            .query("CREATE (:Person {name: $name})")
            .with_params(&params)
            .execute()
            .await?;
        println!("Added Person node with name {name}");
    }

    if cli.nodes {
        let mut result = graph.query("MATCH (n) RETURN n").execute().await?;
        if result.data.is_empty() {
            println!("No nodes found");
        } else {
            while let Some(row) = result.data.next() {
                for value in row {
                    match value {
                        FalkorValue::Node(node) => println!("{node:?}"),
                        other => println!("{other:?}"),
                    }
                }
            }
        }
    }
    Ok(())
}
Movies
From GroupLens Research the MovieLens project.
Download the “MovieLens for education and development” ml-latest-small.zip file and unzip it in the root of the rust project.
It will create a folder called ml-latest-small/ and the csv files in it.
Load the data:
FALKORDB=falkordb cargo run -- --load
[package]
name = "movielens"
version = "0.1.0"
edition = "2024"

[dependencies]
# Command-line parsing via `#[derive(Parser)]`.
clap = { version = "4.6.1", features = ["derive"] }
# Reading the MovieLens CSV files.
csv = "1.4.0"
# FalkorDB client; the `tokio` feature is requested for the async client.
falkordb = { version = "0.2.1", features = ["tokio"] }
# `#[derive(Deserialize)]` for the CSV record structs.
serde = { version = "1.0.228", features = ["derive"] }
# Async runtime behind `#[tokio::main]`.
tokio = { version = "1.52.3", features = ["full"] }
use clap::Parser;
use csv::ReaderBuilder;
use falkordb::{
AsyncGraph, FalkorClientBuilder, FalkorConnectionInfo, FalkorValue, LazyResultSet, QueryResult,
};
use serde::Deserialize;
use std::{error::Error, io, path::Path};
// Name of the FalkorDB graph holding the MovieLens data.
const GRAPH_NAME: &str = "Movielens";
// Directory, relative to the crate root (CARGO_MANIFEST_DIR), with the unzipped CSV files.
const DATA_DIR: &str = "ml-latest-small";
// Presumably the number of CSV records sent per query when loading each file — TODO confirm.
const MOVIE_BATCH_SIZE: usize = 250;
const LINK_BATCH_SIZE: usize = 500;
const RATING_BATCH_SIZE: usize = 500;
const TAG_BATCH_SIZE: usize = 500;
// Report id meaning "show the report menu"; `--report` with no value maps to it.
const REPORT_MENU_ID: usize = 0;
// Presumably the number of available reports — verify against the report menu.
const REPORT_COUNT: usize = 12;
// Result type used by `main`: any boxed error that is also Send + Sync.
type AppResult<T> = Result<T, Box<dyn Error + Send + Sync>>;
// Command-line flags. The `///` lines below are turned into `--help` text by
// clap's derive, so they are runtime output and are intentionally unchanged.
// `arg_required_else_help` makes a bare invocation print help instead of doing nothing.
#[derive(Debug, Parser)]
#[command(author, version, about, long_about = None, arg_required_else_help = true)]
struct Cli {
/// Delete the MovieLens graph from FalkorDB.
#[arg(long)]
delete: bool,
/// Load the MovieLens CSV files into FalkorDB.
#[arg(long)]
load: bool,
// `--report` alone yields Some(0) (REPORT_MENU_ID) via default_missing_value,
// `--report N` yields Some(N), and omitting the flag yields None.
/// List available reports, or run a report by numeric id.
#[arg(long, value_name = "ID", num_args = 0..=1, default_missing_value = "0")]
report: Option<usize>,
}
/// One row of `movies.csv`, deserialized by the `csv` crate in `load_movies`.
#[derive(Debug, Deserialize)]
struct MovieRecord {
#[serde(rename = "movieId")]
movie_id: u64,
title: String,
/// `|`-separated genre names, or the sentinel "(no genres listed)"; converted by `genres_literal`.
genres: String,
}
/// One row of `links.csv`: external ids for a movie, attached to the Movie node by `flush_links`.
#[derive(Debug, Deserialize)]
struct LinkRecord {
#[serde(rename = "movieId")]
movie_id: u64,
/// Kept as text rather than a number; presumably to preserve leading zeros — confirm.
#[serde(rename = "imdbId")]
imdb_id: String,
/// Optional: a missing value is written to the graph as NULL (see `cypher_optional_u64`).
#[serde(rename = "tmdbId")]
tmdb_id: Option<u64>,
}
/// One row of `ratings.csv`; becomes a `(:User)-[:RATED]->(:Movie)` edge in `flush_ratings`.
#[derive(Debug, Deserialize)]
struct RatingRecord {
#[serde(rename = "userId")]
user_id: u64,
#[serde(rename = "movieId")]
movie_id: u64,
rating: f64,
/// NOTE(review): presumably Unix epoch seconds per the MovieLens README — confirm.
timestamp: i64,
}
/// One row of `tags.csv`; becomes a `(:User)-[:TAGGED]->(:Movie)` edge in `flush_tags`.
#[derive(Debug, Deserialize)]
struct TagRecord {
#[serde(rename = "userId")]
user_id: u64,
#[serde(rename = "movieId")]
movie_id: u64,
/// Free-text tag; escaped with `cypher_string` before being embedded in a query.
tag: String,
/// NOTE(review): presumably Unix epoch seconds per the MovieLens README — confirm.
timestamp: i64,
}
/// Entry point: parse the flags, then delete / load / report against FalkorDB.
///
/// The FalkorDB address comes from the `FALKORDB` environment variable (see
/// `falkordb_url_from_env`). No connection is opened when only the report
/// menu was requested.
#[tokio::main]
async fn main() -> AppResult<()> {
    let cli = Cli::parse();

    // `--report` without a value asks for the menu. Print it, but do not return
    // early: `--delete` / `--load` given in the same invocation must still run.
    if cli.report == Some(REPORT_MENU_ID) {
        print_report_menu();
    }
    let report_to_run = cli.report.filter(|report_id| *report_id != REPORT_MENU_ID);

    // Only connect when there is graph work to do.
    let needs_graph = cli.delete || cli.load || report_to_run.is_some();
    if !needs_graph {
        return Ok(());
    }

    let connection_info: FalkorConnectionInfo = falkordb_url_from_env().try_into()?;
    let client = FalkorClientBuilder::new_async()
        .with_connection_info(connection_info)
        .build()
        .await?;
    let mut graph = client.select_graph(GRAPH_NAME);

    // Delete runs before load, so `--delete --load` rebuilds the graph.
    if cli.delete {
        graph.delete().await?;
        println!("Deleted the {GRAPH_NAME} graph.");
    }
    if cli.load {
        let data_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join(DATA_DIR);
        load_dataset(&mut graph, &data_dir).await?;
        println!("Loaded MovieLens data into the {GRAPH_NAME} graph.");
    }
    if let Some(report_id) = report_to_run {
        run_report(&mut graph, report_id).await?;
    }
    Ok(())
}

/// Build the `falkor://` connection URL from the `FALKORDB` environment variable.
///
/// * unset, unreadable or blank -> `falkor://127.0.0.1:6379`
/// * `host`                     -> `falkor://host:6379`
/// * `host:port`                -> `falkor://host:port`
/// * a full URL (contains `://`, e.g. `falkor://falkordb:6379`) -> used unchanged
fn falkordb_url_from_env() -> String {
    let raw = std::env::var("FALKORDB").unwrap_or_default();
    let value = raw.trim();
    // A full URL must not get a second scheme prepended.
    if value.contains("://") {
        return value.to_string();
    }
    let host = if value.is_empty() { "127.0.0.1" } else { value };
    if host.contains(':') {
        format!("falkor://{host}")
    } else {
        format!("falkor://{host}:6379")
    }
}
/// Print the numbered list of reports that `--report ID` can run.
fn print_report_menu() {
    println!("Available reports:");
    (1..=REPORT_COUNT)
        .filter_map(|id| report_title(id).map(|title| (id, title)))
        .for_each(|(id, title)| println!("{id}) {title}"));
}
/// Return the human-readable title of report `report_id`.
///
/// Reports are numbered from 1. Id 0 (the menu id) and ids past the last
/// report return `None`.
fn report_title(report_id: usize) -> Option<&'static str> {
    // Entry `i` is the title of report `i + 1`.
    const TITLES: [&str; 12] = [
        "Best-rated movies with enough votes",
        "Most active users",
        "Most tagged movies",
        "Most common tags",
        "Most popular genres",
        "Highest-rated genres",
        "Movies that fans of Toy Story also rated highly",
        "Similar users by overlapping movie ratings",
        "Simple personalized recommendations for user 1",
        "Hidden gems",
        "Tag-to-genre associations",
        "Users whose taste differs most from the crowd",
    ];
    let index = report_id.checked_sub(1)?;
    TITLES.get(index).copied()
}
/// Run the report whose 1-based id is `report_id` and print its table.
///
/// Any id without a report, including the menu id 0 (which `main` filters out
/// before calling), produces an "Unknown report id" error.
async fn run_report(graph: &mut AsyncGraph, report_id: usize) -> AppResult<()> {
    match report_id {
        1 => report_best_rated_movies(graph).await,
        2 => report_most_active_users(graph).await,
        3 => report_most_tagged_movies(graph).await,
        4 => report_most_common_tags(graph).await,
        5 => report_most_popular_genres(graph).await,
        6 => report_highest_rated_genres(graph).await,
        7 => report_movies_liked_by_toy_story_fans(graph).await,
        8 => report_similar_users(graph).await,
        9 => report_recommendations_for_user_1(graph).await,
        10 => report_hidden_gems(graph).await,
        11 => report_tag_to_genre_associations(graph).await,
        12 => report_users_with_unusual_taste(graph).await,
        unknown => Err(app_error(format!(
            "Unknown report id: {unknown}. Run --report to list the available reports."
        ))),
    }
}
/// Report 1: the 20 highest-averaged movies among those with at least 20 ratings.
async fn report_best_rated_movies(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Best-rated movies with enough votes";
    const QUERY: &str = "MATCH (:User)-[r:RATED]->(m:Movie)
WITH m, count(r) AS ratings, avg(r.rating) AS avg_rating
WHERE ratings >= 20
RETURN m.title AS title, ratings, round(avg_rating * 100) / 100.0 AS avg_rating
ORDER BY avg_rating DESC, ratings DESC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 2: the 20 users with the most RATED edges.
async fn report_most_active_users(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Most active users";
    const QUERY: &str = "MATCH (u:User)-[r:RATED]->(:Movie)
RETURN u.user_id AS user_id, count(r) AS rating_count
ORDER BY rating_count DESC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 3: the 20 movies with the most TAGGED edges.
async fn report_most_tagged_movies(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Most tagged movies";
    const QUERY: &str = "MATCH (:User)-[t:TAGGED]->(m:Movie)
RETURN m.title AS title, count(t) AS tag_count
ORDER BY tag_count DESC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 4: the 30 tag strings used most often across all movies.
async fn report_most_common_tags(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Most common tags";
    const QUERY: &str = "MATCH (:User)-[t:TAGGED]->(:Movie)
RETURN t.tag AS tag, count(*) AS uses
ORDER BY uses DESC
LIMIT 30";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 5: genres ranked by how many ratings their movies received.
async fn report_most_popular_genres(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Most popular genres";
    // A movie's genres are stored as a list property, so UNWIND yields one row per genre.
    const QUERY: &str = "MATCH (:User)-[:RATED]->(m:Movie)
UNWIND m.genres AS genre
RETURN genre, count(*) AS ratings
ORDER BY ratings DESC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 6: genres with at least 50 ratings, ranked by average rating.
async fn report_highest_rated_genres(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Highest-rated genres";
    const QUERY: &str = "MATCH (:User)-[r:RATED]->(m:Movie)
UNWIND m.genres AS genre
WITH genre, count(*) AS ratings, avg(r.rating) AS avg_rating
WHERE ratings >= 50
RETURN genre, ratings, round(avg_rating * 100) / 100.0 AS avg_rating
ORDER BY avg_rating DESC, ratings DESC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 7: other movies rated 4.0+ by users who rated "Toy Story (1995)" 4.0+,
/// ranked by the number of such shared fans.
async fn report_movies_liked_by_toy_story_fans(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Movies that fans of Toy Story also rated highly";
    const QUERY: &str = "MATCH (u:User)-[r1:RATED]->(seed:Movie {title: 'Toy Story (1995)'})
MATCH (u)-[r2:RATED]->(other:Movie)
WHERE r1.rating >= 4.0 AND r2.rating >= 4.0 AND other <> seed
RETURN other.title AS title, count(*) AS shared_fans, round(avg(r2.rating) * 100) / 100.0 AS avg_rating
ORDER BY shared_fans DESC, avg_rating DESC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 8: user pairs sharing at least 20 rated movies, with their mean
/// absolute rating difference on those movies.
async fn report_similar_users(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Similar users by overlapping movie ratings";
    // `u1.user_id < u2.user_id` keeps each unordered pair once.
    const QUERY: &str = "MATCH (u1:User)-[r1:RATED]->(m:Movie)<-[r2:RATED]-(u2:User)
WHERE u1.user_id < u2.user_id
WITH u1, u2, count(m) AS overlap, avg(abs(r1.rating - r2.rating)) AS avg_diff
WHERE overlap >= 20
RETURN u1.user_id AS user_1, u2.user_id AS user_2, overlap, round(avg_diff * 100) / 100.0 AS avg_diff
ORDER BY overlap DESC, avg_diff ASC
LIMIT 20";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 9: movies user 1 has not rated, liked (4.0+) by at least three
/// "neighbours" of user 1.
///
/// A neighbour is a user who rated 4.0+ at least one movie that user 1 also
/// rated 4.0+. `WITH DISTINCT me, other` collapses the first MATCH to one row
/// per neighbour. Without it, a neighbour sharing k liked movies with user 1
/// would contribute each of their candidate ratings k times, which skews
/// `avg_rating` toward heavy-overlap neighbours.
async fn report_recommendations_for_user_1(graph: &mut AsyncGraph) -> AppResult<()> {
    execute_report(
        graph,
        "Simple personalized recommendations for user 1",
        "MATCH (me:User {user_id: 1})-[my:RATED]->(:Movie)<-[their:RATED]-(other:User)
WHERE my.rating >= 4.0 AND their.rating >= 4.0
WITH DISTINCT me, other
MATCH (other)-[rec:RATED]->(candidate:Movie)
WHERE rec.rating >= 4.0
OPTIONAL MATCH (me)-[seen:RATED]->(candidate)
WITH candidate, other, rec, seen
WHERE seen IS NULL
WITH candidate, count(DISTINCT other) AS supporters, avg(rec.rating) AS avg_rating
WHERE supporters >= 3
RETURN candidate.title AS title, supporters, round(avg_rating * 100) / 100.0 AS avg_rating
ORDER BY supporters DESC, avg_rating DESC
LIMIT 20",
    )
    .await
}
/// Report 10: well-rated movies with only 10 to 50 ratings.
///
/// Ties on the rounded average are broken by rating count (as in reports 1 and
/// 6), so the top-20 cut-off is deterministic.
async fn report_hidden_gems(graph: &mut AsyncGraph) -> AppResult<()> {
    execute_report(
        graph,
        "Hidden gems",
        "MATCH (:User)-[r:RATED]->(m:Movie)
WITH m, count(r) AS ratings, avg(r.rating) AS avg_rating
WHERE ratings >= 10 AND ratings <= 50
RETURN m.title AS title, ratings, round(avg_rating * 100) / 100.0 AS avg_rating
ORDER BY avg_rating DESC, ratings DESC
LIMIT 20",
    )
    .await
}
/// Report 11: the 30 most frequent (tag, genre) pairs across tagged movies.
async fn report_tag_to_genre_associations(graph: &mut AsyncGraph) -> AppResult<()> {
    const TITLE: &str = "Tag-to-genre associations";
    const QUERY: &str = "MATCH (:User)-[t:TAGGED]->(m:Movie)
UNWIND m.genres AS genre
RETURN t.tag AS tag, genre, count(*) AS freq
ORDER BY freq DESC
LIMIT 30";
    execute_report(graph, TITLE, QUERY).await
}
/// Report 12: users (with at least 20 ratings) whose ratings deviate most, on
/// average, from each movie's overall average rating.
///
/// Each movie's average is computed once and then joined to every rating of
/// that movie. The previous form re-expanded all ratings of a movie for every
/// single rating, which is quadratic per movie. The result is the same.
async fn report_users_with_unusual_taste(graph: &mut AsyncGraph) -> AppResult<()> {
    execute_report(
        graph,
        "Users whose taste differs most from the crowd",
        "MATCH (:User)-[allr:RATED]->(m:Movie)
WITH m, avg(allr.rating) AS movie_avg
MATCH (u:User)-[r:RATED]->(m)
WITH u, avg(abs(r.rating - movie_avg)) AS deviation, count(*) AS rated_movies
WHERE rated_movies >= 20
RETURN u.user_id AS user_id, rated_movies, round(deviation * 100) / 100.0 AS deviation
ORDER BY deviation DESC
LIMIT 20",
    )
    .await
}
/// Run `query` against `graph`, then print `title` followed by the result table.
///
/// The query runs first, so a failing query returns its error before the
/// title is printed.
async fn execute_report(graph: &mut AsyncGraph, title: &str, query: &str) -> AppResult<()> {
    let pending = graph.query(query).execute();
    let mut outcome = pending.await?;
    println!("{title}");
    render_query_result(&mut outcome);
    Ok(())
}
/// Print a query result as an aligned, `|`-separated text table.
///
/// All rows are drained from the lazy result set first, so every column width
/// is known before anything is printed.
/// * No header columns: prints "No columns returned." only.
/// * Header but no rows: prints the header, the separator and "(no rows)".
fn render_query_result(result: &mut QueryResult<LazyResultSet<'_>>) {
    let headers = result.header.clone();
    let rows = result
        .data
        .by_ref()
        .map(|row| row.into_iter().map(render_value).collect::<Vec<_>>())
        .collect::<Vec<_>>();
    if headers.is_empty() {
        println!("No columns returned.");
        return;
    }
    // Measure in chars, not bytes. `{:<width$}` pads by char count, so byte
    // lengths over-widen columns holding non-ASCII text (e.g. accented titles).
    let mut widths = headers
        .iter()
        .map(|header| header.chars().count())
        .collect::<Vec<_>>();
    for row in &rows {
        for (index, cell) in row.iter().enumerate() {
            let cell_width = cell.chars().count();
            match widths.get_mut(index) {
                Some(width) => *width = (*width).max(cell_width),
                // A row longer than the header adds a column.
                None => widths.push(cell_width),
            }
        }
    }
    print_table_row(&headers, &widths);
    print_table_separator(&widths);
    if rows.is_empty() {
        println!("(no rows)");
        return;
    }
    for row in &rows {
        print_table_row(row, &widths);
    }
}
/// Print one table row: each cell left-aligned to its column width, joined by " | ".
///
/// Cells without a matching width (or widths without a cell) are dropped.
fn print_table_row(row: &[String], widths: &[usize]) {
    let mut cells = Vec::with_capacity(row.len().min(widths.len()));
    for (value, &width) in row.iter().zip(widths) {
        cells.push(format!("{value:<width$}"));
    }
    println!("{}", cells.join(" | "));
}
/// Print the dashed line under the header, with "-+-" where " | " separates cells.
fn print_table_separator(widths: &[usize]) {
    let dashes: Vec<String> = widths.iter().map(|&count| "-".repeat(count)).collect();
    println!("{}", dashes.join("-+-"));
}
/// Convert a FalkorDB value into the text shown in a report table cell.
///
/// Strings are shown unquoted, NULL as "NULL", and floats with two decimals.
/// Arrays are rendered recursively as "[a, b]"; other variants use their
/// `Debug` form.
fn render_value(value: FalkorValue) -> String {
    match value {
        FalkorValue::None => String::from("NULL"),
        FalkorValue::String(text) => text,
        FalkorValue::Bool(flag) => flag.to_string(),
        FalkorValue::I64(number) => number.to_string(),
        FalkorValue::F64(number) => format!("{number:.2}"),
        FalkorValue::Array(items) => {
            let parts: Vec<String> = items.into_iter().map(render_value).collect();
            format!("[{}]", parts.join(", "))
        }
        anything_else => format!("{anything_else:?}"),
    }
}
/// Wrap a plain message in a boxed error usable with `AppResult`.
///
/// The error's `Display` output is exactly `message`.
fn app_error(message: impl Into<String>) -> Box<dyn Error + Send + Sync> {
    let text: String = message.into();
    Box::new(io::Error::other(text))
}
/// Import the four MovieLens CSV files found in `data_dir` into `graph`.
///
/// Movies are loaded first because the link, rating and tag imports MATCH
/// existing Movie nodes by `movie_id`. Stops at the first failing file.
async fn load_dataset(graph: &mut AsyncGraph, data_dir: &Path) -> AppResult<()> {
    let file = |name: &str| data_dir.join(name);
    load_movies(graph, &file("movies.csv")).await?;
    load_links(graph, &file("links.csv")).await?;
    load_ratings(graph, &file("ratings.csv")).await?;
    load_tags(graph, &file("tags.csv")).await?;
    Ok(())
}
async fn load_movies(graph: &mut AsyncGraph, path: &Path) -> AppResult<()> {
let mut reader = ReaderBuilder::new().from_path(path)?;
let mut batch = Vec::with_capacity(MOVIE_BATCH_SIZE);
let mut imported = 0usize;
for record in reader.deserialize() {
batch.push(record?);
if batch.len() == MOVIE_BATCH_SIZE {
imported += flush_movies(graph, &batch).await?;
batch.clear();
}
}
if !batch.is_empty() {
imported += flush_movies(graph, &batch).await?;
}
println!("Imported {imported} movies.");
Ok(())
}
async fn load_links(graph: &mut AsyncGraph, path: &Path) -> AppResult<()> {
let mut reader = ReaderBuilder::new().from_path(path)?;
let mut batch = Vec::with_capacity(LINK_BATCH_SIZE);
let mut imported = 0usize;
for record in reader.deserialize() {
batch.push(record?);
if batch.len() == LINK_BATCH_SIZE {
imported += flush_links(graph, &batch).await?;
batch.clear();
}
}
if !batch.is_empty() {
imported += flush_links(graph, &batch).await?;
}
println!("Imported {imported} movie links.");
Ok(())
}
async fn load_ratings(graph: &mut AsyncGraph, path: &Path) -> AppResult<()> {
let mut reader = ReaderBuilder::new().from_path(path)?;
let mut batch = Vec::with_capacity(RATING_BATCH_SIZE);
let mut imported = 0usize;
for record in reader.deserialize() {
batch.push(record?);
if batch.len() == RATING_BATCH_SIZE {
imported += flush_ratings(graph, &batch).await?;
batch.clear();
}
}
if !batch.is_empty() {
imported += flush_ratings(graph, &batch).await?;
}
println!("Imported {imported} ratings.");
Ok(())
}
async fn load_tags(graph: &mut AsyncGraph, path: &Path) -> AppResult<()> {
let mut reader = ReaderBuilder::new().from_path(path)?;
let mut batch = Vec::with_capacity(TAG_BATCH_SIZE);
let mut imported = 0usize;
for record in reader.deserialize() {
batch.push(record?);
if batch.len() == TAG_BATCH_SIZE {
imported += flush_tags(graph, &batch).await?;
batch.clear();
}
}
if !batch.is_empty() {
imported += flush_tags(graph, &batch).await?;
}
println!("Imported {imported} tags.");
Ok(())
}
/// Upsert a batch of movies with a single UNWIND query; returns the batch size.
///
/// Each record becomes a Cypher map literal. Strings are escaped with
/// `cypher_string`, and MERGE on `movie_id` makes re-imports update in place.
async fn flush_movies(graph: &mut AsyncGraph, batch: &[MovieRecord]) -> AppResult<usize> {
    let mut literals = Vec::with_capacity(batch.len());
    for movie in batch {
        literals.push(format!(
            "{{movie_id: {}, title: {}, genres: {}}}",
            movie.movie_id,
            cypher_string(&movie.title),
            genres_literal(&movie.genres),
        ));
    }
    let joined = literals.join(", ");
    let query = format!(
        "UNWIND [{joined}] AS row \
         MERGE (m:Movie {{movie_id: row.movie_id}}) \
         SET m.title = row.title, m.genres = row.genres"
    );
    graph.query(query).execute().await?;
    Ok(batch.len())
}
/// Attach IMDb/TMDb ids to existing Movie nodes in one UNWIND query.
///
/// Returns the batch size. Rows whose `movie_id` has no Movie node match
/// nothing and are skipped by the MATCH.
async fn flush_links(graph: &mut AsyncGraph, batch: &[LinkRecord]) -> AppResult<usize> {
    let mut literals = Vec::with_capacity(batch.len());
    for link in batch {
        literals.push(format!(
            "{{movie_id: {}, imdb_id: {}, tmdb_id: {}}}",
            link.movie_id,
            cypher_string(&link.imdb_id),
            cypher_optional_u64(link.tmdb_id),
        ));
    }
    let joined = literals.join(", ");
    let query = format!(
        "UNWIND [{joined}] AS row \
         MATCH (m:Movie {{movie_id: row.movie_id}}) \
         SET m.imdb_id = row.imdb_id, m.tmdb_id = row.tmdb_id"
    );
    graph.query(query).execute().await?;
    Ok(batch.len())
}
/// Upsert a batch of ratings as `(:User)-[:RATED]->(:Movie)` edges in one UNWIND query.
///
/// Users are MERGEd (created on first sight). Movies must already exist:
/// rows whose movie is missing are dropped by the MATCH. Returns the batch size.
async fn flush_ratings(graph: &mut AsyncGraph, batch: &[RatingRecord]) -> AppResult<usize> {
    let rows = batch
        .iter()
        .map(|record| {
            format!(
                "{{user_id: {}, movie_id: {}, rating: {}, timestamp: {}}}",
                record.user_id,
                record.movie_id,
                cypher_f64(record.rating),
                record.timestamp,
            )
        })
        .collect::<Vec<_>>()
        .join(", ");
    let query = format!(
        "UNWIND [{rows}] AS row \
MERGE (u:User {{user_id: row.user_id}}) \
WITH u, row \
MATCH (m:Movie {{movie_id: row.movie_id}}) \
MERGE (u)-[r:RATED]->(m) \
SET r.rating = row.rating, r.timestamp = row.timestamp"
    );
    graph.query(query).execute().await?;
    Ok(batch.len())
}

/// Format an `f64` as a Cypher float literal.
///
/// `f64`'s `Display` drops the fraction of whole numbers (`4.0` prints as `4`),
/// which Cypher parses as an integer. Whole ratings would then be stored as
/// integers and half-star ratings as floats. Appending ".0" keeps every
/// rating a float. (`Display` never uses exponent notation, so checking for '.'
/// is enough for finite values.)
fn cypher_f64(value: f64) -> String {
    let text = value.to_string();
    if value.is_finite() && !text.contains('.') {
        format!("{text}.0")
    } else {
        text
    }
}
/// Upsert a batch of tags as `(:User)-[:TAGGED]->(:Movie)` edges in one UNWIND query.
///
/// The edge is MERGEd on (tag, timestamp), so the same user/movie pair can
/// carry several TAGGED edges. Movies must already exist. Returns the batch size.
async fn flush_tags(graph: &mut AsyncGraph, batch: &[TagRecord]) -> AppResult<usize> {
    let mut literals = Vec::with_capacity(batch.len());
    for tag in batch {
        literals.push(format!(
            "{{user_id: {}, movie_id: {}, tag: {}, timestamp: {}}}",
            tag.user_id,
            tag.movie_id,
            cypher_string(&tag.tag),
            tag.timestamp,
        ));
    }
    let joined = literals.join(", ");
    let query = format!(
        "UNWIND [{joined}] AS row \
         MERGE (u:User {{user_id: row.user_id}}) \
         WITH u, row \
         MATCH (m:Movie {{movie_id: row.movie_id}}) \
         MERGE (u)-[:TAGGED {{tag: row.tag, timestamp: row.timestamp}}]->(m)"
    );
    graph.query(query).execute().await?;
    Ok(batch.len())
}
/// Quote `value` as a single-quoted Cypher string literal.
///
/// Escapes backslash, single quote, newline and carriage return, in a single pass.
fn cypher_string(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        match ch {
            '\\' => quoted.push_str("\\\\"),
            '\'' => quoted.push_str("\\'"),
            '\n' => quoted.push_str("\\n"),
            '\r' => quoted.push_str("\\r"),
            other => quoted.push(other),
        }
    }
    quoted.push('\'');
    quoted
}
/// Render an optional integer as a Cypher literal: the number, or `NULL` when absent.
fn cypher_optional_u64(value: Option<u64>) -> String {
    match value {
        Some(number) => number.to_string(),
        None => String::from("NULL"),
    }
}
/// Turn a `|`-separated MovieLens genre field into a Cypher list of strings.
///
/// The sentinel "(no genres listed)" becomes the empty list `[]`.
fn genres_literal(genres: &str) -> String {
    if genres == "(no genres listed)" {
        return String::from("[]");
    }
    let quoted: Vec<String> = genres.split('|').map(cypher_string).collect();
    format!("[{}]", quoted.join(", "))
}
// Unit tests for the pure parts of the program: CLI parsing, report titles,
// Cypher literal building and table-cell rendering. Nothing here talks to FalkorDB.
#[cfg(test)]
mod tests {
use super::*;
use clap::CommandFactory;
// `--load` alone sets only `load`.
#[test]
fn parses_load_flag() {
let cli = Cli::parse_from(["movielens", "--load"]);
assert!(cli.load);
assert!(!cli.delete);
assert_eq!(cli.report, None);
}
// `--delete` alone sets only `delete`.
#[test]
fn parses_delete_flag() {
let cli = Cli::parse_from(["movielens", "--delete"]);
assert!(cli.delete);
assert!(!cli.load);
assert_eq!(cli.report, None);
}
// A bare `--report` falls back to default_missing_value, i.e. the menu id.
#[test]
fn parses_report_menu_flag_without_value() {
let cli = Cli::parse_from(["movielens", "--report"]);
assert_eq!(cli.report, Some(REPORT_MENU_ID));
}
#[test]
fn parses_report_flag_with_value() {
let cli = Cli::parse_from(["movielens", "--report", "3"]);
assert_eq!(cli.report, Some(3));
}
// Checks the generated clap command exposes the three long flags.
#[test]
fn command_defines_load_delete_and_report_flags() {
let command = Cli::command();
assert!(
command
.get_arguments()
.any(|arg| arg.get_long() == Some("load"))
);
assert!(
command
.get_arguments()
.any(|arg| arg.get_long() == Some("delete"))
);
assert!(
command
.get_arguments()
.any(|arg| arg.get_long() == Some("report"))
);
}
// Spot-checks the first and last ids and one id past the end (not every title).
#[test]
fn knows_all_report_titles() {
assert_eq!(report_title(1), Some("Best-rated movies with enough votes"));
assert_eq!(
report_title(12),
Some("Users whose taste differs most from the crowd")
);
assert_eq!(report_title(13), None);
}
#[test]
fn escapes_cypher_strings() {
assert_eq!(cypher_string("Schindler's List"), "'Schindler\\'s List'");
assert_eq!(cypher_string(r"c:\tmp"), r"'c:\\tmp'");
}
#[test]
fn converts_no_genres_to_empty_list() {
assert_eq!(genres_literal("(no genres listed)"), "[]");
}
// NOTE(review): 4.125 is exactly representable, so the "4.12" expectation
// depends on std's tie-rounding for `{:.2}` (round-half-to-even gives 4.12).
#[test]
fn renders_values_for_tables() {
assert_eq!(render_value(FalkorValue::I64(7)), "7");
assert_eq!(render_value(FalkorValue::F64(4.125)), "4.12");
assert_eq!(
render_value(FalkorValue::Array(vec![FalkorValue::String(
"Drama".to_string()
)])),
"[Drama]"
);
}
}
DEFAULT_FALKORDB_URL=falkor://falkordb:6379
BioGrid
Protein -[:INTERACTS_WITH]-> Protein
[package]
name = "biogrid"
version = "0.1.0"
edition = "2024"
[dependencies]
/// Placeholder entry point for the BioGrid example; prints a greeting.
fn main() {
    let greeting = "Hello, world!";
    println!("{greeting}");
}
Perl
Using the FalkorDB Perl module.
services:
falkordb:
image: falkordb/falkordb:latest
container_name: falkordb-for-perl
ports:
- "6379:6379"
- "3000:3000"
volumes:
- ./data:/var/lib/falkordb/data
#environment:
# REDIS_ARGS: "--appendonly yes"
restart: unless-stopped
client:
image: szabgab/perl:latest
container_name: falkor-client-for-perl
depends_on:
- falkordb
stdin_open: true
user: ubuntu
tty: true
working_dir: /opt
volumes:
- .:/opt
- $HOME/.copilot/:/home/ubuntu/.copilot
- $HOME/.gemini/:/home/ubuntu/.gemini
cd examples/perl
docker compose up
We can now visit the FalkorDB web UI via http://localhost:3000/
Start a client session from which we can access the server called falkordb.
docker compose exec client bash
Example
use strict;
use warnings;
use feature 'say';
use Data::Dumper qw(Dumper);
use FalkorDB;
# Connection to the FalkorDB server; 'falkordb' is the service name in docker-compose.
my $db = FalkorDB->new(
host => 'falkordb',
port => 6379,
);
# Command name (first CLI argument) => handler. Each handler receives the graph object.
# usage() lists these keys, so %DISP must be populated before the argument check below.
my %DISP = (
add => \&add_person,
del => \&del_everything,
list => \&list,
people => \&add_people,
all => \&list_all,
);
# A missing (or false, e.g. "0") argument dies with the usage message.
my $cmd = shift or usage();
main();
exit(0);
# Select the 'Example' graph and run the handler registered for $cmd;
# an unknown command dies with the usage message.
sub main {
    my $graph   = $db->select_graph('Example');
    my $handler = $DISP{$cmd} or usage();
    $handler->($graph);
}
# Create one Person node per name, passing the name as a query parameter.
# Names containing quotes are presumably there to show that parameters need no manual escaping.
sub add_people {
    my $graph = shift;
    my @names = ("Joe", "Jane", "Mary", "q'ote", 'Other"quote');
    $graph->query('CREATE (:Person {name: $name})', { name => $_ }) for @names;
}
# Create Alice (age 30) and print the properties of the node the query returns.
sub add_person {
    my $graph  = shift;
    my $result = $graph->query("CREATE (p:Person {name: 'Alice', age: 30}) RETURN p");
    while (my $row = $result->next_row()) {
        foreach my $created (@{$row}) {
            my $props = $created->{properties};
            print "Added Person: $props->{name} ($props->{age})\n";
        }
    }
}
# Remove every node in the graph; DETACH DELETE also removes their relationships.
sub del_everything {
    my $graph = shift;
    return $graph->query('MATCH (n) DETACH DELETE n');
}
# Print the name and age of every Person whose age is 30, using a query parameter.
sub list {
    my $graph  = shift;
    my $result = $graph->query(
        'MATCH (p:Person) WHERE p.age = $age RETURN p.name, p.age',
        { age => 30 },
    );
    # Each row holds the two projected columns in order.
    while (my $row = $result->next_row()) {
        my ($name, $age) = @{$row};
        print "Found Person: $name ($age)\n";
    }
}
# Print the name of every Person node in the graph.
sub list_all {
    my $graph  = shift;
    my $result = $graph->query("MATCH (p:Person) RETURN p");
    while (my $row = $result->next_row()) {
        print "Found Person: $_->{properties}{name}\n" for @{$row};
    }
}
# Die with a usage line listing the known commands (sorted keys of %DISP).
sub usage {
    my $commands = join '|', sort keys %DISP;
    die "Usage: $0 [$commands]\n";
}