int bucketNumber);
 static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
+static void *dense_alloc(HashJoinTable hashtable, Size size);
 
 /* ----------------------------------------------------------------
  *     ExecHash
    hashtable->spaceUsedSkew = 0;
    hashtable->spaceAllowedSkew =
        hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
+   hashtable->chunks = NULL;
 
    /*
     * Get info about the hash functions to be used for each hash key. Also
    int         oldnbatch = hashtable->nbatch;
    int         curbatch = hashtable->curbatch;
    int         nbatch;
-   int         i;
    MemoryContext oldcxt;
    long        ninmemory;
    long        nfreed;
+   HashMemoryChunk oldchunks;
 
    /* do nothing if we've decided to shut off growth */
    if (!hashtable->growEnabled)
     */
    ninmemory = nfreed = 0;
 
-   for (i = 0; i < hashtable->nbuckets; i++)
-   {
-       HashJoinTuple prevtuple;
-       HashJoinTuple tuple;
+   /*
+    * We will scan through the chunks directly, so that we can reset the
+    * buckets now and not have to keep track of which tuples in the buckets
+    * have already been processed. We will free the old chunks as we go.
+    */
+   memset(hashtable->buckets, 0, sizeof(HashJoinTuple *) * hashtable->nbuckets);
+   oldchunks = hashtable->chunks;
+   hashtable->chunks = NULL;
 
-       prevtuple = NULL;
-       tuple = hashtable->buckets[i];
+   /* so, let's scan through the old chunks, and all tuples in each chunk */
+   while (oldchunks != NULL)
+   {
+       HashMemoryChunk nextchunk = oldchunks->next;
+       /* position within the buffer (up to oldchunks->used) */
+       size_t      idx = 0;
 
-       while (tuple != NULL)
+       /* process all tuples stored in this chunk (and then free it) */
+       while (idx < oldchunks->used)
        {
-           /* save link in case we delete */
-           HashJoinTuple nexttuple = tuple->next;
+           HashJoinTuple hashTuple = (HashJoinTuple) (oldchunks->data + idx);
+           MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+           int         hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
            int         bucketno;
            int         batchno;
 
            ninmemory++;
-           ExecHashGetBucketAndBatch(hashtable, tuple->hashvalue,
+           ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);
-           Assert(bucketno == i);
+
            if (batchno == curbatch)
            {
-               /* keep tuple */
-               prevtuple = tuple;
+               /* keep tuple in memory - copy it into the new chunk */
+               HashJoinTuple copyTuple =
+                   (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
+               memcpy(copyTuple, hashTuple, hashTupleSize);
+
+               /* and add it back to the appropriate bucket */
+               copyTuple->next = hashtable->buckets[bucketno];
+               hashtable->buckets[bucketno] = copyTuple;
            }
            else
            {
                /* dump it out */
                Assert(batchno > curbatch);
-               ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(tuple),
-                                     tuple->hashvalue,
+               ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
+                                     hashTuple->hashvalue,
                                      &hashtable->innerBatchFile[batchno]);
-               /* and remove from hash table */
-               if (prevtuple)
-                   prevtuple->next = nexttuple;
-               else
-                   hashtable->buckets[i] = nexttuple;
-               /* prevtuple doesn't change */
-               hashtable->spaceUsed -=
-                   HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(tuple)->t_len;
-               pfree(tuple);
+
+               hashtable->spaceUsed -= hashTupleSize;
                nfreed++;
            }
 
-           tuple = nexttuple;
+           /* next tuple in this chunk */
+           idx += MAXALIGN(hashTupleSize);
        }
+
+       /* we're done with this chunk - free it and proceed to the next one */
+       pfree(oldchunks);
+       oldchunks = nextchunk;
    }
 
 #ifdef HJDEBUG
 
        /* Create the HashJoinTuple */
        hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
-       hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
-                                                      hashTupleSize);
+       hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
+
        hashTuple->hashvalue = hashvalue;
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 
    hashtable->spaceUsed = 0;
 
    MemoryContextSwitchTo(oldcxt);
+
+   /* Forget the chunks (the memory was freed by the context reset above). */
+   hashtable->chunks = NULL;
 }
 
 /*
        hashtable->spaceUsedSkew = 0;
    }
 }
+
+/*
+ * dense_alloc
+ *     Allocate 'size' bytes from the current "dense" memory chunk of the
+ *     hash table, starting a fresh HASH_CHUNK_SIZE chunk when the current
+ *     one cannot fit the request.  Requests larger than
+ *     HASH_CHUNK_THRESHOLD get a dedicated, exactly-sized chunk instead,
+ *     so the regular chunks stay densely packed.
+ *
+ * All chunks are allocated in hashtable->batchCxt, so this memory is
+ * released in bulk when that context is reset; tuples allocated here are
+ * never pfree'd individually.
+ */
+static void *
+dense_alloc(HashJoinTable hashtable, Size size)
+{
+   HashMemoryChunk newChunk;
+   char       *ptr;
+
+   /*
+    * just in case the size is not already aligned properly -- rounding up
+    * here also keeps every subsequent allocation in the chunk MAXALIGNed
+    */
+   size = MAXALIGN(size);
+
+   /*
+    * If tuple size is larger than 1/4 of chunk size, allocate a separate
+    * chunk.
+    */
+   if (size > HASH_CHUNK_THRESHOLD)
+   {
+       /* allocate new chunk and put it at the beginning of the list */
+       newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
+                                 offsetof(HashMemoryChunkData, data) + size);
+       newChunk->maxlen = size;
+       newChunk->used = 0;
+       newChunk->ntuples = 0;
+
+       /*
+        * Add this chunk to the list after the first existing chunk, so that
+        * we don't lose the remaining space in the "current" chunk.
+        */
+       if (hashtable->chunks != NULL)
+       {
+           newChunk->next = hashtable->chunks->next;
+           hashtable->chunks->next = newChunk;
+       }
+       else
+       {
+           /* list is empty, so the oversized chunk simply becomes its head */
+           newChunk->next = hashtable->chunks;
+           hashtable->chunks = newChunk;
+       }
+
+       /* the single tuple occupies the whole dedicated chunk */
+       newChunk->used += size;
+       newChunk->ntuples += 1;
+
+       return newChunk->data;
+   }
+
+   /*
+    * See if we have enough space for it in the current chunk (if any).
+    * If not, allocate a fresh chunk.
+    */
+   if ((hashtable->chunks == NULL) ||
+       (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
+   {
+       /* allocate new chunk and put it at the beginning of the list */
+       newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
+                      offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE);
+
+       /* the first 'size' bytes of the new chunk are handed out right away */
+       newChunk->maxlen = HASH_CHUNK_SIZE;
+       newChunk->used = size;
+       newChunk->ntuples = 1;
+
+       newChunk->next = hashtable->chunks;
+       hashtable->chunks = newChunk;
+
+       return newChunk->data;
+   }
+
+   /* There is enough space in the current chunk, let's add the tuple */
+   ptr = hashtable->chunks->data + hashtable->chunks->used;
+   hashtable->chunks->used += size;
+   hashtable->chunks->ntuples += 1;
+
+   /* return pointer to the start of the tuple memory */
+   return ptr;
+}