#include <mysql.h>

#include <cctype>
#include <cerrno>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <fstream>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

namespace
{

struct ProgramOptions
{
    std::string credentials_path;
    int         server_id;
    std::string schema_name;
    std::string table_name;
    std::string output_path;
};

struct DbCredentials
{
    std::string host = "localhost";
    std::string user;
    std::string password;
    std::string socket;
    std::string database;
    unsigned int port = 0;
};

struct MysqlConfig
{
    std::string ssl_ca;
    std::string ssl_cert;
    std::string ssl_key;
    int         ssl_verify_server_cert;
};

enum class OutputKind
{
    SignedTiny,
    UnsignedTiny,
    SignedShort,
    UnsignedShort,
    SignedInt,
    UnsignedInt,
    SignedLongLong,
    UnsignedLongLong,
    Float,
    Double,
    DecimalText,
    Text,
    BinaryHex
};

struct LocalBufferField
{
    std::string field_name;
    enum_field_types mysql_type = MYSQL_TYPE_STRING;
    bool is_unsigned = false;
    unsigned long declared_length = 0;
    unsigned long allocated_length = 0;
    OutputKind output_kind = OutputKind::Text;

    my_bool is_null = 0;
    unsigned long buffer_length = 0;

    signed char tiny_value = 0;
    unsigned char unsigned_tiny_value = 0;
    short short_value = 0;
    unsigned short unsigned_short_value = 0;
    int int_value = 0;
    unsigned int unsigned_int_value = 0;
    long long long_long_value = 0;
    unsigned long long unsigned_long_long_value = 0;
    float float_value = 0.0f;
    double double_value = 0.0;
    std::vector<unsigned char> byte_buffer;

    std::map<unsigned long long, unsigned long long> is_null_counts;
    std::map<unsigned long, unsigned long long> buffer_length_counts;

    void *buffer_ptr()
    {
        switch (output_kind) {
            case OutputKind::SignedTiny:
                return &tiny_value;
            case OutputKind::UnsignedTiny:
                return &unsigned_tiny_value;
            case OutputKind::SignedShort:
                return &short_value;
            case OutputKind::UnsignedShort:
                return &unsigned_short_value;
            case OutputKind::SignedInt:
                return &int_value;
            case OutputKind::UnsignedInt:
                return &unsigned_int_value;
            case OutputKind::SignedLongLong:
                return &long_long_value;
            case OutputKind::UnsignedLongLong:
                return &unsigned_long_long_value;
            case OutputKind::Float:
                return &float_value;
            case OutputKind::Double:
                return &double_value;
            case OutputKind::DecimalText:
            case OutputKind::Text:
            case OutputKind::BinaryHex:
                return byte_buffer.empty() ? nullptr : &byte_buffer[0];
        }
        return nullptr;
    }
};

struct LocalBuffer
{
    std::vector<LocalBufferField> fields;
    std::vector<MYSQL_BIND> binds;
};

struct MetadataHandle
{
    MYSQL_RES *metadata = nullptr;

    explicit MetadataHandle(MYSQL_RES *value)
        : metadata(value)
    {
    }

    ~MetadataHandle()
    {
        if (metadata != nullptr) {
            mysql_free_result(metadata);
        }
    }

    MetadataHandle(const MetadataHandle &) = delete;
    MetadataHandle &operator=(const MetadataHandle &) = delete;
};

static void fail(const std::string &message)
{
    throw std::runtime_error(message);
}

static std::string trim(const std::string &input)
{
    std::string::size_type first = 0;
    while (first < input.size() && std::isspace(static_cast<unsigned char>(input[first])) != 0) {
        ++first;
    }

    std::string::size_type last = input.size();
    while (last > first && std::isspace(static_cast<unsigned char>(input[last - 1])) != 0) {
        --last;
    }

    return input.substr(first, last - first);
}

static bool is_valid_identifier(const std::string &value)
{
    if (value.empty()) {
        return false;
    }

    for (std::string::const_iterator it = value.begin(); it != value.end(); ++it) {
        const unsigned char ch = static_cast<unsigned char>(*it);
        if (!(std::isalnum(ch) != 0 || ch == '_' || ch == '$')) {
            return false;
        }
    }
    return true;
}

static std::string mysql_type_name(enum_field_types type)
{
    switch (type) {
        case MYSQL_TYPE_TINY: return "MYSQL_TYPE_TINY";
        case MYSQL_TYPE_SHORT: return "MYSQL_TYPE_SHORT";
        case MYSQL_TYPE_LONG: return "MYSQL_TYPE_LONG";
        case MYSQL_TYPE_INT24: return "MYSQL_TYPE_INT24";
        case MYSQL_TYPE_LONGLONG: return "MYSQL_TYPE_LONGLONG";
        case MYSQL_TYPE_FLOAT: return "MYSQL_TYPE_FLOAT";
        case MYSQL_TYPE_DOUBLE: return "MYSQL_TYPE_DOUBLE";
        case MYSQL_TYPE_DECIMAL: return "MYSQL_TYPE_DECIMAL";
        case MYSQL_TYPE_NEWDECIMAL: return "MYSQL_TYPE_NEWDECIMAL";
        case MYSQL_TYPE_STRING: return "MYSQL_TYPE_STRING";
        case MYSQL_TYPE_VAR_STRING: return "MYSQL_TYPE_VAR_STRING";
        case MYSQL_TYPE_VARCHAR: return "MYSQL_TYPE_VARCHAR";
        case MYSQL_TYPE_BLOB: return "MYSQL_TYPE_BLOB";
        case MYSQL_TYPE_TINY_BLOB: return "MYSQL_TYPE_TINY_BLOB";
        case MYSQL_TYPE_MEDIUM_BLOB: return "MYSQL_TYPE_MEDIUM_BLOB";
        case MYSQL_TYPE_LONG_BLOB: return "MYSQL_TYPE_LONG_BLOB";
        case MYSQL_TYPE_BIT: return "MYSQL_TYPE_BIT";
        case MYSQL_TYPE_DATE: return "MYSQL_TYPE_DATE";
        case MYSQL_TYPE_TIME: return "MYSQL_TYPE_TIME";
        case MYSQL_TYPE_DATETIME: return "MYSQL_TYPE_DATETIME";
        case MYSQL_TYPE_TIMESTAMP: return "MYSQL_TYPE_TIMESTAMP";
        case MYSQL_TYPE_YEAR: return "MYSQL_TYPE_YEAR";
        default: return "MYSQL_TYPE_OTHER";
    }
}

static unsigned long compute_buffer_size(const MYSQL_FIELD &field)
{
    switch (field.type) {
        case MYSQL_TYPE_TINY:
            return sizeof(signed char);
        case MYSQL_TYPE_SHORT:
            return sizeof(short);
        case MYSQL_TYPE_LONG:
        case MYSQL_TYPE_INT24:
            return sizeof(int);
        case MYSQL_TYPE_LONGLONG:
            return sizeof(long long);
        case MYSQL_TYPE_FLOAT:
            return sizeof(float);
        case MYSQL_TYPE_DOUBLE:
            return sizeof(double);
        case MYSQL_TYPE_DECIMAL:
        case MYSQL_TYPE_NEWDECIMAL: {
            unsigned long width = field.length;
            if (width < 32) {
                width = 32;
            }
            return width + 2;
        }
        case MYSQL_TYPE_DATE:
            return 11;
        case MYSQL_TYPE_TIME:
        case MYSQL_TYPE_DATETIME:
        case MYSQL_TYPE_TIMESTAMP:
            return 32;
        case MYSQL_TYPE_YEAR:
            return 5;
        case MYSQL_TYPE_BIT:
        case MYSQL_TYPE_TINY_BLOB:
        case MYSQL_TYPE_BLOB:
        case MYSQL_TYPE_MEDIUM_BLOB:
        case MYSQL_TYPE_LONG_BLOB:
        case MYSQL_TYPE_STRING:
        case MYSQL_TYPE_VAR_STRING:
        case MYSQL_TYPE_VARCHAR:
        default: {
            unsigned long width = field.length;
            if (width == 0) {
                width = 1;
            }
            if ((field.flags & BINARY_FLAG) == 0 && field.charsetnr != 63) {
                ++width;
            }
            return width;
        }
    }
}

static OutputKind determine_output_kind(const MYSQL_FIELD &field)
{
    switch (field.type) {
        case MYSQL_TYPE_TINY:
            return ((field.flags & UNSIGNED_FLAG) != 0) ? OutputKind::UnsignedTiny : OutputKind::SignedTiny;
        case MYSQL_TYPE_SHORT:
            return ((field.flags & UNSIGNED_FLAG) != 0) ? OutputKind::UnsignedShort : OutputKind::SignedShort;
        case MYSQL_TYPE_LONG:
        case MYSQL_TYPE_INT24:
            return ((field.flags & UNSIGNED_FLAG) != 0) ? OutputKind::UnsignedInt : OutputKind::SignedInt;
        case MYSQL_TYPE_LONGLONG:
            return ((field.flags & UNSIGNED_FLAG) != 0) ? OutputKind::UnsignedLongLong : OutputKind::SignedLongLong;
        case MYSQL_TYPE_FLOAT:
            return OutputKind::Float;
        case MYSQL_TYPE_DOUBLE:
            return OutputKind::Double;
        case MYSQL_TYPE_DECIMAL:
        case MYSQL_TYPE_NEWDECIMAL:
            return OutputKind::DecimalText;
        case MYSQL_TYPE_BIT:
            return OutputKind::BinaryHex;
        case MYSQL_TYPE_TINY_BLOB:
        case MYSQL_TYPE_BLOB:
        case MYSQL_TYPE_MEDIUM_BLOB:
        case MYSQL_TYPE_LONG_BLOB:
            return (((field.flags & BINARY_FLAG) != 0) || field.charsetnr == 63) ? OutputKind::BinaryHex : OutputKind::Text;
        case MYSQL_TYPE_STRING:
        case MYSQL_TYPE_VAR_STRING:
        case MYSQL_TYPE_VARCHAR:
            return (((field.flags & BINARY_FLAG) != 0) || field.charsetnr == 63) ? OutputKind::BinaryHex : OutputKind::Text;
        case MYSQL_TYPE_DATE:
        case MYSQL_TYPE_TIME:
        case MYSQL_TYPE_DATETIME:
        case MYSQL_TYPE_TIMESTAMP:
        case MYSQL_TYPE_YEAR:
        default:
            return OutputKind::Text;
    }
}

static enum_field_types bind_buffer_type(OutputKind output_kind)
{
    switch (output_kind) {
        case OutputKind::SignedTiny:
        case OutputKind::UnsignedTiny:
            return MYSQL_TYPE_TINY;
        case OutputKind::SignedShort:
        case OutputKind::UnsignedShort:
            return MYSQL_TYPE_SHORT;
        case OutputKind::SignedInt:
        case OutputKind::UnsignedInt:
            return MYSQL_TYPE_LONG;
        case OutputKind::SignedLongLong:
        case OutputKind::UnsignedLongLong:
            return MYSQL_TYPE_LONGLONG;
        case OutputKind::Float:
            return MYSQL_TYPE_FLOAT;
        case OutputKind::Double:
            return MYSQL_TYPE_DOUBLE;
        case OutputKind::DecimalText:
        case OutputKind::Text:
            return MYSQL_TYPE_STRING;
        case OutputKind::BinaryHex:
            return MYSQL_TYPE_BLOB;
    }
    return MYSQL_TYPE_STRING;
}

static std::string quote_identifier(const std::string &identifier)
{
    return "`" + identifier + "`";
}

static ProgramOptions parse_input_arguments(int argc, char **argv)
{
    if (argc != 5) {
        std::ostringstream usage;
        usage
            << "Usage: " << argv[0] << " <credentials.txt> 0 <schema.table> <output.tsv>\n"
            << "Where: \n"
            << "    Credentials file format:\n"
            << "      Server=localhost\n"
            << "      User=my_user\n"
            << "      Password=my_password\n"
            << "      Port=3306\n"
            << "      Socket=/path/to/mysql.sock\n";
        fail(usage.str());
    }

    ProgramOptions options;
    options.credentials_path = argv[1];
    options.server_id = atoi(argv[2] ? argv[2] : "0" );
    options.output_path = argv[4];

    const std::string qualified_name = argv[3];
    const std::string::size_type dot = qualified_name.find('.');
    if (dot == std::string::npos || dot == 0 || dot + 1 >= qualified_name.size()) {
        fail("Expected table argument in the form schema.table");
    }

    options.schema_name = qualified_name.substr(0, dot);
    options.table_name = qualified_name.substr(dot + 1);

    if (!is_valid_identifier(options.schema_name) || !is_valid_identifier(options.table_name)) {
        fail("Schema and table names may contain only letters, digits, '_' or '$'");
    }

    return options;
}

static DbCredentials load_credentials_file(const std::string &path)
{
    std::ifstream input(path.c_str());
    if (!input.is_open()) {
        fail("Unable to open credentials file: " + path);
    }

    DbCredentials credentials;
    std::string line;

    while (std::getline(input, line)) {
        line = trim(line);
        if (line.empty() || line[0] == '#') {
            continue;
        }

        const std::string::size_type equals = line.find('=');
        if (equals == std::string::npos) {
            continue;
        }

        const std::string key = trim(line.substr(0, equals));
        const std::string value = trim(line.substr(equals + 1));

        if (key == "Server") {
            credentials.host = value;
        } else if (key == "User") {
            credentials.user = value;
        } else if (key == "Password") {
            credentials.password = value;
        } else if (key == "Port") {
            char *end = nullptr;
            errno = 0;
            const unsigned long parsed_port = std::strtoul(value.c_str(), &end, 10);
            if (errno != 0 || end == value.c_str() || *end != '\0' || parsed_port > 65535UL) {
                fail("Invalid port value in credentials file: " + value);
            }
            credentials.port = static_cast<unsigned int>(parsed_port);
        } else if (key == "Socket") {
            credentials.socket = value;
        } else if (key == "Database") {
            credentials.database = value;
        }
    }

    if (credentials.user.empty()) {
        fail("Credentials file must define user=<value>");
    }

    return credentials;
}

static MysqlConfig load_mysql_cnf_file(const std::string &cnf_path)
{
    std::ifstream input(cnf_path.c_str());
    if (!input.is_open()) {
        fail("Unable to open Mysql CNF file: " + cnf_path);
    }

    MysqlConfig mysqlConfig;
    std::string line;

    std::string header = "[default]";

    while (std::getline(input, line)) {
        line = trim(line);
        if (line.empty() || line[0] == '#') {
            continue;
        }

        if((line[0] == '[') && (line[line.size() - 1] == ']'))
        {
            header = line;
            continue;
        }

        const std::string::size_type equals = line.find('=');
        if (equals == std::string::npos) {
            continue;
        }

        const std::string key = trim(line.substr(0, equals));
        const std::string value = trim(line.substr(equals + 1));

        if( header == "[client]")
        {
            if( key == "ssl-ca") {
                mysqlConfig.ssl_ca = value;
            }
            else if( key == "ssl-cert") {
                mysqlConfig.ssl_cert = value;
            }
            else if( key == "ssl-key") {
                mysqlConfig.ssl_key = value;
            }
            else if( key == "ssl-verify-server-cert") {
                mysqlConfig.ssl_verify_server_cert = atoi(value.c_str());
            }
        }
    }

    return mysqlConfig ;
}

static MYSQL *establish_database_connection(const MysqlConfig &mysqlConfig, const DbCredentials &credentials)
{
    static int atexit_mysql_end_dbconnect = false;
    my_bool verify = 1;

    MYSQL *connection = mysql_init(nullptr);
    if (connection == nullptr) {
        fail("mysql_init() failed");
    }

    if( atexit_mysql_end_dbconnect == false) {
        atexit( mysql_library_end );
        atexit_mysql_end_dbconnect  = true;
    }

    if(mysqlConfig.ssl_ca.empty() == false) {
        mysql_optionsv(connection, MYSQL_OPT_SSL_CA, (void*) mysqlConfig.ssl_ca.c_str());
    }

    if(mysqlConfig.ssl_cert.empty() == false) {
        mysql_optionsv(connection, MYSQL_OPT_SSL_CERT, (void*) mysqlConfig.ssl_cert.c_str());
    }

    if(mysqlConfig.ssl_key.empty() == false) {
        mysql_optionsv(connection, MYSQL_OPT_SSL_KEY, (void*) mysqlConfig.ssl_key.c_str());
    }

    verify = (mysqlConfig.ssl_verify_server_cert > 0);
    mysql_optionsv(connection, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, (void*) &verify);

    printf("Establishing Connection to Host: %s\n", credentials.host.c_str() );

    if (mysql_real_connect(connection,
                           credentials.host.empty() ? nullptr : credentials.host.c_str(),
                           credentials.user.empty() ? nullptr : credentials.user.c_str(),
                           credentials.password.empty() ? nullptr : credentials.password.c_str(),
                           credentials.database.empty() ? nullptr : credentials.database.c_str(),
                           credentials.port,
                           nullptr, // credentials.socket.empty() ? nullptr : credentials.socket.c_str(),
                           0) == nullptr) {
        std::string error = mysql_error(connection);
        mysql_close(connection);
        fail("mysql_real_connect() failed: " + error);
    }

    return connection;
}

static MYSQL *establish_table_connection(const MysqlConfig &mysqlConfig, const std::string &credentials_path, int server_id)
{
    // Establish regular connection
    const DbCredentials regular_credentials = load_credentials_file(credentials_path);
    MYSQL* connection = establish_database_connection(mysqlConfig, regular_credentials);
    return connection;
}

static MYSQL_STMT *prepare_select_statement(MYSQL *connection, const ProgramOptions &options, std::string &sql_out)
{
    sql_out = "SELECT * FROM " + quote_identifier(options.schema_name) + "." + quote_identifier(options.table_name);

    MYSQL_STMT *statement = mysql_stmt_init(connection);
    if (statement == nullptr) {
        fail("mysql_stmt_init() failed");
    }

    if (mysql_stmt_prepare(statement, sql_out.c_str(), static_cast<unsigned long>(sql_out.size())) != 0) {
        const std::string error = mysql_stmt_error(statement);
        mysql_stmt_close(statement);
        fail("mysql_stmt_prepare() failed, SQL:" +sql_out + " , Error:" + error);
    }

    return statement;
}

static MetadataHandle query_table_structure(MYSQL_STMT *statement)
{
    MYSQL_RES *metadata = mysql_stmt_result_metadata(statement);
    if (metadata == nullptr) {
        fail("mysql_stmt_result_metadata() returned null; statement does not expose a result set");
    }
    return MetadataHandle(metadata);
}

static LocalBuffer allocate_local_buffer(MYSQL_RES *metadata)
{
    LocalBuffer local_buffer;
    const unsigned int field_count = mysql_num_fields(metadata);
    local_buffer.fields.resize(field_count);
    local_buffer.binds.resize(field_count);

    MYSQL_FIELD *fields = mysql_fetch_fields(metadata);
    for (unsigned int index = 0; index < field_count; ++index) {
        LocalBufferField &field = local_buffer.fields[index];
        const MYSQL_FIELD &metadata_field = fields[index];

        field.field_name = metadata_field.name != nullptr ? metadata_field.name : "";
        field.mysql_type = metadata_field.type;
        field.is_unsigned = (metadata_field.flags & UNSIGNED_FLAG) != 0;
        field.declared_length = metadata_field.length;
        field.allocated_length = compute_buffer_size(metadata_field);
        field.output_kind = determine_output_kind(metadata_field);

        if (field.output_kind == OutputKind::DecimalText ||
            field.output_kind == OutputKind::Text ||
            field.output_kind == OutputKind::BinaryHex) {
            field.byte_buffer.assign(field.allocated_length, 0);
        }
    }

    return local_buffer;
}

static void bind_prepared_statement_results(MYSQL_STMT *statement, LocalBuffer &local_buffer)
{
    for (std::size_t index = 0; index < local_buffer.fields.size(); ++index) {
        LocalBufferField &field = local_buffer.fields[index];
        MYSQL_BIND &bind = local_buffer.binds[index];
        std::memset(&bind, 0, sizeof(bind));

        bind.buffer_type = bind_buffer_type(field.output_kind);
        bind.buffer = field.buffer_ptr();
        bind.buffer_length = field.allocated_length;
        bind.length = &field.buffer_length;
        bind.is_null = &field.is_null;
        bind.is_unsigned = field.is_unsigned ? 1 : 0;
    }

    if (mysql_stmt_bind_result(statement, &local_buffer.binds[0]) != 0) {
        fail("mysql_stmt_bind_result() failed: " + std::string(mysql_stmt_error(statement)));
    }
}

static std::string escape_tsv_text(const char *value, std::size_t length)
{
    std::string output;
    output.reserve(length);

    for (std::size_t index = 0; index < length; ++index) {
        switch (value[index]) {
            case '\t':
                output.append("\\t");
                break;
            case '\n':
                output.append("\\n");
                break;
            case '\r':
                output.append("\\r");
                break;
            default:
                output.push_back(value[index]);
                break;
        }
    }

    return output;
}

static std::string bytes_to_hex(const unsigned char *data, std::size_t length)
{
    static const char digits[] = "0123456789ABCDEF";
    std::string output;
    output.reserve(length * 2);

    for (std::size_t index = 0; index < length; ++index) {
        const unsigned char value = data[index];
        output.push_back(digits[(value >> 4) & 0x0F]);
        output.push_back(digits[value & 0x0F]);
    }

    return output;
}

static std::string format_field_value(const LocalBufferField &field)
{
    if (field.is_null != 0) {
        return "\\N";
    }

    if (field.buffer_length == 0) {
        return "";
    }

    char buffer[128];
    switch (field.output_kind) {
        case OutputKind::SignedTiny:
            std::snprintf(buffer, sizeof(buffer), "%d", static_cast<int>(field.tiny_value));
            return buffer;
        case OutputKind::UnsignedTiny:
            std::snprintf(buffer, sizeof(buffer), "%u", static_cast<unsigned int>(field.unsigned_tiny_value));
            return buffer;
        case OutputKind::SignedShort:
            std::snprintf(buffer, sizeof(buffer), "%hd", field.short_value);
            return buffer;
        case OutputKind::UnsignedShort:
            std::snprintf(buffer, sizeof(buffer), "%hu", field.unsigned_short_value);
            return buffer;
        case OutputKind::SignedInt:
            std::snprintf(buffer, sizeof(buffer), "%d", field.int_value);
            return buffer;
        case OutputKind::UnsignedInt:
            std::snprintf(buffer, sizeof(buffer), "%u", field.unsigned_int_value);
            return buffer;
        case OutputKind::SignedLongLong:
            std::snprintf(buffer, sizeof(buffer), "%lld", field.long_long_value);
            return buffer;
        case OutputKind::UnsignedLongLong:
            std::snprintf(buffer, sizeof(buffer), "%llu", field.unsigned_long_long_value);
            return buffer;
        case OutputKind::Float:
            std::snprintf(buffer, sizeof(buffer), "%.9g", static_cast<double>(field.float_value));
            return buffer;
        case OutputKind::Double:
            std::snprintf(buffer, sizeof(buffer), "%.17g", field.double_value);
            return buffer;
        case OutputKind::DecimalText:
        case OutputKind::Text: {
            const char *value = reinterpret_cast<const char *>(field.byte_buffer.empty() ? nullptr : &field.byte_buffer[0]);
            const std::size_t length = static_cast<std::size_t>(field.buffer_length);
            return value == nullptr ? std::string() : escape_tsv_text(value, length);
        }
        case OutputKind::BinaryHex: {
            const unsigned char *value = field.byte_buffer.empty() ? nullptr : &field.byte_buffer[0];
            const std::size_t length = static_cast<std::size_t>(field.buffer_length);
            return value == nullptr ? std::string() : bytes_to_hex(value, length);
        }
    }

    return std::string();
}

static void write_header_row(FILE *output_file, const LocalBuffer &local_buffer)
{
    for (std::size_t index = 0; index < local_buffer.fields.size(); ++index) {
        if (index > 0) {
            std::fputc('\t', output_file);
        }
        std::fputs(local_buffer.fields[index].field_name.c_str(), output_file);
    }
    std::fputc('\n', output_file);
}

static void write_output_record(FILE *output_file, const LocalBuffer &local_buffer)
{
    for (std::size_t index = 0; index < local_buffer.fields.size(); ++index) {
        if (index > 0) {
            std::fputc('\t', output_file);
        }

        const std::string value = format_field_value(local_buffer.fields[index]);
        std::fwrite(value.data(), 1, value.size(), output_file);
    }
    std::fputc('\n', output_file);
}

static void record_fetch_observations(LocalBuffer &local_buffer)
{
    for (std::size_t index = 0; index < local_buffer.fields.size(); ++index) {
        LocalBufferField &field = local_buffer.fields[index];
        ++field.is_null_counts[static_cast<unsigned long long>(field.is_null)];
        ++field.buffer_length_counts[field.buffer_length];
    }
}

static bool fetch_next_record(MYSQL_STMT *statement, LocalBuffer &local_buffer)
{
    const int fetch_status = mysql_stmt_fetch(statement);
    if (fetch_status == 0 || fetch_status == MYSQL_DATA_TRUNCATED) {
        record_fetch_observations(local_buffer);
        return true;
    }

    if (fetch_status == MYSQL_NO_DATA) {
        return false;
    }

    fail("mysql_stmt_fetch() failed: " + std::string(mysql_stmt_error(statement)));
    return false;
}

static std::string summarize_counts(const std::map<unsigned long long, unsigned long long> &counts)
{
    std::ostringstream output;
    bool first = true;
    for (std::map<unsigned long long, unsigned long long>::const_iterator it = counts.begin(); it != counts.end(); ++it) {
        if (!first) {
            output << ", ";
        }
        first = false;
        output << it->first << ":" << it->second;
    }
    return output.str();
}

static std::string summarize_length_counts(const std::map<unsigned long, unsigned long long> &counts)
{
    std::ostringstream output;
    bool first = true;
    for (std::map<unsigned long, unsigned long long>::const_iterator it = counts.begin(); it != counts.end(); ++it) {
        if (!first) {
            output << ", ";
        }
        first = false;
        output << it->first << ":" << it->second;
    }
    return output.str();
}

static void print_observation_summary(const LocalBuffer &local_buffer, unsigned long long record_count)
{
    int widths[6] = {15, 25, 13,  26, 20};

    std::cout << "\nSummary\n";
    std::cout << "total_records\t" << record_count << "\n";
    std::cout
            << std::setfill(' ')
            << std::left
            << std::setw( widths[0]) << "field_name"
            << std::setw( widths[1]) << "mysql_type"
            << std::setw( widths[2]) << "is_unsigned"
            << std::setw( widths[3]) << "is_null_counts(valu:cnt) "
            << std::setw( widths[4]) << "buffer_length_counts(valu:cnt)"
            << '\n'
            ;


    for (std::size_t index = 0; index < local_buffer.fields.size(); ++index) {
        const LocalBufferField &field = local_buffer.fields[index];
        std::cout
            << std::setfill(' ')
            << std::left
            << std::setw( widths[0]) << field.field_name 
            << std::setw( widths[1]) << mysql_type_name(field.mysql_type) 
            << std::setw( widths[2]) << (field.is_unsigned ? "Y" : "N") 
            << std::setw( widths[3]) << summarize_counts(field.is_null_counts) 
            << std::setw( widths[4]) << summarize_length_counts(field.buffer_length_counts) 
            << '\n'
            ;
    }
}

static FILE *open_output_file(const std::string &path)
{
    FILE *output_file = std::fopen(path.c_str(), "wb");
    if (output_file == nullptr) {
        fail("Unable to open output file: " + path);
    }
    return output_file;
}

} // namespace

int main(int argc, char **argv)
{
    MYSQL *connection = nullptr;
    MYSQL_STMT *statement = nullptr;
    FILE *output_file = nullptr;

    try {
        const ProgramOptions options = parse_input_arguments(argc, argv);
        const MysqlConfig    mysqlConfig = load_mysql_cnf_file("/etc/my.cnf");

        connection = establish_table_connection(mysqlConfig, options.credentials_path, options.server_id );

        std::string sql;
        statement = prepare_select_statement(connection, options, sql);

        MetadataHandle metadata = query_table_structure(statement);
        LocalBuffer local_buffer = allocate_local_buffer(metadata.metadata);

        if (mysql_stmt_execute(statement) != 0) {
            fail("mysql_stmt_execute() failed: " + std::string(mysql_stmt_error(statement)));
        }

        bind_prepared_statement_results(statement, local_buffer);

        output_file = open_output_file(options.output_path);
        write_header_row(output_file, local_buffer);

        unsigned long long record_count = 0;
        while (fetch_next_record(statement, local_buffer)) {
            write_output_record(output_file, local_buffer);
            ++record_count;
        }

        std::fflush(output_file);
        print_observation_summary(local_buffer, record_count);

        std::fclose(output_file);
        output_file = nullptr;

        mysql_stmt_close(statement);
        statement = nullptr;

        mysql_close(connection);
        connection = nullptr;

        return EXIT_SUCCESS;
    } catch (const std::exception &error) {
        if (output_file != nullptr) {
            std::fclose(output_file);
        }
        if (statement != nullptr) {
            mysql_stmt_close(statement);
        }
        if (connection != nullptr) {
            mysql_close(connection);
        }

        std::fprintf(stderr, "ERROR: %s\n", error.what());
        return EXIT_FAILURE;
    }
}
