CStageManager class

Pipelines v1.6

 

Member variables, Member functions


 

Description

 

The CStageManager class provides access to a range of stage resources, including: parsing, extracting and validating your stage command argument; issuing error and warning messages on behalf of your stage; reading and writing input and output stream records; testing stream connectivity; and other general-purpose support functions. The StageManager is at the core of Pipelines: it serves as the central class that provides an interface to the essential stage services which allow you to control how your stage operates in a multi-stream pipeline configuration.

 

Definition

 

The following class definition is an edited version; it details only those members which are accessible and of use to you. In addition, any examples shown are intended only as a demonstration of how certain functions might be used; they do not represent best practice, nor do they take into account the scope and de-allocation of class objects.

 

// Definitions for interface to stage DLLs.
typedef DWORD ( *BEGIN_STAGE )( CStageInitInfo *a_pManager );
#define BEGIN_STAGE_FCN "BeginStage"
 
class CStageManager 
{
    public:
 
    // Pipeline, Stage and Stream functions.
    CStage *GetStage( CProcess &a_Process );
    bool ReadRecord( CProcess &a_Process, DWORD a_dwInStream, CString *a_pBuffer );
    bool ReadAnyRecord( CProcess &a_Process, CString *a_pBuffer );
    bool PeekRecord( CProcess &a_Process, DWORD a_dwInStream, CString *a_pBuffer );
    int PeekAnyRecord( CProcess &a_Process, CString *a_pBuffer );
    bool WriteRecord( CProcess &a_Process, DWORD a_dwOutStream, CString *a_pszRecord );
    bool ConsumeRecord( CProcess &a_Process, DWORD a_dwInStream );
    void StageMessage( CProcess &a_Process, DWORD a_dwType, const char *a_pMessage, ... );
    bool StageMessage( CProcess &a_Process, DWORD a_dwMsgNumber, DWORD a_dwType, ... );
    bool RuntimeError( void ) const;
    double GetProcId( void ) const;
    bool PreProcess( CProcess &a_pProcess, const char **a_pszArgument );
    void SeverOutStream( CProcess &a_Process, DWORD a_dwStream );
    void SeverInStream( CProcess &a_Process, DWORD a_dwStream );
    bool SetStatus( CStageInitInfo *a_pInfo );
 
    // Parsing functions.
    bool ParseForToken( CToken *a_pToken );
    bool ParseForIntegerString( CIntegerString *a_pIntString );
    bool ConvertToInteger( CIntegerString *a_pIntString );
    bool ParseForCharacterString( CCharacterString *a_pCharString );
    bool ParseForRegExp( CProcess &a_Process, CRegExpression *a_pRegExp,
                         bool a_bCaseSensitive = true );
    bool ParseForIntegerRange( CIntegerRange *a_pIntRange );
    bool ParseForCharacter( CCharacterString *a_pCharString );
    bool ParseForCharacterRange( CCharacterRange *a_pCharRange );
    
    // Runtime extraction routines.
    void ExtractColumnRange( CExtractRange *a_pRange );
    void ExtractWordRange( CExtractRange *a_pRange );
    void ExtractFieldRange( CExtractRange *a_pRange );
 
    bool RegExpMatch( CRegExpression *a_pRegExp, CString &a_szSource );
    bool PatternMatch( const char *a_pszPattern, const char *a_pszSource ); 
};
 

Member variables

 
None.
 

Member functions

 

 
CStage *CStageManager::GetStage(
    _in                          CProcess &a_Process
);
 

 

    

Purpose

Use GetStage() to get a pointer to your CStage.

 

With this pointer, you can access your stage arguments, determine your position in the pipeline, set and test your input and output stream connections, and more.

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

    

Returns

 

 

CStage *

A pointer to your CStage class object.

 

    

Messages

None

 

    

Usage

To get a pointer to your CStage object:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    return( _RC_SUCCESS_ );
}

 

 
bool CStageManager::ReadRecord(
    _in                         CProcess &a_Process,
    _in                         DWORD a_dwInStream,
    _inout                      CString *a_pszRecord
);
 

 

    

Purpose

Use ReadRecord() to read a record from the specified input stream a_dwInStream.

 

ReadRecord() issues a read request against the specified input stream a_dwInStream. The function waits for a record to become available, effectively suspending your stage until the read can be satisfied. The length of time that your stage remains suspended cannot be determined. Once a record has become available, the function stores that record in the CString buffer a_pszRecord, removes the record from the input stream (releasing the stage connected to the write-end of the stream from its suspended state) and returns control to your stage.

 

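A stage which calls ReadRecord() delays the records, because the record is removed from the input stream (and the writing stage released) before your stage has written it to an output stream.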

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwInStream

[in] The number of the input stream to read from.

 

CString *a_pszRecord

[in,out] The address of a buffer in which to store the input record.

 

    

Returns

 

 

bool

true if the read request was successful; otherwise false, which indicates that the input stream is at end-of-file.

 

    

Messages

None

 

    

Usage

To read a record from your primary input stream:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
 
    // Record buffer.
    CString szRecord;
 
    while( true )
    {
        // Read a record from my primary input stream.
        if( !pManager->ReadRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary input stream.
            break;
        }
 
        //
        // TODO: operate on the record here.
        //
 
        ...
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
    }
 
    return( _RC_SUCCESS_ );
}

 

 
bool CStageManager::ReadAnyRecord(
    _in                            CProcess &a_Process,
    _inout                         CString *a_pszRecord
);
 

 

    

Purpose

Use ReadAnyRecord() to read a record from any connected input stream.

 

ReadAnyRecord() issues a read request against all your input streams. The function waits for a record to become available on any input stream, effectively suspending your stage until the read can be satisfied. The length of time that your stage remains suspended cannot be determined. Once a record has become available, the function stores that record in the CString buffer a_pszRecord, removes the record from the input stream (releasing the stage connected to the write-end of the stream from its suspended state) and returns control to your stage.

 

A stage which calls ReadAnyRecord() delays the records.
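The read-any behaviour described above can be pictured with a minimal, single-threaded sketch. It is not the StageManager's implementation: MockInStreams, MockReadAny() and DrainAll() are hypothetical names, and the sketch assumes that the first stream holding a record satisfies the read. The real function suspends your stage until a record arrives, which a single-threaded mock cannot show.

```cpp
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in: each deque models the records pending on one
// input stream. Not part of the Pipelines API.
typedef std::vector< std::deque<std::string> > MockInStreams;

// Take the next record from the first stream that has one.
// Returns false only when every stream is empty (all at end-of-file).
bool MockReadAny( MockInStreams &a_Streams, std::string *a_pBuffer )
{
    for( auto &stream : a_Streams )
    {
        if( !stream.empty() )
        {
            *a_pBuffer = stream.front();
            stream.pop_front();
            return( true );
        }
    }
    return( false );
}

// Read until all streams report end-of-file, collecting the records.
std::vector<std::string> DrainAll( MockInStreams &a_Streams )
{
    std::vector<std::string> records;
    std::string szRecord;

    while( MockReadAny( a_Streams, &szRecord ) )
        records.push_back( szRecord );

    return( records );
}
```

The loop in DrainAll() has the same shape as a Go() function built on ReadAnyRecord(): it ends only when no input stream can supply another record.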

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

CString *a_pszRecord

[in,out] The address of a buffer in which to store the input record.

 

     

Returns

 

 

bool

true if the read request was successful; otherwise false, which indicates that all input streams are at end-of-file.

 

    

Messages

None

 

    

Usage

To read a record from any of your input streams:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
 
    // Record buffer.
    CString szRecord;
 
    while( true )
    {
        // Read a record from any of my input streams.
        // ReadAnyRecord() returns a bool, not a stream number.
        if( !pManager->ReadAnyRecord( *this, &szRecord ) )
        {
            // This is the end-of-file on all my input streams.
            break;
        }
 
        //
        // TODO: operate on the record here.
        //
 
        ...
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
    }
 
    return( _RC_SUCCESS_ );
}

 

 
bool CStageManager::PeekRecord(
    _in                         CProcess &a_Process,
    _in                         DWORD a_dwInStream,
    _inout                      CString *a_pszRecord
);
 

 

    

Purpose

Use PeekRecord() to peek at a record on the specified input stream a_dwInStream.

 

PeekRecord() issues a read request against the specified input stream a_dwInStream. The function waits for a record to become available, effectively suspending your stage until the read can be satisfied. The length of time that your stage remains suspended cannot be determined. Once a record has become available, the function stores that record in the CString buffer a_pszRecord but does not remove the record from the input stream (leaving the stage connected to the write-end of the stream in its suspended state), and returns control to your stage.

 

A stage which calls PeekRecord(), then WriteRecord(), then ConsumeRecord(), in that order, does not delay the records.

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwInStream

[in] The number of the input stream to read from.

 

CString *a_pszRecord

[in,out] The address of a buffer in which to store the input record.

 

    

Returns

 

 

bool

true if the read request was successful; otherwise false, which indicates that the input stream is at end-of-file.

 

    

Messages

None

 

    

Usage

To peek at a record on your primary input stream:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
 
    // Record buffer.
    CString szRecord;
 
    while( true )
    {
        // Peek at a record on my primary input stream.
        if( !pManager->PeekRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary input stream.
            break;
        }
 
        //
        // TODO: operate on the record here.
        //
 
        ...
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
 
        // Release the stage that I read from.
        pManager->ConsumeRecord( *this, 0 );
    }
 
    return( _RC_SUCCESS_ );
}

 

 
int CStageManager::PeekAnyRecord(
    _in                           CProcess &a_Process,
    _inout                        CString *a_pszRecord
);
 

 

    

Purpose

Use PeekAnyRecord() to peek at a record on any connected input stream.

 

PeekAnyRecord() issues a read request against all your input streams. The function waits for a record to become available on any input stream, effectively suspending your stage until the read can be satisfied. The length of time that your stage remains suspended cannot be determined. Once a record has become available, the function stores that record in the CString buffer a_pszRecord but does not remove the record from the input stream (leaving the stage connected to the write-end of the stream in its suspended state), and returns control to your stage.

 

A stage which calls PeekAnyRecord(), then WriteRecord(), then ConsumeRecord(), in that order, does not delay the records.

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

CString *a_pszRecord

[in,out] The address of a buffer in which to store the input record.

 

    

Returns

 

 

int

The zero-based index number of the stream which satisfied the read request; otherwise a value less than zero, which indicates that all input streams are at end-of-file.

 

    

Messages

None

 

    

Usage

To peek at a record from any of your input streams:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
 
    // Record buffer.
    CString szRecord;
 
    while( true )
    {
        // Peek at a record on any of my input streams.
        int nStream = pManager->PeekAnyRecord( *this, &szRecord );
        
        // Test for end-of-file.
        if( nStream < 0 )
        {
            // This is the end-of-file on all my input streams.
            break;
        }
 
        //
        // TODO: operate on the record here.
        //
 
        ...
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
 
        // Release the stage that I read from.
        pManager->ConsumeRecord( *this, nStream );
    }
 
    return( _RC_SUCCESS_ );
}

 

 
bool CStageManager::WriteRecord(
    _in                          CProcess &a_Process,
    _in                          DWORD a_dwOutStream,
    _in                          CString *a_pszRecord
);
 

 

    

Purpose

Use WriteRecord() to write a record to the specified output stream a_dwOutStream.

 

WriteRecord() writes the record from the CString buffer a_pszRecord to the specified output stream a_dwOutStream. The function suspends your stage until the target stage connected to output stream a_dwOutStream has consumed the record. That is to say, your stage remains suspended pending service by the StageManager until it is released by a call on the read-end of the stream through any of the functions ReadRecord(), ReadAnyRecord() or ConsumeRecord(). The length of time that your stage remains suspended cannot be determined. Once the record has been consumed, the function releases your stage from its suspended state and returns control to your stage.

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwOutStream

[in] The number of the output stream to write to.

 

CString *a_pszRecord

[in] The address of a buffer which contains the output record.

 

    

Returns

 

 

bool

true if the write request was successful; otherwise false, which indicates that the output stream is at end-of-file.

 

    

Messages

None

 

    

Usage

To write a record to your primary output stream:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
 
    // Record buffer.
    CString szRecord;
 
    while( true )
    {
        // Read a record from my primary input stream.
        if( !pManager->ReadRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary input stream.
            break;
        }
 
        //
        // TODO: operate on the record here.
        //
 
        ...
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
    }
 
    return( _RC_SUCCESS_ );
}

 

 
bool CStageManager::ConsumeRecord(
    _in                            CProcess &a_Process,
    _in                            DWORD a_dwInStream
);
 

 

    

Purpose

Use ConsumeRecord() to remove a record from the specified input stream a_dwInStream.

 

ConsumeRecord() removes a record from the specified input stream a_dwInStream (releasing the stage connected to the write-end of the stream from its suspended state) and returns control to your stage. There must be a record in the input stream for ConsumeRecord() to remove: you must first have called either PeekRecord() or PeekAnyRecord() before calling ConsumeRecord(). Both peek functions issue a read request, but they do not automatically consume the record once the read is complete (leaving the stage connected to the write-end of the stream in its suspended state). ConsumeRecord() allows you to remove an input stream record as and when you require.

 

A stage which calls PeekRecord() or PeekAnyRecord(), then WriteRecord(), then ConsumeRecord(), in that order, does not delay the records.
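The ordering can be illustrated with a small single-threaded mock. This is only a sketch under stated assumptions: MockInStream and CopyWithoutDelay() are hypothetical and not part of the Pipelines API, and no stage is really suspended. The point it shows is that a peeked record stays on the input stream until after it has been written.

```cpp
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for an input stream; not part of the Pipelines API.
struct MockInStream
{
    std::deque<std::string> m_Pending;

    // Copy the next record without removing it; false means end-of-file.
    bool Peek( std::string *a_pBuffer ) const
    {
        if( m_Pending.empty() )
            return( false );
        *a_pBuffer = m_Pending.front();
        return( true );
    }

    // Remove the record that was last peeked; false if there is none.
    bool Consume( void )
    {
        if( m_Pending.empty() )
            return( false );
        m_Pending.pop_front();
        return( true );
    }
};

// Copy every record to a_Out in peek, write, consume order.
// The returned log records the order in which the events happened.
std::vector<std::string> CopyWithoutDelay( MockInStream &a_In,
                                           std::vector<std::string> &a_Out )
{
    std::vector<std::string> log;
    std::string szRecord;

    while( a_In.Peek( &szRecord ) )
    {
        log.push_back( "peek " + szRecord );

        // The record is still on the input stream while it is written.
        a_Out.push_back( szRecord );
        log.push_back( "write " + szRecord );

        // Only now is the record removed from the input stream.
        a_In.Consume();
        log.push_back( "consume " + szRecord );
    }

    return( log );
}
```

In the log, every "write" entry precedes the matching "consume" entry, which is the property that keeps a stage from delaying its records.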

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwInStream

[in] The number of the input stream to consume a record from.

 

    

Returns

 

 

bool

true if the consume was successful; otherwise false, which indicates that the input stream is at end-of-file.

 

    

Messages

None

 

    

Usage

To create a stage which does not delay the records, use the following construct:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
 
    // Record buffer.
    CString szRecord;
 
    while( true )
    {
        // Peek at a record on my primary input stream.
        if( !pManager->PeekRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary input stream.
            break;
        }
 
        //
        // TODO: operate on the record here.
        //
 
        ...
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
 
        // Release the stage that I read from.
        pManager->ConsumeRecord( *this, 0 );
    }
 
    return( _RC_SUCCESS_ );
}

 

 
void CStageManager::StageMessage(
    _in                           CProcess &a_Process,
    _in                           DWORD a_dwType,
    _in_opt                       const char *a_pMessage,
    _in_opt                       ...
);
 

 

    

Purpose

Use StageMessage() to write a synchronised stage message to the console.

 

In a multi-stage, multi-pipeline configuration, more than one stage may be writing messages to the console at the same time, and their output must be managed in order to prevent output overlap. CApplMessage() provides this mechanism: by locking the console for output, it ensures that messages from multiple output sources are written uninterrupted and in the correct sequence. You should use this version of StageMessage() when you want to write a free-form message to the console.

 

There are two flavours of StageMessage(): this version, and the canned message version which follows. Both allow you to write a warning or error message to the console.
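The idea of a locked, free-form message can be sketched as follows. This is an illustration only, not the Pipelines implementation: FormatStageMessage(), WriteSynchronised(), the MockMsgType values and the "WARNING: " and "ERROR: " prefixes are all assumptions, as is the use of printf-style formatting for the variable arguments.

```cpp
#include <cstdarg>
#include <cstdio>
#include <mutex>
#include <string>

// Hypothetical message types mirroring the documented values.
enum MockMsgType { MOCK_WARNING = 1, MOCK_ERROR = 2 };

// One lock shared by every writer, so that messages never interleave.
static std::mutex g_ConsoleLock;

// Format a printf-style message and prefix it with its type.
// The prefix text is an assumption made for this sketch.
std::string FormatStageMessage( MockMsgType a_Type, const char *a_pMessage, ... )
{
    char szText[ 512 ];

    va_list args;
    va_start( args, a_pMessage );
    std::vsnprintf( szText, sizeof( szText ), a_pMessage, args );
    va_end( args );

    return( std::string( a_Type == MOCK_ERROR ? "ERROR: " : "WARNING: " ) + szText );
}

// Write one complete line while holding the console lock.
void WriteSynchronised( const std::string &a_szLine )
{
    std::lock_guard<std::mutex> guard( g_ConsoleLock );
    std::fputs( a_szLine.c_str(), stderr );
    std::fputc( '\n', stderr );
}
```

Formatting the whole line before taking the lock keeps the locked region short, so that other writers wait only for the output itself.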

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwType

[in] The type of message to issue; this can be a warning message or error message.

 

The following values define the type of message to issue:

 

#define _WARNING_    1

#define _ERROR_      2

 

const char *a_pMessage

[in,opt] A constant character pointer to the message to display.

 

. . .

[in,opt] The function arguments.

 

    

Returns

 

void

 

    

Messages

None

 

    

Usage

To issue a free-form message, simply specify the message as an operand on the StageMessage() function:

 

bool CMyStage::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    pStage->MinInStreamsRequired( 1 );
    pStage->MaxInStreamsAllowed( 2 );
    pStage->MinOutStreamsRequired( 1 );
    pStage->MaxOutStreamsAllowed( 1 );
 
    // Point to my command argument.
    const char *pszArgument = pStage->Argument();
 
    // My stage allows up to two input streams to be connected, however, if there
    // is only one input stream connected; then warn the user.
 
    CInStream *pInStream = pStage->GetInStream( 1 );
    if( pInStream && pInStream->IsDisConnected() )
    {
        // My secondary input stream is not connected!
        pManager->StageMessage( *this, _WARNING_, "There is only one input stream connected!" );
        ...
    }
 
    ...
 
    return( true );
}

 

 
bool CStageManager::StageMessage(
    _in                           CProcess &a_Process,
    _in                           DWORD a_dwMsgNumber,
    _in                           DWORD a_dwType,
    _in_opt                       ...
);
 

 

    

Purpose

Use StageMessage() to write a synchronised stage message to the console.

 

In a multi-stage, multi-pipeline configuration, more than one stage may be writing messages to the console at the same time, and their output must be managed in order to prevent output overlap. CApplMessage() provides this mechanism: by locking the console for output, it ensures that messages from multiple output sources are written uninterrupted and in the correct sequence. You should use this version of StageMessage() when you want to write a canned message to the console.

 

There are two flavours of StageMessage(): this version, and the free-form message version which precedes it. Both allow you to write a warning or error message to the console.
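A canned message is, in essence, a numbered template that is filled in with the arguments you supply. The sketch below shows that idea together with the range check that decides the bool result. The template texts, g_pszTemplates and FormatCannedMessage() are hypothetical; the real message table is not reproduced in this document.

```cpp
#include <cstdarg>
#include <cstdio>
#include <string>

// Hypothetical templates; the real canned message texts are not listed here.
static const char *const g_pszTemplates[] =
{
    "Unexpected argument: %s",
    "Missing operand after: %s"
};
static const unsigned g_nTemplates =
    sizeof( g_pszTemplates ) / sizeof( g_pszTemplates[ 0 ] );

// Fill a_pOut from template a_nMsgNumber.
// Returns false, leaving a_pOut untouched, if the number is out of range.
bool FormatCannedMessage( unsigned a_nMsgNumber, std::string *a_pOut, ... )
{
    if( a_nMsgNumber >= g_nTemplates )
        return( false );

    char szText[ 256 ];

    va_list args;
    va_start( args, a_pOut );
    std::vsnprintf( szText, sizeof( szText ), g_pszTemplates[ a_nMsgNumber ], args );
    va_end( args );

    *a_pOut = szText;
    return( true );
}
```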

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwMsgNumber

[in] The canned stage message number of the message template that you want to issue.

 

DWORD a_dwType

[in] The type of message to issue; this can be a warning message or error message.

 

The following values define the type of message to issue:

 

#define _WARNING_    1

#define _ERROR_      2

 

. . .

[in,opt] The function arguments.

 

    

Returns

 

 

bool

true if the message index is in range; otherwise false, which indicates that the message index is not in range.

 

    

Messages

None

 

    

Usage

To issue a specific canned stage message, simply specify the canned message number as an operand on the StageMessage() function:

 

bool CMyStage::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    pStage->MinInStreamsRequired( 1 );
    pStage->MaxInStreamsAllowed( 2 );
    pStage->MinOutStreamsRequired( 1 );
    pStage->MaxOutStreamsAllowed( 1 );
 
    // Point to my command argument.
    const char *pszArgument = pStage->Argument();
 
    // My stage does not require a command argument; if there is one,
    // then it is unexpected.
 
    if( *pszArgument )
    {
        // Tell the user the type of argument that I expect; issue canned
        // stage message number 33.
        pManager->StageMessage( *this, 33, _ERROR_, pszArgument );
        return( false );
    }
 
    return( true );
}

 

 
bool CStageManager::RuntimeError( void ) const;
 

 

    

Purpose

Use RuntimeError() to determine if the StageManager has issued a runtime quiesce command.

 

When any of the stages in a pipeline reports a runtime error (by returning a value of _RC_FAIL_ from its required stage DLL interface function Go()), the StageManager begins terminating the pipeline by first setting the runtime quiesce flag, which indicates that the pipeline is quiescent. Next, the StageManager disconnects the input and output stream connections for your stage, which causes the next input read or output write request to fail, allowing your stage to terminate as if at end-of-file. However, Pipelines is not pre-emptive: the StageManager must wait for every active stage in the pipeline to request one of its stream services before that stage can detect that the pipeline is terminating. Your stage may not request I/O on a regular basis. It may, for example, sort its input records, which may consist of millions of entries or more, so your stage may be occupied for a considerable amount of time during which it does not read or write any records. The RuntimeError() function allows you to check whether the quiesce command has been issued by the StageManager, as and when you need to, so that your stage does not cause the pipeline to seemingly hang while it needlessly continues with its runtime phase in a pipeline that is waiting to terminate.
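The polling pattern can be sketched with a mock in place of the real manager. MockManager, its m_bQuiesce flag and LongRunningWork() are hypothetical names used only for illustration; the sketch assumes nothing about how the StageManager stores or sets its quiesce flag.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the manager's quiesce flag.
struct MockManager
{
    std::atomic<bool> m_bQuiesce{ false };

    bool RuntimeError( void ) const { return( m_bQuiesce.load() ); }
};

// Sum the values, checking the quiesce flag every a_nInterval items.
// Returns the number of items processed before finishing or giving up.
std::size_t LongRunningWork( const MockManager &a_Manager,
                             const std::vector<int> &a_Values,
                             std::size_t a_nInterval,
                             long *a_pSum )
{
    assert( a_nInterval > 0 );

    std::size_t nDone = 0;
    for( ; nDone < a_Values.size(); ++nDone )
    {
        // Poll only now and then, so the check costs very little.
        if( nDone % a_nInterval == 0 && a_Manager.RuntimeError() )
            break;

        *a_pSum += a_Values[ nDone ];
    }

    return( nDone );
}
```

The interval is a trade-off: a small value makes your stage react sooner to a terminating pipeline, while a large value reduces the overhead of the check.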

 

    

Parameters

 

 

void

 

    

Returns

 

 

bool

true if the StageManager has issued a runtime quiesce; otherwise false, which indicates that your stage can safely continue with its runtime phase.

 

    

Messages

None

 

    

Usage

The following example shows an edited excerpt from the CSort stage command's multiple-key-field recursive sort mechanism. The RuntimeError() function ensures that the RecurseCompare() function unwinds once a runtime error has occurred:

 

bool CSort::RecurseCompare( const CString *l,
                            const CString *r,
                            std::vector< m_Position >::iterator iPos )
{
    // Check if the pipeline is quiescing.
    if( m_pManager->RuntimeError() )
        return( false );
 
    // iPos points to the current sort range.
    // So, extract/isolate the portion defined by iPos and validate the comparison.
 
    m_Position *pPos = &( *iPos );
 
    DWORD dwFrom = pPos->m_dwFrom - 1;
    DWORD dwFor = pPos->m_dwTo - dwFrom;
 
    CString szlMid = l->Mid( dwFrom, dwFor );
    CString szrMid = r->Mid( dwFrom, dwFor );
 
    ...
 
    // The comparison moves down to the next level.
    if( szlMid == szrMid )
    {
        if( ( iPos + 1 ) != m_vPosition.end() )
            return( RecurseCompare( l, r, iPos + 1 ) );
    }
 
    if( pPos->m_bAscending )
        return( szlMid < szrMid ); 
 
    return( szlMid > szrMid ); 
}

 

 
double CStageManager::GetProcId( void ) const;
 

 

    

Purpose

Use GetProcId() to get the Process Identification number of the StageManager.

 

Many of the built-in stages that are provided with Pipelines use this number when creating uniquely identifiable names for a range of objects, including events, mutexes and work files. Using the Process Identification number of the StageManager as part of the name of a global resource may help when debugging your new stage. As multiple instances of Pipelines may execute concurrently (and that might mean multiple instances of identical pipelines, with multiple instances of your new stage), creating event, mutex and work file names with this number in combination with your pipeline and stage number can make it a great deal easier to identify which resource belongs to which pipeline and stage.

 

    

Parameters

 

void

 

    

Returns

 

 

double

The Process Identification number of the StageManager which loaded and dispatched your stage.

 

    

Messages

None

 

    

Usage

To create a work filename which uniquely identifies your stage as the owner, you might use the following approach:

 

bool CMyStage::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // Create a unique filename.
    CString szTmpFile;
    szTmpFile.Format( "TEMP-%.5d%.3d%.3d.txt", ( DWORD )pManager->GetProcId(), Pipeline(), Stage() );
 
    ...
 
    return( true );
}

 

 
bool CStageManager::PreProcess(
    _in                         CProcess &a_Process,
    _inout                      const char **a_pszArgument
);
 

 

    

Purpose

Use PreProcess() to extract the stage command arguments specified in the CASEI() and ZONE() pre-process functions.

 

There are a number of built-in Pipelines stages which operate as simple selection filters, writing records to their primary and secondary output streams depending on the content of an input record. Consider the FROMLABEL stage, for example. By default, all of these filter-type stages check for the presence or absence of the specified text starting in the first column of an input record. The PreProcess() function extends this basic selection capability: it instructs the manager to extract a substring of the record, determined by a column, word or field range, and optionally to translate both the stage operands and the content of the input records to uppercase, and to present this altered record to the stage when a read-record is requested. The function does not alter the way in which a stage processes its input and output records; it only alters the format of the record which is presented to the stage. When the stage performs a subsequent write-record request, the manager writes the original unmodified input record to the specified output stream.

 

PreProcess() ensures that the stage command argument syntax for a position or range on which to operate is identical for all filter-type stages.
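The effect of pre-processing on a filter can be sketched in a few lines. This is a simplified illustration, not the manager's implementation: MockPreProcess, PresentRecord() and SelectRecord() are hypothetical, only a column range is modelled (not word or field ranges), and the label is assumed to have been translated to uppercase already.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <string>

// Hypothetical settings; the real CASEI() and ZONE() syntax is richer.
struct MockPreProcess
{
    bool        m_bCaseInsensitive;  // translate the view to uppercase
    std::size_t m_nFromColumn;       // 1-based first column of the zone
    std::size_t m_nLength;           // number of columns in the zone
};

// Return the view of the record that the stage is given to test.
std::string PresentRecord( const MockPreProcess &a_Pre, const std::string &a_szRecord )
{
    std::string szView;

    if( a_Pre.m_nFromColumn >= 1 && a_Pre.m_nFromColumn <= a_szRecord.size() )
        szView = a_szRecord.substr( a_Pre.m_nFromColumn - 1, a_Pre.m_nLength );

    if( a_Pre.m_bCaseInsensitive )
    {
        std::transform( szView.begin(), szView.end(), szView.begin(),
                        []( unsigned char c )
                        { return( static_cast<char>( std::toupper( c ) ) ); } );
    }

    return( szView );
}

// A filter tests the view, but passes on the original record unchanged.
bool SelectRecord( const MockPreProcess &a_Pre,
                   const std::string &a_szRecord,
                   const std::string &a_szUpperLabel,
                   std::string *a_pOut )
{
    const std::string szView = PresentRecord( a_Pre, a_szRecord );

    if( szView.compare( 0, a_szUpperLabel.size(), a_szUpperLabel ) != 0 )
        return( false );

    *a_pOut = a_szRecord;
    return( true );
}
```

Note that SelectRecord() copies a_szRecord, not the view, to its output: the altered record is used only for the test.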

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

const char **a_pszArgument

[in,out] The address of a pointer to your stage command argument.

 

    

Returns

 

 

bool

true if the stage command argument is properly formed; otherwise false, which indicates a stage command parsing error.

 

    

Messages

None

 

    

Usage

To enable CASEI() or ZONE() pre-processing:

 

bool CMyStage::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // Point to my command argument.
    const char *pszArgument = pStage->Argument();
 
    // Perform any preprocessing required.
    if( !pManager->PreProcess( *this, &pszArgument ) )
        return( false );
 
    // My stage requires a delimited string.
 
    CString szString;
    CCharacterString *pCharString = new CCharacterString( *this );
 
    pCharString->m_bRequired = true;
    pCharString->m_pszSource = &pszArgument;
    pCharString->m_pszTarget = &szString;
 
    // Extract the string.
    if( !pManager->ParseForCharacterString( pCharString ) )
        return( false );
 
    ...
 
    return( true );
}

 

Excerpt

 

The following example shows how the ZONE() pre-process function greatly extends the selection capability of mystage. By using the PreProcess() function, mystage is able to restrict the selection of records to those that contain the phrase "text to find" in the third-from-last plus-sign (+) delimited word of each input record.

 

pipe < myfile.txt
    ...
    | mystage zone( ws '+' w-3 ) /text to find/
    | > textfound.txt

 

 
void CStageManager::SeverOutStream(
    _in                             CProcess &a_Process,
    _in                             DWORD a_dwOutStream
);
 

 

    

Purpose

Use SeverOutStream() to disconnect the specified output stream a_dwOutStream.

 

SeverOutStream() disconnects both the specified output stream a_dwOutStream and the corresponding input stream of the stage connected to the read-end of the stream. If the stage connected to the read-end of the stream is suspended, it is released from its suspended state and regains control immediately. Any subsequent read request made on the read-end of the stream will return an end-of-file condition.
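The effect of severing a connection can be pictured with a small mock. MockStream is hypothetical and single-threaded, so it cannot show a suspended stage being released, and an empty stream simply reports that there is nothing to read. What it does show is that, once severed, both ends of the connection see end-of-file.

```cpp
#include <deque>
#include <string>

// Hypothetical connection between a writing stage and a reading stage.
// Not part of the Pipelines API.
struct MockStream
{
    std::deque<std::string> m_Records;
    bool m_bSevered = false;

    // Either end may sever; afterwards both ends see end-of-file.
    void Sever( void ) { m_bSevered = true; }

    // Write-end: false once the stream has been severed.
    bool Write( const std::string &a_szRecord )
    {
        if( m_bSevered )
            return( false );
        m_Records.push_back( a_szRecord );
        return( true );
    }

    // Read-end: false once severed (or, in this mock, when empty).
    bool Read( std::string *a_pBuffer )
    {
        if( m_bSevered || m_Records.empty() )
            return( false );
        *a_pBuffer = m_Records.front();
        m_Records.pop_front();
        return( true );
    }
};
```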

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwOutStream

[in] The number of the output stream to sever.

 

    

Returns

 

void

 

    

Messages

None

 

    

Usage

To disconnect an output stream:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // My stage writes only the first 100 input stream records to its
    // primary output stream. The stage then disconnects its primary 
    // output stream and begins writing input stream records to its
    // secondary output stream.
 
    // The target output stream is initially the primary output stream. 
    DWORD dwOutStream = 0;
 
    // Record buffer.
    CString szRecord;
 
    // Accumulate the number of input records read. 
    DWORD dwRecCount = 0;
 
    while( true )
    {
        // Read a record from my primary input stream.
        if( !pManager->ReadRecord( *this, 0, &szRecord ) ) 
        {
            // This is the end-of-file on my primary input stream.
            break;
        }
 
        if( dwRecCount == 100 )
        {
            // Disconnect my primary output stream.
            pManager->SeverOutStream( *this, 0 );
            // Switch to my secondary output stream.
            dwOutStream = 1;
        }
 
        // Write the record to the specified output stream.
        if( !pManager->WriteRecord( *this, dwOutStream, &szRecord ) )
        {
            // This is the end-of-file on this output stream.
            break;
        }
 
        ++dwRecCount;
    }
 
    return( _RC_SUCCESS_ );
}

 

 
void CStageManager::SeverInStream(
    _in                            CProcess &a_Process,
    _in                            DWORD a_dwInStream
);
 

 

    

Purpose

Use SeverInStream() to disconnect the specified input stream a_dwInStream.

 

SeverInStream() disconnects both the specified input stream a_dwInStream and the corresponding output stream of the stage connected to the write-end of the stream. If the stage connected to the write-end of the stream is suspended, it is released from its suspended state and regains control immediately. Any subsequent write request made on the write-end of the stream will return an end-of-file condition.

 

Where you place the SeverInStream() function in your stage is extremely important; it can have a significant effect on how a pipeline behaves. Your stage must be able to anticipate the effect it may have when it disconnects an input stream and forces an end-of-file to propagate backward through a multi-stream pipeline.
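
The backward propagation of end-of-file can be illustrated with a toy three-stage chain: a producer, a pass-through stage and a final stage that severs its input after a fixed number of records. The classes and CountProduced() are hypothetical and single-threaded; the sketch also assumes that a stage which stops on a failed write no longer accepts input.

```cpp
#include <string>

// Toy model only; these classes are hypothetical and not part of Pipelines.

// The last stage: accepts a_dwLimit records, then severs its input stream.
class ToyConsumer
{
public:
    explicit ToyConsumer( unsigned a_dwLimit )
        : m_dwLimit( a_dwLimit ), m_dwSeen( 0 ), m_bInputSevered( a_dwLimit == 0 ) {}

    // Called by the stage on the write-end; false models end-of-file.
    bool Accept( const std::string & )
    {
        if( m_bInputSevered )
            return false;
        if( ++m_dwSeen >= m_dwLimit )
            m_bInputSevered = true;   // Models the call to SeverInStream().
        return true;
    }

private:
    unsigned m_dwLimit;
    unsigned m_dwSeen;
    bool m_bInputSevered;
};

// The middle stage: copies each record to the next stage.
class ToyPassThrough
{
public:
    explicit ToyPassThrough( ToyConsumer &a_Next )
        : m_Next( a_Next ), m_bStopped( false ) {}

    bool Accept( const std::string &a_szRecord )
    {
        if( m_bStopped )
            return false;
        if( !m_Next.Accept( a_szRecord ) )
        {
            // End-of-file on my output: stop, which also ends my input.
            m_bStopped = true;
            return false;
        }
        return true;
    }

private:
    ToyConsumer &m_Next;
    bool m_bStopped;
};

// The producer: counts how many records it wrote before end-of-file.
unsigned CountProduced( unsigned a_dwAvailable, unsigned a_dwLimit )
{
    ToyConsumer consumer( a_dwLimit );
    ToyPassThrough middle( consumer );
    unsigned dwProduced = 0;
    while( dwProduced < a_dwAvailable && middle.Accept( "record" ) )
        ++dwProduced;
    return dwProduced;
}
```

In this model a producer with 1000 records available stops after 100 when the final stage severs its input at that point, even though the producer never calls a sever function itself.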

 

    

Parameters

 

 

CProcess &a_Process

[in] Your *this pointer (a CProcess base-class object).

 

DWORD a_dwInStream

[in] The number of the input stream to sever.

 

    

Returns

 

void

 

    

Messages

None

 

    

Usage

To disconnect an input stream:

 

DWORD CMyStage::Go( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // My stage reads only the first 100 input stream records from its
    // primary input stream. The stage then disconnects its primary 
    // input stream and begins reading records from its secondary input
    // stream and writes them to its primary output stream.
 
    // The source input stream is initially the primary input stream.
    DWORD dwInStream = 0;
 
    // Record buffer.
    CString szRecord;
 
    // Accumulate the number of input records read.
    DWORD dwRecCount = 0;
 
    while( true )
    {
        if( dwRecCount == 100 )
        {
            // Disconnect my primary input stream before the next read, so
            // that only 100 records are read from it.
            pManager->SeverInStream( *this, 0 );
            // Switch to my secondary input stream.
            dwInStream = 1;
        }
 
        // Read a record from the specified input stream.
        if( !pManager->ReadRecord( *this, dwInStream, &szRecord ) )
        {
            // This is the end-of-file on this input stream.
            break;
        }
 
        // Write the record to my primary output stream.
        if( !pManager->WriteRecord( *this, 0, &szRecord ) )
        {
            // This is the end-of-file on my primary output stream.
            break;
        }
 
        ++dwRecCount;
    }
 
    return( _RC_SUCCESS_ );
}

 

 
bool CStageManager::ParseForToken(
    _inout                         CToken *a_pToken
);
 

 

    

Purpose

Use ParseForToken() to extract the next token or delimited phrase from your stage command argument.

 

ParseForToken() uses the CToken class object to extract a token or delimited phrase from your stage command argument. The ParseForToken() function, which can skip leading and trailing delimiters, extracts the token or phrase and stores it at the address specified in the CToken member m_pszTarget.
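
The extraction step can be sketched as follows. ExtractToken() is a hypothetical stand-in and not the Pipelines function: it assumes a single delimiter character, always skips leading and trailing delimiters, and advances the caller's argument pointer past the extracted token so that the next call continues from there.

```cpp
#include <string>

// Hypothetical stand-in for the token extraction step (not the Pipelines
// API). a_ppszSource points to the caller's argument pointer, which is
// advanced past the extracted token and any trailing delimiters.
bool ExtractToken( const char **a_ppszSource, std::string *a_pszTarget,
                   char a_chDelimiter = ' ' )
{
    const char *p = *a_ppszSource;

    // Skip leading delimiters.
    while( *p == a_chDelimiter )
        ++p;

    if( *p == '\0' )
    {
        // Nothing left to extract.
        *a_ppszSource = p;
        return false;
    }

    // Collect characters up to the next delimiter or the end of the argument.
    const char *pStart = p;
    while( *p != '\0' && *p != a_chDelimiter )
        ++p;
    a_pszTarget->assign( pStart, p );

    // Skip trailing delimiters.
    while( *p == a_chDelimiter )
        ++p;

    *a_ppszSource = p;
    return true;
}
```

Passing the address of the argument pointer is what lets a caller test whether any argument remains after the last expected token has been extracted.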

 

    

Parameters

 

 

CToken *a_pToken

[in,out] A pointer to a CToken class object, which specifies the leading and trailing extraction delimiters and whether or not they can be skipped.

 

    

Returns

 

 

bool

true if the function successfully extracts a token or delimited phrase; otherwise false, which indicates an invalid or missing required stage command argument.

 

    

Messages

None

 

    

Usage

The following example shows an edited excerpt from the Initialise() function of the CBetween stage command class. The portion of code shown illustrates how the ParseForToken() function is used to extract a stage command keyword.

 

bool CBetween::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // Access my argument.
    const char *pszArgument = pStage->Argument();
 
    CString szToken;
    CToken *pToken = new CToken( *this );
    CCharacterString *pCharacterString = new CCharacterString( *this ); 
    
    // Extract the first token and check if it is a keyword.
    pToken->m_pszSource = &pszArgument;
    pToken->m_pszTarget = &szToken;
    if( !pManager->ParseForToken( pToken ) )
    {
        pManager->StageMessage( *this, 19, _ERROR_ );
        return( false );
    }
 
    ...
 
    // Extract the 'from' string.
    pCharacterString->m_pszSource = &pszArgument;
    pCharacterString->m_pszTarget = &m_szFrom;
    pCharacterString->m_bRequired = true;
    if( m_dwFromType == m_eString )
        pCharacterString->m_bNull = true;
    else
        pCharacterString->m_bNull = false;
 
    if( !pManager->ParseForCharacterString( pCharacterString ) )
        return( false );
 
    ...
 
    // There must be more argument.
    if( !pManager->ParseForToken( pToken ) )
    {
        pManager->StageMessage( *this, 19, _ERROR_ );
        return( false );
    }
 
    ...
}

 

 
bool CStageManager::ParseForIntegerString(
    _inout                                 CIntegerString *a_pIntString
);
 

 

    

Purpose

Use ParseForIntegerString() to extract an integer string from your stage command argument.

 

ParseForIntegerString() uses the CIntegerString class object to extract an integer string from your stage command argument. The ParseForIntegerString() function, which can skip leading and trailing delimiters, extracts and validates the format of the string and stores it at the address specified in the CIntegerString member m_pszTarget.

    

Parameters

 

 

CIntegerString *a_pIntString

[in,out] A pointer to a CIntegerString class object, which may specify a minimum and maximum allowed integer value, whether the value is a required or optional stage command argument, and whether or not leading and trailing delimiters can be skipped.

 

    

Returns

 

 

bool

true if the function successfully extracts an integer string; otherwise false, which indicates an invalid or missing required stage command argument. When the function returns, it sets the CIntegerString member m_nReturnCode to one of the following values:

 

m_nReturnCode        Meaning

_RC_NOT_FOUND_       The integer string is missing from the stage command argument.
_RC_NOT_NUMERIC_     The integer string contains non-numeric characters.
_RC_SUCCESS_         The function was successful.
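
The three outcomes listed above can be sketched with a small format check. The enumeration and CheckIntegerString() are hypothetical and only mirror the documented meanings. The sketch assumes that only the digits 0 to 9 are accepted; the real function may also accept a sign and applies the minimum and maximum limits, which are not modelled here.

```cpp
#include <cctype>
#include <string>

// Hypothetical result codes that mirror the documented meanings; they are
// not the Pipelines constants.
enum ToyReturnCode
{
    TOY_RC_SUCCESS,
    TOY_RC_NOT_FOUND,
    TOY_RC_NOT_NUMERIC
};

// Hypothetical format check for an already extracted token.
// Assumption: only the digits 0-9 are valid; no sign is accepted.
ToyReturnCode CheckIntegerString( const std::string &a_szToken )
{
    if( a_szToken.empty() )
        return TOY_RC_NOT_FOUND;

    for( std::string::size_type i = 0; i < a_szToken.size(); ++i )
    {
        // Cast to unsigned char so that isdigit() is well defined for
        // characters outside the basic ASCII range.
        if( !std::isdigit( static_cast<unsigned char>( a_szToken[ i ] ) ) )
            return TOY_RC_NOT_NUMERIC;
    }

    return TOY_RC_SUCCESS;
}
```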

 

    

Messages

When the CIntegerString member m_bRequired is set to true and the argument is missing or invalid, ParseForIntegerString() will issue one of the following stage messages: 9, 20, 21, 60, 61.

 

    

Usage

The following example shows an edited excerpt from the Initialise() function of the CDuplicate stage command class. The portion of code shown illustrates how the ParseForIntegerString() function is used to extract an integer string from your stage command argument.

 

bool CDuplicate::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // Access my argument.
    const char *pszArgument = pStage->Argument();
 
    // Set defaults.
    m_lCopies = 1;
    
    ...
 
    if( *pszArgument )
    {
        CString szToken;
        CToken *pToken = new CToken( *this );
        CIntegerString *pIntegerString = new CIntegerString( *this ); 
 
        pIntegerString->m_pszSource = &pszArgument;
        pIntegerString->m_pszTarget = &szToken;
        pIntegerString->m_dwMinUnsignedValue = 0;
        pIntegerString->m_dwMaxUnsignedValue = _MAX_INT_ - 1;
 
        // Try to extract an integer string.
        if( pManager->ParseForIntegerString( pIntegerString ) )
        {
            pIntegerString->m_bRequired = true;
            if( !pManager->ConvertToInteger( pIntegerString ) )
                return( false );
 
            m_lCopies = pIntegerString->m_lResult;
        }
        else
        {
            ...
        }
 
        // There should be no other arguments.
        if( *pszArgument )
        {
            pManager->StageMessage( *this, 33, _ERROR_, pszArgument );
            return( false );
        }
    }
 
    ...
 
    return( true );
}

 

 
bool CStageManager::ParseForCharacterString(
    _inout                                   CCharacterString *a_pCharString
);
 

 

    

Purpose

Use ParseForCharacterString() to extract a delimited literal character string, hexadecimal string or binary string from your stage command argument.

 

ParseForCharacterString() uses the CCharacterString class object to extract the string from your stage command argument. The ParseForCharacterString() function, which can skip leading and trailing delimiters, extracts the string, converts it to an ASCII character value and stores the result at the address specified in the CCharacterString member m_pszTarget.

 

    

Parameters

 

 

CCharacterString *a_pCharString

[in,out] A pointer to a CCharacterString class object, which specifies the leading and trailing extraction delimiters and whether or not they can be skipped.

 

    

Returns

 

 

bool

true if the function successfully extracts a character string, in which case the function sets the CCharacterString member m_nReturnCode to a value of _RC_SUCCESS_ and the member m_nType to one of the following values:

 

m_nType                      Meaning

_STRING_TYPE_BINARY_         The string was specified as a binary character string.
_STRING_TYPE_CHARACTER_      The string was specified as a delimited ASCII character string.
_STRING_TYPE_HEXADECIMAL_    The string was specified as a hexadecimal character string.

 

Otherwise the function returns a value of false, which indicates an invalid or missing required stage command argument. In this case the function sets the CCharacterString member m_nReturnCode to one of the following values:

 

m_nReturnCode            Meaning

_RC_NOT_FOUND_           The string is missing from the stage command argument.
_RC_INVALID_FORMAT_      The string is not properly formed.
_RC_INVALID_HEX_CHAR_    The string contains an invalid hexadecimal character.
_RC_INVALID_LENGTH_      The string is specified as a hexadecimal or binary string
                         and its number of characters is not a multiple of 2 or 8,
                         respectively; or the string resolves to an ASCII character
                         string that is shorter than the minimum required length
                         m_dwMinLength or longer than the maximum allowed length
                         m_dwMaxLength.
_RC_INVALID_BIN_CHAR_    The string contains an invalid binary character.
_RC_NULL_STRING_         The CCharacterString member m_bNull is set to false, but
                         the string is of zero length.
_RC_INVALID_RANGE_       The string contains a character which resolves to a value
                         of zero; the string must contain character definitions
                         that are in the ASCII character range 1 – 255 (decimal).
_RC_SUCCESS_             The function was successful.
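
The length, character and range checks listed above can be sketched for hexadecimal and binary strings. The enumeration, DecodeHex() and DecodeBinary() are hypothetical; they take only the digits themselves, because the way a hexadecimal or binary string is written in a stage command argument is not modelled here. The order in which the checks are made is also an assumption.

```cpp
#include <string>

// Hypothetical result codes that mirror the documented meanings.
enum ToyStringCode
{
    TOY_OK,
    TOY_INVALID_LENGTH,
    TOY_INVALID_HEX_CHAR,
    TOY_INVALID_BIN_CHAR,
    TOY_INVALID_RANGE
};

// Returns 0-15 for a hexadecimal digit, or -1 for any other character.
int HexDigitValue( char ch )
{
    if( ch >= '0' && ch <= '9' ) return ch - '0';
    if( ch >= 'a' && ch <= 'f' ) return ch - 'a' + 10;
    if( ch >= 'A' && ch <= 'F' ) return ch - 'A' + 10;
    return -1;
}

// Two hexadecimal digits resolve to one character.
ToyStringCode DecodeHex( const std::string &a_szDigits, std::string *a_pszResult )
{
    if( a_szDigits.size() % 2 != 0 )
        return TOY_INVALID_LENGTH;

    std::string szDecoded;
    for( std::string::size_type i = 0; i < a_szDigits.size(); i += 2 )
    {
        const int nHigh = HexDigitValue( a_szDigits[ i ] );
        const int nLow = HexDigitValue( a_szDigits[ i + 1 ] );
        if( nHigh < 0 || nLow < 0 )
            return TOY_INVALID_HEX_CHAR;

        const int nValue = nHigh * 16 + nLow;
        if( nValue == 0 )
            return TOY_INVALID_RANGE;   // Only the values 1-255 are allowed.
        szDecoded.push_back( static_cast<char>( nValue ) );
    }

    *a_pszResult = szDecoded;
    return TOY_OK;
}

// Eight binary digits resolve to one character.
ToyStringCode DecodeBinary( const std::string &a_szBits, std::string *a_pszResult )
{
    if( a_szBits.size() % 8 != 0 )
        return TOY_INVALID_LENGTH;

    std::string szDecoded;
    for( std::string::size_type i = 0; i < a_szBits.size(); i += 8 )
    {
        int nValue = 0;
        for( int j = 0; j < 8; ++j )
        {
            const char ch = a_szBits[ i + j ];
            if( ch != '0' && ch != '1' )
                return TOY_INVALID_BIN_CHAR;
            nValue = nValue * 2 + ( ch - '0' );
        }
        if( nValue == 0 )
            return TOY_INVALID_RANGE;   // Only the values 1-255 are allowed.
        szDecoded.push_back( static_cast<char>( nValue ) );
    }

    *a_pszResult = szDecoded;
    return TOY_OK;
}
```

The target string is only updated when the whole input has been decoded, so a failed call leaves the caller's previous value untouched.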

 

    

Messages

When the CCharacterString member m_bRequired is set to true and the argument is missing or invalid, ParseForCharacterString() will issue one of the following stage messages: 12, 15, 17, 19, 22, 23, 37, 38, 39, 40, 43, 50.

 

    

Usage

The following example shows an edited excerpt from the Initialise() function of the CLiteral stage command class. The portion of code shown illustrates how the ParseForCharacterString() function is used to extract a delimited character string, hexadecimal string or binary string from your stage command argument.

 

bool CLiteral::Initialise( void )
{
    // Access the manager.
    CStageManager *pManager = Manager();
    // Access my stage.
    CStage *pStage = pManager->GetStage( *this );
 
    ...
 
    // Access my argument.
    const char *pszArgument = pStage->Argument();
 
    // Set the default.
    CString szLiteral = "";
 
    if( *pszArgument )
    {
        ...
 
        if( *pszArgument )
        {
            CCharacterString *pCharacterString = new CCharacterString( *this ); 
 
            pCharacterString->m_bRequired = false;
            pCharacterString->m_pszSource = &pszArgument;
            pCharacterString->m_pszTarget = &szLiteral;
            pCharacterString->m_bNull = true;
 
            // If we cannot extract a character string, then the argument must be
            // a character range.
 
            if( !pManager->ParseForCharacterString( pCharacterString ) )
            {
                CCharacterRange *pCharacterRange = new CCharacterRange( *this );
 
                pCharacterRange->m_pszSource = &pszArgument;
                pCharacterRange->m_bRequired = true;
                pCharacterRange->m_bSkipTrailing = true;
                // Extract an expanded character range.
                pCharacterRange->m_bExpand = true;
                pCharacterRange->m_pszExpandedRange = &szLiteral;
                if( !pManager->ParseForCharacterRange( pCharacterRange ) )
                    return( false );
            }
        }
 
        // There should be no other arguments.
        if( *pszArgument )
        {
            pManager->StageMessage( *this, 33, _ERROR_, pszArgument );
            return( false );
        }
    }
 
    return( true );
}

 

 
bool CStageManager::ParseForRegExp(
    _in                             CProcess &a_Process,