CDX pull request with data post processing

If you have not read the previous post about CDX push requests, please read it first: it covers the configuration and explains how to extend the standard RetailCDXSeedDataAX7 XML resource.
In my case, I needed to add an additional table and pull its data from POS.
Be aware that pull requests are handled transactionally: only new records are pulled from the channel DB, and changes to records that were already pulled are not picked up. Bear that in mind and choose your development approach accordingly.
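
To make this limitation concrete, here is a rough T-SQL sketch of what an incremental pull by replication counter amounts to. It is not the query CDX actually generates. @LastPulledCounter is a hypothetical variable standing in for the value the framework remembers, and the table is the one created later in this post.

```sql
-- Conceptual sketch only: CDX builds its own queries internally.
-- @LastPulledCounter is a hypothetical stand-in for the last counter value already pulled.
DECLARE @LastPulledCounter INT = 100;

-- Only rows inserted after the last pull qualify.
-- An UPDATE to a row whose counter is <= 100 does not change its identity value,
-- so that change would never be picked up by a filter like this.
SELECT EMAILADDRESS, CUSTOMERNAME, SALUTATION, CHANNEL, REPLICATIONCOUNTERFROMORIGIN
FROM [ext].[VKOCUSTOMERMARKETINGINFORMATION]
WHERE REPLICATIONCOUNTERFROMORIGIN > @LastPulledCounter
ORDER BY REPLICATIONCOUNTERFROMORIGIN;
```

This is why every change on the POS side has to be written as a new row rather than as an update to an existing one.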

Changes to CDX seed data

    <Subjob Id="VKOCustomerMarketingInformation" AxTableName="VKOCustomerMarketingInformation" TargetTableName="VKOCustomerMarketingInformation" TargetTableSchema="ext" ReplicationCounterFieldName="REPLICATIONCOUNTERFROMORIGIN" IsUpload="true">
      <ScheduledByJobs>
        <ScheduledByJob>P-0001</ScheduledByJob>
      </ScheduledByJobs>
      <AxFields>
        <Field Name="EmailAddress" />
        <Field Name="CustomerName" />
        <Field Name="Salutation" />
        <Field Name="Channel" />
        <Field Name="ReplicationCounterFromOrigin" />
      </AxFields>
    </Subjob>

The ReplicationCounterFromOrigin field (RetailReplicationCounter EDT) is mandatory. It is an auto-increment (identity) field whose value is used to remember the ID of the last pulled record.
Note that the pull request for the table is scheduled by the standard P-0001 job.
I'm skipping the step of creating the corresponding table in AX, as there is nothing special about it.

SQL script to create the table in the channel DB:

USE [AxDB]
GO

SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[ext].[VKOCUSTOMERMARKETINGINFORMATION]') AND type IN (N'U'))
BEGIN
    CREATE TABLE [ext].[VKOCUSTOMERMARKETINGINFORMATION](
        [EMAILADDRESS] [nvarchar](50) NOT NULL,
        [SALUTATION] [nvarchar](50) NOT NULL,
        [CUSTOMERNAME] [nvarchar](50) NOT NULL,
        [CREATEDDATE] [datetime] NOT NULL,
        [CHANNEL] [bigint] NOT NULL,
        [REPLICATIONCOUNTERFROMORIGIN] [int] IDENTITY(1,1) NOT NULL,
        [DATAAREAID] [nvarchar](4) NOT NULL,
        [ROWVERSION] [timestamp] NOT NULL,
        CONSTRAINT [PK_VKOCUSTOMERMARKETINGINFORMATION] PRIMARY KEY CLUSTERED
        (
            [CHANNEL], [REPLICATIONCOUNTERFROMORIGIN] ASC
        ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
END

GO

GRANT SELECT ON [ext].[VKOCUSTOMERMARKETINGINFORMATION] TO [UsersRole];
GO

GRANT INSERT, SELECT ON OBJECT::[ext].[VKOCUSTOMERMARKETINGINFORMATION] TO [DataSyncUsersRole];
GO

That is basically it. If you do not need data post processing, you can initialize retail and run the P-0001 job to see how AX pulls data from the channel into the table.
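
To try the pull end to end, you could insert a few test rows into the channel table first. This is a minimal sketch with made-up values: the CHANNEL value and the 'usrt' legal entity are placeholders and should be replaced with the RecId of a real channel and a real DataAreaId from your environment.

```sql
-- Hypothetical test data: replace CHANNEL and DATAAREAID with values from your environment.
-- REPLICATIONCOUNTERFROMORIGIN (identity) and ROWVERSION (timestamp) are generated by SQL Server,
-- so they are not listed in the INSERT.
INSERT INTO [ext].[VKOCUSTOMERMARKETINGINFORMATION]
    (EMAILADDRESS, SALUTATION, CUSTOMERNAME, CREATEDDATE, CHANNEL, DATAAREAID)
VALUES
    (N'jane@example.com', N'Ms', N'Jane Doe',    GETUTCDATE(), 5637144592, N'usrt'),
    (N'john@example.com', N'Mr', N'John Doe',    GETUTCDATE(), 5637144592, N'usrt'),
    -- Same email again: a second "transaction" for the same customer.
    (N'jane@example.com', N'Dr', N'Jane A. Doe', GETUTCDATE(), 5637144592, N'usrt');

-- Check the counters that were assigned.
SELECT *
FROM [ext].[VKOCUSTOMERMARKETINGINFORMATION]
ORDER BY REPLICATIONCOUNTERFROMORIGIN;
```

After the P-0001 job runs, the same rows should appear in the AX table.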

Data post processing

Unfortunately, the approach based on an 'X' table and its static InsertToRegularTable method is no longer valid in D365. A lot of leftover code from this approach remains in D365, but it is not used.

AX7 processes all data in bulk, so you cannot fall back to record-by-record processing.

In my case, the requirement was to store all unique customer email addresses in AX together with some additional information, where the newest data wins. Because AX supports pull requests only in a transactional manner, we had to create a table in POS to store "transactions". If a customer's information is captured several times, the table contains several records, regardless of whether the email already exists.

In AX, I had to create a table similar to the POS one, used only for pulling data. I actually made it a TempDB table, since I do not need to keep all of these "transactions". During post processing, I select records from the "regular" table (VKOCustomerMarketingInformation, which I made temporary, although the framework treats it as regular) and either insert or update records in my target table (VKOCustMarketing), where every email is unique.
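
The "newest data wins" rule can also be expressed as a query against the channel table, which is handy for checking what the target table should end up containing. This is only a verification sketch, not part of the CDX framework or of the AX-side logic. It assumes that a higher ReplicationCounterFromOrigin means a newer row and that emails are unique per legal entity.

```sql
-- Verification sketch: for each email keep only the most recently inserted row,
-- i.e. the one with the highest replication counter.
WITH Ranked AS
(
    SELECT EMAILADDRESS, SALUTATION, CUSTOMERNAME, CHANNEL, DATAAREAID,
           ROW_NUMBER() OVER (PARTITION BY DATAAREAID, EMAILADDRESS
                              ORDER BY REPLICATIONCOUNTERFROMORIGIN DESC) AS RowNo
    FROM [ext].[VKOCUSTOMERMARKETINGINFORMATION]
)
SELECT EMAILADDRESS, SALUTATION, CUSTOMERNAME, CHANNEL, DATAAREAID
FROM Ranked
WHERE RowNo = 1;
```

Once all rows have been pulled and post-processed, the result should match the contents of VKOCustMarketing.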

To add post processing, subscribe a post handler to RetailConnSCDataBulkCopy.insertIntoTargetTable:

/// <summary>
/// The <c>VKORetailConnSCPostProcessing</c> class performs post processing of data after bulk copy to regular tables in AX.
/// </summary>
class VKORetailConnSCPostProcessing
{
    /// <summary>
    /// Executed after data gets into AX table
    /// </summary>
    /// <param name="args"></param>
    [PostHandlerFor(classStr(RetailConnSCDataBulkCopy), methodStr(RetailConnSCDataBulkCopy, insertIntoTargetTable))]
    public static void RetailConnSCDataBulkCopy_Post_insertIntoTargetTable(XppPrePostArgs args)
    {
        common targetTempTableCursor = args.getArgNum(1);

        switch (targetTempTableCursor.TableId)
        {
            case tableNum(VKOCustomerMarketingInformation):
                VKORetailConnSCPostProcessing::customerMarketingInformationPostProcessing(targetTempTableCursor);
                break;
        }
    }

    public static void customerMarketingInformationPostProcessing(VKOCustomerMarketingInformation _tempTable)
    {
        VKOCustMarketing                 target;
        VKOCustomerMarketingInformation  tmpSource;
        int         maxRetry    = 5;
        int         retryCount  = 0;
        int         i           = 0;
        container   companies;
        DataAreaID  le;

        while select crossCompany DataAreaId from _tempTable group by DataAreaId
        {
            companies += _tempTable.dataAreaId;
        }

        try
        {
            ttsBegin;
            ++retryCount;

            for (i = 1; i <= conLen(companies); ++i)
            {
                le = conPeek(companies, i);

                changecompany(le)
                {
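                    // Maps each email handled in this run to the RecId of its target record.
                    Map keyMap  = new Map(Types::String, Types::Int64);
                    target      = null;
                    tmpSource   = null;
                    // Point a fresh TempDB buffer at the instance the framework has just filled.
                    tmpSource.setTempDB();
                    tmpSource.linkPhysicalTableInstance(_tempTable);

                    // Process rows oldest first, so the newest row for an email is written last.
                    while select tmpSource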
                        outer join forUpdate target
                        order by tmpSource.ReplicationCounterFromOrigin
                        where target.EmailAddress == tmpSource.EmailAddress
                    {
                        if (keyMap.exists(tmpSource.EmailAddress) && !target)
                        {
                            // there are two records related to same email. Initially email does not exist
                            // On the first run it will insert record, on second it will fail as duplicate
                            // So in such cases we will have to select target table, to update it
                            select firstonly forUpdate target
                                where target.RecId == keyMap.lookup(tmpSource.EmailAddress);
                        }
                        target.EmailAddress = tmpSource.EmailAddress;
                        target.Salutation   = tmpSource.Salutation;
                        target.CustomerName = tmpSource.CustomerName;
                        target.Channel      = tmpSource.Channel;

                        target.write();

                        if (!keyMap.exists(tmpSource.EmailAddress))
                        {
                            keyMap.insert(tmpSource.EmailAddress, target.RecId);
                        }
                    }
                }
            }

            ttsCommit;
        }
        catch(Exception::Error)
        {
            if (retryCount >= maxRetry)
            {
                throw Exception::Error;
            }
            else
            {
                retry;
            }
        }
    }

}

Replication counter operations

You may need to manipulate ReplicationCounterFromOrigin. The last pulled value might be stored in the RetailCDXUploadCounter table.

Its static methods updateCounter and getCounter may let you update and read the value. However, I did not need to use them, and it is possible that they are no longer used in AX7. An example of their usage can be found in the RetailTransactionTableX table:

public static void UpdateUploadCounter(RetailTransactionTableX cursor)
{
    RetailTransactionTableX cursor2;
    RefRecId dataStoreRecID;
    ;

    cursor2.linkPhysicalTableInstance(cursor);

    while select crossCompany Origin, maxof(ReplicationCounterFromOrigin) from cursor2
        group by Origin
    {
        dataStoreRecID = str2int64(cursor2.Origin);
        RetailCDXUploadCounter::updateCounter(dataStoreRecID, tableStr(RetailTransactionTable), cursor2.ReplicationCounterFromOrigin);
    }
}

There is also the [crt].[TABLEREPLICATIONLOG] table, which also holds the min/max filter. It is used in RetailCdxChannelDbDirectAccess.getSourceRequestHeader.
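
When debugging what has and has not been pulled, it can help to compare the highest counter in the channel table with what the replication log holds. This is a minimal sketch, assuming you have read access to the channel DB. The column layout of [crt].[TABLEREPLICATIONLOG] is not covered in this post, so the second query deliberately selects everything instead of guessing column names.

```sql
-- Highest counter value currently present in the custom channel table.
SELECT MAX(REPLICATIONCOUNTERFROMORIGIN) AS MaxCounter
FROM [ext].[VKOCUSTOMERMARKETINGINFORMATION];

-- Inspect the replication log. SELECT * is used on purpose,
-- because the column names are not described here.
SELECT TOP (50) *
FROM [crt].[TABLEREPLICATIONLOG];
```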

Please be aware that in AX7 the approach with the 'X' table and its methods is no longer used. The code is still in the application, but it does not work.

You can see this by comparing the implementation of \Classes\RetailConnSCUploadManager\insertToRegularTable in AX2012 and AX7.


 
