---
title: Cloudflare Pipelines
description: Cloudflare Pipelines ingests events, transforms them with SQL, and delivers them to R2 as Iceberg tables or as Parquet and JSON files.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Cloudflare Pipelines

Note

Pipelines is in **open beta**, and any developer with a [Workers Paid plan](https://developers.cloudflare.com/workers/platform/pricing/) can start using it. Currently, outside of standard R2 storage and operations, you will not be billed for your use of Pipelines.

Ingest, transform, and load streaming data into Apache Iceberg or Parquet in R2.


Cloudflare Pipelines ingests events, transforms them with SQL, and delivers them to R2 as [Iceberg tables](https://developers.cloudflare.com/r2/data-catalog/) or as Parquet and JSON files.

Whether you're processing server logs, mobile application events, IoT telemetry, or clickstream data, Pipelines provides durable ingestion via HTTP endpoints or Worker bindings, SQL-based transformations, and exactly-once delivery to R2. This makes it easy to build analytics-ready data warehouses and lakehouses without managing streaming infrastructure.

Create your first pipeline by following the [getting started guide](https://developers.cloudflare.com/pipelines/getting-started) or running this [Wrangler](https://developers.cloudflare.com/workers/wrangler/) command:

Terminal window

```
npx wrangler pipelines setup
```

---

## Features

### Create your first pipeline

Build your first pipeline to ingest data via HTTP or Workers, apply SQL transformations, and deliver to R2 as Iceberg tables or Parquet files.

[ Get started ](https://developers.cloudflare.com/pipelines/getting-started/) 

### Streams

Durable, buffered queues that receive events via HTTP endpoints or Worker bindings.

[ Learn about Streams ](https://developers.cloudflare.com/pipelines/streams/) 

### Pipelines

Connect streams to sinks with SQL transformations that validate, filter, transform, and enrich your data at ingestion time.

[ Learn about Pipelines ](https://developers.cloudflare.com/pipelines/pipelines/) 

### Sinks

Configure destinations for your data. Write Apache Iceberg tables to R2 Data Catalog or export as Parquet and JSON files.

[ Learn about Sinks ](https://developers.cloudflare.com/pipelines/sinks/) 

---

## Related products

**[R2](https://developers.cloudflare.com/r2/)** 

Cloudflare R2 Object Storage allows developers to store large amounts of unstructured data without the costly egress bandwidth fees associated with typical cloud storage services.

**[Workers](https://developers.cloudflare.com/workers/)** 

Cloudflare Workers allows developers to build serverless applications and deploy instantly across the globe for exceptional performance, reliability, and scale.

---

## More resources

[Limits](https://developers.cloudflare.com/pipelines/platform/limits/) 

Learn about pipelines limits.

[@CloudflareDev](https://x.com/cloudflaredev) 

Follow @CloudflareDev on Twitter to learn about product announcements and what is new in Cloudflare Workers.

[Developer Discord](https://discord.cloudflare.com) 

Connect with the Workers community on Discord to ask questions, show what you are building, and discuss the platform with other developers.


---

---
title: Getting started
description: Create your first pipeline to ingest streaming data and write to R2 Data Catalog as an Apache Iceberg table.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Getting started

This guide will walk you through:

* Creating an [API token](https://developers.cloudflare.com/r2/api/tokens/) needed for pipelines to authenticate with your data catalog.
* Creating your first pipeline with a simple ecommerce schema that writes to an [Apache Iceberg ↗](https://iceberg.apache.org/) table managed by R2 Data Catalog.
* Sending sample ecommerce data via HTTP endpoint.
* Validating data in your bucket and querying it with R2 SQL.

## Prerequisites

1. Sign up for a [Cloudflare account ↗](https://dash.cloudflare.com/sign-up/workers-and-pages).
2. Install [Node.js ↗](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm).

Node.js version manager

Use a Node version manager like [Volta ↗](https://volta.sh/) or [nvm ↗](https://github.com/nvm-sh/nvm) to avoid permission issues and change Node.js versions. [Wrangler](https://developers.cloudflare.com/workers/wrangler/install-and-update/), discussed later in this guide, requires a Node version of `16.17.0` or later.

## 1. Create an API token

Pipelines must authenticate to R2 Data Catalog with an [R2 API token](https://developers.cloudflare.com/r2/api/tokens/) that has catalog and R2 permissions.

1. In the Cloudflare dashboard, go to the **R2 object storage** page.  
[ Go to **Overview** ](https://dash.cloudflare.com/?to=/:account/r2/overview)
2. Select **Manage API tokens**.
3. Select **Create Account API token**.
4. Give your API token a name.
5. Under **Permissions**, select the **Admin Read & Write** permission.
6. Select **Create Account API Token**.
7. Note the **Token value**.

Note

This token also includes the R2 SQL Read permission, which allows you to query your data with R2 SQL.

## 2. Create your first pipeline

### Wrangler CLI

First, create a schema file that defines your ecommerce data structure:

**Create `schema.json`:**

```
{
  "fields": [
    {
      "name": "user_id",
      "type": "string",
      "required": true
    },
    {
      "name": "event_type",
      "type": "string",
      "required": true
    },
    {
      "name": "product_id",
      "type": "string",
      "required": false
    },
    {
      "name": "amount",
      "type": "float64",
      "required": false
    }
  ]
}
```

Use the interactive setup to create a pipeline that writes to R2 Data Catalog:

Terminal window

```
npx wrangler pipelines setup
```

Note

The setup command automatically creates the [R2 bucket](https://developers.cloudflare.com/r2/buckets/) and enables [R2 Data Catalog](https://developers.cloudflare.com/r2/data-catalog/) if they do not already exist, so you do not need to create them beforehand.

Follow the prompts:

1. **Pipeline name**: Enter `ecommerce`
2. **Stream configuration**:  
   * Enable HTTP endpoint: `yes`  
   * Require authentication: `no` (for simplicity)  
   * Configure custom CORS origins: `no`  
   * Schema definition: `Load from file`  
   * Schema file path: `schema.json` (or your file path)
3. **Sink configuration**:  
   * Destination type: `Data Catalog (Iceberg)`  
   * Setup mode: `Simple (recommended defaults)`  
   * R2 bucket name: `pipelines-tutorial` (created automatically if it does not exist)  
   * Table name: `ecommerce`  
   * Catalog API token: Enter your token from step 1
4. **Review**: Confirm the summary and select `Create resources`
5. **SQL transformation**: Choose `Simple ingestion (SELECT * FROM stream)`
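Simple ingestion copies every event through unchanged. If you instead want to filter events at ingestion time, the pipeline SQL can include a `WHERE` clause. A sketch, assuming stream and sink tables named `ecommerce_stream` and `ecommerce_sink` as in this tutorial:

```
INSERT INTO ecommerce_sink
SELECT * FROM ecommerce_stream
WHERE event_type = 'purchase';
```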

Note

If you make a mistake during setup (such as an invalid name or incorrect credentials), you will be prompted to retry rather than needing to restart the entire setup process.

Advanced mode options

If you select **Advanced** instead of **Simple** during sink configuration, you can customize the following additional options:

* **Format**: Output file format (for example, Parquet)
* **Compression**: Compression algorithm (for example, zstd)
* **Rolling policy**: File size threshold (minimum 5 MB) and time interval (minimum 10 seconds) for creating new files
* **Credentials**: Choose between automatic credential generation or manually entering R2 credentials
* **Namespace**: Data Catalog namespace (defaults to `default`)

After setup completes, the command outputs a configuration snippet for your Wrangler file, a Worker binding example with sample data, and a curl command for the HTTP endpoint. Note the HTTP endpoint URL and the `pipelines` configuration for use in the following steps.

You can also pre-set the pipeline name using the `--name` flag:

Terminal window

```
npx wrangler pipelines setup --name ecommerce
```

### Dashboard

1. In the Cloudflare dashboard, go to **R2 object storage**.  
[ Go to **Overview** ](https://dash.cloudflare.com/?to=/:account/r2/overview)
2. Select **Create bucket** and enter the bucket name: `pipelines-tutorial`.
3. Select **Create bucket**.
4. Select the bucket, switch to the **Settings** tab, scroll down to **R2 Data Catalog**, and select **Enable**.
5. Once enabled, note the **Catalog URI** and **Warehouse name**.
6. Go to **Pipelines** > **Pipelines**.  
[ Go to **Pipelines** ](https://dash.cloudflare.com/?to=/:account/pipelines/overview)
7. Select **Create Pipeline**.
8. **Connect to a Stream**:  
   * Pipeline name: `ecommerce`  
   * Enable HTTP endpoint for sending data: Enabled  
   * HTTP authentication: Disabled (default)  
   * Select **Next**
9. **Define Input Schema**:  
   * Select **JSON editor**  
   * Copy in the schema:  
   ```  
   {  
     "fields": [  
       {  
         "name": "user_id",  
         "type": "string",  
         "required": true  
       },  
       {  
         "name": "event_type",  
         "type": "string",  
         "required": true  
       },  
       {  
         "name": "product_id",  
         "type": "string",  
         "required": false  
       },  
       {  
         "name": "amount",  
         "type": "float64",  
         "required": false  
       }  
     ]  
   }  
   ```  
   * Select **Next**
10. **Define Sink**:  
   * Select your R2 bucket: `pipelines-tutorial`  
   * Storage type: **R2 Data Catalog**  
   * Namespace: `default`  
   * Table name: `ecommerce`  
   * **Advanced Settings**: Change **Maximum Time Interval** to `10 seconds`  
   * Select **Next**
11. **Credentials**:  
   * Disable **Automatically create an Account API token for your sink**  
   * Enter **Catalog Token** from step 1  
   * Select **Next**
12. **Pipeline Definition**:  
   * Leave the default SQL query:  
   ```  
   INSERT INTO ecommerce_sink SELECT * FROM ecommerce_stream;  
   ```  
   * Select **Create Pipeline**
13. After pipeline creation, note the **Stream ID** for the next step.

## 3. Send sample data

Send ecommerce events to your pipeline's HTTP endpoint:

Terminal window

```
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[
    {
      "user_id": "user_12345",
      "event_type": "purchase",
      "product_id": "widget-001",
      "amount": 29.99
    },
    {
      "user_id": "user_67890",
      "event_type": "view_product",
      "product_id": "widget-002"
    },
    {
      "user_id": "user_12345",
      "event_type": "add_to_cart",
      "product_id": "widget-003",
      "amount": 15.50
    }
  ]'
```

Replace `{stream-id}` with the stream ID shown in your pipeline setup output.

## 4. Validate data in your bucket

1. In the Cloudflare dashboard, go to the **R2 object storage** page.
2. Select your bucket: `pipelines-tutorial`.
3. You should see Iceberg metadata files and data files created by your pipeline. If you are not seeing any files in your bucket, wait a couple of minutes and try again.
4. The data is organized in the Apache Iceberg format with metadata tracking table versions.

## 5. Query your data using R2 SQL

Set up your environment to use R2 SQL:

Terminal window

```
export WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_API_TOKEN
```

Or create a `.env` file with:

```
WRANGLER_R2_SQL_AUTH_TOKEN=YOUR_API_TOKEN
```

Where `YOUR_API_TOKEN` is the token you created in step 1. For more information on setting environment variables, refer to [Wrangler system environment variables](https://developers.cloudflare.com/workers/wrangler/system-environment-variables/).

Query your data:

Terminal window

```
npx wrangler r2 sql query "YOUR_WAREHOUSE_NAME" "
SELECT
    user_id,
    event_type,
    product_id,
    amount
FROM default.ecommerce
WHERE event_type = 'purchase'
LIMIT 10"
```

Replace `YOUR_WAREHOUSE_NAME` with the warehouse name noted during pipeline setup. You can find it in the Cloudflare dashboard under **R2 object storage** > your bucket > **Settings** > **R2 Data Catalog**.

You can also query this table with any engine that supports Apache Iceberg. To learn more about connecting other engines to R2 Data Catalog, refer to [Connect to Iceberg engines](https://developers.cloudflare.com/r2/data-catalog/config-examples/).

## Learn more

[ Streams ](https://developers.cloudflare.com/pipelines/streams/) Learn about configuring streams for data ingestion. 

[ Pipelines ](https://developers.cloudflare.com/pipelines/pipelines/) Understand SQL transformations and pipeline configuration. 

[ Sinks ](https://developers.cloudflare.com/pipelines/sinks/) Configure data destinations and output formats. 


---

---
title: Streams
description: Streams are durable, buffered queues that receive and store events for processing in Cloudflare Pipelines. They provide reliable data ingestion via HTTP endpoints and Worker bindings, ensuring no data loss even during downstream processing delays or failures.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Streams

Streams are durable, buffered queues that receive and store events for processing in [Cloudflare Pipelines](https://developers.cloudflare.com/pipelines/). They provide reliable data ingestion via HTTP endpoints and Worker bindings, ensuring no data loss even during downstream processing delays or failures.

A single stream can be read by multiple pipelines, allowing you to route the same data to different destinations or apply different transformations. For example, you might send user events to both a real-time analytics pipeline and a data warehouse pipeline.
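Sketched as pipeline SQL (with hypothetical stream and sink names), that fan-out could look like the following, where each `INSERT` statement is the definition of a separate pipeline:

```
-- Pipeline 1: feed a real-time analytics sink
INSERT INTO analytics_sink SELECT * FROM user_events_stream;

-- Pipeline 2: feed a data warehouse sink
INSERT INTO warehouse_sink SELECT * FROM user_events_stream;
```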

Streams currently accept events in JSON format and support both structured events with defined schemas and unstructured JSON. When a schema is provided, streams will validate and enforce it for incoming events.

## Learn more

[ Manage streams ](https://developers.cloudflare.com/pipelines/streams/manage-streams/) Create, configure, and delete streams using Wrangler or the API. 

[ Writing to streams ](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/) Send events to streams via HTTP endpoints or Worker bindings. 


---

---
title: Manage streams
description: Create, configure, and manage streams for data ingestion
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Manage streams

Learn how to:

* Create and configure streams for data ingestion
* View and update stream settings
* Delete streams when no longer needed

## Create a stream

Streams are made available to pipelines as SQL tables using the stream name (for example, `SELECT * FROM my_stream`).

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.  
[ Go to **Pipelines** ](https://dash.cloudflare.com/?to=/:account/pipelines/overview)
2. Select **Create Pipeline** to launch the pipeline creation wizard.
3. Complete the wizard to create your stream along with the associated sink and pipeline.

### Wrangler CLI

To create a stream, run the [pipelines streams create](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-streams-create) command:

Terminal window

```
npx wrangler pipelines streams create <STREAM_NAME>
```

Alternatively, to use the interactive setup wizard that helps you configure a stream, sink, and pipeline, run the [pipelines setup](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-setup) command:

Terminal window

```
npx wrangler pipelines setup
```

### Schema configuration

Streams support two approaches for handling data:

* **Structured streams**: Define a schema with specific fields and data types. Events are validated against the schema.
* **Unstructured streams**: Accept any valid JSON without validation. These streams have a single `value` column containing the JSON data.

To create a structured stream, provide a schema file:

Terminal window

```
npx wrangler pipelines streams create my-stream --schema-file schema.json
```

Example schema file:

```
{
  "fields": [
    {
      "name": "user_id",
      "type": "string",
      "required": true
    },
    {
      "name": "amount",
      "type": "float64",
      "required": false
    },
    {
      "name": "tags",
      "type": "list",
      "required": false,
      "items": {
        "type": "string"
      }
    },
    {
      "name": "metadata",
      "type": "struct",
      "required": false,
      "fields": [
        {
          "name": "source",
          "type": "string",
          "required": false
        },
        {
          "name": "priority",
          "type": "int32",
          "required": false
        }
      ]
    }
  ]
}
```
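For illustration, an event that validates against this schema might look like (values are hypothetical):

```
{
  "user_id": "user_12345",
  "amount": 42.5,
  "tags": ["new-customer", "mobile"],
  "metadata": {
    "source": "checkout",
    "priority": 1
  }
}
```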

**Supported data types:**

* `string` - Text values
* `int32`, `int64` - Integer numbers
* `float32`, `float64` - Floating-point numbers
* `bool` - Boolean true/false
* `timestamp` - RFC 3339 timestamps, or numeric values parsed as Unix seconds, milliseconds, or microseconds (depending on unit)
* `json` - JSON objects
* `binary` - Binary data (base64-encoded)
* `list` - Arrays of values
* `struct` - Nested objects with defined fields

Note

Events that do not match the defined schema are accepted during ingestion but will be dropped during processing. To monitor dropped events and understand why they were dropped, query the [user error metrics](https://developers.cloudflare.com/pipelines/observability/metrics/#user-error-metrics) via GraphQL. Schema modifications are not supported after stream creation.

## View stream configuration

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Streams**.
2. Select a stream to view its associated configuration.

### Wrangler CLI

To view a specific stream, run the [pipelines streams get](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-streams-get) command:

Terminal window

```
npx wrangler pipelines streams get <STREAM_ID>
```

To list all streams in your account, run the [pipelines streams list](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-streams-list) command:

Terminal window

```
npx wrangler pipelines streams list
```

## Update HTTP ingest settings

You can update certain HTTP ingest settings after stream creation. Schema modifications are not supported once a stream is created.

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Streams**.
2. Select the stream you want to update.
3. In the **Settings** tab, go to **HTTP Ingest**.
4. To turn on or turn off HTTP ingestion, select **Enable** or **Disable**.
5. To update authentication and CORS settings, select **Edit** and modify.
6. Save your changes.

Note

For details on configuring authentication tokens and making authenticated requests, refer to [Writing to streams](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/).

## Delete a stream

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Streams**.
2. Select the stream you want to delete.
3. In the **Settings** tab, go to **General**, and select **Delete**.

### Wrangler CLI

To delete a stream, run the [pipelines streams delete](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-streams-delete) command:

Terminal window

```
npx wrangler pipelines streams delete <STREAM_ID>
```

Warning

Deleting a stream will permanently remove all buffered events that have not been processed and will delete any dependent pipelines. Ensure all data has been delivered to your sink before deletion.


---

---
title: Writing to streams
description: Send data to streams via Worker bindings or HTTP endpoints
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Writing to streams

Send events to streams using [Worker bindings](https://developers.cloudflare.com/workers/runtime-apis/bindings/) or HTTP endpoints for client-side applications and external systems.

## Send via Workers

Worker bindings provide a secure way to send data to streams from [Workers](https://developers.cloudflare.com/workers/) without managing API tokens or credentials.

### Configure pipeline binding

Add a pipeline binding to your Wrangler file that points to your stream:


wrangler.jsonc

```
{
  "pipelines": [
    {
      "pipeline": "<STREAM_ID>",
      "binding": "STREAM"
    }
  ]
}
```

wrangler.toml

```
[[pipelines]]
pipeline = "<STREAM_ID>"
binding = "STREAM"
```

### Workers API

The pipeline binding exposes a method for sending data to your stream:

#### `send(records)`

Sends an array of JSON-serializable records to the stream. Returns a Promise that resolves when records are confirmed as ingested.


JavaScript

```
export default {
  async fetch(request, env, ctx) {
    const events = await request.json();

    await env.STREAM.send(events);

    return new Response("Events sent");
  },
};
```

TypeScript

```
export default {
  async fetch(request, env, ctx): Promise<Response> {
    const events = await request.json<Record<string, unknown>[]>();

    await env.STREAM.send(events);

    return new Response("Events sent");
  },
} satisfies ExportedHandler<Env>;
```

### Typed pipeline bindings

When a stream has a defined schema, running `wrangler types` generates schema-specific TypeScript types for your pipeline bindings. Instead of the generic `Pipeline<PipelineRecord>`, your bindings get a named record type with full autocomplete and compile-time type checking. Refer to the [wrangler types documentation](https://developers.cloudflare.com/workers/wrangler/commands/general/#types) to learn more.

#### Generated types

After running `wrangler types`, the generated `worker-configuration.d.ts` file contains a named record type inside the `Cloudflare` namespace. The type name is derived from the stream name (not the binding name), converted to PascalCase with a `Record` suffix.

Below is an example of what generated types look like in `worker-configuration.d.ts` for a stream named `ecommerce_stream`:

TypeScript

```
declare namespace Cloudflare {
  type EcommerceStreamRecord = {
    user_id: string;
    event_type: string;
    product_id?: string;
    amount?: number;
  };
  interface Env {
    STREAM: import("cloudflare:pipelines").Pipeline<Cloudflare.EcommerceStreamRecord>;
  }
}
```

#### Fallback behavior

`wrangler types` falls back to the generic `Pipeline<PipelineRecord>` type in the following scenarios:

* **Not authenticated**: Run `wrangler login` to enable typed pipeline bindings.
* **Stream not found**: The stream ID in your Wrangler configuration does not match an existing stream.
* **Unstructured stream**: The stream was created without a schema.

## Send via HTTP

Each stream provides an optional HTTP endpoint for ingesting data from external applications, browsers, or any system that can make HTTP requests.

### Endpoint format

HTTP endpoints follow this format:

```
https://{stream-id}.ingest.cloudflare.com
```

Find your stream's endpoint URL in the Cloudflare dashboard under **Pipelines** > **Streams** or using the Wrangler CLI:

Terminal window

```
npx wrangler pipelines streams get <STREAM_ID>
```

### Making requests

Send events as JSON arrays via POST requests:

Terminal window

```
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[
    {
      "user_id": "12345",
      "event_type": "purchase",
      "product_id": "widget-001",
      "amount": 29.99
    }
  ]'
```

### Authentication

When authentication is enabled for your stream, include the API token in the `Authorization` header:

Terminal window

```
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -d '[{"event": "test"}]'
```

The API token must have **Workers Pipeline Send** permission. To learn more, refer to the [Create API token](https://developers.cloudflare.com/fundamentals/api/get-started/create-token/) documentation.

## Schema validation

Streams handle validation differently based on their configuration:

* **Structured streams**: Events must match the defined schema fields and types.
* **Unstructured streams**: Accept any valid JSON structure. Data is stored in a single `value` column.

For structured streams, ensure your events match the schema definition. Invalid events will be accepted but dropped, so validate your data before sending to avoid dropped events. When using Worker bindings, run `wrangler types` to generate [typed pipeline bindings](#typed-pipeline-bindings) that catch schema violations at compile time. You can also query the [user error metrics](https://developers.cloudflare.com/pipelines/observability/metrics/#user-error-metrics) to monitor dropped events and diagnose schema validation issues.
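As a sketch of such client-side pre-validation (hypothetical helper functions, using the ecommerce schema from the getting started guide as the example), you might check required fields and types before sending:

```javascript
// Hypothetical pre-validation for the tutorial's ecommerce schema:
// user_id and event_type are required strings; product_id is an
// optional string; amount is an optional number (float64).
function validateEcommerceEvent(event) {
  const errors = [];
  if (typeof event.user_id !== "string") errors.push("user_id must be a string");
  if (typeof event.event_type !== "string") errors.push("event_type must be a string");
  if (event.product_id !== undefined && typeof event.product_id !== "string") {
    errors.push("product_id must be a string when present");
  }
  if (event.amount !== undefined && typeof event.amount !== "number") {
    errors.push("amount must be a number when present");
  }
  return errors;
}

// Keep only events that pass validation, logging why each invalid
// event was dropped instead of letting the stream drop it silently.
function partitionValid(events) {
  const valid = [];
  for (const event of events) {
    const errors = validateEcommerceEvent(event);
    if (errors.length === 0) valid.push(event);
    else console.warn("Dropping event:", errors.join("; "));
  }
  return valid;
}
```

With a Worker binding you would then send only the surviving events, for example `env.STREAM.send(partitionValid(events))`; typed pipeline bindings catch the same class of mistakes at compile time.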


---

---
title: Sinks
description: Sinks define destinations for your data in Cloudflare Pipelines. They support writing to R2 Data Catalog as Apache Iceberg tables or to R2 as raw JSON or Parquet files.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Sinks

Sinks define destinations for your data in Cloudflare Pipelines. They support writing to [R2 Data Catalog](https://developers.cloudflare.com/r2/data-catalog/) as Apache Iceberg tables or to [R2](https://developers.cloudflare.com/r2/) as raw JSON or Parquet files.

Sinks provide exactly-once delivery guarantees, ensuring events are never duplicated or dropped. They can be configured to write files frequently for low-latency ingestion or to write larger, less frequent files for better query performance.

## Learn more

[ Manage sinks ](https://developers.cloudflare.com/pipelines/sinks/manage-sinks/) Create, configure, and delete sinks using Wrangler or the API. 

[ Available sinks ](https://developers.cloudflare.com/pipelines/sinks/available-sinks/) Learn about supported sink destinations and their configuration options. 


---

---
title: R2
description: Write data as JSON or Parquet files to R2 object storage
image: https://developers.cloudflare.com/dev-products-preview.png
---


# R2

R2 sinks write processed data from pipelines as raw files to [R2 object storage](https://developers.cloudflare.com/r2/). They currently support JSON and Parquet output formats.

To create an R2 sink, run the [pipelines sinks create](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-sinks-create) command and specify the sink type and target [bucket](https://developers.cloudflare.com/r2/buckets/):

Terminal window

```
npx wrangler pipelines sinks create my-sink \
  --type r2 \
  --bucket my-bucket
```

## Format options

R2 sinks support two output formats:

### JSON format

Write data as newline-delimited JSON files:

Terminal window

```
--format json
```

### Parquet format

Write data as Parquet files for better query performance and compression:

Terminal window

```
--format parquet --compression zstd
```

**Compression options for Parquet:**

* `zstd` (default) - Best compression ratio
* `snappy` - Fastest compression
* `gzip` - Good compression, widely supported
* `lz4` - Fast compression with reasonable ratio
* `uncompressed` - No compression

**Row group size:** [Row groups ↗](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size in MB:

Terminal window

```
--target-row-group-size 256
```
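As a back-of-the-envelope illustration of what the target row group size means in practice, the sketch below estimates rows per group from an assumed average row size. The figures and the calculation are illustrative assumptions; actual grouping depends on encoding and compression inside the Parquet writer.

```python
# Rough estimate of rows per Parquet row group.
# avg_row_bytes is an assumed figure, not something Pipelines reports.
def rows_per_group(target_group_mb: int, avg_row_bytes: int) -> int:
    return (target_group_mb * 1024 * 1024) // avg_row_bytes

# With the 256 MB target above and ~512-byte rows:
print(rows_per_group(256, 512))  # 524288
```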

## File organization

Files are written with UUID names within the partitioned directory structure. For example, with path `analytics` and default partitioning:

```
analytics/year=2025/month=09/day=18/002507a5-d449-48e8-a484-b1bea916102f.parquet
```

### Path

Set a base directory in your bucket where files will be written:

Terminal window

```
--path analytics/events
```

### Partitioning

R2 sinks automatically partition files by time using a configurable pattern. The default pattern is `year=%Y/month=%m/day=%d` (Hive-style partitioning).

Terminal window

```
--partitioning "year=%Y/month=%m/day=%d/hour=%H"
```

For available format specifiers, refer to [strftime documentation ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html).
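To see how the path, the strftime partitioning pattern, and the UUID file name combine into an object key, here is a minimal sketch. The composition logic is an assumption for clarity, not Pipelines' internal code; the pattern and example values come from this page.

```python
import uuid
from datetime import datetime, timezone

def object_key(path: str, pattern: str, ts: datetime, ext: str = "parquet") -> str:
    # strftime renders the partition directories, e.g. year=2025/month=09/day=18
    partition = ts.strftime(pattern)
    return f"{path}/{partition}/{uuid.uuid4()}.{ext}"

key = object_key(
    "analytics",
    "year=%Y/month=%m/day=%d",
    datetime(2025, 9, 18, tzinfo=timezone.utc),
)
# e.g. analytics/year=2025/month=09/day=18/<uuid>.parquet
```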

## Batching and rolling policy

Control when files are written to R2. Configure based on your needs:

* **Lower values**: More frequent writes, smaller files, lower latency
* **Higher values**: Less frequent writes, larger files, better query performance

### Roll interval

Set how often files are written (default: 300 seconds):

Terminal window

```
--roll-interval 60  # Write files every 60 seconds
```

### Roll size

Set maximum file size in MB before creating a new file:

Terminal window

```
--roll-size 100  # Create new file after 100MB
```
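The two triggers interact: a file rolls on whichever limit is hit first. The sketch below works through that under an assumed steady ingest rate; the numbers are illustrative and the actual flush behavior is internal to Pipelines.

```python
# Which rolling trigger fires first at a steady ingest rate?
# rate_mb_s is an assumed figure for illustration.
def first_trigger(rate_mb_s: float, roll_interval_s: int, roll_size_mb: int) -> str:
    seconds_to_size = roll_size_mb / rate_mb_s
    return "roll-size" if seconds_to_size < roll_interval_s else "roll-interval"

# At 1 MB/s with defaults (300 s interval, 100 MB size), the size limit
# is reached after ~100 s, so files roll on size, not on time.
print(first_trigger(1.0, 300, 100))  # roll-size
```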

## Authentication

R2 sinks require API credentials (an Access Key ID and Secret Access Key) with [Object Read & Write permissions](https://developers.cloudflare.com/r2/api/tokens/#permissions) to write data to your bucket.

Terminal window

```
npx wrangler pipelines sinks create my-sink \
  --type r2 \
  --bucket my-bucket \
  --access-key-id YOUR_ACCESS_KEY_ID \
  --secret-access-key YOUR_SECRET_ACCESS_KEY
```


---

---
title: R2 Data Catalog
description: Write data as Apache Iceberg tables to R2 Data Catalog
image: https://developers.cloudflare.com/dev-products-preview.png
---


# R2 Data Catalog

R2 Data Catalog sinks write processed data from pipelines as [Apache Iceberg ↗](https://iceberg.apache.org/) tables to [R2 Data Catalog](https://developers.cloudflare.com/r2/data-catalog/). Iceberg tables provide ACID transactions, schema evolution, and time travel capabilities for analytics workloads.

To create an R2 Data Catalog sink, run the [pipelines sinks create](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-sinks-create) command and specify the sink type, target bucket, namespace, and table name:

Terminal window

```
npx wrangler pipelines sinks create my-sink \
  --type r2-data-catalog \
  --bucket my-bucket \
  --namespace my_namespace \
  --table my_table \
  --catalog-token YOUR_CATALOG_TOKEN
```

The sink will create the specified namespace and table if they do not exist. Sinks cannot be created for existing Iceberg tables.

## Format

R2 Data Catalog sinks only support Parquet format. JSON format is not supported for Iceberg tables.

### Compression options

Configure Parquet compression for optimal storage and query performance:

Terminal window

```
--compression zstd
```

**Available compression options:**

* `zstd` (default) - Best compression ratio
* `snappy` - Fastest compression
* `gzip` - Good compression, widely supported
* `lz4` - Fast compression with reasonable ratio
* `uncompressed` - No compression

### Row group size

[Row groups ↗](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size in MB:

Terminal window

```
--target-row-group-size 256
```

## Batching and rolling policy

Control when data is written to Iceberg tables. Configure based on your needs:

* **Lower values**: More frequent writes, smaller files, lower latency
* **Higher values**: Less frequent writes, larger files, better query performance

### Roll interval

Set how often files are written (default: 300 seconds):

Terminal window

```
--roll-interval 60  # Write files every 60 seconds
```

### Roll size

Set maximum file size in MB before creating a new file:

Terminal window

```
--roll-size 100  # Create new file after 100MB
```

## Authentication

R2 Data Catalog sinks require an API token with [R2 Admin Read & Write permissions](https://developers.cloudflare.com/r2/data-catalog/manage-catalogs/#create-api-token-in-the-dashboard). This permission grants the sink access to both R2 Data Catalog and R2 storage.

Terminal window

```
--catalog-token YOUR_CATALOG_TOKEN
```


---

---
title: Manage sinks
description: Create, configure, and manage sinks for data storage
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Manage sinks

Learn how to:

* Create and configure sinks for data storage
* View sink configuration
* Delete sinks when no longer needed

## Create a sink

Sinks are made available to pipelines as SQL tables using the sink name (e.g., `INSERT INTO my_sink SELECT * FROM my_stream`).

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.  
[ Go to **Pipelines** ](https://dash.cloudflare.com/?to=/:account/pipelines/overview)
2. Select **Create Pipeline** to launch the pipeline creation wizard.
3. Complete the wizard to create your sink along with the associated stream and pipeline.

### Wrangler CLI

To create a sink, run the [pipelines sinks create](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-sinks-create) command:

Terminal window

```
npx wrangler pipelines sinks create <SINK_NAME> \
  --type r2 \
  --bucket my-bucket
```

For sink-specific configuration options, refer to [Available sinks](https://developers.cloudflare.com/pipelines/sinks/available-sinks/).

Alternatively, to use the interactive setup wizard that helps you configure a stream, sink, and pipeline, run the [pipelines setup](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-setup) command:

Terminal window

```
npx wrangler pipelines setup
```

## View sink configuration

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Sinks**.
2. Select a sink to view its configuration.

### Wrangler CLI

To view a specific sink, run the [pipelines sinks get](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-sinks-get) command:

Terminal window

```
npx wrangler pipelines sinks get <SINK_ID>
```

To list all sinks in your account, run the [pipelines sinks list](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-sinks-list) command:

Terminal window

```
npx wrangler pipelines sinks list
```

## Delete a sink

### Dashboard

1. In the Cloudflare dashboard, go to **Pipelines** > **Sinks**.
2. Select the sink you want to delete.
3. In the **Settings** tab, navigate to **General**, and select **Delete**.

### Wrangler CLI

To delete a sink, run the [pipelines sinks delete](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-sinks-delete) command:

Terminal window

```
npx wrangler pipelines sinks delete <SINK_ID>
```

Warning

Deleting a sink stops all data writes to that destination.

## Limitations

* Sinks cannot be modified after creation. To change sink configuration, you must delete and recreate the sink.
* The R2 Data Catalog sink does not currently support writing to R2 buckets in a different jurisdiction.


---

---
title: Pipelines
description: Pipelines connect streams and sinks via SQL transformations, which can modify events before writing them to storage. This enables you to shift left, pushing validation, schematization, and processing to your ingestion layer to make your queries easy, fast, and correct.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Pipelines

Pipelines connect [streams](https://developers.cloudflare.com/pipelines/streams/) and [sinks](https://developers.cloudflare.com/pipelines/sinks/) via SQL transformations, which can modify events before writing them to storage. This enables you to shift left, pushing validation, schematization, and processing to your ingestion layer to make your queries easy, fast, and correct.

Pipelines enable you to filter, transform, enrich, and restructure events in real time as data flows from streams to sinks.

## Learn more

[ Manage pipelines ](https://developers.cloudflare.com/pipelines/pipelines/manage-pipelines/) Create, configure, and manage SQL transformations between streams and sinks. 


---

---
title: Manage pipelines
description: Create, configure, and manage SQL transformations between streams and sinks
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Manage pipelines

Learn how to:

* Create pipelines with SQL transformations
* View pipeline configuration and SQL
* Delete pipelines when no longer needed

## Create a pipeline

Pipelines execute SQL statements that define how data flows from streams to sinks.

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.  
[ Go to **Pipelines** ](https://dash.cloudflare.com/?to=/:account/pipelines/overview)
2. Select **Create Pipeline** to launch the pipeline creation wizard.
3. Follow the wizard to configure your stream, sink, and SQL transformation.

### Wrangler CLI

To create a pipeline, run the [pipelines create](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-create) command:

Terminal window

```
npx wrangler pipelines create my-pipeline \
  --sql "INSERT INTO my_sink SELECT * FROM my_stream"
```

You can also provide SQL from a file:

Terminal window

```
npx wrangler pipelines create my-pipeline \
  --sql-file pipeline.sql
```

Alternatively, to use the interactive setup wizard that helps you configure a stream, sink, and pipeline, run the [pipelines setup](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-setup) command:

Terminal window

```
npx wrangler pipelines setup
```

### SQL transformations

Pipelines support SQL statements for data transformation. For complete syntax, supported functions, and data types, see the [SQL reference](https://developers.cloudflare.com/pipelines/sql-reference/).

Common patterns include:

#### Basic data flow

Transfer all data from stream to sink:

```
INSERT INTO my_sink SELECT * FROM my_stream
```

#### Filtering events

Filter events based on conditions:

```
INSERT INTO my_sink
SELECT * FROM my_stream
WHERE event_type = 'purchase' AND amount > 100
```

#### Selecting specific fields

Choose only the fields you need:

```
INSERT INTO my_sink
SELECT user_id, event_type, timestamp, amount
FROM my_stream
```

#### Transforming data

Apply transformations to fields:

```
INSERT INTO my_sink
SELECT
  user_id,
  UPPER(event_type) as event_type,
  timestamp,
  amount * 1.1 as amount_with_tax
FROM my_stream
```

## View pipeline configuration

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.
2. Select a pipeline to view its SQL transformation, connected streams/sinks, and associated metrics.

### Wrangler CLI

To view a specific pipeline, run the [pipelines get](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-get) command:

Terminal window

```
npx wrangler pipelines get <PIPELINE_ID>
```

To list all pipelines in your account, run the [pipelines list](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-list) command:

Terminal window

```
npx wrangler pipelines list
```

## Delete a pipeline

Deleting a pipeline stops data flow from the connected stream to sink.

### Dashboard

1. In the Cloudflare dashboard, go to the **Pipelines** page.
2. Select the pipeline you want to delete.
3. In the **Settings** tab, select **Delete**.

### Wrangler CLI

To delete a pipeline, run the [pipelines delete](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-delete) command:

Terminal window

```
npx wrangler pipelines delete <PIPELINE_ID>
```

Warning

Deleting a pipeline immediately stops data flow between the stream and sink.

## Limitations

Pipeline SQL cannot be modified after creation. To change the SQL transformation, you must delete and recreate the pipeline.


---

---
title: Metrics and analytics
description: Pipelines expose metrics which allow you to measure data ingested, processed, and delivered to sinks.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Metrics and analytics

Pipelines expose metrics which allow you to measure data ingested, processed, and delivered to sinks.

The metrics displayed in the [Cloudflare dashboard ↗](https://dash.cloudflare.com/) are queried from Cloudflare's [GraphQL Analytics API](https://developers.cloudflare.com/analytics/graphql-api/). You can access the metrics [programmatically](#query-via-the-graphql-api) via GraphQL or HTTP client.

## Metrics

### Operator metrics

Pipelines export the following metrics in the `pipelinesOperatorAdaptiveGroups` dataset. These metrics track data read and processed by pipeline operators.

| Metric        | GraphQL Field Name | Description                                                                                              |
| ------------- | ------------------ | -------------------------------------------------------------------------------------------------------- |
| Bytes In      | bytesIn            | Total number of bytes read by the pipeline (filter by streamId\_neq: "" to get data read from streams)   |
| Records In    | recordsIn          | Total number of records read by the pipeline (filter by streamId\_neq: "" to get data read from streams) |
| Decode Errors | decodeErrors       | Number of messages that could not be deserialized according to the stream schema                         |

For a detailed breakdown of why events were dropped (including specific error types like `missing_field`, `type_mismatch`, `parse_failure`, and `null_value`), refer to [User error metrics](#user-error-metrics).

The `pipelinesOperatorAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:

* `pipelineId` - ID of the pipeline
* `streamId` - ID of the source stream
* `datetime` - Timestamp of the operation
* `date` - Timestamp of the operation, truncated to the start of a day
* `datetimeHour` - Timestamp of the operation, truncated to the start of an hour

### Sink metrics

Pipelines export the following metrics in the `pipelinesSinkAdaptiveGroups` dataset. These metrics track data delivery to sinks.

| Metric                     | GraphQL Field Name       | Description                                                  |
| -------------------------- | ------------------------ | ------------------------------------------------------------ |
| Bytes Written              | bytesWritten             | Total number of bytes written to the sink, after compression |
| Records Written            | recordsWritten           | Total number of records written to the sink                  |
| Files Written              | filesWritten             | Number of files written to the sink                          |
| Row Groups Written         | rowGroupsWritten         | Number of row groups written (for Parquet files)             |
| Uncompressed Bytes Written | uncompressedBytesWritten | Total number of bytes written before compression             |

The `pipelinesSinkAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:

* `pipelineId` - ID of the pipeline
* `sinkId` - ID of the destination sink
* `datetime` - Timestamp of the operation
* `date` - Timestamp of the operation, truncated to the start of a day
* `datetimeHour` - Timestamp of the operation, truncated to the start of an hour

### User error metrics

Pipelines track events that are dropped during processing due to deserialization errors. When a structured stream receives events that do not match its defined schema, those events are accepted during ingestion but dropped during processing. The `pipelinesUserErrorsAdaptiveGroups` dataset provides visibility into these dropped events, telling you which events were dropped and why. You can explore the full schema of this dataset using GraphQL [introspection](https://developers.cloudflare.com/analytics/graphql-api/features/discovery/introspection/).

| Metric | GraphQL Field Name | Description                             |
| ------ | ------------------ | --------------------------------------- |
| Count  | count              | Number of events that failed validation |

The `pipelinesUserErrorsAdaptiveGroups` dataset provides the following dimensions for filtering and grouping queries:

* `pipelineId` - ID of the pipeline
* `errorFamily` - Category of the error (for example, `deserialization`)
* `errorType` - Specific error type within the family
* `date` - Date of the error, truncated to start of day
* `datetime` - Timestamp of the error
* `datetimeHour` - Timestamp of the error, truncated to the start of an hour
* `datetimeMinute` - Timestamp of the error, truncated to the start of a minute

#### Known error types

| Error family    | Error type     | Description                                                                                                  |
| --------------- | -------------- | ------------------------------------------------------------------------------------------------------------ |
| deserialization | missing\_field | A required field defined in the stream schema was not present in the event                                   |
| deserialization | type\_mismatch | A field value did not match the expected type in the schema (for example, string sent where number expected) |
| deserialization | parse\_failure | The event could not be parsed as valid JSON, or a field value could not be parsed into the expected type     |
| deserialization | null\_value    | A required field was present but had a null value                                                            |
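To make the four error types concrete, the sketch below classifies decoded events against a toy schema. It mirrors the error taxonomy in the table above but is not Pipelines' actual deserializer; the schema and field names are hypothetical.

```python
import json

# Hypothetical required fields and their expected types.
SCHEMA = {"user_id": str, "amount": float}

def classify(raw: str):
    try:
        event = json.loads(raw)
    except json.JSONDecodeError:
        return "parse_failure"          # not valid JSON
    for field, ftype in SCHEMA.items():
        if field not in event:
            return "missing_field"      # required field absent
        if event[field] is None:
            return "null_value"         # field present but null
        if not isinstance(event[field], ftype):
            return "type_mismatch"      # wrong type for field
    return None                         # event conforms to the schema

print(classify('{"user_id": "u1"}'))              # missing_field
print(classify('{"user_id": 7, "amount": 1.5}'))  # type_mismatch
```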

Note

To prevent incorrect data from being ingested in the first place, consider using [typed pipeline bindings](https://developers.cloudflare.com/pipelines/streams/writing-to-streams/#typed-pipeline-bindings) to catch schema violations at compile time.

## View metrics and errors in the dashboard

Per-pipeline analytics are available in the Cloudflare dashboard. To view current and historical metrics for a pipeline:

1. Log in to the [Cloudflare dashboard ↗](https://dash.cloudflare.com) and select your account.
2. Go to **Pipelines** > **Pipelines**.
3. Select a pipeline.
4. Go to the **Metrics** tab to view its metrics or **Errors** tab to view dropped events.

You can optionally select a time window to query. This defaults to the last 24 hours.

## Query via the GraphQL API

You can programmatically query analytics for your pipelines via the [GraphQL Analytics API](https://developers.cloudflare.com/analytics/graphql-api/). This API queries the same datasets as the Cloudflare dashboard and supports GraphQL [introspection](https://developers.cloudflare.com/analytics/graphql-api/features/discovery/introspection/).

Pipelines GraphQL datasets require an `accountTag` filter with your Cloudflare account ID.
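As a sketch of issuing one of these queries over HTTP from Python's standard library: the endpoint and bearer-token header follow the GraphQL Analytics API, while the placeholder variable values are illustrative. The request itself is wrapped in a function and not executed here.

```python
import json
import urllib.request

GRAPHQL_ENDPOINT = "https://api.cloudflare.com/client/v4/graphql"

def run_query(token: str, query: str, variables: dict) -> dict:
    """POST a GraphQL query with bearer-token auth and return the JSON response."""
    payload = json.dumps({"query": query, "variables": variables}).encode()
    req = urllib.request.Request(
        GRAPHQL_ENDPOINT,
        data=payload,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# Placeholder variables for the operator-metrics query; accountTag is
# required, as noted above.
variables = {
    "accountTag": "<ACCOUNT_ID>",
    "pipelineId": "<PIPELINE_ID>",
    "datetimeStart": "2025-09-18T00:00:00Z",
    "datetimeEnd": "2025-09-19T00:00:00Z",
}
```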

### Measure operator metrics over time period

This query returns the total bytes and records read by a pipeline from streams, along with any decode errors.

```
query PipelineOperatorMetrics(
  $accountTag: String!
  $pipelineId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesOperatorAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          streamId_neq: ""
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          bytesIn
          recordsIn
          decodeErrors
        }
      }
    }
  }
}
```

[Run in GraphQL API Explorer](https://graphql.cloudflare.com/explorer?query=I4VwpgTgngBACgSwA5gDYIHZgPIogQwBcB7CAWTEIgQGMBnACgCgYYASfGm4kDQgFXwBzAFwwAylUxCAhC3ZJkaTGACSAEzGTqGWfLbqilBAFswk-BEJj+psHNYGjhOwFEMmmLbNyAlDABveQA3BDAAd0hA+VZObl5CRgAzBFRCSDEAmDiePkFRdhyE-JgAX38g1iqYRRR0LDpcSCJSAEFDJBdgsABxCB4kRhjqmHQTBGsYAEYABjmZ4eqUtIzokZHa5SwNMTZN+rV1RZG6KjB8Ew0AfSxgMQAie+Pqw3SXMyuhMDv2V+MzCxWZ5VP7vMBXVDfXagtweYGlY4VYF0EAmNbrKoAIyg6ToqgwwNYEDA3Ag6jxBIxIJJxHUYFcEH6EDo8OOCOq7LKTFKQA&variables=N4IghgxhD2CuB2AXAKmA5iAXCAggYTwHkBVAOWQH0BJAERABoQAHASyYFMAbF+dqgEywgASgFEACgBl8oigHUqyABLU6jfmETtELALbsAyojAAnREIBMABgsA2ALRWALI4DMyK1cwBWb5icWAFoMIBpaOvqi8ILY1naOLlbuVrY+fgHBAL5AA)

### Measure sink delivery metrics

This query returns detailed metrics about data written to a specific sink, including file and compression statistics.

```
query PipelineSinkMetrics(
  $accountTag: String!
  $pipelineId: String!
  $sinkId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesSinkAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          sinkId: $sinkId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          bytesWritten
          recordsWritten
          filesWritten
          rowGroupsWritten
          uncompressedBytesWritten
        }
      }
    }
  }
}
```

[Run in GraphQL API Explorer](https://graphql.cloudflare.com/explorer?query=I4VwpgTgngBACgSwA5gDYIHZgMqYNYCyYALhAgMYDOAFAFAwwAkAhueQPYgbEAqzA5gC4Y2Upn4BCekyTI0mMAEkAJsNFkMk6Y0r4VasZqkNGy5sRIIAtjmLMIxYT2thjTMxeIuAohlUxnGykAShgAb2kANwQwAHdIcOkGVg4uYhoAMwRUCwhhMJgUzm4+ISYitNKYAF9QiIYGmFkUdCxKXAw8AEEzJC9IsABxCE4kGiTGmHQrBEcYAEYABmXFicasnMh8tcnm+Sx9GTlWpWUdxt1Ow509M8nJj0sbAH1+MGBhU3Mn23tic4ajy8L1Q70+QJ8fgB1R2dQBlBAVkS90aACMoBZKAB1MjECwYAEMCBgDgQZTY3H4wkwDZgCmzKkohojWLDUb0vFgAlMhhcDhWJDEyiUMDKABCGLpOIZXOhOxhjQVNVo1SAA&variables=N4IghgxhD2CuB2AXAKmA5iAXCAggYTwHkBVAOWQH0BJAERABoQAHASyYFMAbF+dqgEywgASgFEACgBl8oigHUqyABLU6jAM48A1gKFipM+YpW0GIfmETtELALbsAyojAAnREIBMABg8A2ALReACyBAMzIXl6YAKzRmEEeAFpmFlY29qLwgtjefoEhXuFevjFxCckAvkA)

### Query dropped event errors

This query returns a summary of events that were dropped due to schema validation failures, grouped by error type and ordered by frequency.

```
query GetPipelineUserErrors(
  $accountTag: String!
  $pipelineId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesUserErrorsAdaptiveGroups(
        limit: 100
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
        orderBy: [count_DESC]
      ) {
        count
        dimensions {
          date
          errorFamily
          errorType
        }
      }
    }
  }
}
```

[Run in GraphQL API Explorer](https://graphql.cloudflare.com/explorer?query=I4VwpgTgngBA4mALgBQJYAcwBtUDswCqAzpAKIQQD2ERAFAFAwwAkAhgMbuUi6IAqrAOYAuGAGVEEPIICEjFugzY8YAJIATURKm5Z85utaIkqALZgJrCIlF8zYOUwNGT50rk0w75uQEoYAN7yAG6oYADukIHyTBxcPIh0AGaoWMYQogEwcdy8AiIsOQn5MAC+-kFMVTCKmDj4RMRkFNREAIKG6IiowWBwVCDodDHVMDimqDYwAIwADLMj1SlpkJmLo7XK+BqizJv1aurr1YbG3eYA+oJgwLunrhaIVojHVffnYBdYN3cuH+5HUZVUqvajqSAAISgogA2vFeBcACKkMQAYQAuusKq94S8gUx1PZcERUJRidF8W8XK8mJAqBAAGKsCZYKA0mB06h8KCYV4goH84HyEGlIA&variables=N4IghgxhD2CuB2AXAKmA5iAXCAggYTwHkBVAOWQH0BJAERABoQAHASyYFMAbF+dqgEywgASgFEACgBl8oigHUqyABLU6jfmETtELALbsAyojAAnREIBMABgsA2ALRWALI4DMyK1cwBWb5icWAFoMIBpaOvqi8ILY1naOLlbuVrY+fgHBAL5AA)

Example response:

```
{
  "data": {
    "viewer": {
      "accounts": [
        {
          "pipelinesUserErrorsAdaptiveGroups": [
            {
              "count": 679,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "missing_field"
              }
            },
            {
              "count": 392,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "type_mismatch"
              }
            },
            {
              "count": 363,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "parse_failure"
              }
            },
            {
              "count": 44,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "null_value"
              }
            }
          ]
        }
      ]
    }
  },
  "errors": null
}
```

You can filter by a specific error type by adding `errorType` to the filter:

```
pipelinesUserErrorsAdaptiveGroups(
  limit: 100
  filter: {
    pipelineId: $pipelineId
    datetime_geq: $datetimeStart
    datetime_leq: $datetimeEnd
    errorType: "type_mismatch"
  }
  orderBy: [count_DESC]
)
```

To query errors across all pipelines on an account, omit the `pipelineId` filter and include `pipelineId` in the dimensions:

```
pipelinesUserErrorsAdaptiveGroups(
  limit: 100
  filter: {
    datetime_geq: $datetimeStart
    datetime_leq: $datetimeEnd
  }
  orderBy: [count_DESC]
) {
  count
  dimensions {
    pipelineId
    errorFamily
    errorType
  }
}
```

Note

In addition to `pipelinesUserErrorsAdaptiveGroups`, you can also query the `pipelinesUserErrorsAdaptive` dataset, which provides detailed error descriptions within the last 24 hours. Be aware that querying this dataset may return a large volume of data if your pipeline processes many events.


---

---
title: Limits
description: While in open beta, the following limits are currently in effect:
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Limits

While in open beta, the following limits are currently in effect:

| Feature                                    | Limit  |
| ------------------------------------------ | ------ |
| Maximum streams per account                | 20     |
| Maximum payload size per ingestion request | 5 MB   |
| Maximum ingest rate per stream             | 5 MB/s |
| Maximum sinks per account                  | 20     |
| Maximum pipelines per account              | 20     |
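To stay under the 5 MB per-request payload limit in the table above, a client can batch events by serialized size before sending. The sketch below is a client-side assumption (a JSON array per request; the HTTP ingest call itself is omitted), not part of the Pipelines API.

```python
import json

MAX_PAYLOAD_BYTES = 5 * 1024 * 1024  # 5 MB limit from the table above

def batch_events(events, limit=MAX_PAYLOAD_BYTES):
    """Group events into batches whose JSON-array encoding stays under limit."""
    batches, current, size = [], [], 2  # 2 bytes for the enclosing "[]"
    for event in events:
        encoded = json.dumps(event)
        added = len(encoded) + (1 if current else 0)  # +1 for the comma
        if current and size + added > limit:
            batches.append(current)
            current, size = [], 2
            added = len(encoded)
        current.append(event)
        size += added
    if current:
        batches.append(current)
    return batches
```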

Need a higher limit?

To request an adjustment to a limit, complete the [Limit Increase Request Form ↗](https://forms.gle/ukpeZVLWLnKeixDu7). If the limit can be increased, Cloudflare will contact you with next steps.


---

---
title: Pricing
description: Cloudflare Pipelines is in open beta and available to any developer with a Workers Paid plan.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Pricing

Cloudflare Pipelines is in open beta and available to any developer with a [Workers Paid plan](https://developers.cloudflare.com/workers/platform/pricing/).

We are not currently billing for Pipelines during open beta. However, you will be billed for standard [R2 storage and operations](https://developers.cloudflare.com/r2/pricing/) for data written by sinks to R2 buckets.

We plan to bill based on the volume of data that pipelines process, transform, and deliver to sinks. We'll provide at least 30 days' notice before making any changes or charging for Pipelines usage.


---

---
title: Legacy pipelines
description: Legacy pipelines, those created before September 25, 2025 via the legacy API, are on a deprecation path.
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Legacy pipelines

Legacy pipelines (those created before September 25, 2025 via the legacy API) are on a deprecation path.

To check if your pipelines are legacy pipelines, view them in the dashboard under **Pipelines** \> **Pipelines** or run the [pipelines list](https://developers.cloudflare.com/workers/wrangler/commands/pipelines/#pipelines-list) command in [Wrangler](https://developers.cloudflare.com/workers/wrangler/). Legacy pipelines are labeled "legacy" in both locations.

New pipelines offer SQL transformations, multiple output formats, and improved architecture.

## Notable changes

* New pipelines support SQL transformations for data processing.
* New pipelines write to JSON, Parquet, and Apache Iceberg formats instead of JSON only.
* New pipelines separate streams, pipelines, and sinks into distinct resources.
* New pipelines support optional structured schemas with validation.
* New pipelines offer configurable rolling policies and customizable partitioning.

## Moving to new pipelines

Legacy pipelines will continue to work until Pipelines is Generally Available, but new features and improvements are only available in the new pipeline architecture. To migrate:

1. Create a new pipeline using the interactive setup:

   Terminal window

   ```
   npx wrangler pipelines setup
   ```
2. Configure your new pipeline with the desired streams, SQL transformations, and sinks.
3. Update your applications to send data to the new stream endpoints.
4. Once verified, delete your legacy pipeline.

For detailed guidance, refer to the [getting started guide](https://developers.cloudflare.com/pipelines/getting-started/).
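As a non-interactive sketch of the same migration, the steps above might look like the following. All resource names are placeholders, and the SQL shape (reading from a stream, inserting into a sink) follows the pipeline model described in these docs; adapt each command to your own setup:

```shell
# Hypothetical resource names; substitute your own.
# 1. Create a stream to ingest events.
npx wrangler pipelines streams create clickstream_events

# 2. Create an R2 sink for the output files.
npx wrangler pipelines sinks create clickstream_sink \
  --type r2 \
  --bucket my-analytics-bucket

# 3. Create the pipeline that connects stream to sink with SQL.
npx wrangler pipelines create clickstream-pipeline \
  --sql "INSERT INTO clickstream_sink SELECT * FROM clickstream_events"
```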


---

---
title: Wrangler commands
description: Interactive setup for a complete pipeline
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Wrangler commands

## `pipelines setup`

Interactive setup for a complete pipeline


Terminal window

```

npx wrangler pipelines setup


```

Terminal window

```

pnpm wrangler pipelines setup


```

Terminal window

```

yarn wrangler pipelines setup


```

* `--name` ` string `  
Pipeline name

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines create`

Create a new pipeline


Terminal window

```

npx wrangler pipelines create [PIPELINE]


```

Terminal window

```

pnpm wrangler pipelines create [PIPELINE]


```

Terminal window

```

yarn wrangler pipelines create [PIPELINE]


```

* `[PIPELINE]` ` string ` required  
The name of the pipeline to create
* `--sql` ` string `  
Inline SQL query for the pipeline
* `--sql-file` ` string `  
Path to file containing SQL query for the pipeline
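For example, a pipeline can be created with an inline query via `--sql`. The stream and sink names below are placeholders, not existing resources; the query shape (inserting into a sink from a stream) follows the pipeline model described in these docs:

```shell
# Hypothetical names; the stream and sink must already exist.
npx wrangler pipelines create my-pipeline \
  --sql "INSERT INTO my_sink SELECT * FROM my_stream"
```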

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines list`

List all pipelines


Terminal window

```

npx wrangler pipelines list


```

Terminal window

```

pnpm wrangler pipelines list


```

Terminal window

```

yarn wrangler pipelines list


```

* `--page` ` number ` default: 1  
Page number for pagination
* `--per-page` ` number ` default: 20  
Number of pipelines per page
* `--json` ` boolean ` default: false  
Output in JSON format

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines get`

Get details about a specific pipeline


Terminal window

```

npx wrangler pipelines get [PIPELINE]


```

Terminal window

```

pnpm wrangler pipelines get [PIPELINE]


```

Terminal window

```

yarn wrangler pipelines get [PIPELINE]


```

* `[PIPELINE]` ` string ` required  
The ID of the pipeline to retrieve
* `--json` ` boolean ` default: false  
Output in JSON format

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines update`

Update a pipeline configuration (legacy pipelines only)


Terminal window

```

npx wrangler pipelines update [PIPELINE]


```

Terminal window

```

pnpm wrangler pipelines update [PIPELINE]


```

Terminal window

```

yarn wrangler pipelines update [PIPELINE]


```

* `[PIPELINE]` ` string ` required  
The name of the legacy pipeline to update
* `--source` ` array `  
Space-separated list of allowed sources. Options are 'http' or 'worker'
* `--require-http-auth` ` boolean `  
Require Cloudflare API Token for HTTPS endpoint authentication
* `--cors-origins` ` array `  
CORS origin allowlist for HTTP endpoint (use \* for any origin). Defaults to an empty array
* `--batch-max-mb` ` number `  
Maximum batch size in megabytes before flushing. Defaults to 100 MB if unset. Minimum: 1, Maximum: 100
* `--batch-max-rows` ` number `  
Maximum number of rows per batch before flushing. Defaults to 10,000,000 if unset. Minimum: 100, Maximum: 10,000,000
* `--batch-max-seconds` ` number `  
Maximum age of batch in seconds before flushing. Defaults to 300 if unset. Minimum: 1, Maximum: 300
* `--r2-bucket` ` string `  
Destination R2 bucket name
* `--r2-access-key-id` ` string `  
R2 service Access Key ID for authentication. Leave empty for OAuth confirmation.
* `--r2-secret-access-key` ` string `  
R2 service Secret Access Key for authentication. Leave empty for OAuth confirmation.
* `--r2-prefix` ` string `  
Prefix for storing files in the destination bucket. Default is no prefix
* `--compression` ` string `  
Compression format for output files
* `--shard-count` ` number `  
Number of shards for the pipeline. More shards handle higher request volume; fewer shards produce larger output files. Defaults to 2 if unset. Minimum: 1, Maximum: 15

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines delete`

Delete a pipeline


Terminal window

```

npx wrangler pipelines delete [PIPELINE]


```

Terminal window

```

pnpm wrangler pipelines delete [PIPELINE]


```

Terminal window

```

yarn wrangler pipelines delete [PIPELINE]


```

* `[PIPELINE]` ` string ` required  
The ID or name of the pipeline to delete
* `--force` ` boolean ` alias: --y default: false  
Skip confirmation

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines streams create`

Create a new stream


Terminal window

```

npx wrangler pipelines streams create [STREAM]


```

Terminal window

```

pnpm wrangler pipelines streams create [STREAM]


```

Terminal window

```

yarn wrangler pipelines streams create [STREAM]


```

* `[STREAM]` ` string ` required  
The name of the stream to create
* `--schema-file` ` string `  
Path to JSON file containing stream schema
* `--http-enabled` ` boolean ` default: true  
Enable HTTP endpoint
* `--http-auth` ` boolean ` default: true  
Require authentication for HTTP endpoint
* `--cors-origin` ` string `  
CORS origin
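For example, a stream can be created with a schema and an HTTP endpoint open to any origin. The stream name and schema path below are placeholders:

```shell
# Hypothetical example; schema.json holds the stream's JSON schema.
npx wrangler pipelines streams create my-stream \
  --schema-file ./schema.json \
  --cors-origin "*"
```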

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines streams list`

List all streams


Terminal window

```

npx wrangler pipelines streams list


```

Terminal window

```

pnpm wrangler pipelines streams list


```

Terminal window

```

yarn wrangler pipelines streams list


```

* `--page` ` number ` default: 1  
Page number for pagination
* `--per-page` ` number ` default: 20  
Number of streams per page
* `--pipeline-id` ` string `  
Filter streams by pipeline ID
* `--json` ` boolean ` default: false  
Output in JSON format

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines streams get`

Get details about a specific stream


Terminal window

```

npx wrangler pipelines streams get [STREAM]


```

Terminal window

```

pnpm wrangler pipelines streams get [STREAM]


```

Terminal window

```

yarn wrangler pipelines streams get [STREAM]


```

* `[STREAM]` ` string ` required  
The ID of the stream to retrieve
* `--json` ` boolean ` default: false  
Output in JSON format

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines streams delete`

Delete a stream


Terminal window

```

npx wrangler pipelines streams delete [STREAM]


```

Terminal window

```

pnpm wrangler pipelines streams delete [STREAM]


```

Terminal window

```

yarn wrangler pipelines streams delete [STREAM]


```

* `[STREAM]` ` string ` required  
The ID of the stream to delete
* `--force` ` boolean ` alias: --y default: false  
Skip confirmation

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines sinks create`

Create a new sink


Terminal window

```

npx wrangler pipelines sinks create [SINK]


```

Terminal window

```

pnpm wrangler pipelines sinks create [SINK]


```

Terminal window

```

yarn wrangler pipelines sinks create [SINK]


```

* `[SINK]` ` string ` required  
The name of the sink to create
* `--type` ` string ` required  
The type of sink to create
* `--bucket` ` string ` required  
R2 bucket name
* `--format` ` string ` default: parquet  
Output format
* `--compression` ` string ` default: zstd  
Compression method (parquet only)
* `--target-row-group-size` ` string `  
Target row group size for parquet format
* `--path` ` string `  
The base prefix in your bucket where data will be written
* `--partitioning` ` string `  
Time partition pattern (r2 sinks only)
* `--roll-size` ` number `  
Roll file size in MB
* `--roll-interval` ` number ` default: 300  
Roll file interval in seconds
* `--access-key-id` ` string `  
R2 access key ID (leave empty for R2 credentials to be automatically created)
* `--secret-access-key` ` string `  
R2 secret access key (leave empty for R2 credentials to be automatically created)
* `--namespace` ` string `  
Data catalog namespace (required for r2-data-catalog)
* `--table` ` string `  
Table name within namespace (required for r2-data-catalog)
* `--catalog-token` ` string `  
Authentication token for data catalog (required for r2-data-catalog)
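For example, an Iceberg sink targeting R2 Data Catalog combines the `--type`, `--bucket`, `--namespace`, `--table`, and `--catalog-token` flags above. The bucket, namespace, table, and token value below are placeholders:

```shell
# Hypothetical example; replace each value with your own resources.
npx wrangler pipelines sinks create my-catalog-sink \
  --type r2-data-catalog \
  --bucket my-bucket \
  --namespace default \
  --table events \
  --catalog-token "$CATALOG_TOKEN"
```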

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines sinks list`

List all sinks


Terminal window

```

npx wrangler pipelines sinks list


```

Terminal window

```

pnpm wrangler pipelines sinks list


```

Terminal window

```

yarn wrangler pipelines sinks list


```

* `--page` ` number ` default: 1  
Page number for pagination
* `--per-page` ` number ` default: 20  
Number of sinks per page
* `--pipeline-id` ` string `  
Filter sinks by pipeline ID
* `--json` ` boolean ` default: false  
Output in JSON format

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines sinks get`

Get details about a specific sink


Terminal window

```

npx wrangler pipelines sinks get [SINK]


```

Terminal window

```

pnpm wrangler pipelines sinks get [SINK]


```

Terminal window

```

yarn wrangler pipelines sinks get [SINK]


```

* `[SINK]` ` string ` required  
The ID of the sink to retrieve
* `--json` ` boolean ` default: false  
Output in JSON format

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources

## `pipelines sinks delete`

Delete a sink


Terminal window

```

npx wrangler pipelines sinks delete [SINK]


```

Terminal window

```

pnpm wrangler pipelines sinks delete [SINK]


```

Terminal window

```

yarn wrangler pipelines sinks delete [SINK]


```

* `[SINK]` ` string ` required  
The ID of the sink to delete
* `--force` ` boolean ` alias: --y default: false  
Skip confirmation

Global flags

* `--v` ` boolean ` alias: --version  
Show version number
* `--cwd` ` string `  
Run as if Wrangler was started in the specified directory instead of the current working directory
* `--config` ` string ` alias: --c  
Path to Wrangler configuration file
* `--env` ` string ` alias: --e  
Environment to use for operations, and for selecting .env and .dev.vars files
* `--env-file` ` string `  
Path to an .env file to load - can be specified multiple times - values from earlier files are overridden by values in later files
* `--experimental-provision` ` boolean ` aliases: --x-provision default: true  
Experimental: Enable automatic resource provisioning
* `--experimental-auto-create` ` boolean ` alias: --x-auto-create default: true  
Automatically provision draft bindings with new resources


---

---
title: Array functions
description: Scalar functions for manipulating arrays
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Array functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference._

## `array_append`

Appends an element to the end of an array.

```

array_append(array, element)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to append to the array.

**Example**

```

> select array_append([1, 2, 3], 4);

+--------------------------------------+

| array_append(List([1,2,3]),Int64(4)) |

+--------------------------------------+

| [1, 2, 3, 4]                         |

+--------------------------------------+


```

**Aliases**

* array\_push\_back
* list\_append
* list\_push\_back

## `array_sort`

Sort array.

```

array_sort(array, desc, nulls_first)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **desc**: Whether to sort in descending order (`ASC` or `DESC`).
* **nulls\_first**: Whether to sort nulls first (`NULLS FIRST` or `NULLS LAST`).

**Example**

```

> select array_sort([3, 1, 2]);

+-----------------------------+

| array_sort(List([3,1,2]))   |

+-----------------------------+

| [1, 2, 3]                   |

+-----------------------------+


```

**Aliases**

* list\_sort

## `array_resize`

Resizes the array to contain `size` elements. New elements are initialized with `value`, or left empty if `value` is not set.

```

array_resize(array, size, value)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **size**: New size of the given array.
* **value**: Value used to initialize new elements; if not set, new elements are empty.

**Example**

```

> select array_resize([1, 2, 3], 5, 0);

+-------------------------------------+

| array_resize(List([1,2,3],5,0))     |

+-------------------------------------+

| [1, 2, 3, 0, 0]                     |

+-------------------------------------+


```

**Aliases**

* list\_resize

## `array_cat`

_Alias of [array\_concat](#array%5Fconcat)._

## `array_concat`

Concatenates arrays.

```

array_concat(array[, ..., array_n])


```

**Arguments**

* **array**: Array expression to concatenate. Can be a constant, column, or function, and any combination of array operators.
* **array\_n**: Subsequent array column or literal array to concatenate.

**Example**

```

> select array_concat([1, 2], [3, 4], [5, 6]);

+---------------------------------------------------+

| array_concat(List([1,2]),List([3,4]),List([5,6])) |

+---------------------------------------------------+

| [1, 2, 3, 4, 5, 6]                                |

+---------------------------------------------------+


```

**Aliases**

* array\_cat
* list\_cat
* list\_concat

## `array_contains`

_Alias of [array\_has](#array%5Fhas)._

## `array_has`

Returns true if the array contains the element.

```

array_has(array, element)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Scalar or Array expression. Can be a constant, column, or function, and any combination of array operators.
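**Example**

The upstream reference omits an example for this function; the following is an illustrative sketch of the expected behavior (result rendering may differ):

```

> select array_has([1, 2, 3], 2);

true


```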

**Aliases**

* list\_has

## `array_has_all`

Returns true if all elements of the sub-array exist in the array.

```

array_has_all(array, sub-array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **sub-array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
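**Example**

The upstream reference omits an example for this function; the following is an illustrative sketch of the expected behavior (result rendering may differ):

```

> select array_has_all([1, 2, 3, 4], [2, 3]);

true


```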

**Aliases**

* list\_has\_all

## `array_has_any`

Returns true if any element exists in both arrays.

```

array_has_any(array, sub-array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **sub-array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
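**Example**

The upstream reference omits an example for this function; the following is an illustrative sketch of the expected behavior (result rendering may differ):

```

> select array_has_any([1, 2, 3], [3, 4]);

true


```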

**Aliases**

* list\_has\_any

## `array_dims`

Returns an array of the array's dimensions.

```

array_dims(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_dims([[1, 2, 3], [4, 5, 6]]);

+---------------------------------+

| array_dims(List([1,2,3,4,5,6])) |

+---------------------------------+

| [2, 3]                          |

+---------------------------------+


```

**Aliases**

* list\_dims

## `array_distinct`

Returns distinct values from the array after removing duplicates.

```

array_distinct(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_distinct([1, 3, 2, 3, 1, 2, 4]);

+---------------------------------+

| array_distinct(List([1,2,3,4])) |

+---------------------------------+

| [1, 2, 3, 4]                    |

+---------------------------------+


```

**Aliases**

* list\_distinct

## `array_element`

Extracts the element with the index n from the array.

```

array_element(array, index)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **index**: Index to extract the element from the array.

**Example**

```

> select array_element([1, 2, 3, 4], 3);

+-----------------------------------------+

| array_element(List([1,2,3,4]),Int64(3)) |

+-----------------------------------------+

| 3                                       |

+-----------------------------------------+


```

**Aliases**

* array\_extract
* list\_element
* list\_extract

## `array_extract`

_Alias of [array\_element](#array%5Felement)._

## `array_fill`

Returns an array filled with copies of the given value.

DEPRECATED: use `array_repeat` instead!

```

array_fill(element, array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to copy to the array.

## `flatten`

Converts an array of arrays to a flat array.

* Applies to any depth of nested arrays
* Does not change arrays that are already flat

The flattened array contains all the elements from all source arrays.

```

flatten(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
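
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic, but the result follows from the definition above):

```

> select flatten([[1, 2], [3, 4]]);

+------------------------------+

| flatten(List([1,2],[3,4]))   |

+------------------------------+

| [1, 2, 3, 4]                 |

+------------------------------+


```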

## `array_indexof`

_Alias of [array\_position](#array%5Fposition)._

## `array_intersect`

Returns an array of elements in the intersection of array1 and array2.

```

array_intersect(array1, array2)


```

**Arguments**

* **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_intersect([1, 2, 3, 4], [5, 6, 3, 4]);

+----------------------------------------------------+

| array_intersect([1, 2, 3, 4], [5, 6, 3, 4])        |

+----------------------------------------------------+

| [3, 4]                                             |

+----------------------------------------------------+

> select array_intersect([1, 2, 3, 4], [5, 6, 7, 8]);

+----------------------------------------------------+

| array_intersect([1, 2, 3, 4], [5, 6, 7, 8])        |

+----------------------------------------------------+

| []                                                 |

+----------------------------------------------------+


```


**Aliases**

* list\_intersect

## `array_join`

_Alias of [array\_to\_string](#array%5Fto%5Fstring)._

## `array_length`

Returns the length of the array dimension.

```

array_length(array, dimension)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **dimension**: Array dimension.

**Example**

```

> select array_length([1, 2, 3, 4, 5]);

+---------------------------------+

| array_length(List([1,2,3,4,5])) |

+---------------------------------+

| 5                               |

+---------------------------------+


```

**Aliases**

* list\_length

## `array_ndims`

Returns the number of dimensions of the array.

```

array_ndims(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_ndims([[1, 2, 3], [4, 5, 6]]);

+----------------------------------+

| array_ndims(List([1,2,3,4,5,6])) |

+----------------------------------+

| 2                                |

+----------------------------------+


```

**Aliases**

* list\_ndims

## `array_prepend`

Prepends an element to the beginning of an array.

```

array_prepend(element, array)


```

**Arguments**

* **element**: Element to prepend to the array.
* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_prepend(1, [2, 3, 4]);

+---------------------------------------+

| array_prepend(Int64(1),List([2,3,4])) |

+---------------------------------------+

| [1, 2, 3, 4]                          |

+---------------------------------------+


```

**Aliases**

* array\_push\_front
* list\_prepend
* list\_push\_front

## `array_pop_front`

Returns the array without the first element.

```

array_pop_front(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_pop_front([1, 2, 3]);

+--------------------------------+

| array_pop_front(List([1,2,3])) |

+--------------------------------+

| [2, 3]                         |

+--------------------------------+


```

**Aliases**

* list\_pop\_front

## `array_pop_back`

Returns the array without the last element.

```

array_pop_back(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_pop_back([1, 2, 3]);

+-------------------------------+

| array_pop_back(List([1,2,3])) |

+-------------------------------+

| [1, 2]                        |

+-------------------------------+


```

**Aliases**

* list\_pop\_back

## `array_position`

Returns the position of the first occurrence of the specified element in the array.

```

array_position(array, element)

array_position(array, element, index)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to search for position in the array.
* **index**: Index at which to start searching.

**Example**

```

> select array_position([1, 2, 2, 3, 1, 4], 2);

+----------------------------------------------+

| array_position(List([1,2,2,3,1,4]),Int64(2)) |

+----------------------------------------------+

| 2                                            |

+----------------------------------------------+


```

**Aliases**

* array\_indexof
* list\_indexof
* list\_position

## `array_positions`

Searches for an element in the array, returns all occurrences.

```

array_positions(array, element)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to search for positions in the array.

**Example**

```

> select array_positions([1, 2, 2, 3, 1, 4], 2);

+-----------------------------------------------+

| array_positions(List([1,2,2,3,1,4]),Int64(2)) |

+-----------------------------------------------+

| [2, 3]                                        |

+-----------------------------------------------+


```

**Aliases**

* list\_positions

## `array_push_back`

_Alias of [array\_append](#array%5Fappend)._

## `array_push_front`

_Alias of [array\_prepend](#array%5Fprepend)._

## `array_repeat`

Returns an array containing element `count` times.

```

array_repeat(element, count)


```

**Arguments**

* **element**: Element expression. Can be a constant, column, or function, and any combination of array operators.
* **count**: Value of how many times to repeat the element.

**Example**

```

> select array_repeat(1, 3);

+---------------------------------+

| array_repeat(Int64(1),Int64(3)) |

+---------------------------------+

| [1, 1, 1]                       |

+---------------------------------+


```

```

> select array_repeat([1, 2], 2);

+------------------------------------+

| array_repeat(List([1,2]),Int64(2)) |

+------------------------------------+

| [[1, 2], [1, 2]]                   |

+------------------------------------+


```

**Aliases**

* list\_repeat

## `array_remove`

Removes the first element from the array equal to the given value.

```

array_remove(array, element)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to be removed from the array.

**Example**

```

> select array_remove([1, 2, 2, 3, 2, 1, 4], 2);

+----------------------------------------------+

| array_remove(List([1,2,2,3,2,1,4]),Int64(2)) |

+----------------------------------------------+

| [1, 2, 3, 2, 1, 4]                           |

+----------------------------------------------+


```

**Aliases**

* list\_remove

## `array_remove_n`

Removes the first `max` elements from the array equal to the given value.

```

array_remove_n(array, element, max)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to be removed from the array.
* **max**: Number of first occurrences to remove.

**Example**

```

> select array_remove_n([1, 2, 2, 3, 2, 1, 4], 2, 2);

+---------------------------------------------------------+

| array_remove_n(List([1,2,2,3,2,1,4]),Int64(2),Int64(2)) |

+---------------------------------------------------------+

| [1, 3, 2, 1, 4]                                         |

+---------------------------------------------------------+


```

**Aliases**

* list\_remove\_n

## `array_remove_all`

Removes all elements from the array equal to the given value.

```

array_remove_all(array, element)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **element**: Element to be removed from the array.

**Example**

```

> select array_remove_all([1, 2, 2, 3, 2, 1, 4], 2);

+--------------------------------------------------+

| array_remove_all(List([1,2,2,3,2,1,4]),Int64(2)) |

+--------------------------------------------------+

| [1, 3, 1, 4]                                     |

+--------------------------------------------------+


```

**Aliases**

* list\_remove\_all

## `array_replace`

Replaces the first occurrence of the specified element with another specified element.

```

array_replace(array, from, to)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **from**: Initial element.
* **to**: Final element.

**Example**

```

> select array_replace([1, 2, 2, 3, 2, 1, 4], 2, 5);

+--------------------------------------------------------+

| array_replace(List([1,2,2,3,2,1,4]),Int64(2),Int64(5)) |

+--------------------------------------------------------+

| [1, 5, 2, 3, 2, 1, 4]                                  |

+--------------------------------------------------------+


```

**Aliases**

* list\_replace

## `array_replace_n`

Replaces the first `max` occurrences of the specified element with another specified element.

```

array_replace_n(array, from, to, max)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **from**: Initial element.
* **to**: Final element.
* **max**: Number of first occurrences to replace.

**Example**

```

> select array_replace_n([1, 2, 2, 3, 2, 1, 4], 2, 5, 2);

+-------------------------------------------------------------------+

| array_replace_n(List([1,2,2,3,2,1,4]),Int64(2),Int64(5),Int64(2)) |

+-------------------------------------------------------------------+

| [1, 5, 5, 3, 2, 1, 4]                                             |

+-------------------------------------------------------------------+


```

**Aliases**

* list\_replace\_n

## `array_replace_all`

Replaces all occurrences of the specified element with another specified element.

```

array_replace_all(array, from, to)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **from**: Initial element.
* **to**: Final element.

**Example**

```

> select array_replace_all([1, 2, 2, 3, 2, 1, 4], 2, 5);

+------------------------------------------------------------+

| array_replace_all(List([1,2,2,3,2,1,4]),Int64(2),Int64(5)) |

+------------------------------------------------------------+

| [1, 5, 5, 3, 5, 1, 4]                                      |

+------------------------------------------------------------+


```

**Aliases**

* list\_replace\_all

## `array_reverse`

Returns the array with the order of the elements reversed.

```

array_reverse(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_reverse([1, 2, 3, 4]);

+------------------------------------------------------------+

| array_reverse(List([1, 2, 3, 4]))                          |

+------------------------------------------------------------+

| [4, 3, 2, 1]                                               |

+------------------------------------------------------------+


```

**Aliases**

* list\_reverse

## `array_slice`

Returns a slice of the array based on 1-indexed start and end positions.

```

array_slice(array, begin, end[, stride])


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **begin**: Index of the first element. If negative, it counts backward from the end of the array.
* **end**: Index of the last element. If negative, it counts backward from the end of the array.
* **stride**: Stride of the array slice. The default is 1.

**Example**

```

> select array_slice([1, 2, 3, 4, 5, 6, 7, 8], 3, 6);

+--------------------------------------------------------+

| array_slice(List([1,2,3,4,5,6,7,8]),Int64(3),Int64(6)) |

+--------------------------------------------------------+

| [3, 4, 5, 6]                                           |

+--------------------------------------------------------+


```

**Aliases**

* list\_slice

## `array_to_string`

Converts each element to its text representation.

```

array_to_string(array, delimiter)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **delimiter**: Array element separator.

**Example**

```

> select array_to_string([[1, 2, 3, 4], [5, 6, 7, 8]], ',');

+----------------------------------------------------+

| array_to_string(List([1,2,3,4,5,6,7,8]),Utf8(",")) |

+----------------------------------------------------+

| 1,2,3,4,5,6,7,8                                    |

+----------------------------------------------------+


```

**Aliases**

* array\_join
* list\_join
* list\_to\_string

## `array_union`

Returns an array of all elements present in either array, without duplicates.

```

array_union(array1, array2)


```

**Arguments**

* **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_union([1, 2, 3, 4], [5, 6, 3, 4]);

+----------------------------------------------------+

| array_union([1, 2, 3, 4], [5, 6, 3, 4])            |

+----------------------------------------------------+

| [1, 2, 3, 4, 5, 6]                                 |

+----------------------------------------------------+

> select array_union([1, 2, 3, 4], [5, 6, 7, 8]);

+----------------------------------------------------+

| array_union([1, 2, 3, 4], [5, 6, 7, 8])            |

+----------------------------------------------------+

| [1, 2, 3, 4, 5, 6, 7, 8]                           |

+----------------------------------------------------+


```


**Aliases**

* list\_union

## `array_except`

Returns an array of the elements that appear in the first array but not in the second.

```

array_except(array1, array2)


```

**Arguments**

* **array1**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **array2**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select array_except([1, 2, 3, 4], [5, 6, 3, 4]);

+----------------------------------------------------+

| array_except([1, 2, 3, 4], [5, 6, 3, 4])           |

+----------------------------------------------------+

| [1, 2]                                             |

+----------------------------------------------------+

> select array_except([1, 2, 3, 4], [3, 4, 5, 6]);

+----------------------------------------------------+

| array_except([1, 2, 3, 4], [3, 4, 5, 6])           |

+----------------------------------------------------+

| [1, 2]                                             |

+----------------------------------------------------+


```


**Aliases**

* list\_except

## `cardinality`

Returns the total number of elements in the array.

```

cardinality(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select cardinality([[1, 2, 3, 4], [5, 6, 7, 8]]);

+--------------------------------------+

| cardinality(List([1,2,3,4,5,6,7,8])) |

+--------------------------------------+

| 8                                    |

+--------------------------------------+


```

## `empty`

Returns 1 for an empty array or 0 for a non-empty array.

```

empty(array)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.

**Example**

```

> select empty([1]);

+------------------+

| empty(List([1])) |

+------------------+

| 0                |

+------------------+


```

**Aliases**

* array\_empty
* list\_empty

## `generate_series`

Similar to the range function, but it includes the upper bound.

```

generate_series(start, stop, step)


```

**Arguments**

* **start**: start of the range
* **end**: end of the range (included)
* **step**: increase by step (can not be 0)

**Example**

```

> select generate_series(1,3);

+------------------------------------+

| generate_series(Int64(1),Int64(3)) |

+------------------------------------+

| [1, 2, 3]                          |

+------------------------------------+


```

## `list_append`

_Alias of [array\_append](#array%5Fappend)._

## `list_cat`

_Alias of [array\_concat](#array%5Fconcat)._

## `list_concat`

_Alias of [array\_concat](#array%5Fconcat)._

## `list_dims`

_Alias of [array\_dims](#array%5Fdims)._

## `list_distinct`

_Alias of [array\_distinct](#array%5Fdistinct)._

## `list_element`

_Alias of [array\_element](#array%5Felement)._

## `list_empty`

_Alias of [empty](#empty)._

## `list_except`

_Alias of [array\_except](#array%5Fexcept)._

## `list_extract`

_Alias of [array\_element](#array%5Felement)._

## `list_has`

_Alias of [array\_has](#array%5Fhas)._

## `list_has_all`

_Alias of [array\_has\_all](#array%5Fhas%5Fall)._

## `list_has_any`

_Alias of [array\_has\_any](#array%5Fhas%5Fany)._

## `list_indexof`

_Alias of [array\_position](#array%5Fposition)._

## `list_intersect`

_Alias of [array\_intersect](#array%5Fintersect)._

## `list_join`

_Alias of [array\_to\_string](#array%5Fto%5Fstring)._

## `list_length`

_Alias of [array\_length](#array%5Flength)._

## `list_ndims`

_Alias of [array\_ndims](#array%5Fndims)._

## `list_prepend`

_Alias of [array\_prepend](#array%5Fprepend)._

## `list_pop_back`

_Alias of [array\_pop\_back](#array%5Fpop%5Fback)._

## `list_pop_front`

_Alias of [array\_pop\_front](#array%5Fpop%5Ffront)._

## `list_position`

_Alias of [array\_position](#array%5Fposition)._

## `list_positions`

_Alias of [array\_positions](#array%5Fpositions)._

## `list_push_back`

_Alias of [array\_append](#array%5Fappend)._

## `list_push_front`

_Alias of [array\_prepend](#array%5Fprepend)._

## `list_repeat`

_Alias of [array\_repeat](#array%5Frepeat)._

## `list_resize`

_Alias of [array\_resize](#array%5Fresize)._

## `list_remove`

_Alias of [array\_remove](#array%5Fremove)._

## `list_remove_n`

_Alias of [array\_remove\_n](#array%5Fremove%5Fn)._

## `list_remove_all`

_Alias of [array\_remove\_all](#array%5Fremove%5Fall)._

## `list_replace`

_Alias of [array\_replace](#array%5Freplace)._

## `list_replace_n`

_Alias of [array\_replace\_n](#array%5Freplace%5Fn)._

## `list_replace_all`

_Alias of [array\_replace\_all](#array%5Freplace%5Fall)._

## `list_reverse`

_Alias of [array\_reverse](#array%5Freverse)._

## `list_slice`

_Alias of [array\_slice](#array%5Fslice)._

## `list_sort`

_Alias of [array\_sort](#array%5Fsort)._

## `list_to_string`

_Alias of [array\_to\_string](#array%5Fto%5Fstring)._

## `list_union`

_Alias of [array\_union](#array%5Funion)._

## `make_array`

Returns an Arrow array using the specified input expressions.

```

make_array(expression1[, ..., expression_n])


```

**Arguments**

* **expression\_n**: Expression to include in the output array. Can be a constant, column, or function, and any combination of arithmetic or string operators.

**Example**

```

> select make_array(1, 2, 3, 4, 5);

+----------------------------------------------------------+

| make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)) |

+----------------------------------------------------------+

| [1, 2, 3, 4, 5]                                          |

+----------------------------------------------------------+


```

**Aliases**

* make\_list

## `array_empty`

_Alias of [empty](#empty)._

## `make_list`

_Alias of [make\_array](#make%5Farray)._

## `string_to_array`

Splits a string into an array of substrings based on a delimiter. Any substrings matching the optional `null_str` argument are replaced with `NULL`. For example, `SELECT string_to_array('abc##def', '##')` or `SELECT string_to_array('abc def', ' ', 'def')`.

```

string_to_array(str, delimiter[, null_str])


```

**Arguments**

* **str**: String expression to split.
* **delimiter**: Delimiter string to split on.
* **null\_str**: Substring values to be replaced with `NULL`
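
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic, but the split result follows from the description above):

```

> select string_to_array('abc##def', '##');

+-----------------------------------------------+

| string_to_array(Utf8("abc##def"),Utf8("##"))  |

+-----------------------------------------------+

| [abc, def]                                    |

+-----------------------------------------------+


```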

**Aliases**

* string\_to\_list

## `string_to_list`

_Alias of [string\_to\_array](#string%5Fto%5Farray)._

## `trim_array`

Removes the last n elements from the array.

DEPRECATED: use `array_slice` instead!

```

trim_array(array, n)


```

**Arguments**

* **array**: Array expression. Can be a constant, column, or function, and any combination of array operators.
* **n**: Number of elements to remove from the end of the array.

## `range`

Returns an Arrow array between start and stop with step. `SELECT range(2, 10, 3) -> [2, 5, 8]` or `SELECT range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);`

The range start..end contains all values with start <= x < end. It is empty if start >= end.

Step cannot be 0.

Note that when the range is numeric, `range` accepts (stop), (start, stop), and (start, stop, step) as parameters, but when the range is a date, all three parameters are required and none may be NULL. For example,

```

SELECT range(3);

SELECT range(1,5);

SELECT range(1,5,1);


```

are allowed in number ranges

but in date ranges, only

```

SELECT range(DATE '1992-09-01', DATE '1993-03-01', INTERVAL '1' MONTH);


```

is allowed, and

```

SELECT range(DATE '1992-09-01', DATE '1993-03-01', NULL);

SELECT range(NULL, DATE '1993-03-01', INTERVAL '1' MONTH);

SELECT range(DATE '1992-09-01', NULL, INTERVAL '1' MONTH);


```

are not allowed.

**Arguments**

* **start**: start of the range
* **end**: end of the range (not included)
* **step**: increase by step (can not be 0)
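
**Example**

An illustrative query based on the `range(2, 10, 3) -> [2, 5, 8]` example above (the displayed column header is schematic):

```

> select range(2, 10, 3);

+-------------------------------------+

| range(Int64(2),Int64(10),Int64(3))  |

+-------------------------------------+

| [2, 5, 8]                           |

+-------------------------------------+


```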

**Aliases**

* generate\_series


---

---
title: Binary string functions
description: Scalar functions for manipulating binary strings
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Binary string functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

## `encode`

Encode binary data into a textual representation.

```

encode(expression, format)


```

**Arguments**

* **expression**: Expression containing string or binary data
* **format**: Supported formats are: `base64`, `hex`
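
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic). Hex encoding of `'abc'` yields the byte values `61`, `62`, `63`:

```

> select encode('abc', 'hex');

+----------------------------------+

| encode(Utf8("abc"),Utf8("hex"))  |

+----------------------------------+

| 616263                           |

+----------------------------------+


```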

**Related functions**: [decode](#decode)

## `decode`

Decode binary data from textual representation in string.

```

decode(expression, format)


```

**Arguments**

* **expression**: Expression containing encoded string data
* **format**: Same arguments as [encode](#encode)

**Related functions**: [encode](#encode)


---

---
title: Conditional functions
description: Scalar functions to implement conditional logic
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Conditional functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

## `coalesce`

Returns the first of its arguments that is not _null_. Returns _null_ if all arguments are _null_. This function is often used to substitute a default value for _null_ values.

```

coalesce(expression1[, ..., expression_n])


```

**Arguments**

* **expression1, expression\_n**: Expression to use if previous expressions are _null_. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary.
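
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic, but the result follows from the definition above):

```

> select coalesce(NULL, NULL, 'default');

+--------------------------------------+

| coalesce(NULL,NULL,Utf8("default"))  |

+--------------------------------------+

| default                              |

+--------------------------------------+


```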

## `nullif`

Returns _null_ if _expression1_ equals _expression2_; otherwise it returns _expression1_. This can be used to perform the inverse operation of [coalesce](#coalesce).

```

nullif(expression1, expression2)


```

**Arguments**

* **expression1**: Expression to compare and return if equal to expression2. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression2**: Expression to compare to expression1. Can be a constant, column, or function, and any combination of arithmetic operators.
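
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic). Because the two arguments are not equal, expression1 is returned:

```

> select nullif('a', 'b');

+------------------------------+

| nullif(Utf8("a"),Utf8("b"))  |

+------------------------------+

| a                            |

+------------------------------+


```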

## `nvl`

Returns _expression2_ if _expression1_ is NULL; otherwise it returns _expression1_.

```

nvl(expression1, expression2)


```

**Arguments**

* **expression1**: Expression to return if it is not NULL. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression2**: Expression to return if expression1 is NULL. Can be a constant, column, or function, and any combination of arithmetic operators.
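
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic). Because the first argument is NULL, the second is returned:

```

> select nvl(NULL, 'a');

+----------------------+

| nvl(NULL,Utf8("a"))  |

+----------------------+

| a                    |

+----------------------+


```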

## `nvl2`

Returns _expression2_ if _expression1_ is not NULL; otherwise it returns _expression3_.

```

nvl2(expression1, expression2, expression3)


```

**Arguments**

* **expression1**: Conditional expression. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression2**: Expression to return if expression1 is not NULL. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression3**: Expression to return if expression1 is NULL. Can be a constant, column, or function, and any combination of arithmetic operators.
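
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic). Because the first argument is not NULL, the second is returned:

```

> select nvl2('x', 'not_null', 'is_null');

+---------------------------------------------------+

| nvl2(Utf8("x"),Utf8("not_null"),Utf8("is_null"))  |

+---------------------------------------------------+

| not_null                                          |

+---------------------------------------------------+


```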

## `ifnull`

_Alias of [nvl](#nvl)._


---

---
title: Hashing functions
description: Scalar functions for hashing values
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Hashing functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

## `digest`

Computes the binary hash of an expression using the specified algorithm.

```

digest(expression, algorithm)


```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **algorithm**: String expression specifying algorithm to use. Must be one of:  
   * md5  
   * sha224  
   * sha256  
   * sha384  
   * sha512  
   * blake2s  
   * blake2b  
   * blake3

## `md5`

Computes an MD5 128-bit checksum for a string expression.

```

md5(expression)


```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
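
**Example**

An illustrative query (not from the upstream reference; the displayed column header is schematic). The value shown is the standard MD5 digest of `'abc'` rendered as hex:

```

> select md5('abc');

+----------------------------------+

| md5(Utf8("abc"))                 |

+----------------------------------+

| 900150983cd24fb0d6963f7d28e17f72 |

+----------------------------------+


```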

## `sha224`

Computes the SHA-224 hash of a binary string.

```

sha224(expression)


```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

## `sha256`

Computes the SHA-256 hash of a binary string.

```

sha256(expression)


```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

## `sha384`

Computes the SHA-384 hash of a binary string.

```

sha384(expression)


```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

## `sha512`

Computes the SHA-512 hash of a binary string.

```

sha512(expression)


```

**Arguments**

* **expression**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.


---

---
title: JSON functions
description: Scalar functions for manipulating JSON
image: https://developers.cloudflare.com/dev-products-preview.png
---


# JSON functions

Cloudflare Pipelines provides two sets of JSON functions: the first based on PostgreSQL's SQL functions and syntax, and the second based on the [JSONPath ↗](https://jsonpath.com/) standard.

## SQL functions

The SQL functions provide basic JSON parsing functions similar to those found in PostgreSQL.

### json\_contains

Returns `true` if the JSON string contains the specified key(s).

```

SELECT json_contains('{"a": 1, "b": 2, "c": 3}', 'a') FROM source;

true


```

Also available via the `?` operator:

```

SELECT '{"a": 1, "b": 2, "c": 3}' ? 'a' FROM source;

true


```

### json\_get

Retrieves the value from a JSON string by the specified path (keys). Returns the value as its native type (string, int, etc.).

```

SELECT json_get('{"a": {"b": 2}}', 'a', 'b') FROM source;

2


```

Also available via the `->` operator:

```

SELECT '{"a": {"b": 2}}'->'a'->'b' FROM source;

2


```

Various permutations of `json_get` functions are available for retrieving values as a specific type, or you can use SQL type annotations:

```

SELECT json_get('{"a": {"b": 2}}', 'a', 'b')::int FROM source;

2


```

### json\_get\_str

Retrieves a string value from a JSON string by the specified path. Returns an empty string if the value does not exist or is not a string.

```

SELECT json_get_str('{"a": {"b": "hello"}}', 'a', 'b') FROM source;

"hello"


```

### json\_get\_int

Retrieves an integer value from a JSON string by the specified path. Returns `0` if the value does not exist or is not an integer.

```

SELECT json_get_int('{"a": {"b": 42}}', 'a', 'b') FROM source;

42


```

### json\_get\_float

Retrieves a float value from a JSON string by the specified path. Returns `0.0` if the value does not exist or is not a float.

```

SELECT json_get_float('{"a": {"b": 3.14}}', 'a', 'b') FROM source;

3.14


```

### json\_get\_bool

Retrieves a boolean value from a JSON string by the specified path. Returns `false` if the value does not exist or is not a boolean.

```

SELECT json_get_bool('{"a": {"b": true}}', 'a', 'b') FROM source;

true


```

### json\_get\_json

Retrieves a nested JSON string from a JSON string by the specified path. The value is returned as raw JSON.

```

SELECT json_get_json('{"a": {"b": {"c": 1}}}', 'a', 'b') FROM source;

'{"c": 1}'


```

### json\_as\_text

Retrieves any value from a JSON string by the specified path and returns it as a string, regardless of the original type.

```

SELECT json_as_text('{"a": {"b": 42}}', 'a', 'b') FROM source;

"42"


```

Also available via the `->>` operator:

```

SELECT '{"a": {"b": 42}}'->>'a'->>'b' FROM source;

"42"


```

### json\_length

Returns the length of a JSON object or array at the specified path. Returns `0` if the path does not exist or is not an object/array.

```

SELECT json_length('{"a": [1, 2, 3]}', 'a') FROM source;

3


```

## JSON path functions

These functions provide JSON querying using [JSONPath ↗](https://goessner.net/articles/JsonPath/), an evolving standard for querying JSON objects.

### extract\_json

Returns the JSON elements in the first argument that match the JsonPath in the second argument. The returned value is an array of json strings.

```

SELECT extract_json('{"a": 1, "b": 2, "c": 3}', '$.a') FROM source;

['1']


```

### extract\_json\_string

Returns an unescaped String for the first item matching the JsonPath, if it is a string.

```

SELECT extract_json_string('{"a": "a", "b": 2, "c": 3}', '$.a') FROM source;

'a'


```


---

---
title: Math functions
description: Scalar functions for mathematical operations
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Math functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

## `abs`

Returns the absolute value of a number.

```

abs(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `acos`

Returns the arc cosine or inverse cosine of a number.

```

acos(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `acosh`

Returns the area hyperbolic cosine or inverse hyperbolic cosine of a number.

```

acosh(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `asin`

Returns the arc sine or inverse sine of a number.

```

asin(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `asinh`

Returns the area hyperbolic sine or inverse hyperbolic sine of a number.

```

asinh(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `atan`

Returns the arc tangent or inverse tangent of a number.

```

atan(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `atanh`

Returns the area hyperbolic tangent or inverse hyperbolic tangent of a number.

```

atanh(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `atan2`

Returns the arc tangent or inverse tangent of `expression_y / expression_x`.

```

atan2(expression_y, expression_x)


```

**Arguments**

* **expression\_y**: First numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_x**: Second numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `cbrt`

Returns the cube root of a number.

```

cbrt(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `ceil`

Returns the nearest integer greater than or equal to a number.

```

ceil(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `cos`

Returns the cosine of a number.

```

cos(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `cosh`

Returns the hyperbolic cosine of a number.

```

cosh(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `degrees`

Converts radians to degrees.

```

degrees(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `exp`

Returns the base-e exponential of a number.

```

exp(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to use as the exponent. Can be a constant, column, or function, and any combination of arithmetic operators.

## `factorial`

Returns the factorial of a number. Returns 1 if the value is less than 2.

```

factorial(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
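
An illustrative example, in the query style used in this reference (`source` is a placeholder stream name):

```

SELECT factorial(5) FROM source;

120


```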

## `floor`

Returns the nearest integer less than or equal to a number.

```

floor(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `gcd`

Returns the greatest common divisor of `expression_x` and `expression_y`. Returns 0 if both inputs are zero.

```

gcd(expression_x, expression_y)


```

**Arguments**

* **expression\_x**: First numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_y**: Second numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
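
An illustrative example (`source` is a placeholder stream name):

```

SELECT gcd(24, 36) FROM source;

12


```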

## `isnan`

Returns true if a given number is +NaN or -NaN, and false otherwise.

```

isnan(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `iszero`

Returns true if a given number is +0.0 or -0.0, and false otherwise.

```

iszero(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `lcm`

Returns the least common multiple of `expression_x` and `expression_y`. Returns 0 if either input is zero.

```

lcm(expression_x, expression_y)


```

**Arguments**

* **expression\_x**: First numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_y**: Second numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `ln`

Returns the natural logarithm of a number.

```

ln(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `log`

Returns the logarithm of a number in the specified base. If the base is omitted, returns the base-10 logarithm.

```

log(base, numeric_expression)

log(numeric_expression)


```

**Arguments**

* **base**: Base numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
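
Illustrative examples of both forms (`source` is a placeholder stream name):

```

SELECT log(2, 64) FROM source;

6

SELECT log(100) FROM source;

2


```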

## `log10`

Returns the base-10 logarithm of a number.

```

log10(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `log2`

Returns the base-2 logarithm of a number.

```

log2(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `nanvl`

Returns the first argument if it's not _NaN_. Returns the second argument otherwise.

```

nanvl(expression_x, expression_y)


```

**Arguments**

* **expression\_x**: Numeric expression to return if it's not _NaN_. Can be a constant, column, or function, and any combination of arithmetic operators.
* **expression\_y**: Numeric expression to return if the first expression is _NaN_. Can be a constant, column, or function, and any combination of arithmetic operators.

## `pi`

Returns an approximate value of π.

```

pi()


```

## `power`

Returns a base expression raised to the power of an exponent.

```

power(base, exponent)


```

**Arguments**

* **base**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **exponent**: Exponent numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

**Aliases**

* pow
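
An illustrative example (`source` is a placeholder stream name):

```

SELECT power(2, 10) FROM source;

1024


```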

## `pow`

_Alias of [power](#power)._

## `radians`

Converts degrees to radians.

```

radians(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `random`

Returns a random float value in the range \[0, 1). The random seed is unique to each row.

```

random()


```

## `round`

Rounds a number to the nearest integer, or to the specified number of decimal places.

```

round(numeric_expression[, decimal_places])


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **decimal\_places**: Optional. The number of decimal places to round to. Defaults to 0.
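
An illustrative example of rounding to two decimal places (`source` is a placeholder stream name):

```

SELECT round(3.14159, 2) FROM source;

3.14


```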

## `signum`

Returns the sign of a number. Negative numbers return `-1`. Zero and positive numbers return `1`.

```

signum(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `sin`

Returns the sine of a number.

```

sin(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `sinh`

Returns the hyperbolic sine of a number.

```

sinh(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `sqrt`

Returns the square root of a number.

```

sqrt(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `tan`

Returns the tangent of a number.

```

tan(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `tanh`

Returns the hyperbolic tangent of a number.

```

tanh(numeric_expression)


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `trunc`

Truncates a number to a whole number, or to the specified number of decimal places.

```

trunc(numeric_expression[, decimal_places])


```

**Arguments**

* **numeric\_expression**: Numeric expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **decimal\_places**: Optional. The number of decimal places to truncate to. Defaults to 0 (truncate to a whole number). If`decimal_places` is a positive integer, truncates digits to the right of the decimal point. If `decimal_places` is a negative integer, replaces digits to the left of the decimal point with `0`.
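
Illustrative examples with positive and negative `decimal_places` (`source` is a placeholder stream name):

```

SELECT trunc(3.14159, 2) FROM source;

3.14

SELECT trunc(123.45, -1) FROM source;

120


```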


---

---
title: Other functions
description: Miscellaneous scalar functions
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Other functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

## `arrow_cast`

Casts a value to a specific Arrow data type:

```

arrow_cast(expression, datatype)


```

**Arguments**

* **expression**: Expression to cast. Can be a constant, column, or function, and any combination of arithmetic or string operators.
* **datatype**: [Arrow data type ↗](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) name to cast to, as a string. The format is the same as that returned by [`arrow_typeof`](#arrow%5Ftypeof).

**Example**

```

> select arrow_cast(-5, 'Int8') as a,

  arrow_cast('foo', 'Dictionary(Int32, Utf8)') as b,

  arrow_cast('bar', 'LargeUtf8') as c,

  arrow_cast('2023-01-02T12:53:02', 'Timestamp(Microsecond, Some("+08:00"))') as d

  ;

+----+-----+-----+---------------------------+

| a  | b   | c   | d                         |

+----+-----+-----+---------------------------+

| -5 | foo | bar | 2023-01-02T12:53:02+08:00 |

+----+-----+-----+---------------------------+

1 row in set. Query took 0.001 seconds.


```

## `arrow_typeof`

Returns the name of the underlying [Arrow data type ↗](https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html) of the expression:

```

arrow_typeof(expression)


```

**Arguments**

* **expression**: Expression to evaluate. Can be a constant, column, or function, and any combination of arithmetic or string operators.

**Example**

```

> select arrow_typeof('foo'), arrow_typeof(1);

+---------------------------+------------------------+

| arrow_typeof(Utf8("foo")) | arrow_typeof(Int64(1)) |

+---------------------------+------------------------+

| Utf8                      | Int64                  |

+---------------------------+------------------------+

1 row in set. Query took 0.001 seconds.


```


---

---
title: Regex functions
description: Scalar functions for regular expressions
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Regex functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

Cloudflare Pipelines uses a [PCRE-like ↗](https://en.wikibooks.org/wiki/Regular%5FExpressions/Perl-Compatible%5FRegular%5FExpressions) regular expression [syntax ↗](https://docs.rs/regex/latest/regex/#syntax) (minus support for several features including look-around and backreferences).

## `regexp_like`

Returns true if a [regular expression ↗](https://docs.rs/regex/latest/regex/#syntax) has at least one match in a string, false otherwise.

```

regexp_like(str, regexp[, flags])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **regexp**: Regular expression to test against the string expression. Can be a constant, column, or function.
* **flags**: Optional regular expression flags that control the behavior of the regular expression. The following flags are supported:  
   * **i**: case-insensitive: letters match both upper and lower case  
   * **m**: multi-line mode: ^ and $ match begin/end of line  
   * **s**: allow . to match \\n  
   * **R**: enables CRLF mode: when multi-line mode is enabled, \\r\\n is used  
   * **U**: swap the meaning of x\* and x\*?

**Example**

```

select regexp_like('Köln', '[a-zA-Z]ö[a-zA-Z]{2}');

+--------------------------------------------------------+

| regexp_like(Utf8("Köln"),Utf8("[a-zA-Z]ö[a-zA-Z]{2}")) |

+--------------------------------------------------------+

| true                                                   |

+--------------------------------------------------------+

SELECT regexp_like('aBc', '(b|d)', 'i');

+--------------------------------------------------+

| regexp_like(Utf8("aBc"),Utf8("(b|d)"),Utf8("i")) |

+--------------------------------------------------+

| true                                             |

+--------------------------------------------------+


```

Additional examples can be found [here ↗](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/regexp.rs)

## `regexp_match`

Returns a list of [regular expression ↗](https://docs.rs/regex/latest/regex/#syntax) matches in a string.

```

regexp_match(str, regexp[, flags])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **regexp**: Regular expression to match against. Can be a constant, column, or function.
* **flags**: Optional regular expression flags that control the behavior of the regular expression. The following flags are supported:  
   * **i**: case-insensitive: letters match both upper and lower case  
   * **m**: multi-line mode: ^ and $ match begin/end of line  
   * **s**: allow . to match \\n  
   * **R**: enables CRLF mode: when multi-line mode is enabled, \\r\\n is used  
   * **U**: swap the meaning of x\* and x\*?

**Example**

```

select regexp_match('Köln', '[a-zA-Z]ö[a-zA-Z]{2}');

+---------------------------------------------------------+

| regexp_match(Utf8("Köln"),Utf8("[a-zA-Z]ö[a-zA-Z]{2}")) |

+---------------------------------------------------------+

| [Köln]                                                  |

+---------------------------------------------------------+

SELECT regexp_match('aBc', '(b|d)', 'i');

+---------------------------------------------------+

| regexp_match(Utf8("aBc"),Utf8("(b|d)"),Utf8("i")) |

+---------------------------------------------------+

| [B]                                               |

+---------------------------------------------------+


```

Additional examples can be found [here ↗](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/regexp.rs)

## `regexp_replace`

Replaces substrings in a string that match a [regular expression ↗](https://docs.rs/regex/latest/regex/#syntax).

```

regexp_replace(str, regexp, replacement[, flags])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **regexp**: Regular expression to match against. Can be a constant, column, or function.
* **replacement**: Replacement string expression. Can be a constant, column, or function, and any combination of string operators.
* **flags**: Optional regular expression flags that control the behavior of the regular expression. The following flags are supported:  
   * **g**: (global) Search globally and don't return after the first match  
   * **i**: case-insensitive: letters match both upper and lower case  
   * **m**: multi-line mode: ^ and $ match begin/end of line  
   * **s**: allow . to match \\n  
   * **R**: enables CRLF mode: when multi-line mode is enabled, \\r\\n is used  
   * **U**: swap the meaning of x\* and x\*?

**Example**

```

SELECT regexp_replace('foobarbaz', 'b(..)', 'X\\1Y', 'g');

+------------------------------------------------------------------------+

| regexp_replace(Utf8("foobarbaz"),Utf8("b(..)"),Utf8("X\1Y"),Utf8("g")) |

+------------------------------------------------------------------------+

| fooXarYXazY                                                            |

+------------------------------------------------------------------------+

SELECT regexp_replace('aBc', '(b|d)', 'Ab\\1a', 'i');

+-------------------------------------------------------------------+

| regexp_replace(Utf8("aBc"),Utf8("(b|d)"),Utf8("Ab\1a"),Utf8("i")) |

+-------------------------------------------------------------------+

| aAbBac                                                            |

+-------------------------------------------------------------------+


```

Additional examples can be found [here ↗](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/regexp.rs)

## `position`

Returns the position of `substr` in `origstr` (counting from 1). If `substr` does not appear in `origstr`, returns 0.

```

position(substr in origstr)


```

**Arguments**

* **substr**: The substring to search for.
* **origstr**: The string to search within.
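
An illustrative example (`source` is a placeholder stream name):

```

SELECT position('bar' in 'foobar') FROM source;

4


```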


---

---
title: String functions
description: Scalar functions for manipulating strings
image: https://developers.cloudflare.com/dev-products-preview.png
---


# String functions

_Cloudflare Pipelines scalar function implementations are based on [Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)), and these docs are derived from the DataFusion function reference._

## `ascii`

Returns the ASCII value of the first character in a string.

```

ascii(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [chr](#chr)

## `bit_length`

Returns the bit length of a string.

```

bit_length(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [length](#length), [octet\_length](#octet%5Flength)

## `btrim`

Trims the specified trim string from the start and end of a string. If no trim string is provided, all whitespace is removed from the start and end of the input string.

```

btrim(str[, trim_str])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **trim\_str**: String expression to trim from the beginning and end of the input string. Can be a constant, column, or function, and any combination of arithmetic operators. _Default is whitespace characters._

**Related functions**: [ltrim](#ltrim), [rtrim](#rtrim)

**Aliases**

* trim
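
Illustrative examples with and without a trim string (`source` is a placeholder stream name):

```

SELECT btrim('  hello  ') FROM source;

'hello'

SELECT btrim('xxhelloxx', 'x') FROM source;

'hello'


```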

## `char_length`

_Alias of [length](#length)._

## `character_length`

_Alias of [length](#length)._

## `concat`

Concatenates multiple strings together.

```

concat(str[, ..., str_n])


```

**Arguments**

* **str**: String expression to concatenate. Can be a constant, column, or function, and any combination of string operators.
* **str\_n**: Subsequent string column or literal string to concatenate.

**Related functions**: [concat\_ws](#concat%5Fws)

## `concat_ws`

Concatenates multiple strings together with a specified separator.

```

concat_ws(separator, str[, ..., str_n])


```

**Arguments**

* **separator**: Separator to insert between concatenated strings.
* **str**: String expression to concatenate. Can be a constant, column, or function, and any combination of string operators.
* **str\_n**: Subsequent string column or literal string to concatenate.

**Related functions**: [concat](#concat)
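
An illustrative example joining three strings with a `-` separator (`source` is a placeholder stream name):

```

SELECT concat_ws('-', 'a', 'b', 'c') FROM source;

'a-b-c'


```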

## `chr`

Returns the character with the specified ASCII or Unicode code value.

```

chr(expression)


```

**Arguments**

* **expression**: Expression containing the ASCII or Unicode code value to operate on. Can be a constant, column, or function, and any combination of arithmetic or string operators.

**Related functions**: [ascii](#ascii)
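
An illustrative example (`source` is a placeholder stream name):

```

SELECT chr(65) FROM source;

'A'


```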

## `ends_with`

Tests if a string ends with a substring.

```

ends_with(str, substr)


```

**Arguments**

* **str**: String expression to test. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring to test for.

## `initcap`

Capitalizes the first character in each word in the input string. Words are delimited by non-alphanumeric characters.

```

initcap(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**: [lower](#lower), [upper](#upper)

## `instr`

_Alias of [strpos](#strpos)._

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring expression to search for. Can be a constant, column, or function, and any combination of string operators.

## `left`

Returns a specified number of characters from the left side of a string.

```

left(str, n)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: Number of characters to return.

**Related functions**: [right](#right)
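
An illustrative example (`source` is a placeholder stream name):

```

SELECT left('cloudflare', 5) FROM source;

'cloud'


```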

## `length`

Returns the number of characters in a string.

```

length(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Aliases**

* char\_length
* character\_length

**Related functions**:[bit\_length](#bit%5Flength),[octet\_length](#octet%5Flength)
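
For example (result shown as a comment):

```

SELECT length('pipeline'); -- 8


```

Note that `length` counts characters, not bytes; use [octet\_length](#octet%5Flength) for the byte count.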

## `lower`

Converts a string to lower-case.

```

lower(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**:[initcap](#initcap),[upper](#upper)

## `lpad`

Pads the left side of a string with another string to a specified string length.

```

lpad(str, n[, padding_str])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: String length to pad to.
* **padding\_str**: String expression to pad with. Can be a constant, column, or function, and any combination of string operators._Default is a space._

**Related functions**:[rpad](#rpad)
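
For example, zero-padding a numeric string (result shown as a comment, assuming DataFusion semantics):

```

SELECT lpad('7', 3, '0'); -- '007'


```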

## `ltrim`

Trims the specified trim string from the beginning of a string. If no trim string is provided, all whitespace is removed from the start of the input string.

```

ltrim(str[, trim_str])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **trim\_str**: String expression to trim from the beginning of the input string. Can be a constant, column, or function, and any combination of arithmetic operators._Default is whitespace characters._

**Related functions**:[btrim](#btrim),[rtrim](#rtrim)
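
Examples with and without an explicit trim string (results shown as comments, assuming DataFusion semantics):

```

SELECT ltrim('   events');     -- 'events'

SELECT ltrim('xxevents', 'x'); -- 'events'


```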

## `octet_length`

Returns the length of a string in bytes.

```

octet_length(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**:[bit\_length](#bit%5Flength),[length](#length)

## `repeat`

Returns a string consisting of the input string repeated a specified number of times.

```

repeat(str, n)


```

**Arguments**

* **str**: String expression to repeat. Can be a constant, column, or function, and any combination of string operators.
* **n**: Number of times to repeat the input string.

## `replace`

Replaces all occurrences of a specified substring in a string with a new substring.

```

replace(str, substr, replacement)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring expression to replace in the input string. Can be a constant, column, or function, and any combination of string operators.
* **replacement**: Replacement substring expression. Can be a constant, column, or function, and any combination of string operators.
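
For example, replacing a delimiter (result shown as a comment, assuming DataFusion semantics):

```

SELECT replace('2024-01-01', '-', '/'); -- '2024/01/01'


```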

## `reverse`

Reverses the character order of a string.

```

reverse(str)


```

**Arguments**

* **str**: String expression to reverse. Can be a constant, column, or function, and any combination of string operators.

## `right`

Returns a specified number of characters from the right side of a string.

```

right(str, n)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: Number of characters to return.

**Related functions**:[left](#left)

## `rpad`

Pads the right side of a string with another string to a specified string length.

```

rpad(str, n[, padding_str])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **n**: String length to pad to.
* **padding\_str**: String expression to pad with. Can be a constant, column, or function, and any combination of string operators._Default is a space._

**Related functions**:[lpad](#lpad)

## `rtrim`

Trims the specified trim string from the end of a string. If no trim string is provided, all whitespace is removed from the end of the input string.

```

rtrim(str[, trim_str])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **trim\_str**: String expression to trim from the end of the input string. Can be a constant, column, or function, and any combination of arithmetic operators._Default is whitespace characters._

**Related functions**:[btrim](#btrim),[ltrim](#ltrim)

## `split_part`

Splits a string based on a specified delimiter and returns the substring in the specified position.

```

split_part(str, delimiter, pos)


```

**Arguments**

* **str**: String expression to split. Can be a constant, column, or function, and any combination of string operators.
* **delimiter**: String or character to split on.
* **pos**: Position of the part to return.
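
For example (result shown as a comment; positions are 1-based, assuming DataFusion semantics):

```

SELECT split_part('a.b.c', '.', 2); -- 'b'


```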

## `starts_with`

Tests if a string starts with a substring.

```

starts_with(str, substr)


```

**Arguments**

* **str**: String expression to test. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring to test for.

## `strpos`

Returns the starting position of a specified substring in a string. Positions begin at 1. If the substring does not exist in the string, the function returns 0.

```

strpos(str, substr)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **substr**: Substring expression to search for. Can be a constant, column, or function, and any combination of string operators.

**Aliases**

* instr
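
For example (results shown as comments, assuming DataFusion semantics):

```

SELECT strpos('datafusion', 'fusion'); -- 5

SELECT strpos('datafusion', 'xyz');    -- 0


```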

## `substr`

Extracts a substring of a specified number of characters from a specific starting position in a string.

```

substr(str, start_pos[, length])


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **start\_pos**: Character position to start the substring at. The first character in the string has a position of 1.
* **length**: Number of characters to extract. If not specified, returns the rest of the string after the start position.
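
Examples with and without the optional length (results shown as comments, assuming DataFusion semantics):

```

SELECT substr('cloudflare', 1, 5); -- 'cloud'

SELECT substr('cloudflare', 6);    -- 'flare'


```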

## `translate`

Translates characters in a string to specified translation characters.

```

translate(str, chars, translation)


```

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.
* **chars**: Characters to translate.
* **translation**: Translation characters. Translation characters replace only characters at the same position in the **chars** string.
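
For example (result shown as a comment, assuming PostgreSQL-style `translate` semantics as implemented by DataFusion): `a` maps to `1`, `c` maps to `x`, and `e` has no counterpart so it is removed.

```

SELECT translate('abcde', 'ace', '1x'); -- '1bxd'


```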

## `to_hex`

Converts an integer to a hexadecimal string.

```

to_hex(int)


```

**Arguments**

* **int**: Integer expression to convert. Can be a constant, column, or function, and any combination of arithmetic operators.

## `trim`

_Alias of [btrim](#btrim)._

## `upper`

Converts a string to upper-case.

```

upper(str)


```

**Arguments**

* **str**: String expression to operate on. Can be a constant, column, or function, and any combination of string operators.

**Related functions**:[initcap](#initcap),[lower](#lower)

## `uuid`

Returns a UUID v4 string value that is unique per row.

```

uuid()


```

## `overlay`

Replaces a substring of a string with another string, starting at a specified position and spanning a specified number of characters. For example, `overlay('Txxxxas' placing 'hom' from 2 for 4) → Thomas`

```

overlay(str PLACING substr FROM pos [FOR count])


```

**Arguments**

* **str**: String expression to operate on.
* **substr**: String to insert in place of the replaced characters.
* **pos**: Starting position in **str** at which replacement begins.
* **count**: Number of characters of **str** to replace, starting at **pos**. If not specified, the length of **substr** is used.

## `levenshtein`

Returns the Levenshtein distance between the two given strings. For example, `levenshtein('kitten', 'sitting') = 3`

```

levenshtein(str1, str2)


```

**Arguments**

* **str1**: String expression to compute Levenshtein distance with str2.
* **str2**: String expression to compute Levenshtein distance with str1.

## `substr_index`

Returns the substring from str before count occurrences of the delimiter delim. If count is positive, everything to the left of the final delimiter (counting from the left) is returned. If count is negative, everything to the right of the final delimiter (counting from the right) is returned. For example, `substr_index('www.apache.org', '.', 1) = www`, `substr_index('www.apache.org', '.', -1) = org`

```

substr_index(str, delim, count)


```

**Arguments**

* **str**: String expression to operate on.
* **delim**: Delimiter string to search for in **str**.
* **count**: Number of delimiter occurrences to count before splitting. Can be positive or negative.

## `find_in_set`

Returns a value in the range of 1 to N if the string str is in the string list strlist consisting of N substrings. For example, `find_in_set('b', 'a,b,c,d') = 2`

```

find_in_set(str, strlist)


```

**Arguments**

* **str**: String expression to find in strlist.
* **strlist**: String list to search: a string composed of substrings separated by `,` characters.


---

---
title: Struct functions
description: Scalar functions for manipulating structs
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Struct functions

_Cloudflare Pipelines scalar function implementations are based on[Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference._

## `struct`

Returns an Arrow struct using the specified input expressions. Fields in the returned struct use the `cN` naming convention. For example: `c0`, `c1`, `c2`, etc.

```

struct(expression1[, ..., expression_n])


```

For example, this query converts two columns `a` and `b` to a single column with a struct type of fields `c0` and `c1`:

```

select * from t;

+---+---+

| a | b |

+---+---+

| 1 | 2 |

| 3 | 4 |

+---+---+


select struct(a, b) from t;

+-----------------+

| struct(t.a,t.b) |

+-----------------+

| {c0: 1, c1: 2}  |

| {c0: 3, c1: 4}  |

+-----------------+


```

#### Arguments

* **expression\_n**: Expression to include in the output struct. Can be a constant, column, or function, and any combination of arithmetic or string operators.


---

---
title: Time and date functions
description: Scalar functions for handling times and dates
image: https://developers.cloudflare.com/dev-products-preview.png
---


# Time and date functions

_Cloudflare Pipelines scalar function implementations are based on[Apache DataFusion ↗](https://arrow.apache.org/datafusion/) (via [Arroyo ↗](https://www.arroyo.dev/)) and these docs are derived from the DataFusion function reference._

## `date_bin`

Calculates time intervals and returns the start of the interval nearest to the specified timestamp. Use `date_bin` to downsample time series data by grouping rows into time-based "bins" or "windows" and applying an aggregate or selector function to each window.

For example, if you "bin" or "window" data into 15 minute intervals, an input timestamp of `2023-01-01T18:18:18Z` will be updated to the start time of the 15 minute bin it is in: `2023-01-01T18:15:00Z`.

```

date_bin(interval, expression, origin-timestamp)


```

**Arguments**

* **interval**: Bin interval.
* **expression**: Time expression to operate on. Can be a constant, column, or function.
* **origin-timestamp**: Optional. Starting point used to determine bin boundaries. If not specified, defaults to `1970-01-01T00:00:00Z` (the UNIX epoch in UTC).

The following intervals are supported:

* nanoseconds
* microseconds
* milliseconds
* seconds
* minutes
* hours
* days
* weeks
* months
* years
* century

## `date_trunc`

Truncates a timestamp value to a specified precision.

```

date_trunc(precision, expression)


```

**Arguments**

* **precision**: Time precision to truncate to. The following precisions are supported:  
   * year / YEAR  
   * quarter / QUARTER  
   * month / MONTH  
   * week / WEEK  
   * day / DAY  
   * hour / HOUR  
   * minute / MINUTE  
   * second / SECOND
* **expression**: Time expression to operate on. Can be a constant, column, or function.

**Aliases**

* datetrunc
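
For example, truncating to the hour (result shown as a comment, assuming DataFusion semantics):

```

SELECT date_trunc('hour', TIMESTAMP '2023-01-01T18:18:18Z'); -- 2023-01-01T18:00:00


```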

## `datetrunc`

_Alias of [date\_trunc](#date%5Ftrunc)._

## `date_part`

Returns the specified part of the date as an integer.

```

date_part(part, expression)


```

**Arguments**

* **part**: Part of the date to return. The following date parts are supported:  
   * year  
   * quarter _(emits value in inclusive range \[1, 4\] based on which quartile of the year the date is in)_  
   * month  
   * week _(week of the year)_  
   * day _(day of the month)_  
   * hour  
   * minute  
   * second  
   * millisecond  
   * microsecond  
   * nanosecond  
   * dow _(day of the week)_  
   * doy _(day of the year)_  
   * epoch _(seconds since Unix epoch)_
* **expression**: Time expression to operate on. Can be a constant, column, or function.

**Aliases**

* datepart
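
For example (results shown as comments, assuming DataFusion semantics):

```

SELECT date_part('month', TIMESTAMP '2023-01-31T09:26:56Z'); -- 1

SELECT date_part('doy', TIMESTAMP '2023-01-31T09:26:56Z');   -- 31


```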

## `datepart`

_Alias of [date\_part](#date%5Fpart)._

## `extract`

Returns a sub-field from a time value as an integer.

```

extract(field FROM source)


```

Equivalent to calling `date_part('field', source)`. For example, these are equivalent:

```

extract(day FROM '2024-04-13'::date)

date_part('day', '2024-04-13'::date)


```

See [date\_part](#date%5Fpart).

## `make_date`

Make a date from year/month/day component parts.

```

make_date(year, month, day)


```

**Arguments**

* **year**: Year to use when making the date. Can be a constant, column or function, and any combination of arithmetic operators.
* **month**: Month to use when making the date. Can be a constant, column or function, and any combination of arithmetic operators.
* **day**: Day to use when making the date. Can be a constant, column or function, and any combination of arithmetic operators.

**Example**

```

> select make_date(2023, 1, 31);

+-------------------------------------------+

| make_date(Int64(2023),Int64(1),Int64(31)) |

+-------------------------------------------+

| 2023-01-31                                |

+-------------------------------------------+

> select make_date('2023', '01', '31');

+-----------------------------------------------+

| make_date(Utf8("2023"),Utf8("01"),Utf8("31")) |

+-----------------------------------------------+

| 2023-01-31                                    |

+-----------------------------------------------+


```

## `to_char`

Returns a string representation of a date, time, timestamp or duration based on a [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html). Unlike the PostgreSQL equivalent of this function, numerical formatting is not supported.

```

to_char(expression, format)


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function that results in a date, time, timestamp or duration.
* **format**: A [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) string to use to convert the expression.

**Example**

```

> select to_char('2023-03-01'::date, '%d-%m-%Y');

+----------------------------------------------+

| to_char(Utf8("2023-03-01"),Utf8("%d-%m-%Y")) |

+----------------------------------------------+

| 01-03-2023                                   |

+----------------------------------------------+


```

**Aliases**

* date\_format

## `to_timestamp`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00Z`). Supports strings, integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

Note: `to_timestamp` returns `Timestamp(Nanosecond)`. The supported range for integer input is between `-9223372037` and `9223372036`. The supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` for input outside of the supported bounds.

```

to_timestamp(expression[, ..., format_n])


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```

> select to_timestamp('2023-01-31T09:26:56.123456789-05:00');

+-----------------------------------------------------------+

| to_timestamp(Utf8("2023-01-31T09:26:56.123456789-05:00")) |

+-----------------------------------------------------------+

| 2023-01-31T14:26:56.123456789                             |

+-----------------------------------------------------------+

> select to_timestamp('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');

+--------------------------------------------------------------------------------------------------------+

| to_timestamp(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |

+--------------------------------------------------------------------------------------------------------+

| 2023-05-17T03:59:00.123456789                                                                          |

+--------------------------------------------------------------------------------------------------------+


```

## `to_timestamp_millis`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```

to_timestamp_millis(expression[, ..., format_n])


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```

> select to_timestamp_millis('2023-01-31T09:26:56.123456789-05:00');

+------------------------------------------------------------------+

| to_timestamp_millis(Utf8("2023-01-31T09:26:56.123456789-05:00")) |

+------------------------------------------------------------------+

| 2023-01-31T14:26:56.123                                          |

+------------------------------------------------------------------+

> select to_timestamp_millis('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');

+---------------------------------------------------------------------------------------------------------------+

| to_timestamp_millis(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |

+---------------------------------------------------------------------------------------------------------------+

| 2023-05-17T03:59:00.123                                                                                       |

+---------------------------------------------------------------------------------------------------------------+


```

## `to_timestamp_micros`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as microseconds since the unix epoch (`1970-01-01T00:00:00Z`) Returns the corresponding timestamp.

```

to_timestamp_micros(expression[, ..., format_n])


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```

> select to_timestamp_micros('2023-01-31T09:26:56.123456789-05:00');

+------------------------------------------------------------------+

| to_timestamp_micros(Utf8("2023-01-31T09:26:56.123456789-05:00")) |

+------------------------------------------------------------------+

| 2023-01-31T14:26:56.123456                                       |

+------------------------------------------------------------------+

> select to_timestamp_micros('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');

+---------------------------------------------------------------------------------------------------------------+

| to_timestamp_micros(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |

+---------------------------------------------------------------------------------------------------------------+

| 2023-05-17T03:59:00.123456                                                                                    |

+---------------------------------------------------------------------------------------------------------------+


```

## `to_timestamp_nanos`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```

to_timestamp_nanos(expression[, ..., format_n])


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```

> select to_timestamp_nanos('2023-01-31T09:26:56.123456789-05:00');

+-----------------------------------------------------------------+

| to_timestamp_nanos(Utf8("2023-01-31T09:26:56.123456789-05:00")) |

+-----------------------------------------------------------------+

| 2023-01-31T14:26:56.123456789                                   |

+-----------------------------------------------------------------+

> select to_timestamp_nanos('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');

+--------------------------------------------------------------------------------------------------------------+

| to_timestamp_nanos(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |

+--------------------------------------------------------------------------------------------------------------+

| 2023-05-17T03:59:00.123456789                                                                                |

+--------------------------------------------------------------------------------------------------------------+


```

## `to_timestamp_seconds`

Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html)s are provided. Integers and unsigned integers are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). Returns the corresponding timestamp.

```

to_timestamp_seconds(expression[, ..., format_n])


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.
* **format\_n**: Optional [Chrono format ↗](https://docs.rs/chrono/latest/chrono/format/strftime/index.html) strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned.

**Example**

```

> select to_timestamp_seconds('2023-01-31T09:26:56.123456789-05:00');

+-------------------------------------------------------------------+

| to_timestamp_seconds(Utf8("2023-01-31T09:26:56.123456789-05:00")) |

+-------------------------------------------------------------------+

| 2023-01-31T14:26:56                                               |

+-------------------------------------------------------------------+

> select to_timestamp_seconds('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');

+----------------------------------------------------------------------------------------------------------------+

| to_timestamp_seconds(Utf8("03:59:00.123456789 05-17-2023"),Utf8("%c"),Utf8("%+"),Utf8("%H:%M:%S%.f %m-%d-%Y")) |

+----------------------------------------------------------------------------------------------------------------+

| 2023-05-17T03:59:00                                                                                            |

+----------------------------------------------------------------------------------------------------------------+


```

## `from_unixtime`

Converts an integer to RFC3339 timestamp format (`YYYY-MM-DDT00:00:00.000000000Z`). Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`) and return the corresponding timestamp.

```

from_unixtime(expression)


```

**Arguments**

* **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.

## `now`

Returns the UTC timestamp at pipeline start.

The `now()` return value is determined at query compilation time and remains constant across the execution of the pipeline.


---

---
title: SELECT statements
description: Query syntax for data transformation in Cloudflare Pipelines SQL
image: https://developers.cloudflare.com/dev-products-preview.png
---


# SELECT statements

SELECT statements are used to transform data in Cloudflare Pipelines. The general form is:

```

[WITH with_query [, ...]]

SELECT select_expr [, ...]

FROM from_item

[WHERE condition]


```

## WITH clause

The WITH clause allows you to define named subqueries that can be referenced in the main query. This can improve query readability by breaking down complex transformations.

Syntax:

```

WITH query_name AS (subquery) [, ...]


```

Simple example:

```

WITH filtered_events AS

    (SELECT user_id, event_type, amount

        FROM user_events WHERE amount > 50)

SELECT user_id, amount * 1.1 as amount_with_tax

FROM filtered_events

WHERE event_type = 'purchase';


```

## SELECT clause

The SELECT clause is a comma-separated list of expressions, with optional aliases. Column names must be unique.

```

SELECT select_expr [, ...]


```

Examples:

```

-- Select specific columns

SELECT user_id, event_type, amount FROM events


-- Use expressions and aliases

SELECT

    user_id,

    amount * 1.1 as amount_with_tax,

    UPPER(event_type) as event_type_upper

FROM events


-- Select all columns

SELECT * FROM events


```

## FROM clause

The FROM clause specifies the data source for the query: either a table name or a subquery. The table name can be either a stream name or a named subquery defined in the WITH clause.

```

FROM from_item


```

Tables can be given aliases:

```

SELECT e.user_id, e.amount

FROM user_events e

WHERE e.event_type = 'purchase'


```

## WHERE clause

The WHERE clause filters data using boolean conditions. Predicates are applied to input rows.

```

WHERE condition


```

Examples:

```

-- Filter by field value

SELECT * FROM events WHERE event_type = 'purchase'


-- Multiple conditions

SELECT * FROM events

WHERE event_type = 'purchase' AND amount > 50


-- String operations

SELECT * FROM events

WHERE user_id LIKE 'user_%'


-- Null checks

SELECT * FROM events

WHERE description IS NOT NULL


```

## UNNEST operator

The UNNEST operator converts arrays into multiple rows. This is useful for processing list data types.

UNNEST restrictions:

* May only appear in the SELECT clause
* Only one array may be unnested per SELECT statement

Example:

```

SELECT

    UNNEST([1, 2, 3]) as numbers

FROM events;


```

This will produce:

```

+---------+

| numbers |

+---------+

|       1 |

|       2 |

|       3 |

+---------+


```


---

---
title: SQL data types
description: Supported data types in Cloudflare Pipelines SQL
image: https://developers.cloudflare.com/dev-products-preview.png
---

# SQL data types

Cloudflare Pipelines supports a set of primitive and composite data types for SQL transformations. These types can be used in stream schemas, and literal values in SQL are automatically inferred to the appropriate type.

## Primitive types

| Pipelines | SQL Types                   | Example Literals                                 |
| --------- | --------------------------- | ------------------------------------------------ |
| bool      | BOOLEAN                     | TRUE, FALSE                                      |
| int32     | INT, INTEGER                | 0, 1, -2                                         |
| int64     | BIGINT                      | 0, 1, -2                                         |
| float32   | FLOAT, REAL                 | 0.0, -2.4, 1E-3                                  |
| float64   | DOUBLE                      | 0.0, -2.4, 1E-35                                 |
| string    | VARCHAR, CHAR, TEXT, STRING | 'hello', 'world'                                 |
| timestamp | TIMESTAMP                   | '2020-01-01', '2023-05-17T22:16:00.648662+00:00' |
| binary    | BYTEA                       | X'A123' (hex)                                    |

## Composite types

In addition to primitive types, Pipelines SQL supports composite types for more complex data structures.

### List types

Lists group together zero or more elements of the same type. In stream schemas, lists are declared using the `list` type with an `items` field specifying the element type. In SQL, lists correspond to arrays and are declared by suffixing another type with `[]`, for example `INT[]`.

List values can be indexed using 1-indexed subscript notation (`v[1]` is the first element of `v`).

Lists can be constructed via `[]` literals:

```
SELECT [1, 2, 3] as numbers
```

Pipelines provides array functions for manipulating list values, and lists may be unnested using the `UNNEST` operator.
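
For example, subscript access can be combined with list construction. A sketch, keeping in mind that indexing is 1-based, so `[2]` selects the second element:

```
SELECT
    [10, 20, 30][2] as second_element
FROM events
```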

### Struct types

Structs combine related fields into a single value. In stream schemas, structs are declared using the `struct` type with a `fields` array. In SQL, structs can be created using the `struct` function.

Example creating a struct in SQL:

```
SELECT struct('user123', 'purchase', 29.99) as event_data FROM events
```

This creates a struct with fields `c0`, `c1`, `c2` containing the user ID, event type, and amount.

Struct fields can be accessed via `.` notation, for example `event_data.c0` for the user ID.
