PostgreSQLからのcyclo-joinの呼び出し
筑波大学の三橋龍也です。PostgreSQLでは結合処理が存在しますが、まだ発展途上だと考えられます。例えば等結合については並列ハッシュ結合も実装されていません。類似結合(例えばある人のすぐ近くにいる人を探す処理)を行おうとするとNested Loops Joinになりますが、PostgreSQLのNLJの実装は大変遅いので、データサイズが大きくなると非常に時間がかかります。例えばデータサイズが100万件程度の場合、下記のような二次元ユークリッド距離に基づく類似結合を行うと我々の環境では6日くらいかかりました。なお、ここでは簡単のために下記のような問合せを書いていますが、索引を使えない類似性は多数存在し、私の興味はそのような類にあります。
SELECT R.id, S.id FROM R, S WHERE (R.x - S.x)^2 + (R.y - S.y)^2 < threshold;
このような処理を高速化する技法の1つにcyclo-joinという方式があります。これはまずrelation RをR-1からR-N, relation SをS-1からS-Nに分割し、それぞれN台のマシンに配布します。そして各ノードでの照合処理をしましたら、S-iを隣接ノードに移動させます。これをN回繰り返せば照合処理を完了できます。

これを実装してみたら中々高速でした。
Cyclo-joinはPostgreSQLには実装されていません。そこで Nested Loops Join をフックすることを考えました。このフックはざっくり言って ExecInitNestLoop, ExecNestLoop, ExecEndNestLoop から構成されます。ExecInitNestLoop では、TupleArrayの初期化、targetlist/quallist の変換などを行います。ExecNestLoop では、タプルを TupleArray へ読み込み、targetlist/quallist と共にCJ側へ送信し、結果を受け取り、上位オペレーターへpushします。ExecEndNestLoop では、TupleArray とは新しく定義した構造体で、タプルを入れておくためのものです。タプルサイズ(固定長)、タプル数、属性情報(数、オフセットなど)を格納しておきます。これを図で説明すると下記のようになります。つまりデータをPostgreSQLからcyclo join側に送りつけて計算させ、その結果をもらいます。


ExecNestLoopのコードは次のようになります。60台程度をcyclo-joinに用いたところ、処理時間は6日から大幅に高速化されました。詳細はそのうち論文にまとめます。今後はExecNestLoopを直接書き換えるのではなく、ExecutorRun()でフックして多段NestLoopJoinを処理できるようにする予定です。来年度から3年生の授業課題にしたら良いのでは、と思っています。
TupleTableSlot *
ExecNestLoop(NestLoopState *node)
{
NestLoop *nl;
PlanState *innerPlan;
PlanState *outerPlan;
TupleTableSlot *outerTupleSlot;
TupleTableSlot *innerTupleSlot;
// List *joinqual;
List *otherqual;
ExprContext *econtext;
// needed for result
static const int RESERVE_SIZE = sizeof(long);
static pthread_t recv_thread;
static StructForResult *sfr;
static int cur_idx;
static char *cur_ptr;
static char *end_ptr;
static long data_size;
static long remain_size;
static int i, j;
static Datum *values;
static bool *isnull;
static Datum *data;
static TupleTableSlot *slot;
/*
* get information from the node
*/
ENL1_printf("getting info from node");
nl = (NestLoop *) node->js.ps.plan;
otherqual = node->js.ps.qual;
outerPlan = outerPlanState(node);
innerPlan = innerPlanState(node);
econtext = node->js.ps.ps_ExprContext;
/*
* Check to see if we're still projecting out tuples from a previous join
* tuple (because there is a function-returning-set in the projection
* expressions). If so, try to project another one.
*/
if (node->js.ps.ps_TupFromTlist) {
TupleTableSlot *result;
ExprDoneCond isDone;
result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
if (isDone == ExprMultipleResult)
return result;
/* Done with that source tuple... */
node->js.ps.ps_TupFromTlist = false;
}
/*
* Ok, everything is setup for the join so now loop until we return a
* qualifying join tuple.
*/
ENL1_printf("entering main loop");
for (;;) {
/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle. Note this can't happen
* until we're done projecting out tuples from a join tuple.
*/
ResetExprContext(econtext);
switch (CJState) {
case 0:
/* fetch first outer tuple */
outerTupleSlot = ExecProcNode(outerPlan);
readAttsInfoTupleArray(OuterTupleArray, outerTupleSlot);
insertDataTupleArray(OuterTupleArray, outerTupleSlot);
CJState++;
continue;
case 1:
/* fetch first inner tuple */
innerTupleSlot = ExecProcNode(innerPlan);
readAttsInfoTupleArray(InnerTupleArray, innerTupleSlot);
insertDataTupleArray(InnerTupleArray, innerTupleSlot);
CJState++;
continue;
case 2:
/* fill outer table's buffer */
ENL1_printf("getting new outer tuple");
outerTupleSlot = ExecProcNode(outerPlan);
/*
* if there are no more outer tuples, finish to fill outer buffer.
*/
if (TupIsNull(outerTupleSlot)) {
ENL1_printf("no outer tuple, ending fetch");
puts("outer scanned");
CJState++;
}
else {
ENL1_printf("Saving new outer tuple information");
insertDataTupleArray(OuterTupleArray, outerTupleSlot);
}
continue;
case 3:
/* fill inner table's buffer */
ENL1_printf("getting new inner tuple");
innerTupleSlot = ExecProcNode(innerPlan);
/*
* if there are no more outer tuples, finish to fill outer buffer.
*/
if (TupIsNull(innerTupleSlot)) {
ENL1_printf("no iner tuple, ending fetch");
puts("inner scanned");
CJState++;
}
else {
ENL1_printf("saving new inner tuple information");
insertDataTupleArray(InnerTupleArray, innerTupleSlot);
}
continue;
case 4:
/*
* at this point we have a new pair of inner and outer tuples so we
* test the inner and outer tuples to see if they satisfy the node's
* qualification.
*
* Only the joinquals determine MatchedOuter status, but all quals
* must pass to actually return the tuple.
*/
puts("--------------outer_ta----------------");
printTupleArray(OuterTupleArray);
puts("--------------inner_ta----------------");
printTupleArray(InnerTupleArray);
if ((Sock = connectSock(Address, CJPort)) < 0) {
fprintf(stderr, "error connectSock\n");
exit(1);
}
puts("connectSock");
sendOpInfo(Sock);
sendResultOpInfo(Sock);
sendWholeTupleArray(Sock, OuterTupleArray);
sendWholeTupleArray(Sock, InnerTupleArray);
puts("sended!!!");
CJState++;
/* Result Receiving */
case 5:
sfr = constructStructForResult(Sock, sizeof(Datum) * 10000000, sizeof(Datum) * VarNum);
pthread_create(&recv_thread, NULL, receiveJoinResult, sfr);
data_size = 0;
i = j = 0;
CJState++;
case 6:
cur_idx = sfr->idx;
while (sfr->size[cur_idx] == 0)
;
printf("[%d] received!\n", cur_idx);
sfr->idx = (sfr->idx + 1) % 2;
cur_ptr = sfr->buf[cur_idx];
end_ptr = cur_ptr + sfr->size[cur_idx];
printf("[%d] size = %ld\n", cur_idx, sfr->size[cur_idx]);
data = (Datum *)cur_ptr;
CJState++;
case 7:
if (cur_ptr > end_ptr) {
sfr->size[cur_idx] = 0;
CJState--;
continue;
}
if (data_size == 0) {
data_size = *(long *)cur_ptr;
printf("data_size = %ld\n", data_size);
/* terminate */
if (data_size == -1) {
puts("terminate");
sfr->idx = -1;
pthread_join(recv_thread, NULL);
destructStructForResult(sfr);
puts("fin");
return NULL;
}
cur_ptr += RESERVE_SIZE;
data = (Datum *)cur_ptr;
}
CJState++;
case 8: {
if (i == 0) {
ProjInfo = node->js.ps.ps_ProjInfo;
slot = ProjInfo->pi_slot;
ExecClearTuple(slot);
values = slot->tts_values;
isnull = slot->tts_isnull;
}
CJState++;
}
case 9:
if (data > end_ptr) {
puts("cause switching");
CJState -= 3;
continue;
}
values[i] = *data;
data++;
isnull[i++] = false;
if (i == VarNum) {
long result_size = sfr->elm_size;
i = 0;
cur_ptr += result_size;
data_size -= result_size;
CJState -= 2;
puts("exec store virtual tuple");
return ExecStoreVirtualTuple(slot);
}
}
}
}