R-Tree GiST для прямоугольников
Для описания сложных объектов часто используют более простые фигуры, аппроксимирующие сложную форму, которые характерны тем, что полностью содержат в себе эти объекты. Такие фигуры называют bounding volume и их гораздо проще использовать для проверки на различные условия, например, на пересечение двух объектов. В качестве bounding volume часто используют сферу, цилиндр или куб. Мы будем рассматривать куб, который называют bounding box,или BB.
R-Tree - это структура данных, которая используется для индексирования многомерных данных. Она была предложена Гуттманом [Gut84] как расширение B-tree на многомерное пространство, которое разбивается на иерархически вложенные и возможно перекрывающиеся BB (прямоугольники для двумерного случая, в случае 3-х измерений - кубики), Каждый узел R-Tree может содержать переменное количество записей, но не больше заранее определенного максимума. Каждая запись во внутренних узлах содержит ссылку на дочерний узел и BB, который содержит все записи этого дочернего узла. Каждая запись концевого узла (leaf node) содержит ссылку на данные и BB этих данных. При вставке новых данных отслеживается, чтобы близкие данные лежали "близко", в одном концевом узле, в частности, это достигается с помощью правила наименьшего увеличения BB этого узла после вставки. При поиске сравниваются BB-ы запроса и текущего узла и если нет пересечений, то последующие проверки с узлами этого поддерева уже не нужны, чем сильно уменьшается количество просмотренных узлов и достигается выигрыш в производительности.
R-tree для для городов и деревень Греции. Данные взяты с rtreeportal.org
На рисунке изображен фрагмент дерева (полное дерево), построенного с помощью модуля rtree_gist и - специального модуля, предназначенного для разработчиков расширений с использованием GiST. Исходными данными являются MBR (minimum bounding rectangles) городов и деревень Греции. Изображены данные на концевых узлах (маленькие прямоугольники) и на 1-м уровне (большие прямоугольники). Подробнее можно прочитать здесь.
compress, decompress
Функции тривиальны, просто возвращают то, что получили. Datum gist_box_compress(PG_FUNCTION_ARGS) { PG_RETURN_POINTER(PG_GETARG_POINTER(0)); }
Datum gist_box_decompress(PG_FUNCTION_ARGS) { PG_RETURN_POINTER(PG_GETARG_POINTER(0)); }
equal
Сравнивает два прямоугольника и возвращает true если они равны или оба равны NULL Datum gist_box_same(PG_FUNCTION_ARGS) { BOX *b1 = PG_GETARG_BOX_P(0); BOX *b2 = PG_GETARG_BOX_P(1); bool *result = (bool *) PG_GETARG_POINTER(2);
if (b1 && b2) *result = DatumGetBool(DirectFunctionCall2(box_same, PointerGetDatum(b1), PointerGetDatum(b2))); else *result = (b1 == NULL && b2 == NULL) ? TRUE : FALSE; PG_RETURN_POINTER(result); }
Кстати, тут присутствует вызов встроенной в PostgreSQL функции box_same. Следует также обратить внимание на работу с переменной result: эта интерфейсная функция является исключением из правил и функция должна изменить значения своего параметра. Но это не касается двух первых аргументов.
penalty
Функция возвращает изменение (увеличение) площади прямоугольника после объединения обоих (получение bounding box) как меру изменения. static double size_box(Datum dbox) { /* Вычисление площади прямоугольника */ BOX *box = DatumGetBoxP(dbox);
if (box == NULL box->high.x <= box->low.x box->high.y <= box->low.y) return 0.0; return (box->high.x - box->low.x) * (box->high.y - box->low.y); }
Datum gist_box_penalty(PG_FUNCTION_ARGS) { /* исходный прямоугольник */ GISTENTRY *origentry = (GISTENTRY *) PG_GETARG_POINTER(0); /* добавляемый прямоугольник */ GISTENTRY *newentry = (GISTENTRY *) PG_GETARG_POINTER(1); float *result = (float *) PG_GETARG_POINTER(2); Datum ud;
/* получаем объединяющий прямоугольник */ ud = DirectFunctionCall2(rt_box_union, origentry->key, newentry->key); /* вычитаем площадь исходниго из полученного прямоугольника */ *result = (float) (size_box(ud) - size_box(origentry->key));
PG_RETURN_POINTER(result); }
union
Функция возвращает объединяющий прямоугольник для всех входящих прямоугольников Datum gist_box_union(PG_FUNCTION_ARGS) { GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); int *sizep = (int *) PG_GETARG_POINTER(1); int numranges, i; BOX *cur, *pageunion;
numranges = entryvec->n; /* возвращаемое значение должно быть palloc'ировано! */ pageunion = (BOX *) palloc(sizeof(BOX)); /* инициация объединяющего прямоугольника первым прямоугольником */ cur = DatumGetBoxP(entryvec->vector[0].key); memcpy((void *) pageunion, (void *) cur, sizeof(BOX));
for (i = 1; i < numranges; i++) { cur = DatumGetBoxP(entryvec->vector[i].key); if (pageunion->high.x < cur->high.x) pageunion->high.x = cur->high.x; if (pageunion->low.x > cur->low.x) pageunion->low.x = cur->low.x; if (pageunion->high.y < cur->high.y) pageunion->high.y = cur->high.y; if (pageunion->low.y > cur->low.y) pageunion->low.y = cur->low.y; }
/* размер возвращаемого значения в байтах */ *sizep = sizeof(BOX);
PG_RETURN_POINTER(pageunion); }
consistent
Datum gist_box_consistent(PG_FUNCTION_ARGS) { GISTENTRY *entry = (GISTENTRY *) PG_GETARG_POINTER(0); BOX *query = PG_GETARG_BOX_P(1); StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
if (DatumGetBoxP(entry->key) == NULL query == NULL) PG_RETURN_BOOL(FALSE);
/* если значение находится на концевой странице, то выполняется точное сравнение, на внутренней мы должны вернуть true если значения на последующих страницах (детях) МОГУТ удовлетворять условию */
if (GIST_LEAF(entry)) PG_RETURN_BOOL(gist_box_leaf_consistent(DatumGetBoxP(entry->key), query, strategy)); else PG_RETURN_BOOL(rtree_internal_consistent(DatumGetBoxP(entry->key), query, strategy)); }
Реальные функции gist_box_leaf_consistent и rtree_internal_consistent довольно объемны, ограничимся их вариантами только для поиска на совпадение. static bool gist_box_leaf_consistent(BOX *key, BOX *query, StrategyNumber strategy) { bool retval = FALSE; switch (strategy) { case RTSameStrategyNumber: retval = DatumGetBool(DirectFunctionCall2(box_same, PointerGetDatum(key), PointerGetDatum(query))); break; default: elog(NOTICE,"Unsupported StrategyNumber %d", strategy); } return retval; }
static bool rtree_internal_consistent(BOX *key, BOX *query, StrategyNumber strategy) { bool retval=FALSE; switch (strategy) { case RTSameStrategyNumber: retval = DatumGetBool(DirectFunctionCall2(box_contain, PointerGetDatum(key), PointerGetDatum(query))); break; default: elog(NOTICE,"Unsupported StrategyNumber %d", strategy); } return retval; } Обратите внимание, что в функции gist_box_leaf_consistent поисковый прямоугольник тестируется на полное совпадение с испытуемым значением, а в rtree_internal_consistent поисковый прямоугольник должен полностью содержаться в испытуемом значении. Очевидно, что если поисковый прямоугольник не содержится, то совпадающих с ним прямоугольников в страница-наследниках просто не может быть.
picksplit
Обычно самая сложная функция в интерфейсе GiST. Полную версию можно найти в исходниках PostgreSQL, файл ./src/backend/access/gist/gistproc.c Можно использовать обычный Гуттмановский (квадратичный) алгоритм ([Gut84]). Для полноты мы будем использовать простой (неэффективный) алгоритм, который помещает четные элементы в "левый" массив, в нечетный - в "правый".
Datum gist_box_picksplit(PG_FUNCTION_ARGS) { GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0); GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1); OffsetNumber i, maxoff = entryvec->n - 1; int nbytes;
nbytes = (maxoff + 2) * sizeof(OffsetNumber); v->spl_left = (OffsetNumber *) palloc(nbytes); v->spl_right = (OffsetNumber *) palloc(nbytes); v->spl_ldatum = PointerGetDatum( palloc( sizeof(BOX) ) ); v->spl_rdatum = PointerGetDatum( palloc( sizeof(BOX) ) ); v->spl_nleft = 0; v->spl_nright = 0;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { BOX *pageunion; /* указатель на объединяющий прямоугольник для страницы */
if ( i%2 == 0 ) { v->spl_left[ v->spl_nleft ] = i; v->spl_nleft++; pageunion = DatumGetBoxP( v->spl_ldatum ); } else { v->spl_right[ v->spl_nright ] = i; v->spl_nright++; pageunion = DatumGetBoxP( v->spl_rdatum ); }
if ( i<=OffsetNumberNext( FirstOffsetNumber ) ) { /* первоначальная инициализация объединяющего прямоугольника */ memcpy( pageunion, DatumGetBoxP(entryvec->vector[i].key), sizeof(BOX) ); } else { BOX *curbox = DatumGetBoxP(entryvec->vector[i].key); if (pageunion->high.x < curbox->high.x) pageunion->high.x = curbox->high.x; if (pageunion->low.x > curbox->low.x) pageunion->low.x = curbox->low.x; if (pageunion->high.y < curbox->high.y) pageunion->high.y = curbox->high.y; if (pageunion->low.y > curbox->low.y) pageunion->low.y = curbox->low.y; } }
PG_RETURN_POINTER(v); }