Previous
Query results
Monitor and manage your data pipelines after creation. For creating pipelines, see Create a pipeline.
viam datapipelines list --org-id=<org-id>
pipelines = await data_client.list_data_pipelines(organization_id=ORG_ID)
for p in pipelines:
print(f"{p.id}: {p.name} (enabled={p.enabled}, schedule={p.schedule})")
pipelines, err := dataClient.ListDataPipelines(ctx, orgID)
if err != nil {
logger.Fatal(err)
}
for _, p := range pipelines {
fmt.Printf("%s: %s (enabled=%v, schedule=%s)\n", p.ID, p.Name, p.Enabled, p.Schedule)
}
viam datapipelines describe --id=<pipeline-id>
pipeline = await data_client.get_data_pipeline(id="YOUR-PIPELINE-ID")
print(f"Name: {pipeline.name}")
print(f"Schedule: {pipeline.schedule}")
print(f"Enabled: {pipeline.enabled}")
print(f"Data source: {pipeline.data_source_type}")
print(f"Created: {pipeline.created_on}")
pipeline, err := dataClient.GetDataPipeline(ctx, "YOUR-PIPELINE-ID")
if err != nil {
logger.Fatal(err)
}
fmt.Printf("Name: %s\nSchedule: %s\nEnabled: %v\n", pipeline.Name, pipeline.Schedule, pipeline.Enabled)
Each pipeline run has a status and an associated time window showing which data it processed.
# Returns a page of runs (default page size: 10)
page = await data_client.list_data_pipeline_runs(id="YOUR-PIPELINE-ID")
for run in page.runs:
print(f"Run {run.id}: {run.status}")
print(f" Data window: {run.data_start_time} to {run.data_end_time}")
if run.error_message:
print(f" Error: {run.error_message}")
# Get the next page if there are more runs
if page.next_page_token:
next_page = await page.next_page()
// Returns a page of runs (default page size: 10)
page, err := dataClient.ListDataPipelineRuns(ctx, "YOUR-PIPELINE-ID", 10)
if err != nil {
logger.Fatal(err)
}
for _, run := range page.Runs {
fmt.Printf("Run %s: %d\n", run.ID, run.Status)
fmt.Printf(" Data window: %s to %s\n", run.DataStartTime, run.DataEndTime)
if run.ErrorMessage != "" {
fmt.Printf(" Error: %s\n", run.ErrorMessage)
}
}
// Get the next page
nextPage, err := page.NextPage(ctx)
Run statuses:
| Status | Meaning |
|---|---|
| `SCHEDULED` | The run is queued and waiting to execute (2-minute delay before execution starts). |
| `STARTED` | The run is executing the MQL aggregation against the data source. |
| `COMPLETED` | The run finished and results are in the pipeline sink. |
| `FAILED` | The run encountered an error. Check the `error_message` field. |
If a run stays in `STARTED` for more than 10 minutes, it is automatically marked as `FAILED` and a new run is created for that time window.
viam datapipelines enable --id=<pipeline-id>
err = dataClient.EnableDataPipeline(ctx, "YOUR-PIPELINE-ID")
The Python SDK does not currently have enable_data_pipeline or disable_data_pipeline methods. Use the CLI or Go SDK.
viam datapipelines disable --id=<pipeline-id>
err = dataClient.DisableDataPipeline(ctx, "YOUR-PIPELINE-ID")
Disabling a pipeline stops future scheduled runs but does not delete existing results. When you re-enable a pipeline, it resumes from the next scheduled time window. It does not backfill windows it missed while disabled.
viam datapipelines rename --id=<pipeline-id> --name=new-name
await data_client.rename_data_pipeline(id="YOUR-PIPELINE-ID", name="new-name")
err = dataClient.RenameDataPipeline(ctx, "YOUR-PIPELINE-ID", "new-name")
viam datapipelines delete --id=<pipeline-id>
await data_client.delete_data_pipeline(id="YOUR-PIPELINE-ID")
err = dataClient.DeleteDataPipeline(ctx, "YOUR-PIPELINE-ID")
Deleting a pipeline removes the pipeline configuration, its execution history, and all output data in the pipeline sink. This is not reversible. If you need to preserve pipeline results, export them before deleting the pipeline.
Was this page helpful?
Glad to hear it! If you have any other feedback please let us know:
We're sorry about that. To help us improve, please tell us what we can do better:
Thank you!