#include "funcapi.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be.h"
 #include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
 static char *generate_relation_name(Relation rel);
 static void dblink_connstr_check(const char *connstr);
-static void dblink_security_check(PGconn *conn, remoteConn *rconn);
+static bool dblink_connstr_has_pw(const char *connstr);
+static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
                             bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
 static char *get_connect_string(const char *servername);
                     errmsg("could not establish connection"),
                     errdetail_internal("%s", msg)));
        }
-       dblink_security_check(conn, rconn);
+       dblink_security_check(conn, rconn, connstr);
        if (PQclientEncoding(conn) != GetDatabaseEncoding())
            PQsetClientEncoding(conn, GetDatabaseEncodingName());
        freeconn = true;
    }
 
    /* check password actually used if not superuser */
-   dblink_security_check(conn, rconn);
+   dblink_security_check(conn, rconn, connstr);
 
    /* attempt to set client encoding to match server encoding, if needed */
    if (PQclientEncoding(conn) != GetDatabaseEncoding())
                 errmsg("undefined connection name")));
 }
 
+/*
+ * We need to make sure that the connection made used credentials
+ * which were provided by the user, so check what credentials were
+ * used to connect and then make sure that they came from the user.
+ */
 static void
-dblink_security_check(PGconn *conn, remoteConn *rconn)
+dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
 {
-   if (!superuser())
-   {
-       if (!PQconnectionUsedPassword(conn))
-       {
-           libpqsrv_disconnect(conn);
-           if (rconn)
-               pfree(rconn);
+   /* Superuser bypasses security check */
+   if (superuser())
+       return;
 
-           ereport(ERROR,
-                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-                    errmsg("password is required"),
-                    errdetail("Non-superuser cannot connect if the server does not request a password."),
-                    errhint("Target server's authentication method must be changed.")));
-       }
-   }
+   /* If password was used to connect, make sure it was one provided */
+   if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
+       return;
+
+#ifdef ENABLE_GSS
+   /* If GSSAPI creds used to connect, make sure it was one delegated */
+   if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_deleg(MyProcPort))
+       return;
+#endif
+
+   /* Otherwise, fail out */
+   libpqsrv_disconnect(conn);
+   if (rconn)
+       pfree(rconn);
+
+   ereport(ERROR,
+           (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+            errmsg("password or GSSAPI delegated credentials required"),
+            errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
+            errhint("Ensure provided credentials match target server's authentication method.")));
 }
 
 /*
- * For non-superusers, insist that the connstr specify a password.  This
- * prevents a password from being picked up from .pgpass, a service file,
- * the environment, etc.  We don't want the postgres user's passwords
- * to be accessible to non-superusers.
+ * Function to check if the connection string includes an explicit
+ * password, needed to ensure that non-superuser password-based auth
+ * is using a provided password and not one picked up from the
+ * environment.
  */
-static void
-dblink_connstr_check(const char *connstr)
+static bool
+dblink_connstr_has_pw(const char *connstr)
 {
-   if (!superuser())
-   {
-       PQconninfoOption *options;
-       PQconninfoOption *option;
-       bool        connstr_gives_password = false;
+   PQconninfoOption *options;
+   PQconninfoOption *option;
+   bool        connstr_gives_password = false;
 
-       options = PQconninfoParse(connstr, NULL);
-       if (options)
+   options = PQconninfoParse(connstr, NULL);
+   if (options)
+   {
+       for (option = options; option->keyword != NULL; option++)
        {
-           for (option = options; option->keyword != NULL; option++)
+           if (strcmp(option->keyword, "password") == 0)
            {
-               if (strcmp(option->keyword, "password") == 0)
+               if (option->val != NULL && option->val[0] != '\0')
                {
-                   if (option->val != NULL && option->val[0] != '\0')
-                   {
-                       connstr_gives_password = true;
-                       break;
-                   }
+                   connstr_gives_password = true;
+                   break;
                }
            }
-           PQconninfoFree(options);
        }
-
-       if (!connstr_gives_password)
-           ereport(ERROR,
-                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-                    errmsg("password is required"),
-                    errdetail("Non-superusers must provide a password in the connection string.")));
+       PQconninfoFree(options);
    }
+
+   return connstr_gives_password;
+}
+
+/*
+ * For non-superusers, insist that the connstr specify a password, except
+ * if GSSAPI credentials have been delegated (and we check that they are used
+ * for the connection in dblink_security_check later).  This prevents a
+ * password or GSSAPI credentials from being picked up from .pgpass, a
+ * service file, the environment, etc.  We don't want the postgres user's
+ * passwords or Kerberos credentials to be accessible to non-superusers.
+ */
+static void
+dblink_connstr_check(const char *connstr)
+{
+   if (superuser())
+       return;
+
+   if (dblink_connstr_has_pw(connstr))
+       return;
+
+#ifdef ENABLE_GSS
+   if (be_gssapi_get_deleg(MyProcPort))
+       return;
+#endif
+
+   ereport(ERROR,
+           (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+            errmsg("password or GSSAPI delegated credentials required"),
+            errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
 }
 
 /*
 
 SET SESSION AUTHORIZATION regress_dblink_user;
 -- should fail
 SELECT dblink_connect('myconn', 'fdtest');
-ERROR:  password is required
-DETAIL:  Non-superusers must provide a password in the connection string.
+ERROR:  password or GSSAPI delegated credentials required
+DETAIL:  Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.
 -- should succeed
 SELECT dblink_connect_u('myconn', 'fdtest');
  dblink_connect_u 
 
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "funcapi.h"
+#include "libpq/libpq-be.h"
 #include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 static void pgfdw_finish_abort_cleanup(List *pending_entries,
                                       List *cancel_requested,
                                       bool toplevel);
+static void pgfdw_security_check(const char **keywords, const char **values,
+                                UserMapping *user, PGconn *conn);
 static bool UserMappingPasswordRequired(UserMapping *user);
 static bool disconnect_cached_connections(Oid serverid);
 
         entry->conn, server->servername, user->umid, user->userid);
 }
 
+/*
+ * Check that non-superuser has used password or delegated credentials
+ * to establish connection; otherwise, he's piggybacking on the
+ * postgres server's user identity. See also dblink_security_check()
+ * in contrib/dblink and check_conn_params.
+ */
+static void
+pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
+{
+   /* Superusers bypass the check */
+   if (superuser_arg(user->userid))
+       return;
+
+#ifdef ENABLE_GSS
+   /* Connected via GSSAPI with delegated credentials- all good. */
+   if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_deleg(MyProcPort))
+       return;
+#endif
+
+   /* Ok if superuser set PW required false. */
+   if (!UserMappingPasswordRequired(user))
+       return;
+
+   /* Connected via PW, with PW required true, and provided non-empty PW. */
+   if (PQconnectionUsedPassword(conn))
+   {
+       /* ok if params contain a non-empty password */
+       for (int i = 0; keywords[i] != NULL; i++)
+       {
+           if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
+               return;
+       }
+   }
+
+   ereport(ERROR,
+           (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+            errmsg("password or GSSAPI delegated credentials required"),
+            errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
+            errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
+}
+
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
                            server->servername),
                     errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
 
-       /*
-        * Check that non-superuser has used password to establish connection;
-        * otherwise, he's piggybacking on the postgres server's user
-        * identity. See also dblink_security_check() in contrib/dblink and
-        * check_conn_params.
-        */
-       if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
-           !PQconnectionUsedPassword(conn))
-           ereport(ERROR,
-                   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-                    errmsg("password is required"),
-                    errdetail("Non-superuser cannot connect if the server does not request a password."),
-                    errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
+       /* Perform post-connection security checks */
+       pgfdw_security_check(keywords, values, user, conn);
 
        /* Prepare new session for use */
        configure_remote_session(conn);
 }
 
 /*
- * For non-superusers, insist that the connstr specify a password.  This
+ * For non-superusers, insist that the connstr specify a password or that the
+ * user provided their own GSSAPI delegated credentials.  This
  * prevents a password from being picked up from .pgpass, a service file, the
  * environment, etc.  We don't want the postgres user's passwords,
  * certificates, etc to be accessible to non-superusers.  (See also
    if (superuser_arg(user->userid))
        return;
 
+#ifdef ENABLE_GSS
+   /* ok if the user provided their own delegated credentials */
+   if (be_gssapi_get_deleg(MyProcPort))
+       return;
+#endif
+
    /* ok if params contain a non-empty password */
    for (i = 0; keywords[i] != NULL; i++)
    {
 
    ereport(ERROR,
            (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-            errmsg("password is required"),
-            errdetail("Non-superusers must provide a password in the user mapping.")));
+            errmsg("password or GSSAPI delegated credentials required"),
+            errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
 }
 
 /*
 
    sslcrl 'value',
    --requirepeer 'value',
    krbsrvname 'value',
-   gsslib 'value'
+   gsslib 'value',
+   gssdeleg 'value'
    --replication 'value'
 );
 -- Error, invalid list syntax
    c8 user_enum
 ) SERVER loopback_nopw OPTIONS (schema_name 'public', table_name 'ft1');
 SELECT 1 FROM ft1_nopw LIMIT 1;
-ERROR:  password is required
-DETAIL:  Non-superusers must provide a password in the user mapping.
+ERROR:  password or GSSAPI delegated credentials required
+DETAIL:  Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.
 -- If we add a password to the connstr it'll fail, because we don't allow passwords
 -- in connstrs only in user mappings.
 ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw');
 -- This won't work with installcheck, but neither will most of the FDW checks.
 ALTER USER MAPPING FOR CURRENT_USER SERVER loopback_nopw OPTIONS (ADD password 'dummypw');
 SELECT 1 FROM ft1_nopw LIMIT 1;
-ERROR:  password is required
-DETAIL:  Non-superuser cannot connect if the server does not request a password.
+ERROR:  password or GSSAPI delegated credentials required
+DETAIL:  Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials.
 HINT:  Target server's authentication method must be changed or password_required=false set in the user mapping attributes.
 -- Unpriv user cannot make the mapping passwordless
 ALTER USER MAPPING FOR CURRENT_USER SERVER loopback_nopw OPTIONS (ADD password_required 'false');
 ERROR:  password_required=false is superuser-only
 HINT:  User mappings with the password_required option set to false may only be created or modified by the superuser.
 SELECT 1 FROM ft1_nopw LIMIT 1;
-ERROR:  password is required
-DETAIL:  Non-superuser cannot connect if the server does not request a password.
+ERROR:  password or GSSAPI delegated credentials required
+DETAIL:  Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials.
 HINT:  Target server's authentication method must be changed or password_required=false set in the user mapping attributes.
 RESET ROLE;
 -- But the superuser can
 -- This will fail again as it'll resolve the user mapping for public, which
 -- lacks password_required=false
 SELECT 1 FROM ft1_nopw LIMIT 1;
-ERROR:  password is required
-DETAIL:  Non-superusers must provide a password in the user mapping.
+ERROR:  password or GSSAPI delegated credentials required
+DETAIL:  Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.
 RESET ROLE;
 -- The user mapping for public is passwordless and lacks the password_required=false
 -- mapping option, but will work because the current user is a superuser.
 
        {"sslcert", UserMappingRelationId, true},
        {"sslkey", UserMappingRelationId, true},
 
+       /*
+        * gssdeleg is also a libpq option but should be allowed in a user
+        * mapping context too
+        */
+       {"gssdeleg", UserMappingRelationId, true},
+
        {NULL, InvalidOid, false}
    };
 
 
    sslcrl 'value',
    --requirepeer 'value',
    krbsrvname 'value',
-   gsslib 'value'
+   gsslib 'value',
+   gssdeleg 'value'
    --replication 'value'
 );
 
 
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-gss-accept-deleg" xreflabel="gss_accept_deleg">
+      <term><varname>gss_accept_deleg</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>gss_accept_deleg</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets whether GSSAPI delegation should be accepted from the client.
+        The default is <literal>off</literal> meaning credentials from the client will
+        NOT be accepted.  Changing this to <literal>on</literal> will make the server
+        accept credentials delegated to it from the client. This parameter can only be
+        set in the <filename>postgresql.conf</filename> file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-db-user-namespace" xreflabel="db_user_namespace">
       <term><varname>db_user_namespace</varname> (<type>boolean</type>)
       <indexterm>
 
 
    <para>
     Only superusers may use <function>dblink_connect</function> to create
-    non-password-authenticated connections.  If non-superusers need this
-    capability, use <function>dblink_connect_u</function> instead.
+    non-password-authenticated and non-GSSAPI-authenticated connections.
+    If non-superusers need this capability, use
+    <function>dblink_connect_u</function> instead.
    </para>
 
    <para>
 
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-gssdeleg" xreflabel="gssdeleg">
+      <term><literal>gssdeleg</literal></term>
+      <listitem>
+       <para>
+        Forward (delegate) GSS credentials to the server.  The default is
+        <literal>disable</literal> which means credentials will not be forwarded
+        to the server.  Set this to <literal>enable</literal> to have
+        credentials forwarded when possible.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-service" xreflabel="service">
       <term><literal>service</literal></term>
       <listitem>
       </para>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-PQconnectionUsedGSSAPI">
+     <term><function>PQconnectionUsedGSSAPI</function><indexterm><primary>PQconnectionUsedGSSAPI</primary></indexterm></term>
+     <listitem>
+      <para>
+       Returns true (1) if the connection authentication method
+       used GSSAPI. Returns false (0) if not.
+
+<synopsis>
+int PQconnectionUsedGSSAPI(const PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This function can be applied to detect whether the connection was
+       authenticated with GSSAPI.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
   </para>
 
      </para>
     </listitem>
 
+    <listitem>
+     <para>
+      <indexterm>
+       <primary><envar>PGGSSDELEG</envar></primary>
+      </indexterm>
+      <envar>PGGSSDELEG</envar> behaves the same as the <xref
+      linkend="libpq-connect-gssdeleg"/> connection parameter.
+     </para>
+    </listitem>
+
     <listitem>
      <para>
       <indexterm>
 
        True if GSSAPI encryption is in use on this connection
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>credentials_delegated</structfield> <type>boolean</type>
+      </para>
+      <para>
+       True if GSSAPI credentials were delegated on this connection.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
 
     <literal>sslcert</literal> or <literal>sslkey</literal> settings.
    </para>
    <para>
-    Only superusers may connect to foreign servers without password
-    authentication, so always specify the <literal>password</literal> option
-    for user mappings belonging to non-superusers.
+    Non-superusers may connect to foreign servers using password
+    authentication or with GSSAPI delegated credentials, so specify the
+    <literal>password</literal> option for user mappings belonging to
+    non-superusers where password authentication is required.
    </para>
    <para>
     A superuser may override this check on a per-user-mapping basis by setting
 
             S.pid,
             S.gss_auth AS gss_authenticated,
             S.gss_princ AS principal,
-            S.gss_enc AS encrypted
+            S.gss_enc AS encrypted,
+            S.gss_deleg AS credentials_delegated
     FROM pg_stat_get_activity(NULL) AS S
     WHERE S.client_port IS NOT NULL;
 
 
    {"requiressl", ForeignServerRelationId},
    {"sslmode", ForeignServerRelationId},
    {"gsslib", ForeignServerRelationId},
+   {"gssdeleg", ForeignServerRelationId},
    {NULL, InvalidOid}
 };
 
 
  */
 char      *pg_krb_server_keyfile;
 bool       pg_krb_caseins_users;
+bool       pg_gss_accept_deleg;
 
 
 /*----------------------------------------------------------------
    int         mtype;
    StringInfoData buf;
    gss_buffer_desc gbuf;
+   gss_cred_id_t delegated_creds;
 
    /*
     * Use the configured keytab, if there is one.  Unfortunately, Heimdal
     */
    port->gss->ctx = GSS_C_NO_CONTEXT;
 
+   delegated_creds = GSS_C_NO_CREDENTIAL;
+   port->gss->delegated_creds = false;
+
    /*
     * Loop through GSSAPI message exchange. This exchange can consist of
     * multiple messages sent in both directions. First message is always from
                                          &port->gss->outbuf,
                                          &gflags,
                                          NULL,
-                                         NULL);
+                                         pg_gss_accept_deleg ? &delegated_creds : NULL);
 
        /* gbuf no longer used */
        pfree(buf.data);
 
        CHECK_FOR_INTERRUPTS();
 
+       if (delegated_creds != GSS_C_NO_CREDENTIAL && gflags & GSS_C_DELEG_FLAG)
+       {
+           pg_store_delegated_credential(delegated_creds);
+           port->gss->delegated_creds = true;
+       }
+
        if (port->gss->outbuf.length != 0)
        {
            /*
 
            (errmsg_internal("%s", errmsg),
             errdetail_internal("%s: %s", msg_major, msg_minor)));
 }
+
+/*
+ * Store the credentials passed in into the memory cache for later usage.
+ *
+ * This allows credentials to be delegated to us for us to use to connect
+ * to other systems with, using, e.g. postgres_fdw or dblink.
+ */
+#define GSS_MEMORY_CACHE "MEMORY:"
+void
+pg_store_delegated_credential(gss_cred_id_t cred)
+{
+   OM_uint32   major,
+               minor;
+   gss_OID_set mech;
+   gss_cred_usage_t usage;
+   gss_key_value_element_desc cc;
+   gss_key_value_set_desc ccset;
+
+   cc.key = "ccache";
+   cc.value = GSS_MEMORY_CACHE;
+   ccset.count = 1;
+   ccset.elements = &cc;
+
+   /* Make the delegated credential only available to current process */
+   major = gss_store_cred_into(&minor,
+                               cred,
+                               GSS_C_INITIATE, /* credential only used for
+                                                * starting libpq connection */
+                               GSS_C_NULL_OID, /* store all */
+                               true,   /* overwrite */
+                               true,   /* make default */
+                               &ccset,
+                               &mech,
+                               &usage);
+
+   if (major != GSS_S_COMPLETE)
+   {
+       pg_GSS_error("gss_store_cred", major, minor);
+   }
+
+   /* Credential stored, so we can release our credential handle. */
+   major = gss_release_cred(&minor, &cred);
+   if (major != GSS_S_COMPLETE)
+   {
+       pg_GSS_error("gss_release_cred", major, minor);
+   }
+
+   /*
+    * Set KRB5CCNAME for this backend, so that later calls to
+    * gss_acquire_cred will find the delegated credentials we stored.
+    */
+   setenv("KRB5CCNAME", GSS_MEMORY_CACHE, 1);
+}
 
    bool        complete_next = false;
    OM_uint32   major,
                minor;
+   gss_cred_id_t delegated_creds;
 
    /*
     * Allocate subsidiary Port data for GSSAPI operations.
    port->gss = (pg_gssinfo *)
        MemoryContextAllocZero(TopMemoryContext, sizeof(pg_gssinfo));
 
+   delegated_creds = GSS_C_NO_CREDENTIAL;
+   port->gss->delegated_creds = false;
+
    /*
     * Allocate buffers and initialize state variables.  By malloc'ing the
     * buffers at this point, we avoid wasting static data space in processes
                                       GSS_C_NO_CREDENTIAL, &input,
                                       GSS_C_NO_CHANNEL_BINDINGS,
                                       &port->gss->name, NULL, &output, NULL,
-                                      NULL, NULL);
+                                      NULL, pg_gss_accept_deleg ? &delegated_creds : NULL);
+
        if (GSS_ERROR(major))
        {
            pg_GSS_error(_("could not accept GSSAPI security context"),
            complete_next = true;
        }
 
+       if (delegated_creds != GSS_C_NO_CREDENTIAL)
+       {
+           pg_store_delegated_credential(delegated_creds);
+           port->gss->delegated_creds = true;
+       }
+
        /* Done handling the incoming packet, reset our buffer */
        PqGSSRecvLength = 0;
 
 
    return port->gss->princ;
 }
+
+/*
+ * Return if GSSAPI delegated credentials were included on this
+ * connection.
+ */
+bool
+be_gssapi_get_deleg(Port *port)
+{
+   if (!port || !port->gss)
+       return NULL;
+
+   return port->gss->delegated_creds;
+}
 
        lbeentry.st_gss = true;
        lgssstatus.gss_auth = be_gssapi_get_auth(MyProcPort);
        lgssstatus.gss_enc = be_gssapi_get_enc(MyProcPort);
+       lgssstatus.gss_deleg = be_gssapi_get_deleg(MyProcPort);
        if (princ)
            strlcpy(lgssstatus.gss_princ, princ, NAMEDATALEN);
    }
 
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS  30
+#define PG_STAT_GET_ACTIVITY_COLS  31
    int         num_backends = pgstat_fetch_stat_numbackends();
    int         curr_backend;
    int         pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
            pfree(clipped_activity);
 
            /* leader_pid */
-           nulls[28] = true;
+           nulls[29] = true;
 
            proc = BackendPidGetProc(beentry->st_procpid);
 
                 */
                if (leader && leader->pid != beentry->st_procpid)
                {
-                   values[28] = Int32GetDatum(leader->pid);
-                   nulls[28] = false;
+                   values[29] = Int32GetDatum(leader->pid);
+                   nulls[29] = false;
                }
                else if (beentry->st_backendType == B_BG_WORKER)
                {
 
                    if (leader_pid != InvalidPid)
                    {
-                       values[28] = Int32GetDatum(leader_pid);
-                       nulls[28] = false;
+                       values[29] = Int32GetDatum(leader_pid);
+                       nulls[29] = false;
                    }
                }
            }
                values[25] = BoolGetDatum(beentry->st_gssstatus->gss_auth); /* gss_auth */
                values[26] = CStringGetTextDatum(beentry->st_gssstatus->gss_princ);
                values[27] = BoolGetDatum(beentry->st_gssstatus->gss_enc);  /* GSS Encryption in use */
+               values[28] = BoolGetDatum(beentry->st_gssstatus->gss_deleg);    /* GSS credentials
+                                                                                * delegated */
            }
            else
            {
                nulls[26] = true;   /* No GSS principal */
                values[27] = BoolGetDatum(false);   /* GSS Encryption not in
                                                     * use */
+               values[28] = BoolGetDatum(false);   /* GSS credentials not
+                                                    * delegated */
            }
            if (beentry->st_query_id == 0)
-               nulls[29] = true;
+               nulls[30] = true;
            else
-               values[29] = UInt64GetDatum(beentry->st_query_id);
+               values[30] = UInt64GetDatum(beentry->st_query_id);
        }
        else
        {
            nulls[27] = true;
            nulls[28] = true;
            nulls[29] = true;
+           nulls[30] = true;
        }
 
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 
 
            if (princ)
                appendStringInfo(&logmsg,
-                                _(" GSS (authenticated=%s, encrypted=%s, principal=%s)"),
+                                _(" GSS (authenticated=%s, encrypted=%s, deleg_credentials=%s, principal=%s)"),
                                 be_gssapi_get_auth(port) ? _("yes") : _("no"),
                                 be_gssapi_get_enc(port) ? _("yes") : _("no"),
+                                be_gssapi_get_deleg(port) ? _("yes") : _("no"),
                                 princ);
            else
                appendStringInfo(&logmsg,
-                                _(" GSS (authenticated=%s, encrypted=%s)"),
+                                _(" GSS (authenticated=%s, encrypted=%s, deleg_credentials=%s)"),
                                 be_gssapi_get_auth(port) ? _("yes") : _("no"),
-                                be_gssapi_get_enc(port) ? _("yes") : _("no"));
+                                be_gssapi_get_enc(port) ? _("yes") : _("no"),
+                                be_gssapi_get_deleg(port) ? _("yes") : _("no"));
        }
 #endif
 
 
        NULL, NULL, NULL
    },
 
+   {
+       {"gss_accept_deleg", PGC_SIGHUP, CONN_AUTH_AUTH,
+           gettext_noop("Sets whether GSSAPI delegation should be accepted from the client."),
+           NULL
+       },
+       &pg_gss_accept_deleg,
+       false,
+       NULL, NULL, NULL
+   },
+
    {
        {"escape_string_warning", PGC_USERSET, COMPAT_OPTIONS_PREVIOUS,
            gettext_noop("Warn about backslash escapes in ordinary string literals."),
 
 # GSSAPI using Kerberos
 #krb_server_keyfile = 'FILE:${sysconfdir}/krb5.keytab'
 #krb_caseins_users = off
+#gss_accept_deleg = off
 
 # - SSL -
 
 
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,bool,int4,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,gss_deleg,leader_pid,query_id}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
 
 
 extern PGDLLIMPORT char *pg_krb_server_keyfile;
 extern PGDLLIMPORT bool pg_krb_caseins_users;
+extern PGDLLIMPORT bool pg_gss_accept_deleg;
 extern PGDLLIMPORT char *pg_krb_realm;
 
 extern void ClientAuthentication(Port *port);
 
 
 #if defined(HAVE_GSSAPI_H)
 #include <gssapi.h>
+#include <gssapi_ext.h>
 #else
 #include <gssapi/gssapi.h>
+#include <gssapi/gssapi_ext.h>
 #endif
 
 extern void pg_GSS_error(const char *errmsg,
                         OM_uint32 maj_stat, OM_uint32 min_stat);
 
+extern void pg_store_delegated_credential(gss_cred_id_t cred);
 #endif                         /* ENABLE_GSS */
 
 #endif                         /* BE_GSSAPI_COMMON_H */
 
                                 * GSSAPI auth was not used */
    bool        auth;           /* GSSAPI Authentication used */
    bool        enc;            /* GSSAPI encryption in use */
+   bool        delegated_creds;    /* GSSAPI Delegated credentials */
 #endif
 } pg_gssinfo;
 #endif
 extern bool be_gssapi_get_auth(Port *port);
 extern bool be_gssapi_get_enc(Port *port);
 extern const char *be_gssapi_get_princ(Port *port);
+extern bool be_gssapi_get_deleg(Port *port);
 
 /* Read and write to a GSSAPI-encrypted connection. */
 extern ssize_t be_gssapi_read(Port *port, void *ptr, size_t len);
 
    char        gss_princ[NAMEDATALEN]; /* GSSAPI Principal used to auth */
    bool        gss_auth;       /* If GSSAPI authentication was used */
    bool        gss_enc;        /* If encryption is being used */
+   bool        gss_deleg;      /* If credentials delegated */
 
 } PgBackendGSSStatus;
 
 
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQconnectionUsedGSSAPI    187
 
 {
    OM_uint32   maj_stat,
                min_stat,
-               lmin_s;
+               lmin_s,
+               gss_flags = GSS_C_MUTUAL_FLAG;
    gss_buffer_desc ginbuf;
    gss_buffer_desc goutbuf;
 
        ginbuf.value = NULL;
    }
 
+   /* Only try to acquire credentials if GSS delegation isn't disabled. */
+   if (!pg_GSS_have_cred_cache(&conn->gcred))
+       conn->gcred = GSS_C_NO_CREDENTIAL;
+
+   if (conn->gssdeleg && pg_strcasecmp(conn->gssdeleg, "enable") == 0)
+       gss_flags |= GSS_C_DELEG_FLAG;
+
    maj_stat = gss_init_sec_context(&min_stat,
-                                   GSS_C_NO_CREDENTIAL,
+                                   conn->gcred,
                                    &conn->gctx,
                                    conn->gtarg_nam,
                                    GSS_C_NO_OID,
-                                   GSS_C_MUTUAL_FLAG,
+                                   gss_flags,
                                    0,
                                    GSS_C_NO_CHANNEL_BINDINGS,
                                    (ginbuf.value == NULL) ? GSS_C_NO_BUFFER : &ginbuf,
    {
        conn->client_finished_auth = true;
        gss_release_name(&lmin_s, &conn->gtarg_nam);
+       conn->gssapi_used = true;
    }
 
    return STATUS_OK;
 
        "GSS-library", "", 7,   /* sizeof("gssapi") == 7 */
    offsetof(struct pg_conn, gsslib)},
 
+   {"gssdeleg", "PGGSSDELEG", NULL, NULL,
+       "GSS-delegation", "", 8,    /* sizeof("disable") == 8 */
+   offsetof(struct pg_conn, gssdeleg)},
+
    {"replication", NULL, NULL, NULL,
        "Replication", "D", 5,
    offsetof(struct pg_conn, replication)},
    conn->auth_req_received = false;
    conn->client_finished_auth = false;
    conn->password_needed = false;
+   conn->gssapi_used = false;
    conn->write_failed = false;
    free(conn->write_err_msg);
    conn->write_err_msg = NULL;
    free(conn->gssencmode);
    free(conn->krbsrvname);
    free(conn->gsslib);
+   free(conn->gssdeleg);
    free(conn->connip);
    /* Note that conn->Pfdebug is not ours to close or free */
    free(conn->write_err_msg);
        return false;
 }
 
+int
+PQconnectionUsedGSSAPI(const PGconn *conn)
+{
+   if (!conn)
+       return false;
+   if (conn->gssapi_used)
+       return true;
+   else
+       return false;
+}
+
 int
 PQclientEncoding(const PGconn *conn)
 {
 
 {
    ssize_t     ret;
    OM_uint32   major,
-               minor;
+               minor,
+               gss_flags = GSS_REQUIRED_FLAGS;
    uint32      netlen;
    PostgresPollingStatusType result;
    gss_buffer_desc input = GSS_C_EMPTY_BUFFER,
    if (ret != STATUS_OK)
        return PGRES_POLLING_FAILED;
 
+   if (conn->gssdeleg && pg_strcasecmp(conn->gssdeleg, "enable") == 0)
+   {
+       /* Acquire credentials if possbile */
+       if (conn->gcred == GSS_C_NO_CREDENTIAL)
+           (void) pg_GSS_have_cred_cache(&conn->gcred);
+
+       /*
+        * We have credentials and gssdeleg is enabled, so request credential
+        * delegation.  This may or may not actually result in credentials
+        * being delegated- it depends on if the forwardable flag has been set
+        * in the credential and if the server is configured to accept
+        * delegated credentials.
+        */
+       if (conn->gcred != GSS_C_NO_CREDENTIAL)
+           gss_flags |= GSS_C_DELEG_FLAG;
+   }
+
    /*
     * Call GSS init context, either with an empty input, or with a complete
     * packet from the server.
     */
    major = gss_init_sec_context(&minor, conn->gcred, &conn->gctx,
                                 conn->gtarg_nam, GSS_C_NO_OID,
-                                GSS_REQUIRED_FLAGS, 0, 0, &input, NULL,
+                                gss_flags, 0, 0, &input, NULL,
                                 &output, NULL, NULL);
 
    /* GSS Init Sec Context uses the whole packet, so clear it */
         * to do GSS wrapping/unwrapping.
         */
        conn->gssenc = true;
+       conn->gssapi_used = true;
 
        /* Clean up */
        gss_release_cred(&minor, &conn->gcred);
 
 extern PGpipelineStatus PQpipelineStatus(const PGconn *conn);
 extern int PQconnectionNeedsPassword(const PGconn *conn);
 extern int PQconnectionUsedPassword(const PGconn *conn);
+extern int PQconnectionUsedGSSAPI(const PGconn *conn);
 extern int PQclientEncoding(const PGconn *conn);
 extern int PQsetClientEncoding(PGconn *conn, const char *encoding);
 
 
    char       *krbsrvname;     /* Kerberos service name */
    char       *gsslib;         /* What GSS library to use ("gssapi" or
                                 * "sspi") */
+   char       *gssdeleg;       /* Try to delegate GSS credentials? */
    char       *ssl_min_protocol_version;   /* minimum TLS protocol version */
    char       *ssl_max_protocol_version;   /* maximum TLS protocol version */
    char       *target_session_attrs;   /* desired session properties */
    int         sversion;       /* server version, e.g. 70401 for 7.4.1 */
    bool        auth_req_received;  /* true if any type of auth req received */
    bool        password_needed;    /* true if server demanded a password */
+   bool        gssapi_used;    /* true if authenticated via gssapi */
    bool        sigpipe_so;     /* have we masked SIGPIPE via SO_NOSIGPIPE? */
    bool        sigpipe_flag;   /* can we mask SIGPIPE via MSG_NOSIGNAL? */
    bool        write_failed;   /* have we had a write failure on sock? */
 
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
+EXTRA_INSTALL += contrib/postgres_fdw
+EXTRA_INSTALL += contrib/dblink
+
 export with_gssapi with_krb_srvnam
 
 check:
 
 # that the server-side pg_stat_gssapi view reports what we expect to
 # see for each test and that SYSTEM_USER returns what we expect to see.
 #
+# Also test that GSSAPI delegation is working properly and that those
+# credentials can be used to make dblink / postgres_fdw connections.
+#
 # Since this requires setting up a full KDC, it doesn't make much sense
 # to have multiple test scripts (since they'd have to also create their
 # own KDC and that could cause race conditions or other problems)- so
 
 my $krb5_config  = 'krb5-config';
 my $kinit        = 'kinit';
+my $klist        = 'klist';
 my $kdb5_util    = 'kdb5_util';
 my $kadmin_local = 'kadmin.local';
 my $krb5kdc      = 'krb5kdc';
 {
    $krb5_config = $krb5_bin_dir . '/' . $krb5_config;
    $kinit       = $krb5_bin_dir . '/' . $kinit;
+   $klist       = $krb5_bin_dir . '/' . $klist;
 }
 if ($krb5_sbin_dir && -d $krb5_sbin_dir)
 {
 my $kdc_pidfile = "${PostgreSQL::Test::Utils::tmp_check}/krb5kdc.pid";
 my $keytab      = "${PostgreSQL::Test::Utils::tmp_check}/krb5.keytab";
 
+my $pgpass      = "${PostgreSQL::Test::Utils::tmp_check}/.pgpass";
+
 my $dbname      = 'postgres';
 my $username    = 'test1';
 my $application = '001_auth.pl';
   or BAIL_OUT("could not get Kerberos version");
 $krb5_version = $1;
 
+# Construct a pgpass file to make sure we don't use it
+append_to_file(
+   $pgpass,
+   '*:*:*:*:abc123'
+);
+
+chmod 0600, $pgpass;
+
 # Build the krb5.conf to use.
 #
 # Explicitly specify the default (test) realm and the KDC for
 dns_lookup_realm = false
 dns_lookup_kdc = false
 default_realm = $realm
+forwardable = false
 rdns = false
 
 [realms]
 $realm = {
     kdc = $hostaddr:$kdc_port
-}!);
+}
+!);
 
 append_to_file(
    $kdc_conf,
 });
 $node->start;
 
+my $port = $node->port();
+
 $node->safe_psql('postgres', 'CREATE USER test1;');
+$node->safe_psql('postgres', "CREATE USER test2 WITH ENCRYPTED PASSWORD 'abc123';");
+$node->safe_psql('postgres', 'CREATE EXTENSION postgres_fdw;');
+$node->safe_psql('postgres', 'CREATE EXTENSION dblink;');
+$node->safe_psql('postgres', "CREATE SERVER s1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$host', hostaddr '$hostaddr', port '$port', dbname 'postgres');");
+$node->safe_psql('postgres', "CREATE SERVER s2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (port '$port', dbname 'postgres', passfile '$pgpass');");
+
+$node->safe_psql('postgres', 'GRANT USAGE ON FOREIGN SERVER s1 TO test1;');
+
+$node->safe_psql('postgres', "CREATE USER MAPPING FOR test1 SERVER s1 OPTIONS (user 'test1');");
+$node->safe_psql('postgres', "CREATE USER MAPPING FOR test1 SERVER s2 OPTIONS (user 'test2');");
+
+$node->safe_psql('postgres', "CREATE TABLE t1 (c1 int);");
+$node->safe_psql('postgres', "INSERT INTO t1 VALUES (1);");
+$node->safe_psql('postgres', "CREATE FOREIGN TABLE tf1 (c1 int) SERVER s1 OPTIONS (schema_name 'public', table_name 't1');");
+$node->safe_psql('postgres', "GRANT SELECT ON t1 TO test1;");
+$node->safe_psql('postgres', "GRANT SELECT ON tf1 TO test1;");
+
+$node->safe_psql('postgres', "CREATE FOREIGN TABLE tf2 (c1 int) SERVER s2 OPTIONS (schema_name 'public', table_name 't1');");
+$node->safe_psql('postgres', "GRANT SELECT ON tf2 TO test1;");
 
 # Set up a table for SYSTEM_USER parallel worker testing.
 $node->safe_psql('postgres',
 
 unlink($node->data_dir . '/pg_hba.conf');
 $node->append_conf('pg_hba.conf',
-   qq{host all all $hostaddr/32 gss map=mymap});
+   qq{
+local all test2 scram-sha-256
+host all all $hostaddr/32 gss map=mymap
+});
 $node->restart;
 
 test_access($node, 'test1', 'SELECT true', 2, '', 'fails without ticket');
 
 run_log [ $kinit, 'test1' ], \$test1_password or BAIL_OUT($?);
+run_log [ $klist, '-f' ] or BAIL_OUT($?);
 
 test_access(
    $node,
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated AND encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
    '',
-   'succeeds with mapping with default gssencmode and host hba',
+   'succeeds with mapping with default gssencmode and host hba, ticket not forwardable',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
 );
 
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated AND encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
    'gssencmode=prefer',
-   'succeeds with GSS-encrypted access preferred with host hba',
+   'succeeds with GSS-encrypted access preferred with host hba, ticket not forwardable',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
 );
+
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated AND encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
    'gssencmode=require',
-   'succeeds with GSS-encrypted access required with host hba',
+   'succeeds with GSS-encrypted access required with host hba, ticket not forwardable',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
 );
 
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
+   0,
+   'gssencmode=prefer gssdeleg=enable',
+   'succeeds with GSS-encrypted access preferred with host hba and credentials not delegated even though asked for (ticket not forwardable)',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
+   0,
+   'gssencmode=require gssdeleg=enable',
+   'succeeds with GSS-encrypted access required with host hba and credentials not delegated even though asked for (ticket not forwardable)',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+
+
 # Test that we can transport a reasonable amount of data.
 test_query(
    $node,
 
 unlink($node->data_dir . '/pg_hba.conf');
 $node->append_conf('pg_hba.conf',
-   qq{hostgssenc all all $hostaddr/32 gss map=mymap});
+   qq{
+    local all test2 scram-sha-256
+   hostgssenc all all $hostaddr/32 gss map=mymap
+});
+
+string_replace_file($krb5_conf, "forwardable = false", "forwardable = true");
+
+run_log [ $kinit, 'test1' ], \$test1_password or BAIL_OUT($?);
+run_log [ $klist, '-f' ] or BAIL_OUT($?);
+
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated from pg_stat_gssapi where pid = pg_backend_pid();',
+   0,
+   'gssencmode=prefer gssdeleg=enable',
+   'succeeds with GSS-encrypted access preferred and hostgssenc hba and credentials not forwarded (server does not accept them, default)',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated from pg_stat_gssapi where pid = pg_backend_pid();',
+   0,
+   'gssencmode=require gssdeleg=enable',
+   'succeeds with GSS-encrypted access required and hostgssenc hba and credentials not forwarded (server does not accept them, default)',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+
+$node->append_conf('postgresql.conf',
+   qq{gss_accept_deleg=off});
+$node->restart;
+
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated from pg_stat_gssapi where pid = pg_backend_pid();',
+   0,
+   'gssencmode=prefer gssdeleg=enable',
+   'succeeds with GSS-encrypted access preferred and hostgssenc hba and credentials not forwarded (server does not accept them, explicitly disabled)',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated from pg_stat_gssapi where pid = pg_backend_pid();',
+   0,
+   'gssencmode=require gssdeleg=enable',
+   'succeeds with GSS-encrypted access required and hostgssenc hba and credentials not forwarded (server does not accept them, explicitly disabled)',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+
+$node->append_conf('postgresql.conf',
+   qq{gss_accept_deleg=on});
 $node->restart;
 
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated AND encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND encrypted AND credentials_delegated from pg_stat_gssapi where pid = pg_backend_pid();',
+   0,
+   'gssencmode=prefer gssdeleg=enable',
+   'succeeds with GSS-encrypted access preferred and hostgssenc hba and credentials forwarded',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=yes, principal=test1\@$realm)"
+);
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND credentials_delegated from pg_stat_gssapi where pid = pg_backend_pid();',
+   0,
+   'gssencmode=require gssdeleg=enable',
+   'succeeds with GSS-encrypted access required and hostgssenc hba and credentials forwarded',
+   "connection authenticated: identity=\"test1\@$realm\" method=gss",
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=yes, principal=test1\@$realm)"
+);
+test_access(
+   $node,
+   'test1',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
    'gssencmode=prefer',
-   'succeeds with GSS-encrypted access preferred and hostgssenc hba',
+   'succeeds with GSS-encrypted access preferred and hostgssenc hba and credentials not forwarded',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
 );
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated AND encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND encrypted AND NOT credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
-   'gssencmode=require',
-   'succeeds with GSS-encrypted access required and hostgssenc hba',
+   'gssencmode=require gssdeleg=disable',
+   'succeeds with GSS-encrypted access required and hostgssenc hba and credentials explicitly not forwarded',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=no, principal=test1\@$realm)"
+);
+
+my $psql_out = '';
+my $psql_stderr = '';
+my $psql_rc = '';
+
+$psql_rc = $node->psql(
+    'postgres',
+   "SELECT * FROM dblink('user=test1 dbname=$dbname host=$host hostaddr=$hostaddr port=$port','select 1') as t1(c1 int);",
+   connstr => "user=test1 host=$host hostaddr=$hostaddr gssencmode=require gssdeleg=disable",
+   stdout => \$psql_out,
+   stderr => \$psql_stderr
+);
+is($psql_rc,'3','dblink attempt fails without delegated credentials');
+like($psql_stderr, qr/password or GSSAPI delegated credentials required/,'dblink does not work without delegated credentials');
+like($psql_out, qr/^$/,'dblink does not work without delegated credentials');
+
+$psql_out = '';
+$psql_stderr = '';
+
+$psql_rc = $node->psql(
+    'postgres',
+   "SELECT * FROM dblink('user=test2 dbname=$dbname port=$port passfile=$pgpass','select 1') as t1(c1 int);",
+   connstr => "user=test1 host=$host hostaddr=$hostaddr gssencmode=require gssdeleg=disable",
+   stdout => \$psql_out,
+   stderr => \$psql_stderr
 );
+is($psql_rc,'3','dblink does not work without delegated credentials and with passfile');
+like($psql_stderr, qr/password or GSSAPI delegated credentials required/,'dblink does not work without delegated credentials and with passfile');
+like($psql_out, qr/^$/,'dblink does not work without delegated credentials and with passfile');
+
+$psql_out = '';
+$psql_stderr = '';
+
+$psql_rc = $node->psql(
+    'postgres',
+   "TABLE tf1;",
+   connstr => "user=test1 host=$host hostaddr=$hostaddr gssencmode=require gssdeleg=disable",
+   stdout => \$psql_out,
+   stderr => \$psql_stderr
+);
+is($psql_rc,'3','postgres_fdw does not work without delegated credentials');
+like($psql_stderr, qr/password or GSSAPI delegated credentials required/,'postgres_fdw does not work without delegated credentials');
+like($psql_out, qr/^$/,'postgres_fdw does not work without delegated credentials');
+
+$psql_out = '';
+$psql_stderr = '';
+
+$psql_rc = $node->psql(
+    'postgres',
+   "TABLE tf2;",
+   connstr => "user=test1 host=$host hostaddr=$hostaddr gssencmode=require gssdeleg=disable",
+   stdout => \$psql_out,
+   stderr => \$psql_stderr
+);
+is($psql_rc,'3','postgres_fdw does not work without delegated credentials and with passfile');
+like($psql_stderr, qr/password or GSSAPI delegated credentials required/,'postgres_fdw does not work without delegated credentials and with passfile');
+like($psql_out, qr/^$/,'postgres_fdw does not work without delegated credentials and with passfile');
+
 test_access($node, 'test1', 'SELECT true', 2, 'gssencmode=disable',
    'fails with GSS encryption disabled and hostgssenc hba');
 
 
 unlink($node->data_dir . '/pg_hba.conf');
 $node->append_conf('pg_hba.conf',
-   qq{hostnogssenc all all $hostaddr/32 gss map=mymap});
+   qq{
+    local all test2 scram-sha-256
+   hostnogssenc all all $hostaddr/32 gss map=mymap
+});
 $node->restart;
 
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated and not encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND NOT encrypted AND credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
-   'gssencmode=prefer',
+   'gssencmode=prefer gssdeleg=enable',
    'succeeds with GSS-encrypted access preferred and hostnogssenc hba, but no encryption',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=no, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=no, deleg_credentials=yes, principal=test1\@$realm)"
 );
 test_access($node, 'test1', 'SELECT true', 2, 'gssencmode=require',
    'fails with GSS-encrypted access required and hostnogssenc hba');
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated and not encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND NOT encrypted AND credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
-   'gssencmode=disable',
+   'gssencmode=disable gssdeleg=enable',
    'succeeds with GSS encryption disabled and hostnogssenc hba',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=no, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=no, deleg_credentials=yes, principal=test1\@$realm)"
+);
+
+test_query(
+   $node,
+   'test1',
+   "SELECT * FROM dblink('user=test1 dbname=$dbname host=$host hostaddr=$hostaddr port=$port','select 1') as t1(c1 int);",
+   qr/^1$/s,
+   'gssencmode=prefer gssdeleg=enable',
+   'dblink works not-encrypted (server not configured to accept encrypted GSSAPI connections)');
+
+test_query(
+   $node,
+   'test1',
+   "TABLE tf1;",
+   qr/^1$/s,
+   'gssencmode=prefer gssdeleg=enable',
+   'postgres_fdw works not-encrypted (server not configured to accept encrypted GSSAPI connections)');
+
+$psql_out = '';
+$psql_stderr = '';
+
+$psql_rc = $node->psql(
+    'postgres',
+   "SELECT * FROM dblink('user=test2 dbname=$dbname port=$port passfile=$pgpass','select 1') as t1(c1 int);",
+   connstr => "user=test1 host=$host hostaddr=$hostaddr gssencmode=prefer gssdeleg=enable",
+   stdout => \$psql_out,
+   stderr => \$psql_stderr
+);
+is($psql_rc,'3','dblink does not work with delegated credentials and with passfile');
+like($psql_stderr, qr/password or GSSAPI delegated credentials required/,'dblink does not work with delegated credentials and with passfile');
+like($psql_out, qr/^$/,'dblink does not work with delegated credentials and with passfile');
+
+$psql_out = '';
+$psql_stderr = '';
+
+$psql_rc = $node->psql(
+    'postgres',
+   "TABLE tf2;",
+   connstr => "user=test1 host=$host hostaddr=$hostaddr gssencmode=prefer gssdeleg=enable",
+   stdout => \$psql_out,
+   stderr => \$psql_stderr
 );
+is($psql_rc,'3','postgres_fdw does not work with delegated credentials and with passfile');
+like($psql_stderr, qr/password or GSSAPI delegated credentials required/,'postgres_fdw does not work with delegated credentials and with passfile');
+like($psql_out, qr/^$/,'postgres_fdw does not work with delegated credentials and with passfile');
 
 truncate($node->data_dir . '/pg_ident.conf', 0);
 unlink($node->data_dir . '/pg_hba.conf');
 $node->append_conf('pg_hba.conf',
-   qq{host all all $hostaddr/32 gss include_realm=0});
+   qq{
+    local all test2 scram-sha-256
+   host all all $hostaddr/32 gss include_realm=0
+});
 $node->restart;
 
 test_access(
    $node,
    'test1',
-   'SELECT gss_authenticated AND encrypted from pg_stat_gssapi where pid = pg_backend_pid();',
+   'SELECT gss_authenticated AND encrypted AND credentials_delegated FROM pg_stat_gssapi WHERE pid = pg_backend_pid();',
    0,
-   '',
+   'gssdeleg=enable',
    'succeeds with include_realm=0 and defaults',
    "connection authenticated: identity=\"test1\@$realm\" method=gss",
-   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, principal=test1\@$realm)"
+   "connection authorized: user=$username database=$dbname application_name=$application GSS (authenticated=yes, encrypted=yes, deleg_credentials=yes, principal=test1\@$realm)"
 );
 
+test_query(
+   $node,
+   'test1',
+   "SELECT * FROM dblink('user=test1 dbname=$dbname host=$host hostaddr=$hostaddr port=$port password=1234','select 1') as t1(c1 int);",
+   qr/^1$/s,
+   'gssencmode=require gssdeleg=enable',
+   'dblink works encrypted');
+
+test_query(
+   $node,
+   'test1',
+   "TABLE tf1;",
+   qr/^1$/s,
+   'gssencmode=require gssdeleg=enable',
+   'postgres_fdw works encrypted');
+
 # Reset pg_hba.conf, and cause a usermap failure with an authentication
 # that has passed.
 unlink($node->data_dir . '/pg_hba.conf');
 $node->append_conf('pg_hba.conf',
-   qq{host all all $hostaddr/32 gss include_realm=0 krb_realm=EXAMPLE.ORG});
+   qq{
+    local all test2 scram-sha-256
+   host all all $hostaddr/32 gss include_realm=0 krb_realm=EXAMPLE.ORG
+});
 $node->restart;
 
 test_access(
 
   slurp_dir
   slurp_file
   append_to_file
+  string_replace_file
   check_mode_recursive
   chmod_recursive
   check_pg_config
 
 =pod
 
+=item string_replace_file(filename, find, replace)
+
+Find and replace string of a given file.
+
+=cut
+
+sub string_replace_file
+{
+   my ($filename, $find, $replace) = @_;
+   open(my $in, '<', $filename);
+   my $content;
+   while(<$in>)
+   {
+       $_ =~ s/$find/$replace/;
+       $content = $content.$_;
+   }
+   close $in;
+   open(my $out, '>', $filename);
+   print $out $content;
+   close($out);
+
+   return;
+}
+
+=pod
+
 =item check_mode_recursive(dir, expected_dir_mode, expected_file_mode, ignore_list)
 
 Check that all file/dir modes in a directory match the expected values,
 
     s.query_id,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, gss_deleg, leader_pid, query_id)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
 pg_stat_gssapi| SELECT pid,
     gss_auth AS gss_authenticated,
     gss_princ AS principal,
-    gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+    gss_enc AS encrypted,
+    gss_deleg AS credentials_delegated
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, gss_deleg, leader_pid, query_id)
   WHERE (client_port IS NOT NULL);
 pg_stat_io| SELECT backend_type,
     io_object,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, gss_deleg, leader_pid, query_id)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
     ssl_client_dn AS client_dn,
     ssl_client_serial AS client_serial,
     ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, gss_deleg, leader_pid, query_id)
   WHERE (client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,