PostgreSQLからのcyclo-joinの呼び出し

筑波大学の三橋龍也です。PostgreSQLでは結合処理が存在しますが、まだ発展途上だと考えられます。例えば等結合については並列ハッシュ結合も実装されていません。類似結合(例えばある人のすぐ近くにいる人を探す処理)を行おうとするとNested Loops Joinになりますが、PostgreSQLのNLJの実装は大変遅いので、データサイズが大きくなると非常に時間がかかります。例えばデータサイズが100万件程度の場合、下記のような二次元ユークリッド距離に基づく類似結合を行うと我々の環境では6日くらいかかりました。なお、ここでは簡単のために下記のような問合せを書いていますが、索引を使えない類似性は多数存在し、私の興味はそのような類にあります。

SELECT R.id, S.id FROM R, S
WHERE (R.x - S.x)^2 + (R.y - S.y)^2 < threshold;

このような処理を高速化する技法の1つにcyclo-joinという方式があります。これはまずrelation RをR-1からR-N, relation SをS-1からS-Nに分割し、それぞれN台のマシンに配布します。そして各ノードでの照合処理をしましたら、S-iを隣接ノードに移動させます。これをN回繰り返せば照合処理を完了できます。

f:id:ryuya-mitsuhashi:20151226075243p:plain
これを実装してみたら中々高速でした。

Cyclo-joinはPostgreSQLには実装されていません。そこで Nested Loops Join をフックすることを考えました。このフックはざっくり言って ExecInitNestLoop, ExecNestLoop, ExecEndNestLoop から構成されます。ExecInitNestLoop では、TupleArrayの初期化、targetlist/quallist の変換などを行います。ExecNestLoop では、タプルを TupleArray へ読み込み、targetlist/quallist と共にCJ側へ送信し、結果を受け取り、上位オペレーターへpushします。ExecEndNestLoop では、TupleArray とは新しく定義した構造体で、タプルを入れておくためのものです。タプルサイズ(固定長)、タプル数、属性情報(数、オフセットなど)を格納しておきます。これを図で説明すると下記のようになります。つまりデータをPostgreSQLからcyclo join側に送りつけて計算させ、その結果をもらいます。

f:id:ryuya-mitsuhashi:20151225171330p:plain

f:id:ryuya-mitsuhashi:20151225171430p:plain

ExecNestLoopのコードは次のようになります。60台程度をcyclo-joinに用いたところ、処理時間は6日から大幅に高速化されました。詳細はそのうち論文にまとめます。今後はExecNestLoopを直接書き換えるのではなく、ExecutorRun()でフックして多段NestLoopJoinを処理できるようにする予定です。来年度から3年生の授業課題にしたら良いのでは、と思っています。

TupleTableSlot *
ExecNestLoop(NestLoopState *node)
{
	NestLoop   *nl;
	PlanState  *innerPlan;
	PlanState  *outerPlan;
	TupleTableSlot *outerTupleSlot;
	TupleTableSlot *innerTupleSlot;
	// List	   *joinqual;
	List	   *otherqual;
	ExprContext *econtext;
	
	// needed for result
	static const int RESERVE_SIZE = sizeof(long);
	
	static pthread_t recv_thread;
	static StructForResult *sfr;
	
	static int cur_idx;
	static char *cur_ptr;
	static char *end_ptr;
	
	static long data_size;
	static long remain_size;
	
	static int i, j;
	
	static Datum *values;
	static bool  *isnull;
	static Datum *data;
	static TupleTableSlot *slot;
	
	
	/*
	 * get information from the node
	 */
	ENL1_printf("getting info from node");
	
	nl = (NestLoop *) node->js.ps.plan;
	otherqual = node->js.ps.qual;
	outerPlan = outerPlanState(node);
	innerPlan = innerPlanState(node);
	econtext = node->js.ps.ps_ExprContext;
	
	
	/*
	 * Check to see if we're still projecting out tuples from a previous join
	 * tuple (because there is a function-returning-set in the projection
	 * expressions).  If so, try to project another one.
	 */
	if (node->js.ps.ps_TupFromTlist) {
		TupleTableSlot *result;
		ExprDoneCond isDone;
		
		result = ExecProject(node->js.ps.ps_ProjInfo, &isDone);
		if (isDone == ExprMultipleResult)
			return result;
		/* Done with that source tuple... */
		node->js.ps.ps_TupFromTlist = false;
	}
	
	
	/*
	 * Ok, everything is setup for the join so now loop until we return a
	 * qualifying join tuple.
	 */
	ENL1_printf("entering main loop");
	
	for (;;) {
		/*
		 * Reset per-tuple memory context to free any expression evaluation
		 * storage allocated in the previous tuple cycle.  Note this can't happen
		 * until we're done projecting out tuples from a join tuple.
		 */
		ResetExprContext(econtext);
		
		switch (CJState) {
		case 0: 
			/* fetch first outer tuple */
			outerTupleSlot = ExecProcNode(outerPlan);
			
			readAttsInfoTupleArray(OuterTupleArray, outerTupleSlot);
			insertDataTupleArray(OuterTupleArray, outerTupleSlot);
			
			CJState++;
			continue;
		case 1:
			/* fetch first inner tuple */			
			innerTupleSlot = ExecProcNode(innerPlan);
			
			readAttsInfoTupleArray(InnerTupleArray, innerTupleSlot);
			insertDataTupleArray(InnerTupleArray, innerTupleSlot);
			
			CJState++;
			continue;
		case 2:
			/* fill outer table's buffer */
			ENL1_printf("getting new outer tuple");
			outerTupleSlot = ExecProcNode(outerPlan);
			
			/*
			 * if there are no more outer tuples, finish to fill outer buffer.
			 */
			if (TupIsNull(outerTupleSlot)) {
				ENL1_printf("no outer tuple, ending fetch");
				puts("outer scanned");
				CJState++;
			}				
			else {
				ENL1_printf("Saving new outer tuple information");
				insertDataTupleArray(OuterTupleArray, outerTupleSlot);
			}
			
			continue;
		case 3:
			/* fill inner table's buffer */
			ENL1_printf("getting new inner tuple");
			innerTupleSlot = ExecProcNode(innerPlan);
	
			/*
			 * if there are no more outer tuples, finish to fill outer buffer.
			 */
			if (TupIsNull(innerTupleSlot)) {
				ENL1_printf("no iner tuple, ending fetch");
				puts("inner scanned");
				CJState++;
			}
			else {
				ENL1_printf("saving new inner tuple information");
				insertDataTupleArray(InnerTupleArray, innerTupleSlot);
			}

			continue;
		case 4:
			/*
			 * at this point we have a new pair of inner and outer tuples so we
			 * test the inner and outer tuples to see if they satisfy the node's
			 * qualification.
			 *
			 * Only the joinquals determine MatchedOuter status, but all quals
			 * must pass to actually return the tuple.
			 */
			
			puts("--------------outer_ta----------------");
			printTupleArray(OuterTupleArray);
			puts("--------------inner_ta----------------");
			printTupleArray(InnerTupleArray);
			
			
			if ((Sock = connectSock(Address, CJPort)) < 0) {
				fprintf(stderr, "error connectSock\n");
				exit(1);
			}
			puts("connectSock");
			

                        sendOpInfo(Sock);
			sendResultOpInfo(Sock);
			
			sendWholeTupleArray(Sock, OuterTupleArray);
			sendWholeTupleArray(Sock, InnerTupleArray);
			
			puts("sended!!!");
			CJState++;
			
			/* Result Receiving */
		case 5: 
			sfr = constructStructForResult(Sock, sizeof(Datum) * 10000000, sizeof(Datum) * VarNum);
			
			pthread_create(&recv_thread, NULL, receiveJoinResult, sfr);
			data_size = 0;
			
			i = j = 0;
			CJState++;
		case 6: 
			cur_idx = sfr->idx;
			while (sfr->size[cur_idx] == 0)
				;
			
			printf("[%d] received!\n", cur_idx);
			sfr->idx = (sfr->idx + 1) % 2;
			
			cur_ptr = sfr->buf[cur_idx];
			end_ptr = cur_ptr + sfr->size[cur_idx];
			printf("[%d] size = %ld\n", cur_idx, sfr->size[cur_idx]);
			data = (Datum *)cur_ptr;
			CJState++;
		case 7: 
			if (cur_ptr > end_ptr) {
				sfr->size[cur_idx] = 0;
				CJState--;
				continue;
			}
			
			if (data_size == 0) {
				data_size = *(long *)cur_ptr;
				printf("data_size = %ld\n", data_size);
				
				/* terminate */
				if (data_size == -1) {
					puts("terminate");
					sfr->idx = -1;					
					pthread_join(recv_thread, NULL);
					destructStructForResult(sfr);
					
					puts("fin");
					return NULL;
				}
				
				cur_ptr += RESERVE_SIZE;
				data = (Datum *)cur_ptr;
			}
			CJState++;
		case 8: {
			if (i == 0) {
				ProjInfo = node->js.ps.ps_ProjInfo;
				slot = ProjInfo->pi_slot;
				ExecClearTuple(slot);
				
				values = slot->tts_values;
				isnull = slot->tts_isnull;
			}
			CJState++;
		}
		case 9: 
			if (data > end_ptr) {
				puts("cause switching");
				CJState -= 3;
				continue;
			}
			
			values[i] = *data;
			data++;
			isnull[i++] = false;			
			
			if (i == VarNum) {
				long result_size = sfr->elm_size;
				
				i = 0;
				cur_ptr += result_size;
				data_size -= result_size;
				CJState -= 2;
				
				puts("exec store virtual tuple");
				return ExecStoreVirtualTuple(slot);
			}
		}
	}
}