#include <sys/socket.h>
 #include <sys/time.h>
 
+#ifdef USE_VALGRIND
+#include <valgrind/valgrind.h>
+#endif
+
 #include "access/parallel.h"
 #include "access/printtup.h"
 #include "access/xact.h"
 static void disable_statement_timeout(void);
 
 
+/* ----------------------------------------------------------------
+ *     infrastructure for valgrind debugging
+ * ----------------------------------------------------------------
+ */
+#ifdef USE_VALGRIND
+/* This variable should be set at the top of the main loop. */
+static unsigned int old_valgrind_error_count;
+
+/*
+ * If Valgrind detected any errors since old_valgrind_error_count was updated,
+ * report the current query as the cause.  This should be called at the end
+ * of message processing.
+ */
+static void
+valgrind_report_error_query(const char *query)
+{
+   unsigned int valgrind_error_count = VALGRIND_COUNT_ERRORS;
+
+   if (unlikely(valgrind_error_count != old_valgrind_error_count) &&
+       query != NULL)
+       VALGRIND_PRINTF("Valgrind detected %u error(s) during execution of \"%s\"\n",
+                       valgrind_error_count - old_valgrind_error_count,
+                       query);
+}
+
+#else                          /* !USE_VALGRIND */
+#define valgrind_report_error_query(query) ((void) 0)
+#endif                         /* USE_VALGRIND */
+
+
 /* ----------------------------------------------------------------
  *     routines to obtain user input
  * ----------------------------------------------------------------
    if (save_log_statement_stats)
        ShowUsage("BIND MESSAGE STATISTICS");
 
+   valgrind_report_error_query(debug_query_string);
+
    debug_query_string = NULL;
 }
 
    if (save_log_statement_stats)
        ShowUsage("EXECUTE MESSAGE STATISTICS");
 
+   valgrind_report_error_query(debug_query_string);
+
    debug_query_string = NULL;
 }
 
        /* Report the error to the client and/or server log */
        EmitErrorReport();
 
+       /*
+        * If Valgrind noticed something during the erroneous query, print the
+        * query string, assuming we have one.
+        */
+       valgrind_report_error_query(debug_query_string);
+
        /*
         * Make sure debug_query_string gets reset before we possibly clobber
         * the storage it points at.
         */
        doing_extended_query_message = false;
 
+       /*
+        * For valgrind reporting purposes, the "current query" begins here.
+        */
+#ifdef USE_VALGRIND
+       old_valgrind_error_count = VALGRIND_COUNT_ERRORS;
+#endif
+
        /*
         * Release storage left over from prior query cycle, and create a new
         * query input buffer in the cleared MessageContext.
                    else
                        exec_simple_query(query_string);
 
+                   valgrind_report_error_query(query_string);
+
                    send_ready_for_query = true;
                }
                break;
 
                    exec_parse_message(query_string, stmt_name,
                                       paramTypes, numParams);
+
+                   valgrind_report_error_query(query_string);
                }
                break;
 
                 * the field extraction out-of-line
                 */
                exec_bind_message(&input_message);
+
+               /* exec_bind_message does valgrind_report_error_query */
                break;
 
            case 'E':           /* execute */
                    pq_getmsgend(&input_message);
 
                    exec_execute_message(portal_name, max_rows);
+
+                   /* exec_execute_message does valgrind_report_error_query */
                }
                break;
 
                /* commit the function-invocation transaction */
                finish_xact_command();
 
+               valgrind_report_error_query("fastpath function call");
+
                send_ready_for_query = true;
                break;
 
 
                    if (whereToSendOutput == DestRemote)
                        pq_putemptymessage('3');    /* CloseComplete */
+
+                   valgrind_report_error_query("CLOSE message");
                }
                break;
 
                                            describe_type)));
                            break;
                    }
+
+                   valgrind_report_error_query("DESCRIBE message");
                }
                break;
 
            case 'S':           /* sync */
                pq_getmsgend(&input_message);
                finish_xact_command();
+               valgrind_report_error_query("SYNC message");
                send_ready_for_query = true;
                break;