Distribute Runtime Filter Aggregation
Aggregating runtime filters can place a substantial memory load on the coordinator. To address this, we initially introduced local aggregation of runtime filters within a single executor node, reducing the strain on the coordinator by transmitting filter updates only after local aggregation.
As Impala clusters scale up, however, the limitations of local filter aggregation become apparent: with many nodes, the coordinator still receives a filter update from every executor, which again places significant memory pressure on the coordinator node.
To mitigate this, we have implemented a solution that distributes runtime filter aggregation across a subset of Impala backends. These backends aggregate the filter updates from their peers, and the coordinator then performs the final aggregation over the intermediate results and publishes the complete filter. This distribution relieves the memory pressure on the coordinator and speeds up runtime filter aggregation.
This enhancement is controlled by the query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST. It determines M, the number of executor nodes randomly selected as intermediate aggregators for the runtime filter:
M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST)
where N is the number of backend executors running the query, excluding the coordinator.
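As a sketch, the formula above can be expressed as a small helper (illustrative only, not Impala code; the behavior for a non-positive option value is an assumption here, based on the option disabling the feature):

```python
import math

def num_intermediate_aggregators(num_executors: int,
                                 max_filters_per_host: int) -> int:
    """Compute M, the number of executors randomly selected as
    intermediate aggregators.

    num_executors: N, the backend executors running the query,
                   excluding the coordinator.
    max_filters_per_host: the MAX_NUM_FILTERS_AGGREGATED_PER_HOST
                          query option.
    """
    if max_filters_per_host <= 0:
        # Assumed: a non-positive setting disables intermediate
        # aggregation, so every executor reports to the coordinator.
        return 0
    return math.ceil(num_executors / max_filters_per_host)

print(num_intermediate_aggregators(400, 20))  # → 20
print(num_intermediate_aggregators(401, 20))  # → 21 (ceiling rounds up)
```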
For instance, in a cluster of 400 executor nodes with MAX_NUM_FILTERS_AGGREGATED_PER_HOST set to 20 (the square root of the cluster size), M = ceil(400 / 20) = 20 intermediate aggregators are selected. Each intermediate aggregator combines filter updates from 20 executors, and the coordinator then performs the final aggregation over the 20 intermediate results, so no single node handles more than 20 filter updates. Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST to roughly the square root of the cluster size balances the load between the coordinator and the intermediate aggregators, which is particularly beneficial in larger-scale deployments.
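The square-root balance can be checked numerically. The sketch below (illustrative only; the function name is ours) compares the coordinator's fan-in (M intermediate results) with each aggregator's fan-in (up to MAX_NUM_FILTERS_AGGREGATED_PER_HOST updates) for a few cluster sizes:

```python
import math

def fan_in(num_executors: int, max_per_host: int) -> tuple[int, int]:
    """Return (coordinator fan-in, per-aggregator fan-in) for a query
    with num_executors backend executors and the given
    MAX_NUM_FILTERS_AGGREGATED_PER_HOST setting."""
    m = math.ceil(num_executors / max_per_host)
    return m, max_per_host

for n in (100, 400, 900):
    setting = round(math.sqrt(n))  # square-root sizing rule
    coord, agg = fan_in(n, setting)
    print(f"N={n}: setting={setting}, coordinator aggregates {coord}, "
          f"each aggregator handles up to {agg}")
```

With the square-root setting, both fan-ins stay near sqrt(N) (e.g. 20 and 20 for N=400), rather than one node absorbing all N updates.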