Add lo_truncate() to backend and libpq for large object truncation.
authorBruce Momjian <bruce@momjian.us>
Sat, 3 Mar 2007 19:52:47 +0000 (19:52 +0000)
committerBruce Momjian <bruce@momjian.us>
Sat, 3 Mar 2007 19:52:47 +0000 (19:52 +0000)
Kris Jurka

12 files changed:
doc/src/sgml/lobj.sgml
src/backend/libpq/be-fsstubs.c
src/backend/storage/large_object/inv_api.c
src/include/catalog/pg_proc.h
src/include/libpq/be-fsstubs.h
src/include/storage/large_object.h
src/interfaces/libpq/exports.txt
src/interfaces/libpq/fe-lobj.c
src/interfaces/libpq/libpq-fe.h
src/interfaces/libpq/libpq-int.h
src/test/regress/input/largeobject.source
src/test/regress/output/largeobject.source

index 7e46303bfd7c264dd3d7ba1a9993a991bb1d5841..64e4e7fe977b4b8ebd4e67c92a2d372c31f19a4e 100644 (file)
@@ -301,6 +301,37 @@ int lo_tell(PGconn *conn, int fd);
 </para>
 </sect2>
 
+<sect2>
+<title>Truncating a Large Object</title>
+
+<para>
+     To truncate a large object to a given length, call
+<synopsis>
+int lo_truncate(PGcon *conn, int fd, size_t len);
+</synopsis>
+     <indexterm><primary>lo_truncate</></> truncates the large object
+     descriptor <parameter>fd</> to length <parameter>len</>.  The
+     <parameter>fd</parameter> argument must have been returned by a
+     previous <function>lo_open</function>.  If <parameter>len</> is
+     greater than the current large object length, the large object
+     is extended with null bytes ('\0').
+</para>
+
+<para>
+     The file offset is not changed.
+</para>
+
+<para>
+     On success <function>lo_truncate</function> returns
+     zero.  On error, the return value is negative.
+</para>
+
+<para>
+     <function>lo_truncate</> is new as of <productname>PostgreSQL</productname>
+     8.3; if this function is run against an older server version, it will
+     fail and return a negative value.
+</para>
+
 <sect2>
 <title>Closing a Large Object Descriptor</title>
 
index d9b94bf6099aad783315ccb726363dd703bf4e41..2538a3e3ecd61a9482b8f3cacb87274d8afb05d7 100644 (file)
@@ -120,12 +120,10 @@ lo_close(PG_FUNCTION_ARGS)
        int32           fd = PG_GETARG_INT32(0);
 
        if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
-       {
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("invalid large-object descriptor: %d", fd)));
-               PG_RETURN_INT32(-1);
-       }
+
 #if FSDB
        elog(DEBUG4, "lo_close(%d)", fd);
 #endif
@@ -152,12 +150,9 @@ lo_read(int fd, char *buf, int len)
        int                     status;
 
        if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
-       {
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("invalid large-object descriptor: %d", fd)));
-               return -1;
-       }
 
        status = inv_read(cookies[fd], buf, len);
 
@@ -170,12 +165,9 @@ lo_write(int fd, const char *buf, int len)
        int                     status;
 
        if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
-       {
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("invalid large-object descriptor: %d", fd)));
-               return -1;
-       }
 
        if ((cookies[fd]->flags & IFS_WRLOCK) == 0)
                ereport(ERROR,
@@ -198,12 +190,9 @@ lo_lseek(PG_FUNCTION_ARGS)
        int                     status;
 
        if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
-       {
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("invalid large-object descriptor: %d", fd)));
-               PG_RETURN_INT32(-1);
-       }
 
        status = inv_seek(cookies[fd], offset, whence);
 
@@ -248,12 +237,9 @@ lo_tell(PG_FUNCTION_ARGS)
        int32           fd = PG_GETARG_INT32(0);
 
        if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
-       {
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("invalid large-object descriptor: %d", fd)));
-               PG_RETURN_INT32(-1);
-       }
 
        PG_RETURN_INT32(inv_tell(cookies[fd]));
 }
@@ -467,6 +453,26 @@ lo_export(PG_FUNCTION_ARGS)
        PG_RETURN_INT32(1);
 }
 
+/*
+ * lo_truncate -
+ *       truncate a large object to a specified length
+ */
+Datum
+lo_truncate(PG_FUNCTION_ARGS)
+{
+       int32           fd = PG_GETARG_INT32(0);
+       int32           len = PG_GETARG_INT32(1);
+
+       if (fd < 0 || fd >= cookies_size || cookies[fd] == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("invalid large-object descriptor: %d", fd)));
+
+       inv_truncate(cookies[fd], len);
+
+       PG_RETURN_INT32(0);
+}
+
 /*
  * AtEOXact_LargeObject -
  *              prepares large objects for transaction commit
index 5ca906b26ede65e124188baf6ff74d93d520ed0d..fc26e7e321d828dff7597839ff496bd855dc819b 100644 (file)
@@ -681,3 +681,166 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
 
        return nwritten;
 }
+
+void
+inv_truncate(LargeObjectDesc *obj_desc, int len)
+{
+       int32           pageno = (int32) (len / LOBLKSIZE);
+       int                     off;
+       ScanKeyData     skey[2];
+       IndexScanDesc sd;
+       HeapTuple       oldtuple;
+       Form_pg_largeobject     olddata;
+       struct
+       {
+               bytea           hdr;
+               char            data[LOBLKSIZE];
+       }                       workbuf;
+       char       *workb = VARDATA(&workbuf.hdr);
+       HeapTuple       newtup;
+       Datum           values[Natts_pg_largeobject];
+       char            nulls[Natts_pg_largeobject];
+       char            replace[Natts_pg_largeobject];
+       CatalogIndexState indstate;
+
+       Assert(PointerIsValid(obj_desc));
+
+       /* enforce writability because snapshot is probably wrong otherwise */
+       if ((obj_desc->flags & IFS_WRLOCK) == 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("large object %u was not opened for writing",
+                                               obj_desc->id)));
+
+       open_lo_relation();
+
+       indstate = CatalogOpenIndexes(lo_heap_r);
+
+       ScanKeyInit(&skey[0],
+                               Anum_pg_largeobject_loid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(obj_desc->id));
+
+       ScanKeyInit(&skey[1],
+                               Anum_pg_largeobject_pageno,
+                               BTGreaterEqualStrategyNumber, F_INT4GE,
+                               Int32GetDatum(pageno));
+
+       sd = index_beginscan(lo_heap_r, lo_index_r,
+                                                obj_desc->snapshot, 2, skey);
+
+       /*
+        * If possible, get the page the truncation point is in.
+        * The truncation point may be beyond the end of the LO or
+        * in a hole.
+        */
+       olddata = NULL;
+       if ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
+       {
+               olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
+               Assert(olddata->pageno >= pageno);
+       }
+
+       /*
+        * If we found the page of the truncation point we need to
+        * truncate the data in it.  Otherwise if we're in a hole,
+        * we need to create a page to mark the end of data.
+        */
+       if (olddata != NULL && olddata->pageno == pageno)
+       {
+               /* First, load old data into workbuf */
+               bytea *datafield = &(olddata->data);
+               bool pfreeit = false;
+               int pagelen;
+
+               if (VARATT_IS_EXTENDED(datafield))
+               {
+                       datafield = (bytea *)
+                               heap_tuple_untoast_attr((varattrib *) datafield);
+                       pfreeit = true;
+               }
+               pagelen = getbytealen(datafield);
+               Assert(pagelen <= LOBLKSIZE);
+               memcpy(workb, VARDATA(datafield), pagelen);
+               if (pfreeit)
+                               pfree(datafield);
+
+               /*
+                * Fill any hole
+                */
+               off = len % LOBLKSIZE;
+               if (off > pagelen)
+                               MemSet(workb + pagelen, 0, off - pagelen);
+
+               /* compute length of new page */
+               SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
+
+               /*
+                * Form and insert updated tuple
+                */
+               memset(values, 0, sizeof(values));
+               memset(nulls, ' ', sizeof(nulls));
+               memset(replace, ' ', sizeof(replace));
+               values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
+               replace[Anum_pg_largeobject_data - 1] = 'r';
+               newtup = heap_modifytuple(oldtuple, RelationGetDescr(lo_heap_r),
+                                                                 values, nulls, replace);
+               simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
+               CatalogIndexInsert(indstate, newtup);
+               heap_freetuple(newtup);
+       }
+       else
+       {
+               /*
+                * If the first page we found was after the truncation
+                * point, we're in a hole that we'll fill, but we need to
+                * delete the later page.
+                */
+               if (olddata != NULL && olddata->pageno > pageno)
+                       simple_heap_delete(lo_heap_r, &oldtuple->t_self);
+
+               /*
+                * Write a brand new page.
+                * 
+                * Fill the hole up to the truncation point
+                */
+               off = len % LOBLKSIZE;
+               if (off > 0)
+                       MemSet(workb, 0, off);
+
+               /* compute length of new page */
+               SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
+
+               /* 
+                * Form and insert new tuple
+                */
+               memset(values, 0, sizeof(values));
+               memset(nulls, ' ', sizeof(nulls));
+               values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
+               values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
+               values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
+               newtup = heap_formtuple(lo_heap_r->rd_att, values, nulls);
+               simple_heap_insert(lo_heap_r, newtup);
+               CatalogIndexInsert(indstate, newtup);
+               heap_freetuple(newtup);
+       }
+
+       /*
+        * Delete any pages after the truncation point
+        */
+       while ((oldtuple = index_getnext(sd, ForwardScanDirection)) != NULL)
+       {
+               simple_heap_delete(lo_heap_r, &oldtuple->t_self);
+       }
+
+       index_endscan(sd);
+
+       CatalogCloseIndexes(indstate);
+       
+       /*
+        * Advance command counter so that tuple updates will be seen by later
+        * large-object operations in this transaction.
+        */
+       CommandCounterIncrement();
+}
+
index b585d039222553652b1befb53d899755be2f4cb4..f31603824b518d27cca4c5093d0750da4c38476b 100644 (file)
@@ -1233,6 +1233,8 @@ DATA(insert OID = 715 (  lo_create                   PGNSP PGUID 12 1 0 f f t f v 1 26 "26" _n
 DESCR("large object create");
 DATA(insert OID = 958 (  lo_tell                  PGNSP PGUID 12 1 0 f f t f v 1 23 "23" _null_ _null_ _null_  lo_tell - _null_ ));
 DESCR("large object position");
+DATA(insert OID = 1004 (  lo_truncate     PGNSP PGUID 12 1 0 f f t f v 2 23 "23 23" _null_ _null_ _null_ lo_truncate - _null_ ));
+DESCR("truncate large object");
 
 DATA(insert OID = 959 (  on_pl                    PGNSP PGUID 12 1 0 f f t f i 2  16 "600 628" _null_ _null_ _null_    on_pl - _null_ ));
 DESCR("point on line?");
index c64cfe2f85cac664e343c4f7127b2e852a5c0dea..ee794b0be3a5064c7780f6243466a1d4fe8e5fd8 100644 (file)
@@ -34,6 +34,7 @@ extern Datum lowrite(PG_FUNCTION_ARGS);
 extern Datum lo_lseek(PG_FUNCTION_ARGS);
 extern Datum lo_tell(PG_FUNCTION_ARGS);
 extern Datum lo_unlink(PG_FUNCTION_ARGS);
+extern Datum lo_truncate(PG_FUNCTION_ARGS);
 
 /*
  * These are not fmgr-callable, but are available to C code.
index a56970f57b30911c3d5e333bd7b3fbb5e20c7fa9..b6e54f8c8399eaf9e6c2c5bdb9522da13331934d 100644 (file)
@@ -78,5 +78,6 @@ extern int    inv_seek(LargeObjectDesc *obj_desc, int offset, int whence);
 extern int     inv_tell(LargeObjectDesc *obj_desc);
 extern int     inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 extern int     inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes);
+extern void    inv_truncate(LargeObjectDesc *obj_desc, int len);
 
 #endif   /* LARGE_OBJECT_H */
index d45da8d0d27a767f7ec5661d6b05c1a2ca5d25e9..85352e6ef2357c682f2171813cb4235e351f4e25 100644 (file)
@@ -136,3 +136,4 @@ PQdescribePrepared        133
 PQdescribePortal          134
 PQsendDescribePrepared    135
 PQsendDescribePortal      136
+lo_truncate               137
index 56a4447c07920ff8280e0d222c1254b764f3d6da..e4dc26b6a4e08cb52b2dc7ff0286e9103db2eb96 100644 (file)
@@ -122,6 +122,59 @@ lo_close(PGconn *conn, int fd)
        }
 }
 
+/*
+ * lo_truncate
+ *    truncates an existing large object to the given size
+ *
+ * returns 0 upon success
+ * returns -1 upon failure
+ */
+int
+lo_truncate(PGconn *conn, int fd, size_t len)
+{
+       PQArgBlock      argv[2];
+       PGresult   *res;
+       int                     retval;
+       int                     result_len;
+
+       if (conn->lobjfuncs == NULL)
+       {
+               if (lo_initialize(conn) < 0)
+                       return -1;
+       }
+
+       /* Must check this on-the-fly because it's not there pre-8.3 */
+       if (conn->lobjfuncs->fn_lo_truncate == 0)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                         libpq_gettext("cannot determine OID of function lo_truncate\n"));
+               return -1;
+       }
+
+       argv[0].isint = 1;
+       argv[0].len = 4;
+       argv[0].u.integer = fd;
+       
+       argv[1].isint = 1;
+       argv[1].len = 4;
+       argv[1].u.integer = len;
+
+       res = PQfn(conn, conn->lobjfuncs->fn_lo_truncate,
+                          &retval, &result_len, 1, argv, 2);
+
+       if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+               PQclear(res);
+               return retval;
+       }
+       else
+       {
+               PQclear(res);
+               return -1;
+       }
+}
+
+
 /*
  * lo_read
  *       read len bytes of the large object into buf
@@ -621,6 +674,7 @@ lo_initialize(PGconn *conn)
        /*
         * Execute the query to get all the functions at once.  In 7.3 and later
         * we need to be schema-safe.  lo_create only exists in 8.1 and up.
+        * lo_truncate only exists in 8.3 and up.
         */
        if (conn->sversion >= 70300)
                query = "select proname, oid from pg_catalog.pg_proc "
@@ -632,6 +686,7 @@ lo_initialize(PGconn *conn)
                        "'lo_unlink', "
                        "'lo_lseek', "
                        "'lo_tell', "
+                       "'lo_truncate', "
                        "'loread', "
                        "'lowrite') "
                        "and pronamespace = (select oid from pg_catalog.pg_namespace "
@@ -684,6 +739,8 @@ lo_initialize(PGconn *conn)
                        lobjfuncs->fn_lo_lseek = foid;
                else if (!strcmp(fname, "lo_tell"))
                        lobjfuncs->fn_lo_tell = foid;
+               else if (!strcmp(fname, "lo_truncate"))
+                       lobjfuncs->fn_lo_truncate = foid;
                else if (!strcmp(fname, "loread"))
                        lobjfuncs->fn_lo_read = foid;
                else if (!strcmp(fname, "lowrite"))
@@ -694,7 +751,6 @@ lo_initialize(PGconn *conn)
 
        /*
         * Finally check that we really got all large object interface functions
-        * --- except lo_create, which may not exist.
         */
        if (lobjfuncs->fn_lo_open == 0)
        {
index 437927293d9c5585474f60ed4c011fc27cceecf9..e3476a145d8089f6bfb67b529d25de8f135ec673 100644 (file)
@@ -489,6 +489,7 @@ extern int  lo_lseek(PGconn *conn, int fd, int offset, int whence);
 extern Oid     lo_creat(PGconn *conn, int mode);
 extern Oid     lo_create(PGconn *conn, Oid lobjId);
 extern int     lo_tell(PGconn *conn, int fd);
+extern int     lo_truncate(PGconn *conn, int fd, size_t len);
 extern int     lo_unlink(PGconn *conn, Oid lobjId);
 extern Oid     lo_import(PGconn *conn, const char *filename);
 extern int     lo_export(PGconn *conn, Oid lobjId, const char *filename);
index 948d8d0dfefd06ec3351c5f4a7b68f9cae44f6a8..b5a6fe5cfa80bd0caa3ee01c422198bfdbc9898c 100644 (file)
@@ -239,6 +239,7 @@ typedef struct pgLobjfuncs
        Oid                     fn_lo_unlink;   /* OID of backend function lo_unlink    */
        Oid                     fn_lo_lseek;    /* OID of backend function lo_lseek             */
        Oid                     fn_lo_tell;             /* OID of backend function lo_tell              */
+       Oid                     fn_lo_truncate; /* OID of backend function lo_truncate  */
        Oid                     fn_lo_read;             /* OID of backend function LOread               */
        Oid                     fn_lo_write;    /* OID of backend function LOwrite              */
 } PGlobjfuncs;
index e89ce02fa9995067fb374786fac91dfb230e5d79..8386922133cda69a6f8d8a38e957406719057aa7 100644 (file)
@@ -83,6 +83,25 @@ SELECT lo_close(fd) FROM lotest_stash_values;
 
 END;
 
+-- Test truncation.
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST((2 | 4) * 16^4 AS integer));
+
+SELECT lo_truncate(fd, 10) FROM lotest_stash_values;
+SELECT loread(fd, 15) FROM lotest_stash_values;
+
+SELECT lo_truncate(fd, 10000) FROM lotest_stash_values;
+SELECT loread(fd, 10) FROM lotest_stash_values;
+SELECT lo_lseek(fd, 0, 2) FROM lotest_stash_values;
+SELECT lo_tell(fd) FROM lotest_stash_values;
+
+SELECT lo_truncate(fd, 5000) FROM lotest_stash_values;
+SELECT lo_lseek(fd, 0, 2) FROM lotest_stash_values;
+SELECT lo_tell(fd) FROM lotest_stash_values;
+
+SELECT lo_close(fd) FROM lotest_stash_values;
+END;
+
 -- lo_unlink(lobjId oid) returns integer
 -- return value appears to always be 1
 SELECT lo_unlink(loid) from lotest_stash_values;
index 6d6e5cd24cf8c54aa68d8b990555bc66602c349e..f4addeaab5b1f578cd35a3108444901f7ba2a5d4 100644 (file)
@@ -115,6 +115,70 @@ SELECT lo_close(fd) FROM lotest_stash_values;
         0
 (1 row)
 
+END;
+-- Test truncation.
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST((2 | 4) * 16^4 AS integer));
+SELECT lo_truncate(fd, 10) FROM lotest_stash_values;
+ lo_truncate 
+-------------
+           0
+(1 row)
+
+SELECT loread(fd, 15) FROM lotest_stash_values;
+    loread     
+---------------
+ \012Whose woo
+(1 row)
+
+SELECT lo_truncate(fd, 10000) FROM lotest_stash_values;
+ lo_truncate 
+-------------
+           0
+(1 row)
+
+SELECT loread(fd, 10) FROM lotest_stash_values;
+                  loread                  
+------------------------------------------
+ \000\000\000\000\000\000\000\000\000\000
+(1 row)
+
+SELECT lo_lseek(fd, 0, 2) FROM lotest_stash_values;
+ lo_lseek 
+----------
+    10000
+(1 row)
+
+SELECT lo_tell(fd) FROM lotest_stash_values;
+ lo_tell 
+---------
+   10000
+(1 row)
+
+SELECT lo_truncate(fd, 5000) FROM lotest_stash_values;
+ lo_truncate 
+-------------
+           0
+(1 row)
+
+SELECT lo_lseek(fd, 0, 2) FROM lotest_stash_values;
+ lo_lseek 
+----------
+     5000
+(1 row)
+
+SELECT lo_tell(fd) FROM lotest_stash_values;
+ lo_tell 
+---------
+    5000
+(1 row)
+
+SELECT lo_close(fd) FROM lotest_stash_values;
+ lo_close 
+----------
+        0
+(1 row)
+
 END;
 -- lo_unlink(lobjId oid) returns integer
 -- return value appears to always be 1