2
2
3
3
import co .elastic .logstash .api .Configuration ;
4
4
import co .elastic .logstash .api .Context ;
5
+ import co .elastic .logstash .api .Input ;
5
6
import co .elastic .logstash .api .LogstashPlugin ;
6
7
import co .elastic .logstash .api .PluginConfigSpec ;
7
- import co .elastic .logstash .api .v0 .Input ;
8
8
import org .apache .commons .lang3 .StringUtils ;
9
- import org .logstash .execution .queue .QueueWriter ;
10
9
11
10
import java .util .Arrays ;
12
11
import java .util .Collection ;
13
12
import java .util .Collections ;
13
+ import java .util .Map ;
14
14
import java .util .concurrent .CountDownLatch ;
15
+ import java .util .function .Consumer ;
15
16
16
17
// class name must match plugin name
17
18
@ LogstashPlugin (name ="java_input_example" )
18
19
public class JavaInputExample implements Input {
19
20
20
21
public static final PluginConfigSpec <Long > EVENT_COUNT_CONFIG =
21
- Configuration .numSetting ("count" , 3 );
22
+ PluginConfigSpec .numSetting ("count" , 3 );
22
23
23
24
public static final PluginConfigSpec <String > PREFIX_CONFIG =
24
- Configuration .stringSetting ("prefix" , "message" );
25
+ PluginConfigSpec .stringSetting ("prefix" , "message" );
25
26
27
+ private String id ;
26
28
private long count ;
27
29
private String prefix ;
28
30
private final CountDownLatch done = new CountDownLatch (1 );
29
31
private volatile boolean stopped ;
30
32
31
- // all plugins must provide a constructor that accepts Configuration and Context
32
- public JavaInputExample (Configuration config , Context context ) {
33
+ // all plugins must provide a constructor that accepts id, Configuration, and Context
34
+ public JavaInputExample (String id , Configuration config , Context context ) {
33
35
// constructors should validate configuration options
36
+ this .id = id ;
34
37
count = config .get (EVENT_COUNT_CONFIG );
35
38
prefix = config .get (PREFIX_CONFIG );
36
39
}
37
40
38
41
@ Override
39
- public void start (QueueWriter queueWriter ) {
42
+ public void start (Consumer < Map < String , Object >> consumer ) {
40
43
41
44
// The start method should push Map<String, Object> instances to the supplied QueueWriter
42
45
// instance. Those will be converted to Event instances later in the Logstash event
@@ -51,7 +54,7 @@ public void start(QueueWriter queueWriter) {
51
54
try {
52
55
while (!stopped && eventCount < count ) {
53
56
eventCount ++;
54
- queueWriter . push (Collections .singletonMap ("message" ,
57
+ consumer . accept (Collections .singletonMap ("message" ,
55
58
prefix + " " + StringUtils .center (eventCount + " of " + count , 20 )));
56
59
}
57
60
} finally {
@@ -75,4 +78,9 @@ public Collection<PluginConfigSpec<?>> configSchema() {
75
78
// should return a list of all configuration options for this plugin
76
79
return Arrays .asList (EVENT_COUNT_CONFIG , PREFIX_CONFIG );
77
80
}
81
+
82
+ @ Override
83
+ public String getId () {
84
+ return this .id ;
85
+ }
78
86
}
0 commit comments