PostgreSQLからのcyclo-joinの呼び出し
筑波大学の三橋龍也です。PostgreSQLでは結合処理が存在しますが、まだ発展途上だと考えられます。例えば等結合については並列ハッシュ結合も実装されていません。類似結合(例えばある人のすぐ近くにいる人を探す処理)を行おうとするとNested Loops Joinになりますが、PostgreSQLのNLJの実装は大変遅いので、データサイズが大きくなると非常に時間がかかります。例えばデータサイズが100万件程度の場合、下記のような二次元ユークリッド距離に基づく類似結合を行うと我々の環境では6日くらいかかりました。なお、ここでは簡単のために下記のような問合せを書いていますが、索引を使えない類似性は多数存在し、私の興味はそのような類にあります。
SELECT R.id, S.id FROM R, S WHERE (R.x - S.x)^2 + (R.y - S.y)^2 < threshold;
このような処理を高速化する技法の1つにcyclo-joinという方式があります。これはまずrelation RをR-1からR-N, relation SをS-1からS-Nに分割し、それぞれN台のマシンに配布します。そして各ノードでの照合処理をしましたら、S-iを隣接ノードに移動させます。これをN回繰り返せば照合処理を完了できます。
これを実装してみたら中々高速でした。
Cyclo-joinはPostgreSQLには実装されていません。そこで Nested Loops Join をフックすることを考えました。このフックはざっくり言って ExecInitNestLoop, ExecNestLoop, ExecEndNestLoop から構成されます。ExecInitNestLoop では、TupleArrayの初期化、targetlist/quallist の変換などを行います。ExecNestLoop では、タプルを TupleArray へ読み込み、targetlist/quallist と共にCJ側へ送信し、結果を受け取り、上位オペレーターへpushします。ExecEndNestLoop では、TupleArray とは新しく定義した構造体で、タプルを入れておくためのものです。タプルサイズ(固定長)、タプル数、属性情報(数、オフセットなど)を格納しておきます。これを図で説明すると下記のようになります。つまりデータをPostgreSQLからcyclo join側に送りつけて計算させ、その結果をもらいます。
ExecNestLoopのコードは次のようになります。60台程度をcyclo-joinに用いたところ、処理時間は6日から大幅に高速化されました。詳細はそのうち論文にまとめます。今後はExecNestLoopを直接書き換えるのではなく、ExecutorRun()でフックして多段NestLoopJoinを処理できるようにする予定です。来年度から3年生の授業課題にしたら良いのでは、と思っています。
TupleTableSlot * ExecNestLoop(NestLoopState *node) { NestLoop *nl; PlanState *innerPlan; PlanState *outerPlan; TupleTableSlot *outerTupleSlot; TupleTableSlot *innerTupleSlot; // List *joinqual; List *otherqual; ExprContext *econtext; // needed for result static const int RESERVE_SIZE = sizeof(long); static pthread_t recv_thread; static StructForResult *sfr; static int cur_idx; static char *cur_ptr; static char *end_ptr; static long data_size; static long remain_size; static int i, j; static Datum *values; static bool *isnull; static Datum *data; static TupleTableSlot *slot; /* * get information from the node */ ENL1_printf("getting info from node"); nl = (NestLoop *) node->js.ps.plan; otherqual = node->js.ps.qual; outerPlan = outerPlanState(node); innerPlan = innerPlanState(node); econtext = node->js.ps.ps_ExprContext; /* * Check to see if we're still projecting out tuples from a previous join * tuple (because there is a function-returning-set in the projection * expressions). If so, try to project another one. */ if (node->js.ps.ps_TupFromTlist) { TupleTableSlot *result; ExprDoneCond isDone; result = ExecProject(node->js.ps.ps_ProjInfo, &isDone); if (isDone == ExprMultipleResult) return result; /* Done with that source tuple... */ node->js.ps.ps_TupFromTlist = false; } /* * Ok, everything is setup for the join so now loop until we return a * qualifying join tuple. */ ENL1_printf("entering main loop"); for (;;) { /* * Reset per-tuple memory context to free any expression evaluation * storage allocated in the previous tuple cycle. Note this can't happen * until we're done projecting out tuples from a join tuple. */ ResetExprContext(econtext); switch (CJState) { case 0: /* fetch first outer tuple */ outerTupleSlot = ExecProcNode(outerPlan); readAttsInfoTupleArray(OuterTupleArray, outerTupleSlot); insertDataTupleArray(OuterTupleArray, outerTupleSlot); CJState++; continue; case 1: /* fetch first inner tuple */ innerTupleSlot = ExecProcNode(innerPlan); readAttsInfoTupleArray(InnerTupleArray, innerTupleSlot); insertDataTupleArray(InnerTupleArray, innerTupleSlot); CJState++; continue; case 2: /* fill outer table's buffer */ ENL1_printf("getting new outer tuple"); outerTupleSlot = ExecProcNode(outerPlan); /* * if there are no more outer tuples, finish to fill outer buffer. */ if (TupIsNull(outerTupleSlot)) { ENL1_printf("no outer tuple, ending fetch"); puts("outer scanned"); CJState++; } else { ENL1_printf("Saving new outer tuple information"); insertDataTupleArray(OuterTupleArray, outerTupleSlot); } continue; case 3: /* fill inner table's buffer */ ENL1_printf("getting new inner tuple"); innerTupleSlot = ExecProcNode(innerPlan); /* * if there are no more outer tuples, finish to fill outer buffer. */ if (TupIsNull(innerTupleSlot)) { ENL1_printf("no iner tuple, ending fetch"); puts("inner scanned"); CJState++; } else { ENL1_printf("saving new inner tuple information"); insertDataTupleArray(InnerTupleArray, innerTupleSlot); } continue; case 4: /* * at this point we have a new pair of inner and outer tuples so we * test the inner and outer tuples to see if they satisfy the node's * qualification. * * Only the joinquals determine MatchedOuter status, but all quals * must pass to actually return the tuple. */ puts("--------------outer_ta----------------"); printTupleArray(OuterTupleArray); puts("--------------inner_ta----------------"); printTupleArray(InnerTupleArray); if ((Sock = connectSock(Address, CJPort)) < 0) { fprintf(stderr, "error connectSock\n"); exit(1); } puts("connectSock"); sendOpInfo(Sock); sendResultOpInfo(Sock); sendWholeTupleArray(Sock, OuterTupleArray); sendWholeTupleArray(Sock, InnerTupleArray); puts("sended!!!"); CJState++; /* Result Receiving */ case 5: sfr = constructStructForResult(Sock, sizeof(Datum) * 10000000, sizeof(Datum) * VarNum); pthread_create(&recv_thread, NULL, receiveJoinResult, sfr); data_size = 0; i = j = 0; CJState++; case 6: cur_idx = sfr->idx; while (sfr->size[cur_idx] == 0) ; printf("[%d] received!\n", cur_idx); sfr->idx = (sfr->idx + 1) % 2; cur_ptr = sfr->buf[cur_idx]; end_ptr = cur_ptr + sfr->size[cur_idx]; printf("[%d] size = %ld\n", cur_idx, sfr->size[cur_idx]); data = (Datum *)cur_ptr; CJState++; case 7: if (cur_ptr > end_ptr) { sfr->size[cur_idx] = 0; CJState--; continue; } if (data_size == 0) { data_size = *(long *)cur_ptr; printf("data_size = %ld\n", data_size); /* terminate */ if (data_size == -1) { puts("terminate"); sfr->idx = -1; pthread_join(recv_thread, NULL); destructStructForResult(sfr); puts("fin"); return NULL; } cur_ptr += RESERVE_SIZE; data = (Datum *)cur_ptr; } CJState++; case 8: { if (i == 0) { ProjInfo = node->js.ps.ps_ProjInfo; slot = ProjInfo->pi_slot; ExecClearTuple(slot); values = slot->tts_values; isnull = slot->tts_isnull; } CJState++; } case 9: if (data > end_ptr) { puts("cause switching"); CJState -= 3; continue; } values[i] = *data; data++; isnull[i++] = false; if (i == VarNum) { long result_size = sfr->elm_size; i = 0; cur_ptr += result_size; data_size -= result_size; CJState -= 2; puts("exec store virtual tuple"); return ExecStoreVirtualTuple(slot); } } } }