Project author: trdtnguyen

Project description:
Springboard guided capstone project
Language: Python
Repository: git://github.com/trdtnguyen/sb-guided-capstone.git
Created: 2020-11-09T04:27:21Z
Project page: https://github.com/trdtnguyen/sb-guided-capstone

License: GNU General Public License v3.0

Equity Market Data Analysis

This project implements a basic ETL pipeline that acquires daily trade prices and quotes from the stock market and analyzes them based on the difference between the previous day's and the current day's close prices.

The project uses Spark to extract the data sources and to clean and transform the raw data into meaningful data ready for analytics.

This is a guided project from Springboard’s Data Engineering career track.

Project Setup

Requirements

Standalone project without container:

  • PostgreSQL installed
  • Python packages listed in requirements.txt
  • Spark installed and properly configured

Running in a Docker container:

  • docker and docker-compose installed

Database setup

  • Create database
    1. $ createdb -h localhost -p 5432 -U postgres sbstock
  • Create table

    1. $ psql -h localhost -U postgres -d sbstock -f sql/create_table.sql
  • JDBC connector setup
    This project already ships the JDBC connector jar for PostgreSQL. If you want to download a newer version than the provided one, follow Step 1; otherwise, skip Step 1.

  • Step 1: Download the JDBC connector for PostgreSQL
    1. $ cd jars
    2. $ wget https://jdbc.postgresql.org/download/postgresql-42.2.18.jar
  • Step 2: Copy the JDBC jar file to your Spark jars directory
    1. $ cp postgresql-42.2.18.jar $SPARK_HOME/jars/
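
As a quick check that the connector is picked up, a minimal PySpark read over JDBC might look like the sketch below. The host, credentials, and table name are placeholders, not values from the repo.

```python
from pyspark.sql import SparkSession

# Point Spark at the bundled PostgreSQL JDBC driver and open a session.
spark = (SparkSession.builder
         .appName("sbstock-jdbc-check")
         .config("spark.jars", "jars/postgresql-42.2.18.jar")
         .getOrCreate())

# Read one table from the sbstock database over JDBC.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/sbstock")
      .option("driver", "org.postgresql.Driver")
      .option("dbtable", "some_table")      # placeholder: use a table from sql/create_table.sql
      .option("user", "postgres")
      .option("password", "postgres")
      .load())
df.show()
```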

Data Model

Trade data (source)

Column | Type
-------|-----
Trade Date | Date
Record Type | Varchar(1)
Symbol | String
Execution ID | String
Event Time | Timestamp
Event Sequence Number | Int
Exchange | String
Trade Price | Decimal
Trade Size | Int

Quote data (source)

Column | Type
-------|-----
Trade Date | Date
Record Type | Varchar(1)
Symbol | String
Event Time | Timestamp
Event Sequence Number | Int
Exchange | String
Bid Price | Decimal
Bid Size | Int
Ask Price | Decimal
Ask Size | Int

Note:

  • The bid price refers to the highest price a buyer will pay for a security.
  • The ask price refers to the lowest price a seller will accept for a security.

For example, company A wants to purchase 1,000 shares of XYZ stock at $10, and company B wants to sell 1,500 shares of XYZ at $10.25. The spread is the difference between the asking price of $10.25 and the bid price of $10, or 25 cents. An individual investor looking at this spread would then know that, if they want to sell 1,000 shares, they could do so at $10 by selling to A. Conversely, the same investor would know that they could purchase 1,500 shares from B at $10.25.
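
A toy calculation of the spread in that example, in plain Python, just to make the arithmetic concrete:

```python
bid_price = 10.00   # highest price the buyer (company A) will pay
ask_price = 10.25   # lowest price the seller (company B) will accept
spread = ask_price - bid_price
print(f"spread = {spread:.2f}")  # 0.25
```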

Common Event (dimensional)

The Trade and Quote data models share several common columns. We combine them into one data model called Common Event, as below:
Column | Type | Note
-------|------|-----
trade_dt | Date | Trade Date (common)
rec_type | String | Record Type: Trade is 'T', Quote is 'Q' (common)
symbol | String | Symbol (common)
exchange | String | Exchange name (common)
event_tm | Timestamp | Event time (common)
event_seq_nb | Integer | Event sequence number (common)
arrival_tm | Timestamp | Arrival time, set during the ingestion stage
trade_price | Decimal | Trade Price (from Trade)
trade_size | Integer | Trade Size (from Trade)
bid_price | Decimal | Bid Price (from Quote)
bid_size | Integer | Bid Size (from Quote)
ask_price | Decimal | Ask Price (from Quote)
ask_size | Integer | Ask Size (from Quote)
partition | String | 'T' for Trade records, 'Q' for Quote records, 'B' for bad-format records

Data Sources

This project extracts data from stock trade and stock quote data sources on MS Azure, in two formats: CSV and JSON. A sample CSV file looks like:

```
#TradeDate,RecordType,Symbol,ExecutionID,EventTime,EventSeqNum,Exchange,TradePrice,TradeSize
2020-01-01,T,Symbol1,1,2020-11-23,1,Exchange1, 59.970001,12
2020-01-01,T,Symbol2,2,2020-11-23,2,Exchange1, 134.213,120
2020-01-01,T,Symbol3,3,2020-11-23,3,Exchange2, 10.4712,1301
2020-01-01,Q,Symbol1,2020-11-23,4,Exchange1, 61,1301, 20, 58.231, 17
2020-01-01,Q,Symbol2,2020-11-23,4,Exchange1, 61,1301, 20, 58.231, 17
2020-01-01,Q,Symbol2,2020-11-23,4,Exchange1, 61,1301, 20,
```

Notes

  • Different record types (T for trade and Q for quote) have different line formats.
  • Trade rows and quote rows must share common values in the exchange column. In other words, the intersection between the exchange values in trade rows and those in quote rows must be a non-empty set.

Ingestion Process

This is the first step in the project: extract data from the data sources, then clean and transform the data into Parquet files using Apache Spark. The details are:

  • Although trade data and quote data have different formats, they share common columns such as TradeDate, RecordType, Symbol, etc. So we create a dimensional table CommonEvent to combine both data types into one table.
  • Implement a CommonEvent class in Python that represents the CommonEvent table (a sketch is shown below). We use this module to capture the extracted data during the ingestion process.
  • Implement an Extract module that extracts data from the CSV and JSON formats into Parquet files and places those files in partitions in HDFS.
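
The repo's CommonEvent class is not reproduced here; the following is a minimal, hypothetical sketch of what such a class could look like. Field names mirror the schema shown in the Developer Note further down, and the simple defaults mirror the values seen in the sample output there, but the actual class in the repo may differ.

```python
from datetime import datetime


class CommonEvent:
    """Hypothetical sketch of a CommonEvent record; Spark infers the DataFrame
    schema from these instance attributes."""

    def __init__(self,
                 trade_dt: datetime = None,
                 rec_type: str = '',
                 symbol: str = '',
                 exchange: str = '',
                 event_time: datetime = None,
                 event_seq_num: int = 0,
                 arrival_time: datetime = None,
                 trade_price: float = 0.0,
                 trade_size: int = 0,
                 bid_price: float = 0.0,
                 bid_size: int = 0,
                 ask_price: float = 0.0,
                 ask_size: int = 0,
                 original_line: str = '',
                 partition: str = 'B'):
        self.trade_dt = trade_dt              # trade date (common)
        self.rec_type = rec_type              # 'T' for trade, 'Q' for quote
        self.symbol = symbol
        self.exchange = exchange
        self.event_time = event_time
        self.event_seq_num = event_seq_num
        self.arrival_time = arrival_time or datetime.now()  # set at ingestion time
        self.trade_price = trade_price        # trade-only fields
        self.trade_size = trade_size
        self.bid_price = bid_price            # quote-only fields
        self.bid_size = bid_size
        self.ask_price = ask_price
        self.ask_size = ask_size
        self.original_line = original_line    # kept so bad records can be inspected
        self.partition = partition            # 'T', 'Q', or 'B' (bad format)
```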

Simplified code for ingesting a CSV file:

```python
def extract_csv(self, filepath: str, config_key: str, config_value: str):
    self.spark.conf.set(config_key, config_value)
    raw = self.spark.sparkContext.textFile(filepath)
    # raw = self.spark.read.csv(filepath, comment='#')
    parsed = raw.map(lambda line: parse_csv(line))
    data = self.spark.createDataFrame(parsed)
    data.write.partitionBy('partition').mode('overwrite').parquet('output_dir')
```
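
For reference, here is a hedged sketch of what parse_csv might do; the real implementation lives in the repo and evidently handles quirks of the sample data (such as the comma-split decimals in the quote lines) that this sketch does not. It builds on the CommonEvent sketch above and routes anything it cannot parse to partition 'B'.

```python
from datetime import datetime


def parse_csv(line: str) -> CommonEvent:
    # Hypothetical parser: maps one raw CSV line to a CommonEvent using the trade
    # and quote layouts from the Data Sources section; anything else becomes 'B'.
    fields = [f.strip() for f in line.split(',')]
    try:
        if fields[1] == 'T':
            # TradeDate,RecordType,Symbol,ExecutionID,EventTime,EventSeqNum,Exchange,TradePrice,TradeSize
            return CommonEvent(trade_dt=datetime.strptime(fields[0], '%Y-%m-%d'),
                               rec_type='T', symbol=fields[2],
                               event_time=datetime.strptime(fields[4], '%Y-%m-%d'),
                               event_seq_num=int(fields[5]), exchange=fields[6],
                               trade_price=float(fields[7]), trade_size=int(fields[8]),
                               arrival_time=datetime.now(), partition='T')
        if fields[1] == 'Q':
            # TradeDate,RecordType,Symbol,EventTime,EventSeqNum,Exchange,BidPrice,BidSize,AskPrice,AskSize
            return CommonEvent(trade_dt=datetime.strptime(fields[0], '%Y-%m-%d'),
                               rec_type='Q', symbol=fields[2],
                               event_time=datetime.strptime(fields[3], '%Y-%m-%d'),
                               event_seq_num=int(fields[4]), exchange=fields[5],
                               bid_price=float(fields[6]), bid_size=int(fields[7]),
                               ask_price=float(fields[8]), ask_size=int(fields[9]),
                               arrival_time=datetime.now(), partition='Q')
        return CommonEvent(partition='B', original_line=line)
    except (ValueError, IndexError):
        # Malformed or incomplete line: keep it, flagged as a bad record.
        return CommonEvent(partition='B', original_line=line)
```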

The corresponding function for extracting a JSON file:

```python
def extract_json(self, filepath: str, config_key: str, config_value: str):
    self.spark.conf.set(config_key, config_value)
    # Read the file as text lines (assumes one JSON record per line) so that
    # parse_json can map each record to a CommonEvent, mirroring extract_csv.
    raw = self.spark.sparkContext.textFile(filepath)
    parsed = raw.map(lambda line: parse_json(line))
    data = self.spark.createDataFrame(parsed)
    data.write.partitionBy('partition').mode('overwrite').parquet('output_dir')
```

To test the ingestion process, run:

  1. $ python controllers/Extract.py

As a result, Parquet files are created in the output directory:

```
output_dir/
├── partition=B
│   └── part-00000-356a74e9-e5e0-4a44-9e37-ea25f28af432.c000.snappy.parquet
├── partition=Q
│   └── part-00000-356a74e9-e5e0-4a44-9e37-ea25f28af432.c000.snappy.parquet
├── partition=T
│   └── part-00000-356a74e9-e5e0-4a44-9e37-ea25f28af432.c000.snappy.parquet
└── _SUCCESS
```
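
A quick, optional sanity check (not part of the repo's test flow; spark here is assumed to be an active SparkSession): read the output back and inspect one partition.

```python
# Read the whole partitioned dataset; the 'partition' column is reconstructed
# from the directory names shown above.
df = spark.read.parquet('output_dir')
df.filter(df.partition == 'T').show(5, truncate=False)

# Or point directly at a single partition directory.
trades = spark.read.parquet('output_dir/partition=T')
print(trades.count())
```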

Developer Note:

  • In the extract_csv() function, we use the RDD .map() function with a lambda that applies parse_csv(line) to each item in the raw RDD. The return type of parse_csv() is a CommonEvent object, and the parsed variable has type PipelinedRDD.
  • We then convert the PipelinedRDD to a DataFrame using spark.createDataFrame(parsed). The output of data.printSchema() is:
```
root
 |-- arrival_time: timestamp (nullable = true)
 |-- ask_price: double (nullable = true)
 |-- ask_size: long (nullable = true)
 |-- bid_price: double (nullable = true)
 |-- bid_size: long (nullable = true)
 |-- event_seq_num: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- exchange: string (nullable = true)
 |-- original_line: string (nullable = true)
 |-- partition: string (nullable = true)
 |-- rec_type: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- trade_dt: timestamp (nullable = true)
 |-- trade_price: double (nullable = true)
 |-- trade_size: long (nullable = true)
```
  • The output of data.show() is:

```
+--------------------+---------+--------+---------+--------+-------------+-------------------+---------+--------------------+---------+--------+-------+-------------------+-----------+----------+
|        arrival_time|ask_price|ask_size|bid_price|bid_size|event_seq_num|         event_time| exchange|       original_line|partition|rec_type| symbol|           trade_dt|trade_price|trade_size|
+--------------------+---------+--------+---------+--------+-------------+-------------------+---------+--------------------+---------+--------+-------+-------------------+-----------+----------+
|                null|     null|    null|     null|    null|         null|               null|     null|#TradeDate,Record…  |        B|    null|   null|               null|       null|      null|
| 2020-12-07 13:10:… |      0.0|       0|      0.0|       0|            1|2020-11-23 00:00:00|Exchange1|                    |        T|       T|Symbol1|2020-01-01 00:00:00|  59.970001|        12|
| 2020-12-07 13:10:… |      0.0|       0|      0.0|       0|            2|2020-11-23 00:00:00|Exchange1|                    |        T|       T|Symbol2|2020-01-01 00:00:00|    134.213|       120|
| 2020-12-07 13:10:… |      0.0|       0|      0.0|       0|            3|2020-11-23 00:00:00|Exchange2|                    |        T|       T|Symbol3|2020-01-01 00:00:00|    10.4712|      1301|
| 2020-12-07 13:10:… |   58.231|      17|  61.1301|      20|            4|2020-11-23 00:00:00|Exchange1|                    |        Q|       Q|Symbol1|2020-01-01 00:00:00|        0.0|         0|
| 2020-12-07 13:10:… |   58.231|      17|  61.1301|      20|            4|2020-11-23 00:00:00|Exchange1|                    |        Q|       Q|Symbol2|2020-01-01 00:00:00|        0.0|         0|
|                null|     null|    null|     null|    null|         null|               null|     null|2020-01-01,Q,Symb…  |        B|    null|   null|               null|       null|      null|
+--------------------+---------+--------+---------+--------+-------------+-------------------+---------+--------------------+---------+--------+-------+-------------------+-----------+----------+
```
  • One interesting observation: when map() is called with a function that returns an object, Spark infers the DataFrame schema from the attribute names of our CommonEvent class, in ascending alphabetical order.

End-of-day Data Load (Cleaning)

In the previous stage, the ingestion process keeps updating intraday data into Parquet files in a temporary location, i.e., output_dir. This stage loads and cleans that data with the following steps:

  • Read Parquet files from the temporary location
  • Select the necessary columns for trade and quote records
  • Apply data correction, i.e., keep only the latest price and remove the older ones
  • Write the dataset back to Parquet files on Azure Blob Storage

Input: Parquet files in output_dir partitioned by type (T, Q, B)
Output: Parquet files in a trade or quote directory for each day (e.g., trade/trade_dt=2020-12-20)

Why is data correction needed?

Trade and quote data are updated periodically during the day by the exchange. For each row, the composite key is trade_dt, symbol, exchange, event_tm, event_seq_num. During the day, the exchange may send records with the same composite key to correct previous ones, so in our ingested data there can be rows with the same composite key but different prices.

How to handle duplicate composite-key records?

  • Choose arrival_time as the unique key in the CommonEvent table.

```python
# Read the Trade partition dataset from the temporary location
trade_common = self.spark.read.parquet(filepath)
# Select the necessary columns of trade records
trade_df = trade_common.select("arrival_time", "trade_dt", "symbol", "exchange", "event_time", "event_seq_num", "trade_price", "trade_size")
```
  • Sort the records by arrival_time and aggregate rows with duplicated composite keys into a group using pyspark.sql.functions.collect_set()

```python
# composite key: trade_dt, symbol, exchange, event_time, event_seq_num
trade_grouped_df = trade_df.orderBy("arrival_time") \
    .groupBy("trade_dt", "symbol", "exchange", "event_time", "event_seq_num") \
    .agg(F.collect_set("arrival_time").alias("arrival_times"),
         F.collect_set("trade_price").alias("trade_prices"),
         F.collect_set("trade_size").alias("trade_sizes"))
```
  • Make new columns by selecting the first item in each aggregated group from the previous step. These hold the latest values we need to keep; the rest are discarded.

```python
trade_removed_dup_df = trade_grouped_df \
    .withColumn("arrival_time", F.slice(trade_grouped_df["arrival_times"], 1, 1)[0]) \
    .withColumn("trade_price", F.slice(trade_grouped_df["trade_prices"], 1, 1)[0]) \
    .withColumn("trade_size", F.slice(trade_grouped_df["trade_sizes"], 1, 1)[0])
```
  • Discard the unnecessary columns

```python
trade_final_df = trade_removed_dup_df \
    .drop(F.col("arrival_times")) \
    .drop(F.col("trade_prices")) \
    .drop(F.col("trade_sizes"))
```
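
To finish the stage as described in the Output note above, the cleaned dataset is written back out, one directory per trade date. A minimal sketch follows; the path and the to_date cast are assumptions, not the repo's exact code.

```python
from pyspark.sql import functions as F

# Write one Parquet directory per trade date, e.g. trade/trade_dt=2020-12-20.
# Casting to a date keeps the partition directory names date-only.
(trade_final_df
    .withColumn("trade_dt", F.to_date("trade_dt"))
    .write
    .partitionBy("trade_dt")
    .mode("overwrite")
    .parquet("trade"))
```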

Analytical ETL

In the previous stage, the cleaning process removed unnecessary, corrupted, and outdated data on a daily basis and saved the data into the trade and quote directories in Azure Blob Storage. This stage loads the data from the previous step into Spark and transforms it with the following steps:

  • Step 1: Load trade data for the current day into a temp view v1.
    A sample output of v1:

```
arrival_time,trade_dt,symbol,exchange,event_time,event_seq_num,trade_price,trade_size
"2020-12-20 12:00:00","2020-12-20 00:00:00",ABC,Exchange1,"2020-12-20 10:00:00",1,20.12,120
"2020-12-20 13:00:00","2020-12-20 00:00:00",ABC,Exchange1,"2020-12-20 10:10:00",2,22.32,130
"2020-12-20 14:00:00","2020-12-20 00:00:00",ABC,Exchange1,"2020-12-20 10:20:00",3,21.2,125
"2020-12-20 15:00:00","2020-12-20 00:00:00",ABC,Exchange1,"2020-12-20 10:30:00",4,25.12,100
"2020-12-20 12:00:00","2020-12-20 00:00:00",DEF,Exchange1,"2020-12-20 10:00:00",1,20.12,120
"2020-12-20 13:00:00","2020-12-20 00:00:00",DEF,Exchange1,"2020-12-20 10:10:00",2,20.12,120
"2020-12-20 14:00:00","2020-12-20 00:00:00",DEF,Exchange1,"2020-12-20 10:20:00",3,21.12,125
"2020-12-20 15:00:00","2020-12-20 00:00:00",DEF,Exchange1,"2020-12-20 10:30:00",4,21.12,125
"2020-12-21 12:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:00:00",1,24.12,127
"2020-12-21 13:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:10:00",2,21.21,125
"2020-12-21 14:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:20:00",3,23.11,125
"2020-12-21 15:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:30:00",4,21.12,125
"2020-12-21 12:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:00:00",1,26.12,125
"2020-12-21 13:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:10:00",2,25.12,121
"2020-12-21 14:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:20:00",3,22.12,120
"2020-12-21 15:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:30:00",4,21.12,125
```
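
A hedged sketch of Step 1 in PySpark: the prose calls the view v1, while the SQL below queries it as trade; the path and view name here are assumptions based on that SQL.

```python
# Load the cleaned trade Parquet output and register it as a temp view for Spark SQL.
trade_df = spark.read.parquet("trade")
trade_df.createOrReplaceTempView("trade")
```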
  • Step 2: Aggregate trade prices over a 30-minute sliding window from v1 and save the result in a temp table t1. The t1 columns are: trade_dt, symbol, exchange, event_time, event_seq_num, trade_price, and running_avg, where running_avg is the average trade price within a 30-minute window (the 3-row frame below corresponds to 30 minutes at the 10-minute event intervals in the sample data).

SQL statement:

```sql
DROP TABLE IF EXISTS moving_avg_tb;
CREATE TEMPORARY TABLE moving_avg_tb AS
SELECT trade_dt,
       symbol,
       exchange,
       event_time,
       event_seq_num,
       trade_price,
       AVG(trade_price) OVER (PARTITION BY symbol ORDER BY event_time
                              ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS running_avg
FROM trade
WHERE trade_dt = '2020-12-21';
```

A sample output of t1:

```
trade_dt,symbol,exchange,event_time,event_seq_num,trade_price,running_avg
"2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:00:00",1,24.12,24.1200008392334
"2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:10:00",2,21.21,22.664999961853027
"2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:20:00",3,23.11,22.81333351135254
"2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:30:00",4,21.12,21.81333351135254
"2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:00:00",1,26.12,26.1200008392334
"2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:10:00",2,25.12,25.6200008392334
"2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:20:00",3,22.12,24.45333417256673
"2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:30:00",4,21.12,22.786667505900066
```
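
For readers who prefer the DataFrame API, the same running average can be expressed with a window specification. This is a sketch, not the repo's code; trade_df is the DataFrame registered as the view in the Step 1 sketch above.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Current row plus the two preceding rows per symbol, ordered by event time:
# three 10-minute events ~ one 30-minute window.
w = Window.partitionBy("symbol").orderBy("event_time").rowsBetween(-2, Window.currentRow)

moving_avg_df = (trade_df
    .filter(F.col("trade_dt") == "2020-12-21")
    .withColumn("running_avg", F.avg("trade_price").over(w)))
```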
  • Step 3: Repeat Step 1 and Step 2 for the previous day and save the result in a temp table t2.

SQL statements:

```sql
DROP TABLE IF EXISTS prev_moving_avg_tb;
CREATE TEMPORARY TABLE prev_moving_avg_tb AS
SELECT symbol,
       exchange,
       event_time,
       event_seq_num,
       trade_price,
       AVG(trade_price) OVER (PARTITION BY symbol ORDER BY event_time
                              ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS avg_pr
FROM trade
WHERE trade_dt = '2020-12-20';
```

The output:

```
symbol,exchange,event_time,event_seq_num,trade_price,avg_pr
ABC,Exchange1,"2020-12-20 10:30:00",4,21.12,20.786667505900066
ABC,Exchange1,"2020-12-20 10:20:00",3,21.12,20.45333417256673
ABC,Exchange1,"2020-12-20 10:10:00",2,20.12,20.1200008392334
ABC,Exchange1,"2020-12-20 10:00:00",1,20.12,20.1200008392334
DEF,Exchange1,"2020-12-20 10:30:00",4,21.12,20.786667505900066
DEF,Exchange1,"2020-12-20 10:20:00",3,21.12,20.45333417256673
DEF,Exchange1,"2020-12-20 10:10:00",2,20.12,20.1200008392334
DEF,Exchange1,"2020-12-20 10:00:00",1,20.12,20.1200008392334
```

Next, get the last moving average price for each symbol partition. The explicit RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING frame is needed because the default frame ends at the current row, which would make LAST_VALUE simply return each row's own value.

```sql
DROP TABLE IF EXISTS last_moving_avg_tb;
CREATE TEMPORARY TABLE last_moving_avg_tb AS
SELECT symbol,
       exchange,
       event_time,
       event_seq_num,
       trade_price,
       LAST_VALUE(avg_pr)
           OVER (PARTITION BY symbol
                 ORDER BY event_time
                 RANGE BETWEEN UNBOUNDED PRECEDING
                           AND UNBOUNDED FOLLOWING) AS last_pr
FROM prev_moving_avg_tb;
```

The output:

```
symbol,exchange,event_time,event_seq_num,trade_price,last_pr
ABC,Exchange1,"2020-12-20 10:00:00",1,20.12,22.880000432332356
ABC,Exchange1,"2020-12-20 10:10:00",2,22.32,22.880000432332356
ABC,Exchange1,"2020-12-20 10:20:00",3,21.2,22.880000432332356
ABC,Exchange1,"2020-12-20 10:30:00",4,25.12,22.880000432332356
DEF,Exchange1,"2020-12-20 10:00:00",1,20.12,20.786667505900066
DEF,Exchange1,"2020-12-20 10:10:00",2,20.12,20.786667505900066
DEF,Exchange1,"2020-12-20 10:20:00",3,21.12,20.786667505900066
DEF,Exchange1,"2020-12-20 10:30:00",4,21.12,20.786667505900066
```
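
The same LAST_VALUE step expressed with the DataFrame API, as a hedged sketch; prev_moving_avg_df stands in for the previous-day moving-average result and is not a name from the repo.

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Unbounded frame so every row in a symbol partition sees the final running average.
w_all = (Window.partitionBy("symbol")
         .orderBy("event_time")
         .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

last_pr_df = prev_moving_avg_df.withColumn("last_pr", F.last("avg_pr").over(w_all))
```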
  • Step 4: Load quote data for the current day into a temp view v2. Recall that the columns in v2 are: arrival_time, trade_dt, symbol, exchange, event_time, event_seq_num, bid_price, bid_size, ask_price, ask_size.

```sql
SELECT * FROM quote WHERE trade_dt = '2020-12-21';
```

The output:

```
arrival_time,trade_dt,symbol,exchange,event_time,event_seq_num,bid_price,bid_size,ask_price,ask_size
"2020-12-21 12:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:00:00",1,20.12,120,21.57,110
"2020-12-21 13:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:10:00",2,20.12,120,20.57,111
"2020-12-21 14:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:20:00",3,21.12,125,19.57,113
"2020-12-21 15:00:00","2020-12-21 00:00:00",ABC,Exchange1,"2020-12-21 10:30:00",4,21.12,125,19.57,113
"2020-12-21 12:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:00:00",1,20.12,120,21.57,110
"2020-12-21 13:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:10:00",2,20.12,120,20.57,111
"2020-12-21 14:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:20:00",3,21.12,125,19.57,113
"2020-12-21 15:00:00","2020-12-21 00:00:00",DEF,Exchange1,"2020-12-21 10:30:00",4,21.12,125,19.57,113
```
  • Step 5: Union t1 and v2 with the common schema:

Column | Value
-------|------
trade_dt | Value from corresponding records
rec_type | "Q" for quotes, "T" for trades
symbol | Value from corresponding records
event_tm | Value from corresponding records
event_seq_nb | From quotes, NULL for trades
exchange | Value from corresponding records
bid_pr | From quotes, NULL for trades
bid_size | From quotes, NULL for trades
ask_pr | From quotes, NULL for trades
ask_size | From quotes, NULL for trades
trade_pr | From trades, NULL for quotes
mov_avg_pr | From trades, NULL for quotes
```sql
DROP TABLE IF EXISTS quote_union;
CREATE TABLE IF NOT EXISTS quote_union (
    trade_dt datetime,
    rec_type varchar(8),
    symbol varchar(8),
    event_time datetime,
    event_seq_num int NULL,
    exchange varchar(32),
    bid_price float NULL,
    bid_size int NULL,
    ask_price float NULL,
    ask_size int NULL,
    trade_price float NULL,
    mov_avg_pr float NULL
);

INSERT INTO quote_union (
    SELECT trade_dt, 'T', symbol, event_time, NULL, exchange, NULL, NULL, NULL, NULL, trade_price, running_avg
    FROM moving_avg_tb
    UNION
    SELECT trade_dt, 'Q', symbol, event_time, event_seq_num, exchange, bid_price, bid_size, ask_price, ask_size, NULL, NULL
    FROM quote
    WHERE trade_dt = '2020-12-21'
);
```

The output:

```
trade_dt,rec_type,symbol,event_time,event_seq_num,exchange,bid_price,bid_size,ask_price,ask_size,trade_price,mov_avg_pr
"2020-12-21 00:00:00",T,ABC,"2020-12-21 10:30:00",NULL,Exchange1,NULL,NULL,NULL,NULL,21.12,21.8133
"2020-12-21 00:00:00",T,ABC,"2020-12-21 10:20:00",NULL,Exchange1,NULL,NULL,NULL,NULL,23.11,22.8133
"2020-12-21 00:00:00",T,ABC,"2020-12-21 10:10:00",NULL,Exchange1,NULL,NULL,NULL,NULL,21.21,22.665
"2020-12-21 00:00:00",T,ABC,"2020-12-21 10:00:00",NULL,Exchange1,NULL,NULL,NULL,NULL,24.12,24.12
"2020-12-21 00:00:00",T,DEF,"2020-12-21 10:30:00",NULL,Exchange1,NULL,NULL,NULL,NULL,21.12,22.7867
"2020-12-21 00:00:00",T,DEF,"2020-12-21 10:20:00",NULL,Exchange1,NULL,NULL,NULL,NULL,22.12,24.4533
"2020-12-21 00:00:00",T,DEF,"2020-12-21 10:10:00",NULL,Exchange1,NULL,NULL,NULL,NULL,25.12,25.62
"2020-12-21 00:00:00",T,DEF,"2020-12-21 10:00:00",NULL,Exchange1,NULL,NULL,NULL,NULL,26.12,26.12
"2020-12-21 00:00:00",Q,ABC,"2020-12-21 10:00:00",1,Exchange1,20.12,120,21.57,110,NULL,NULL
"2020-12-21 00:00:00",Q,ABC,"2020-12-21 10:10:00",2,Exchange1,20.12,120,20.57,111,NULL,NULL
"2020-12-21 00:00:00",Q,ABC,"2020-12-21 10:20:00",3,Exchange1,21.12,125,19.57,113,NULL,NULL
"2020-12-21 00:00:00",Q,ABC,"2020-12-21 10:30:00",4,Exchange1,21.12,125,19.57,113,NULL,NULL
"2020-12-21 00:00:00",Q,DEF,"2020-12-21 10:00:00",1,Exchange1,20.12,120,21.57,110,NULL,NULL
"2020-12-21 00:00:00",Q,DEF,"2020-12-21 10:10:00",2,Exchange1,20.12,120,20.57,111,NULL,NULL
"2020-12-21 00:00:00",Q,DEF,"2020-12-21 10:20:00",3,Exchange1,21.12,125,19.57,113,NULL,NULL
"2020-12-21 00:00:00",Q,DEF,"2020-12-21 10:30:00",4,Exchange1,21.12,125,19.57,113,NULL,NULL
```