Skip to content

Gremlin Backend

ddigraph supports Gremlin-compatible graph databases through the Apache TinkerPop framework.

Supported Databases

| Database | Connection | Use case |
|---|---|---|
| TinkerGraph (Apache TinkerPop) | In-memory, served by Gremlin Server | Local testing, development |
| JanusGraph | WebSocket/HTTP | Production, distributed |
| Amazon Neptune | WebSocket | AWS cloud, managed service |
| Azure Cosmos DB | WebSocket | Azure cloud, Gremlin API |

Dependencies

gremlinpython (the Apache TinkerPop Python driver) is installed automatically as a dependency of ddigraph:

pip install ddigraph  # includes gremlinpython

Basic Usage

Connect to Gremlin Server

from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

# Pick the endpoint first, so that whichever one is active is the one
# `g` is bound to.

# Local Gremlin Server (e.g. TinkerGraph or JanusGraph)
gremlin_endpoint = 'ws://localhost:8182/gremlin'

# AWS Neptune: use your cluster endpoint over secure WebSockets instead
# gremlin_endpoint = 'wss://your-neptune-endpoint:8182/gremlin'

connection = DriverRemoteConnection(gremlin_endpoint, 'g')
g = traversal().withRemote(connection)

Load DDI Data

from ddigraph import DDIFragmentParser

# fragment_id -> Vertex returned by addV(); used to attach edges in pass two.
vertices = {}

parser = DDIFragmentParser()

for fragment in parser.parse("survey.xml"):
    # Create vertex. The DDI id is stored as 'fragment_id', the key that the
    # has('fragment_id', ...) queries on this page look up.
    vertex = g.addV(fragment.element_type) \
        .property('fragment_id', fragment.fragment_id) \
        .property('label', fragment.label or '') \
        .property('urn', fragment.urn or '') \
        .next()

    # Store for edge creation
    vertices[fragment.fragment_id] = vertex

# Create edges (second pass). References whose target fragment got no
# vertex above are skipped.
parser = DDIFragmentParser()
for fragment in parser.parse("survey.xml"):
    for rel_type, ref in fragment.references:
        if ref.id in vertices:
            g.V(vertices[fragment.fragment_id]) \
                .addE(rel_type) \
                .to(vertices[ref.id]) \
                .iterate()

connection.close()

Full Example

See demo/load_gremlin.py for a complete example:

"""Load DDI into Gremlin-compatible graph database."""

from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.graph_traversal import __

from ddigraph.ingest.fragment_loader import DDIFragmentParser


def load_ddi_to_gremlin(ddi_path: str, gremlin_endpoint: str = 'ws://localhost:8182/gremlin',
                        *, clear_existing: bool = True):
    """Parse a DDI-L file and load it into a Gremlin-compatible database.

    The file is parsed twice:

    * the first pass adds one vertex per fragment;
    * the second pass adds an edge for every reference whose target fragment
      received a vertex in the first pass.

    Args:
        ddi_path: Path to the DDI-L XML file.
        gremlin_endpoint: WebSocket URL of the Gremlin Server.
        clear_existing: If True (the default, matching earlier behaviour),
            drop every vertex already in the graph before loading. Pass
            False to add to an existing graph instead.
    """
    connection = DriverRemoteConnection(gremlin_endpoint, 'g')
    g = traversal().withRemote(connection)

    try:
        if clear_existing:
            # Dropping a vertex also removes its incident edges.
            g.V().drop().iterate()

        vertex_ids = _create_vertices(g, ddi_path)
        print(f"Created {len(vertex_ids)} vertices")

        edge_count = _create_edges(g, ddi_path, vertex_ids)
        print(f"Created {edge_count} edges")

    finally:
        connection.close()


def _create_vertices(g, ddi_path: str) -> dict:
    """Add one vertex per fragment and return a fragment_id -> vertex id map."""
    # Type-specific property, copied from fragment.to_dict() when non-empty.
    extra_property_by_type = {
        "QuestionItem": "question_text",
        "Category": "category_label",
    }
    vertex_ids = {}

    for fragment in DDIFragmentParser().parse(ddi_path):
        props = fragment.to_dict()

        add_vertex = g.addV(fragment.element_type) \
            .property('fragment_id', fragment.fragment_id)

        # Only write the common properties that actually have a value.
        for key in ('label', 'urn', 'agency', 'version'):
            value = getattr(fragment, key)
            if value:
                add_vertex = add_vertex.property(key, value)

        extra_key = extra_property_by_type.get(fragment.element_type)
        if extra_key and props.get(extra_key):
            add_vertex = add_vertex.property(extra_key, props[extra_key])

        # Keep the server-assigned id so that edges can be attached with
        # g.V(id). Otherwise each edge would need a has('fragment_id', ...)
        # lookup, which may scan every vertex unless fragment_id is indexed.
        # If a fragment_id repeats, only the last vertex's id is kept.
        vertex_ids[fragment.fragment_id] = add_vertex.next().id

    return vertex_ids


def _create_edges(g, ddi_path: str, vertex_ids: dict) -> int:
    """Add an edge for each reference between two loaded fragments; return how many."""
    edge_count = 0

    for fragment in DDIFragmentParser().parse(ddi_path):
        source_id = vertex_ids.get(fragment.fragment_id)
        if source_id is None:
            continue
        for rel_type, ref in fragment.references:
            target_id = vertex_ids.get(ref.id)
            if target_id is None:
                # Target fragment got no vertex in the first pass: skip it.
                continue
            g.V(source_id).addE(rel_type).to(__.V(target_id)).iterate()
            edge_count += 1

    return edge_count


def query_examples(gremlin_endpoint: str = 'ws://localhost:8182/gremlin'):
    """Run a few illustrative read-only Gremlin queries and print a summary.

    Opens its own remote connection to ``gremlin_endpoint`` and always closes
    it, even when a query raises.
    """
    remote = DriverRemoteConnection(gremlin_endpoint, 'g')
    source = traversal().withRemote(remote)

    try:
        # 1. Tally the vertices of each label.
        label_counts = source.V().groupCount().by(__.label()).next()
        print("Vertex counts by type:")
        for vertex_label in label_counts:
            print(f"  {vertex_label}: {label_counts[vertex_label]}")

        # 2. Fetch every QuestionItem, with id/label tokens included.
        question_maps = source.V().hasLabel('QuestionItem').valueMap(True).toList()
        print(f"\nFound {len(question_maps)} questions")

        # 3. From each Instrument, follow HAS_CONSTRUCT edges (at least one
        #    hop, since repeat() precedes until()) until a QuestionConstruct
        #    is reached, then hop to the question it asks.
        walk = (
            source.V()
            .hasLabel('Instrument')
            .repeat(__.out('HAS_CONSTRUCT'))
            .until(__.hasLabel('QuestionConstruct'))
            .out('ASKS_QUESTION')
            .path()
        )
        instrument_paths = walk.toList()

        print(f"\nFound {len(instrument_paths)} paths from Instrument to Question")

    finally:
        remote.close()


if __name__ == "__main__":
    # Load the sample survey into the local server, then query it.
    sample_survey = "data/Ireland_LabourSurvey.xml"
    load_ddi_to_gremlin(sample_survey)
    query_examples()

Gremlin Queries

Basic Traversals

// Number of vertices per vertex label (T.label is the element label token)
g.V().groupCount().by(T.label)

// Every QuestionItem, with its id and label tokens included
g.V().hasLabel('QuestionItem').valueMap(true)

// A single fragment, looked up by its DDI identifier
g.V().has('fragment_id', eq('abc-123')).valueMap(true)

Relationship Traversals

// Multi-line traversals end each line with a dot, so the Gremlin Console
// knows the statement continues. A line starting with '.' would be run
// as a new (invalid) statement.

// From Instrument to all constructs
g.V().hasLabel('Instrument').out('HAS_CONSTRUCT').valueMap(true)

// Questions with their code lists
g.V().hasLabel('QuestionItem').
  as('q').
  out('USES_CODELIST').
  as('cl').
  select('q', 'cl').
    by(valueMap('label', 'question_text')).
    by(valueMap('label'))

// Categories in a code list
g.V().has('fragment_id', 'codelist-123').
  out('HAS_CATEGORY').
  valueMap('category_label')

Path Queries

// Full path from Instrument to Questions.
// Vertices without a 'label' property fall back to their vertex label, so
// they do not make the path unproductive.
g.V().hasLabel('Instrument').
  repeat(out('HAS_CONSTRUCT')).
  until(hasLabel('QuestionConstruct')).
  out('ASKS_QUESTION').
  path().
    by(coalesce(values('label'), label()))

// Conditional branches.
// fold() keeps every construct in a branch, and yields [] for a missing
// ELSE; coalesce() covers vertices without a 'condition' property.
g.V().hasLabel('IfThenElse').
  project('condition', 'then', 'else').
    by(coalesce(values('condition'), constant(''))).
    by(out('THEN').values('label').fold()).
    by(out('ELSE').values('label').fold())

Database-Specific Configuration

JanusGraph

from gremlin_python.driver.serializer import GraphSONSerializersV3d0

# Ask the driver to exchange GraphSON 3.0 messages; the Gremlin Server must
# have a matching serializer configured.
graphson_v3 = GraphSONSerializersV3d0()
connection = DriverRemoteConnection(
    'ws://localhost:8182/gremlin',
    'g',
    message_serializer=graphson_v3,
)

Amazon Neptune

# IAM authentication
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

# Only needed when IAM database authentication is enabled on the cluster.
# Sign the WebSocket handshake request with SigV4 and pass the resulting
# headers to the driver.
NEPTUNE_URL = 'wss://your-cluster.region.neptune.amazonaws.com:8182/gremlin'
NEPTUNE_REGION = 'us-east-1'  # replace with your cluster's region

credentials = boto3.Session().get_credentials().get_frozen_credentials()

handshake = AWSRequest(method='GET', url=NEPTUNE_URL, data=None)
SigV4Auth(credentials, 'neptune-db', NEPTUNE_REGION).add_auth(handshake)

# WebSocket connection with SigV4. The signature is time-limited, so
# re-sign when opening a new connection later.
connection = DriverRemoteConnection(
    NEPTUNE_URL,
    'g',
    headers=dict(handshake.headers.items()),
)

Azure Cosmos DB

from gremlin_python.driver import client, serializer

# Cosmos DB's Gremlin API speaks GraphSON 2.0 and does not accept bytecode
# traversals (the fluent g.V()... objects sent through
# DriverRemoteConnection). Connect with a Client and submit Gremlin queries
# as strings instead.
cosmos_client = client.Client(
    'wss://your-account.gremlin.cosmos.azure.com:443/',
    'g',
    username="/dbs/your-database/colls/your-graph",
    password="your-primary-key",
    message_serializer=serializer.GraphSONSerializersV2d0(),
)

# NOTE: on a partitioned graph, every addV() query must also set the
# container's partition-key property.
vertex_count = cosmos_client.submit("g.V().count()").all().result()
print(vertex_count)

cosmos_client.close()

Batch Loading

For large DDI files, use batched writes:

BATCH_SIZE = 100


def _flush_vertex_batch(batch):
    """Add every fragment in *batch* using one chained traversal (one round trip)."""
    if not batch:
        return
    # g.addV(...).addV(...)... creates one vertex per addV() in one request,
    # instead of one request per vertex.
    chained = g
    for f in batch:
        chained = chained.addV(f.element_type) \
            .property('fragment_id', f.fragment_id) \
            .property('label', f.label or '')
    chained.iterate()


vertices_batch = []
processed = 0
parser = DDIFragmentParser()

for fragment in parser.parse("large_survey.xml"):
    vertices_batch.append(fragment)

    if len(vertices_batch) >= BATCH_SIZE:
        # Submit batch
        _flush_vertex_batch(vertices_batch)
        processed += len(vertices_batch)
        vertices_batch = []
        print(f"Processed {processed} vertices")

# Final (possibly partial) batch
_flush_vertex_batch(vertices_batch)
processed += len(vertices_batch)
print(f"Processed {processed} vertices")

See Also