Datastream with Redshift
A guide to loading Datastream data from your S3 bucket into an Amazon Redshift cluster for querying.
Prerequisites
To get started on this tutorial, you’ll need:
A working Datastream feed delivering to your S3 bucket. You’ll want the name of this bucket and access credentials handy.
A Redshift cluster and credentials for a user with permissions to create tables, run COPY, and delete rows.
On Disk vs Spectrum
To query Datastream data in Redshift, the data must first be made accessible, either by loading it onto the cluster’s disks or by creating an external table that points at your S3 bucket and querying it through Redshift Spectrum. The tradeoffs between these two designs are summarized below:
| Property | Data loaded to Redshift disk | Data queried via Redshift Spectrum |
| --- | --- | --- |
| Price per TB of data | Moderate | Very low |
| Performance | High, depends on cluster design | Moderate |
| Performance tuning | Query performance can be improved by tuning cluster size and table properties | Less direct control over performance |
| Querying through multiple systems | Data must be loaded into each system | Data in S3 can be queried by other systems |
| Mutability | Rows can be deleted or updated through Redshift | Data in S3 cannot be edited through SQL commands |
| JSON field support | Limited JSON functionality in Redshift | Improved JSON functionality |
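If you choose the Spectrum route, the data never leaves S3; instead you register an external schema and an external table over the bucket. The sketch below is a minimal, hedged example: the schema name, Glue database name, IAM role ARN, and bucket path are placeholders, only a handful of columns are shown (the full list matches the CREATE TABLE statement later in this guide), and the delimiter and header settings mirror the COPY options used below.
CREATE EXTERNAL SCHEMA datastream_spectrum
FROM DATA CATALOG
DATABASE 'datastream'
IAM_ROLE 'arn:aws:iam::123456789012:role/your-spectrum-role'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

CREATE EXTERNAL TABLE datastream_spectrum.example_table (
  last_ping_timestamp BIGINT,   -- epoch seconds in the raw files
  cookie_id VARCHAR(32),
  page_session_id VARCHAR(32)
  -- remaining columns follow the CREATE TABLE statement below
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3://yourbucket/'
TABLE PROPERTIES ('skip.header.line.count'='1');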
Storing data on disk in Redshift
Step 1: Table creation
Start by creating a table on your Redshift cluster. We recommend the following CREATE TABLE statement:
CREATE TABLE example_table (
  distribution VARCHAR(8),
  last_ping_timestamp TIMESTAMP,
  host VARCHAR(32),
  cookie_id VARCHAR(32),
  page_session_id VARCHAR(32),
  domain VARCHAR(64),
  path VARCHAR(1024),
  new_user BOOLEAN,
  device VARCHAR(16),
  engaged_time_on_page_seconds INT,
  page_width REAL,
  page_height REAL,
  max_scroll_position_top REAL,
  window_height REAL,
  external_referrer VARCHAR(64),
  no_client_storage BOOLEAN,
  city_name VARCHAR(64),
  region_name VARCHAR(32),
  country_code VARCHAR(2),
  country_name VARCHAR(32),
  continent_name VARCHAR(16),
  dma_code INT,
  utc_offset_minutes INT,
  user_agent VARCHAR(256),
  recency INT,
  frequency INT,
  internal_referrer VARCHAR(1024),
  author VARCHAR(128),
  section VARCHAR(128),
  content_type VARCHAR(32),
  sponsor VARCHAR(32),
  utm_campaign VARCHAR(32),
  utm_medium VARCHAR(32),
  utm_source VARCHAR(32),
  utm_content VARCHAR(32),
  utm_term VARCHAR(32),
  account_id VARCHAR(32),
  page_title VARCHAR(256),
  virtual_page BOOLEAN,
  scrolldepth INT,
  total_time_on_page_seconds INT,
  ga_client_id VARCHAR(32),
  login_id VARCHAR(32),
  id_sync VARCHAR(32),
  subscriber_acct VARCHAR(32),
  page_load_time INT,
  row_id BIGINT IDENTITY(0, 1)
)
DISTKEY(cookie_id)
SORTKEY(last_ping_timestamp)
;
As shown above, we recommend selecting cookie_id as the table’s distribution key. This field is a unique per-user ID, so distributing on it spreads data roughly uniformly across Redshift slices while also speeding up per-user queries, which are common for Datastream users. For example, calculating unique visitors via COUNT(DISTINCT(cookie_id)) will be significantly more performant with this distribution style. If you would rather not choose a distribution key, leaving the default distribution style in place is a reasonable fallback.
Since most queries for traffic data contain a time filter, we strongly recommend choosing last_ping_timestamp as the sort key. We also recommend not specifying compression encodings and instead letting the COPY command set them automatically.
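To see how these choices pay off, consider a hypothetical daily unique-visitors query (the seven-day window is just an illustration): the sort key lets Redshift skip blocks outside the time filter, and the distribution key lets each slice count its own cookies without redistributing rows.
SELECT
  TRUNC(last_ping_timestamp) AS ping_day,
  COUNT(DISTINCT cookie_id) AS unique_visitors
FROM example_table
WHERE last_ping_timestamp >= GETDATE() - INTERVAL '7 days'  -- sort key prunes blocks outside the window
GROUP BY 1
ORDER BY 1;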
Step 2: Loading
Data should be loaded into Redshift via the COPY command. It can be loaded either in 30-minute batches or, if needed, on a per-file basis.
Step 2a: COPYing data in batches (recommended)
Datastream delivers data to folders partitioned into 30-minute chunks; for example, all files written between 7:00:00am and 7:29:59am on August 12, 2020 will be written to the folder s3://yourbucket/2020/08/12/07/00/.
Given that, the simplest way to load data into Redshift is to issue COPY statements that copy an entire 30 minutes of data into the cluster at once, for example:
COPY example_table
FROM 's3://yourbucket/YYYY/MM/DD/HH/MM/'
{AUTHORIZATION}
EMPTYASNULL
IGNOREHEADER 1
TIMEFORMAT 'epochsecs'
TRUNCATECOLUMNS
FORMAT CSV
DELIMITER '|'
GZIP
;
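The {AUTHORIZATION} placeholder stands for whichever COPY credential clause your cluster uses. For example, a role-based clause would look like the following (the role ARN here is a placeholder); key-based credentials are also accepted:
IAM_ROLE 'arn:aws:iam::123456789012:role/your-redshift-copy-role'
-- or, with key-based credentials:
-- ACCESS_KEY_ID '<your-access-key-id>' SECRET_ACCESS_KEY '<your-secret-access-key>'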
Step 2b: Loading data via Lambda
While batch loading is easy operationally, it introduces up to 30 minutes of lag between data delivery and import, because one must wait for a folder’s time window to close before executing a COPY. If faster imports are needed, we recommend following the AWS instructions for creating a Lambda function that copies each file into Redshift as soon as it is delivered to S3.
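The Lambda function itself is beyond the scope of this guide, but the statement it ultimately issues is just the batch COPY above scoped to a single object. A sketch, where the object key is a made-up placeholder:
COPY example_table
FROM 's3://yourbucket/2020/08/12/07/00/delivered-file.gz'
{AUTHORIZATION}
EMPTYASNULL
IGNOREHEADER 1
TIMEFORMAT 'epochsecs'
TRUNCATECOLUMNS
FORMAT CSV
DELIMITER '|'
GZIP
;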
Step 3: Deduplication (optional, but strongly recommended)
To ensure that data is delivered in real time, Datastream guarantees that a record of each pageview is delivered to S3 at least once. While duplicates are rare, they can occur, so for accuracy we recommend an additional deduplication step after each COPY statement. The deduplication statement below looks back over a two-hour window for duplicated records and always keeps the most recently loaded row:
DELETE FROM example_table
USING (
  SELECT
    cookie_id, page_session_id, MAX(row_id) AS ri_to_keep
  FROM
    example_table
  WHERE
    last_ping_timestamp >= GETDATE() - INTERVAL '2 hours'
  GROUP BY
    cookie_id, page_session_id
  HAVING
    COUNT(1) > 1
) AS duplicates
WHERE
  example_table.last_ping_timestamp >= GETDATE() - INTERVAL '2 hours'
  AND example_table.cookie_id = duplicates.cookie_id
  AND example_table.page_session_id = duplicates.page_session_id
  AND example_table.row_id != duplicates.ri_to_keep
;
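To confirm the cleanup worked, a quick check like the one below (a sketch over the same two-hour window) should return zero rows:
SELECT
  cookie_id,
  page_session_id,
  COUNT(1) AS copies
FROM example_table
WHERE last_ping_timestamp >= GETDATE() - INTERVAL '2 hours'
GROUP BY cookie_id, page_session_id
HAVING COUNT(1) > 1;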