Datastream with Redshift

A guide to loading Datastream data from your S3 bucket into an Amazon Redshift cluster for querying.

Prerequisites

To get started with this tutorial, you’ll need:

  • A working Datastream feed delivering to your S3 bucket. You’ll want the name of this bucket and access credentials handy.

  • A Redshift cluster and credentials for a user with permissions to create tables and load data.

On Disk vs Spectrum

To query Datastream data in Redshift, the data must first be made accessible, either by loading it onto the cluster’s disks or by creating an external table pointing to your S3 bucket and querying it via Redshift Spectrum. The tradeoffs between these two approaches are summarized below:

| Property | Data loaded to Redshift disk | Data queried via Redshift Spectrum |
| --- | --- | --- |
| Price per TB of data | Moderate | Very low |
| Performance | High, depends on cluster design | Moderate |
| Performance tuning | Query performance can be improved by tuning cluster size and table properties | Less direct control over performance |
| Querying through multiple systems | Data must be loaded into each system | Data in S3 can be queried by other systems |
| Mutability | Rows can be deleted or updated through Redshift | Data in S3 cannot be edited through SQL commands |
| JSON field support | Limited JSON functionality in Redshift | Improved JSON functionality |

Note: Most users of Datastream will want to start by storing data in Redshift directly and consider migrating to Spectrum if data reaches a scale where costs are a concern.
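
For reference, the Spectrum path starts by registering an external schema backed by a data catalog. The statement below is a minimal sketch, assuming an AWS Glue catalog and an IAM role that can read your bucket; the schema, database, and role names are illustrative, not part of Datastream's setup:

-- Minimal Spectrum setup sketch; schema, database, and IAM role names are illustrative.
CREATE EXTERNAL SCHEMA datastream_spectrum
FROM DATA CATALOG
DATABASE 'datastream'
IAM_ROLE 'arn:aws:iam::123456789012:role/MySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

An external table with the same columns as the CREATE TABLE statement in the next section, and a LOCATION pointing at s3://yourbucket/, can then be queried in place without loading any data onto the cluster.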

Storing data on disk in Redshift

Step 1: Table creation

Start by creating a table on your Redshift cluster. We recommend the following CREATE TABLE statement:

CREATE TABLE example_table (
    Distribution VARCHAR(8),
    Last_ping_timestamp TIMESTAMP,
    Host VARCHAR(32),
    Cookie_id VARCHAR(32),
    Page_session_id VARCHAR(32),
    Domain VARCHAR(64),
    Path VARCHAR(1024),
    New_user BOOLEAN,
    Device VARCHAR(16),
    Engaged_time_on_page_seconds INT,
    Page_width REAL,
    Page_height REAL,
    Max_scroll_position_top REAL,
    Window_height REAL,
    External_referrer VARCHAR(64),
    No_client_storage BOOLEAN,
    City_name VARCHAR(64),
    Region_name VARCHAR(32),
    Country_code VARCHAR(2),
    Country_name VARCHAR(32),
    Continent_name VARCHAR(16),
    Dma_code INT,
    Utc_offset_minutes INT,
    User_agent VARCHAR(256),
    Recency INT,
    Frequency INT,
    Internal_referrer VARCHAR(1024),
    Author VARCHAR(128),
    Section VARCHAR(128),
    Content_type VARCHAR(32),
    Sponsor VARCHAR(32),
    Utm_campaign VARCHAR(32),
    Utm_medium VARCHAR(32),
    Utm_source VARCHAR(32),
    Utm_content VARCHAR(32),
    Utm_term VARCHAR(32),
    Account_id VARCHAR(32),
    Page_title VARCHAR(256),
    Virtual_page BOOLEAN,
    Scrolldepth INT,
    Total_time_on_page_seconds INT,
    Ga_client_id VARCHAR(32),
    Login_id VARCHAR(32),
    Id_sync VARCHAR(32),
    Subscriber_acct VARCHAR(32),
    Page_load_time INT,
    Row_id BIGINT IDENTITY (0, 1)
)
DISTKEY(Cookie_id)
SORTKEY(Last_ping_timestamp)
;

As shown above, we recommend selecting cookie_id as the table’s distribution key. This field is a unique per-user ID, so distributing on it spreads data roughly uniformly across Redshift slices while also improving performance for per-user queries, which are common for Datastream users.

For example, calculating unique visitors via COUNT(DISTINCT cookie_id) will be significantly faster with this distribution style. If you prefer not to distribute on cookie_id, we recommend leaving the default distribution style in place.

Since most queries on traffic data include a time filter, we strongly recommend choosing last_ping_timestamp as the sort key. We also strongly recommend not specifying compression encodings and instead letting the COPY command set them automatically.
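
To illustrate how these choices pay off, here is a representative query against the table defined above; the time filter prunes on the sort key and the distinct count aggregates on the distribution key (the 7-day window is just an example):

-- Unique visitors per day over the last 7 days.
-- The WHERE clause filters on the sort key (Last_ping_timestamp), letting Redshift skip blocks,
-- and COUNT(DISTINCT Cookie_id) aggregates largely slice-locally because Cookie_id is the distribution key.
SELECT
    TRUNC(Last_ping_timestamp) AS day,
    COUNT(DISTINCT Cookie_id) AS unique_visitors
FROM example_table
WHERE Last_ping_timestamp >= GETDATE() - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1;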

Step 2: Loading

Data should be loaded into Redshift via the COPY command, either in 30-minute batches or, if needed, on a per-file basis.

Datastream delivers data to folders partitioned into 30-minute chunks. For example, all files written between 7:00:00am and 7:29:59am on August 12, 2020 will be written to the folder s3://yourbucket/2020/08/12/07/00/.

Given that, the simplest way to load data into Redshift is to issue COPY statements that load an entire 30-minute folder into the cluster, for example:

COPY EXAMPLE_TABLE
FROM 's3://yourbucket/YYYY/MM/DD/HH/MM/'
{AUTHORIZATION}
EMPTYASNULL
IGNOREHEADER 1
TIMEFORMAT 'epochsecs'
TRUNCATECOLUMNS
FORMAT CSV
DELIMITER '|'
GZIP
;

Important notes:

  • {AUTHORIZATION} should be specified using the appropriate authorization parameters, as described in the Redshift documentation on COPY authorization parameters.

  • The above query must only be run after a given half-hour window has completed. For example, a COPY of the 7:00am folder can only be executed at 7:30am or later; running it earlier will cause data written after the COPY command runs to be skipped.

  • Redshift does not provide functionality to avoid duplicate imports or to retry failed imports (for example, if the cluster is unavailable when a COPY is issued). The calling process should ensure that COPY statements are executed exactly once for each folder; one possible bookkeeping approach is sketched after this list.
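
One way to approximate exactly-once loading is to track loaded folders in a small bookkeeping table and have the loading process check it before each COPY. The sketch below illustrates the idea; the datastream_load_log table and its columns are hypothetical, not part of Datastream or Redshift:

-- Hypothetical bookkeeping table recording which half-hour folders have been loaded.
CREATE TABLE IF NOT EXISTS datastream_load_log (
    folder_prefix VARCHAR(256),
    loaded_at TIMESTAMP
);

-- Before issuing a COPY for a folder, the loading process checks whether it was already loaded:
SELECT 1 FROM datastream_load_log
WHERE folder_prefix = 's3://yourbucket/2020/08/12/07/00/';

-- ...and records the folder after a successful COPY:
INSERT INTO datastream_load_log (folder_prefix, loaded_at)
VALUES ('s3://yourbucket/2020/08/12/07/00/', GETDATE());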

Step 2b: Loading data via Lambda

While batch loading is easy operationally, it introduces up to 30 minutes of lag between data delivery and import, because one must wait for a folder’s time window to close before executing a COPY. If faster imports are needed, we recommend following the AWS instructions to create a Lambda function that copies files into Redshift as soon as they are delivered to S3.
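
In this setup, the Lambda function issues a COPY for the single object named in the S3 event rather than for a whole folder. A minimal sketch of that per-file statement is below; the object key is illustrative, and in practice it comes from the S3 event payload:

-- Per-file COPY issued by a Lambda function when a new object lands in the bucket.
-- The object key below is illustrative.
COPY EXAMPLE_TABLE
FROM 's3://yourbucket/2020/08/12/07/00/example-file.csv.gz'
{AUTHORIZATION}
EMPTYASNULL
IGNOREHEADER 1
TIMEFORMAT 'epochsecs'
TRUNCATECOLUMNS
FORMAT CSV
DELIMITER '|'
GZIP
;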

To ensure that data is delivered in real time, Datastream guarantees that a record of each pageview is delivered to S3 at least once. While duplicates are rare, they can occur, so for accuracy we recommend an additional deduplication step after each COPY statement. The deduplication statement below looks back over a two-hour window for duplicated records and always keeps the most recent row:

DELETE FROM EXAMPLE_TABLE
    USING (
        SELECT
            Cookie_id, Page_session_id, MAX(Row_id) AS ri_to_keep
        FROM
            EXAMPLE_TABLE
        WHERE
            Last_ping_timestamp >= GETDATE() - INTERVAL '2 hours'
        GROUP BY
            Cookie_id, Page_session_id
        HAVING
            COUNT(1) > 1
    ) AS duplicates
WHERE
    EXAMPLE_TABLE.Last_ping_timestamp >= GETDATE() - INTERVAL '2 hours'
    AND EXAMPLE_TABLE.Cookie_id = duplicates.Cookie_id
    AND EXAMPLE_TABLE.Page_session_id = duplicates.Page_session_id
    AND EXAMPLE_TABLE.Row_id != duplicates.ri_to_keep
;
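
After the deduplication runs, a quick check such as the following can confirm that no duplicate (cookie_id, page_session_id) pairs remain in the window; this check is illustrative rather than required:

-- Returns any (Cookie_id, Page_session_id) pairs still duplicated in the last two hours;
-- an empty result means the deduplication step worked as expected.
SELECT
    Cookie_id, Page_session_id, COUNT(1) AS num_rows
FROM EXAMPLE_TABLE
WHERE Last_ping_timestamp >= GETDATE() - INTERVAL '2 hours'
GROUP BY Cookie_id, Page_session_id
HAVING COUNT(1) > 1;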
