Partitioning
The General Approach
The proposed approach is based on a two-phase processing of the workload and of the data.
The first phase is (ideally) completely agnostic of the schema and access patterns, and is based on the following steps:
Given a workload:
- Log every statement issued in every transaction
- Log the ids of every tuple touched by any transaction (can be performed at the query level as well)
- Build a graph of co-usages, where the nodes are the tuples and the edges carry the weight of how many transactions accessed the two tuples together.
This graph completely characterizes the workload with respect to tuple co-usage.
- Apply a general-purpose graph partitioning algorithm such as METIS, PARMETIS, HMETIS, or grPartition
- The algorithm produces a partitioning scheme that minimizes the total weight of the edges to be cut, i.e., the total number of transactions that need to run in a distributed fashion across multiple partitions.
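The steps above can be sketched as follows. `build_cousage_graph` and `to_metis` are hypothetical helper names; the sketch assumes the transactions have already been resolved to the sets of tuple ids they touched, and exports the graph in the edge-weighted METIS input format (fmt=001) so it can be handed to an off-the-shelf partitioner rather than reimplementing one:

```python
from collections import defaultdict
from itertools import combinations

def build_cousage_graph(transactions):
    """Each transaction is the set of tuple ids it touched; the edge
    weight counts how many transactions accessed the two tuples together."""
    edges = defaultdict(int)
    for tuple_ids in transactions:
        for a, b in combinations(sorted(set(tuple_ids)), 2):
            edges[(a, b)] += 1
    return edges

def to_metis(edges):
    """Serialize the graph in the METIS input format (fmt=001: edge
    weights only), suitable for feeding to gpmetis/hmetis directly."""
    nodes = sorted({v for e in edges for v in e})
    index = {v: i + 1 for i, v in enumerate(nodes)}  # METIS is 1-indexed
    adj = defaultdict(list)
    for (a, b), w in edges.items():
        adj[a].append((index[b], w))
        adj[b].append((index[a], w))
    lines = ["%d %d 001" % (len(nodes), len(edges))]
    for v in nodes:
        lines.append(" ".join("%d %d" % (n, w) for n, w in sorted(adj[v])))
    return "\n".join(lines)
```

The resulting k-way partition returned by METIS maps each (1-indexed) node back to a tuple id via the same `index` table.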
The partitioning is now expressed on a per-tuple basis, and despite being very precise and optimal, it cannot be used directly unless:
- most of the statements in the workload access tuples by id
- we accept maintaining very large lookup tables at the query dispatcher level
The second phase of our approach consists in "justifying" the agnostic partitioning produced in phase one, by grounding it in range, hash, or look-up predicates on the data, chosen according to the most common access patterns in the workload. More precisely we:
- parse each statement in the workload and build a list of "potential partitioning attributes", by choosing the ones used in the WHERE clauses of selects and updates
- compute the Pearson correlation between the values in the data for each of these attributes and the partitioning produced in the first phase
- filter out those attributes with low correlation (a simple threshold is enough for TPC-C traces, since the "good" attributes have 80-97% correlation while the others are below 1%)
- for each of the attributes not filtered out, run a decision tree (the C4.5 implementation provided by Weka) and generate a partitioning as a range-based set of predicates
- alternatively, if the data are VERY independent (per a quantitative analysis of the above graph), attempt hash partitioning or other cheap partitioning functions
- resort to a lookup table when neither range nor hash partitioning works; all the tuples not mentioned in the workload trace are put in a "catch-all" bucket in one of the partitions
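The correlation-filtering step can be sketched as below. The helper names and the 0.5 cutoff are illustrative (given the TPC-C observation above, any threshold between roughly 1% and 80% would separate the two groups):

```python
from math import sqrt

def pearson(xs, ys):
    """Plain Pearson correlation between two numeric sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy) if sx and sy else 0.0

def filter_candidates(tuples, partition, candidates, threshold=0.5):
    """Keep only the attributes whose values correlate with the
    phase-one partition labels above the threshold (|r| >= threshold).
    `tuples` is a list of dicts; `partition` maps tuple id -> label."""
    labels = [partition[t["id"]] for t in tuples]
    kept = {}
    for attr in candidates:
        values = [t[attr] for t in tuples]
        r = abs(pearson(values, labels))
        if r >= threshold:
            kept[attr] = r
    return kept
```

The surviving attributes are then the candidate inputs for the decision-tree step.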
Potential of this approach
This approach, by discovering the optimal partitioning in an agnostic way, might be capable of optimizing allocation due to hidden properties of the data, e.g., data skew or hidden correlations between products. It thus might be effective where more analytical approaches fail.
Achieving Scalability of the Graph
There are a bunch of tricks one can play to make the graph (and its processing) scale well enough. Here is a possible way of wiring them together.
- LOGGING: we perform logging of the SQL statements only, i.e., we do not store the actual tuple-ids touched by each statement but only its SQL form. This makes it much easier to use existing query-logging functionality, and will be easy to achieve in our routing component, which already sees and analyzes every SQL statement entering the system.
- REPLICATION FILTERING: we analyze the read/write ratio for each table, and the cardinality of the tables. For reasonably small, almost-read-only tables (e.g., the item table in TPC-C) the system suggests redundant replication instead of partitioning, and prunes them from the log used for the partitioning analysis.
- BLANKET FILTERING: we filter out infrequent "blanket" statements that touch a very large set of tuples through big scans or similar. Being infrequent, they should not contribute much to the quality of the partitioning, but they contribute a lot to the explosion of the graph. (Trade-offs and attention are required in this step.)
- PREPARE GRAPH GENERATION (1): we transform each SQL statement in the log into an equivalent select (easy), so that we can execute them and obtain the corresponding tuple-ids (the DB changes in the meantime, but the validity of our findings MUST be independent of small local changes, so running queries on a snapshot / modified DB should not reduce the quality of our results)
- PREPARE GRAPH GENERATION (2): the above step might still produce too many nodes in the graph; therefore it can be modified by building the graph not at the tuple level but at some group level, where a group is defined as a set of tuples (sharing some property) of the original table. The grouping can be done in various ways, among which:
- (Sam's idea) observe common access patterns, and group tuples by generating the largest ranges "separating" each predicate range found in the trace log. This is appealing because it captures ALL of the information in the trace log in a more summarized form, with no approximation introduced, but: 1) it can be expensive to compute, 2) it is expensive to build, and 3) it might produce too many groups
- observe common access patterns, and group tuples via a GROUP BY over all the corresponding attributes (cheaper than the above, but might still produce too many groups)
- an arbitrary set of ranges that satisfies some final requirement in terms of graph size. We can also try multiple tractable range selections and maybe combine them smartly, using some mathematical property. Definitely cheap, and produces a fixed number of groups, although it might hide interesting partitioning opportunities.
- some combination of the above, chosen based on workload characteristics.
- GRAPH GENERATION: execute the transformed statements to log tuple-ids or group-ids and build the co-usage graph
- REMOVE ALMOST-ZERO TUPLES: filter out groups or tuples that have a much lower than average number of co-accesses. This should have very little influence on the quality of the partitioning, but might account for a large number of nodes. This extends the fact that we already ignore all the tuples not "touched" by our trace: we just "move" the threshold from 0 to some k under which we ignore the contribution.
- REDUCE REDUNDANCY: analyze the columns (rows) and "compact" columns (rows) that have identical or almost identical co-usage patterns (since they will probably contribute similarly).
- DOWNSAMPLING: down-sample / summarize the graph if we are still above the "tractable size"
- GRAPH PARTITIONING: run MATLAB or METIS partitioning tools.
For all the tuples that have been "excluded" by some kind of filtering we will:
- if range partitioning, place them according to the ranges induced by the more important tuples
- if hash partitioning, just apply the hash
- if look-up-table-based partitioning, put them in a "catch-all" partition where we place all the tuples used rarely (or never) in our trace
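A minimal sketch of this fallback routing, assuming a hypothetical `scheme` descriptor for the final partitioning produced by phase two:

```python
import bisect

def place_excluded(tuple_key, scheme):
    """Route a tuple that was filtered out of the graph, following the
    fallbacks above. `scheme` is a hypothetical descriptor holding either
    range boundaries, a partition count for hashing, or a lookup table."""
    kind = scheme["kind"]
    if kind == "range":
        # boundaries are the upper bounds induced by the retained tuples
        return bisect.bisect_left(scheme["boundaries"], tuple_key)
    if kind == "hash":
        return hash(tuple_key) % scheme["n_partitions"]
    # lookup-table partitioning: rarely/never-used tuples fall into a
    # designated catch-all partition
    return scheme["lookup"].get(tuple_key, scheme["catch_all"])
```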
We can also consider running steps 3-8 for very large groupings of tables, so that if there is some self-evident schema partitioning (e.g., 2 groups of tables never used together) we can divide and conquer the problem by splitting the graph into 2 smaller subgraphs and working on them separately.
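A cheap way to detect such self-evident splits is to compute the connected components of the co-usage graph; groups of tables never accessed together land in separate components by construction. A union-find sketch:

```python
def split_subproblems(edges):
    """Union-find over the co-usage graph: nodes (tuples, groups, or
    whole tables) that never co-occur in a transaction end up in
    distinct components, which can be partitioned independently."""
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in edges:
        parent[find(a)] = find(b)
    components = {}
    for node in list(parent):
        components.setdefault(find(node), set()).add(node)
    return list(components.values())
```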
Statement Logging
Statement logging, corresponding to step 1 above, can be performed in multiple ways:
- intercepting invocation of the DBMS at the driver/interface level
- enabling statement-logging with the DBMS and parsing corresponding logs (e.g., general query log for MySQL)
- within the relationalcloud.com prototype, by extending the routing component
Replication Filter
This technique counts the number of reads and writes for each table in the trace, and the total number of tuples in each table. If the table is "small enough" and the read/(read+write) ratio is high enough, it removes the table altogether from the partitioning problem and suggests replication for that table.
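A minimal sketch, with illustrative thresholds (the actual cutoffs used by the system are not specified here):

```python
def suggest_replication(table_stats, max_size=10000, min_read_ratio=0.95):
    """Flag small, read-mostly tables for full replication and remove
    them from the partitioning problem. `table_stats` maps table name
    -> {"reads", "writes", "tuples"}; thresholds are illustrative."""
    replicate, keep = [], []
    for table, s in table_stats.items():
        reads, writes = s["reads"], s["writes"]
        ratio = reads / float(reads + writes) if reads + writes else 0.0
        if s["tuples"] <= max_size and ratio >= min_read_ratio:
            replicate.append(table)
        else:
            keep.append(table)
    return replicate, keep
```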
Blanket Filter
This technique implements the blanket filtering described above, and consists of a filter processing each statement in the workload based on the following thresholds:
- a minTableSize, which specifies the minimum size of a table for this filter to apply (this is good for excluding very small tables that do not contribute much to the graph size, but that might be relevant for the partitioning, e.g., warehouse)
- a minCountSize, i.e., the bound under which a statement is never filtered (this allows us to reduce the expensive global counts)
- a maxCountSize, i.e., a hard bound on the maximum number of tuples a statement can touch without being considered a blanket statement
- a percentage threshold, i.e., the minimum selectivity a statement must have not to be considered a blanket statement
Performance:
- cache all the counts already seen (exact match)
- use early return if any of the conditions is matched (conditions are checked in increasing order of cost)
- for TPC-C we get to roughly 1.4ms per statement filtered, which is "reasonable" for off-line processing of traces on the order of tens or hundreds of thousands of statements.
The blanket filter also collects statistical information on the number of filtered statements, their selectivity, and an estimate of the overall number of non-zero elements in the matrix produced by the non-filtered statements.
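The filter logic, with its early returns ordered cheapest-first, might look like the following sketch. All threshold defaults are illustrative, and the final check interprets "blanket" as a statement touching more than a given fraction of its table:

```python
def is_blanket(stmt_count, table_size, min_table_size=1000,
               min_count=50, max_count=5000, max_fraction=0.1):
    """Decide whether a statement is a 'blanket' scan to be filtered
    from the trace. Cheap checks come first (early return); all
    threshold values are illustrative, not the system's actual ones."""
    if table_size < min_table_size:   # small tables always survive
        return False
    if stmt_count <= min_count:       # touches too few tuples to matter
        return False
    if stmt_count > max_count:        # hard bound: definitely a blanket
        return True
    # fraction of the table touched by the statement
    return stmt_count / float(table_size) > max_fraction
```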