Skip to content

Commit

Permalink
Add deadline for rpcClient calls (#994)
Browse files Browse the repository at this point in the history
* Update dispatchQuery to use min_cores

Sorting jobs only by priority causes a situation where low priority jobs can get starved by a constant flow of high priority jobs.
The new formula adds a modifier to the sorting rank to take into account the number of cores the job is requesting and also the number of days the job is waiting on the queue.
Priorities numbers over 200 will mostly override the formula and work as a priority only based scheduling.
sort = priority + (100 * (1 - (job.cores/job.int_min_cores))) + (age in days)

Besides that, also take layer_int_cores_min into account when filtering folder_resourse limitations to avoid allocating more cores than the folder limits.

(cherry picked from commit 566411aeeddc60983a30eabe121fd03263d05525)

* Revert "Update dispatchQuery to use min_cores"

This reverts commit 2eb4936

* Add deadline for rpcClient calls

Calls are frequently getting locked, setting a timeout is strongly recommended
on the grpc documentation.

Change rqd cache arguments

Increasing the cache size to reduce cacheMiss (Currently at 50%) and
reduce expiration to avoid too many channels idle.
  • Loading branch information
DiegoTavares committed Jul 19, 2021
1 parent 97fd308 commit 87bcf7a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cuebot/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: '26.0-android'
compile group: 'com.sun.mail', name: 'mailapi', version: '1.5.4'
compile group: 'commons-lang', name: 'commons-lang', version: '2.6'
compile group: 'io.grpc', name: 'grpc-all', version: '1.14.0'
compile group: 'io.grpc', name: 'grpc-all', version: '1.36.2'
compile group: 'org.apache.activemq', name: 'activemq-pool', version: activemqVersion
compile group: 'org.apache.velocity', name: 'velocity', version: '1.7'
compile group: 'org.jdom', name: 'jdom', version: '1.1.3'
Expand Down
21 changes: 16 additions & 5 deletions cuebot/src/main/java/com/imageworks/spcue/rqd/RqdClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,27 @@ public final class RqdClientGrpc implements RqdClient {

private final int rqdCacheSize;
private final int rqdCacheExpiration;
private final int rqdCacheConcurrency;
private final int rqdServerPort;
private final int rqdTaskDeadlineSeconds;
private LoadingCache<String, ManagedChannel> channelCache;

private boolean testMode = false;


public RqdClientGrpc(int rqdServerPort, int rqdCacheSize, int rqdCacheExpiration) {
public RqdClientGrpc(int rqdServerPort, int rqdCacheSize, int rqdCacheExpiration,
int rqdCacheConcurrency, int rqdTaskDeadline) {
this.rqdServerPort = rqdServerPort;
this.rqdCacheSize = rqdCacheSize;
this.rqdCacheExpiration = rqdCacheExpiration;
this.rqdCacheConcurrency = rqdCacheConcurrency;
this.rqdTaskDeadlineSeconds = rqdTaskDeadline;
}

private void buildChannelCache() {
this.channelCache = CacheBuilder.newBuilder()
.maximumSize(rqdCacheSize)
.concurrencyLevel(rqdCacheConcurrency)
.expireAfterAccess(rqdCacheExpiration, TimeUnit.MINUTES)
.removalListener(new RemovalListener<String, ManagedChannel>() {
@Override
Expand All @@ -80,8 +86,9 @@ public void onRemoval(RemovalNotification<String, ManagedChannel> removal){
new CacheLoader<String, ManagedChannel>() {
@Override
public ManagedChannel load(String host) throws Exception {
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(
host, rqdServerPort).usePlaintext();
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forAddress(host, rqdServerPort)
.usePlaintext();
return channelBuilder.build();
}
});
Expand All @@ -92,15 +99,19 @@ private RqdInterfaceGrpc.RqdInterfaceBlockingStub getStub(String host) throws Ex
buildChannelCache();
}
ManagedChannel channel = channelCache.get(host);
return RqdInterfaceGrpc.newBlockingStub(channel);
return RqdInterfaceGrpc
.newBlockingStub(channel)
.withDeadlineAfter(rqdTaskDeadlineSeconds, TimeUnit.SECONDS);
}

private RunningFrameGrpc.RunningFrameBlockingStub getRunningFrameStub(String host) throws ExecutionException {
if (channelCache == null) {
buildChannelCache();
}
ManagedChannel channel = channelCache.get(host);
return RunningFrameGrpc.newBlockingStub(channel);
return RunningFrameGrpc
.newBlockingStub(channel)
.withDeadlineAfter(rqdTaskDeadlineSeconds, TimeUnit.SECONDS);
}

public void setHostLock(HostInterface host, LockState lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<constructor-arg index="2" type="int">
<value>${grpc.rqd_cache_expiration}</value>
</constructor-arg>
<constructor-arg index="3" type="int">
<value>${grpc.rqd_cache_concurrency}</value>
</constructor-arg>
<constructor-arg index="4" type="int">
<value>${grpc.rqd_task_deadline}</value>
</constructor-arg>
</bean>

<bean id="launchQueue" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
Expand Down
8 changes: 6 additions & 2 deletions cuebot/src/main/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ grpc.cue_port=${CUEBOT_GRPC_CUE_PORT:8443}
grpc.rqd_server_port=${CUEBOT_GRPC_RQD_SERVER_PORT:8444}
grpc.max_message_bytes=104857600
# Number of entries allowed in the RQD channel cache
grpc.rqd_cache_size=500
grpc.rqd_cache_size=2000
# RQD Channel Cache Expiration in Minutes
grpc.rqd_cache_expiration=30
grpc.rqd_cache_expiration=5
# RQD Channel Cache expected concurrency
grpc.rqd_cache_concurrency=20
# RQD Channel task deadline in seconds
grpc.rqd_task_deadline=10

# Whether or not to enable publishing to a messaging topic.
# Set to a boolean value. See com/imageworks/spcue/services/JmsMover.java.
Expand Down
4 changes: 4 additions & 0 deletions cuebot/src/test/resources/opencue.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ grpc.max_message_bytes=104857600
grpc.rqd_cache_size=500
# RQD Channel Cache Expiration in Minutes
grpc.rqd_cache_expiration=30
# RQD Channel Cache expected concurrency
grpc.rqd_cache_concurrency=20
# RQD Channel task deadline in seconds
grpc.rqd_task_deadline=10

log.frame-log-root=/arbitraryLogDirectory

Expand Down

0 comments on commit 87bcf7a

Please sign in to comment.
  翻译: