Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
CREATE TABLE CardAssignment (
customerId BIGINT NOT NULL,
cardNo STRING NOT NULL,
cardType STRING NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
'properties.group.id' = 'mygroupid',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'value.format' = 'flexible-json',
'topic' = 'cardassignment'
);

CREATE TABLE Merchant (
merchantId BIGINT NOT NULL,
name STRING NOT NULL,
category STRING NOT NULL,
updatedTime TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
'properties.group.id' = 'mygroupid',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'value.format' = 'flexible-json',
'topic' = 'merchant'
);


CREATE TABLE MerchantReward (
merchantId BIGINT NOT NULL,
rewardsByCard ARRAY<ROW<
cardType STRING,
rewardPercentage BIGINT,
startTimestamp BIGINT,
expirationTimestamp BIGINT
>> NOT NULL,
updatedTime TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
'properties.group.id' = 'mygroupid',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'value.format' = 'flexible-json',
'topic' = 'merchantreward'
);

CREATE TABLE Transaction (
transactionId BIGINT NOT NULL,
cardNo STRING NOT NULL,
`time` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
amount DOUBLE NOT NULL,
merchantId BIGINT NOT NULL,
WATERMARK FOR `time` AS `time` - INTERVAL '1' SECOND
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
'properties.group.id' = 'mygroupid',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'value.format' = 'flexible-json',
'topic' = 'transaction'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
CREATE TABLE CardAssignment (
customerId BIGINT NOT NULL,
cardNo STRING NOT NULL,
cardType STRING NOT NULL,
`timestamp` TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`customerId`, `cardNo`, `timestamp`) NOT ENFORCED,
WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/card_assignment.jsonl',
'source.monitor-interval' = '10 min',
'connector' = 'filesystem'
);

CREATE TABLE Merchant (
merchantId BIGINT NOT NULL,
name STRING NOT NULL,
category STRING NOT NULL,
updatedTime TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`merchantId`, `updatedTime`) NOT ENFORCED,
WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/merchant.jsonl',
'source.monitor-interval' = '10 min',
'connector' = 'filesystem'
);

CREATE TABLE MerchantReward (
merchantId BIGINT NOT NULL,
rewardsByCard ARRAY<ROW<cardType STRING, rewardPercentage BIGINT, startTimestamp BIGINT, expirationTimestamp BIGINT>> NOT NULL,
updatedTime TIMESTAMP_LTZ(3) NOT NULL,
PRIMARY KEY (`merchantId`, `updatedTime`) NOT ENFORCED,
WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/merchant_reward.jsonl',
'source.monitor-interval' = '10 min',
'connector' = 'filesystem'
);

CREATE TABLE Transaction (
transactionId BIGINT NOT NULL,
cardNo STRING NOT NULL,
`time` TIMESTAMP_LTZ(3) NOT NULL,
amount DOUBLE NOT NULL,
merchantId BIGINT NOT NULL,
PRIMARY KEY (`transactionId`, `time`) NOT ENFORCED,
WATERMARK FOR `time` AS `time` - INTERVAL '1' SECOND
) WITH (
'format' = 'flexible-json',
'path' = '${DATA_PATH}/transaction.jsonl',
'source.monitor-interval' = '10 min',
'connector' = 'filesystem'
);
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
/* Import Data */
IMPORT creditcard-{{variant}}.merchant AS _MerchantStream;
IMPORT creditcard-{{variant}}.card_assignment AS _CardAssignmentStream;
IMPORT creditcard-{{variant}}.transaction AS _Transaction;
IMPORT connectors.sources-{{variant}} AS sources;

/* Deduplicate CDC Streams */
_Merchant := DISTINCT _MerchantStream ON merchantId ORDER BY updatedTime DESC;
_CardAssignment := DISTINCT _CardAssignmentStream ON cardNo ORDER BY `timestamp` DESC;
_Merchant := DISTINCT sources.Merchant ON merchantId ORDER BY updatedTime DESC;
_CardAssignment := DISTINCT sources.CardAssignment ON cardNo ORDER BY `timestamp` DESC;

/** Enrich credit card transactions with customer and merchant information */
/*+no_query */
CustomerTransaction := SELECT t.transactionId, t.cardNo, t.`time`, t.amount, m.name AS merchantName,
m.category, c.customerId
FROM _Transaction t
FROM sources.Transaction t
JOIN _CardAssignment FOR SYSTEM_TIME AS OF t.`time` c ON t.cardNo = c.cardNo
JOIN _Merchant FOR SYSTEM_TIME AS OF t.`time` m ON t.merchantId = m.merchantId;

Expand All @@ -29,12 +28,14 @@ _SpendingByDay := SELECT customerId, window_time as timeDay, SUM(amount) as spen

/* ==== QUERY ENDPOINTS ==== */

/** Returns all credit card transactions within a specified time period ordered by time (most recent first) */
/** Returns all credit card transactions since fromTime (inclusive) and until toTime (exclusive) for the given customer showing most recent transactions first.
fromTime and toTime must be RFC-3339 compliant date time scalar. Both must be the start of a day, e.g. 2024-01-19T00:00:00-00:00. */
Transactions(customerId BIGINT NOT NULL, fromTime TIMESTAMP NOT NULL, toTime TIMESTAMP NOT NULL) :=
SELECT * FROM CustomerTransaction WHERE customerId = :customerId AND :fromTime <= `time` AND :toTime > `time`
ORDER BY `time` DESC LIMIT 10000;

/* Returns the total customer spending by day for the specified time period ordered by time (most recent first) */
/* Returns the total customer spending by day since fromTime (inclusive) and until toTime (exclusive) ordered by time (most recent first)
fromTime and toTime must be RFC-3339 compliant date time scalar. Both must be the start of a day, e.g. 2024-01-19T00:00:00-00:00. */
SpendingByDay(customerId BIGINT NOT NULL, fromTime TIMESTAMP NOT NULL, toTime TIMESTAMP NOT NULL) :=
SELECT timeDay, spending
FROM _SpendingByDay WHERE customerId = :customerId AND :fromTime <= timeDay AND :toTime > timeDay
Expand Down
Loading