Major overhaul of large-object implementation, by Denis Perchine with
author Tom Lane <tgl@sss.pgh.pa.us>
Tue, 24 Oct 2000 01:38:44 +0000 (01:38 +0000)
committer Tom Lane <tgl@sss.pgh.pa.us>
Tue, 24 Oct 2000 01:38:44 +0000 (01:38 +0000)
kibitzing from Tom Lane.  Large objects are now all stored in a single
system relation "pg_largeobject" --- no more xinv or xinx files, no more
relkind 'l'.  This should offer substantial performance improvement for
large numbers of LOs, since there won't be directory bloat anymore.
It'll also fix problems like running out of locktable space when you
access thousands of LOs in one transaction.
Also clean up cruft in read/write routines.  LOs with "holes" in them
(never-written byte ranges) now work just like Unix files with holes do:
a hole reads as zeroes but doesn't occupy storage space.
INITDB forced!

21 files changed:
contrib/pg_dumplo/lo_export.c
contrib/vacuumlo/vacuumlo.c
doc/src/sgml/ref/psql-ref.sgml
src/backend/catalog/Makefile
src/backend/catalog/indexing.c
src/backend/catalog/pg_largeobject.c [new file with mode: 0644]
src/backend/libpq/be-fsstubs.c
src/backend/storage/large_object/inv_api.c
src/bin/pg_dump/pg_dump.c
src/bin/pgtclsh/updateStats.tcl
src/bin/psql/describe.c
src/bin/psql/large_obj.c
src/include/catalog/catname.h
src/include/catalog/catversion.h
src/include/catalog/indexing.h
src/include/catalog/pg_class.h
src/include/catalog/pg_largeobject.h [new file with mode: 0644]
src/include/storage/large_object.h
src/interfaces/odbc/info.c
src/test/regress/expected/opr_sanity.out
src/test/regress/expected/sanity_check.out

index e18c3ef651ea807e0d1f2ac561ab817ab5d45e71..248cf831f5c2dc1bba93bf6e624af9f52b41f040 100644 (file)
@@ -94,7 +94,7 @@ pglo_export(LODumpMaster *pgLO)
                 * Query
                 * ----------
                 */
-               sprintf(Qbuff, "SELECT x.%s FROM %s x, pg_class c WHERE x.%s = c.oid and c.relkind = 'l'", 
+               sprintf(Qbuff, "SELECT DISTINCT x.\"%s\" FROM \"%s\" x, pg_largeobject l WHERE x.\"%s\" = l.loid",
                        ll->lo_attr, ll->lo_table, ll->lo_attr);
                
                /* puts(Qbuff); */
@@ -104,7 +104,8 @@ pglo_export(LODumpMaster *pgLO)
                if ((tuples = PQntuples(pgLO->res)) == 0) {
                
                        if (!pgLO->quiet && pgLO->action == ACTION_EXPORT_ATTR)
-                               printf("%s: no large objets in '%s'\n", progname, ll->lo_table);        
+                               printf("%s: no large objects in '%s'\n",
+                                          progname, ll->lo_table);     
                        continue;
                
                } else if (check_res(pgLO)) {
index 0ad617c97adcf9b17bd9c8a42b0d7b9f8fad88a6..2a0276544ea45c31f2a5b9265770ede270c60f48 100644 (file)
@@ -59,10 +59,9 @@ vacuumlo(char *database, int verbose)
         * First we create and populate the lo temp table
         */
        buf[0] = '\0';
-       strcat(buf, "SELECT oid AS lo ");
+       strcat(buf, "SELECT DISTINCT loid AS lo ");
        strcat(buf, "INTO TEMP TABLE vacuum_l ");
-       strcat(buf, "FROM pg_class ");
-       strcat(buf, "WHERE relkind='l'");
+       strcat(buf, "FROM pg_largeobject ");
        if (!(res = PQexec(conn, buf)))
        {
                fprintf(stderr, "Failed to create temp table.\n");
index 77a7e65fc1366f0c5a1baa664a62828000290323..6563854785e4eeab8f50ddc900780c2e17d1817a 100644 (file)
@@ -706,7 +706,8 @@ lo_import 152801
        <listitem>
        <para>
        Shows a list of all <productname>Postgres</productname> <quote>large
-       objects</quote> currently stored in the database along with their owners.
+       objects</quote> currently stored in the database, along with any
+       comments provided for them.
        </para>
        </listitem>
       </varlistentry>
index 62389eca1f79dd181dec0c9746ec0a195399ff23..56192e390283c5f0e6e6ba4c9fe732713247d542 100644 (file)
@@ -11,7 +11,8 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = catalog.o heap.o index.o indexing.o aclchk.o \
-       pg_aggregate.o pg_operator.o pg_proc.o pg_type.o
+       pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \
+       pg_type.o
 
 BKIFILES = global.bki template1.bki global.description template1.description
 
@@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\
        pg_proc.h pg_type.h pg_attribute.h pg_class.h \
        pg_inherits.h pg_index.h pg_statistic.h \
        pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
-       pg_language.h \
+       pg_language.h pg_largeobject.h \
        pg_aggregate.h pg_ipl.h pg_inheritproc.h \
        pg_rewrite.h pg_listener.h pg_description.h indexing.h \
     )
index aa8440a71ca00aba23c62e1ddcbf99bb9ba84d7d..b84408263558b694be230c68b215d7c7451f125f 100644 (file)
@@ -51,6 +51,8 @@ char     *Name_pg_inherits_indices[Num_pg_inherits_indices] =
 {InheritsRelidSeqnoIndex};
 char      *Name_pg_language_indices[Num_pg_language_indices] =
 {LanguageOidIndex, LanguageNameIndex};
+char      *Name_pg_largeobject_indices[Num_pg_largeobject_indices] =
+{LargeObjectLOidPNIndex};
 char      *Name_pg_listener_indices[Num_pg_listener_indices] =
 {ListenerPidRelnameIndex};
 char      *Name_pg_opclass_indices[Num_pg_opclass_indices] =
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
new file mode 100644 (file)
index 0000000..8bb3859
--- /dev/null
@@ -0,0 +1,184 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.c
+ *       routines to support manipulation of the pg_largeobject relation
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       $Header$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_largeobject.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+
+
+/*
+ * Create a large object having the given LO identifier.
+ *
+ * We do this by inserting an empty first page, so that the object will
+ * appear to exist with size 0.  Note that the unique index will reject
+ * an attempt to create a duplicate page.
+ *
+ * The tuple written is (loid, 0, empty bytea); the second column is
+ * presumably the page number (inferred from the index name -- confirm
+ * against pg_largeobject.h).  The catalog indices are updated here too,
+ * unless IsIgnoringSystemIndexes() reports that they are being skipped.
+ *
+ * Return value is whatever heap_insert returns for the page tuple, i.e.
+ * the OID assigned to the page tuple (any use in it?)
+ */
+Oid
+LargeObjectCreate(Oid loid)
+{
+       Oid                     retval;
+       Relation        pg_largeobject;
+       HeapTuple       ntup;
+       Relation        idescs[Num_pg_largeobject_indices];
+       Datum           values[Natts_pg_largeobject];
+       char            nulls[Natts_pg_largeobject];
+       int                     i;
+
+       pg_largeobject = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+
+       /*
+        * Form new tuple: start with every attribute zeroed and flagged ' '
+        * (taken to mean "not null" by heap_formtuple -- confirm), then fill
+        * the attributes in column order.
+        */
+       for (i = 0; i < Natts_pg_largeobject; i++)
+       {
+               values[i] = (Datum)NULL;
+               nulls[i] = ' ';
+       }
+
+       i = 0;
+       values[i++] = ObjectIdGetDatum(loid);
+       values[i++] = Int32GetDatum(0);
+       values[i++] = DirectFunctionCall1(byteain,
+                                                                         CStringGetDatum(""));
+       
+       ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls);
+
+       /*
+        * Insert it
+        */
+       retval = heap_insert(pg_largeobject, ntup);
+
+       /*
+        * Update indices (skipped when system indexes are being ignored)
+        */
+       if (!IsIgnoringSystemIndexes())
+       {
+               CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
+               CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup);
+               CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
+       }
+       
+       /* close the relation, naming the same lock mode it was opened with */
+       heap_close(pg_largeobject, RowExclusiveLock);
+
+       /* the tuple built by heap_formtuple is released only after all uses */
+       heap_freetuple(ntup);
+
+       return retval;
+}
+
+/*
+ * Delete a large object: remove every pg_largeobject tuple that the
+ * LargeObjectLOidPNIndex scan returns for the given LO identifier.
+ *
+ * The scan uses a single oid-equality key on the index's first attribute.
+ * If it yields no fetchable tuple at all, elog(ERROR) is raised, so the
+ * function returns normally only after deleting at least one tuple.
+ *
+ * pg_largeobject is opened with RowExclusiveLock -- the mode
+ * LargeObjectCreate uses for its insert -- because this routine modifies
+ * the relation through heap_delete; RowShareLock is not the mode used
+ * elsewhere in this file for writers.
+ */
+void
+LargeObjectDrop(Oid loid)
+{
+       bool            found = false;
+       Relation        pg_largeobject;
+       Relation        pg_lo_idx;
+       ScanKeyData     skey[1];
+       IndexScanDesc sd;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData tuple;
+       Buffer          buffer;
+
+       /* match index entries whose first attribute equals loid */
+       ScanKeyEntryInitialize(&skey[0],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 1,
+                                                  (RegProcedure) F_OIDEQ,
+                                                  ObjectIdGetDatum(loid));
+
+       pg_largeobject = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+       pg_lo_idx = index_openr(LargeObjectLOidPNIndex);
+
+       sd = index_beginscan(pg_lo_idx, false, 1, skey);
+
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
+
+       while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+       {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer);
+               /* each index result is freed here, once its TID is copied */
+               pfree(indexRes);
+               /*
+                * t_data is presumably left NULL when the fetch finds nothing
+                * usable; the buffer is released only on the non-NULL path.
+                */
+               if (tuple.t_data != NULL)
+               {
+                       heap_delete(pg_largeobject, &tuple.t_self, NULL);
+                       ReleaseBuffer(buffer);
+                       found = true;
+               }
+       }
+
+       index_endscan(sd);
+
+       index_close(pg_lo_idx);
+       /* must name the same lock mode that heap_openr was given above */
+       heap_close(pg_largeobject, RowExclusiveLock);
+
+       if (!found)
+               elog(ERROR, "LargeObjectDrop: large object %u not found", loid);
+}
+
+/*
+ * Report whether any pg_largeobject tuple can be fetched for the given
+ * LO identifier.
+ *
+ * Scans LargeObjectLOidPNIndex with an oid-equality key on its first
+ * attribute and stops at the first index entry whose heap tuple is
+ * successfully fetched.  Returns true in that case, false if the scan
+ * ends without one.
+ *
+ * NOTE(review): the relation is opened with RowShareLock although nothing
+ * here modifies it; a weaker mode may be sufficient -- confirm before
+ * changing.
+ */
+bool
+LargeObjectExists(Oid loid)
+{
+       bool            retval = false;
+       Relation        pg_largeobject;
+       Relation        pg_lo_idx;
+       ScanKeyData     skey[1];
+       IndexScanDesc sd;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData tuple;
+       Buffer          buffer;
+
+       /*
+        * See if we can find any tuples belonging to the specified LO
+        */
+       ScanKeyEntryInitialize(&skey[0],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 1,
+                                                  (RegProcedure) F_OIDEQ,
+                                                  ObjectIdGetDatum(loid));
+
+       pg_largeobject = heap_openr(LargeObjectRelationName, RowShareLock);
+       pg_lo_idx = index_openr(LargeObjectLOidPNIndex);
+
+       sd = index_beginscan(pg_lo_idx, false, 1, skey);
+
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
+
+       while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+       {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(pg_largeobject, SnapshotNow, &tuple, &buffer);
+               /* the index result is freed as soon as its TID has been copied */
+               pfree(indexRes);
+               /* one fetchable tuple is enough; release its buffer and stop */
+               if (tuple.t_data != NULL)
+               {
+                       retval = true;
+                       ReleaseBuffer(buffer);
+                       break;
+               }
+       }
+
+       index_endscan(sd);
+
+       index_close(pg_lo_idx);
+       /* same lock mode as given to heap_openr above */
+       heap_close(pg_largeobject, RowShareLock);
+
+       return retval;
+}
index a78b23ec5d393bb02699879078ce9ed990e7bc6c..5143d35a24661a9b52f57c4bd894dda21fcdaed6 100644 (file)
  *-------------------------------------------------------------------------
  */
 
+#include "postgres.h"
+
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
-#include "postgres.h"
-
 #include "catalog/pg_shadow.h"
 #include "libpq/be-fsstubs.h"
 #include "libpq/libpq-fs.h"
@@ -50,8 +50,7 @@
 
 /*#define FSDB 1*/
 #define MAX_LOBJ_FDS   256
-#define BUFSIZE                        1024
-#define FNAME_BUFSIZE  8192
+#define BUFSIZE                        8192
 
 /*
  * LO "FD"s are indexes into this array.
@@ -141,10 +140,10 @@ lo_close(PG_FUNCTION_ARGS)
 
        inv_close(cookies[fd]);
 
-       MemoryContextSwitchTo(currentContext);
-
        deleteLOfd(fd);
 
+       MemoryContextSwitchTo(currentContext);
+
        PG_RETURN_INT32(0);
 }
 
@@ -267,7 +266,7 @@ lo_creat(PG_FUNCTION_ARGS)
                PG_RETURN_OID(InvalidOid);
        }
 
-       lobjId = RelationGetRelid(lobjDesc->heap_r);
+       lobjId = lobjDesc->id;
 
        inv_close(lobjDesc);
 
@@ -310,8 +309,8 @@ lo_unlink(PG_FUNCTION_ARGS)
         * any LO-specific data structures at all.      (Again, that's probably
         * more than this module ought to be assuming.)
         *
-        * XXX there ought to be some code to clean up any open LOs that
-        * reference the specified relation... as is, they remain "open".
+        * XXX there ought to be some code to clean up any open LO FDs that
+        * reference the specified LO... as is, they remain "open".
         */
        PG_RETURN_INT32(inv_drop(lobjId));
 }
@@ -367,7 +366,7 @@ lo_import(PG_FUNCTION_ARGS)
        int                     nbytes,
                                tmp;
        char            buf[BUFSIZE];
-       char            fnamebuf[FNAME_BUFSIZE];
+       char            fnamebuf[MAXPGPATH];
        LargeObjectDesc *lobj;
        Oid                     lobjOid;
 
@@ -382,8 +381,8 @@ lo_import(PG_FUNCTION_ARGS)
         * open the file to be read in
         */
        nbytes = VARSIZE(filename) - VARHDRSZ;
-       if (nbytes >= FNAME_BUFSIZE)
-               nbytes = FNAME_BUFSIZE-1;
+       if (nbytes >= MAXPGPATH)
+               nbytes = MAXPGPATH-1;
        memcpy(fnamebuf, VARDATA(filename), nbytes);
        fnamebuf[nbytes] = '\0';
        fd = PathNameOpenFile(fnamebuf, O_RDONLY | PG_BINARY, 0666);
@@ -398,12 +397,7 @@ lo_import(PG_FUNCTION_ARGS)
        if (lobj == NULL)
                elog(ERROR, "lo_import: can't create inv object for \"%s\"",
                         fnamebuf);
-
-       /*
-        * the oid for the large object is just the oid of the relation
-        * XInv??? which contains the data.
-        */
-       lobjOid = RelationGetRelid(lobj->heap_r);
+       lobjOid = lobj->id;
 
        /*
         * read in from the Unix file and write to the inversion file
@@ -411,7 +405,7 @@ lo_import(PG_FUNCTION_ARGS)
        while ((nbytes = FileRead(fd, buf, BUFSIZE)) > 0)
        {
                tmp = inv_write(lobj, buf, nbytes);
-               if (tmp < nbytes)
+               if (tmp != nbytes)
                        elog(ERROR, "lo_import: error while reading \"%s\"",
                                 fnamebuf);
        }
@@ -435,7 +429,7 @@ lo_export(PG_FUNCTION_ARGS)
        int                     nbytes,
                                tmp;
        char            buf[BUFSIZE];
-       char            fnamebuf[FNAME_BUFSIZE];
+       char            fnamebuf[MAXPGPATH];
        LargeObjectDesc *lobj;
        mode_t          oumask;
 
@@ -461,8 +455,8 @@ lo_export(PG_FUNCTION_ARGS)
         * world-writable export files doesn't seem wise.
         */
        nbytes = VARSIZE(filename) - VARHDRSZ;
-       if (nbytes >= FNAME_BUFSIZE)
-               nbytes = FNAME_BUFSIZE-1;
+       if (nbytes >= MAXPGPATH)
+               nbytes = MAXPGPATH-1;
        memcpy(fnamebuf, VARDATA(filename), nbytes);
        fnamebuf[nbytes] = '\0';
        oumask = umask((mode_t) 0022);
@@ -473,12 +467,12 @@ lo_export(PG_FUNCTION_ARGS)
                         fnamebuf);
 
        /*
-        * read in from the Unix file and write to the inversion file
+        * read in from the inversion file and write to the Unix file
         */
        while ((nbytes = inv_read(lobj, buf, BUFSIZE)) > 0)
        {
                tmp = FileWrite(fd, buf, nbytes);
-               if (tmp < nbytes)
+               if (tmp != nbytes)
                        elog(ERROR, "lo_export: error while writing \"%s\"",
                                 fnamebuf);
        }
@@ -513,7 +507,7 @@ lo_commit(bool isCommit)
                if (cookies[i] != NULL)
                {
                        if (isCommit)
-                               inv_cleanindex(cookies[i]);
+                               inv_close(cookies[i]);
                        cookies[i] = NULL;
                }
        }
index 4774d19c47c5e8b3c17349e646600c2726b54210..e66241895192e648a4a2fc43a9c871d5c9cdd690 100644 (file)
  *
  *-------------------------------------------------------------------------
  */
+#include "postgres.h"
+
+#include <errno.h>
 #include <sys/types.h>
 #include <sys/file.h>
 #include <sys/stat.h>
 
-#include "postgres.h"
-
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/nbtree.h"
+#include "access/htup.h"
 #include "catalog/catalog.h"
+#include "catalog/catname.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_opclass.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_type.h"
 #include "libpq/libpq-fs.h"
 #include "miscadmin.h"
 #include "storage/large_object.h"
 #include "storage/smgr.h"
 #include "utils/fmgroids.h"
-#include "utils/relcache.h"
-
-/*
- *     Warning, Will Robinson...  In order to pack data into an inversion
- *     file as densely as possible, we violate the class abstraction here.
- *     When we're appending a new tuple to the end of the table, we check
- *     the last page to see how much data we can put on it.  If it's more
- *     than IMINBLK, we write enough to fill the page.  This limits external
- *     fragmentation.  In no case can we write more than IMAXBLK, since
- *     the 8K postgres page size less overhead leaves only this much space
- *     for data.
- */
+#include "utils/builtins.h"
 
-/*
- *             In order to prevent buffer leak on transaction commit, large object
- *             scan index handling has been modified. Indexes are persistant inside
- *             a transaction but may be closed between two calls to this API (when
- *             transaction is committed while object is opened, or when no
- *             transaction is active). Scan indexes are thus now reinitialized using
- *             the object current offset. [PA]
- *
- *             Some cleanup has been also done for non freed memory.
- *
- *             For subsequent notes, [PA] is Pascal André <andre@via.ecp.fr>
- */
 
-#define IFREESPC(p)            (PageGetFreeSpace(p) - \
-                                MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \
-                                MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
-                                sizeof(double))
-#define IMAXBLK                        8092
-#define IMINBLK                        512
-
-/* non-export function prototypes */
-static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
-                        Page page, char *dbuf, int nwrite);
-static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
-static int     inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
-static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
-                 HeapTuple tuple, Buffer buffer);
-static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
-static int     _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
+/*
+ * getbytealen -- size of a bytea value excluding its header, computed as
+ * VARSIZE(data) - VARHDRSZ.
+ *
+ * The value must not be in extended form (checked by assertion).  A
+ * stored size smaller than the header size is reported via elog(ERROR).
+ */
+static int32
+getbytealen(bytea *data)
+{
+       int32           payload;
+
+       Assert(! VARATT_IS_EXTENDED(data));
+
+       if (VARSIZE(data) < VARHDRSZ)
+               elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error.");
+
+       payload = VARSIZE(data) - VARHDRSZ;
+       return payload;
+}
 
 /*
  *     inv_create -- create a new large object.
  *
  *             Arguments:
- *               flags -- was archive, smgr
+ *               flags
  *
  *             Returns:
  *               large object descriptor, appropriately filled in.
@@ -87,168 +61,80 @@ static int _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
 LargeObjectDesc *
 inv_create(int flags)
 {
-       LargeObjectDesc *retval;
        Oid                     file_oid;
-       Relation        r;
-       Relation        indr;
-       TupleDesc       tupdesc;
-       IndexInfo  *indexInfo;
-       Oid                     classObjectId[1];
-       char            objname[NAMEDATALEN];
-       char            indname[NAMEDATALEN];
-
-       /*
-        * add one here since the pg_class tuple created will have the next
-        * oid and we want to have the relation name to correspond to the
-        * tuple OID
-        */
-       file_oid = newoid() + 1;
-
-       /* come up with some table names */
-       sprintf(objname, "xinv%u", file_oid);
-       sprintf(indname, "xinx%u", file_oid);
-
-       if (RelnameFindRelid(objname) != InvalidOid)
-               elog(ERROR,
-                 "internal error: %s already exists -- cannot create large obj",
-                        objname);
-       if (RelnameFindRelid(indname) != InvalidOid)
-               elog(ERROR,
-                 "internal error: %s already exists -- cannot create large obj",
-                        indname);
-
-       /* this is pretty painful...  want a tuple descriptor */
-       tupdesc = CreateTemplateTupleDesc(2);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 1,
-                                          "olastbye",
-                                          INT4OID,
-                                          -1, 0, false);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 2,
-                                          "odata",
-                                          BYTEAOID,
-                                          -1, 0, false);
+       LargeObjectDesc *retval;
 
        /*
-        * First create the table to hold the inversion large object.  It will
-        * be located on whatever storage manager the user requested.
+        * Allocate an OID to be the LO's identifier.
         */
+       file_oid = newoid();
 
-       heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT,
-                                                        false, false);
+       /* Check for duplicate (shouldn't happen) */
+       if (LargeObjectExists(file_oid))
+               elog(ERROR, "inv_create: large object %u already exists. This is internal error.", file_oid);
 
-       /* make the relation visible in this transaction */
-       CommandCounterIncrement();
-
-       /*--------------------
-        * We hold AccessShareLock on any large object we have open
-        * by inv_create or inv_open; it is released by inv_close.
-        * Note this will not conflict with ExclusiveLock or ShareLock
-        * that we acquire when actually reading/writing; it just prevents
-        * deletion of the large object while we have it open.
-        *--------------------
+       /*
+        * Create the LO by writing an empty first page for it in pg_largeobject
         */
-       r = heap_openr(objname, AccessShareLock);
+       (void) LargeObjectCreate(file_oid);
 
        /*
-        * Now create a btree index on the relation's olastbyte attribute to
-        * make seeks go faster.
+        * Advance command counter so that new tuple will be seen by later
+        * large-object operations in this transaction.
         */
-       indexInfo = makeNode(IndexInfo);
-       indexInfo->ii_NumIndexAttrs = 1;
-       indexInfo->ii_NumKeyAttrs = 1;
-       indexInfo->ii_KeyAttrNumbers[0] = 1;
-       indexInfo->ii_Predicate = NULL;
-       indexInfo->ii_FuncOid = InvalidOid;
-       indexInfo->ii_Unique = false;
-
-       classObjectId[0] = INT4_OPS_OID;
-
-       index_create(objname, indname, indexInfo,
-                                BTREE_AM_OID, classObjectId,
-                                false, false, false);
-
-       /* make the index visible in this transaction */
        CommandCounterIncrement();
 
-       indr = index_openr(indname);
-
-       if (!RelationIsValid(indr))
-       {
-               elog(ERROR, "cannot create index for large obj on %s under inversion",
-                        DatumGetCString(DirectFunctionCall1(smgrout,
-                                                        Int16GetDatum(DEFAULT_SMGR))));
-       }
-
+       /*
+        * Prepare LargeObjectDesc data structure for accessing LO
+        */
        retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
-       retval->heap_r = r;
-       retval->index_r = indr;
-       retval->iscan = (IndexScanDesc) NULL;
-       retval->hdesc = RelationGetDescr(r);
-       retval->idesc = RelationGetDescr(indr);
-       retval->offset = retval->lowbyte = retval->highbyte = 0;
-       ItemPointerSetInvalid(&(retval->htid));
-       retval->flags = 0;
+       retval->id = file_oid;
+       retval->offset = 0;
 
-       if (flags & INV_WRITE)
-       {
-               LockRelation(r, ExclusiveLock);
+       if (flags & INV_WRITE) {
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
-       }
-       else if (flags & INV_READ)
-       {
-               LockRelation(r, ShareLock);
+               retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+       } else if (flags & INV_READ) {
                retval->flags = IFS_RDLOCK;
-       }
-       retval->flags |= IFS_ATEOF;     /* since we know the object is empty */
+               retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock);
+       } else
+               elog(ERROR, "inv_create: invalid flags: %d", flags);
+
+       retval->index_r = index_openr(LargeObjectLOidPNIndex);
 
        return retval;
 }
 
+/*
+ *     inv_open -- access an existing large object.
+ *
+ *             Returns:
+ *               large object descriptor, appropriately filled in.
+ */
 LargeObjectDesc *
 inv_open(Oid lobjId, int flags)
 {
        LargeObjectDesc *retval;
-       Relation        r;
-       char       *indname;
-       Relation        indrel;
-
-       r = heap_open(lobjId, AccessShareLock);
-
-       indname = pstrdup(RelationGetRelationName(r));
-
-       /*
-        * hack hack hack...  we know that the fourth character of the
-        * relation name is a 'v', and that the fourth character of the index
-        * name is an 'x', and that they're otherwise identical.
-        */
-       indname[3] = 'x';
-       indrel = index_openr(indname);
-
-       if (!RelationIsValid(indrel))
-               return (LargeObjectDesc *) NULL;
 
+       if (! LargeObjectExists(lobjId))
+               elog(ERROR, "inv_open: large object %u not found", lobjId);
+       
        retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
-       retval->heap_r = r;
-       retval->index_r = indrel;
-       retval->iscan = (IndexScanDesc) NULL;
-       retval->hdesc = RelationGetDescr(r);
-       retval->idesc = RelationGetDescr(indrel);
-       retval->offset = retval->lowbyte = retval->highbyte = 0;
-       ItemPointerSetInvalid(&(retval->htid));
-       retval->flags = 0;
+       retval->id = lobjId;
+       retval->offset = 0;
 
-       if (flags & INV_WRITE)
-       {
-               LockRelation(r, ExclusiveLock);
+       if (flags & INV_WRITE) {
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
-       }
-       else if (flags & INV_READ)
-       {
-               LockRelation(r, ShareLock);
+               retval->heap_r = heap_openr(LargeObjectRelationName, RowExclusiveLock);
+       } else if (flags & INV_READ) {
                retval->flags = IFS_RDLOCK;
-       }
+               retval->heap_r = heap_openr(LargeObjectRelationName, AccessShareLock);
+       } else
+               elog(ERROR, "inv_open: invalid flags: %d", flags);
+
+       retval->index_r = index_openr(LargeObjectLOidPNIndex);
 
        return retval;
 }
@@ -261,174 +147,129 @@ inv_close(LargeObjectDesc *obj_desc)
 {
        Assert(PointerIsValid(obj_desc));
 
-       if (obj_desc->iscan != (IndexScanDesc) NULL)
-       {
-               index_endscan(obj_desc->iscan);
-               obj_desc->iscan = NULL;
-       }
-
+       if (obj_desc->flags & IFS_WRLOCK)
+               heap_close(obj_desc->heap_r, RowExclusiveLock);
+       else if (obj_desc->flags & IFS_RDLOCK)
+               heap_close(obj_desc->heap_r, AccessShareLock);
        index_close(obj_desc->index_r);
-       heap_close(obj_desc->heap_r, AccessShareLock);
 
        pfree(obj_desc);
 }
 
 /*
- * Destroys an existing large object, and frees its associated pointers.
+ * Destroys an existing large object (not to be confused with a descriptor!)
  *
  * returns -1 if failed
  */
 int
 inv_drop(Oid lobjId)
 {
-       Relation        r;
-
-       r = RelationIdGetRelation(lobjId);
-       if (!RelationIsValid(r))
-               return -1;
-
-       if (r->rd_rel->relkind != RELKIND_LOBJECT)
-       {
-               /* drop relcache refcount from RelationIdGetRelation */
-               RelationDecrementReferenceCount(r);
-               return -1;
-       }
+       LargeObjectDrop(lobjId);
 
        /*
-        * Since heap_drop_with_catalog will destroy the relcache entry,
-        * there's no need to drop the refcount in this path.
+        * Advance command counter so that tuple removal will be seen by later
+        * large-object operations in this transaction.
         */
-       heap_drop_with_catalog(RelationGetRelationName(r), false);
+       CommandCounterIncrement();
+       /* NOTE(review): always returns 1 now; the header's "-1 if failed" looks stale */
        return 1;
 }
 
 /*
- *     inv_stat() -- do a stat on an inversion file.
+ * Determine size of a large object
  *
- *             For the time being, this is an insanely expensive operation.  In
- *             order to find the size of the file, we seek to the last block in
- *             it and compute the size from that.      We scan pg_class to determine
- *             the file's owner and create time.  We don't maintain mod time or
- *             access time, yet.
- *
- *             These fields aren't stored in a table anywhere because they're
- *             updated so frequently, and postgres only appends tuples at the
- *             end of relations.  Once clustering works, we should fix this.
+ * NOTE: LOs can contain gaps, just like Unix files.  We actually return
+ * the offset of the last byte + 1.
  */
-#ifdef NOT_USED
-
-struct pgstat
-{                                                              /* just the fields we need from stat
-                                                                * structure */
-       int                     st_ino;
-       int                     st_mode;
-       unsigned int st_size;
-       unsigned int st_sizehigh;       /* high order bits */
-/* 2^64 == 1.8 x 10^20 bytes */
-       int                     st_uid;
-       int                     st_atime_s;             /* just the seconds */
-       int                     st_mtime_s;             /* since SysV and the new BSD both have */
-       int                     st_ctime_s;             /* usec fields.. */
-};
-
-int
-inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
+static uint32
+inv_getsize(LargeObjectDesc *obj_desc)
 {
+       bool                    found = false;
+       uint32                  lastbyte = 0;
+       uint32                  thislastbyte;
+       ScanKeyData             skey[1];
+       IndexScanDesc   sd;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData   tuple;
+       Buffer                  buffer;
+       Form_pg_largeobject     data;
+       bytea              *datafield;
+       bool                    pfreeit;
+
        Assert(PointerIsValid(obj_desc));
-       Assert(stbuf != NULL);
 
-       /* need read lock for stat */
-       if (!(obj_desc->flags & IFS_RDLOCK))
-       {
-               LockRelation(obj_desc->heap_r, ShareLock);
-               obj_desc->flags |= IFS_RDLOCK;
-       }
+       ScanKeyEntryInitialize(&skey[0],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 1,
+                                                  (RegProcedure) F_OIDEQ,
+                                                  ObjectIdGetDatum(obj_desc->id));
 
-       stbuf->st_ino = RelationGetRelid(obj_desc->heap_r);
-#if 1
-       stbuf->st_mode = (S_IFREG | 0666);      /* IFREG|rw-rw-rw- */
-#else
-       stbuf->st_mode = 100666;        /* IFREG|rw-rw-rw- */
-#endif
-       stbuf->st_size = _inv_getsize(obj_desc->heap_r,
-                                                                 obj_desc->hdesc,
-                                                                 obj_desc->index_r);
+       sd = index_beginscan(obj_desc->index_r, true, 1, skey);
 
-       stbuf->st_uid = obj_desc->heap_r->rd_rel->relowner;
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
 
-       /* we have no good way of computing access times right now */
-       stbuf->st_atime_s = stbuf->st_mtime_s = stbuf->st_ctime_s = 0;
+       while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+       {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+               pfree(indexRes);
+               if (tuple.t_data == NULL)
+                       continue;               /* nothing fetched for this index entry */
+               found = true;
+               data = (Form_pg_largeobject) GETSTRUCT(&tuple);
+               datafield = &(data->data);
+               pfreeit = false;
+               if (VARATT_IS_EXTENDED(datafield))
+               {
+                       datafield = (bytea *)
+                               heap_tuple_untoast_attr((varattrib *) datafield);
+                       pfreeit = true;         /* untoasted copy must be freed below */
+               }
+               thislastbyte = ((uint32) data->pageno) * LOBLKSIZE + getbytealen(datafield);    /* unsigned math, as in inv_read */
+               if (thislastbyte > lastbyte)    /* keep the furthest page end seen */
+                       lastbyte = thislastbyte;
+               if (pfreeit)
+                       pfree(datafield);
+               ReleaseBuffer(buffer);
+       }
+       
+       index_endscan(sd);
 
-       return 0;
+       if (!found)             /* not a single page tuple was fetched for this id */
+               elog(ERROR, "inv_getsize: large object %u not found", obj_desc->id);
+       return lastbyte;
 }
 
-#endif
-
 int
 inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
 {
-       int                     oldOffset;
-       Datum           d;
-       ScanKeyData skey;
-
        Assert(PointerIsValid(obj_desc));
 
-       if (whence == SEEK_CUR)
-       {
-               offset += obj_desc->offset;             /* calculate absolute position */
-       }
-       else if (whence == SEEK_END)
+       switch (whence)
        {
-               /* need read lock for getsize */
-               if (!(obj_desc->flags & IFS_RDLOCK))
-               {
-                       LockRelation(obj_desc->heap_r, ShareLock);
-                       obj_desc->flags |= IFS_RDLOCK;
-               }
-               offset += _inv_getsize(obj_desc->heap_r,
-                                                          obj_desc->hdesc,
-                                                          obj_desc->index_r);
-       }
-       /* now we can assume that the operation is SEEK_SET */
-
-       /*
-        * Whenever we do a seek, we turn off the EOF flag bit to force
-        * ourselves to check for real on the next read.
-        */
-
-       obj_desc->flags &= ~IFS_ATEOF;
-       oldOffset = obj_desc->offset;
-       obj_desc->offset = offset;
-
-       /* try to avoid doing any work, if we can manage it */
-       if (offset >= obj_desc->lowbyte
-               && offset <= obj_desc->highbyte
-               && oldOffset <= obj_desc->highbyte
-               && obj_desc->iscan != (IndexScanDesc) NULL)
-               return offset;
-
-       /*
-        * To do a seek on an inversion file, we start an index scan that will
-        * bring us to the right place.  Each tuple in an inversion file
-        * stores the offset of the last byte that appears on it, and we have
-        * an index on this.
-        */
-       if (obj_desc->iscan != (IndexScanDesc) NULL)
-       {
-               d = Int32GetDatum(offset);
-               btmovescan(obj_desc->iscan, d);
-       }
-       else
-       {
-               ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
-                                                          Int32GetDatum(offset));
-
-               obj_desc->iscan = index_beginscan(obj_desc->index_r,
-                                                                                 (bool) 0, (uint16) 1,
-                                                                                 &skey);
+               case SEEK_SET:
+                       if (offset < 0)
+                               elog(ERROR, "inv_seek: invalid offset: %d", offset);
+                       obj_desc->offset = offset;
+                       break;
+               case SEEK_CUR:
+                       if ((obj_desc->offset + offset) < 0)
+                               elog(ERROR, "inv_seek: invalid offset: %d", offset);
+                       obj_desc->offset += offset;
+                       break;
+               case SEEK_END:
+                       {
+                               uint32 size = inv_getsize(obj_desc);
+                               if (offset < 0 || ((uint32) offset) > size)
+                                       elog(ERROR, "inv_seek: invalid offset");
+                               obj_desc->offset = size - offset;
+                       }
+                       break;
+               default:
+                       elog(ERROR, "inv_seek: invalid whence: %d", whence);
        }
-
-       return offset;
+       return obj_desc->offset;
 }
 
 int
@@ -442,862 +283,306 @@ inv_tell(LargeObjectDesc *obj_desc)
 int
 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 {
-       HeapTupleData tuple;
-       int                     nread;
-       int                     off;
-       int                     ncopy;
-       Datum           d;
-       struct varlena *fsblock;
-       bool            isNull;
+       int                             nread = 0;
+       int                             n;
+       int                             off;
+       int                             len;
+       int32                   pageno = (int32) (obj_desc->offset / LOBLKSIZE);        /* page holding the seek point */
+       uint32                  pageoff;
+       ScanKeyData             skey[2];
+       IndexScanDesc   sd;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData   tuple;
+       Buffer                  buffer;
+       Form_pg_largeobject     data;
+       bytea              *datafield;
+       bool                    pfreeit;
 
        Assert(PointerIsValid(obj_desc));
        Assert(buf != NULL);
 
-       /* if we're already at EOF, we don't need to do any work here */
-       if (obj_desc->flags & IFS_ATEOF)
+       if (nbytes <= 0)
                return 0;
 
-       /* make sure we obey two-phase locking */
-       if (!(obj_desc->flags & IFS_RDLOCK))
-       {
-               LockRelation(obj_desc->heap_r, ShareLock);
-               obj_desc->flags |= IFS_RDLOCK;
-       }
+       ScanKeyEntryInitialize(&skey[0],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 1,
+                                                  (RegProcedure) F_OIDEQ,
+                                                  ObjectIdGetDatum(obj_desc->id));
 
-       nread = 0;
+       ScanKeyEntryInitialize(&skey[1],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 2,
+                                                  (RegProcedure) F_INT4GE,
+                                                  Int32GetDatum(pageno));      /* second key: page number >= starting page */
 
-       /* fetch a block at a time */
-       while (nread < nbytes)
-       {
-               Buffer          buffer;
+       sd = index_beginscan(obj_desc->index_r, false, 2, skey);
 
-               /* fetch an inversion file system block */
-               inv_fetchtup(obj_desc, &tuple, &buffer);
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
 
-               if (tuple.t_data == NULL)
-               {
-                       obj_desc->flags |= IFS_ATEOF;
-                       break;
-               }
+       while ((indexRes = index_getnext(sd, ForwardScanDirection)))
+       {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+               pfree(indexRes);
 
-               /* copy the data from this block into the buffer */
-               d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
-               fsblock = (struct varlena *) DatumGetPointer(d);
-               ReleaseBuffer(buffer);
+               if (tuple.t_data == NULL)
+                       continue;               /* nothing fetched for this index entry */
+               
+               data = (Form_pg_largeobject) GETSTRUCT(&tuple);
 
                /*
-                * If block starts beyond current seek point, then we are looking
-                * at a "hole" (unwritten area) in the object.  Return zeroes for
-                * the "hole".
+                * We assume the indexscan will deliver pages in order.  However,
+                * there may be missing pages if the LO contains unwritten "holes".
+                * We want missing sections to read out as zeroes.
                 */
-               if (obj_desc->offset < obj_desc->lowbyte)
+               pageoff = ((uint32) data->pageno) * LOBLKSIZE;
+               if (pageoff > obj_desc->offset)
                {
-                       int             nzeroes = obj_desc->lowbyte - obj_desc->offset;
-
-                       if (nzeroes > (nbytes - nread))
-                               nzeroes = (nbytes - nread);
-                       MemSet(buf, 0, nzeroes);
-                       buf += nzeroes;
-                       nread += nzeroes;
-                       obj_desc->offset += nzeroes;
-                       if (nread >= nbytes)
-                               break;
+                       n = pageoff - obj_desc->offset;
+                       n = (n <= (nbytes - nread)) ? n : (nbytes - nread);     /* clamp zero-fill to room left in buf */
+                       MemSet(buf + nread, 0, n);
+                       nread += n;
+                       obj_desc->offset += n;
                }
 
-               off = obj_desc->offset - obj_desc->lowbyte;
-               ncopy = obj_desc->highbyte - obj_desc->offset + 1;
-               if (ncopy > (nbytes - nread))
-                       ncopy = (nbytes - nread);
-               memmove(buf, &(fsblock->vl_dat[off]), ncopy);
+               if (nread < nbytes)
+               {
+                       Assert(obj_desc->offset >= pageoff);
+                       off = (int) (obj_desc->offset - pageoff);
+                       Assert(off >= 0 && off < LOBLKSIZE);
+
+                       datafield = &(data->data);
+                       pfreeit = false;
+                       if (VARATT_IS_EXTENDED(datafield))
+                       {
+                               datafield = (bytea *)
+                                       heap_tuple_untoast_attr((varattrib *) datafield);
+                               pfreeit = true;         /* untoasted copy must be freed below */
+                       }
+                       len = getbytealen(datafield);
+                       if (len > off)          /* else the page's data ends at or before the seek point */
+                       {
+                               n = len - off;
+                               n = (n <= (nbytes - nread)) ? n : (nbytes - nread);     /* clamp copy to room left in buf */
+                               memcpy(buf + nread, VARDATA(datafield) + off, n);
+                               nread += n;
+                               obj_desc->offset += n;
+                       }
+                       if (pfreeit)
+                               pfree(datafield);
+               }
 
-               /* move pointers past the amount we just read */
-               buf += ncopy;
-               nread += ncopy;
-               obj_desc->offset += ncopy;
+               ReleaseBuffer(buffer);
+               if (nread >= nbytes)            /* request fully satisfied */
+                       break;
        }
 
+       index_endscan(sd);
+
        return nread;
 }
 
 int
 inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 {
-       HeapTupleData tuple;
-       int                     nwritten;
-       int                     tuplen;
+       int                             nwritten = 0;
+       int                             n;
+       int                             off;
+       int                             len;
+       int32                   pageno = (int32) (obj_desc->offset / LOBLKSIZE);
+       ScanKeyData             skey[2];
+       IndexScanDesc   sd;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData   oldtuple;
+       Buffer                  buffer;
+       Form_pg_largeobject     olddata;
+       bool                    neednextpage;
+       bytea              *datafield;
+       bool                    pfreeit;
+       char                    workbuf[LOBLKSIZE + VARHDRSZ];
+       char               *workb = VARATT_DATA(workbuf);
+       HeapTuple               newtup;
+       Datum                   values[Natts_pg_largeobject];
+       char                    nulls[Natts_pg_largeobject];
+       char                    replace[Natts_pg_largeobject];
+       bool                    write_indices;
+       Relation                idescs[Num_pg_largeobject_indices];
 
        Assert(PointerIsValid(obj_desc));
        Assert(buf != NULL);
 
-       /*
-        * Make sure we obey two-phase locking.  A write lock entitles you to
-        * read the relation, as well.
-        */
+       if (nbytes <= 0)
+               return 0;
 
-       if (!(obj_desc->flags & IFS_WRLOCK))
-       {
-               LockRelation(obj_desc->heap_r, ExclusiveLock);
-               obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
-       }
+       write_indices = ! IsIgnoringSystemIndexes();
+       if (write_indices)
+               CatalogOpenIndices(Num_pg_largeobject_indices,
+                                                  Name_pg_largeobject_indices,
+                                                  idescs);
+
+       ScanKeyEntryInitialize(&skey[0],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 1,
+                                                  (RegProcedure) F_OIDEQ,
+                                                  ObjectIdGetDatum(obj_desc->id));
+
+       ScanKeyEntryInitialize(&skey[1],
+                                                  (bits16) 0x0,
+                                                  (AttrNumber) 2,
+                                                  (RegProcedure) F_INT4GE,
+                                                  Int32GetDatum(pageno));
 
-       nwritten = 0;
+       sd = index_beginscan(obj_desc->index_r, false, 2, skey);
+
+       oldtuple.t_datamcxt = CurrentMemoryContext;
+       oldtuple.t_data = NULL;
+       olddata = NULL;
+       buffer = InvalidBuffer;
+       neednextpage = true;
 
-       /* write a block at a time */
        while (nwritten < nbytes)
        {
-               Buffer          buffer;
-
                /*
-                * Fetch the current inversion file system block.  We can skip
-                * the work if we already know we are at EOF.
+                * If possible, get next pre-existing page of the LO.  We assume
+                * the indexscan will deliver these in order --- but there may be
+                * holes.
                 */
-
-               if (obj_desc->flags & IFS_ATEOF)
-                       tuple.t_data = NULL;
-               else
-                       inv_fetchtup(obj_desc, &tuple, &buffer);
-
-               /* either append or replace a block, as required */
-               if (tuple.t_data == NULL)
-                       tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
-               else
+               if (neednextpage)
                {
-                       if (obj_desc->offset > obj_desc->highbyte)
+                       while ((indexRes = index_getnext(sd, ForwardScanDirection)))
                        {
-                               tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
-                               ReleaseBuffer(buffer);
+                               oldtuple.t_self = indexRes->heap_iptr;
+                               heap_fetch(obj_desc->heap_r, SnapshotNow, &oldtuple, &buffer);
+                               pfree(indexRes);
+                               if (oldtuple.t_data != NULL)
+                               {
+                                       olddata = (Form_pg_largeobject) GETSTRUCT(&oldtuple);
+                                       Assert(olddata->pageno >= pageno);
+                                       break;
+                               }
                        }
-                       else
-                               tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
-
-                       /*
-                        * inv_wrold() has already issued WriteBuffer() which has
-                        * decremented local reference counter (LocalRefCount). So we
-                        * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
-                        */
+                       neednextpage = false;
                }
-
-               /* move pointers past the amount we just wrote */
-               buf += tuplen;
-               nwritten += tuplen;
-               obj_desc->offset += tuplen;
-       }
-
-       /* that's it */
-       return nwritten;
-}
-
-/*
- * inv_cleanindex
- *              Clean opened indexes for large objects, and clears current result.
- *              This is necessary on transaction commit in order to prevent buffer
- *              leak.
- *              This function must be called for each opened large object.
- *              [ PA, 7/17/98 ]
- */
-void
-inv_cleanindex(LargeObjectDesc *obj_desc)
-{
-       Assert(PointerIsValid(obj_desc));
-
-       if (obj_desc->iscan == (IndexScanDesc) NULL)
-               return;
-
-       index_endscan(obj_desc->iscan);
-       obj_desc->iscan = (IndexScanDesc) NULL;
-
-       ItemPointerSetInvalid(&(obj_desc->htid));
-}
-
-/*
- *     inv_fetchtup -- Fetch an inversion file system block.
- *
- *             This routine finds the file system block containing the offset
- *             recorded in the obj_desc structure.  Later, we need to think about
- *             the effects of non-functional updates (can you rewrite the same
- *             block twice in a single transaction?), but for now, we won't bother.
- *
- *             Parameters:
- *                             obj_desc -- the object descriptor.
- *                             bufP -- pointer to a buffer in the buffer cache; caller
- *                                             must free this.
- *
- *             Returns:
- *                             A heap tuple containing the desired block, or NULL if no
- *                             such tuple exists.
- */
-static void
-inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
-{
-       RetrieveIndexResult res;
-       Datum           d;
-       int                     firstbyte,
-                               lastbyte;
-       struct varlena *fsblock;
-       bool            isNull;
-
-       /*
-        * If we've exhausted the current block, we need to get the next one.
-        * When we support time travel and non-functional updates, we will
-        * need to loop over the blocks, rather than just have an 'if', in
-        * order to find the one we're really interested in.
-        */
-
-       if (obj_desc->offset > obj_desc->highbyte
-               || obj_desc->offset < obj_desc->lowbyte
-               || !ItemPointerIsValid(&(obj_desc->htid)))
-       {
-               ScanKeyData skey;
-
-               ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
-                                                          Int32GetDatum(obj_desc->offset));
-
-               /* initialize scan key if not done */
-               if (obj_desc->iscan == (IndexScanDesc) NULL)
+               /*
+                * If we have a pre-existing page, see if it is the page we want
+                * to write, or a later one.
+                */
+               if (olddata != NULL && olddata->pageno == pageno)
                {
-
                        /*
-                        * As scan index may be prematurely closed (on commit), we
-                        * must use object current offset (was 0) to reinitialize the
-                        * entry [ PA ].
+                        * Update an existing page with fresh data.
+                        *
+                        * First, load old data into workbuf
                         */
-                       obj_desc->iscan = index_beginscan(obj_desc->index_r,
-                                                                                         (bool) 0, (uint16) 1,
-                                                                                         &skey);
-               }
-               else
-                       index_rescan(obj_desc->iscan, false, &skey);
-
-               do
-               {
-                       res = index_getnext(obj_desc->iscan, ForwardScanDirection);
-
-                       if (res == (RetrieveIndexResult) NULL)
+                       datafield = &(olddata->data);
+                       pfreeit = false;
+                       if (VARATT_IS_EXTENDED(datafield))
                        {
-                               ItemPointerSetInvalid(&(obj_desc->htid));
-                               tuple->t_datamcxt = NULL;
-                               tuple->t_data = NULL;
-                               return;
+                               datafield = (bytea *)
+                                       heap_tuple_untoast_attr((varattrib *) datafield);
+                               pfreeit = true;
                        }
-
+                       len = getbytealen(datafield);
+                       Assert(len <= LOBLKSIZE);
+                       memcpy(workb, VARDATA(datafield), len);
+                       if (pfreeit)
+                               pfree(datafield);
                        /*
-                        * For time travel, we need to use the actual time qual here,
-                        * rather that NowTimeQual.  We currently have no way to pass
-                        * a time qual in.
-                        *
-                        * This is now valid for snapshot !!! And should be fixed in some
-                        * way...       - vadim 07/28/98
-                        *
+                        * Fill any hole
+                        */
+                       off = (int) (obj_desc->offset % LOBLKSIZE);
+                       if (off > len)
+                               MemSet(workb + len, 0, off - len);
+                       /*
+                        * Insert appropriate portion of new data
+                        */
+                       n = LOBLKSIZE - off;
+                       n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
+                       memcpy(workb + off, buf + nwritten, n);
+                       nwritten += n;
+                       obj_desc->offset += n;
+                       off += n;
+                       /* compute valid length of new page */
+                       len = (len >= off) ? len : off;
+                       VARATT_SIZEP(workbuf) = len + VARHDRSZ;
+                       /*
+                        * Form and insert updated tuple
+                        */
+                       memset(values, 0, sizeof(values));
+                       memset(nulls, ' ', sizeof(nulls));
+                       memset(replace, ' ', sizeof(replace));
+                       values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf);
+                       replace[Anum_pg_largeobject_data - 1] = 'r';
+                       newtup = heap_modifytuple(&oldtuple, obj_desc->heap_r,
+                                                                         values, nulls, replace);
+                       heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL);
+                       if (write_indices)
+                               CatalogIndexInsert(idescs, Num_pg_largeobject_indices,
+                                                                  obj_desc->heap_r, newtup);
+                       heap_freetuple(newtup);
+                       /*
+                        * We're done with this old page.
                         */
-                       tuple->t_self = res->heap_iptr;
-                       heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
-                       pfree(res);
-               } while (tuple->t_data == NULL);
-
-               /* remember this tid -- we may need it for later reads/writes */
-               ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
-       }
-       else
-       {
-               tuple->t_self = obj_desc->htid;
-               heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
-               if (tuple->t_data == NULL)
-                       elog(ERROR, "inv_fetchtup: heap_fetch failed");
-       }
-
-       /*
-        * By here, we have the heap tuple we're interested in.  We cache the
-        * upper and lower bounds for this block in the object descriptor and
-        * return the tuple.
-        */
-
-       d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
-       lastbyte = (int32) DatumGetInt32(d);
-       d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
-       fsblock = (struct varlena *) DatumGetPointer(d);
-
-       /*
-        * order of + and - is important -- these are unsigned quantites near
-        * 0
-        */
-       firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
-
-       obj_desc->lowbyte = firstbyte;
-       obj_desc->highbyte = lastbyte;
-
-       return;
-}
-
-/*
- *     inv_wrnew() -- append a new filesystem block tuple to the inversion
- *                                     file.
- *
- *             In response to an inv_write, we append one or more file system
- *             blocks to the class containing the large object.  We violate the
- *             class abstraction here in order to pack things as densely as we
- *             are able.  We examine the last page in the relation, and write
- *             just enough to fill it, assuming that it has above a certain
- *             threshold of space available.  If the space available is less than
- *             the threshold, we allocate a new page by writing a big tuple.
- *
- *             By the time we get here, we know all the parameters passed in
- *             are valid, and that we hold the appropriate lock on the heap
- *             relation.
- *
- *             Parameters:
- *                             obj_desc: large object descriptor for which to append block.
- *                             buf: buffer containing data to write.
- *                             nbytes: amount to write
- *
- *             Returns:
- *                             number of bytes actually written to the new tuple.
- */
-static int
-inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
-{
-       Relation        hr;
-       HeapTuple       ntup;
-       Buffer          buffer;
-       Page            page;
-       int                     nblocks;
-       int                     nwritten;
-
-       hr = obj_desc->heap_r;
-
-       /*
-        * Get the last block in the relation.  If there's no data in the
-        * relation at all, then we just get a new block.  Otherwise, we check
-        * the last block to see whether it has room to accept some or all of
-        * the data that the user wants to write.  If it doesn't, then we
-        * allocate a new block.
-        */
-
-       nblocks = RelationGetNumberOfBlocks(hr);
-
-       if (nblocks > 0)
-       {
-               buffer = ReadBuffer(hr, nblocks - 1);
-               page = BufferGetPage(buffer);
-       }
-       else
-       {
-               buffer = ReadBuffer(hr, P_NEW);
-               page = BufferGetPage(buffer);
-               PageInit(page, BufferGetPageSize(buffer), 0);
-       }
-
-       /*
-        * If the last page is too small to hold all the data, and it's too
-        * small to hold IMINBLK, then we allocate a new page.  If it will
-        * hold at least IMINBLK, but less than all the data requested, then
-        * we write IMINBLK here.  The caller is responsible for noticing that
-        * less than the requested number of bytes were written, and calling
-        * this routine again.
-        */
-
-       nwritten = IFREESPC(page);
-       if (nwritten < nbytes)
-       {
-               if (nwritten < IMINBLK)
-               {
                        ReleaseBuffer(buffer);
-                       buffer = ReadBuffer(hr, P_NEW);
-                       page = BufferGetPage(buffer);
-                       PageInit(page, BufferGetPageSize(buffer), 0);
-                       if (nbytes > IMAXBLK)
-                               nwritten = IMAXBLK;
-                       else
-                               nwritten = nbytes;
-               }
-       }
-       else
-               nwritten = nbytes;
-
-       /*
-        * Insert a new file system block tuple, index it, and write it out.
-        */
-
-       ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
-       inv_indextup(obj_desc, ntup);
-       heap_freetuple(ntup);
-
-       /* new tuple is inserted */
-       WriteBuffer(buffer);
-
-       return nwritten;
-}
-
-static int
-inv_wrold(LargeObjectDesc *obj_desc,
-                 char *dbuf,
-                 int nbytes,
-                 HeapTuple tuple,
-                 Buffer buffer)
-{
-       Relation        hr;
-       HeapTuple       ntup;
-       Buffer          newbuf;
-       Page            page;
-       Page            newpage;
-       int                     tupbytes;
-       Datum           d;
-       struct varlena *fsblock;
-       int                     nwritten,
-                               nblocks,
-                               freespc;
-       bool            isNull;
-       int                     keep_offset;
-       RetrieveIndexResult res;
-
-       /*
-        * Since we're using a no-overwrite storage manager, the way we
-        * overwrite blocks is to mark the old block invalid and append a new
-        * block.  First mark the old block invalid.  This violates the tuple
-        * abstraction.
-        */
-
-       TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
-       tuple->t_data->t_cmax = GetCurrentCommandId();
-       tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
-
-       /*
-        * If we're overwriting the entire block, we're lucky.  All we need to
-        * do is to insert a new block.
-        */
-
-       if (obj_desc->offset == obj_desc->lowbyte
-               && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
-       {
-               WriteBuffer(buffer);
-               return inv_wrnew(obj_desc, dbuf, nbytes);
-       }
-
-       /*
-        * By here, we need to overwrite part of the data in the current
-        * tuple.  In order to reduce the degree to which we fragment blocks,
-        * we guarantee that no block will be broken up due to an overwrite.
-        * This means that we need to allocate a tuple on a new page, if
-        * there's not room for the replacement on this one.
-        */
-
-       newbuf = buffer;
-       page = BufferGetPage(buffer);
-       newpage = BufferGetPage(newbuf);
-       hr = obj_desc->heap_r;
-       freespc = IFREESPC(page);
-       d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
-       fsblock = (struct varlena *) DatumGetPointer(d);
-       tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
-
-       if (freespc < tupbytes)
-       {
-
-               /*
-                * First see if there's enough space on the last page of the table
-                * to put this tuple.
-                */
-
-               nblocks = RelationGetNumberOfBlocks(hr);
-
-               if (nblocks > 0)
-               {
-                       newbuf = ReadBuffer(hr, nblocks - 1);
-                       newpage = BufferGetPage(newbuf);
+                       oldtuple.t_datamcxt = CurrentMemoryContext;
+                       oldtuple.t_data = NULL;
+                       olddata = NULL;
+                       neednextpage = true;
                }
                else
                {
-                       newbuf = ReadBuffer(hr, P_NEW);
-                       newpage = BufferGetPage(newbuf);
-                       PageInit(newpage, BufferGetPageSize(newbuf), 0);
-               }
-
-               freespc = IFREESPC(newpage);
-
-               /*
-                * If there's no room on the last page, allocate a new last page
-                * for the table, and put it there.
-                */
-
-               if (freespc < tupbytes)
-               {
-                       ReleaseBuffer(newbuf);
-                       newbuf = ReadBuffer(hr, P_NEW);
-                       newpage = BufferGetPage(newbuf);
-                       PageInit(newpage, BufferGetPageSize(newbuf), 0);
+                       /*
+                        * Write a brand new page.
+                        *
+                        * First, fill any hole
+                        */
+                       off = (int) (obj_desc->offset % LOBLKSIZE);
+                       if (off > 0)
+                               MemSet(workb, 0, off);
+                       /*
+                        * Insert appropriate portion of new data
+                        */
+                       n = LOBLKSIZE - off;
+                       n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
+                       memcpy(workb + off, buf + nwritten, n);
+                       nwritten += n;
+                       obj_desc->offset += n;
+                       /* compute valid length of new page */
+                       len = off + n;
+                       VARATT_SIZEP(workbuf) = len + VARHDRSZ;
+                       /*
+                        * Form and insert new tuple
+                        */
+                       memset(values, 0, sizeof(values));
+                       memset(nulls, ' ', sizeof(nulls));
+                       values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
+                       values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
+                       values[Anum_pg_largeobject_data - 1] = PointerGetDatum(workbuf);
+                       newtup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls);
+                       heap_insert(obj_desc->heap_r, newtup);
+                       if (write_indices)
+                               CatalogIndexInsert(idescs, Num_pg_largeobject_indices,
+                                                                  obj_desc->heap_r, newtup);
+                       heap_freetuple(newtup);
                }
+               pageno++;
        }
 
-       nwritten = nbytes;
-       if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
-               nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-       memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
-                       dbuf, nwritten);
-
-       /*
-        * we are rewriting the entire old block, therefore we reset offset to
-        * the lowbyte of the original block before jumping into
-        * inv_newtuple()
-        */
-       keep_offset = obj_desc->offset;
-       obj_desc->offset = obj_desc->lowbyte;
-       ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
-                                               tupbytes);
-       /* after we are done, we restore to the true offset */
-       obj_desc->offset = keep_offset;
-
-       /*
-        * By here, we have a page (newpage) that's guaranteed to have enough
-        * space on it to put the new tuple.  Call inv_newtuple to do the
-        * work.  Passing NULL as a buffer to inv_newtuple() keeps it from
-        * copying any data into the new tuple.  When it returns, the tuple is
-        * ready to receive data from the old tuple and the user's data
-        * buffer.
-        */
-/*
-       ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
-       dptr = ((char *) ntup) + ntup->t_hoff -
-                               (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
-                               sizeof(int4)
-                               + sizeof(fsblock->vl_len);
-
-       if (obj_desc->offset > obj_desc->lowbyte) {
-               memmove(dptr,
-                               &(fsblock->vl_dat[0]),
-                               obj_desc->offset - obj_desc->lowbyte);
-               dptr += obj_desc->offset - obj_desc->lowbyte;
-       }
-
-
-       nwritten = nbytes;
-       if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
-               nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-
-       memmove(dptr, dbuf, nwritten);
-       dptr += nwritten;
-
-       if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
-*/
-/*
-               loc = (obj_desc->highbyte - obj_desc->offset)
-                               + nwritten;
-               sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
-
-               what's going on here?? - jolly
-*/
-/*
-               sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
-               memmove(&(fsblock->vl_dat[0]), dptr, sz);
-       }
-*/
-
-
-       /* index the new tuple */
-       inv_indextup(obj_desc, ntup);
-       heap_freetuple(ntup);
+       if (olddata != NULL)
+               ReleaseBuffer(buffer);
 
-       /*
-        * move the scandesc forward so we don't reread the newly inserted
-        * tuple on the next index scan
-        */
-       res = NULL;
-       if (obj_desc->iscan)
-               res = index_getnext(obj_desc->iscan, ForwardScanDirection);
+       index_endscan(sd);
 
-       if (res)
-               pfree(res);
+       if (write_indices)
+               CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
 
        /*
-        * Okay, by here, a tuple for the new block is correctly placed,
-        * indexed, and filled.  Write the changed pages out.
+        * Advance command counter so that my tuple updates will be seen by later
+        * large-object operations in this transaction.
         */
+       CommandCounterIncrement();
 
-       WriteBuffer(buffer);
-       if (newbuf != buffer)
-               WriteBuffer(newbuf);
-
-       /* Tuple id is no longer valid */
-       ItemPointerSetInvalid(&(obj_desc->htid));
-
-       /* done */
        return nwritten;
 }
-
-static HeapTuple
-inv_newtuple(LargeObjectDesc *obj_desc,
-                        Buffer buffer,
-                        Page page,
-                        char *dbuf,
-                        int nwrite)
-{
-       HeapTuple       ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
-       PageHeader      ph;
-       int                     tupsize;
-       int                     hoff;
-       Offset          lower;
-       Offset          upper;
-       ItemId          itemId;
-       OffsetNumber off;
-       OffsetNumber limit;
-       char       *attptr;
-
-       /* compute tuple size -- no nulls */
-       hoff = offsetof(HeapTupleHeaderData, t_bits);
-       hoff = MAXALIGN(hoff);
-
-       /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
-       tupsize = hoff + (2 * sizeof(int32)) + nwrite;
-       tupsize = MAXALIGN(tupsize);
-
-       /*
-        * Allocate the tuple on the page, violating the page abstraction.
-        * This code was swiped from PageAddItem().
-        */
-
-       ph = (PageHeader) page;
-       limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-
-       /* look for "recyclable" (unused & deallocated) ItemId */
-       for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
-       {
-               itemId = &ph->pd_linp[off - 1];
-               if ((((*itemId).lp_flags & LP_USED) == 0) &&
-                       ((*itemId).lp_len == 0))
-                       break;
-       }
-
-       if (off > limit)
-               lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
-       else if (off == limit)
-               lower = ph->pd_lower + sizeof(ItemIdData);
-       else
-               lower = ph->pd_lower;
-
-       upper = ph->pd_upper - tupsize;
-
-       itemId = &ph->pd_linp[off - 1];
-       (*itemId).lp_off = upper;
-       (*itemId).lp_len = tupsize;
-       (*itemId).lp_flags = LP_USED;
-       ph->pd_lower = lower;
-       ph->pd_upper = upper;
-
-       ntup->t_datamcxt = NULL;
-       ntup->t_data = (HeapTupleHeader) ((char *) page + upper);
-
-       /*
-        * Tuple is now allocated on the page.  Next, fill in the tuple
-        * header.      This block of code violates the tuple abstraction.
-        */
-
-       ntup->t_len = tupsize;
-       ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
-       ntup->t_data->t_oid = newoid();
-       TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
-       ntup->t_data->t_cmin = GetCurrentCommandId();
-       StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
-       ntup->t_data->t_cmax = 0;
-       ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
-       ntup->t_data->t_natts = 2;
-       ntup->t_data->t_hoff = hoff;
-
-       /* if a NULL is passed in, avoid the calculations below */
-       if (dbuf == NULL)
-               return ntup;
-
-       /*
-        * Finally, copy the user's data buffer into the tuple.  This violates
-        * the tuple and class abstractions.
-        */
-
-       attptr = ((char *) ntup->t_data) + hoff;
-       *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
-       attptr += sizeof(int32);
-
-       /*
-        * *  mer fixed disk layout of varlenas to get rid of the need for
-        * this. *
-        *
-        * ((int32 *) attptr) = nwrite + sizeof(int32); *  attptr +=
-        * sizeof(int32);
-        */
-
-       *((int32 *) attptr) = nwrite + sizeof(int32);
-       attptr += sizeof(int32);
-
-       /*
-        * If a data buffer was passed in, then copy the data from the buffer
-        * to the tuple.  Some callers (eg, inv_wrold()) may not pass in a
-        * buffer, since they have to copy part of the old tuple data and part
-        * of the user's new data into the new tuple.
-        */
-
-       if (dbuf != (char *) NULL)
-               memmove(attptr, dbuf, nwrite);
-
-       /* keep track of boundary of current tuple */
-       obj_desc->lowbyte = obj_desc->offset;
-       obj_desc->highbyte = obj_desc->offset + nwrite - 1;
-
-       /* new tuple is filled -- return it */
-       return ntup;
-}
-
-static void
-inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
-{
-       InsertIndexResult res;
-       Datum           v[1];
-       char            n[1];
-
-       n[0] = ' ';
-       v[0] = Int32GetDatum(obj_desc->highbyte);
-       res = index_insert(obj_desc->index_r, &v[0], &n[0],
-                                          &(tuple->t_self), obj_desc->heap_r);
-
-       if (res)
-               pfree(res);
-}
-
-#ifdef NOT_USED
-
-static void
-DumpPage(Page page, int blkno)
-{
-               ItemId                  lp;
-               HeapTuple               tup;
-               int                             flags, i, nline;
-               ItemPointerData pointerData;
-
-               printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
-                               ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
-                               ((PageHeader)page)->pd_special);
-
-               printf("\t:MaxOffsetNumber=%d\n",
-                          (int16) PageGetMaxOffsetNumber(page));
-
-               nline = (int16) PageGetMaxOffsetNumber(page);
-
-{
-               int             i;
-               char    *cp;
-
-               i = PageGetSpecialSize(page);
-               cp = PageGetSpecialPointer(page);
-
-               printf("\t:SpecialData=");
-
-               while (i > 0) {
-                               printf(" 0x%02x", *cp);
-                               cp += 1;
-                               i -= 1;
-               }
-               printf("\n");
-}
-               for (i = 0; i < nline; i++) {
-                               lp = ((PageHeader)page)->pd_linp + i;
-                               flags = (*lp).lp_flags;
-                               ItemPointerSet(&pointerData, blkno, 1 + i);
-                               printf("%s:off=%d:flags=0x%x:len=%d",
-                                               ItemPointerFormExternal(&pointerData), (*lp).lp_off,
-                                               flags, (*lp).lp_len);
-
-                               if (flags & LP_USED) {
-                                               HeapTupleData   htdata;
-
-                                               printf(":USED");
-
-                                               memmove((char *) &htdata,
-                                                               (char *) &((char *)page)[(*lp).lp_off],
-                                                               sizeof(htdata));
-
-                                               tup = &htdata;
-
-                                               printf("\n\t:ctid=%s:oid=%d",
-                                                               ItemPointerFormExternal(&tup->t_ctid),
-                                                               tup->t_oid);
-                                               printf(":natts=%d:thoff=%d:",
-                                                               tup->t_natts,
-                                                               tup->t_hoff);
-
-                                               printf("\n\t:cmin=%u:",
-                                                               tup->t_cmin);
-
-                                               printf("xmin=%u:", tup->t_xmin);
-
-                                               printf("\n\t:cmax=%u:",
-                                                               tup->t_cmax);
-
-                                               printf("xmax=%u:\n", tup->t_xmax);
-
-                               } else
-                                               putchar('\n');
-               }
-}
-
-static char*
-ItemPointerFormExternal(ItemPointer pointer)
-{
-               static char             itemPointerString[32];
-
-               if (!ItemPointerIsValid(pointer)) {
-                       memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
-               } else {
-                       sprintf(itemPointerString, "<%u,%u>",
-                                       ItemPointerGetBlockNumber(pointer),
-                                       ItemPointerGetOffsetNumber(pointer));
-               }
-
-               return itemPointerString;
-}
-
-#endif
-
-static int
-_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
-{
-       IndexScanDesc iscan;
-       RetrieveIndexResult res;
-       HeapTupleData tuple;
-       Datum           d;
-       long            size;
-       bool            isNull;
-       Buffer          buffer;
-
-       /* scan backwards from end */
-       iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
-
-       do
-       {
-               res = index_getnext(iscan, BackwardScanDirection);
-
-               /*
-                * If there are no more index tuples, then the relation is empty,
-                * so the file's size is zero.
-                */
-
-               if (res == (RetrieveIndexResult) NULL)
-               {
-                       index_endscan(iscan);
-                       return 0;
-               }
-
-               /*
-                * For time travel, we need to use the actual time qual here,
-                * rather that NowTimeQual.  We currently have no way to pass a
-                * time qual in.
-                */
-               tuple.t_self = res->heap_iptr;
-               heap_fetch(hreln, SnapshotNow, &tuple, &buffer);
-               pfree(res);
-       } while (tuple.t_data == NULL);
-
-       /* don't need the index scan anymore */
-       index_endscan(iscan);
-
-       /* get olastbyte attribute */
-       d = heap_getattr(&tuple, 1, hdesc, &isNull);
-       size = DatumGetInt32(d) + 1;
-       ReleaseBuffer(buffer);
-
-       return size;
-}
index 04a760ab917dfdd9975d4d513040829213307e21..b448256d3c0ddd3d8a1e1a3d5c991eac7fb2a2ed 100644 (file)
@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal)
                fprintf(stderr, "%s saving BLOBs\n", g_comment_start);
 
        /* Cursor to get all BLOB tables */
-    appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT);
+    appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT DISTINCT loid FROM pg_largeobject");
 
        res = PQexec(g_conn, oidQry->data);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -1874,8 +1874,7 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs)
         * tables before the child tables when traversing the tblinfo*
         *
         * we ignore tables that are not type 'r' (ordinary relation) or 'S'
-        * (sequence) or 'v' (view) --- in particular, Large Object 
-     * relations (type 'l') are ignored.
+        * (sequence) or 'v' (view).
         */
 
        appendPQExpBuffer(query,
@@ -1886,7 +1885,6 @@ getTables(int *numTables, FuncInfo *finfo, int numFuncs)
                                          "where relname !~ '^pg_' "
                                          "and relkind in ('%c', '%c', '%c') "
                                          "order by oid",
-                               RELKIND_VIEW,
                                RELKIND_RELATION, RELKIND_SEQUENCE, RELKIND_VIEW);
 
        res = PQexec(g_conn, query->data);
@@ -2585,7 +2583,7 @@ getIndices(int *numIndices)
         * find all the user-defined indices. We do not handle partial
         * indices.
         *
-        * Notice we skip indices on inversion objects (relkind 'l')
+        * Notice we skip indices on system classes
         *
         * this is a 4-way join !!
         */
@@ -2597,8 +2595,8 @@ getIndices(int *numIndices)
                                        "from pg_index i, pg_class t1, pg_class t2, pg_am a "
                                   "WHERE t1.oid = i.indexrelid and t2.oid = i.indrelid "
                                          "and t1.relam = a.oid and i.indexrelid > '%u'::oid "
-                                         "and t2.relname !~ '^pg_' and t2.relkind != '%c' and not i.indisprimary",
-                                         g_last_builtin_oid, RELKIND_LOBJECT);
+                                         "and t2.relname !~ '^pg_' and not i.indisprimary",
+                                         g_last_builtin_oid);
 
        res = PQexec(g_conn, query->data);
        if (!res ||
index d97c8a7b67024e7e6adb9a08ddc7433f70bff84b..9cb8384dc29dc2b1ca3acda777fef0971ec3e238 100644 (file)
@@ -59,7 +59,7 @@ proc update_attnvals {conn rel} {
 proc updateStats { dbName } {
     # datnames is the list to be result
     set conn [pg_connect $dbName]
-    set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_' and relname !~ '^xinv'"]
+    set res [pg_exec $conn "SELECT relname FROM pg_class WHERE relkind = 'r' and relname !~ '^pg_'"]
     set ntups [pg_result $res -numTuples]
     for {set i 0} {$i < $ntups} {incr i} {
        set rel [pg_result $res -getTuple $i]
index 6423faf0276bc0cab4c756553d5ce45e1b3b8741..0ed6b26bffea4f3e2aa7c2277b9b7ca1c0c7c5ff 100644 (file)
@@ -1020,10 +1020,6 @@ listTables(const char *infotype, const char *name, bool desc)
                        strcat(buf, "'S'");
                strcat(buf, ")\n");
 
-               /* ignore large-obj indices */
-               if (showIndices)
-                       strcat(buf, "  AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n");
-
                strcat(buf, showSystem ? "  AND c.relname ~ '^pg_'\n" : "  AND c.relname !~ '^pg_'\n");
                if (name)
                {
@@ -1050,10 +1046,6 @@ listTables(const char *infotype, const char *name, bool desc)
                        strcat(buf, "'S'");
                strcat(buf, ")\n");
 
-               /* ignore large-obj indices */
-               if (showIndices)
-                       strcat(buf, "  AND (c.relkind != 'i' OR c.relname !~ '^xinx')\n");
-
                strcat(buf, showSystem ? "  AND c.relname ~ '^pg_'\n" : "  AND c.relname !~ '^pg_'\n");
                if (name)
                {
index 8e099151c1048ee585c990e9a79ab9091270cde5..8056703244d5cf46e378138e19ae2e5e9a16cc77 100644 (file)
@@ -193,7 +193,7 @@ do_lo_import(const char *filename_arg, const char *comment_arg)
        /* insert description if given */
        if (comment_arg)
        {
-               sprintf(buf, "INSERT INTO pg_description VALUES (%d, '", loid);
+               sprintf(buf, "INSERT INTO pg_description VALUES (%u, '", loid);
                for (i = 0; i < strlen(comment_arg); i++)
                        if (comment_arg[i] == '\'')
                                strcat(buf, "\\'");
@@ -284,7 +284,7 @@ do_lo_unlink(const char *loid_arg)
        }
 
        /* remove the comment as well */
-       sprintf(buf, "DELETE FROM pg_description WHERE objoid = %d", loid);
+       sprintf(buf, "DELETE FROM pg_description WHERE objoid = %u", loid);
        if (!(res = PSQLexec(buf)))
        {
                if (own_transaction)
@@ -328,15 +328,9 @@ do_lo_list(void)
        printQueryOpt myopt = pset.popt;
 
        strcpy(buf,
-       "SELECT usename as \"Owner\", substring(relname from 5) as \"ID\",\n"
-                  "  obj_description(pg_class.oid) as \"Description\"\n"
-                  "FROM pg_class, pg_user\n"
-                  "WHERE usesysid = relowner AND relkind = 'l'\n"
-                  "UNION\n"
-          "SELECT NULL as \"Owner\", substring(relname from 5) as \"ID\",\n"
-                  "  obj_description(pg_class.oid) as \"Description\"\n"
-                  "FROM pg_class\n"
-                  "WHERE not exists (select 1 from pg_user where usesysid = relowner) AND relkind = 'l'\n"
+       "SELECT DISTINCT loid as \"ID\",\n"
+                  "  obj_description(loid) as \"Description\"\n"
+                  "FROM pg_largeobject\n"
                   "ORDER BY \"ID\"");
 
        res = PSQLexec(buf);
index 0ee3a083e683b8f1229997b49b3b267ea041eb12..967200c262553b8d765b4fd941bd67d9da6edfdc 100644 (file)
@@ -29,6 +29,7 @@
 #define  InheritsRelationName "pg_inherits"
 #define  InheritancePrecidenceListRelationName "pg_ipl"
 #define  LanguageRelationName "pg_language"
+#define  LargeObjectRelationName "pg_largeobject"
 #define  ListenerRelationName "pg_listener"
 #define  LogRelationName "pg_log"
 #define  OperatorClassRelationName "pg_opclass"
index 757350e50f4a37ba14c0c52a828597cde6050e40..47b777d3796a1d46029eaaa158ac222525fc65a8 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     200010231
+#define CATALOG_VERSION_NO     200010232
 
 #endif
index 6f861eb2028cbdfcbf708f0e7edf7b186cfe3673..6bd142ad8ae838062a3e9c86a3e319cc5e10b17d 100644 (file)
@@ -31,6 +31,7 @@
 #define Num_pg_index_indices           2
 #define Num_pg_inherits_indices                1
 #define Num_pg_language_indices                2
+#define Num_pg_largeobject_indices     1
 #define Num_pg_listener_indices                1
 #define Num_pg_opclass_indices         2
 #define Num_pg_operator_indices                2
@@ -62,6 +63,7 @@
 #define InheritsRelidSeqnoIndex                "pg_inherits_relid_seqno_index"
 #define LanguageNameIndex                      "pg_language_name_index"
 #define LanguageOidIndex                       "pg_language_oid_index"
+#define LargeObjectLOidPNIndex         "pg_largeobject_loid_pn_index"
 #define ListenerPidRelnameIndex                "pg_listener_pid_relname_index"
 #define OpclassDeftypeIndex                    "pg_opclass_deftype_index"
 #define OpclassNameIndex                       "pg_opclass_name_index"
@@ -92,6 +94,7 @@ extern char *Name_pg_group_indices[];
 extern char *Name_pg_index_indices[];
 extern char *Name_pg_inherits_indices[];
 extern char *Name_pg_language_indices[];
+extern char *Name_pg_largeobject_indices[];
 extern char *Name_pg_listener_indices[];
 extern char *Name_pg_opclass_indices[];
 extern char *Name_pg_operator_indices[];
@@ -191,6 +194,7 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli
 DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
 DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops));
 DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
 DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops));
 /* This column needs to allow multiple zero entries, but is in the cache */
 DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops));
index 3fced9c1566b529fb9cbb799835b0c388f025310..7aed1372744f8bd502ce91cb12c935b9ca0666ae 100644 (file)
@@ -174,7 +174,6 @@ DESCR("");
 #define XactLockTableId                        376
 
 #define                  RELKIND_INDEX                   'i'           /* secondary index */
-#define                  RELKIND_LOBJECT                 'l'           /* large objects */
 #define                  RELKIND_RELATION                'r'           /* ordinary cataloged heap */
 #define                  RELKIND_SPECIAL                 's'           /* special (non-heap) */
 #define                  RELKIND_SEQUENCE                'S'           /* SEQUENCE relation */
diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h
new file mode 100644 (file)
index 0000000..b6fb96c
--- /dev/null
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.h
+ *       definition of the system "largeobject" relation (pg_largeobject)
+ *       along with the relation's initial contents.
+ *
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id$
+ *
+ * NOTES
+ *       the genbki.sh script reads this file and generates .bki
+ *       information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LARGEOBJECT_H
+#define PG_LARGEOBJECT_H
+
+/* ----------------
+ *             postgres.h contains the system type definitions and the
+ *             CATALOG(), BOOTSTRAP and DATA() sugar words so this file
+ *             can be read by both genbki.sh and the C compiler.
+ * ----------------
+ */
+
+/* ----------------
+ *             pg_largeobject definition.  cpp turns this into
+ *             typedef struct FormData_pg_largeobject. Large object id
+ *             is stored in loid.
+ * ----------------
+ */
+
+CATALOG(pg_largeobject)
+{
+       Oid                     loid;                   /* Identifier of large object */
+       int4            pageno;                 /* Page number (starting from 0) */
+       bytea           data;                   /* Data for page (may be zero-length) */
+} FormData_pg_largeobject;
+
+/* ----------------
+ *             Form_pg_largeobject corresponds to a pointer to a tuple with
+ *             the format of pg_largeobject relation.
+ * ----------------
+ */
+typedef FormData_pg_largeobject *Form_pg_largeobject;
+
+/* ----------------
+ *             compiler constants for pg_largeobject
+ * ----------------
+ */
+#define Natts_pg_largeobject                   3
+#define Anum_pg_largeobject_loid               1
+#define Anum_pg_largeobject_pageno             2
+#define Anum_pg_largeobject_data               3
+
+extern Oid LargeObjectCreate(Oid loid);
+extern void LargeObjectDrop(Oid loid);
+extern bool LargeObjectExists(Oid loid);
+
+#endif  /* PG_LARGEOBJECT_H */
index dbc6f9eeae0b39b177dadc91fc4af4aed57fcc1b..180f76669e3d3fd58194fc859262ed49c2a2e9a5 100644 (file)
 #ifndef LARGE_OBJECT_H
 #define LARGE_OBJECT_H
 
-#include <sys/types.h>
+#include "utils/rel.h"
 
-#include "access/relscan.h"
 
-/*
- * This structure will eventually have lots more stuff associated with it.
+/*----------
+ * Data about a currently-open large object.
+ *
+ * id is the logical OID of the large object
+ * offset is the current seek offset within the LO
+ * heap_r holds an open-relation reference to pg_largeobject
+ * index_r holds an open-relation reference to pg_largeobject_loid_pn_index
+ *
+ * NOTE: before 7.1, heap_r and index_r held references to the separate
+ * table and index of a specific large object.  Now they all live in one rel.
+ *----------
  */
-typedef struct LargeObjectDesc
-{
-       Relation        heap_r;                 /* heap relation */
-       Relation        index_r;                /* index relation on seqno attribute */
-       IndexScanDesc iscan;            /* index scan we're using */
-       TupleDesc       hdesc;                  /* heap relation tuple desc */
-       TupleDesc       idesc;                  /* index relation tuple desc */
-       uint32          lowbyte;                /* low byte on the current page */
-       uint32          highbyte;               /* high byte on the current page */
+typedef struct LargeObjectDesc {
+       Oid                     id;
        uint32          offset;                 /* current seek pointer */
-       ItemPointerData htid;           /* tid of current heap tuple */
+       int                     flags;                  /* locking info, etc */
 
+/* flag bits: */
 #define IFS_RDLOCK             (1 << 0)
 #define IFS_WRLOCK             (1 << 1)
-#define IFS_ATEOF              (1 << 2)
 
-       u_long          flags;                  /* locking info, etc */
+       Relation        heap_r;
+       Relation        index_r;
 } LargeObjectDesc;
 
+
+/*
+ * Each "page" (tuple) of a large object can hold this much data
+ *
+ * Calculation is max tuple size less tuple header, loid field (Oid),
+ * pageno field (int32), and varlena header of data (int32).  Note we
+ * assume none of the fields will be NULL, hence no need for null bitmap.
+ */
+#define        LOBLKSIZE               (MaxTupleSize \
+                                                - MAXALIGN(offsetof(HeapTupleHeaderData, t_bits)) \
+                                                - sizeof(Oid) - sizeof(int32) * 2)
+
+
 /*
  * Function definitions...
  */
@@ -55,7 +70,4 @@ extern int    inv_tell(LargeObjectDesc *obj_desc);
 extern int     inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 extern int     inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 
-/* added for buffer leak prevention [ PA ] */
-extern void inv_cleanindex(LargeObjectDesc *obj_desc);
-
 #endif  /* LARGE_OBJECT_H */
index 9c99a120adf1ed73c4f98a7476e27df4b355a087..9d4e75a9e0a6742b474dfb3e43fd756357df84b9 100644 (file)
@@ -1007,8 +1007,7 @@ mylog("%s: entering...stmt=%u\n", func, stmt);
        }
 
 
-       /*      filter out large objects unconditionally (they are not system tables) and match users */
-       strcat(tables_query, " and relname !~ '^xinv[0-9]+'");
+       /* match users */
        strcat(tables_query, " and usesysid = relowner");
        strcat(tables_query, " order by relname");
 
index f5d2427cfa1b7d048589d67ecb52d0a1c0a1ec11..9fd96b22803b6434ec91dd10e2e6f5eb54f91ea2 100644 (file)
@@ -482,8 +482,8 @@ WHERE p1.aggtransfn = p2.oid AND
           (p2.pronargs = 1 AND p1.aggbasetype = 0)));
   oid  | aggname | oid |   proname   
 -------+---------+-----+-------------
- 16984 | max     | 768 | int4larger
- 16998 | min     | 769 | int4smaller
+ 16996 | max     | 768 | int4larger
+ 17010 | min     | 769 | int4smaller
 (2 rows)
 
 -- Cross-check finalfn (if present) against its entry in pg_proc.
index 823d9e142db0b9eb1999c48762678fe5f2353a38..f2412386d176bfb5b99971396772a9ccf6d64bbc 100644 (file)
@@ -40,6 +40,7 @@ SELECT relname, relhasindex
  pg_index            | t
  pg_inherits         | t
  pg_language         | t
+ pg_largeobject      | t
  pg_listener         | t
  pg_opclass          | t
  pg_operator         | t
@@ -54,5 +55,5 @@ SELECT relname, relhasindex
  shighway            | t
  tenk1               | t
  tenk2               | t
-(44 rows)
+(45 rows)