Writing a Connector

A complete guide to implementing the Connector trait.

1. Create a Crate

Each connector lives in its own crate that depends on parallax-connect:

# Cargo.toml
[package]
name = "connector-myservice"
version = "0.1.0"
edition = "2021"

[dependencies]
parallax-connect = { path = "../parallax-connect" }
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
# ... your API client crate

2. Define the Struct

use parallax_connect::prelude::*;

pub struct MyServiceConnector {
    api_base_url: String,
    api_token: String,
}

impl MyServiceConnector {
    pub fn new(api_base_url: impl Into<String>, api_token: impl Into<String>) -> Self {
        MyServiceConnector {
            api_base_url: api_base_url.into(),
            api_token: api_token.into(),
        }
    }
}

3. Define Steps

Steps are the units of collection. A step either runs independently or declares the steps it depends on, in which case it runs only after them. Define them in the steps() method:

use async_trait::async_trait;

#[async_trait]
impl Connector for MyServiceConnector {
    fn name(&self) -> &str {
        "my-service"
    }

    fn steps(&self) -> Vec<StepDefinition> {
        vec![
            step("users", "Collect users").build(),
            step("hosts", "Collect hosts").build(),
            step("services", "Collect services")
                .depends_on(&["hosts"])     // runs after "hosts"
                .build(),
            step("relationships", "Collect relationships")
                .depends_on(&["users", "hosts", "services"])  // runs last
                .build(),
        ]
    }
    // ...
}

INV-C05: Step dependencies must form a DAG. Cycles are rejected at connector load time.
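
The DAG requirement can be illustrated with a small, self-contained sketch. It is not parallax-connect's loader: the Step struct and execution_order function are hypothetical stand-ins, step ids are assumed to be unique, and Kahn's algorithm is simply one common way to detect cycles while producing a run order.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for a step definition: an id plus the ids it depends on.
pub struct Step {
    pub id: &'static str,
    pub depends_on: Vec<&'static str>,
}

/// Returns the step ids in a valid execution order, or an error if a
/// dependency is unknown or the dependencies contain a cycle (Kahn's algorithm).
pub fn execution_order(steps: &[Step]) -> Result<Vec<&'static str>, String> {
    // Unmet-dependency count per step, and the reverse edges (who waits on whom).
    let mut pending: HashMap<&'static str, usize> = HashMap::new();
    let mut dependents: HashMap<&'static str, Vec<&'static str>> = HashMap::new();
    for step in steps {
        pending.insert(step.id, step.depends_on.len());
    }
    for step in steps {
        for &dep in &step.depends_on {
            if !pending.contains_key(dep) {
                return Err(format!("step '{}' depends on unknown step '{}'", step.id, dep));
            }
            dependents.entry(dep).or_default().push(step.id);
        }
    }

    // Steps with no dependencies are ready immediately, in declaration order.
    let mut ready: VecDeque<&'static str> = steps
        .iter()
        .filter(|s| s.depends_on.is_empty())
        .map(|s| s.id)
        .collect();

    let mut order = Vec::with_capacity(steps.len());
    while let Some(id) = ready.pop_front() {
        order.push(id);
        if let Some(waiting) = dependents.get(id) {
            for &next in waiting {
                let count = pending.get_mut(next).expect("dependents are known steps");
                *count -= 1;
                if *count == 0 {
                    ready.push_back(next);
                }
            }
        }
    }

    // Any step that was never released is part of, or waits on, a cycle.
    if order.len() == steps.len() {
        Ok(order)
    } else {
        Err("step dependencies contain a cycle".to_string())
    }
}
```

Applied to the four steps defined in this section, the sketch yields users, hosts, services, relationships. Two steps that depend on each other produce the cycle error instead.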

INV-C06: A failed step does not prevent independent steps from running. Steps that don't depend on the failed step still execute.
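
A minimal sketch of this isolation rule follows. The Outcome enum and run_all function are hypothetical, not part of parallax-connect, and the sketch assumes that a step whose dependency failed (or was itself skipped) is skipped. The guide only states that independent steps still run, so treat the skipping behavior as an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical per-step result used by this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Succeeded,
    Failed,
    Skipped,
}

/// Runs steps in the order given. `steps` pairs each step id with its
/// dependencies and must already be sorted so dependencies come first.
/// `run` is a caller-supplied callback that executes a single step.
pub fn run_all<F>(
    steps: &[(&'static str, Vec<&'static str>)],
    mut run: F,
) -> HashMap<&'static str, Outcome>
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut outcomes: HashMap<&'static str, Outcome> = HashMap::new();
    for (id, deps) in steps {
        // A step is blocked if any of its dependencies did not succeed.
        let blocked = deps
            .iter()
            .any(|dep| outcomes.get(dep) != Some(&Outcome::Succeeded));
        let outcome = if blocked {
            Outcome::Skipped
        } else if run(*id).is_ok() {
            Outcome::Succeeded
        } else {
            Outcome::Failed
        };
        outcomes.insert(*id, outcome);
    }
    outcomes
}
```

With the steps from this guide, a failure in "users" leaves "hosts" and "services" unaffected, while "relationships" is skipped because it depends on "users".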

4. Implement Steps

#[async_trait]
impl Connector for MyServiceConnector {
    // ... name() and steps() ...

    async fn execute_step(
        &self,
        step_id: &str,
        ctx: &mut StepContext,
    ) -> Result<(), ConnectorError> {
        match step_id {
            "users" => self.collect_users(ctx).await,
            "hosts" => self.collect_hosts(ctx).await,
            "services" => self.collect_services(ctx).await,
            "relationships" => self.collect_relationships(ctx).await,
            _ => Err(ConnectorError::UnknownStep(step_id.to_string())),
        }
    }
}

5. Emit Entities

impl MyServiceConnector {
    async fn collect_users(&self, ctx: &mut StepContext) -> Result<(), ConnectorError> {
        let users = self.fetch_users_from_api().await?;

        for user in users {
            ctx.emit_entity(
                entity("user", &user.id)          // (type, key)
                    .class("User")                // entity class
                    .display_name(&user.name)
                    .property("email", user.email.as_str())
                    .property("active", user.active)
                    .property("mfa_enabled", user.mfa_enabled)
            )?;
        }
        Ok(())
    }
}

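ctx.emit_entity() returns Result<(), ConnectorError>. Whether an emit error is fatal depends on how you handle that Result. Propagating it with ?, as the example above does, ends the step on the first failed emit. To keep emit errors non-fatal, handle the Err case yourself (for example, log a warning) and continue with the next entity.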
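
The difference between propagating an emit error and handling it in place can be sketched without the real StepContext. MockContext, EmitError, and both collect functions below are hypothetical stand-ins, and the rule that an empty key is rejected exists only to force a failure in the example.

```rust
/// Hypothetical error type standing in for ConnectorError.
#[derive(Debug)]
pub struct EmitError(pub String);

/// Hypothetical stand-in for StepContext: records keys and rejects empty ones.
#[derive(Default)]
pub struct MockContext {
    pub emitted: Vec<String>,
}

impl MockContext {
    pub fn emit_entity(&mut self, key: &str) -> Result<(), EmitError> {
        if key.is_empty() {
            return Err(EmitError("empty entity key".to_string()));
        }
        self.emitted.push(key.to_string());
        Ok(())
    }
}

/// Fatal handling: `?` returns from the function on the first failed emit.
pub fn collect_strict(ctx: &mut MockContext, keys: &[&str]) -> Result<(), EmitError> {
    for &key in keys {
        ctx.emit_entity(key)?;
    }
    Ok(())
}

/// Non-fatal handling: a failed emit is reported and the loop continues.
/// Returns how many entities were skipped.
pub fn collect_lenient(ctx: &mut MockContext, keys: &[&str]) -> usize {
    let mut skipped = 0;
    for &key in keys {
        if let Err(err) = ctx.emit_entity(key) {
            eprintln!("warning: skipping entity: {}", err.0);
            skipped += 1;
        }
    }
    skipped
}
```

Given the keys "u1", "", "u2", the strict version stops after emitting "u1", while the lenient version emits "u1" and "u2" and reports one skipped entity.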

6. Emit Relationships

impl MyServiceConnector {
    async fn collect_relationships(
        &self,
        ctx: &mut StepContext,
    ) -> Result<(), ConnectorError> {
        let assignments = self.fetch_role_assignments().await?;

        for assignment in assignments {
            // Both endpoints must exist at commit time (INV-C04). The "role"
            // entities referenced here must be emitted by an earlier step or
            // already be present in the graph.
            ctx.emit_relationship(
                relationship(
                    "user",         // from_type
                    &assignment.user_id,  // from_key
                    "ASSIGNED",     // verb
                    "role",         // to_type
                    &assignment.role_id,  // to_key
                )
                .property("assigned_at", assignment.timestamp.to_string().as_str())
            )?;
        }
        Ok(())
    }
}

INV-C04: Referential integrity is enforced at commit time. A relationship is rejected if its from_key or to_key exists in neither the batch nor the current graph. Emit entities before the relationships that reference them.
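
A rough sketch of this check is shown here, assuming endpoints are matched on a (type, key) pair. EntityRef, Relationship, entity_ref, and dangling are hypothetical names for illustration and do not reflect how the commit path is actually implemented.

```rust
use std::collections::HashSet;

/// (entity type, entity key): the identity this sketch assumes for endpoints.
pub type EntityRef = (String, String);

/// Hypothetical, simplified relationship record.
pub struct Relationship {
    pub from: EntityRef,
    pub verb: String,
    pub to: EntityRef,
}

/// Convenience constructor for an entity reference.
pub fn entity_ref(entity_type: &str, key: &str) -> EntityRef {
    (entity_type.to_string(), key.to_string())
}

/// Returns the relationships that have at least one endpoint missing from
/// both the batch being committed and the entities already in the graph.
pub fn dangling<'a>(
    batch: &HashSet<EntityRef>,
    graph: &HashSet<EntityRef>,
    relationships: &'a [Relationship],
) -> Vec<&'a Relationship> {
    let exists = |e: &EntityRef| batch.contains(e) || graph.contains(e);
    relationships
        .iter()
        .filter(|rel| !exists(&rel.from) || !exists(&rel.to))
        .collect()
}
```

A relationship from a user in the batch to a role already in the graph passes the check. The same relationship pointing at a role that exists in neither place is returned as dangling.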

7. Access Prior Step Data

Steps can read entities emitted by their dependencies:

impl MyServiceConnector {
    async fn collect_services(
        &self,
        ctx: &mut StepContext,
    ) -> Result<(), ConnectorError> {
        // Read hosts emitted by the "hosts" step. Collecting into owned
        // Strings releases the borrow on ctx before emitting below.
        let host_ids: Vec<String> = ctx.prior
            .entities_by_type("host")
            .iter()
            .map(|e| e.entity_key.to_string())
            .collect();

        // Use host IDs to fetch services from the API
        for host_id in host_ids {
            let services = self.fetch_services_for_host(&host_id).await?;
            for service in services {
                ctx.emit_entity(
                    entity("service", &service.id)
                        .class("Service")
                        .display_name(&service.name)
                )?;
            }
        }
        Ok(())
    }
}

8. Error Handling

use parallax_connect::ConnectorError;

// For unrecognized step IDs (always include this)
Err(ConnectorError::UnknownStep(step_id.to_string()))

// For API errors
Err(ConnectorError::Custom(format!("API error: {}", response.status())))

// For configuration errors
Err(ConnectorError::Configuration("API token is empty".to_string()))

Connector errors are logged and reported in SyncEvent::StepFailed. They do not abort the entire sync: independent steps still run.
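
To show where each variant might be produced, here is a self-contained sketch. The ConnectorError enum below is a local stand-in that mirrors only the three variants named in this section, and validate_token, check_status, and check_step are hypothetical helpers, not functions provided by parallax-connect.

```rust
/// Local stand-in mirroring the three ConnectorError variants used in this guide.
#[derive(Debug, PartialEq)]
pub enum ConnectorError {
    UnknownStep(String),
    Custom(String),
    Configuration(String),
}

/// Configuration error: reject an empty token before any request is made.
pub fn validate_token(token: &str) -> Result<(), ConnectorError> {
    if token.trim().is_empty() {
        return Err(ConnectorError::Configuration("API token is empty".to_string()));
    }
    Ok(())
}

/// API error: treat any non-2xx HTTP status as a failure.
pub fn check_status(status: u16) -> Result<(), ConnectorError> {
    if (200..300).contains(&status) {
        Ok(())
    } else {
        Err(ConnectorError::Custom(format!("API error: {}", status)))
    }
}

/// Unknown step: reject ids that the connector does not define.
pub fn check_step(step_id: &str, known: &[&str]) -> Result<(), ConnectorError> {
    if known.contains(&step_id) {
        Ok(())
    } else {
        Err(ConnectorError::UnknownStep(step_id.to_string()))
    }
}
```

Validating configuration before the first request keeps a missing token from surfacing later as a less specific API error.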

9. Run the Connector

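// Besides the crates listed in step 1, this binary needs anyhow and
// parallax_ingest as dependencies, and StorageEngine and StoreConfig must be
// imported from the crate that defines them.
#[tokio::main]
async fn main() -> anyhow::Result<()> {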
    let connector = MyServiceConnector::new(
        "https://api.myservice.com",
        std::env::var("MYSERVICE_TOKEN")?,
    );

    let mut engine = StorageEngine::open(StoreConfig::new("./data"))?;

    let output = parallax_connect::run_connector(
        &connector,
        "my-account-id",  // account_id
        "sync-001",       // sync_id (unique per run)
        None,             // event_tx (optional observability channel)
    ).await?;

    let result = parallax_ingest::commit_sync_exclusive(
        &mut engine,
        &output.connector_id,
        &output.sync_id,
        output.entities,
        output.relationships,
    )?;

    println!("Sync complete:");
    println!("  Created: {}", result.stats.entities_created);
    println!("  Updated: {}", result.stats.entities_updated);
    println!("  Deleted: {}", result.stats.entities_deleted);
    println!("  Relationships created: {}", result.stats.relationships_created);

    Ok(())
}