Building a Materialized View for Orders with TapFlow
Main steps:
- Install TapFlow's Python SDK and CLI
- Configure TapData Cluster connection details
- Use TapFlow's commands and APIs to create a Flow that builds a real-time materialized view
- Run the Flow
Detailed Steps:
Step 1: Install Tap Shell, a Python SDK and Interactive Command-Line Interface for TapFlow
# Prerequisites: install python3 and pip3 before installing Tap Shell
# Install Tap Shell using pip
maximus@Reid:~/home$ pip3 install tapflow
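Before starting Tap Shell, it can help to confirm that the package landed in the Python environment you expect. The snippet below is a minimal sketch that uses only the standard library. The helper name installed_version is hypothetical, and it assumes the distribution is registered under the name tapflow, as in the pip command above.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Assumption: the distribution name matches the pip command, "tapflow".
    version = installed_version("tapflow")
    if version is None:
        print("tapflow is not installed; run: pip3 install tapflow")
    else:
        print(f"tapflow {version} is installed")
```

If the check reports that the package is missing even though pip succeeded, pip3 and python3 are probably pointing at different environments.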
Step 2: Start and Configure Tap Shell
# Enter the tapcli directory, type tap, and press Enter
maximus@Reid:~$ tap
Mon Nov 4 12:34:48 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
Tap Flow requires TapData Live Data Platform(LDP) cluster to run.
If you would like to use with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]): C
You may obtain the keys by log onto TapData Cloud, and click: ‘User Center’ on the top right, then copy & paste the accesskey and secret key pair.
Enter AK: xxxxxxxxxxxxxxxxxxx
Enter SK: xxxxxxxxxxxxxxxxxxx
Mon Oct 21 15:53:50 CST 2024 connecting remote server: https://cloud.tapdata.net …
Mon Oct 21 15:53:50 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
========================================================================================================================
TapData Cloud Service Running Agent: 1
Agent name: agent-jk6453h (Machine), ip: 192.168.1.11, cpu usage: 40%
tap >
# If you are using TapData Enterprise, type L and provide the server URL with port and the access code, for example 192.18.108.1:13030 and 123e4567-e89b-12d3-a456-426614174000.
# You can find the access code by logging in to the TapData Enterprise platform and navigating to Account Settings.
Mon Nov 4 12:34:48 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
Tap Flow requires TapData Live Data Platform(LDP) cluster to run.
If you would like to use with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]): L
Please enter server:port of TapData LDP
server: 127.0.0.1:3030
Please enter access code: xxxxxxxxxxxxxxxxxxxxxxxxxx
Mon Oct 21 11:26:55 CST 2024 connecting remote server: 127.0.0.1:3030 …
Mon Oct 21 11:26:55 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
tap >
Step 3: Start Building Materialized View
Step 3.1: Set Up a Connection to the Source Database
# Connect to the source MySQL database
tap > mysqlJsonConfig = {'database': 'Demo', 'port': 3306, 'host': 'demo.tapdata.io', 'username': 'demouser', 'password': 'demopass'};
tap > mysqlConn = DataSource('mysql', 'qa-mySqlEcommerceData', mysqlJsonConfig).type('source').save();
datasource qa-mySqlEcommerceData creating, please wait…
save datasource qa-mySqlEcommerceData success, will load schema, please wait…
load schema status: finished
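The connection settings above are typed inline, which is fine for the public demo database but not for real credentials. As a rough sketch, the same dictionary could be assembled from environment variables. The function name build_mysql_config and the MYSQL_* variable names are assumptions made for illustration and are not part of TapFlow. The fallback values are the demo settings shown above.

```python
import os
from typing import Any, Dict, Mapping

REQUIRED_KEYS = ("database", "port", "host", "username", "password")


def build_mysql_config(env: Mapping[str, str]) -> Dict[str, Any]:
    """Build the MySQL connection dict, falling back to the demo values."""
    config = {
        # The MYSQL_* names are hypothetical; pick whatever your setup uses.
        "database": env.get("MYSQL_DATABASE", "Demo"),
        "port": int(env.get("MYSQL_PORT", "3306")),
        "host": env.get("MYSQL_HOST", "demo.tapdata.io"),
        "username": env.get("MYSQL_USERNAME", "demouser"),
        "password": env.get("MYSQL_PASSWORD", "demopass"),
    }
    # Reject settings that were explicitly set to an empty string.
    missing = [key for key in REQUIRED_KEYS if config[key] in ("", None)]
    if missing:
        raise ValueError(f"missing connection settings: {missing}")
    return config


if __name__ == "__main__":
    cfg = build_mysql_config(os.environ)
    # Avoid printing the password when inspecting the result.
    print({key: value for key, value in cfg.items() if key != "password"})
```

The resulting dictionary has the same keys as mysqlJsonConfig, so it could be passed to DataSource in its place.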
After you sign up at https://view.tapdata.io/, TapFlow automatically provisions a managed MongoDB Atlas instance for your account. This instance, referenced by the DEFAULT_SINK variable, serves as the destination for materialized views or tables created from source databases.
Step 3.2: Create a data pipeline to build a wide order data model
# Create the flow and set the base (master) table "ecom_orders"
tap > orderFlow = Flow("Order_SingleView_Sync").read_from("qa-mySqlEcommerceData.ecom_orders");
Flow updated: source added
# Look up the 'ecom_customers' table and embed it in 'orders' as a document, using customer_id as the association key. path="customer_info" sets the field name of the embedded document in MongoDB, and type="object" stores it as a single embedded document.
tap > orderFlow.lookup("qa-mySqlEcommerceData.ecom_customers", path="customer_info", type="object", relation=[["customer_id","customer_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3eb64d66e0> added as child table
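To make the effect of type="object" concrete, here is a small pure-Python sketch of the document shape such a lookup is meant to produce. It is an illustration only, not how TapFlow implements the lookup. The function embed_object and the sample rows are hypothetical. Each relation pair is read as [field in the main table, field in the looked-up table], which matches the way the later nested lookups reference order_items.product_id.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def embed_object(parents: List[Row], children: List[Row], path: str,
                 relation: List[List[str]]) -> List[Row]:
    """Embed the first matching child row under `path` in each parent row."""
    results = []
    for parent in parents:
        merged = dict(parent)  # copy so the input rows stay untouched
        for child in children:
            # Every [parent_field, child_field] pair must match.
            if all(parent.get(p) == child.get(c) for p, c in relation):
                merged[path] = dict(child)
                break
        results.append(merged)
    return results


if __name__ == "__main__":
    # Hypothetical sample rows; the real tables have more columns.
    orders = [{"order_id": "o1", "customer_id": "c1"}]
    customers = [{"customer_id": "c1", "customer_city": "Austin"}]
    print(embed_object(orders, customers, "customer_info",
                       [["customer_id", "customer_id"]]))
```

In this sketch an order with no matching customer simply has no customer_info field.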
# Look up the 'ecom_order_payments' table and embed it in 'orders' as an array, using order_id as the association key. path="order_payments" sets the field name in MongoDB, and type="array" stores the matching rows as an embedded array.
tap > orderFlow.lookup("qa-mySqlEcommerceData.ecom_order_payments", path="order_payments", type="array", relation=[["order_id","order_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3eb6723e50> added as child table
# Look up the 'ecom_order_items' table and embed it in 'orders' as an array, using order_id as the association key. path="order_items" sets the field name in MongoDB, and type="array" stores the matching rows as an embedded array.
tap > orderFlow.lookup("qa-mySqlEcommerceData.ecom_order_items", path="order_items", type="array", relation=[["order_id","order_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3eb6864160> added as child table
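The array lookups differ from the object lookup in that every matching row is kept, which suits one-to-many relations such as an order and its items. The following pure-Python sketch shows the intended shape under that reading. The function embed_array and the sample rows are hypothetical, and the code does not reflect TapFlow internals.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def embed_array(parents: List[Row], children: List[Row], path: str,
                relation: List[List[str]]) -> List[Row]:
    """Embed all matching child rows as a list under `path` in each parent row."""
    results = []
    for parent in parents:
        merged = dict(parent)  # copy so the input rows stay untouched
        merged[path] = [
            dict(child)
            for child in children
            # Every [parent_field, child_field] pair must match.
            if all(parent.get(p) == child.get(c) for p, c in relation)
        ]
        results.append(merged)
    return results


if __name__ == "__main__":
    # Hypothetical sample rows; the real tables have more columns.
    orders = [{"order_id": "o1"}]
    items = [
        {"order_id": "o1", "product_id": "p1"},
        {"order_id": "o1", "product_id": "p2"},
    ]
    print(embed_array(orders, items, "order_items", [["order_id", "order_id"]]))
```

In this sketch an order without items gets an empty list. Whether the real target document contains an empty array or omits the field is not stated in this walkthrough.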
# Look up the 'ecom_products' table and embed it as a document in each 'order_items' element, using product_id as the association key.
tap > orderFlow.lookup("qa-mySqlEcommerceData.ecom_products", path="order_items.product", type="object", relation=[["order_items.product_id","product_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3e4573e50> added as child table
# Look up the 'ecom_sellers' table and embed it as a document in each 'order_items' element, using seller_id as the association key.
tap > orderFlow.lookup("qa-mySqlEcommerceData.ecom_sellers", path="order_items.seller", type="object", relation=[["order_items.seller_id","seller_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3y94853e50> added as child table
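The last two lookups use dotted paths such as order_items.product, so the looked-up row is attached to each element of an existing array rather than to the top-level order. The sketch below illustrates that reading in plain Python. It assumes a path of the form array_field.leaf, and the function embed_in_array_items and the sample data are hypothetical rather than taken from TapFlow.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def embed_in_array_items(docs: List[Row], children: List[Row], path: str,
                         relation: List[List[str]]) -> List[Row]:
    """Embed a matching child row inside every element of an embedded array."""
    array_field, leaf = path.split(".", 1)  # e.g. "order_items", "product"
    results = []
    for doc in docs:
        merged = dict(doc)
        new_items = []
        for item in doc.get(array_field, []):
            new_item = dict(item)
            for child in children:
                # "order_items.product_id" refers to product_id on the item.
                if all(item.get(p.rsplit(".", 1)[-1]) == child.get(c)
                       for p, c in relation):
                    new_item[leaf] = dict(child)
                    break
            new_items.append(new_item)
        merged[array_field] = new_items
        results.append(merged)
    return results


if __name__ == "__main__":
    # Hypothetical sample data; the real tables have more columns.
    docs = [{"order_id": "o1", "order_items": [{"product_id": "p1"}]}]
    products = [{"product_id": "p1", "category": "toys"}]
    print(embed_in_array_items(docs, products, "order_items.product",
                               [["order_items.product_id", "product_id"]]))
```

Applying the same function with path "order_items.seller" and the seller_id relation would add a seller document next to product in each item.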
# Write the data to the target MongoDB
tap> orderFlow.write_to(DEFAULT_SINK.orderSingleView).save();
# Start the pipeline
tap > orderFlow.start();
# View the Flow stats
tap > stats 'Order_SingleView_Sync'