Adjusted Concatenate function leads to an error

Moderator: NorbertKrupa

Post Reply
dameyerdave
Newbie
Newbie
Posts: 1
Joined: Wed Aug 26, 2015 12:38 pm

Adjusted Concatenate function leads to an error

Post by dameyerdave » Wed Aug 26, 2015 12:47 pm

I adjusted the Concatenate function to aggregate values like group_concat from mysql. I'd like to get a sorted distinct list of all values in the group.

Here is my code:

Code: Select all

#include "Vertica.h"
#include <time.h> 
#include <sstream>
#include <iostream>
#include <list>

using namespace Vertica;
using namespace std;


/**
 * User Defined Aggregate Function concatenate that takes in strings and concatenates
 * them together. Right now, the max length of the resulting string is ten times the
 * maximum length of the input string.
 */
class Concatenate : public AggregateFunction
{

    virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs)
    {
        try {
			srvInterface.log("----------- Doing INIT_AGGREGATE -----------");
            VString &concat = aggs.getStringRef(0);
            concat.copy("");
			//srvInterface.log("concat1: %s", concat.data());
        } catch(exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while initializing intermediate aggregates: [%s]", e.what());
        }
    }

    void aggregate(ServerInterface &srvInterface, 
                   BlockReader &argReader, 
                   IntermediateAggs &aggs)
    {
        try {
			srvInterface.log("----------- Doing AGGREGATE -----------");
            VString &concat = aggs.getStringRef(0);
            string word = concat.str();
            //uint32 maxSize = aggs.getTypeMetaData().getColumnType(0).getStringLength();
			std::list<std::string> wordlist;
			std::list<std::string>::iterator it;
            do {
                const VString &input = argReader.getStringRef(0);

                if (!input.isNull()) {
                    //if ((word.length() + input.length()) +1 > maxSize) break;
					wordlist.push_back(input.str().c_str());
					srvInterface.log("push_back: %s", input.str().c_str());
                    //word.append(input.str());
					//word.append(";");
                }
            } while (argReader.next());
			wordlist.sort();
			wordlist.unique();
			for (it = wordlist.begin(); it != wordlist.end(); ++it) {
                //if (word.length() +5 > maxSize) break;
				word.append(*it);
				word.append(";");
			}
			//word = word.substr(0, word.size()-1);
			srvInterface.log("copy word: %s", word.c_str());
            concat.copy(word);
        } catch(exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while processing aggregate: [%s]", e.what());
        }
    }

    virtual void combine(ServerInterface &srvInterface, 
                         IntermediateAggs &aggs, 
                         MultipleIntermediateAggs &aggsOther)
    {
        try {
            //uint32 maxSize = aggs.getTypeMetaData().getColumnType(0).getStringLength();
            VString myConcat = aggs.getStringRef(0);

			srvInterface.log("----------- Doing COMBINE -----------");
            do {
                const VString otherConcat = aggsOther.getStringRef(0);
				srvInterface.log("combine %s \nwith   %s", myConcat.data(), otherConcat.data());
                //if ((myConcat.length() + otherConcat.length()) <= maxSize) {
                    string word = myConcat.str();
                    word.append(otherConcat.str());
                    myConcat.copy(word);
                //}
            } while (aggsOther.next());
        } catch(exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
        }
    }

    virtual void terminate(ServerInterface &srvInterface, 
                           BlockWriter &resWriter, 
                           IntermediateAggs &aggs)
    {
        try {
			srvInterface.log("----------- Doing TERMINATE-----------");
            const VString &concat = aggs.getStringRef(0);
            VString &result = resWriter.getStringRef();

            result.copy(&concat);
        } catch(exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what());
        }
    }

    InlineAggregate()
};


class ConcatenateFactory : public AggregateFunctionFactory
{
    virtual void getIntermediateTypes(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData)
    {
		srvInterface.log("----------- Doing getIntermediateTypes -----------");
        //int input_len = inputTypes.getColumnType(0).getStringLength();
        //intermediateTypeMetaData.addVarchar(input_len*10);
        intermediateTypeMetaData.addLongVarchar(100000);
    }

    virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType)
    {
		srvfloaterface.log("----------- Doing getPrototype -----------");
        argTypes.addLongVarchar();
        returnType.addLongVarchar();
    }

    virtual void getReturnType(ServerInterface &srvfloaterface, 
                               const SizedColumnTypes &inputTypes, 
                               SizedColumnTypes &outputTypes)
    {
		srvfloaterface.log("----------- Doing getReturnType -----------");
        //int input_len = inputTypes.getColumnType(0).getStringLength();
        //outputTypes.addVarchar(input_len*10);
        outputTypes.addLongVarchar(100000);
    }

    virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface)
    { return vt_createFuncObject<Concatenate>(srvfloaterface.allocator); }

};

RegisterFactory(ConcatenateFactory);
The SQL statement looks like

Code: Select all

CREATE OR REPLACE LIBRARY AggregateFunctionsConcatenate AS '/opt/vertica/sdk/examples/AggregateFunctions/Concatenate.so';
CREATE OR REPLACE AGGREGATE FUNCTION agg_group_concat AS LANGUAGE 'C++' NAME 'ConcatenateFactory' LIBRARY AggregateFunctionsConcatenate;

select 
        a.id AS id,
        MIN(a.f1) AS f1,
        MIN(a.f2) AS f2,
        agg_group_concat(b.code) AS code
    from
        a,b
    where
        a.id = b.id
    group by a.id;
Here is the error from Vertica:

Code: Select all

2015-08-26 13:35:23.189 EEcmdq:0x7faa5c51a170 [EE] <WARNING> Vertica Internal Error 12 'ecnt > 0'
2015-08-26 13:35:23.189 EEcmdq:0x7faa5c51a170 <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.203 EEcmdq:0x7faa5c151230 [EE] <WARNING> Vertica Internal Error 12 'ecnt > 0'
2015-08-26 13:35:23.203 EEcmdq:0x7faa5c151230 <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.257 EEcmdq:0x7faa5c3c8e20 [EE] <WARNING> Vertica Internal Error 12 'ecnt > 0'
2015-08-26 13:35:23.258 EEcmdq:0x7faa5c3c8e20 <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.273 EEcmdq:0x7faa5c260ce0 [EE] <WARNING> Vertica Internal Error 12 'ecnt > 0'
2015-08-26 13:35:23.274 EEcmdq:0x7faa5c260ce0 <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.336 Init Session:0x7faadc011550-a000000000a59a [TIMING] Dist: execution complete; used 2.607 seconds
2015-08-26 13:35:23.361 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.362 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.362 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.362 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.383 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.383 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.383 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588
2015-08-26 13:35:23.403 Init Session:0x7faadc011550-a000000000a59a [TIMING] Dist: finished; used 0.067 seconds
2015-08-26 13:35:23.405 Init Session:0x7faadc011550-a000000000a59a <INTERNAL> @v_usz_node0001: VX001/3591: Internal EE Error (11)
	DETAIL:  ecnt > 0
	HINT:  Please report this error to Vertica; try restating your query
	LOCATION:  compressWriteBlock, /scratch_a/release/16125/vbuild/vertica/EE/EEUtil/SortQueue.cpp:588


I don't know what this error wants to tell me. Could someone please assist. - Thank you

Post Reply

Return to “Vertica User Defined Functions (UDFs)”