diff --git a/.github/workflows/build-assets.yml b/.github/workflows/build-assets.yml index 945a2a22694..29fce8c2c04 100644 --- a/.github/workflows/build-assets.yml +++ b/.github/workflows/build-assets.yml @@ -87,6 +87,10 @@ jobs: strategy: matrix: include: + - os: ubuntu-24.04 + - os: ubuntu-24.04 + name: k8s + container: true - os: ubuntu-22.04 - os: ubuntu-22.04 name: k8s @@ -100,6 +104,10 @@ jobs: name: k8s container: true - os: rockylinux-8 + - os: ubuntu-24.04 + name: LN k8s + ln: true + container: true - os: ubuntu-22.04 name: LN k8s ln: true @@ -109,9 +117,15 @@ jobs: name: LN k8s ln: true container: true + - os: ubuntu-24.04 + name: LN + ln: true - os: ubuntu-22.04 name: LN ln: true + - os: ubuntu-24.04 + name: Enterprise + ee: true - os: ubuntu-22.04 name: Enterprise ee: true diff --git a/cmake_modules/vcpkg.cmake b/cmake_modules/vcpkg.cmake index 35e6a0652ef..9c0466ddedb 100644 --- a/cmake_modules/vcpkg.cmake +++ b/cmake_modules/vcpkg.cmake @@ -6,7 +6,6 @@ set(CMAKE_TOOLCHAIN_FILE ${HPCC_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cma set(VCPKG_ROOT ${HPCC_SOURCE_DIR}/vcpkg) set(VCPKG_INSTALLED_DIR "${VCPKG_FILES_DIR}/vcpkg_installed") set(VCPKG_INSTALL_OPTIONS "--x-abi-tools-use-exact-versions;--downloads-root=${VCPKG_FILES_DIR}/vcpkg_downloads;--x-buildtrees-root=${VCPKG_FILES_DIR}/vcpkg_buildtrees;--x-packages-root=${VCPKG_FILES_DIR}/vcpkg_packages") -set(VCPKG_VERBOSE OFF) if(WIN32) set(VCPKG_HOST_TRIPLET "x64-windows" CACHE STRING "host triplet") @@ -24,6 +23,15 @@ elseif(UNIX) set(VCPKG_TARGET_TRIPLET "x64-linux-dynamic" CACHE STRING "target triplet") endif() +message("-- vcpkg settings:") +message("---- VCPKG_FILES_DIR: ${VCPKG_FILES_DIR}") +message("---- CMAKE_TOOLCHAIN_FILE: ${CMAKE_TOOLCHAIN_FILE}") +message("---- VCPKG_ROOT: ${VCPKG_ROOT}") +message("---- VCPKG_INSTALLED_DIR: ${VCPKG_INSTALLED_DIR}") +message("---- VCPKG_INSTALL_OPTIONS: ${VCPKG_INSTALL_OPTIONS}") +message("---- VCPKG_HOST_TRIPLET: ${VCPKG_HOST_TRIPLET}") +message("---- VCPKG_TARGET_TRIPLET: ${VCPKG_TARGET_TRIPLET}") + # Create a catalog of the vcpkg dependencies --- file(GLOB VCPKG_PACKAGES ${VCPKG_FILES_DIR}/vcpkg_packages/*/CONTROL) list(APPEND VCPKG_PACKAGE_LIST "-----------------\n") diff --git a/common/thorhelper/thorcommon.cpp b/common/thorhelper/thorcommon.cpp index 4be88493223..86f91ce0d43 100644 --- a/common/thorhelper/thorcommon.cpp +++ b/common/thorhelper/thorcommon.cpp @@ -1801,8 +1801,6 @@ void ActivityTimeAccumulator::addStatistics(IStatisticGatherer & builder) const if (blockedCycles) builder.addStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles)); } - if (lookAheadCycles) - builder.addStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles)); } void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged) const @@ -1816,8 +1814,6 @@ void ActivityTimeAccumulator::addStatistics(CRuntimeStatisticCollection & merged if (blockedCycles) merged.mergeStatistic(StTimeBlocked, cycle_to_nanosec(blockedCycles)); } - if (lookAheadCycles) - merged.mergeStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles)); } void ActivityTimeAccumulator::merge(const ActivityTimeAccumulator & other) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 9a1958078a7..96bbb7a12b6 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -14985,3 +14985,12 @@ void recordTraceDebugOptions(IWorkUnit * target, const IProperties * source) } } } + +WuidPattern::WuidPattern(const char* _pattern) + : 
pattern(_pattern) +{ + if (!pattern.isEmpty()) + pattern.trim(); + if (!pattern.isEmpty() && islower(pattern.charAt(0))) + pattern.setCharAt(0, toupper(pattern.charAt(0))); +} diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index debaf1321a6..c1fccd97c76 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1843,4 +1843,17 @@ class WORKUNIT_API StatisticsAggregator : public CInterface const StatisticsMapping & mapping; }; +class WORKUNIT_API WuidPattern +{ +private: + StringBuffer pattern; +public: + WuidPattern(const char* _pattern); + inline bool isEmpty() const { return pattern.isEmpty(); } + inline const char* str() const { return pattern.str(); } + inline operator const char* () const { return pattern.str(); } + inline operator const StringBuffer& () const { return pattern; } + inline operator StringBuffer& () { return pattern; } +}; + #endif diff --git a/dali/base/dautils.cpp b/dali/base/dautils.cpp index d25e0a7ef9a..5bc36552b7b 100644 --- a/dali/base/dautils.cpp +++ b/dali/base/dautils.cpp @@ -3675,6 +3675,11 @@ static CConfigUpdateHook directIOUpdateHook; static CriticalSection dafileSrvNodeCS; static Owned tlsDirectIONode, nonTlsDirectIONode; +unsigned getPreferredDaFsServerPort() +{ + return getPreferredDafsClientPort(true); +} + void remapGroupsToDafilesrv(IPropertyTree *file, bool foreign, bool secure) { Owned iter = file->getElements("Cluster"); @@ -3683,7 +3688,7 @@ void remapGroupsToDafilesrv(IPropertyTree *file, bool foreign, bool secure) IPropertyTree &cluster = iter->query(); const char *planeName = cluster.queryProp("@name"); Owned plane = getDataStoragePlane(planeName, true); - if ((0 == plane->queryHosts().size()) && isAbsolutePath(plane->queryPrefix())) // if hosts group, or url, don't touch + if (isAbsolutePath(plane->queryPrefix())) // if url (i.e. 
not absolute prefix path) don't touch { if (isContainerized()) { diff --git a/dali/base/dautils.hpp b/dali/base/dautils.hpp index 7cb0edeaceb..e6818c80db6 100644 --- a/dali/base/dautils.hpp +++ b/dali/base/dautils.hpp @@ -583,7 +583,7 @@ inline unsigned calcStripeNumber(unsigned partNum, const char *lfnName, unsigned } interface INamedGroupStore; extern da_decl void remapGroupsToDafilesrv(IPropertyTree *file, bool foreign, bool secure); - +extern da_decl unsigned getPreferredDaFsServerPort(); #ifdef NULL_DALIUSER_STACKTRACE extern da_decl void logNullUser(IUserDescriptor *userDesc); #else diff --git a/dali/sasha/saarch.cpp b/dali/sasha/saarch.cpp index 0d0d71efd10..730a96b6dab 100644 --- a/dali/sasha/saarch.cpp +++ b/dali/sasha/saarch.cpp @@ -1166,7 +1166,7 @@ class CDFUWorkUnitArchiver: public CBranchArchiver getWorkUnitCreateTime(wuid,time); } virtual ~cDFUWUBranchItem() {} - bool isempty() { return (wuid[0]!='D')||iserr; } + bool isempty() { return (toupper(wuid[0])!='D')||iserr; } bool qualifies() { if (isprotected) diff --git a/dockerfiles/vcpkg/centos-7.dockerfile b/dockerfiles/vcpkg/centos-7.dockerfile index 19ab3186f89..f415699054e 100644 --- a/dockerfiles/vcpkg/centos-7.dockerfile +++ b/dockerfiles/vcpkg/centos-7.dockerfile @@ -7,4 +7,8 @@ RUN yum install -y \ ENTRYPOINT ["/bin/bash", "--login", "-c"] +RUN yum install -y \ + rpm-build && \ + yum -y clean all && rm -rf /var/cache + CMD ["/bin/bash"] diff --git a/dockerfiles/vcpkg/platform-core-ubuntu-22.04.dockerfile b/dockerfiles/vcpkg/platform-core-ubuntu-22.04.dockerfile index 76d0b1db8ea..dc820de1e1a 100644 --- a/dockerfiles/vcpkg/platform-core-ubuntu-22.04.dockerfile +++ b/dockerfiles/vcpkg/platform-core-ubuntu-22.04.dockerfile @@ -62,7 +62,7 @@ RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.29.7/b chmod +x ./kubectl && \ mv ./kubectl /usr/local/bin -RUN curl -LO https://packagecloud.io/github/git-lfs/packages/ubuntu/jammy/git-lfs_3.6.0_amd64.deb/download && \ +RUN curl -LO https://packagecloud.io/github/git-lfs/packages/ubuntu/jammy/git-lfs_3.6.1_amd64.deb/download && \ dpkg -i download && \ rm download diff --git a/dockerfiles/vcpkg/platform-core-ubuntu-22.04/Dockerfile b/dockerfiles/vcpkg/platform-core-ubuntu-22.04/Dockerfile index c7bbe050514..4b974ef2701 100644 --- a/dockerfiles/vcpkg/platform-core-ubuntu-22.04/Dockerfile +++ b/dockerfiles/vcpkg/platform-core-ubuntu-22.04/Dockerfile @@ -62,7 +62,7 @@ RUN curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.29.7/b chmod +x ./kubectl && \ mv ./kubectl /usr/local/bin -RUN curl -LO https://packagecloud.io/github/git-lfs/packages/ubuntu/jammy/git-lfs_3.6.0_amd64.deb/download && \ +RUN curl -LO https://packagecloud.io/github/git-lfs/packages/ubuntu/jammy/git-lfs_3.6.1_amd64.deb/download && \ dpkg -i download && \ rm download diff --git a/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ConfigureValues.xml b/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ConfigureValues.xml index d424a8d8a54..0394c028f98 100644 --- a/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ConfigureValues.xml +++ b/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ConfigureValues.xml @@ -509,7 +509,7 @@ components - + @@ -1332,9 +1332,9 @@ thor: - + - + diff --git a/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ContainerLogging.xml b/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ContainerLogging.xml index 3bcbccdd27c..6c7751b0a2f 100644 --- a/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ContainerLogging.xml +++ 
b/docs/EN_US/ContainerizedHPCC/ContainerizedMods/ContainerLogging.xml @@ -70,8 +70,39 @@ choose to integrate HPCC Systems logging data with any existing logging solutions, or to implement another one specifically for HPCC Systems data. Starting with HPCC Systems version 8.4, we provide a lightweight, - yet complete log-processing solution for your convenience. The following - sections will look at a couple of the possible solutions. + yet complete log-processing solution for convenience. Subsequent + sections will look at a couple of other possible log-processing + solutions. + + + Log Dependent Applications + + Currently there is a utility delivered with a containerized HPCC + Systems deployment which is dependent on having a properly configured + log-processing solution for optimal results. + + + The Z.A.P. Utility + + The Zipped Analysis Package (Z.A.P.) utility collects system + information and encapsulates it into a shareable package. This + utility packages up information to send for further analysis. ZAP + reports contain several artifacts related to a given workunit, to + aid in debugging. + + The Component logs are one of the most important artifacts + expected to be included in the report. In containerized deployments, + logging is handled differently from bare metal. The log fetching is + dependent on a back-end log processor being properly configured and + available, and on the HPCC LogAccess feature being configured to bind to the + log processor. If those two dependencies are not met, the + containerized cluster logs are not included in the ZAP report, and those + ZAP reports will be incomplete. To ensure inclusion of the logs + in the ZAP report, you must have log access configured properly. See + the Containerized + Logging sections for more information. + + diff --git a/docs/EN_US/HPCCSystemAdmin/HPCCSystemAdministratorsGuide.xml b/docs/EN_US/HPCCSystemAdmin/HPCCSystemAdministratorsGuide.xml index b5e8466344b..b33fc7f033c 100644 --- a/docs/EN_US/HPCCSystemAdmin/HPCCSystemAdministratorsGuide.xml +++ b/docs/EN_US/HPCCSystemAdmin/HPCCSystemAdministratorsGuide.xml @@ -721,6 +721,146 @@ workunits in the event of a loss. In addition it would affect every other Thor/Roxie cluster in the same environment if you lose this node. + + + Restoring Dali from backup + + If configured correctly, Dali creates a backup or mirror + copy to a secondary location on another physical server + (bare-metal only). + + Systems can be configured with their own scheduled backup to + create a snapshot of the primary store files to a custom location. + The same steps apply when using a snapshot copy of a backup set as + when using the mirror copy. In other words, this technique applies + to either bare-metal or k8s deployments. + + The Dali meta files consist of: + + + + store.<NNNN> + (e.g., store.36). This file is a reference to the current Dali + meta file edition. There should never be more than one of + these files. The NNNN is used to determine the current base + and delta files in use. + + + + dalisds<NNNN>.xml + (e.g., dalisds36.xml). This is the main Dali meta info file, + containing all logical file, workunit, and state information. + Sasha (or Dali on save) periodically creates new versions + (with incrementally rising NNNN’s). It will keep the last T + copies (default 10) based on the configuration option + “keepStores”. + + + + daliinc<NNNN>.xml + (e.g., daliinc36.xml). This is the delta transaction log. Dali + continuously writes to this file, recording all changes that + are made to any meta data.
It is used to play back changes and + apply them to the base meta info from the + dalisds<NNNN>.xml file. + + Specifically, when Sasha creates a new store version, it + loads the base file (e.g., dalisds36.xml), then loads and + applies the delta file (e.g., daliinc36.xml). Sasha then has + its own independent representation of the current state and + saves a new base file (e.g., dalisds(NNNN+1).xml). + + + + dalidet<NNNN>.xml + (e.g., dalidet36.xml). This file is created at the point that + Sasha starts the process of creating a new base file, at which + point it atomically renames the delta transaction file to a + ‘det’ file (short for 'detached'). For example, it renames + daliinc36.xml to dalidet36.xml. Dali then continues to write + new transactions to daliinc36.xml. + + + + dalisds_<MMMM>.bv2 files. These + files are in effect part of the main store (part of + dalisdsNNNN.xml). They are single large values that were + deemed too big to keep in Dali memory, and were written to disk + separately instead (they are loaded on demand). + + + + If Dali is shut down cleanly and saves its files as expected, + the daliinc*.xml and dalidet*.xml files are not needed, since it + saves the entire state of the store directly from internal memory, + and on startup, there is no daliincNNNN.xml or dalidetNNNN.xml + related to the new version. + + These transaction delta files are only used by Sasha when + creating new versions of the base store or if Dali has been + stopped abruptly (e.g., machine rebooted). If Dali restarts after + an unclean exit, there will be a daliincNNNN.xml (and possibly a + dalidetNNNN.xml file if Sasha was actively creating a new version + at the time). In those cases, Dali will load these files in + addition to the base file. + + By default, Dali’s main data store directory is + /var/lib/HPCCSystems/hpcc-data/dali/. In other words, all meta + data is written to and read from this location. + + When restoring from a backup: + + Make sure Dali is not running + + + + Make sure the /var/lib/HPCCSystems/hpcc-data/dali + folder is empty. + + + + Copy all pertinent backup files into the + /var/lib/HPCCSystems/hpcc-data/dali folder: + + + + One store.NNNN file + + + + One dalisdsNNNN.xml file + + + + <=1 daliincNNNN.xml file (only if + present) + + + + <=1 dalidetNNNN.xml file (only if + present) + + + + All dalisds_MMMM.bv2 files. + + + + Other/older dalisds/daliinc/dalidet editions could + be copied, but the above are the only ones that will be used. In + other words, only the NNNN version based on the single store.NNNN + file will be loaded. + + The automatic backup to a mirror location is bare-metal only. + In a cloud deployment, it is assumed that the storage choices + provided by the cloud provider provide redundancy, such as + multi-zone replication. + + In either case, and/or if a manual strategy has been used to + copy Dali’s files on a schedule, the process of restoring from a + backup should be the same.
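The restore procedure above hinges on the single store.<NNNN> marker file selecting which editions to copy. As a rough standalone illustration of that selection rule (a sketch under stated assumptions, not the platform's tooling — the directory scan, error handling, and output format are all invented here), the following locates the marker in a backup directory and lists the companion files a restore would need:

```cpp
// Sketch: identify the current Dali store edition in a backup directory.
// Assumes the layout described above: exactly one store.<NNNN> marker,
// plus dalisds<NNNN>.xml, optional daliinc<NNNN>.xml / dalidet<NNNN>.xml,
// and any number of dalisds_<MMMM>.bv2 files.
#include <filesystem>
#include <iostream>
#include <optional>
#include <string>

namespace fs = std::filesystem;

static std::optional<unsigned> findStoreEdition(const fs::path &dir)
{
    for (const auto &entry : fs::directory_iterator(dir))
    {
        std::string name = entry.path().filename().string();
        if (name.rfind("store.", 0) == 0)       // starts with "store."
            return std::stoul(name.substr(6));  // NNNN selects the current edition
    }
    return std::nullopt;                        // no marker - nothing to restore
}

int main(int argc, char *argv[])
{
    fs::path dir = argc > 1 ? argv[1] : ".";
    std::optional<unsigned> edition = findStoreEdition(dir);
    if (!edition)
    {
        std::cerr << "no store.<NNNN> marker found in " << dir << '\n';
        return 1;
    }
    unsigned nnnn = *edition;
    // The base file is mandatory; the delta/detached files are copied only if present.
    std::cout << "restore edition " << nnnn << ":\n"
              << "  dalisds" << nnnn << ".xml (required)\n"
              << "  daliinc" << nnnn << ".xml (if present)\n"
              << "  dalidet" << nnnn << ".xml (if present)\n"
              << "  all dalisds_<MMMM>.bv2 files\n";
    return 0;
}
```

Only the edition named by the marker matters; older dalisds/daliinc/dalidet editions in the same backup are ignored by the load, exactly as the note above states.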
diff --git a/docs/common/Version.xml b/docs/common/Version.xml index 0c1ee1cb2f2..88b53c282c8 100644 --- a/docs/common/Version.xml +++ b/docs/common/Version.xml @@ -5,11 +5,11 @@ DEVELOPER NON-GENERATED VERSION - © 2024 HPCC + © 2025 HPCC Systems®. All rights reserved - 2024 HPCC Systems®. All rights + 2025 HPCC Systems®. All rights reserved @@ -23,9 +23,9 @@ serve one purpose and that is to store the chapterinfo and other information used by several document components. - 2024 Version ${DOC_VERSION} + 2025 Version ${DOC_VERSION} - 2024 + 2025 The following line is the code to be put into the document you wish to include the above version info in: diff --git a/docs/common/Version.xml.in b/docs/common/Version.xml.in index f499b8cc9e4..030b68e470a 100644 --- a/docs/common/Version.xml.in +++ b/docs/common/Version.xml.in @@ -3,13 +3,13 @@ "http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd"> - 2024 Version ${DOC_VERSION} + 2025 Version ${DOC_VERSION} - © 2024 HPCC + © 2025 HPCC Systems®. All rights reserved - 2024 HPCC Systems®. All rights + 2025 HPCC Systems®. All rights reserved @@ -24,9 +24,9 @@ serve one purpose and that is to store the chapterinfo the above sections that are being used by several other documents. - 2024 Version ${DOC_VERSION} + 2025 Version ${DOC_VERSION} - 2024 + 2025 The following line is the code to be put into the document you wish to include the above version info in: diff --git a/esp/bindings/SOAP/xpp/xpp/XmlPullParser.h b/esp/bindings/SOAP/xpp/xpp/XmlPullParser.h index cd89f3cbdfb..1a1c7f38a9a 100644 --- a/esp/bindings/SOAP/xpp/xpp/XmlPullParser.h +++ b/esp/bindings/SOAP/xpp/xpp/XmlPullParser.h @@ -228,6 +228,10 @@ namespace xpp { case XmlPullParser::END_TAG: --level; break; + case XmlPullParser::END_DOCUMENT: + throw XmlPullParserException( + string("unexpected end of document while skipping sub tree") + +tokenizer.getPosDesc(), tokenizer.getLineNumber(), tokenizer.getColumnNumber()); } } return hasChildren; diff --git a/esp/bindings/http/platform/httpbinding.cpp b/esp/bindings/http/platform/httpbinding.cpp index c5a9e831844..6145e2ec170 100644 --- a/esp/bindings/http/platform/httpbinding.cpp +++ b/esp/bindings/http/platform/httpbinding.cpp @@ -446,11 +446,8 @@ EspHttpBinding::EspHttpBinding(IPropertyTree* tree, const char *bindname, const m_feature_authmap.setown(m_secmgr->createFeatureMap(authcfg)); m_setting_authmap.setown(m_secmgr->createSettingMap(authcfg)); } - else if(stricmp(m_authmethod.str(), "Local") == 0) - { - m_secmgr.setown(SecLoader::loadSecManager("Local", "EspHttpBinding", NULL)); - m_authmap.setown(m_secmgr->createAuthMap(authcfg)); - } + else if(strieq(m_authmethod.str(), "Local") || strieq(m_authmethod.str(), "Default")) + throw makeStringExceptionV(-1, "obsolete auth method %s; update configuration", m_authmethod.str()); IRestartManager* restartManager = dynamic_cast(m_secmgr.get()); if(restartManager!=NULL) { diff --git a/esp/platform/application_config.cpp b/esp/platform/application_config.cpp index f999bc44203..ba8a3436071 100644 --- a/esp/platform/application_config.cpp +++ b/esp/platform/application_config.cpp @@ -156,6 +156,8 @@ bool addAuthNZSecurity(const char *name, IPropertyTree *legacyEsp, IPropertyTree appSecMgr = authNZ; } const char *method = appSecMgr->queryProp("@name"); + if (isEmptyString(method)) + throw MakeStringException(-1, "SecurityManager name attribute required. To run without security set 'auth: none'"); const char *tag = appSecMgr->queryProp("@type"); if (isEmptyString(tag)) throw MakeStringException(-1, "SecurityManager type attribute required. To run without security set 'auth: none'"); @@ -167,7 +169,7 @@ bool addAuthNZSecurity(const char *name, IPropertyTree *legacyEsp, IPropertyTree mergePTree(legacy, authNZ); //extra info clean up later legacy->removeProp("SecurityManager"); //already copied these attributes above, don't need this as a child - bindAuth.setf("", method ?
method : "unknown"); + bindAuth.setf("", method); return true; } diff --git a/esp/services/ws_dfsservice/ws_dfsservice.cpp b/esp/services/ws_dfsservice/ws_dfsservice.cpp index d79e3afdd7a..d74cf9bb702 100644 --- a/esp/services/ws_dfsservice/ws_dfsservice.cpp +++ b/esp/services/ws_dfsservice/ws_dfsservice.cpp @@ -126,6 +126,26 @@ static void populateLFNMeta(IUserDescriptor *userDesc, const char *logicalName, { VStringBuffer storagePlaneXPath("storage/%s", planeXPath.str()); Owned dataPlane = getGlobalConfigSP()->getPropTree(storagePlaneXPath); + + const char *hostGroupName = dataPlane->queryProp("@hostGroup"); + if (!isEmptyString(hostGroupName)) + { + Owned hostGroup = getHostGroup(hostGroupName, false); + if (hostGroup) + { + // This is only likely to be used if this service is in BM + // Cloud based storage planes are unlikely to be backed by hosts/hostGroups + dataPlane.setown(createPTreeFromIPT(dataPlane)); + unsigned daFsSrvPort = getPreferredDaFsServerPort(); + Owned iter = hostGroup->getElements("hosts"); + ForEach(*iter) + { + VStringBuffer endpoint("%s:%u", iter->query().queryProp(nullptr), daFsSrvPort); + dataPlane->addProp("hosts", endpoint); + } + dataPlane->removeProp("@hostGroup"); + } + } metaRoot->addPropTree("planes", dataPlane.getClear()); } } @@ -135,6 +155,8 @@ static void populateLFNMeta(IUserDescriptor *userDesc, const char *logicalName, void CWsDfsEx::init(IPropertyTree *cfg, const char *process, const char *service) { DBGLOG("Initializing %s service [process = %s]", service, process); + VStringBuffer xpath("Software/EspProcess/EspBinding[@service=\"%s\"]/@protocol", service); + isHttps = strsame("https", cfg->queryProp(xpath)); } bool CWsDfsEx::onGetLease(IEspContext &context, IEspLeaseRequest &req, IEspLeaseResponse &resp) @@ -177,8 +199,13 @@ bool CWsDfsEx::onDFSFileLookup(IEspContext &context, IEspDFSFileLookupRequest &r if (req.getAccessViaDafilesrv()) opts |= LfnMOptRemap; - // NB: if we ever have some services with tls, and some without in bare-metal, this may need revisiting. - if (getComponentConfigSP()->getPropBool("@tls")) + if (isContainerized()) + { + // NB: if we ever have some services with tls, and some without in bare-metal, this may need revisiting. 
+ if (getComponentConfigSP()->getPropBool("@tls")) + opts |= LfnMOptTls; + } + else if (isHttps) opts |= LfnMOptTls; Owned responseTree = createPTree(); diff --git a/esp/services/ws_dfsservice/ws_dfsservice.hpp b/esp/services/ws_dfsservice/ws_dfsservice.hpp index a56ac006ec9..a1b8f3d2894 100644 --- a/esp/services/ws_dfsservice/ws_dfsservice.hpp +++ b/esp/services/ws_dfsservice/ws_dfsservice.hpp @@ -26,6 +26,7 @@ class CWsDfsEx : public CWsDfs { + bool isHttps = false; public: virtual ~CWsDfsEx() {} virtual void init(IPropertyTree *cfg, const char *process, const char *service); diff --git a/esp/services/ws_fs/ws_fsService.cpp b/esp/services/ws_fs/ws_fsService.cpp index 43e500e94b2..4a3a9138719 100644 --- a/esp/services/ws_fs/ws_fsService.cpp +++ b/esp/services/ws_fs/ws_fsService.cpp @@ -924,10 +924,9 @@ bool CFileSprayEx::onGetDFUWorkunits(IEspContext &context, IEspGetDFUWorkunits & { context.ensureFeatureAccess(DFU_WU_URL, SecAccess_Read, ECLWATCH_DFU_WU_ACCESS_DENIED, "Access to DFU workunit is denied."); - StringBuffer wuidStr(req.getWuid()); - const char* wuid = wuidStr.trim().str(); - if (wuid && *wuid && looksLikeAWuid(wuid, 'D')) - return getOneDFUWorkunit(context, wuid, resp); + WuidPattern wuidPattern(req.getWuid()); + if (!wuidPattern.isEmpty() && looksLikeAWuid(wuidPattern, 'D')) + return getOneDFUWorkunit(context, wuidPattern, resp); double version = context.getClientVersion(); if (version > 1.02) @@ -1055,11 +1054,11 @@ bool CFileSprayEx::onGetDFUWorkunits(IEspContext &context, IEspGetDFUWorkunits & filterbuf.append(""); } - if(wuid && *wuid) + if(!isEmptyString(wuidPattern)) { filters[filterCount] = DFUsf_wildwuid; filterCount++; - filterbuf.append(wuid); + filterbuf.append(wuidPattern); } if(clusterName && *clusterName) diff --git a/esp/services/ws_smc/ws_smcService.cpp b/esp/services/ws_smc/ws_smcService.cpp index e176172c5a0..5486ee2dfb5 100644 --- a/esp/services/ws_smc/ws_smcService.cpp +++ b/esp/services/ws_smc/ws_smcService.cpp @@ -1237,7 +1237,7 @@ void CWsSMCEx::addWUsToResponse(IEspContext &context, const IArrayOfqueryWuid()); } @@ -1754,6 +1755,7 @@ void doWUQueryByFile(IEspContext &context, const char *logicalFile, IEspWUQueryR resp.setFirst(false); resp.setPageSize(1); resp.setCount(1); + resp.setNumWUs(1); } bool addWUQueryFilter(WUSortField *filters, unsigned short &count, MemoryBuffer &buff, const char *name, WUSortField value) @@ -1893,6 +1895,8 @@ void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQue unsigned short filterCount = 0; MemoryBuffer filterbuf; + WuidPattern wuidPattern(req.getWuid()); + // Query filters should be added in order of expected power - add the most restrictive filters first bool bDoubleCheckState = false; @@ -1907,7 +1911,7 @@ void doWUQueryWithSort(IEspContext &context, IEspWUQueryRequest & req, IEspWUQue bDoubleCheckState = true; } - addWUQueryFilter(filters, filterCount, filterbuf, req.getWuid(), WUSFwildwuid); + addWUQueryFilter(filters, filterCount, filterbuf, wuidPattern.str(), WUSFwildwuid); addWUQueryFilter(filters, filterCount, filterbuf, req.getCluster(), WUSFcluster); addWUQueryFilter(filters, filterCount, filterbuf, req.getLogicalFile(), (WUSortField) (WUSFfileread | WUSFnocase)); addWUQueryFilter(filters, filterCount, filterbuf, req.getOwner(), (WUSortField) (WUSFuser | WUSFnocase)); @@ -2108,6 +2112,8 @@ void doWULightWeightQueryWithSort(IEspContext &context, IEspWULightWeightQueryRe unsigned short filterCount = 0; MemoryBuffer filterbuf; + WuidPattern wuidPattern(req.getWuid()); + // Query 
filters should be added in order of expected power - add the most restrictive filters first bool bDoubleCheckState = false; @@ -2122,7 +2128,7 @@ void doWULightWeightQueryWithSort(IEspContext &context, IEspWULightWeightQueryRe bDoubleCheckState = true; } - addWUQueryFilter(filters, filterCount, filterbuf, req.getWuid(), WUSFwildwuid); + addWUQueryFilter(filters, filterCount, filterbuf, wuidPattern.str(), WUSFwildwuid); addWUQueryFilter(filters, filterCount, filterbuf, req.getCluster(), WUSFcluster); addWUQueryFilter(filters, filterCount, filterbuf, req.getOwner(), (WUSortField) (WUSFuser | WUSFnocase)); addWUQueryFilter(filters, filterCount, filterbuf, req.getJobName(), (WUSortField) (WUSFjob | WUSFnocase)); @@ -2596,13 +2602,14 @@ bool CWsWorkunitsEx::onWUQuery(IEspContext &context, IEspWUQueryRequest & req, I { try { - StringBuffer wuidStr(req.getWuid()); - const char* wuid = wuidStr.trim().str(); + WuidPattern pattern(req.getWuid()); + + resp.setNumWUs(0); if (req.getType() && strieq(req.getType(), "archived workunits")) doWUQueryFromArchive(context, sashaServerIp.get(), sashaServerPort, *archivedWuCache, awusCacheMinutes, req, resp); - else if(notEmpty(wuid) && looksLikeAWuid(wuid, 'W')) - doWUQueryBySingleWuid(context, wuid, resp); + else if(notEmpty(pattern) && looksLikeAWuid(pattern, 'W')) + doWUQueryBySingleWuid(context, pattern, resp); else if (notEmpty(req.getLogicalFile()) && req.getLogicalFileSearchType() && strieq(req.getLogicalFileSearchType(), "Created")) doWUQueryByFile(context, req.getLogicalFile(), resp); else @@ -5186,16 +5193,16 @@ bool CWsWorkunitsEx::onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &r if (!req.getCreateDescriptions_isNull()) createDescriptions = req.getCreateDescriptions(); - StringBuffer wuid(req.getWUID()); - PROGLOG("WUGetStats: %s", wuid.str()); + WuidPattern wuidPattern(req.getWUID()); + PROGLOG("WUGetStats: %s", wuidPattern.str()); IArrayOf statistics; - if (strchr(wuid, '*')) + if (strchr(wuidPattern, '*')) { WUSortField filters[2]; MemoryBuffer filterbuf; filters[0] = WUSFwildwuid; - filterbuf.append(wuid.str()); + filterbuf.append(wuidPattern.str()); filters[1] = WUSFterm; Owned factory = getWorkUnitFactory(context.querySecManager(), context.queryUser()); Owned iter = factory->getWorkUnitsSorted((WUSortField) (WUSFwuid), filters, filterbuf.bufferBase(), 0, INT_MAX, NULL, NULL); @@ -5212,14 +5219,14 @@ bool CWsWorkunitsEx::onWUGetStats(IEspContext &context, IEspWUGetStatsRequest &r } else { - WsWuHelpers::checkAndTrimWorkunit("WUInfo", wuid); - ensureWsWorkunitAccess(context, wuid, SecAccess_Read); + WsWuHelpers::checkAndTrimWorkunit("WUInfo", wuidPattern); + ensureWsWorkunitAccess(context, wuidPattern, SecAccess_Read); - WsWuInfo winfo(context, wuid); + WsWuInfo winfo(context, wuidPattern); winfo.getStats(filter, statsFilter, createDescriptions, statistics); } resp.setStatistics(statistics); - resp.setWUID(wuid.str()); + resp.setWUID(wuidPattern.str()); } catch(IException* e) { diff --git a/esp/src/src-react/components/controls/Grid.tsx b/esp/src/src-react/components/controls/Grid.tsx index e2846cf5fbe..939b9d8bef3 100644 --- a/esp/src/src-react/components/controls/Grid.tsx +++ b/esp/src/src-react/components/controls/Grid.tsx @@ -1,5 +1,5 @@ import * as React from "react"; -import { DetailsList, DetailsListLayoutMode, Dropdown, IColumn as _IColumn, ICommandBarItemProps, IDetailsHeaderProps, IDetailsListStyles, mergeStyleSets, Selection, Stack, TooltipHost, TooltipOverflowMode, IRenderFunction, IDetailsRowProps, SelectionMode, 
ConstrainMode, ISelection, ScrollablePane, Sticky } from "@fluentui/react"; +import { DetailsListLayoutMode, Dropdown, IColumn as _IColumn, ICommandBarItemProps, IDetailsHeaderProps, IDetailsListStyles, mergeStyleSets, Selection, Stack, TooltipHost, TooltipOverflowMode, IRenderFunction, IDetailsRowProps, SelectionMode, ConstrainMode, ISelection, ScrollablePane, ShimmeredDetailsList, Sticky } from "@fluentui/react"; import { Pagination } from "@fluentui/react-experiments/lib/Pagination"; import { useConst } from "@fluentui/react-hooks"; import { BaseStore, Memory, QueryRequest, QuerySortItem } from "src/store/Memory"; @@ -227,6 +227,7 @@ const FluentStoreGrid: React.FunctionComponent = ({ const memoizedColumns = useDeepMemo(() => columns, [], [columns]); const [sorted, setSorted] = React.useState(sort); const [items, setItems] = React.useState([]); + const [loaded, setLoaded] = React.useState(false); const [columnWidths] = useNonReactiveEphemeralPageStore("columnWidths"); const selectionHandler = useConst(() => { @@ -248,6 +249,7 @@ const FluentStoreGrid: React.FunctionComponent = ({ const refreshTable = useDeepCallback((clearSelection = false) => { if (isNaN(start) || isNaN(count)) return; + setLoaded(false); if (clearSelection) { selectionHandler.setItems([], true); } @@ -257,6 +259,7 @@ const FluentStoreGrid: React.FunctionComponent = ({ }); storeQuery.then(items => { const selectedIndices = selectionHandler.getSelectedIndices(); + setLoaded(true); setItems(items); selectedIndices.forEach(index => selectionHandler.setIndexSelected(index, true, false)); }); @@ -319,8 +322,9 @@ const FluentStoreGrid: React.FunctionComponent = ({ return
- queryProp("@rowServiceConfiguration"); // merge in bare-metal dafilesrv component expert settings - IPropertyTree *componentExpert = nullptr; - componentExpert = daFileSrv->queryPropTree("expert"); + IPropertyTree *componentExpert = daFileSrv->queryPropTree("expert"); if (componentExpert) synchronizePTree(expert, componentExpert, false, true); + // merge in bare-metal dafilesrv component cert settings into newConfig + IPropertyTree *componentCert = daFileSrv->queryPropTree("cert"); + if (componentCert) + { + IPropertyTree *cert = ensurePTree(newConfig, "cert"); + synchronizePTree(cert, componentCert, false, true); + } + // any overrides by Instance definitions? Owned iter = daFileSrv->getElements("Instance"); ForEach(*iter) diff --git a/fs/dafsclient/rmtfile.cpp b/fs/dafsclient/rmtfile.cpp index d7132bad273..12d1a521a18 100644 --- a/fs/dafsclient/rmtfile.cpp +++ b/fs/dafsclient/rmtfile.cpp @@ -1421,6 +1421,9 @@ class CRemoteFileIO : public CInterfaceOf size32_t read(offset_t pos, size32_t len, void * data) { + if (0 == len) + return 0; + dbgassertex(data); size32_t got; MemoryBuffer replyBuffer; CCycleTimer timer; diff --git a/fs/dafsserver/dafsserver.cpp b/fs/dafsserver/dafsserver.cpp index f3ffc8fd5df..27da86cd085 100644 --- a/fs/dafsserver/dafsserver.cpp +++ b/fs/dafsserver/dafsserver.cpp @@ -134,10 +134,10 @@ static ISecureSocket *createSecureSocket(ISocket *sock, bool disableClientCertVe { #ifdef _CONTAINERIZED /* Connections are expected from 3rd parties via TLS, - * we do not expect them to provide a valid certificate for verification. - * Currently the server (this dafilesrv), will use either the "public" certificate issuer, - * unless it's visibility is "cluster" (meaning internal only) - */ + * we do not expect them to provide a valid certificate for verification. + * Currently the server (this dafilesrv), will use either the "public" certificate issuer, + * unless it's visibility is "cluster" (meaning internal only) + */ const char *certScope = strsame("cluster", getComponentConfigSP()->queryProp("service/@visibility")) ? "local" : "public"; Owned info = getIssuerTlsSyncedConfig(certScope, nullptr, disableClientCertVerification); @@ -145,7 +145,14 @@ static ISecureSocket *createSecureSocket(ISocket *sock, bool disableClientCertVe throw makeStringException(-1, "createSecureSocket() : missing MTLS configuration"); secureContextServer.setown(createSecureSocketContextSynced(info, ServerSocket)); #else - secureContextServer.setown(createSecureSocketContextEx2(securitySettings.getSecureConfig(), ServerSocket)); + Owned cert = getComponentConfigSP()->getPropTree("cert"); + if (cert) + { + Owned certSyncedWrapper = createSyncedPropertyTree(cert); + secureContextServer.setown(createSecureSocketContextSynced(certSyncedWrapper, ServerSocket)); + } + else + secureContextServer.setown(createSecureSocketContextEx2(securitySettings.getSecureConfig(), ServerSocket)); #endif } } @@ -5449,7 +5456,11 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface securitySettings.privateKey = nullptr; } } - else + else if (!isContainerized() && getComponentConfigSP()->hasProp("cert")) + { + // validated when context is created in createSecureSocket + } + else // using environment.conf HPCCCertificateFile etc. 
validateSSLSetup(); #endif diff --git a/plugins/couchbase/couchbaseembed.hpp b/plugins/couchbase/couchbaseembed.hpp index 3a0c4ba0557..e95d8f3dcd3 100644 --- a/plugins/couchbase/couchbaseembed.hpp +++ b/plugins/couchbase/couchbaseembed.hpp @@ -25,6 +25,7 @@ #endif //Using cpp wrapper from https://github.com/couchbaselabs/libcouchbase-cxx +#include #include #include #include diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 7da23b7d033..af0d00649eb 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -314,6 +314,8 @@ extern bool blockedLocalAgent; extern bool acknowledgeAllRequests; extern unsigned packetAcknowledgeTimeout; extern cycle_t dynPriorityAdjustCycles; +extern bool traceThreadStartDelay; +extern int adjustBGThreadNiceValue; extern bool alwaysTrustFormatCrcs; extern bool allFilesDynamic; extern bool lockSuperFiles; diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index a0c014358b6..45eca11ab9a 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -1566,12 +1566,8 @@ class RoxieProtocolMsgContext : implements IHpccProtocolMsgContext, public CInte ensureContextLogger(); - const char * spanQueryName = !isEmptyString(queryName) ? queryName : "run_query"; - StringBuffer spanName(querySetName); - if (spanName.length()) - spanName.append('/'); - spanName.append(spanQueryName); - requestSpan.setown(queryTraceManager().createServerSpan(spanName, allHeaders, flags)); + requestSpan.setown(queryTraceManager().createServerSpan(!isEmptyString(queryName) ? queryName : "run_query", allHeaders, flags)); + requestSpan->setSpanAttribute("queryset.name", querySetName); logctx->setActiveSpan(requestSpan); const char * globalId = requestSpan->queryGlobalId(); diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 92db1614993..0c00c4f98b2 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -76,6 +76,8 @@ bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; cycle_t dynPriorityAdjustCycles = 0; // default off (0) +bool traceThreadStartDelay = true; +int adjustBGThreadNiceValue = 5; unsigned ccdMulticastPort; bool enableHeartBeat = true; unsigned parallelLoopFlowLimit = 100; @@ -1010,6 +1012,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) unsigned dynAdjustMsec = topology->getPropInt("@dynPriorityAdjustTime", 0); if (dynAdjustMsec) dynPriorityAdjustCycles = dynAdjustMsec * (queryOneSecCycles() / 1000ULL); + traceThreadStartDelay = topology->getPropBool("@traceThreadStartDelay", traceThreadStartDelay); + adjustBGThreadNiceValue = topology->getPropInt("@adjustBGThreadNiceValue", adjustBGThreadNiceValue); + if (adjustBGThreadNiceValue < 0) + adjustBGThreadNiceValue = 0; + if (adjustBGThreadNiceValue > 19) + adjustBGThreadNiceValue = 19; ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); roxiemem::setMemTraceSizeLimit((memsize_t) topology->getPropInt64("@memTraceSizeLimit", 0)); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 0918f6df969..7046420cc08 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -1188,6 +1188,13 @@ class RoxieQueue : public CInterface, implements IThreadFactory if (qname && *qname) tname.appendf(" (%s)", qname); workers.setown(createThreadPool(tname.str(), this, false, nullptr, numWorkers)); + if (traceThreadStartDelay) + workers->setStartDelayTracing(60); + if (qname && *qname) + { + if 
(streq(qname, "BG")) + workers->setNiceValue(adjustBGThreadNiceValue); + } started = 0; idle = 0; if (IBYTIbufferSize) @@ -1893,7 +1900,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers/2 + 1, "BG"), numWorkers(_numWorkers) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers, "SLA"), hiQueue(_numWorkers, "HIGH"), loQueue(_numWorkers, "LOW"), bgQueue(_numWorkers, "BG"), numWorkers(_numWorkers) { } @@ -1902,7 +1909,7 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface loQueue.start(); hiQueue.start(); slaQueue.start(); - bgQueue.start(); // consider nice(+3) BG threads + bgQueue.start(); // NB BG thread priority can be adjusted } virtual void stop() diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index a4d611da13f..c17b2ecd5e7 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4611,6 +4611,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie ctx->queryOptions().dynPriority = QUERY_BG_PRIORITY_VALUE; unsigned dynAdjustMsec = (dynPriorityAdjustCycles * 1000ULL) / queryOneSecCycles(); UWARNLOG("WARNING: %d msec dynamic adjustment threshold reached, shifting query to BG queue", dynAdjustMsec); + p->queryHeader().activityId &= ~ROXIE_PRIORITY_MASK; p->queryHeader().activityId |= ROXIE_BG_PRIORITY; // TODO: what to do about still running activities' continuation/ack priorities ? } diff --git a/system/jlib/jdebug.cpp b/system/jlib/jdebug.cpp index 58cd0f36f00..f63d7162161 100644 --- a/system/jlib/jdebug.cpp +++ b/system/jlib/jdebug.cpp @@ -667,18 +667,34 @@ MODULE_EXIT() //=========================================================================== -void BlockedTimeTracker::noteWaiting() +//Calculate how much time has elapsed total - with the given number of threads still active +__uint64 BlockedTimeTracker::calcActiveTime(cycle_t tally, unsigned active) const { + if (active != 0) + { + cycle_t now = get_cycles_now(); + tally += active * now; + } + + return cycle_to_nanosec(tally); +} + +cycle_t BlockedTimeTracker::noteWaiting() +{ + cycle_t now = get_cycles_now(); CriticalBlock block(cs); - numWaiting++; - timeStampTally -= get_cycles_now(); + numStarted++; + timeStampTally -= now; + return now; } -void BlockedTimeTracker::noteComplete() +cycle_t BlockedTimeTracker::noteComplete() { + cycle_t now = get_cycles_now(); CriticalBlock block(cs); - numWaiting--; - timeStampTally += get_cycles_now(); + numFinished++; + timeStampTally += now; + return now; } __uint64 BlockedTimeTracker::getWaitingNs() const @@ -687,17 +703,30 @@ __uint64 BlockedTimeTracker::getWaitingNs() const cycle_t tally; { CriticalBlock block(cs); - active = numWaiting; + active = numStarted - numFinished; tally = timeStampTally; } - if (active != 0) + return calcActiveTime(tally, active); +} + + +void BlockedTimeTracker::extractOverlapInfo(OverlapTimeInfo & info, bool isStart) const +{ + unsigned started; + unsigned finished; + cycle_t tally; + { - cycle_t now = get_cycles_now(); - tally += active * now; + CriticalBlock block(cs); + started = numStarted; + finished = numFinished; + tally = timeStampTally; } - return cycle_to_nanosec(tally); + //Record so that when counts are subtracted, the total count will include all jobs that overlapped in any part + info.count = isStart ? 
finished : started; + info.elapsedNs = calcActiveTime(tally, started - finished); } diff --git a/system/jlib/jdebug.hpp b/system/jlib/jdebug.hpp index e0983b892c5..2d2065e76ac 100644 --- a/system/jlib/jdebug.hpp +++ b/system/jlib/jdebug.hpp @@ -307,21 +307,56 @@ Also since you are only ever interested in (sumEndTimestamps - sumStartTimestamp There are two versions, one that uses a critical section, and a second that uses atomics, but is limited to the number of active blocked items. + +There is a second potential use for the BlockedTimeTracker class - for monitoring how many queries overlap the execution of the current query. +This can be calculated in the following way: + +a) When a server query starts it calls + serverLoadTracker.extractOverlapInfo(startServer, true); + workerLoadTracker.extractOverlapInfo(startWorker, true); + startCycles = serverLoadTracker.noteWaiting(); +b) When a server query finishes it calls + endCycles = serverLoadTracker.noteComplete(); + serverLoadTracker.extractOverlapInfo(finishServer, false); + workerLoadTracker.extractOverlapInfo(finishWorker, false); +c) When a worker query starts it calls + workerLoadTracker.noteWaiting(); +d) When a worker query finishes it calls + workerLoadTracker.noteComplete(); + +* (finishX.count - startX.count) gives the total number of queries (including this one) that executed at the same time as the query +* (finishX.elapsedNs - startX.elapsedNs) gives the total execution time of all queries (including this one) +* endCycles - startCycles gives the elapsed time for this query. +* This can be reported separately for the server and worker to give an estimate of the concurrent load when a query was running. */ +struct OverlapTimeInfo +{ + unsigned count; + stat_type elapsedNs; +}; + class jlib_decl BlockedTimeTracker { public: BlockedTimeTracker() = default; BlockedTimeTracker(const BlockedTimeTracker &) = delete; - void noteWaiting(); - void noteComplete(); + //The following return get_cycles_now() - which can be used for tracking elapsed time.
+ cycle_t noteWaiting(); + cycle_t noteComplete(); __uint64 getWaitingNs() const; + //A helper function to help calculate the overlapping load on the system + void extractOverlapInfo(OverlapTimeInfo & info, bool isStart) const; + +protected: + __uint64 calcActiveTime(cycle_t tally, unsigned active) const; + private: mutable CriticalSection cs; - unsigned numWaiting = 0; + unsigned numStarted = 0; + unsigned numFinished = 0; cycle_t timeStampTally = 0; }; diff --git a/system/jlib/jlog.hpp b/system/jlib/jlog.hpp index 92d05260394..d8f2b92fca4 100644 --- a/system/jlib/jlog.hpp +++ b/system/jlib/jlog.hpp @@ -608,7 +608,7 @@ class jlib_decl LogMsgTraceInfo const char * queryTraceID() const { - if (span) + if (span && span->isValid()) { const char * traceId = span->queryTraceId(); if (traceId) @@ -619,7 +619,7 @@ class jlib_decl LogMsgTraceInfo const char * querySpanID() const { - if (span) + if (span && span->isValid()) { const char * spanId = span->querySpanId(); if (spanId) diff --git a/system/jlib/jthread.cpp b/system/jlib/jthread.cpp index 8545d2bd641..769f04e9595 100644 --- a/system/jlib/jthread.cpp +++ b/system/jlib/jthread.cpp @@ -995,6 +995,7 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter unsigned stacksize; unsigned timeoutOnRelease; unsigned traceStartDelayPeriod = 0; + int niceValue = 0; unsigned startsInPeriod = 0; cycle_t startDelayInPeriod = 0; CCycleTimer overAllTimer; @@ -1114,6 +1115,8 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter CPooledThreadWrapper &ret = *new CPooledThreadWrapper(*this,newid,factory->createNew()); if (stacksize) ret.setStackSize(stacksize); + if (niceValue) + ret.setNice(niceValue); ret.start(false); threadwrappers.append(ret); return ret; @@ -1281,6 +1284,10 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter { traceStartDelayPeriod = secs; } + void setNiceValue(int value) + { + niceValue = value; + } bool waitAvailable(unsigned timeout) { if (!defaultmax) diff --git a/system/jlib/jthread.hpp b/system/jlib/jthread.hpp index 5d312aa9d2f..b48dff635a1 100644 --- a/system/jlib/jthread.hpp +++ b/system/jlib/jthread.hpp @@ -289,6 +289,7 @@ interface IThreadPool : extends IInterface virtual unsigned runningCount()=0; // number of currently running threads virtual PooledThreadHandle startNoBlock(void *param)=0; // starts a new thread if it can do so without blocking, else throws exception virtual void setStartDelayTracing(unsigned secs) = 0; // set start delay tracing period + virtual void setNiceValue(int value) = 0; // set priority for thread virtual bool waitAvailable(unsigned timeout) = 0; // wait until a pool member is available }; diff --git a/system/jlib/jtrace.cpp b/system/jlib/jtrace.cpp index 9984078879d..e91d63d0d9e 100644 --- a/system/jlib/jtrace.cpp +++ b/system/jlib/jtrace.cpp @@ -770,6 +770,15 @@ class CSpan : public CInterfaceOf return span ? 
span->IsRecording() : false; } + virtual bool isValid() const override + { + if (span == nullptr) + return false; + + auto spanCtx = span->GetContext(); + return spanCtx.IsValid(); + } + virtual void setSpanStatusSuccess(bool spanSucceeded, const char * statusMessage) { if (span != nullptr) @@ -923,6 +932,7 @@ class CNullSpan final : public CInterfaceOf virtual void toString(StringBuffer & out) const override {} virtual void getLogPrefix(StringBuffer & out) const override {} virtual bool isRecording() const { return false; } + virtual bool isValid() const { return false; } virtual void recordException(IException * e, bool spanFailed, bool escapedScope) override {} virtual void recordError(const SpanError & error) override {}; diff --git a/system/jlib/jtrace.hpp b/system/jlib/jtrace.hpp index 6fa7bd3f71b..9a0db0d7b8d 100644 --- a/system/jlib/jtrace.hpp +++ b/system/jlib/jtrace.hpp @@ -133,6 +133,7 @@ struct SpanTimeStamp interface ISpan : extends IInterface { + virtual bool isValid() const = 0; virtual void setSpanAttribute(const char * key, const char * val) = 0; virtual void setSpanAttribute(const char *name, __uint64 value) = 0; virtual void setSpanAttributes(const IProperties * attributes) = 0; diff --git a/system/jlib/jutil.cpp b/system/jlib/jutil.cpp index d22fda4888d..daf6270d0e9 100644 --- a/system/jlib/jutil.cpp +++ b/system/jlib/jutil.cpp @@ -2062,130 +2062,6 @@ bool deduceMask(const char *fn, bool expandN, StringAttr &mask, unsigned &pret, } //============================================================== -#ifdef _WIN32 - - -class CWindowsAuthenticatedUser: implements IAuthenticatedUser, public CInterface -{ - StringAttr name; - HANDLE usertoken; -public: - IMPLEMENT_IINTERFACE; - CWindowsAuthenticatedUser() - { - usertoken = (HANDLE)-1; - } - ~CWindowsAuthenticatedUser() - { - if (usertoken != (HANDLE)-1) - CloseHandle(usertoken); - } - bool login(const char *user, const char *passwd) - { - name.clear(); - if (usertoken != (HANDLE)-1) - CloseHandle(usertoken); - StringBuffer domain(""); - const char *ut = strchr(user,'\\'); - if (ut) { - domain.clear().append((size32_t)(ut-user),user); - user = ut+1; - } - BOOL res = LogonUser((LPTSTR)user,(LPTSTR)(domain.length()==0?NULL:domain.str()),(LPTSTR)passwd,LOGON32_LOGON_NETWORK,LOGON32_PROVIDER_DEFAULT,&usertoken); - if (res==0) - return false; - name.set(user); - return true; - } - void impersonate() - { - if (!ImpersonateLoggedOnUser(usertoken)) - throw makeOsException(GetLastError()); - } - - void revert() - { - RevertToSelf(); - } - - const char *username() - { - return name.get(); - } -}; - -IAuthenticatedUser *createAuthenticatedUser() { return new CWindowsAuthenticatedUser; } - -#elif defined(__linux__) - -class CLinuxAuthenticatedUser: implements IAuthenticatedUser, public CInterface -{ - StringAttr name; - uid_t uid; - gid_t gid; - uid_t saveuid; - gid_t savegid; - -public: - IMPLEMENT_IINTERFACE; - bool login(const char *user, const char *passwd) - { - name.clear(); - const char *ut = strchr(user,'\\'); - if (ut) - user = ut+1; // remove windows domain - struct passwd *pw; - char *epasswd; - if ((pw = getpwnam(user)) == NULL) - return false; - struct spwd *spwd = getspnam(user); - if (spwd) - epasswd = spwd->sp_pwdp; - else - epasswd = pw->pw_passwd; - if (!epasswd||!*epasswd) - return false; - if (strcmp(crypt(passwd,epasswd),epasswd)!=0) - return false; - uid = pw->pw_uid; - gid = pw->pw_gid; - name.set(pw->pw_name); - return true; - } - void impersonate() - { - saveuid = geteuid(); - savegid = getegid(); - if 
(setegid(gid) == -1) - throw makeOsException(errno, "Failed to set effective group id"); - if (seteuid(uid) == -1) - throw makeOsException(errno, "Failed to set effective user id"); - } - - void revert() - { - if (seteuid(saveuid) == -1) - throw makeOsException(errno, "Failed to restore effective group id"); - if (setegid(savegid) == -1) - throw makeOsException(errno, "Failed to restore effective user id"); - } - - const char *username() - { - return name.get(); - } - -}; - - - -IAuthenticatedUser *createAuthenticatedUser() { return new CLinuxAuthenticatedUser; } -#elif defined(__FreeBSD__) || defined (__APPLE__) - -IAuthenticatedUser *createAuthenticatedUser() { UNIMPLEMENTED; } - -#endif - extern jlib_decl void serializeAtom(MemoryBuffer & target, IAtom * name) { diff --git a/system/jlib/jutil.hpp b/system/jlib/jutil.hpp index 4841d61761b..af405735b95 100644 --- a/system/jlib/jutil.hpp +++ b/system/jlib/jutil.hpp @@ -373,17 +373,7 @@ class jlib_decl NamedCount extern jlib_decl StringBuffer &dumpNamedCounts(StringBuffer &str); - -interface IAuthenticatedUser: extends IInterface -{ - virtual bool login(const char *user, const char *passwd) = 0; - virtual void impersonate()=0; - virtual void revert()=0; - virtual const char *username()=0; -}; - interface IAtom; -extern jlib_decl IAuthenticatedUser *createAuthenticatedUser(); extern jlib_decl void serializeAtom(MemoryBuffer & target, IAtom * name); extern jlib_decl IAtom * deserializeAtom(MemoryBuffer & source); diff --git a/system/masking/include/tracer.hpp b/system/masking/include/tracer.hpp index 70cc9534c4a..3aef92ce189 100644 --- a/system/masking/include/tracer.hpp +++ b/system/masking/include/tracer.hpp @@ -135,7 +135,8 @@ class CModularTracer : public CBaseTracer public: CModularTracer() { - setSink(DefaultHelperName, new CStandardTraceMsgSink()); + Owned sink(new CStandardTraceMsgSink()); + setSink(DefaultHelperName, sink); } inline bool setSink(IModularTraceMsgSink* sink) { diff --git a/testing/regress/ecl/despray.ecl b/testing/regress/ecl/despray.ecl index 0a71c847cb5..c59a77f6bfd 100644 --- a/testing/regress/ecl/despray.ecl +++ b/testing/regress/ecl/despray.ecl @@ -30,7 +30,7 @@ import $.setup; import Std.File AS FileServices; -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); prefix := setup.Files(false, false).QueryFilePrefix; diff --git a/testing/regress/ecl/spray_dir_test.ecl b/testing/regress/ecl/spray_dir_test.ecl index fce195b88fa..4b84fd72245 100644 --- a/testing/regress/ecl/spray_dir_test.ecl +++ b/testing/regress/ecl/spray_dir_test.ecl @@ -39,7 +39,7 @@ import ^ as root; engine := thorlib.platform(); prefix := '~regress::' + engine + '::' + WORKUNIT + '::'; -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); unsigned VERBOSE := 0; diff --git a/testing/regress/ecl/spray_expire_test.ecl b/testing/regress/ecl/spray_expire_test.ecl index f2acdae9eec..824075f26b8 100644 --- a/testing/regress/ecl/spray_expire_test.ecl +++ b/testing/regress/ecl/spray_expire_test.ecl @@ -26,7 +26,7 @@ import Std.File AS FileServices; import $.setup; prefix := setup.Files(false, false).QueryFilePrefix; -dropzonePathTemp := 
'/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); unsigned VERBOSE := 0; diff --git a/testing/regress/ecl/spray_header_test.ecl b/testing/regress/ecl/spray_header_test.ecl index 3079897ea66..a56121fcd84 100644 --- a/testing/regress/ecl/spray_header_test.ecl +++ b/testing/regress/ecl/spray_header_test.ecl @@ -28,7 +28,7 @@ import ^ as root; isTerminated := #IFDEFINED(root.isTerminated, false); -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); prefix := setup.Files(false, false).QueryFilePrefix; diff --git a/testing/regress/ecl/spray_replicate_test.ecl b/testing/regress/ecl/spray_replicate_test.ecl index 8d117cb8b43..8853cfe852b 100644 --- a/testing/regress/ecl/spray_replicate_test.ecl +++ b/testing/regress/ecl/spray_replicate_test.ecl @@ -22,7 +22,7 @@ import std.system.thorlib; import Std.File AS FileServices; -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); espIpPort := 'http://127.0.0.1:8010/FileSpray' : STORED('espIpPort'); engine := thorlib.platform(); diff --git a/testing/regress/ecl/spray_test.ecl b/testing/regress/ecl/spray_test.ecl index a420ee1f004..23f53f4fc48 100644 --- a/testing/regress/ecl/spray_test.ecl +++ b/testing/regress/ecl/spray_test.ecl @@ -34,7 +34,7 @@ prefix := setup.Files(false, false).QueryFilePrefix; boolean sprayFixed := #IFDEFINED(root.sprayFixed, true); boolean sprayEmpty := #IFDEFINED(root.sprayEmpty, false); -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); unsigned VERBOSE := 0; diff --git a/testing/regress/ecl/spray_test_json.ecl b/testing/regress/ecl/spray_test_json.ecl index 0a5e0645e36..997ad27cd96 100644 --- a/testing/regress/ecl/spray_test_json.ecl +++ b/testing/regress/ecl/spray_test_json.ecl @@ -35,7 +35,7 @@ isSmallFile := #IFDEFINED(root.isSmallFile, true); isUnBallanced := #IFDEFINED(root.isUnBallanced, false); -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); engine := thorlib.platform() : stored('thor'); prefix := setup.Files(false, false).FilePrefix + '-' + WORKUNIT; diff --git a/testing/regress/ecl/spray_test_xml.ecl b/testing/regress/ecl/spray_test_xml.ecl index d0a45f1e03b..bbbee4fb3bf 100644 --- a/testing/regress/ecl/spray_test_xml.ecl +++ b/testing/regress/ecl/spray_test_xml.ecl @@ -36,7 +36,7 @@ import ^ as root; prefix := setup.Files(false, false).QueryFilePrefix; -dropzonePathTemp := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath'); +dropzonePathTemp := FileServices.GetDefaultDropZone() : STORED('dropzonePath'); dropzonePath := dropzonePathTemp + IF(dropzonePathTemp[LENGTH(dropzonePathTemp)]='/', '', '/'); espUrl := 
FileServices.GetEspURL() + '/FileSpray'; diff --git a/testing/unittests/jlibtests.cpp b/testing/unittests/jlibtests.cpp index f2372de370c..b820db549b9 100644 --- a/testing/unittests/jlibtests.cpp +++ b/testing/unittests/jlibtests.cpp @@ -4170,6 +4170,7 @@ class BlockedTimingTests : public CppUnit::TestFixture CPPUNIT_TEST(testStandard3); CPPUNIT_TEST(testLightweight); CPPUNIT_TEST(testLightweight2); + CPPUNIT_TEST(testConcurrent); CPPUNIT_TEST_SUITE_END(); void testStandard() @@ -4286,6 +4287,53 @@ class BlockedTimingTests : public CppUnit::TestFixture if (trace) DBGLOG("%" I64F "u %" I64F "u", blockTime-expected, postBlockTime-blockTime); } + + inline bool within10Percent(__uint64 expected, __uint64 actual) + { + __uint64 threshold = expected / 10; + return (actual >= expected - threshold) && (actual <= expected + threshold); + } + + void testConcurrent() + { + BlockedTimeTracker serverLoadTracker; + BlockedTimeTracker workerLoadTracker; + OverlapTimeInfo serverStartInfo; + OverlapTimeInfo workerStartInfo; + OverlapTimeInfo serverFinishInfo; + OverlapTimeInfo workerFinishInfo; + OverlapTimeInfo dummyInfo; + + serverLoadTracker.extractOverlapInfo(serverStartInfo, true); + workerLoadTracker.extractOverlapInfo(workerStartInfo, true); + cycle_t startCycles = serverLoadTracker.noteWaiting(); + + MilliSleep(100); + serverLoadTracker.noteWaiting(); // 2nd query starts Q1 Q2 + workerLoadTracker.noteWaiting(); // worker thread starts Q1 Q2 W1 + MilliSleep(100); + workerLoadTracker.noteComplete(); // worker thread finishes Q1 Q2 + serverLoadTracker.noteWaiting(); // 3rd query starts Q1 Q2 Q3 + MilliSleep(100); + serverLoadTracker.noteComplete(); // 2nd query finishes Q1 Q3 + workerLoadTracker.noteWaiting(); // worker thread starts Q1 Q3 W2 + MilliSleep(100); + workerLoadTracker.noteComplete(); // worker thread finishes Q1 Q3 + MilliSleep(100); + + cycle_t endCycles = serverLoadTracker.noteComplete(); // Q3 + workerLoadTracker.extractOverlapInfo(workerFinishInfo, false); + serverLoadTracker.extractOverlapInfo(serverFinishInfo, false); + + CPPUNIT_ASSERT_EQUAL(2U, workerFinishInfo.count - workerStartInfo.count); + CPPUNIT_ASSERT_EQUAL(true, within10Percent(200'000'000, workerFinishInfo.elapsedNs - workerStartInfo.elapsedNs)); + + CPPUNIT_ASSERT_EQUAL(3U, serverFinishInfo.count - serverStartInfo.count); + + // This query runs for 500ms, the 2nd query for 200ms, and the 3rd query for 300ms + CPPUNIT_ASSERT_EQUAL(true, within10Percent(1'000'000'000, serverFinishInfo.elapsedNs - serverStartInfo.elapsedNs)); + CPPUNIT_ASSERT_EQUAL(true, within10Percent(500'000'000, cycle_to_nanosec(endCycles - startCycles))); + } }; CPPUNIT_TEST_SUITE_REGISTRATION( BlockedTimingTests ); diff --git a/testing/unittests/wutests.cpp b/testing/unittests/wutests.cpp index 7b509ef85fc..bdefd82641b 100644 --- a/testing/unittests/wutests.cpp +++ b/testing/unittests/wutests.cpp @@ -23,6 +23,7 @@ class wuTests : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE( wuTests ); CPPUNIT_TEST(testLooksLikeAWuid); + CPPUNIT_TEST(testWuidPattern); CPPUNIT_TEST_SUITE_END(); public: @@ -74,6 +75,11 @@ class wuTests : public CppUnit::TestFixture CPPUNIT_ASSERT_MESSAGE("looksLikeAWuid should fail", !looksLikeAWuid("W12345678-123456-", 'W')); CPPUNIT_ASSERT_MESSAGE("looksLikeAWuid should fail", !looksLikeAWuid("*", 'W')); } + + void testWuidPattern() + { + CPPUNIT_ASSERT_MESSAGE("wuidPattern should pass", looksLikeAWuid(WuidPattern(" \t\rw12345678-123456 \t\r"), 'W')); + } }; CPPUNIT_TEST_SUITE_REGISTRATION( wuTests );
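The testConcurrent case above exercises the counting scheme documented in jdebug.hpp: started/finished counts only ever increase, and a signed timestamp tally is decremented at each start and incremented at each completion, so a finish snapshot minus a start snapshot yields both the number of overlapping queries and their combined execution time. Below is a minimal standalone sketch of the same scheme, assuming std::chrono and std::mutex in place of jlib's cycle counters and CriticalSection; the names (OverlapTracker, noteStart, noteFinish, snapshot) are invented for illustration:

```cpp
// Sketch of the BlockedTimeTracker overlap-counting scheme.
// At any instant, tallyNs + active*now equals the total time consumed
// by all jobs so far, where active = started - finished.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>

using Clock = std::chrono::steady_clock;

class OverlapTracker
{
    mutable std::mutex lock;
    unsigned started = 0, finished = 0;
    int64_t tallyNs = 0;    // sum(finish timestamps) - sum(start timestamps)

    static int64_t nowNs()
    {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(Clock::now().time_since_epoch()).count();
    }
public:
    struct Snapshot { unsigned count; int64_t elapsedNs; };

    void noteStart()  { std::lock_guard<std::mutex> g(lock); ++started;  tallyNs -= nowNs(); }
    void noteFinish() { std::lock_guard<std::mutex> g(lock); ++finished; tallyNs += nowNs(); }

    // isStart=true records the finished count, isStart=false the started count,
    // so (finish.count - start.count) includes every job that overlapped at all.
    Snapshot snapshot(bool isStart) const
    {
        std::lock_guard<std::mutex> g(lock);
        int64_t elapsed = tallyNs + int64_t(started - finished) * nowNs();
        return { isStart ? finished : started, elapsed };
    }
};

int main()
{
    OverlapTracker tracker;
    OverlapTracker::Snapshot s0 = tracker.snapshot(true);
    tracker.noteStart();                                   // "our" query
    tracker.noteStart();                                   // one overlapping query
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    tracker.noteFinish();
    tracker.noteFinish();
    OverlapTracker::Snapshot s1 = tracker.snapshot(false);
    std::cout << (s1.count - s0.count) << " overlapping queries, "
              << (s1.elapsedNs - s0.elapsedNs) / 1'000'000 << " ms combined\n";  // ~2, ~200 ms
    return 0;
}
```

Recording the finished count at the start snapshot and the started count at the finish snapshot is the key asymmetry: the subtraction then covers every job whose lifetime overlapped the window at all, including ones that began before the window opened.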
diff --git a/thorlcr/activities/diskread/thdiskreadslave.cpp b/thorlcr/activities/diskread/thdiskreadslave.cpp index 6c28ad1db05..abbabc586aa 100644 --- a/thorlcr/activities/diskread/thdiskreadslave.cpp +++ b/thorlcr/activities/diskread/thdiskreadslave.cpp @@ -1084,7 +1084,14 @@ class CDiskGroupAggregateSlave merging = false; appendOutputLinked(this); } - +// CSlaveActivity overloaded methods + virtual unsigned __int64 queryLookAheadCycles() const override + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorGroupAggregateCallback virtual void processRow(const void *next) { diff --git a/thorlcr/activities/fetch/thfetchslave.cpp b/thorlcr/activities/fetch/thfetchslave.cpp index 96a6d75bfb6..487ed89619e 100644 --- a/thorlcr/activities/fetch/thfetchslave.cpp +++ b/thorlcr/activities/fetch/thfetchslave.cpp @@ -188,7 +188,12 @@ class CFetchStream : public IRowStream, implements IStopInput, implements IFetch if (distributor) distributor->abort(); } - + virtual unsigned __int64 queryLookAheadCycles() const override + { + if (distributor) + return distributor->queryLookAheadCycles(); + return 0; + } // IStopInput virtual void stopInput() { @@ -404,6 +409,15 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler { } + virtual unsigned __int64 queryLookAheadCycles() const override + { + CriticalBlock b(fetchStreamCS); + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (fetchStream) + lookAheadCycles += fetchStream->queryLookAheadCycles(); + return lookAheadCycles; + } + // IThorDataLink impl. virtual void start() override { @@ -515,6 +529,8 @@ class CFetchSlaveBase : public CSlaveActivity, implements IFetchHandler OwnedRoxieString fileName = fetchBaseHelper->getFileName(); { CriticalBlock b(fetchStreamCS); + if (fetchStream) + slaveTimerStats.lookAheadCycles += fetchStream->queryLookAheadCycles(); fetchStream.setown(createFetchStream(*this, keyInIf, rowIf, abortSoon, fileName, parts, offsetCount, offsetMapSz, offsetMapBytes.toByteArray(), this, mptag, eexp)); } fetchStreamOut = fetchStream->queryOutput(); diff --git a/thorlcr/activities/fetch/thfetchslave.ipp b/thorlcr/activities/fetch/thfetchslave.ipp index dd391e6f49b..0cc3a17e9ed 100644 --- a/thorlcr/activities/fetch/thfetchslave.ipp +++ b/thorlcr/activities/fetch/thfetchslave.ipp @@ -39,6 +39,7 @@ interface IFetchStream : extends IInterface virtual void abort() = 0; virtual void getStats(CRuntimeStatisticCollection & stats) const = 0; virtual void getFileStats(std::vector<OwnedPtr<CRuntimeStatisticCollection>> & fileStats, unsigned fileTableStart) const = 0; + virtual unsigned __int64 queryLookAheadCycles() const = 0; }; IFetchStream *createFetchStream(CSlaveActivity &owner, IThorRowInterfaces *keyRowIf, IThorRowInterfaces *fetchRowIf, bool &abortSoon, const char *logicalFilename, CPartDescriptorArray &parts, unsigned offsetCount, size32_t offsetMapSz, const void *offsetMap, IFetchHandler *iFetchHandler, mptag_t tag, IExpander *eexp=NULL); diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp index b2e5f034417..4fce9175a60 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.cpp @@ -505,6 +505,7 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl RelaxedAtomic<rowcount_t> numLocalRows {0}; RelaxedAtomic<rowcount_t> numRemoteRows {0}; RelaxedAtomic<size64_t> sizeRemoteWrite {0}; + RelaxedAtomic<cycle_t>
lookAheadCycles {0}; void init() { @@ -859,10 +860,19 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl } if (aborted) break; - const void *row = input->ungroupedNextRow(); + const void *row; + if (owner.activity->queryTimeActivities()) + { + CCycleTimer rowTimer; + row = input->ungroupedNextRow(); + lookAheadCycles.fastAdd(rowTimer.elapsedCycles()); + } + else + { + row = input->ungroupedNextRow(); + } if (!row) break; - CTarget *target = nullptr; if (owner.isAll) target = targets.item(0); @@ -947,6 +957,10 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl stats.setStatistic(StNumRemoteRows, numRemoteRows.load()); stats.setStatistic(StSizeRemoteWrite, sizeRemoteWrite.load()); } + virtual unsigned __int64 queryLookAheadCycles() const + { + return lookAheadCycles.load(); + } // IThreadFactory impl. virtual IPooledThread *createNew() { @@ -1257,6 +1271,17 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl ihash = NULL; iCompare = NULL; } + virtual void mergeStats(CRuntimeStatisticCollection &stats) const + { + sender.mergeStats(stats); + CriticalBlock block(critPiperd); + if (piperd) + mergeRemappedStats(stats, piperd, diskToTempStatsMap); + } + virtual unsigned __int64 queryLookAheadCycles() const + { + return sender.queryLookAheadCycles(); + } virtual void abort() { if (!aborted) @@ -1451,13 +1476,6 @@ class CDistributorBase : implements IHashDistributor, implements IExceptionHandl virtual void stopRecv() = 0; virtual bool sendBlock(unsigned i,CMessageBuffer &mb) = 0; - virtual void mergeStats(CRuntimeStatisticCollection &stats) const - { - sender.mergeStats(stats); - CriticalBlock block(critPiperd); - if (piperd) - mergeRemappedStats(stats, piperd, diskToTempStatsMap); - } // IExceptionHandler impl. virtual bool fireException(IException *e) { @@ -4103,6 +4121,15 @@ class HashJoinSlaveActivity : public CSlaveActivity, implements IStopInput activeStats.setStatistic(StNumRightRows, joinhelper->getRhsProgress()); } } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (lhsDistributor) + lookAheadCycles += lhsDistributor->queryLookAheadCycles(); + if (rhsDistributor) + lookAheadCycles += rhsDistributor->queryLookAheadCycles(); + return lookAheadCycles; + } }; #ifdef _MSC_VER #pragma warning(pop) @@ -4584,6 +4611,13 @@ class CHashAggregateSlave : public CSlaveActivity, implements IHThorRowAggregato info.canStall = true; // maybe more? 
} + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorRowAggregator impl virtual size32_t clearAggregate(ARowBuilder & rowBuilder) override { return helper->clearAggregate(rowBuilder); } virtual size32_t processFirst(ARowBuilder & rowBuilder, const void * src) override { return helper->processFirst(rowBuilder, src); } diff --git a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp index 862ec32bdd9..fed9d564f5b 100644 --- a/thorlcr/activities/hashdistrib/thhashdistribslave.ipp +++ b/thorlcr/activities/hashdistrib/thhashdistribslave.ipp @@ -31,6 +31,7 @@ interface IHashDistributor : extends IInterface virtual void join()=0; virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0; virtual void mergeStats(CRuntimeStatisticCollection &stats) const = 0; + virtual unsigned __int64 queryLookAheadCycles() const = 0; virtual void abort()=0; }; diff --git a/thorlcr/activities/indexread/thindexreadslave.cpp b/thorlcr/activities/indexread/thindexreadslave.cpp index c73d933337f..e24cb47d302 100644 --- a/thorlcr/activities/indexread/thindexreadslave.cpp +++ b/thorlcr/activities/indexread/thindexreadslave.cpp @@ -1159,6 +1159,13 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements merging = false; appendOutputLinked(this); } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (distributor) + lookAheadCycles += distributor->queryLookAheadCycles(); + return lookAheadCycles; + } // IHThorGroupAggregateCallback virtual void processRow(const void *next) { diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 2e9f426a97a..febde309d58 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -2837,6 +2837,15 @@ class CLookupJoinActivityBase : public CInMemJoinBase<HTHELPER, HELPER> ...isGrouped() == grouped)); // std. 
lookup join expects these to match } + virtual unsigned __int64 queryLookAheadCycles() const + { + cycle_t lookAheadCycles = PARENT::queryLookAheadCycles(); + if (rhsDistributor) + lookAheadCycles += rhsDistributor->queryLookAheadCycles(); + if (lhsDistributor) + lookAheadCycles += lhsDistributor->queryLookAheadCycles(); + return lookAheadCycles; + } virtual void reset() override { PARENT::reset(); diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index ff11c691ffc..eb2b065fb05 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -554,7 +554,7 @@ MemoryBuffer &CSlaveActivity::getInitializationData(unsigned slave, MemoryBuffer return mb.append(queryInitializationData(slave)); } -unsigned __int64 CSlaveActivity::queryLocalCycles() const +unsigned __int64 CSlaveActivity::queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const { unsigned __int64 inputCycles = 0; if (1 == inputs.ordinality()) @@ -587,11 +587,10 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const break; } } - unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles(); + unsigned __int64 processCycles = totalCycles + lookAheadCycles; if (processCycles < inputCycles) // not sure how/if possible, but guard against return 0; processCycles -= inputCycles; - const unsigned __int64 blockedCycles = queryBlockedCycles(); if (processCycles < blockedCycles) { ActPrintLog("CSlaveActivity::queryLocalCycles - process %" I64F "uns < blocked %" I64F "uns", cycle_to_nanosec(processCycles), cycle_to_nanosec(blockedCycles)); @@ -600,6 +599,11 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const return processCycles-blockedCycles; } +unsigned __int64 CSlaveActivity::queryLocalCycles() const +{ + return queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), queryLookAheadCycles()); +} + void CSlaveActivity::serializeStats(MemoryBuffer &mb) { CriticalBlock b(crit); // JCSMORE not sure what this is protecting.. @@ -619,7 +623,15 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb) queryCodeContext()->gatherStats(serializedStats); // JCS->GH - should these be serialized as cycles, and a different mapping used on master? - serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles())); + // + // Note: lookAheadCycles is not kept up to date in slaveTimerStats, because multiple objects and threads update + // look-ahead cycles. At the moment each thread or object that generates look-ahead cycles tracks its own total, + // and an up-to-date combined value is only available via a call to queryLookAheadCycles(). The code would + // need to be refactored to change this behaviour. 
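+    // For example, with purely hypothetical numbers (not from a real run): if totalCycles corresponds to 900ms,
+    // lookAheadCycles to 100ms, inputCycles to 400ms and blockedCycles to 200ms, then queryLocalCycles() returns
+    // (900 + 100) - 400 - 200 = 400ms, which is the value serialized below as StTimeLocalExecute.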
+ unsigned __int64 lookAheadCycles = queryLookAheadCycles(); + unsigned __int64 localCycles = queryLocalCycles(queryTotalCycles(), queryBlockedCycles(), lookAheadCycles); + serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(lookAheadCycles)); + serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(localCycles)); slaveTimerStats.addStatistics(serializedStats); serializedStats.serialize(mb); ForEachItemIn(i, outputs) diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index aca3bf3a385..5476e5f999a 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -269,6 +269,7 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres bool canStall() const; bool isFastThrough() const; bool suppressLookAhead() const; + unsigned __int64 queryLocalCycles(unsigned __int64 totalCycles, unsigned __int64 blockedCycles, unsigned __int64 lookAheadCycles) const; // IThorDataLink virtual CSlaveActivity *queryFromActivity() override { return this; } diff --git a/tools/roxie/extract-roxie-timings.py b/tools/roxie/extract-roxie-timings.py index 236386305a1..eaf6fa55644 100755 --- a/tools/roxie/extract-roxie-timings.py +++ b/tools/roxie/extract-roxie-timings.py @@ -50,8 +50,11 @@ def calculateDerivedStats(curRow): timeLocalExecute = float(curRow.get("TimeLocalExecute", 0.0)) timeAgentWait = float(curRow.get("TimeAgentWait", 0.0)) + timeAgentQueue = float(curRow.get("TimeAgentQueue", 0.0)) timeAgentProcess = float(curRow.get("TimeAgentProcess", 0.0)) timeSoapcall = float(curRow.get("TimeSoapcall", 0.0)) + sizeAgentReply = float(curRow.get("SizeAgentReply", 0.0)) + sizeAgentRequests = float(curRow.get("SizeAgentRequests", 0.0)) agentRequestEstimate = numLeafHits + numLeafAdds # This could be completely wrong, but better than nothing numAgentRequests = float(curRow.get("NumAgentRequests", agentRequestEstimate)) # 9.8.x only @@ -76,6 +79,10 @@ def calculateDerivedStats(curRow): if numBranchFetches: curRow["AvgTimeBranchFetch"] = timeBranchFetches/(numBranchFetches) + if (numLeafAdds + numLeafHits): + numIndexRowsRead = float(curRow.get("NumIndexRowsRead", 0.0)) + curRow["Rows/Leaf"] = numIndexRowsRead / (numLeafAdds + numLeafHits) + if numLeafFetches: curRow["AvgTimeLeafFetch"] = timeLeafFetches/(numLeafFetches) @@ -86,9 +93,74 @@ def calculateDerivedStats(curRow): if numAgentRequests: curRow["avgTimeAgentProcess"] = timeAgentProcess / numAgentRequests -def calculateSummaryStats(curRow, numCpus, numRows): + # Generate a summary analysis for the transaction(s) + normalSummary = '' + unusualSummary = '' + notes = '' + if numBranchFetches or numLeafFetches: + avgDiskReadDelay = (timeLeafFetches+timeBranchFetches)/(numLeafFetches+numBranchFetches) + if avgDiskReadDelay < 150: + normalSummary += ",Disk Fetch" + elif avgDiskReadDelay < 2000: + unusualSummary += ",Disk Fetch slow for NVME" + else: + unusualSummary += ",Disk Fetch slow for remote storage" + + if numBranchAdds < (numBranchAdds + numBranchHits) // 100: + normalSummary += ",Branch hits" + else: + unusualSummary += ",Branch misses" + + if numLeafAdds < (numLeafAdds + numLeafHits) * 50 // 100: + normalSummary += ",Leaf hits" + else: + unusualSummary += ",Leaf misses" + + if timeSoapcall > 0: + if timeSoapcall * 2 > timeLocalExecute: + notes += ", Most of time in soapcall" + + if timeAgentWait == 0: + notes += ", Only executes on the server" + else: + if timeAgentWait * 2 > timeLocalExecute: + if timeAgentWait * 10 > timeLocalExecute * 8: 
+ notes += ", Agent bound" + else: + notes += ", Most of time in agent" + else: + if (timeAgentWait + timeSoapcall) < (timeLocalExecute / 5): + unusualSummary += ",Unexplained Server Time" + + if sizeAgentReply > 1000000: + unusualSummary += ",Size Agent reply" + + if timeLocalExecute * 10 < elapsed * 11: + notes += ", Single threaded" + + if timeAgentProcess: + if (timeAgentQueue > timeAgentProcess): + unusualSummary += ",Agent Queue backlog" + + if timeAgentWait > (timeAgentQueue + timeAgentProcess) * 2: + unusualSummary += ",Agent send backlog" + + if timeBranchDecompress + timeLeafDecompress > timeAgentProcess / 4: + unusualSummary += ",Decompress time" - curRow["summary"] = "summary" + numAckRetries = float(curRow.get("NumAckRetries", 0.0)) + resentPackets = float(curRow.get("resentPackets", 0.0)) + + if resentPackets: + unusualSummary += ",Resent packets" + elif numAckRetries: + unusualSummary += ",Ack retries" + + curRow["unusual"] = '"' + unusualSummary[1:] + '"' + curRow["normal"] = '"' + normalSummary[1:] + '"' + curRow["notes"] = '"' + notes[1:] + '"' + +def calculateSummaryStats(curRow, numCpus, numRows): timeLocalCpu = float(curRow.get("TimeLocalCpu", 0.0)) timeRemoteCpu = float(curRow.get("TimeRemoteCpu", 0.0)) @@ -144,10 +216,11 @@ def printRow(curRow): parser=argparse.ArgumentParser() parser.add_argument("filename") parser.add_argument("--all", "-a", help="Combine all services into a single result", action='store_true') + parser.add_argument("--commentary", help="Add commentary to the end of the output", action='store_true') + parser.add_argument("--cpu", "-c", type=int, default=8, help="Number of CPUs to use (default: 8)") + parser.add_argument("--ignorecase", "-i", help="Use case-insensitive query names", action='store_true') parser.add_argument("--nosummary", "-n", help="Avoid including a summary", action='store_true') parser.add_argument("--summaryonly", "-s", help="Only generate a summary", action='store_true') - parser.add_argument("--ignorecase", "-i", help="Use case-insensitive query names", action='store_true') - parser.add_argument("--cpu", "-c", type=int, default=8, help="Number of CPUs to use (default: 8)") args = parser.parse_args() combineServices = args.all suppressDetails = args.summaryonly @@ -247,11 +320,11 @@ def printRow(curRow): castValue = float(value[0:-2]) elif value.endswith("s"): castValue = float(value[0:-1])*1000 - elif value.endswith("MB"): + elif value.endswith("MB") or value.endswith("Mb"): castValue = float(value[0:-2])*1000000 - elif value.endswith("KB"): + elif value.endswith("KB") or value.endswith("Kb"): castValue = float(value[0:-2])*1000 - elif value.endswith("B"): + elif value.endswith("B") or value.endswith("b"): castValue = float(value[0:-1]) else: try: @@ -291,6 +364,7 @@ def printRow(curRow): allStats["AvgTimeBranchDecompress"] = 1 allStats["TimeLeafDecompress"] = 1 allStats["AvgTimeLeafDecompress"] = 1 + allStats["Rows/Leaf"] = 1 allStats["WorkerCpuLoad"] = 1 allStats["TimeLocalCpu"] = 1 allStats["TimeRemoteCpu"] = 1 @@ -301,6 +375,9 @@ def printRow(curRow): allStats["MaxWorkerThreads"] = 1 allStats["MaxFarmers"] = 1 allStats["CpuLoad@10q/s"] = 1 + allStats["unusual"] = 1 + allStats["normal"] = 1 + allStats["notes"] = 1 elapsed = 0 try: @@ -363,7 +440,7 @@ def printRow(curRow): numRows = len(allRows) avgRow = dict(_id_="avg", totalRow="avg") for statName in allStats: - if statName in totalRow: + if statName in totalRow and type(totalRow[statName]) != str: avgRow[statName] = float(totalRow[statName]) / numRows 
calculateDerivedStats(avgRow) @@ -457,7 +534,8 @@ def sortFunc(cur): "" "Note: All times in ms unless explicitly stated" ''' - print(commentary) + if (args.commentary): + print(commentary) # Thoughts for future enhancements: # diff --git a/vcpkg b/vcpkg index 70ea56ab7b2..376d53ed2f4 160000 --- a/vcpkg +++ b/vcpkg @@ -1 +1 @@ -Subproject commit 70ea56ab7b2ec07bfed8b6170cecdd85b2e36fe0 +Subproject commit 376d53ed2f40080b6c094501f1f06bb57bb153bc diff --git a/vcpkg-configuration.json b/vcpkg-configuration.json index 7ed29e6c79b..bff4e206ca6 100644 --- a/vcpkg-configuration.json +++ b/vcpkg-configuration.json @@ -2,7 +2,7 @@ "default-registry": { "kind": "git", "repository": "https://github.com/microsoft/vcpkg", - "baseline": "f7423ee180c4b7f40d43402c2feb3859161ef625" + "baseline": "b2cb0da531c2f1f740045bfe7c4dac59f0b2b69c" }, "registries": [], "overlay-ports": [ diff --git a/vcpkg.json.in b/vcpkg.json.in index df7d2ce54a9..c26c8d0a390 100644 --- a/vcpkg.json.in +++ b/vcpkg.json.in @@ -1,7 +1,7 @@ { "$schema": "https://raw.githubusercontent.com/microsoft/vcpkg/master/scripts/vcpkg.schema.json", "name": "hpcc-platform", - "version": "9.8.0", + "version": "9.10.0", "dependencies": [ { "name": "apr", @@ -166,7 +166,6 @@ "name": "openssl", "platform": "@VCPKG_OPENSSL@" }, - "pcre2", { "name": "opentelemetry-cpp", "default-features": false, @@ -175,6 +174,7 @@ "otlp-grpc" ] }, + "pcre2", { "name": "python3", "platform": "@VCPKG_PYTHON3@ & windows"
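Taken together, the thorlcr changes above follow a single pattern: an activity's queryLookAheadCycles() starts from the total its base class has recorded and adds the cycles tracked independently by any distributor or fetch stream it owns, because each of those components times its own input reads (see the note in CSlaveActivity::serializeStats). A condensed sketch of that pattern (CExampleActivity is hypothetical; the interfaces are those declared in this diff, and a real activity would also need the usual constructor plumbing):

#include "thgraphslave.hpp"          // CSlaveActivity, cycle_t
#include "thhashdistribslave.ipp"    // IHashDistributor

// Hypothetical activity showing how per-component look-ahead cycles are combined.
class CExampleActivity : public CSlaveActivity
{
    Owned<IHashDistributor> distributor;  // null when this activity does not distribute
public:
    virtual unsigned __int64 queryLookAheadCycles() const
    {
        cycle_t lookAheadCycles = CSlaveActivity::queryLookAheadCycles(); // base-class total
        if (distributor)
            lookAheadCycles += distributor->queryLookAheadCycles();      // sender-side look-ahead time
        return lookAheadCycles;
    }
};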