diff --git a/finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/README.md b/finance-credit-card-chatbot/credit-card-analytics/connectors/README.md
similarity index 100%
rename from finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/README.md
rename to finance-credit-card-chatbot/credit-card-analytics/connectors/README.md
diff --git a/finance-credit-card-chatbot/credit-card-analytics/connectors/sources-kafka.sqrl b/finance-credit-card-chatbot/credit-card-analytics/connectors/sources-kafka.sqrl
new file mode 100644
index 00000000..64bbd63d
--- /dev/null
+++ b/finance-credit-card-chatbot/credit-card-analytics/connectors/sources-kafka.sqrl
@@ -0,0 +1,69 @@
+CREATE TABLE CardAssignment ( -- rows read from the cardassignment Kafka topic
+    customerId BIGINT NOT NULL,
+    cardNo STRING NOT NULL,
+    cardType STRING NOT NULL,
+    `timestamp` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp', -- taken from the Kafka record timestamp metadata, not from the JSON payload
+    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND -- watermark trails the event time by one second
+) WITH (
+    'connector' = 'kafka',
+    'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
+    'properties.group.id' = 'mygroupid',
+    'scan.startup.mode' = 'group-offsets', -- resume from the offsets committed for the consumer group
+    'properties.auto.offset.reset' = 'earliest', -- fallback when the group has no committed offsets
+    'value.format' = 'flexible-json',
+    'topic' = 'cardassignment'
+);
+
+CREATE TABLE Merchant ( -- rows read from the merchant Kafka topic
+    merchantId BIGINT NOT NULL,
+    name STRING NOT NULL,
+    category STRING NOT NULL,
+    updatedTime TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp', -- taken from the Kafka record timestamp metadata
+    WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
+) WITH (
+    'connector' = 'kafka',
+    'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
+    'properties.group.id' = 'mygroupid',
+    'scan.startup.mode' = 'group-offsets',
+    'properties.auto.offset.reset' = 'earliest',
+    'value.format' = 'flexible-json',
+    'topic' = 'merchant'
+);
+
+
+CREATE TABLE MerchantReward ( -- rows read from the merchantreward Kafka topic
+    merchantId BIGINT NOT NULL,
+    rewardsByCard ARRAY> NOT NULL, -- NOTE(review): element type is missing here (presumably ARRAY<ROW<...>>, lost in extraction) - restore it before applying
+    updatedTime TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp', -- taken from the Kafka record timestamp metadata
+    WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
+) WITH (
+    'connector' = 'kafka',
+    'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
+    'properties.group.id' = 'mygroupid',
+    'scan.startup.mode' = 'group-offsets',
+    'properties.auto.offset.reset' = 'earliest',
+    'value.format' = 'flexible-json',
+    'topic' = 'merchantreward'
+);
+
+CREATE TABLE Transaction ( -- rows read from the transaction Kafka topic
+    transactionId BIGINT NOT NULL,
+    cardNo STRING NOT NULL,
+    `time` TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp', -- taken from the Kafka record timestamp metadata
+    amount DOUBLE NOT NULL,
+    merchantId BIGINT NOT NULL,
+    WATERMARK FOR `time` AS `time` - INTERVAL '1' SECOND
+) WITH (
+    'connector' = 'kafka',
+    'properties.bootstrap.servers' = '${KAFKA_BOOTSTRAP_SERVERS}',
+    'properties.group.id' = 'mygroupid',
+    'scan.startup.mode' = 'group-offsets',
+    'properties.auto.offset.reset' = 'earliest',
+    'value.format' = 'flexible-json',
+    'topic' = 'transaction'
+);
diff --git a/finance-credit-card-chatbot/credit-card-analytics/connectors/sources-testdata.sqrl b/finance-credit-card-chatbot/credit-card-analytics/connectors/sources-testdata.sqrl
new file mode 100644
index 00000000..2bf30e63
--- /dev/null
+++ b/finance-credit-card-chatbot/credit-card-analytics/connectors/sources-testdata.sqrl
@@ -0,0 +1,55 @@
+CREATE TABLE CardAssignment ( -- rows read from card_assignment.jsonl under DATA_PATH
+    customerId BIGINT NOT NULL,
+    cardNo STRING NOT NULL,
+    cardType STRING NOT NULL,
+    `timestamp` TIMESTAMP_LTZ(3) NOT NULL, -- read from the file record (no METADATA clause, unlike the Kafka variant)
+    PRIMARY KEY (`customerId`, `cardNo`, `timestamp`) NOT ENFORCED,
+    WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND -- watermark trails the event time by one second
+) WITH (
+    'format' = 'flexible-json',
+    'path' = '${DATA_PATH}/card_assignment.jsonl',
+    'source.monitor-interval' = '10 min',
+    'connector' = 'filesystem'
+);
+
+CREATE TABLE Merchant ( -- rows read from merchant.jsonl under DATA_PATH
+    merchantId BIGINT NOT NULL,
+    name STRING NOT NULL,
+    category STRING NOT NULL,
+    updatedTime TIMESTAMP_LTZ(3) NOT NULL,
+    PRIMARY KEY (`merchantId`, `updatedTime`) NOT ENFORCED,
+    WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
+) WITH (
+    'format' = 'flexible-json',
+    'path' = '${DATA_PATH}/merchant.jsonl',
+    'source.monitor-interval' = '10 min',
+    'connector' = 'filesystem'
+);
+
+CREATE TABLE MerchantReward ( -- rows read from merchant_reward.jsonl under DATA_PATH
+    merchantId BIGINT NOT NULL,
+    rewardsByCard ARRAY> NOT NULL, -- NOTE(review): element type is missing here (presumably ARRAY<ROW<...>>, lost in extraction) - restore it before applying
+    updatedTime TIMESTAMP_LTZ(3) NOT NULL,
+    PRIMARY KEY (`merchantId`, `updatedTime`) NOT ENFORCED,
+    WATERMARK FOR `updatedTime` AS `updatedTime` - INTERVAL '1' SECOND
+) WITH (
+    'format' = 'flexible-json',
+    'path' = '${DATA_PATH}/merchant_reward.jsonl',
+    'source.monitor-interval' = '10 min',
+    'connector' = 'filesystem'
+);
+
+CREATE TABLE Transaction ( -- rows read from transaction.jsonl under DATA_PATH
+    transactionId BIGINT NOT NULL,
+    cardNo STRING NOT NULL,
+    `time` TIMESTAMP_LTZ(3) NOT NULL,
+    amount DOUBLE NOT NULL,
+    merchantId BIGINT NOT NULL,
+    PRIMARY KEY (`transactionId`, `time`) NOT ENFORCED,
+    WATERMARK FOR `time` AS `time` - INTERVAL '1' SECOND
+) WITH (
+    'format' = 'flexible-json',
+    'path' = '${DATA_PATH}/transaction.jsonl',
+    'source.monitor-interval' = '10 min',
+    'connector' = 'filesystem'
+);
diff --git a/finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/card_assignment.jsonl b/finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/card_assignment.jsonl
similarity index 100%
rename from finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/card_assignment.jsonl
rename to finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/card_assignment.jsonl
diff --git a/finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/merchant.jsonl b/finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/merchant.jsonl
similarity index 100%
rename from finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/merchant.jsonl
rename to finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/merchant.jsonl
diff --git a/finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/merchant_reward.jsonl b/finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/merchant_reward.jsonl
similarity index 100%
rename from finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/merchant_reward.jsonl
rename to finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/merchant_reward.jsonl
diff --git a/finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/transaction.jsonl b/finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/transaction.jsonl
similarity index 100%
rename from finance-credit-card-chatbot/credit-card-analytics/creditcard-testdata/transaction.jsonl
rename to finance-credit-card-chatbot/credit-card-analytics/connectors/test-data/transaction.jsonl
diff --git a/finance-credit-card-chatbot/credit-card-analytics/creditcard_analytics.sqrl b/finance-credit-card-chatbot/credit-card-analytics/creditcard_analytics.sqrl
index 03731cf5..e7955061 100644
--- a/finance-credit-card-chatbot/credit-card-analytics/creditcard_analytics.sqrl
+++ b/finance-credit-card-chatbot/credit-card-analytics/creditcard_analytics.sqrl
@@ -1,16 +1,15 @@
 /* Import Data */
-IMPORT creditcard-{{variant}}.merchant AS _MerchantStream;
-IMPORT creditcard-{{variant}}.card_assignment AS _CardAssignmentStream;
-IMPORT creditcard-{{variant}}.transaction AS _Transaction;
+IMPORT connectors.sources-{{variant}} AS sources;
 
 /* Deduplicate CDC Streams */
-_Merchant := DISTINCT _MerchantStream ON merchantId ORDER BY updatedTime DESC;
-_CardAssignment := DISTINCT _CardAssignmentStream ON cardNo ORDER BY `timestamp` DESC;
+_Merchant := DISTINCT sources.Merchant ON merchantId ORDER BY updatedTime DESC;
+_CardAssignment := DISTINCT sources.CardAssignment ON cardNo ORDER BY `timestamp` DESC;
 
 /** Enrich credit card transactions with customer and merchant information */
+/*+no_query */
 CustomerTransaction :=
     SELECT t.transactionId, t.cardNo, t.`time`, t.amount, m.name AS merchantName, m.category, c.customerId
-    FROM _Transaction t
+    FROM sources.Transaction t
     JOIN _CardAssignment FOR SYSTEM_TIME AS OF t.`time` c ON t.cardNo = c.cardNo
     JOIN _Merchant FOR SYSTEM_TIME AS OF t.`time` m ON t.merchantId = m.merchantId;
 
@@ -29,12 +28,14 @@ _SpendingByDay := SELECT customerId, window_time as timeDay, SUM(amount) as spen
 
 /* ==== QUERY ENDPOINTS ==== */
 
-/** Returns all credit card transactions within a specified time period ordered by time (most recent first) */
+/** Returns all credit card transactions since fromTime (inclusive) and until toTime (exclusive) for the given customer showing most recent transactions first.
+    fromTime and toTime must be RFC-3339 compliant date-time scalars. Both must be the start of a day, e.g. 2024-01-19T00:00:00-00:00. */
 Transactions(customerId BIGINT NOT NULL, fromTime TIMESTAMP NOT NULL, toTime TIMESTAMP NOT NULL) :=
     SELECT * FROM CustomerTransaction WHERE customerId = :customerId AND :fromTime <= `time` AND :toTime > `time`
     ORDER BY `time` DESC LIMIT 10000;
 
-/* Returns the total customer spending by day for the specified time period ordered by time (most recent first) */
+/** Returns the total customer spending by day since fromTime (inclusive) and until toTime (exclusive) ordered by time (most recent first).
+    fromTime and toTime must be RFC-3339 compliant date-time scalars. Both must be the start of a day, e.g. 2024-01-19T00:00:00-00:00. */
 SpendingByDay(customerId BIGINT NOT NULL, fromTime TIMESTAMP NOT NULL, toTime TIMESTAMP NOT NULL) :=
     SELECT timeDay, spending FROM _SpendingByDay WHERE customerId = :customerId AND :fromTime <= timeDay AND :toTime > timeDay