Query the precomputed summaries that a pipeline produces. Pipeline results are stored in a dedicated sink collection, separate from your raw data. You query them by specifying the pipeline_sink data source type and the pipeline’s ID.
You need the pipeline ID, which is returned when you create the pipeline. You can also look it up with the viam datapipelines list or viam datapipelines describe CLI commands.
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

PIPELINE_ID = "YOUR-PIPELINE-ID"

# Returns a list of result documents
results = await data_client.tabular_data_by_mql(
    organization_id=ORG_ID,
    query=[
        {"$limit": 10},
    ],
    tabular_data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
    pipeline_id=PIPELINE_ID,
)
for entry in results:
    print(entry)
results, err := dataClient.TabularDataByMQL(ctx, orgID,
	[]map[string]interface{}{
		{"$limit": 10},
	},
	&app.TabularDataByMQLOptions{
		TabularDataSourceType: app.TabularDataSourceTypePipelineSink,
		PipelineID:            "YOUR-PIPELINE-ID",
	},
)
if err != nil {
	logger.Fatal(err)
}
for _, entry := range results {
	fmt.Printf("%v\n", entry)
}
The query runs against the pipeline’s output documents, not the raw readings. Each result document contains the fields that your pipeline’s $project stage outputs, plus a _viam_pipeline_run metadata object that is added automatically:
{
  "_viam_pipeline_run": {
    "id": "run-id",
    "interval": {
      "start": "2025-03-15T14:00:00.000Z",
      "end": "2025-03-15T15:00:00.000Z"
    },
    "organization_id": "org-id"
  },
  "location": "warehouse-a",
  "avg_temp": 23.5,
  "count": 3600
}
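To work with a result document like the one above, it can help to separate the automatic run metadata from the fields your pipeline produced. The following is a minimal sketch in plain Python. The helper name split_result is hypothetical and not part of the Viam SDK, and the sample document simply repeats the example values shown above.

```python
# Sample result document with the same shape and values as the example above.
doc = {
    "_viam_pipeline_run": {
        "id": "run-id",
        "interval": {
            "start": "2025-03-15T14:00:00.000Z",
            "end": "2025-03-15T15:00:00.000Z",
        },
        "organization_id": "org-id",
    },
    "location": "warehouse-a",
    "avg_temp": 23.5,
    "count": 3600,
}


def split_result(document):
    """Return (run_metadata, pipeline_fields) for one result document.

    Hypothetical helper: everything under "_viam_pipeline_run" is treated as
    metadata, and every other top-level key is treated as pipeline output.
    """
    run = document.get("_viam_pipeline_run", {})
    fields = {k: v for k, v in document.items() if k != "_viam_pipeline_run"}
    return run, fields


run, fields = split_result(doc)
print(run["interval"]["start"], run["interval"]["end"], fields)
```

The same helper could be applied to each entry in the results list returned by the query.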
You can use $match on the _viam_pipeline_run.interval fields to filter results by time window.
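As a sketch of such a time-window filter, the helper below builds a query that keeps only the summaries whose interval falls inside a given range. The function name interval_query is hypothetical. The sketch also assumes the interval bounds are stored as date values, so it compares them against datetime objects; if your results store them as strings, compare against ISO 8601 strings instead.

```python
from datetime import datetime, timezone


def interval_query(start, end, limit=10):
    """Build MQL stages for summaries whose window lies within [start, end].

    Hypothetical helper. Assumes the interval fields are stored as dates.
    """
    return [
        {
            "$match": {
                "_viam_pipeline_run.interval.start": {"$gte": start},
                "_viam_pipeline_run.interval.end": {"$lte": end},
            }
        },
        # Oldest window first, then cap the number of documents returned.
        {"$sort": {"_viam_pipeline_run.interval.start": 1}},
        {"$limit": limit},
    ]


query = interval_query(
    datetime(2025, 3, 15, 14, 0, tzinfo=timezone.utc),
    datetime(2025, 3, 15, 16, 0, tzinfo=timezone.utc),
)
```

You would then pass query as the query argument of tabular_data_by_mql, together with the pipeline_sink data source type and your pipeline ID, in place of the single $limit stage.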